Python SDK
The ADOS Drone Agent includes a Python scripting SDK. You can write scripts that connect to the agent, read live telemetry, send MAVLink commands, and implement autonomous behaviors. Scripts run directly on the companion computer with full access to the agent’s services.
Script location
Place your Python scripts in /var/ados/scripts/ on the drone. The agent watches this directory and makes scripts available through the CLI, TUI, REST API, and Mission Control.
Running scripts
From the CLI
ados run /var/ados/scripts/my_mission.py
From the REST API
curl -X POST http://localhost:8080/api/scripts/run \
  -H "X-ADOS-Key: $KEY" \
  -H "Content-Type: application/json" \
  -d '{"path": "/var/ados/scripts/my_mission.py"}'
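The same request can be issued from Python with only the standard library. This is a minimal sketch: `build_run_request` and `run_script` are hypothetical helper names, and because the response body format is not described here, the sketch returns the raw bytes instead of parsing them.

```python
import json
import urllib.request


def build_run_request(script_path, api_key, base_url="http://localhost:8080"):
    """Build the POST request that asks the agent to run a script."""
    body = json.dumps({"path": script_path}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/scripts/run",
        data=body,
        headers={"X-ADOS-Key": api_key, "Content-Type": "application/json"},
        method="POST",
    )


def run_script(script_path, api_key, timeout=10):
    """Send the request and return the raw response body (format not assumed)."""
    request = build_run_request(script_path, api_key)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read()
```

Splitting request construction from sending keeps the first half easy to inspect without a running agent.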
From Mission Control
Open the Command tab, go to the Scripts sub-page, and click “Run” on any script.
Script structure
A basic script connects to the agent, reads telemetry, and sends commands:
"""Simple takeoff and land script."""
import time
from ados.scripting import Agent
# Connect to the local agent
agent = Agent()
agent.connect()
# Wait for FC connection
agent.wait_for_fc(timeout=10)
# Read current state
state = agent.get_state()
print(f"Battery: {state.battery.voltage}V")
print(f"GPS: {state.gps.fix_type} ({state.gps.satellites} sats)")
# Arm and takeoff
agent.arm()
agent.takeoff(altitude=10) # meters
# Wait at altitude
time.sleep(5)
# Land
agent.land()
# Wait for the disarm that follows landing
agent.wait_for_disarmed(timeout=30)
agent.disconnect()
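The basic script above leaves the drone in the air if an exception is raised between takeoff and landing. One way to guard against that is a `try`/`finally` wrapper. This is a sketch only: `hover_hop` is a hypothetical helper, it uses just the Agent methods shown above, and whether landing is the right recovery action depends on your mission.

```python
import time


def hover_hop(agent, altitude=10, hover_seconds=5, sleep=time.sleep):
    """Arm, climb, hover, and land; land and disconnect even if a step fails."""
    armed = False
    try:
        agent.arm()
        armed = True
        agent.takeoff(altitude=altitude)
        sleep(hover_seconds)
    finally:
        try:
            if armed:
                # Reached once arm() succeeded, whether or not the hop completed.
                agent.land()
                agent.wait_for_disarmed(timeout=30)
        finally:
            agent.disconnect()
```

In a real script, call it as `hover_hop(agent)` after `agent.connect()` and `agent.wait_for_fc(timeout=10)`.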
Agent class
The Agent class is the main entry point for scripting:
from ados.scripting import Agent
agent = Agent(
    host="localhost",  # Agent REST API host
    port=8080,         # Agent REST API port
    api_key=None,      # Read from /etc/ados/pairing.json if None
)
Connection methods
| Method | Description |
|---|---|
| agent.connect() | Connect to the agent API |
| agent.disconnect() | Clean disconnect |
| agent.wait_for_fc(timeout=10) | Block until FC is connected |
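Pairing `connect()` with `disconnect()` by hand is easy to get wrong when a script exits early. A context manager can handle it. This is a sketch, not part of the SDK: `agent_session` is a hypothetical name, and it relies only on the three connection methods listed above.

```python
from contextlib import contextmanager


@contextmanager
def agent_session(agent, fc_timeout=10):
    """Connect, wait for the flight controller, and always disconnect."""
    agent.connect()
    try:
        agent.wait_for_fc(timeout=fc_timeout)
        yield agent
    finally:
        agent.disconnect()


# Intended usage (requires the SDK on the companion computer):
#   from ados.scripting import Agent
#   with agent_session(Agent()) as agent:
#       print(agent.get_state())
```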
Telemetry
| Method | Returns |
|---|---|
| agent.get_state() | Full telemetry snapshot (attitude, position, battery, GPS) |
| agent.get_attitude() | Roll, pitch, yaw in degrees |
| agent.get_position() | Latitude, longitude, altitude |
| agent.get_battery() | Voltage, current, remaining percent |
| agent.get_gps() | Fix type, satellites, HDOP |
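The telemetry getters return a snapshot, so waiting for a condition means polling. The helper below is a generic sketch: `wait_until` is a hypothetical name, and the `satellites` attribute in the usage comment is an assumption based on the `state.gps.satellites` field used earlier.

```python
import time


def wait_until(read, predicate, timeout=30.0, interval=0.5):
    """Poll read() until predicate(value) is true; return the value, or None on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        value = read()
        if predicate(value):
            return value
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval)


# Intended usage (attribute name assumed):
#   gps = wait_until(agent.get_gps, lambda g: g.satellites >= 8, timeout=60)
#   if gps is None:
#       print("No adequate GPS lock within 60 s")
```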
Commands
| Method | Description |
|---|---|
| agent.arm() | Arm the motors |
| agent.disarm() | Disarm the motors |
| agent.takeoff(altitude) | Take off to a target altitude (meters) |
| agent.land() | Land at current position |
| agent.set_mode(mode) | Set flight mode (e.g., "STABILIZE", "GUIDED", "AUTO") |
| agent.goto(lat, lon, alt) | Fly to a GPS coordinate |
| agent.set_velocity(vx, vy, vz) | Set velocity in m/s (body frame) |
| agent.set_yaw(heading) | Set target heading in degrees |
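As an illustration of body-frame velocity commands, the sketch below flies four equal legs and then commands zero velocity. It makes several assumptions that are not confirmed here: `fly_square` is a hypothetical helper, +vx is taken as forward and +vy as right, and a single `set_velocity` call is assumed to hold until the next one. If the flight controller expects setpoints to be resent periodically, the sleep would need to become a resend loop.

```python
import time


def fly_square(agent, speed=1.0, leg_seconds=5.0, sleep=time.sleep):
    """Fly four equal legs using body-frame velocity, then stop."""
    # Assumed axes: +vx forward, +vy right; vz stays 0 (no climb or descent).
    legs = [(speed, 0.0), (0.0, speed), (-speed, 0.0), (0.0, -speed)]
    try:
        for vx, vy in legs:
            agent.set_velocity(vx, vy, 0.0)
            sleep(leg_seconds)
    finally:
        # Always command zero velocity so the vehicle does not keep moving.
        agent.set_velocity(0.0, 0.0, 0.0)
```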
Safety validator
Every command passes through a safety validator before reaching the flight controller. The validator checks:
- Have the pre-arm checks passed? (blocks arm if they fail)
- Is GPS fix adequate? (blocks takeoff without 3D fix)
- Is battery sufficient? (blocks takeoff below configurable threshold)
- Is the command within geofence? (if a geofence is active)
- Is the altitude within limits?
If a command fails validation, it raises a SafetyError exception with a human-readable message:
from ados.scripting import Agent, SafetyError
agent = Agent()
agent.connect()
try:
    agent.arm()
except SafetyError as e:
    print(f"Cannot arm: {e}")
    # e.g. "Pre-arm check failed: GPS not locked"
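Some validation failures clear on their own, for example a GPS lock that has not settled yet, so retrying `arm()` a few times can be reasonable. The following is a sketch under assumptions: `arm_with_retry` is a hypothetical helper, and the exception type is passed in as a parameter so the example runs without the SDK. In a real script you would pass `SafetyError`.

```python
import time


def arm_with_retry(agent, safety_error, attempts=3, delay=5.0, sleep=time.sleep):
    """Call agent.arm() up to `attempts` times; return True once it succeeds."""
    for attempt in range(1, attempts + 1):
        try:
            agent.arm()
            return True
        except safety_error as exc:
            print(f"Arm attempt {attempt}/{attempts} blocked: {exc}")
            if attempt < attempts:
                sleep(delay)
    return False


# Intended usage:
#   from ados.scripting import Agent, SafetyError
#   if not arm_with_retry(agent, SafetyError):
#       raise SystemExit("Giving up: drone would not arm")
```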
Telemetry streaming
For continuous telemetry, use the state socket directly:
import json
import socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect("/run/ados/state.sock")
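while True:
    # Assumes each read returns exactly one complete JSON message
    data = sock.recv(4096)
    if not data:
        break  # the agent closed the socket
    state = json.loads(data.decode())
    print(f"Alt: {state['position']['alt_agl']:.1f}m "
          f"Speed: {state['speed']['groundspeed']:.1f}m/s")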
This gives you 10 Hz telemetry updates without HTTP overhead.
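A stream socket does not preserve message boundaries, so one `recv()` call may return half a message or several at once. How the agent delimits messages on this socket is not stated here, so the sketch below avoids assuming a delimiter: it buffers text and uses `json.JSONDecoder.raw_decode` to peel off each complete JSON object. `extract_messages` is a hypothetical helper name.

```python
import json

_decoder = json.JSONDecoder()


def extract_messages(buffer):
    """Decode every complete JSON value at the start of buffer.

    Returns (messages, remainder); the remainder is kept for the next read.
    """
    messages = []
    pos = 0
    while True:
        # raw_decode does not skip leading whitespace, so do it here.
        while pos < len(buffer) and buffer[pos].isspace():
            pos += 1
        if pos >= len(buffer):
            break
        try:
            obj, end = _decoder.raw_decode(buffer, pos)
        except json.JSONDecodeError:
            break  # incomplete message: wait for more data
        messages.append(obj)
        pos = end
    return messages, buffer[pos:]
```

In the read loop, append each decoded chunk to a string buffer, call `extract_messages(buffer)`, process the returned messages, and keep the remainder as the new buffer. Note that malformed data is treated the same as incomplete data, so a production version should cap the buffer size.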
Raw MAVLink access
For low-level MAVLink operations, connect to the binary socket:
import struct
import socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect("/run/ados/mavlink.sock")
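while True:
    # Read 4-byte length prefix (little-endian); MSG_WAITALL blocks until all bytes arrive
    length_bytes = sock.recv(4, socket.MSG_WAITALL)
    if len(length_bytes) < 4:
        break  # socket closed
    length = struct.unpack('<I', length_bytes)[0]
    # Read the complete MAVLink frame
    frame = sock.recv(length, socket.MSG_WAITALL)
    if len(frame) < length:
        break  # socket closed mid-frame
    # Parse with pymavlink or your own decoder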
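If you read the socket in larger chunks, the length-prefixed framing can be handled on a byte buffer instead. This is a sketch of that approach, assuming only the 4-byte little-endian length prefix described above; `split_frames` is a hypothetical helper name.

```python
import struct


def split_frames(buffer):
    """Split complete length-prefixed frames off the front of a bytes buffer.

    Returns (frames, remainder); the remainder holds any partial frame.
    """
    frames = []
    offset = 0
    while len(buffer) - offset >= 4:
        (length,) = struct.unpack_from("<I", buffer, offset)
        end = offset + 4 + length
        if end > len(buffer):
            break  # frame body not fully received yet
        frames.append(buffer[offset + 4:end])
        offset = end
    return frames, buffer[offset:]
```

Each returned frame is the payload without its length prefix, ready to hand to a MAVLink decoder.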
Example: simple survey
A script that flies a grid pattern for aerial survey:
"""Fly a 4-waypoint grid pattern."""
import time
from ados.scripting import Agent
agent = Agent()
agent.connect()
agent.wait_for_fc(timeout=10)
# Define waypoints (lat, lon, alt)
waypoints = [
    (12.9716, 77.5946, 50),
    (12.9720, 77.5946, 50),
    (12.9720, 77.5950, 50),
    (12.9716, 77.5950, 50),
]
agent.arm()
agent.takeoff(altitude=50)
time.sleep(5)
agent.set_mode("GUIDED")
for i, (lat, lon, alt) in enumerate(waypoints):
    print(f"Flying to waypoint {i+1}/{len(waypoints)}")
    agent.goto(lat, lon, alt)
    agent.wait_for_arrival(tolerance=2.0, timeout=60)
    print("Arrived. Capturing photo.")
    agent.trigger_camera()
    time.sleep(1)
agent.land()
agent.wait_for_disarmed(timeout=30)
agent.disconnect()
print("Survey complete.")
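For larger areas, the waypoint list can be generated instead of typed by hand. The sketch below builds a back-and-forth ("lawnmower") grid from a starting corner. `lawnmower_grid` is a hypothetical helper, and the metre-to-degree conversion is a spherical-Earth approximation that is adequate for small survey areas but not for precise mapping.

```python
import math

METERS_PER_DEG_LAT = 111_320.0  # approximate, spherical-Earth value


def lawnmower_grid(lat0, lon0, alt, rows, cols, spacing_m):
    """Return (lat, lon, alt) waypoints in a back-and-forth grid.

    The grid starts at (lat0, lon0) and extends north and east.
    """
    dlat = spacing_m / METERS_PER_DEG_LAT
    dlon = spacing_m / (METERS_PER_DEG_LAT * math.cos(math.radians(lat0)))
    waypoints = []
    for r in range(rows):
        # Reverse every other row so consecutive waypoints stay adjacent.
        col_order = range(cols) if r % 2 == 0 else range(cols - 1, -1, -1)
        for c in col_order:
            waypoints.append((lat0 + r * dlat, lon0 + c * dlon, alt))
    return waypoints
```

The result can replace the hard-coded `waypoints` list, for example `waypoints = lawnmower_grid(12.9716, 77.5946, 50, rows=3, cols=4, spacing_m=20)`.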
Concurrent script limits
The agent limits concurrent scripts to prevent resource exhaustion:
scripting:
  scripts:
    max_concurrent: 3
If you try to start a fourth script while three are running, the API returns HTTP 429 (Too Many Requests).
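A client that may hit this limit can wait and retry instead of failing immediately. This is a sketch assuming the request is made with `urllib`, which raises `urllib.error.HTTPError` for a 429 response. `call_with_429_retry` is a hypothetical helper, and `send` stands for any zero-argument function that performs the request.

```python
import time
import urllib.error


def call_with_429_retry(send, attempts=5, delay=2.0, sleep=time.sleep):
    """Call send(); on HTTP 429 wait and retry, doubling the delay each time."""
    for attempt in range(attempts):
        try:
            return send()
        except urllib.error.HTTPError as err:
            # Re-raise anything that is not a 429, and the final 429 as well.
            if err.code != 429 or attempt == attempts - 1:
                raise
            sleep(delay)
            delay *= 2
```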
Listing and stopping scripts
# List running scripts
ados scripts
# Stop a script (via API)
curl -X POST http://localhost:8080/api/scripts/stop \
  -H "X-ADOS-Key: $KEY" \
  -H "Content-Type: application/json" \
  -d '{"script_id": "abc123"}'
Scripts that crash or hang are automatically reaped by the scripting service. If a script does not exit within the configured timeout, the service sends it SIGTERM, followed by SIGKILL if it still does not exit.
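Because SIGTERM arrives before SIGKILL, a script can catch it and clean up before exiting. The sketch below shows one pattern, using hypothetical names (`request_stop`, `install_stop_handler`, `run_until_stopped`). The length of the grace period between the two signals is not stated here, so the cleanup should be short.

```python
import signal
import threading

stop_requested = threading.Event()


def request_stop(signum, frame):
    """Signal handler: only set a flag; the main loop does the real work."""
    stop_requested.set()


def install_stop_handler():
    """Register the handler; must be called from the main thread."""
    signal.signal(signal.SIGTERM, request_stop)


def run_until_stopped(step, cleanup, interval=0.1):
    """Call step() repeatedly until a stop is requested, then always run cleanup()."""
    try:
        while not stop_requested.is_set():
            step()
            stop_requested.wait(interval)
    finally:
        cleanup()
```

A script would call `install_stop_handler()` once at startup, then pass its per-iteration work as `step` and its shutdown actions (for example landing and `agent.disconnect()`) as `cleanup`.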