Skip to main content

Python SDK

The ADOS Drone Agent includes a Python scripting SDK. You can write scripts that connect to the agent, read live telemetry, send MAVLink commands, and implement autonomous behaviors. Scripts run directly on the companion computer with full access to the agent’s services.

Script location

Place your Python scripts in /var/ados/scripts/ on the drone. The agent watches this directory and makes scripts available through the CLI, TUI, REST API, and Mission Control.

Running scripts

From the CLI

ados run /var/ados/scripts/my_mission.py

From the REST API

curl -X POST http://localhost:8080/api/scripts/run \
  -H "X-ADOS-Key: $KEY" \
  -H "Content-Type: application/json" \
  -d '{"path": "/var/ados/scripts/my_mission.py"}'

From Mission Control

Open the Command tab, go to the Scripts sub-page, and click “Run” on any script.

Script structure

A basic script connects to the agent, reads telemetry, and sends commands:
"""Simple takeoff and land script."""

import time
from ados.scripting import Agent

# Connect to the local agent
agent = Agent()
agent.connect()

# Wait for FC connection
agent.wait_for_fc(timeout=10)

# Read current state
state = agent.get_state()
print(f"Battery: {state.battery.voltage}V")
print(f"GPS: {state.gps.fix_type} ({state.gps.satellites} sats)")

# Arm and takeoff
agent.arm()
agent.takeoff(altitude=10)  # meters

# Wait at altitude
time.sleep(5)

# Land
agent.land()

# Disarm
agent.wait_for_disarmed(timeout=30)
agent.disconnect()

Agent class

The Agent class is the main entry point for scripting:
from ados.scripting import Agent

agent = Agent(
    host="localhost",   # Agent REST API host
    port=8080,          # Agent REST API port
    api_key=None,       # Read from /etc/ados/pairing.json if None
)

Connection methods

MethodDescription
agent.connect()Connect to the agent API
agent.disconnect()Clean disconnect
agent.wait_for_fc(timeout=10)Block until FC is connected

Telemetry

MethodReturns
agent.get_state()Full telemetry snapshot (attitude, position, battery, GPS)
agent.get_attitude()Roll, pitch, yaw in degrees
agent.get_position()Latitude, longitude, altitude
agent.get_battery()Voltage, current, remaining percent
agent.get_gps()Fix type, satellites, HDOP

Commands

MethodDescription
agent.arm()Arm the motors
agent.disarm()Disarm the motors
agent.takeoff(altitude)Take off to a target altitude (meters)
agent.land()Land at current position
agent.set_mode(mode)Set flight mode (e.g., “STABILIZE”, “GUIDED”, “AUTO”)
agent.goto(lat, lon, alt)Fly to a GPS coordinate
agent.set_velocity(vx, vy, vz)Set velocity in m/s (body frame)
agent.set_yaw(heading)Set target heading in degrees

Safety validator

Every command passes through a safety validator before reaching the flight controller. The validator checks:
  • Is the drone armed? (blocks arm if pre-arm checks fail)
  • Is GPS fix adequate? (blocks takeoff without 3D fix)
  • Is battery sufficient? (blocks takeoff below configurable threshold)
  • Is the command within geofence? (if a geofence is active)
  • Is the altitude within limits?
If a command fails validation, it raises a SafetyError exception with a human-readable message:
from ados.scripting import Agent, SafetyError

agent = Agent()
agent.connect()

try:
    agent.arm()
except SafetyError as e:
    print(f"Cannot arm: {e}")
    # e.g. "Pre-arm check failed: GPS not locked"

Telemetry streaming

For continuous telemetry, use the state socket directly:
import json
import socket

sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect("/run/ados/state.sock")

while True:
    data = sock.recv(4096)
    if data:
        state = json.loads(data.decode())
        print(f"Alt: {state['position']['alt_agl']:.1f}m  "
              f"Speed: {state['speed']['groundspeed']:.1f}m/s")
This gives you 10 Hz telemetry updates without HTTP overhead. For low-level MAVLink operations, connect to the binary socket:
import struct
import socket

sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect("/run/ados/mavlink.sock")

while True:
    # Read 4-byte length prefix (little-endian)
    length_bytes = sock.recv(4)
    if len(length_bytes) < 4:
        break
    length = struct.unpack('<I', length_bytes)[0]

    # Read the MAVLink frame
    frame = sock.recv(length)
    # Parse with pymavlink or your own decoder

Example: simple survey

A script that flies a grid pattern for aerial survey:
"""Fly a 4-waypoint grid pattern."""

import time
from ados.scripting import Agent

agent = Agent()
agent.connect()
agent.wait_for_fc(timeout=10)

# Define waypoints (lat, lon, alt)
waypoints = [
    (12.9716, 77.5946, 50),
    (12.9720, 77.5946, 50),
    (12.9720, 77.5950, 50),
    (12.9716, 77.5950, 50),
]

agent.arm()
agent.takeoff(altitude=50)
time.sleep(5)

agent.set_mode("GUIDED")

for i, (lat, lon, alt) in enumerate(waypoints):
    print(f"Flying to waypoint {i+1}/{len(waypoints)}")
    agent.goto(lat, lon, alt)
    agent.wait_for_arrival(tolerance=2.0, timeout=60)
    print("Arrived. Capturing photo.")
    agent.trigger_camera()
    time.sleep(1)

agent.land()
agent.wait_for_disarmed(timeout=30)
agent.disconnect()
print("Survey complete.")

Concurrent script limits

The agent limits concurrent scripts to prevent resource exhaustion:
scripting:
  scripts:
    max_concurrent: 3
If you try to start a fourth script while three are running, the API returns a 429 error.

Listing and stopping scripts

# List running scripts
ados scripts

# Stop a script (via API)
curl -X POST http://localhost:8080/api/scripts/stop \
  -H "X-ADOS-Key: $KEY" \
  -H "Content-Type: application/json" \
  -d '{"script_id": "abc123"}'
Scripts that crash or hang are automatically reaped by the scripting service. If a script does not exit within the configured timeout, it is killed with SIGTERM, then SIGKILL.