# ORCHESTRATOR_IMPLEMENTATION.md — SatSim Orchestrator (Python) Detailed Plan

This document is a task-driven implementation plan for the **SatSim Orchestrator**. It is intended to be handed to an LLM to implement the orchestrator in Python. It focuses on **architecture, module boundaries, data flow, and concrete tasks** (not deep RPC wire details).

The orchestrator is responsible for:

- Loading a scenario
- Starting and coordinating subcomponents (Geo/RF engine, OMNeT++ lane, Mininet lane)
- Driving a unified timebase
- Fanning out LinkState/Event updates to lane adapters
- Recording reproducible artifacts (manifest, logs, optional traces, metrics)

---

## Decision lock (2026-02-18)

These ambiguities are now resolved and should be treated as fixed v1 design:

- **Authoritative tick source**: `StreamLinkDeltas` is the only control-plane tick source. Lanes apply link state from deltas, not from events.
- **Event stream alignment (Option B accepted)**: evolve the Geo/RF event API so that `StreamEventsRequest` carries `dt` + `selector`, and `EngineEvent` carries `tick_index`, aligned to the same tick grid as deltas.
- **Error contract handling**: the orchestrator must handle `NOT_FOUND`, `INVALID_ARGUMENT`, `FAILED_PRECONDITION`, and `RESOURCE_EXHAUSTED` as first-class engine responses.
- **Scenario translation strictness**: translation to the Geo/RF `ScenarioSpec` is fail-fast and must satisfy all required engine fields/constraints.
- **Tooling policy**: Python workflows use `uv` (`uv run`, `uv add`); do not rely on `pip`.
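To make the locked error contract concrete, here is a minimal, dependency-free sketch of the intended mapping. In the real `geomrf/client.py` the codes would be `grpc.StatusCode` members; plain strings are used here so the sketch stands alone, and `map_rpc_error` is an illustrative helper name, not a fixed API.

```python
# First-class engine responses per the decision lock.
ENGINE_CODES = {"NOT_FOUND", "INVALID_ARGUMENT", "FAILED_PRECONDITION", "RESOURCE_EXHAUSTED"}
# Transport-level codes that warrant bounded retries with backoff (see section 8.1).
RETRYABLE_CODES = {"UNAVAILABLE", "DEADLINE_EXCEEDED"}


class GeomrfError(Exception):
    """Typed engine error (named in util/errors.py) carrying status + retry hint."""

    def __init__(self, code: str, detail: str, *, retryable: bool = False):
        super().__init__(f"{code}: {detail}")
        self.code = code
        self.retryable = retryable


def map_rpc_error(code: str, detail: str) -> GeomrfError:
    """Map an engine RPC status to a typed GeomrfError per the decision lock."""
    if code in ENGINE_CODES:
        return GeomrfError(code, detail)  # first-class engine response, not retryable
    if code in RETRYABLE_CODES:
        return GeomrfError(code, detail, retryable=True)  # bounded retry + backoff
    return GeomrfError(code, f"unexpected failure: {detail}")  # abort the run
```

The point of the split is that engine-contract errors terminate or reject the request deterministically, while transport errors are the only ones the client may retry.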
---

## 1) Repository layout (recommended)

```
satsim/
  orchestrator/
    pyproject.toml
    README.md
    ORCHESTRATOR_IMPLEMENTATION.md
    satsim_orch/
      __init__.py
      cli.py
      main.py
      config/
        __init__.py
        schema.py
        loader.py
        defaults.py
        normalize.py
      runtime/
        __init__.py
        run_manager.py
        manifest.py
        artifact_store.py
        logging.py
        versioning.py
        process.py
      timebase/
        __init__.py
        clock.py
        modes.py
        scheduler.py
      bus/
        __init__.py
        messages.py
        queues.py
        fanout.py
      geomrf/
        __init__.py
        client.py
        translate.py
        health.py
      lanes/
        __init__.py
        base.py
        registry.py
        mininet_lane/
          __init__.py
          adapter.py
          topo.py
          shaping.py
          controller.py
          capture.py
        omnet_lane/
          __init__.py
          adapter.py
          trace_ingest.py
          runner.py
      metrics/
        __init__.py
        prom.py
        records.py
        exporters.py
      util/
        __init__.py
        ids.py
        units.py
        asyncx.py
        errors.py
    tests/
      test_config_validation.py
      test_timebase_scheduler.py
      test_bus_fanout.py
      test_run_manifest.py
      test_lane_adapter_contract.py
      test_geomrf_client_smoke.py
      test_event_alignment_smoke.py
  subprojects/
    geomrf-engine/   # separate project; orchestrator consumes it via gRPC
  lanes/
    omnet/
    mininet/
  observability/
  artifacts/
```

---

## 2) Orchestrator design summary (targets)

### 2.1 Orchestrator responsibilities

- Scenario loading & validation
- Run directory + manifest creation
- Geo/RF engine lifecycle (create scenario; start streams; close)
- Lane lifecycle:
  - `prepare()` (build topology / start processes)
  - `apply_tick()` (apply link deltas / events)
  - `finalize()` (stop processes, collect outputs)
- Unified runtime pacing:
  - offline apply-fast
  - real-time apply-paced (wall-clock aligned)
- Parallel lane fanout (the same incoming ticks feed multiple lanes)
- Artifact collection:
  - config snapshot, manifest
  - optional LinkState trace logging
  - metrics export
  - PCAP capture (Mininet lane)

### 2.2 Key architectural choices

- Python 3.11+ with **asyncio**
- gRPC async client (`grpc.aio`) for Geo/RF streaming
- In-process **async fanout bus** using bounded queues (v1)
- Pluggable lane adapters via a registry
- Everything stamped with versions/seeds for reproducibility

---

## 3) Implementation checklist (extremely detailed)

### 3.1 Project bootstrap and build

- [x] Create `orchestrator/pyproject.toml`
  - [x] Define package name (e.g., `satsim-orchestrator`)
  - [x] Set Python version (>=3.11)
- [x] Add dependencies:
  - [x] `pydantic`
  - [x] `pyyaml`
  - [x] `grpcio`, `grpcio-tools`, `protobuf`
  - [x] `rich` (optional, for CLI UX)
  - [x] `prometheus-client` (optional)
  - [x] `aiofiles` (optional, async file writes)
- [x] Add dev dependencies:
  - [x] `pytest`, `pytest-asyncio`
  - [x] `ruff` / `black`
  - [x] `mypy` (optional)
- [x] Add task runner and `uv` commands:
  - [x] `uv run pytest`
  - [x] `uv run ruff check .`
  - [x] `uv run python -m satsim_orch.cli run ...`

### 3.2 CLI and entrypoints

- [x] Implement `satsim_orch/cli.py` with commands:
  - [x] `run --mode {omnet|mininet|parallel} --dt 1s --t0 ... --t1 ...`
  - [x] `validate <scenario>`
  - [x] `list-runs`
  - [x] `show-run <run_id>`
- [x] Implement `satsim_orch/main.py`
  - [x] Parse CLI args
  - [x] Load scenario
  - [x] Create `RunContext`
  - [x] Run orchestrator loop
- [x] Define exit codes and error messages:
  - [x] Invalid config → exit 2
  - [x] Missing dependency/lane binary → exit 3
  - [x] Runtime error → exit 1

---

## 4) Configuration system

### 4.1 Scenario schema (Pydantic)

- [x] Implement `config/schema.py` with a canonical `ScenarioConfig`
  - [x] Global:
    - [x] `name`
    - [x] `seed`
    - [x] `time: {t0, t1, dt, mode}`
    - [x] `execution: {lane_mode, strict_reproducible, record_trace, record_pcap}`
    - [x] `paths: {artifacts_root}`
  - [x] Geo/RF engine connection:
    - [x] `geomrf: {grpc_target, request_dt, selector_defaults, thresholds_defaults}`
  - [x] Geo/RF scenario payload:
    - [x] `geomrf.scenario_spec` maps 1:1 to Geo/RF `ScenarioSpec` required fields (`nodes`, `terminal`, orbit/site, link/adaptation policy)
    - [x] optional high-level shorthand may exist, but must compile deterministically to a valid `ScenarioSpec`
  - [x] Lane configs:
    - [x] `mininet: {controller: {type, addr}, topo: {...}, shaping: {...}}`
    - [x] `omnet: {project_path, ini_path, run_args, trace_mode}`
- [x] Add validators:
  - [x] `t0 < t1`
  - [x] `dt > 0`
  - [x] `seed >= 0`
  - [x] lane configs exist for the chosen mode
  - [x] fail fast if engine-required scenario fields are missing/invalid
  - [x] fail fast if `request_dt` is outside engine capabilities (`min_dt`, `max_dt`)
  - [x] if `mode=mininet`, require Linux + OVS checks (soft validate with warnings)
- [x] Add defaulting rules in `config/defaults.py`
  - [x] `dt` default (e.g., 1s)
  - [x] thresholds default (delay/capacity/loss)
  - [x] artifacts root default `./artifacts/runs`

### 4.2 Loader and normalization

- [x] Implement `config/loader.py`
  - [x] load YAML/JSON
  - [x] environment variable expansion (optional)
  - [x] include/merge support (optional)
- [x] Implement `config/normalize.py`
  - [x] produce a normalized config (canonical types, timezone normalization)
  - [x] compute derived fields (run duration, tick count)
  - [x] build `GeomrfScenarioSpec` (engine-facing) from `ScenarioConfig`
  - [x] build `LaneScenarioSpec` (lane-facing) from `ScenarioConfig`

---

## 5) Run manager and artifacts

### 5.1 Run context and directory structure

- [x] Implement `runtime/run_manager.py`
  - [x] Generate `run_id` (timestamp + short random suffix, or UUID)
  - [x] Create run directory:
    - [x] `artifacts/runs/<run_id>/`
    - [x] `logs/`, `metrics/`, `pcaps/`, `traces/`, `manifests/`
  - [x] Save copies of:
    - [x] raw scenario file
    - [x] normalized scenario JSON
- [x] Implement `runtime/manifest.py`
  - [x] manifest fields:
    - [x] run_id, scenario name, timestamps
    - [x] seeds
    - [x] component versions (orchestrator, geomrf engine, lanes)
    - [x] execution mode, dt, tick count
    - [x] git SHAs if available
    - [x] host info (OS, Python version) (optional)
- [x] Implement `runtime/versioning.py`
  - [x] orchestrator version string
  - [x] best-effort git SHA discovery

### 5.2 Logging

- [x] Implement `runtime/logging.py`
  - [x] structured JSON logs to file
  - [x] human-readable console logs
  - [x] include `run_id` and correlation IDs
  - [x] log rotation policy (optional)
- [x] Implement `util/errors.py` with typed exceptions:
  - [x] `ScenarioError`, `GeomrfError`, `LaneError`, `TimebaseError`

### 5.3 Artifact store helpers

- [x] Implement `runtime/artifact_store.py`
  - [x] `write_text(path, text)`
  - [x] `write_json(path, obj)`
  - [x] `append_jsonl(path, obj)`
  - [x] atomic writes (write to a temp file, then rename)
- [x] Implement the trace recording option:
  - [x] if `record_trace=true`, append each received LinkDeltaBatch to JSONL (Parquet later)

---

## 6) Timebase and pacing

### 6.1 Time modes

- [x] Implement `timebase/modes.py` enum:
  - [x] `OFFLINE` (apply incoming ticks as fast as possible; no sleeping)
  - [x] `REALTIME` (apply incoming ticks at wall-clock pace)
  - [x] `PARALLEL` (lane selection mode; both lanes consume the same incoming ticks)
- [x] Implement `timebase/clock.py`
  - [x] `SimulationTime` type for formatting/validation of incoming stream ticks
  - [x] conversions and formatting
- [x] Implement `timebase/scheduler.py`
  - [x] implement **pacing**, not tick generation
  - [x] REALTIME: sleep until the expected wall-clock time of the next received tick
  - [x] OFFLINE: apply each received tick immediately
- [x] Add drift handling for REALTIME:
  - [x] if late by more than one tick, either skip ticks or catch up (configurable)
  - [x] default: never skip control-plane ticks; warn if drift accumulates

---

## 7) Internal bus and message contracts

### 7.1 Canonical internal messages

- [x] Implement `bus/messages.py` dataclasses:
  - [x] `TickUpdate`:
    - [x] run_id, scenario_ref
    - [x] tick_index, time
    - [x] link_updates: list
    - [x] link_removals: list
    - [x] events: list
    - [x] stats: compute timing, counts
  - [x] `RunControl` messages:
    - [x] start/pause/resume/stop
  - [x] `LaneStatus` messages:
    - [x] ready/running/error/stopped
- [x] Implement `bus/queues.py`
  - [x] bounded asyncio queues
  - [x] per-lane queue limits (configurable)
- [x] Implement `bus/fanout.py`
  - [x] one producer (the Geo/RF stream consumer)
  - [x] N consumers (lane adapters + recorder)
  - [x] backpressure policy:
    - [x] default: block the producer when any lane queue is full (strict sync)
    - [x] option: drop trace-recorder messages only (never drop lane updates)
- [x] Add message ordering rules:
  - [x] tick updates are delivered in increasing tick_index
  - [x] within a tick: removals are applied before updates by consumers (documented)

---

## 8) Geo/RF engine client integration

### 8.1 gRPC client

- [x] Implement `geomrf/client.py`
  - [x] gRPC channel creation (`grpc.aio.insecure_channel(target)`)
  - [x] stub creation from the generated proto
  - [x] `get_version()`, `get_capabilities()`
  - [x] `create_scenario(scenario_spec) -> scenario_ref`
  - [x] `close_scenario(scenario_ref)`
  - [x] `stream_link_deltas(request) -> async iterator`
  - [x] `stream_events(request) -> async iterator`
- [x] Implement `geomrf/health.py`
  - [x] connect + health check on startup
  - [x] gate event-consumer features on engine schema/version support
- [x] Implement `geomrf/translate.py`
  - [x] translate the orchestrator `ScenarioConfig` to the Geo/RF `ScenarioSpec` (engine-facing)
  - [x] enforce deterministic key ordering where needed for reproducible payloads
  - [x] validate all required proto fields before the RPC call; reject locally on mismatch
  - [x] translate a Geo/RF `LinkDeltaBatch` into an internal `TickUpdate`
- [x] Implement robust error mapping:
  - [x] map `NOT_FOUND`, `INVALID_ARGUMENT`, `FAILED_PRECONDITION`, `RESOURCE_EXHAUSTED` to typed `GeomrfError`
  - [x] define a retry policy for `UNAVAILABLE`/`DEADLINE_EXCEEDED` (bounded retries + backoff)
  - [x] include scenario_ref and tick_index in error logs

### 8.2 Stream consumption tasks

- [x] Implement the Geo/RF stream consumer coroutine:
  - [x] starts `StreamLinkDeltas` with `emit_full_snapshot_first=true`
  - [x] reads batches and pushes `TickUpdate` to the bus producer
- [x] Implement the event stream consumer coroutine (optional in v1):
  - [x] call `StreamEvents` with the same `t_start/t_end/dt/selector` used for deltas
  - [x] consume `EngineEvent.tick_index` directly (no nearest-tick heuristics)
  - [x] record events to the trace/metrics channel for observability
  - [x] if the connected engine does not support the aligned event schema, disable the event consumer and warn once
- [x] Merge/control strategy:
  - [x] the lane control path uses `TickUpdate` from `StreamLinkDeltas` only
  - [x] the event stream is informational and must not mutate lane state

---

## 9) Lane adapter architecture

### 9.1 Adapter base contract

- [x] Implement `lanes/base.py`:
  - [x] `class LaneAdapter(Protocol)` or an ABC with:
    - [x] `name: str`
    - [x] `async prepare(run_context, scenario_config) -> None`
    - [x] `async apply_tick(tick: TickUpdate) -> None`
    - [x] `async finalize(run_context) -> None`
    - [x] `async health() -> dict` (optional)
- [x] Implement `lanes/registry.py`
  - [x] register adapters by name
  - [x] instantiate the chosen adapters based on `lane_mode`
- [x] Implement `tests/test_lane_adapter_contract.py` for interface compliance

### 9.2 Mininet lane adapter (detailed tasks)

- [x] Implement `lanes/mininet_lane/adapter.py`
  - [x] `prepare()`:
    - [x] validate Linux prereqs (`ovs-vsctl`, `tc`, `ip`)
    - [x] start the controller (ONOS/Ryu) if configured
    - [x] create the Mininet topology (delegate to `topo.py`)
    - [x] start the Mininet network
    - [x] start PCAP capture if enabled (delegate to `capture.py`)
  - [x] `apply_tick()`:
    - [x] apply removals (links down) first
    - [x] apply updates:
      - [x] for each link: set up/down state
      - [x] apply delay/loss/rate using the shaping module
  - [x] `finalize()`:
    - [x] stop captures
    - [x] stop Mininet
    - [x] stop the controller if the orchestrator started it
- [x] Implement `lanes/mininet_lane/topo.py`
  - [x] create a Mininet graph from `ScenarioConfig` node roles
  - [x] map SatSim node IDs to Mininet host/switch names
  - [x] define OVS switches and host attachments
  - [x] decide representation:
    - [x] v1 recommended: represent satellites as OVS switches; GS/UT as hosts
    - [x] allow optional SATs as hosts if needed
  - [x] create links but keep them initially "neutral" (shaping applied per tick)
- [x] Implement `lanes/mininet_lane/shaping.py`
  - [x] provide functions:
    - [x] `set_link_up(link_id)` / `set_link_down(link_id)`
    - [x] `apply_netem(link_id, delay_ms, loss_pct)`
    - [x] `apply_rate(link_id, rate_mbps)`
    - [x] `clear_shaping(link_id)`
  - [x] implement using:
    - [x] `tc qdisc replace dev <iface> root netem delay ... loss ...`
    - [x] `tc qdisc ... tbf/htb` for rate
  - [x] ensure idempotency (repeated calls are safe)
  - [x] log every applied shaping change with its tick_index
- [x] Implement `lanes/mininet_lane/controller.py`
  - [x] support controller options:
    - [x] external controller address (already running)
    - [x] orchestrator-launched controller container/process (optional v1)
  - [x] store controller version info in the manifest
- [x] Implement `lanes/mininet_lane/capture.py`
  - [x] start tcpdump for the relevant interfaces
  - [x] rotate PCAPs per time window or per run (v1: one PCAP per run)
  - [x] store the PCAP path in the manifest

### 9.3 OMNeT lane adapter (trace-first v1)

- [x] Implement `lanes/omnet_lane/adapter.py`
  - [x] v1 assumption: OMNeT consumes a **trace file** (offline) rather than live streaming
- [x] Implement `lanes/omnet_lane/trace_ingest.py`
  - [x] the orchestrator writes a LinkState trace file suitable for the OMNeT adapter
  - [x] define a simple trace format:
    - [x] JSONL per tick containing updates/removals
    - [x] or CSV-like with (tick, src, dst, up, delay, rate, loss)
  - [x] ensure deterministic ordering of entries
- [x] Implement `lanes/omnet_lane/runner.py`
  - [x] launch the OMNeT simulation via subprocess:
    - [x] capture stdout/stderr to the run logs
    - [x] handle exit codes
  - [x] place outputs into the artifacts directory

---

## 10) Orchestrator main runtime loop

### 10.1 Lifecycle coordination

- [x] Implement `satsim_orch/main.py` orchestration steps:
  - [x] Create the run context + artifact directories
  - [x] Log environment + versions
  - [x] Initialize the Geo/RF client and fetch version/capabilities
  - [x] Create the Geo/RF scenario
  - [x] Instantiate the chosen lane adapters (mininet/omnet/parallel)
  - [x] Call `prepare()` for each lane
  - [x] Start stream consumer tasks
  - [x] Start the realtime pacing task only when `time.mode=REALTIME`
  - [x] Await completion conditions:
    - [x] reached t_end
    - [x] user stop signal (CTRL+C)
    - [x] error in any task
  - [x] Finalize lanes
  - [x] Close the Geo/RF scenario
  - [x] Write the final manifest + summary

### 10.2 Streaming-driven execution (locked)

- The Geo/RF stream is the authoritative tick source.
- The orchestrator does not generate ticks; it consumes them and fans them out.

Tasks:

- [x] In the streaming consumer, for each LinkDeltaBatch:
  - [x] translate to `TickUpdate`
  - [x] push to the fanout bus

### 10.3 Fanout to lanes

- [x] For each lane, run a consumer task:
  - [x] `while True: tick = await queue.get(); await lane.apply_tick(tick)`
  - [x] handle cancellation and lane errors
- [x] Implement strict ordering:
  - [x] do not allow a lane to process tick k+1 before tick k
- [x] Implement a shutdown handshake:
  - [x] send `RunControl(STOP)` to lanes on exit
  - [x] drain queues if configured

---

## 11) Error handling and shutdown

### 11.1 Exception strategy

- [x] Any uncaught exception in:
  - [x] the Geo/RF stream consumer
  - [x] any lane consumer
  - [x] any lane adapter method

  triggers a coordinated shutdown.
- [x] Implement `runtime/process.py`:
  - [x] subprocess management with kill/terminate escalation
  - [x] collect exit codes and stderr tails
- [x] Add SIGINT/SIGTERM handling:
  - [x] first CTRL+C: graceful stop
  - [x] second CTRL+C: immediate stop

### 11.2 Cleanup correctness

- [x] Always attempt:
  - [x] `finalize()` for lanes
  - [x] `close_scenario()` for Geo/RF

  even when errors occur.
- [x] Write the final manifest including the failure reason.
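The two-stage CTRL+C policy above can be sketched as a small escalation state machine; the names here (`ShutdownGovernor`, `Stage`) are illustrative, not part of the plan's fixed API. The main loop would register `on_interrupt` via `loop.add_signal_handler(signal.SIGINT, ...)` and await `stopped` alongside its other completion conditions.

```python
import asyncio
import enum


class Stage(enum.Enum):
    RUNNING = "running"
    GRACEFUL = "graceful"    # finalize lanes, close scenario, write manifest
    IMMEDIATE = "immediate"  # cancel all tasks now


class ShutdownGovernor:
    """First interrupt requests a graceful stop; a repeat forces immediate stop."""

    def __init__(self) -> None:
        self.stage = Stage.RUNNING
        self.stopped = asyncio.Event()  # awaited by the main loop

    def on_interrupt(self) -> Stage:
        """Called from the SIGINT/SIGTERM handler; escalates on repeat."""
        if self.stage is Stage.RUNNING:
            self.stage = Stage.GRACEFUL
        else:
            self.stage = Stage.IMMEDIATE
        self.stopped.set()
        return self.stage
```

Even in the immediate path, the cleanup rules in 11.2 still apply: `finalize()` and `close_scenario()` are attempted, just with shorter timeouts before kill/terminate escalation.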
---

## 12) Metrics and run summaries

### 12.1 Metric recording

- [x] Implement `metrics/records.py`
  - [x] standard metric record format for:
    - [x] tick compute time
    - [x] links emitted
    - [x] lane apply times (optional)
- [x] Implement `metrics/exporters.py`
  - [x] JSONL writer to `metrics/`
  - [x] optional Prometheus exporter
- [x] Implement per-tick timing:
  - [x] time spent translating batches
  - [x] time spent applying to each lane

### 12.2 Summary report (v1)

- [x] Write a `summary.json` at the end of the run:
  - [x] total ticks, total links emitted, mean compute time, runtime duration
  - [x] lane success/failure states
  - [x] artifact paths (pcaps, traces, logs)

---

## 13) Integration tests (practical, not huge)

### 13.1 Smoke tests

- [x] `test_geomrf_client_smoke.py`
  - [x] connect to the Geo/RF engine on localhost
  - [x] create a tiny scenario (1 GS + 1 SAT)
  - [x] stream the first 3 ticks and assert non-empty output
- [x] `test_event_alignment_smoke.py`
  - [x] request deltas/events with identical `t_start/t_end/dt/selector`
  - [x] assert each event has a `tick_index` that maps to an existing/expected delta tick

### 13.2 Bus correctness

- [x] `test_bus_fanout.py`
  - [x] ensure ticks are delivered to all lanes in order
  - [x] ensure backpressure blocks the producer when a lane queue is full

### 13.3 Run manifest correctness

- [x] `test_run_manifest.py`
  - [x] run manager writes the expected keys
  - [x] manifest includes versions and a config snapshot

---

## 14) Minimum viable orchestrator (v1) — acceptance criteria

- [x] Can run `satsim run scenario.yaml --mode mininet`
  - [x] Geo/RF scenario created
  - [x] link deltas streamed and applied via `tc`/netem
  - [x] PCAP recorded (optional)
  - [x] run artifacts written (logs, manifest)
- [x] Can run `satsim run scenario.yaml --mode omnet`
  - [x] Geo/RF stream recorded to a trace
  - [x] OMNeT launched consuming the trace (or stubbed with a clear TODO if not ready)
  - [x] run artifacts written
- [x] Can run `satsim run scenario.yaml --mode parallel`
  - [x] both lane adapters receive identical tick updates
  - [x] lane adapters derive control only from link deltas
  - [x] optional events are captured in artifacts without driving lane state
- [x] Orchestrator shuts down cleanly on completion or CTRL+C

---

## 15) Optional but valuable v1.1 tasks (safe additions)

- [ ] Orchestrator exposes its own gRPC stream `StreamTickUpdates` so lanes can subscribe remotely
- [ ] Add a NATS internal-bus option for multi-process fanout
- [ ] Add a replay command: `satsim replay <run_id>` (use the stored trace)
- [ ] Add a sweep runner: parameter grid search with repeated runs and a consolidated summary

---
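As a concrete reference for the fanout bus described in section 7, here is a minimal in-process sketch: one producer, N bounded per-lane asyncio queues, and the strict-sync backpressure default (the producer awaits every `put`, so a full lane queue blocks it). Class and method names (`FanoutBus`, `publish`, `consume`) are illustrative, and the `TickUpdate` dataclass is trimmed to a few of the fields listed in section 7.1.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class TickUpdate:
    """Trimmed version of the bus/messages.py TickUpdate from section 7.1."""
    tick_index: int
    link_updates: list = field(default_factory=list)
    link_removals: list = field(default_factory=list)


class FanoutBus:
    """One producer, N bounded per-lane queues; FIFO preserves tick order per lane."""

    def __init__(self, lane_names: list[str], maxsize: int = 8) -> None:
        self.queues: dict[str, asyncio.Queue] = {
            name: asyncio.Queue(maxsize=maxsize) for name in lane_names
        }

    async def publish(self, tick: TickUpdate) -> None:
        # Strict sync default: block until every lane queue has room,
        # so no lane can fall arbitrarily far behind the stream.
        for q in self.queues.values():
            await q.put(tick)

    async def consume(self, lane: str) -> TickUpdate:
        return await self.queues[lane].get()
```

In the orchestrator, each lane consumer task would loop on `consume(lane)` and call `lane.apply_tick(tick)`, which gives the in-order delivery guarantee of section 10.3 for free from queue FIFO ordering.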