Event Backbone Architecture
CONCEPT:KG-2.7 — Vendor-Agnostic Event Backbone
Overview
The event backbone provides protocol-based pub/sub event streaming for
graph mutations, task queues, telemetry, and evolution triggers. It follows
the same pattern as GraphBackend: an abstract protocol with an in-memory
default and optional distributed backends.
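The "abstract protocol plus in-memory default" pattern can be illustrated with a minimal sketch. The names EventBackendProtocol and InMemoryBackend, and the publish/subscribe signatures, are assumptions made for this example; they are not taken from the agent_utilities source.

```python
from collections import defaultdict
from typing import Any, Callable, Protocol

Event = dict[str, Any]
Handler = Callable[[str, Event], None]


class EventBackendProtocol(Protocol):
    """Hypothetical shape of the abstract backend protocol."""

    def publish(self, topic: str, event: Event) -> None: ...

    def subscribe(self, topic: str, handler: Handler) -> None: ...


class InMemoryBackend:
    """Toy in-process backend: synchronous fan-out, no persistence."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    def publish(self, topic: str, event: Event) -> None:
        # Deliver to every handler registered for this topic, in order.
        for handler in list(self._handlers[topic]):
            handler(topic, event)


received: list[tuple[str, Event]] = []
backend: EventBackendProtocol = InMemoryBackend()
backend.subscribe("kg.mutations", lambda topic, event: received.append((topic, event)))
backend.publish("kg.mutations", {"action": "add_node", "data": {"id": "agent-001"}})
backend.publish("kg.tasks", {"action": "noop", "data": {}})  # no subscriber, so nothing happens
```

Because callers depend only on the protocol, a distributed backend with the same two methods could be swapped in without touching producers or consumers.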
Architecture
graph TD
subgraph Producers
IGE["IntelligenceGraphEngine"]
TM["TaskManager"]
AHE["AHE Harness"]
SYNC["KafkaGraphSyncDaemon"]
end
subgraph EventBackend["Event Backbone (Protocol)"]
MEB["MemoryEventBackend<br/>(in-process, zero-dep)"]
KEB["RedpandaEventBackend<br/>(distributed, persistent, confluent-kafka)"]
end
subgraph Topics
T1["kg.mutations"]
T2["kg.tasks"]
T3["kg.staging"]
T4["kg.telemetry"]
T5["kg.evolution"]
end
subgraph Consumers
L1["L1 GraphComputeEngine<br/>(via SyncDaemon)"]
WSM["WorkingSetManager"]
TELE["Telemetry Pipeline"]
EVOL["Evolution Engine"]
end
IGE -->|publish| T1
TM -->|publish| T2
AHE -->|publish| T5
T1 -->|subscribe| SYNC
SYNC -->|apply mutations| L1
T2 -->|subscribe| WSM
T4 -->|subscribe| TELE
T5 -->|subscribe| EVOL
style MEB fill:#d5e8d4,stroke:#82b366
style KEB fill:#dae8fe,stroke:#6c8ebf
Topic Taxonomy
| Topic | Purpose | Retention | Cleanup Policy |
|---|---|---|---|
| kg.mutations | Graph CRUD events (add/update/delete node/edge) | 7 days | compact + delete |
| kg.tasks | Task queue scheduling and completion | 3 days | delete |
| kg.staging | Staged graph payloads awaiting write | 1 day | delete |
| kg.telemetry | Agent traces, latency, error rates | 1 day | delete |
| kg.evolution | Self-improvement triggers, AHE cycle events | 7 days | compact + delete |
Event Schema
All events are JSON-serialized with the following base structure:
{
"action": "add_node",
"data": {
"id": "agent-001",
"properties": {"type": "Agent", "name": "Research Agent"}
},
"timestamp": 1716912000.0,
"source": "IntelligenceGraphEngine"
}
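As a rough illustration of the base structure above, the sketch below builds an event and round-trips it through JSON. The helper names make_event, encode_event and decode_event are hypothetical, and the required-field check is an assumption for the example; the source does not say how the real backends validate events.

```python
import json
import time
from typing import Any, Optional

# The four base fields shown in the example event above.
REQUIRED_FIELDS = ("action", "data", "timestamp", "source")


def make_event(action: str, data: dict[str, Any], source: str,
               timestamp: Optional[float] = None) -> dict[str, Any]:
    """Build an event dict; the timestamp defaults to the current Unix time."""
    return {
        "action": action,
        "data": data,
        "timestamp": time.time() if timestamp is None else timestamp,
        "source": source,
    }


def encode_event(event: dict[str, Any]) -> bytes:
    """Serialize to UTF-8 JSON, rejecting events that lack a base field."""
    missing = [name for name in REQUIRED_FIELDS if name not in event]
    if missing:
        raise ValueError(f"event is missing fields: {missing}")
    return json.dumps(event, separators=(",", ":")).encode("utf-8")


def decode_event(payload: bytes) -> dict[str, Any]:
    """Inverse of encode_event."""
    return json.loads(payload.decode("utf-8"))


event = make_event(
    "add_node",
    {"id": "agent-001", "properties": {"type": "Agent", "name": "Research Agent"}},
    source="IntelligenceGraphEngine",
    timestamp=1716912000.0,
)
payload = encode_event(event)
```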
Mutation Actions
| Action | Data Fields | Description |
|---|---|---|
| add_node | id, properties | Add or update a node |
| add_edge | source, target, properties | Add or update an edge |
| remove_node | id | Remove a node and its edges |
| remove_edge | source, target | Remove a specific edge |
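To make the four actions concrete, here is a small sketch of a consumer applying them to a dictionary-backed graph. ToyGraph is a hypothetical stand-in, not the GraphComputeEngine API, and merging properties on "add or update" is an assumption; the real engine may replace them instead.

```python
from typing import Any


class ToyGraph:
    """Dictionary-backed graph used only to illustrate the mutation actions."""

    def __init__(self) -> None:
        self.nodes: dict[str, dict[str, Any]] = {}
        self.edges: dict[tuple[str, str], dict[str, Any]] = {}

    def apply(self, event: dict[str, Any]) -> None:
        action, data = event["action"], event["data"]
        if action == "add_node":
            # Upsert: create the node if needed, then merge properties (assumption).
            self.nodes.setdefault(data["id"], {}).update(data.get("properties", {}))
        elif action == "add_edge":
            key = (data["source"], data["target"])
            self.edges.setdefault(key, {}).update(data.get("properties", {}))
        elif action == "remove_node":
            self.nodes.pop(data["id"], None)
            # Removing a node also removes every edge that touches it.
            self.edges = {k: v for k, v in self.edges.items() if data["id"] not in k}
        elif action == "remove_edge":
            self.edges.pop((data["source"], data["target"]), None)
        else:
            raise ValueError(f"unknown mutation action: {action!r}")


graph = ToyGraph()
for evt in [
    {"action": "add_node", "data": {"id": "a", "properties": {"type": "Agent"}}},
    {"action": "add_node", "data": {"id": "b", "properties": {"type": "Tool"}}},
    {"action": "add_edge", "data": {"source": "a", "target": "b", "properties": {"rel": "uses"}}},
    {"action": "remove_node", "data": {"id": "a"}},
]:
    graph.apply(evt)
```

All four branches are written so that replaying the same event leaves the graph unchanged, which is what makes at-least-once delivery tolerable for this kind of consumer.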
Backend Selection
from agent_utilities.knowledge_graph.core.event_backend import create_event_backend
# In-memory (default, zero config, single-process)
backend = create_event_backend("memory")
# Redpanda (production, multi-process)
backend = create_event_backend("redpanda", bootstrap_servers="redpanda:9092")
Environment Variables
| Variable | Default | Description |
|---|---|---|
| EVENT_BACKEND | redpanda | Backend type (memory or redpanda); falls back to memory unless KAFKA_ENABLED=true |
| KAFKA_ENABLED | false | Gate that enables the distributed backend; otherwise MemoryEventBackend is used |
| REDPANDA_BROKERS | localhost:9092 | Broker addresses (Kafka/Redpanda) |
| REDPANDA_CONSUMER_GROUP | agent-utilities | Default consumer group |
| REDPANDA_SECURITY_PROTOCOL | PLAINTEXT | Security protocol |
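The interaction between EVENT_BACKEND and KAFKA_ENABLED is easy to misread, so here is a sketch of the resolution logic as the table describes it. The function resolve_event_backend and its return shape are hypothetical, and treating only the literal string "true" (case-insensitive) as enabled is an assumption.

```python
import os
from typing import Mapping


def resolve_event_backend(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Return the effective backend settings for a given environment mapping."""
    requested = env.get("EVENT_BACKEND", "redpanda").strip().lower()
    # Assumption: only "true" (any case) opens the gate.
    kafka_enabled = env.get("KAFKA_ENABLED", "false").strip().lower() == "true"

    if requested == "redpanda" and kafka_enabled:
        return {
            "backend": "redpanda",
            "bootstrap_servers": env.get("REDPANDA_BROKERS", "localhost:9092"),
            "consumer_group": env.get("REDPANDA_CONSUMER_GROUP", "agent-utilities"),
            "security_protocol": env.get("REDPANDA_SECURITY_PROTOCOL", "PLAINTEXT"),
        }
    # The default EVENT_BACKEND is redpanda, but without the gate it resolves to memory.
    return {"backend": "memory"}


default_settings = resolve_event_backend({})
production_settings = resolve_event_backend(
    {"KAFKA_ENABLED": "true", "REDPANDA_BROKERS": "redpanda:9092"}
)
```

With an empty environment this resolves to the in-memory backend even though EVENT_BACKEND defaults to redpanda, which matches the fallback described in the table.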
Graph Sync Daemon
The KafkaGraphSyncDaemon (in core/kafka_graph_sync.py) ensures L1 (Rust
GraphComputeEngine) stays synchronized with L3 (persistent backend):
sequenceDiagram
participant L3 as L3 Persistent
participant EB as EventBackend
participant SD as SyncDaemon
participant L1 as L1 Rust Engine
L3->>EB: publish(kg.mutations, {add_node...})
EB->>SD: on_mutation(topic, event)
Note over SD: Buffer (100ms batch)
SD->>L1: flush_batch() → add_node(id, props)
Note over SD: Every 5 min
SD->>L3: reconcile() → diff L1 vs L3
SD->>L1: repair drift
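The buffering step in the sequence above (collect mutations for about 100 ms, then flush them as one batch) can be sketched as follows. MutationBuffer is a hypothetical class and the injected clock is an assumption to keep the example deterministic; it says nothing about how KafkaGraphSyncDaemon is actually implemented. A real daemon would also need a timer so that a quiet topic still flushes.

```python
import time
from typing import Any, Callable, Optional

Event = dict[str, Any]


class MutationBuffer:
    """Collects events and hands them over in batches once the window has elapsed."""

    def __init__(self, apply_batch: Callable[[list[Event]], None],
                 window_s: float = 0.1,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._apply_batch = apply_batch
        self._window_s = window_s
        self._clock = clock
        self._pending: list[Event] = []
        self._window_started: Optional[float] = None

    def on_mutation(self, topic: str, event: Event) -> None:
        if not self._pending:
            # The first event of a batch opens a new window.
            self._window_started = self._clock()
        self._pending.append(event)
        self.maybe_flush()

    def maybe_flush(self) -> None:
        if self._pending and self._clock() - self._window_started >= self._window_s:
            self.flush_batch()

    def flush_batch(self) -> None:
        batch, self._pending = self._pending, []
        if batch:
            self._apply_batch(batch)


# Drive the buffer with a fake clock so the batching is reproducible.
now = [0.0]
batches: list[list[Event]] = []
buffer = MutationBuffer(batches.append, window_s=0.1, clock=lambda: now[0])
buffer.on_mutation("kg.mutations", {"n": 1})
now[0] = 0.05
buffer.on_mutation("kg.mutations", {"n": 2})
now[0] = 0.12
buffer.on_mutation("kg.mutations", {"n": 3})
```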
Failure Modes
| Failure Mode | Mitigation |
|---|---|
| Redpanda unavailable | Auto-fallback to MemoryEventBackend |
| Consumer lag > 10K | Circuit breaker → full L1 reload from L3 |
| L1↔L3 drift | 5-minute reconciliation daemon |
| Duplicate events | Idempotent dedup via (action, id, timestamp) key |
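Two of the mitigations above lend themselves to a short sketch: deduplication on the (action, id, timestamp) key and the lag-based circuit breaker. Deduplicator and should_reload_l1 are hypothetical names. The bounded LRU window and the use of "source->target" as the identifier for edge events (which carry no id field) are assumptions made for the example.

```python
from collections import OrderedDict
from typing import Any

Event = dict[str, Any]
LAG_LIMIT = 10_000  # threshold from the failure-mode table


class Deduplicator:
    """Remembers recently seen event keys in a bounded, least-recently-used window."""

    def __init__(self, max_keys: int = 10_000) -> None:
        self._seen: "OrderedDict[tuple, None]" = OrderedDict()
        self._max_keys = max_keys

    @staticmethod
    def key_for(event: Event) -> tuple:
        data = event["data"]
        # Assumption: edge events have no "id", so fall back to "source->target".
        entity = data.get("id") or f'{data.get("source")}->{data.get("target")}'
        return (event["action"], entity, event["timestamp"])

    def is_duplicate(self, event: Event) -> bool:
        key = self.key_for(event)
        if key in self._seen:
            self._seen.move_to_end(key)
            return True
        self._seen[key] = None
        if len(self._seen) > self._max_keys:
            self._seen.popitem(last=False)  # evict the oldest key
        return False


def should_reload_l1(consumer_lag: int) -> bool:
    """Trip the breaker when lag exceeds the limit, signalling a full L1 reload from L3."""
    return consumer_lag > LAG_LIMIT


dedup = Deduplicator()
first = {"action": "add_node", "data": {"id": "agent-001"}, "timestamp": 1716912000.0}
replay = dict(first)
later = {"action": "add_node", "data": {"id": "agent-001"}, "timestamp": 1716912001.0}
results = [dedup.is_duplicate(e) for e in (first, replay, later)]
```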
Docker Deployment
# Start Kafka (KRaft mode, no Zookeeper) — also runs Redpanda-compatible brokers
docker compose -f docker/kafka-kraft.compose.yml up -d
# Verify topics
docker exec agent-utilities-kafka \
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Dependencies
Ingest Task Queue Scale-Out (CONCEPT:KG-2.55 / KG-2.56 / KG-2.57)
Separate from the pub/sub event backbone above, the durable ingest task
queue (the queue that submit_task writes to and the ingest workers drain) is
selectable, fail-loud, and, with Kafka, horizontally scalable.
Three queue modes
| Mode | Selected by | Scope | Workers |
|---|---|---|---|
| sqlite | default (nothing set) | one host (per-host kg_task_queue.db) | in-process daemon threads on the flock host |
| postgres | auto when STATE_DB_URI is set, or TASK_QUEUE_BACKEND=postgres | fleet (one shared queue, FOR UPDATE SKIP LOCKED claims, KG-2.54) | in-process threads on every participating host |
| kafka | TASK_QUEUE_BACKEND=kafka | fleet (keyed kg_tasks topic) | the kg-ingest consumer group: the host engine's pool plus any number of decoupled kg-ingest-worker processes |
Selection contract (KG-2.55):
- TASK_QUEUE_BACKEND unset → auto: postgres when STATE_DB_URI is set, else sqlite. Auto stays graceful: an unreachable Postgres degrades to the per-host SQLite queue with a warning.
- TASK_QUEUE_BACKEND=kafka|postgres set explicitly → fail-loud: an unreachable broker/state store raises TaskQueueUnavailable at startup with the endpoint and fall-back instructions. Never a silent degrade.
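The selection contract can be summarized in a few lines of code. This is a sketch only: select_task_queue_backend and the is_reachable probe are hypothetical, the TaskQueueUnavailable class here is a local stand-in for the exception named above, and rejecting any other explicit value is an assumption because the text does not describe that case.

```python
import warnings
from typing import Callable, Mapping


class TaskQueueUnavailable(RuntimeError):
    """Local stand-in for the exception named in the selection contract."""


def select_task_queue_backend(env: Mapping[str, str],
                              is_reachable: Callable[[str], bool]) -> str:
    explicit = env.get("TASK_QUEUE_BACKEND", "").strip().lower()

    if explicit in ("kafka", "postgres"):
        # Explicit choice: fail loudly, never degrade silently.
        if not is_reachable(explicit):
            raise TaskQueueUnavailable(
                f"{explicit} task queue is unreachable; unset TASK_QUEUE_BACKEND "
                "to fall back to automatic selection"
            )
        return explicit
    if explicit:
        # Not covered by the text; this sketch simply rejects it.
        raise ValueError(f"unsupported TASK_QUEUE_BACKEND: {explicit!r}")

    # Auto mode: prefer Postgres when a state DB is configured, degrade gracefully.
    if env.get("STATE_DB_URI"):
        if is_reachable("postgres"):
            return "postgres"
        warnings.warn("Postgres unreachable; using the per-host SQLite queue")
    return "sqlite"


auto_default = select_task_queue_backend({}, lambda name: True)
auto_postgres = select_task_queue_backend({"STATE_DB_URI": "postgresql://db/state"},
                                          lambda name: True)
```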
Partition-key hierarchy (KG-2.56)
Producers key every kg_tasks message; Kafka guarantees ordering per key
without serializing unrelated work. First match wins:
1. tenant:<id>, from the ambient ActorContext.tenant_id (multi-tenant ordering);
2. corpus:<repo>, the ingest target's repo/corpus identifier (batch-ingest provenance full_path, else the path-derived repo root), giving per-repo ordering for codebase fan-out;
3. type:<task_type>, the coarsest bucket for everything else.
kg_tasks is ensured idempotently at startup with KG_TASKS_PARTITIONS
partitions (default 6). Grow-only: raising the flag adds partitions; an
existing topic is never shrunk. Partition count bounds the consumer group's
maximum parallelism.
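The first-match-wins hierarchy reduces to a short function. The name partition_key and its parameters are hypothetical. The toy_partition helper uses CRC32 purely to illustrate that equal keys always map to the same partition; it is not the hash that Kafka clients actually use.

```python
import zlib
from typing import Optional


def partition_key(task_type: str,
                  tenant_id: Optional[str] = None,
                  corpus: Optional[str] = None) -> str:
    """Pick the message key: tenant first, then corpus, then task type."""
    if tenant_id:
        return f"tenant:{tenant_id}"
    if corpus:
        return f"corpus:{corpus}"
    return f"type:{task_type}"


def toy_partition(key: str, partitions: int = 6) -> int:
    """Illustrative stable mapping from key to partition (not Kafka's partitioner)."""
    return zlib.crc32(key.encode("utf-8")) % partitions


keys = [
    partition_key("ingest_file", tenant_id="acme", corpus="repo-a"),
    partition_key("ingest_file", corpus="repo-a"),
    partition_key("reindex"),
]
```

Two tasks for the same repository share a key and therefore a partition, so they are processed in order, while tasks for other repositories can run in parallel on other partitions.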
Ordering & idempotency guarantees (KG-2.57)
- At-least-once delivery. Offsets are committed only after a task completes (or is durably marked failed); a worker crash causes redelivery.
- Idempotent claims. job_id is the idempotency key: a consumer claims by MERGE-ing the :Task node to running (skipping jobs already running/completed/failed/cancelled), guarded cross-host by the KG-2.54 state_claim_guard advisory lock when STATE_DB_URI is set. Graph writes are MERGE-based, so rare duplicate executions converge.
- Per-key ordering only. There is no global order and no cross-partition priority lane (the graph-polling modes' priority=high fast path does not apply in Kafka mode).
- Zombie recovery. Uncommitted offsets redeliver automatically; the task reaper additionally re-publishes reaped orphans to the topic (in Kafka mode nothing polls pending nodes).
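A toy model of the consume loop shows how commit-after-completion and job_id claims combine so that redelivered messages are skipped rather than re-run. Everything here is an assumption for illustration: the tasks dictionary stands in for :Task nodes, try_claim and consume are hypothetical names, and there is no real locking or Kafka client involved.

```python
from typing import Callable, Iterable

# States in which a job must not be claimed again.
NOT_CLAIMABLE = {"running", "completed", "failed", "cancelled"}


def try_claim(tasks: dict[str, str], job_id: str) -> bool:
    """Move a job to running unless it was already claimed or finished."""
    if tasks.get(job_id, "pending") in NOT_CLAIMABLE:
        return False
    tasks[job_id] = "running"
    return True


def consume(messages: Iterable[tuple[int, str]],
            tasks: dict[str, str],
            run: Callable[[str], None],
            commit: Callable[[int], None]) -> None:
    for offset, job_id in messages:
        if try_claim(tasks, job_id):
            try:
                run(job_id)
                tasks[job_id] = "completed"
            except Exception:
                tasks[job_id] = "failed"  # durably marked failed before committing
        # Commit only after the outcome is recorded (or the duplicate is skipped).
        commit(offset)


tasks: dict[str, str] = {}
ran: list[str] = []
committed: list[int] = []
# Offset 1 redelivers job-a, as would happen after a crash before commit.
consume([(0, "job-a"), (1, "job-a"), (2, "job-b")], tasks, ran.append, committed.append)
```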
Worker deployment shape
# Host engine (unchanged): in Kafka mode its worker pool simply joins kg-ingest.
TASK_QUEUE_BACKEND=kafka KAFKA_BOOTSTRAP_SERVERS=kafka:9092 graph-os-daemon
# Scale out: N decoupled workers, any host, NO KG host role required.
# Each is an engine *client* (Rust daemon over TCP/UDS + OS-5.14 HMAC secret).
GRAPH_SERVICE_TCP_ADDR=engine-host:9100 \
GRAPH_SERVICE_AUTH_SECRET=... \
KAFKA_BOOTSTRAP_SERVERS=kafka:9092 \
kg-ingest-worker # or: python -m agent_utilities.knowledge_graph.ingest_worker
Per-host concurrency autosizes with the shared CPU/memory sizer
(compute_ingest_worker_count: ~36% of cores, ~3 GB RAM per worker, floor 2);
override with --workers / KG_INGESTION_WORKERS.
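The sizing rule can be approximated as below. This is a guess at how the three stated numbers might combine (the smaller of the CPU-based and RAM-based limits, then the floor); the real compute_ingest_worker_count may differ, and estimate_ingest_workers is a hypothetical name.

```python
import math


def estimate_ingest_workers(cpu_cores: int,
                            available_ram_gb: float,
                            cpu_fraction: float = 0.36,
                            ram_per_worker_gb: float = 3.0,
                            floor: int = 2) -> int:
    """Estimate per-host worker count from cores and memory (assumed formula)."""
    by_cpu = math.floor(cpu_cores * cpu_fraction)
    by_ram = math.floor(available_ram_gb / ram_per_worker_gb)
    # Take the tighter of the two limits, but never go below the floor.
    return max(floor, min(by_cpu, by_ram))


cpu_bound = estimate_ingest_workers(cpu_cores=16, available_ram_gb=64)
ram_bound = estimate_ingest_workers(cpu_cores=32, available_ram_gb=12)
small_host = estimate_ingest_workers(cpu_cores=4, available_ram_gb=8)
```

Under these assumptions a 16-core, 64 GB host is limited by CPU, a 32-core, 12 GB host is limited by memory, and a small host falls back to the floor of 2.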
Backpressure & lag visibility
The leader host's maintenance scheduler samples the queue every pass and publishes to the OS-5.23 gateway Prometheus registry:
- agent_utilities_kg_ingest_queue_depth{backend}: pending tasks in the selected backend (uniform across sqlite/postgres/kafka);
- agent_utilities_kg_ingest_consumer_lag{topic,group}: total kg-ingest group lag on kg_tasks (Kafka mode).
The batch orchestrator's deferral and the maintenance bulk-defer gate read the
same uniform number via engine.ingest_queue_depth() (queue backlog + in-graph
pending/running :Task nodes), so backpressure behaves identically in all
three modes.
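The two numbers behind these metrics are simple sums. The sketch below shows one plausible way to compute them; the function names and argument shapes are hypothetical, and treating a partition with no committed offset as committed at 0 is an assumption (in a real consumer group this depends on the offset reset policy).

```python
from typing import Iterable, Mapping


def consumer_lag(end_offsets: Mapping[int, int],
                 committed_offsets: Mapping[int, int]) -> int:
    """Total group lag: per-partition (log end offset - committed offset), summed."""
    return sum(
        max(0, end - committed_offsets.get(partition, 0))
        for partition, end in end_offsets.items()
    )


def ingest_queue_depth(queue_backlog: int, task_states: Iterable[str]) -> int:
    """Uniform depth: queue backlog plus in-graph pending/running tasks."""
    in_graph = sum(1 for state in task_states if state in ("pending", "running"))
    return queue_backlog + in_graph


lag = consumer_lag(end_offsets={0: 100, 1: 50, 2: 7},
                   committed_offsets={0: 90, 1: 50})
depth = ingest_queue_depth(3, ["pending", "running", "completed"])
```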