Queue-Driven Agent Dispatch
Concept: ORCH-1.45 — session-partitioned agent-turn queue consumed by a stateless dispatch-worker fleet. Builds directly on OS-5.16 (state externalization), KG-2.54–2.57 (the durable task-queue stack), OS-5.14 (worker auth) and OS-5.18 (the fleet supervisory plane).
The problem
The cognitive scheduler (core/cognitive_scheduler.py) is an in-process
asyncio priority queue: max_concurrent agent turns per process, a
per-process _processes table, and no cross-host dispatch. Even with
sessions/goals externalized to Postgres (OS-5.16), execution stayed pinned
to the host that accepted the goal — a busy gateway could not hand a turn to
an idle peer, and the scheduler tier could not scale horizontally.
The model
caller ──► enqueue seam ──► agent_turns queue ──► dispatch worker (any host)
           (job handle)     key = session_id          │
                                                      ├─ session_execution_guard
                                                      ├─ claim (idempotent)
                                                      ├─ EXISTING execution body
                                                      └─ durable writeback + ack
- Envelope, not payload. AgentTurnEnvelope (orchestration/agent_dispatch.py) carries job_id (the idempotency key), session_id, kind (goal_loop|orchestrator_task), payload_ref, tenant, priority, deadline. Bodies live in the durable stores the envelope references: the goals/sessions rows (the full goal spec is persisted in the session's metadata_json) or the :Task graph node.
- Transport = the existing KG-2.55 stack. TASK_QUEUE_BACKEND/auto picks Kafka (agent_turns topic, consumer group agent-dispatch), Postgres (SKIP LOCKED claims on agent_dispatch_queue in the shared state store), or the zero-infra per-host SQLite file. The same fail-loud contract applies: an explicitly selected kafka/postgres transport that is unreachable raises TaskQueueUnavailable instead of silently islanding the queue.
- Enqueue seams. graph_orchestrate action=dispatch and the goal machinery (core/sessions.create_goal) check one flag, AGENT_DISPATCH_BACKEND (inline|queue, default inline). Inline is the previous in-process behavior byte-for-byte. In queue mode the caller gets a job handle: poll graph_orchestrate action=status or /api/graph/orchestrate/job/{job_id} (orchestrator jobs), or the goals API (goal runs).
- Worker (orchestration/agent_dispatch_worker.py, console script agent-dispatch-worker): claims the referenced record, rehydrates state from the shared store, and executes through the existing bodies: run_goal_loop for goals, the orchestration manager's agent execution for orchestrator jobs (the same extraction discipline as the KG-2.57 kg-ingest worker: relocate, never duplicate). Those bodies already write turns, iterations and final status back into the durable stores.
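To make the "envelope, not payload" idea concrete, here is a minimal sketch of what such an envelope could look like. The class name `TurnEnvelopeSketch`, the field types, the defaults, and the JSON wire format are all assumptions for illustration; only the field names come from the list above, and the real `AgentTurnEnvelope` may differ.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional

# Taken from the text: the two turn kinds an envelope can name.
VALID_KINDS = ("goal_loop", "orchestrator_task")


@dataclass(frozen=True)
class TurnEnvelopeSketch:
    """Hypothetical stand-in for AgentTurnEnvelope: references only, no body."""

    job_id: str                            # idempotency key
    session_id: str                        # source of the partition key
    kind: str                              # one of VALID_KINDS
    payload_ref: str                       # pointer to a goals/sessions row or :Task node
    tenant: Optional[str] = None
    priority: int = 0                      # assumed default
    deadline_unix: Optional[float] = None  # None means "no deadline" (assumption)

    def __post_init__(self) -> None:
        if self.kind not in VALID_KINDS:
            raise ValueError(f"unknown kind: {self.kind!r}")

    def to_bytes(self) -> bytes:
        """Serialize to a small JSON message (assumed wire format)."""
        return json.dumps(asdict(self), sort_keys=True).encode("utf-8")

    @classmethod
    def from_bytes(cls, raw: bytes) -> "TurnEnvelopeSketch":
        """Rebuild the envelope on the worker side."""
        return cls(**json.loads(raw.decode("utf-8")))
```

Because the message carries only identifiers and scheduling hints, it stays small regardless of how large the goal spec is, and a redelivered copy is harmless: the worker always rehydrates the body from the durable store.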
Ordering: session beats tenant
partition_key_for (KG-2.56) gains session:<id> at the top of the key
hierarchy. Turn N+1 of a session reads the state turn N wrote — interleaving
two turns of one session corrupts the conversation, so per-session serial
execution is a correctness requirement, while tenant keying is only an
ordering/fairness grouping for ingest work. A session never spans tenants, so
session-keying cannot weaken tenant isolation. Distinct sessions spread
across partitions and execute in parallel; AGENT_TURNS_PARTITIONS
(default 6, grow-only) bounds fleet-wide session concurrency on Kafka.
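The key precedence can be sketched as follows. This is an illustration under stated assumptions, not the KG-2.56 code: `sketch_partition_key` and `partition_index` are hypothetical names, the `tenant:<id>` prefix is assumed by analogy with `session:<id>`, and SHA-256 stands in for whatever hash the transport's partitioner actually uses.

```python
import hashlib
from typing import Optional

AGENT_TURNS_PARTITIONS = 6  # default quoted in the text


def sketch_partition_key(
    session_id: Optional[str] = None, tenant: Optional[str] = None
) -> Optional[str]:
    """Session wins over tenant; None means the message is unkeyed (assumption)."""
    if session_id:
        return f"session:{session_id}"
    if tenant:
        return f"tenant:{tenant}"  # assumed prefix
    return None


def partition_index(key: str, partitions: int = AGENT_TURNS_PARTITIONS) -> int:
    """Map a key to a partition with a stable hash (stand-in for the real partitioner)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partitions
```

Every turn of one session produces the same key and therefore lands on the same partition, which is what gives a single consumer the serial view of that session.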
Delivery and idempotency guarantees
- At-least-once. The ack/offset-commit happens strictly AFTER a turn is processed or durably marked failed. A worker crash redelivers the envelope: Kafka group rebalance, Postgres visibility timeout (600 s), SQLite head-until-ack.
- Idempotent claims. job_id/payload_ref is the idempotency key. The claim check skips terminal jobs (duplicate delivery) and jobs whose running claim is fresh (a live worker owns them); a claim older than CLAIM_TTL_S (1 h) is presumed dead and re-claimed. Crash → claim expiry → requeue: the ingest reaper pattern folded into the claim itself.
- Per-session mutual exclusion. session_execution_guard holds a process-local per-session lock plus the OS-5.16 Postgres advisory lock (agent-session:<id>) for the claim+execute+writeback cycle. At-least-once delivery can hand one session's turn to two workers; the guard guarantees exactly one executes, the other claims-and-skips. A crashed holder releases the advisory lock server-side, so recovery is redelivery + re-claim, never a stuck session.
- Deadlines. A turn consumed after deadline_unix is durably failed ("expired") without execution.
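The claim and deadline rules above reduce to a small decision function. The sketch below is one way to express them; the status strings, the `ClaimDecision` enum, the function name, and the order in which the checks run are assumptions, and only the 1 h TTL and the skip/re-claim/expire outcomes come from the text.

```python
import time
from enum import Enum
from typing import Optional

CLAIM_TTL_S = 3600.0  # 1 h, as stated in the text

# Assumed names for terminal job states.
TERMINAL_STATUSES = frozenset({"completed", "failed", "expired"})


class ClaimDecision(Enum):
    EXECUTE = "execute"              # claim (or re-claim) and run the turn
    SKIP_TERMINAL = "skip_terminal"  # duplicate delivery of a finished job
    SKIP_OWNED = "skip_owned"        # a live worker holds a fresh claim
    EXPIRE = "expire"                # past deadline: fail durably, do not run


def decide_claim(
    status: str,
    claimed_at_unix: Optional[float],
    deadline_unix: Optional[float],
    now: Optional[float] = None,
) -> ClaimDecision:
    """Decide what a worker should do with a delivered envelope."""
    now = time.time() if now is None else now
    if status in TERMINAL_STATUSES:
        return ClaimDecision.SKIP_TERMINAL
    if (
        status == "running"
        and claimed_at_unix is not None
        and now - claimed_at_unix < CLAIM_TTL_S
    ):
        return ClaimDecision.SKIP_OWNED
    if deadline_unix is not None and now > deadline_unix:
        return ClaimDecision.EXPIRE
    # Either never claimed, or the previous claim is older than the TTL.
    return ClaimDecision.EXECUTE
```

In every branch the worker can ack the message afterwards: skips are safe to drop, and the execute and expire paths end in a durable write before the ack, which preserves the at-least-once contract.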
Placement: queue-pull, no central placer
Workers pull turns when they have capacity; nothing pushes work at them. At this stage that is the correct design, not a shortcut: the partitioned queue already provides per-session serialization (the only hard placement constraint), uniform load spreading across consumers, and automatic rebalancing when workers join/leave — a central placer would add a coordination point, a failure mode, and rebalance churn while enforcing nothing the partition key doesn't already enforce. Affinity-aware placement (HRW routing toward workers with warm session caches) is future work layered on the same envelope, worth its complexity only once checkpoint-rehydration cost dominates turn latency.
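For readers unfamiliar with HRW (highest random weight, also called rendezvous hashing), the following sketch shows the general technique that the future affinity-aware placement refers to. It is not part of the current design; the function names and the choice of SHA-256 are illustrative assumptions.

```python
import hashlib
from typing import Sequence


def hrw_score(worker_id: str, session_id: str) -> int:
    """Deterministic pseudo-random weight for a (worker, session) pair."""
    digest = hashlib.sha256(f"{worker_id}|{session_id}".encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def preferred_worker(session_id: str, workers: Sequence[str]) -> str:
    """Pick the worker with the highest weight for this session."""
    if not workers:
        raise ValueError("no workers available")
    # Ties are broken by worker id so the result does not depend on input order.
    return max(workers, key=lambda w: (hrw_score(w, session_id), w))
```

The useful property is stability: removing a worker only remaps the sessions that preferred that worker, so warm caches on the remaining workers stay relevant.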
Fleet visibility
- Workers heartbeat into the dispatch_workers table of the sessions store (the same registry surface OS-5.18 reads): worker id, host, capacity, active sessions, transport, liveness. Stale heartbeats (> 90 s) drop out.
- /api/fleet/topology returns dispatch_workers (+ a totals count).
- graph_orchestrate job/{id} shows which worker/host executed: the claim stamps claimed_by/dispatch_host on the :Task node and completion stamps executed_by; goal rows carry the worker token in owner_host.
- Metrics on the OS-5.23 registry: agent_utilities_dispatch_queue_depth{backend}, agent_utilities_dispatch_turns_total{outcome}, agent_utilities_dispatch_workers.
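The heartbeat staleness rule can be illustrated with a small in-memory filter. The record fields mirror the list above, but the class name, the `last_seen_unix` column, and the shape of the summary dictionary are assumptions rather than the actual `dispatch_workers` schema or `/api/fleet/topology` response.

```python
import time
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional

STALE_AFTER_S = 90.0  # heartbeats older than this drop out, per the text


@dataclass(frozen=True)
class WorkerHeartbeat:
    """Hypothetical row of the dispatch_workers registry."""

    worker_id: str
    host: str
    capacity: int
    active_sessions: int
    transport: str
    last_seen_unix: float  # assumed column name


def live_workers(
    rows: Iterable[WorkerHeartbeat], now: Optional[float] = None
) -> List[WorkerHeartbeat]:
    """Keep only workers whose last heartbeat is at most 90 s old."""
    now = time.time() if now is None else now
    return [r for r in rows if now - r.last_seen_unix <= STALE_AFTER_S]


def topology_summary(
    rows: Iterable[WorkerHeartbeat], now: Optional[float] = None
) -> Dict[str, object]:
    """Assumed response shape: the live workers plus fleet totals."""
    live = live_workers(rows, now)
    return {
        "dispatch_workers": [r.worker_id for r in live],
        "total_workers": len(live),
        "total_capacity": sum(r.capacity for r in live),
        "total_active_sessions": sum(r.active_sessions for r in live),
    }
```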
Deployment shape
A horizontally scaled deployment is now four independent tiers:
| Tier | Scales by | State |
|---|---|---|
| Stateless gateways (GATEWAY_WORKERS, N hosts) | request volume | none (OS-5.16) |
| N dispatch workers (agent-dispatch-worker) | active agent turns | none: claims + shared store |
| M ingest workers (kg-ingest-worker, KG-2.57) | ingest backlog | none |
| Engine shards (GRAPH_SERVICE_ENDPOINTS, KG-2.58) + state Postgres + Kafka | resident population / events | durable |
Flags: STATE_DB_URI (required for multi-host dispatch — sessions must be
shared), TASK_QUEUE_BACKEND=kafka (or postgres), AGENT_DISPATCH_BACKEND=queue,
AGENT_TURNS_PARTITIONS. Single-host/dev needs none of them: inline dispatch
and the SQLite transport remain the zero-infra default.
Testing
tests/unit/test_agent_dispatch.py — envelope round-trip, session-key
precedence, inline-mode live-path (unchanged behavior), queue-mode job
handles on both seams, worker claim/execute/writeback against a fake queue +
real SQLite sessions store, two-workers-one-session mutual exclusion,
stale-claim crash-requeue, deadline expiry, poison-envelope tolerance,
heartbeat/topology/metrics surfaces. No broker, Postgres or engine daemon
required.