Skip to content

Event Streaming

The Question: How does the Company Brain stay current when work happens across Slack, Jira, GitHub, CRM, and 50 other systems?


The Problem: Batch vs. Real-Time

The existing ingest_external_batch API supports batch ingestion — an actor explicitly triggers a data import. This works for periodic synchronization but fails for operational state: by the time you batch-import, the state is already stale.

Real-time event streaming means the Company Brain updates as work happens — a Slack message posts, a Jira ticket moves, a PR merges, a customer call ends, and the graph reflects it immediately.


Event Sources

The EventStreamIngester supports multiple source types:

Source Type Description Example
WEBHOOK External system pushes events via HTTP Slack events API, Jira webhooks, GitHub webhooks
KAFKA Apache Kafka topic consumer Enterprise event bus
NATS NATS JetStream consumer Microservice events
REDIS_STREAM Redis Streams consumer Real-time notifications
POLLING Active polling of an external API REST APIs without webhook support
CDC Change Data Capture from a database PostgreSQL logical replication
MCP Model Context Protocol server events Agent ecosystem events
A2A Agent-to-Agent protocol messages Inter-agent communication

Registering Event Streams

from agent_utilities.knowledge_graph.core.company_brain import CompanyBrain
from agent_utilities.models.company_brain import (
    ActorType, EventSourceType, EventStreamConfig, WebhookEvent
)

brain = CompanyBrain()

# Register a Slack webhook stream
brain.events.register_stream(EventStreamConfig(
    name="Slack Engineering",
    source_type=EventSourceType.WEBHOOK,
    endpoint="https://hooks.slack.com/events/T0001/...",
    tenant_id="engineering",
    actor_id="service:slack-integration",
    actor_type=ActorType.AUTOMATED_SERVICE,
    batch_size=10,
))

# Register a GitHub webhook stream
brain.events.register_stream(EventStreamConfig(
    name="GitHub PRs",
    source_type=EventSourceType.WEBHOOK,
    endpoint="https://api.github.com/webhooks/...",
    tenant_id="engineering",
    actor_id="service:github-integration",
    actor_type=ActorType.AUTOMATED_SERVICE,
))

Submitting Events

Events are submitted to the queue and processed in batches:

# A human posts a message in Slack
brain.events.submit_event(WebhookEvent(
    source_type="slack",
    event_type="message.posted",
    payload={
        "channel": "#engineering",
        "user": "jane",
        "text": "Deploying API gateway v2.3 to production",
    },
    actor_id="analyst:jane",
    actor_type=ActorType.HUMAN,
    tenant_id="engineering",
))

# An AI agent creates a Jira ticket
brain.events.submit_event(WebhookEvent(
    source_type="jira",
    event_type="issue.created",
    payload={
        "project": "ENG",
        "summary": "API gateway health check timeout",
        "priority": "high",
    },
    actor_id="agent:monitoring",
    actor_type=ActorType.AI_AGENT,
    tenant_id="engineering",
))

# Process all queued events
result = brain.events.process_batch()
print(f"Ingested {result.events_ingested}/{result.events_received} events")
print(f"Created {result.nodes_created} nodes in {result.duration_ms:.1f}ms")

Ingestion Results

Every batch produces an IngestionResult with metrics:

Field Description
events_received Total events in the batch
events_ingested Successfully ingested
events_failed Failed (with retry)
nodes_created New graph nodes
edges_created New graph edges
conflicts_detected Conflicts found during ingestion
duration_ms Processing time

Continuous OWL Reasoning

When significant state changes arrive via event streams, the Company Brain can trigger OWLBridge.run_cycle() to discover new inferred facts. This transforms passive data ingestion into active knowledge discovery: a customer call triggers a Slack event, which updates the graph, which triggers OWL reasoning, which discovers that the customer is now in a new risk category.