Event Bus
The AEGIS domain event bus — tokio::sync::broadcast, DomainEvent catalog, subscriber patterns, and Phase 1 constraints.
The AEGIS event bus is an in-process publish-subscribe channel that carries DomainEvent values between subsystems within the orchestrator. It is the integration mechanism between bounded contexts — subsystems publish events about state changes, and other subsystems react without direct coupling.
Implementation
The event bus is built on tokio::sync::broadcast. Each published event is delivered to all active subscribers.
```yaml
# Configuration (aegis-config.yaml)
event_bus:
  capacity: 1000  # ring buffer size; oldest events are dropped when full
```

The channel has a ring-buffer capacity of 1000 events. If subscribers are slow and the buffer fills, the oldest events are dropped. In Phase 1 this is acceptable because all subscribers are in-process and fast. Future persistent event streaming (e.g., Kafka) will replace this where durability is required.
DomainEvent Enum
All events on the bus are variants of the DomainEvent enum:
```rust
pub enum DomainEvent {
    AgentLifecycle(AgentLifecycleEvent),
    Execution(ExecutionEvent),
    Policy(PolicyEvent),
    Volume(VolumeEvent),
    Storage(StorageEvent),
    McpTool(MCPToolEvent),
    Command(CommandExecutionEvent),
    Smcp(SmcpEvent),
    Swarm(SwarmEvent),
    Workflow(WorkflowEvent),
}
```

Event Catalog
Agent Lifecycle Events
| Event | Published By | When |
|---|---|---|
| AgentDeployed | DeployAgentUseCase | Manifest accepted and persisted |
| AgentUpdated | DeployAgentUseCase | Manifest updated for an existing agent |
| AgentPaused | PauseAgentUseCase | Agent status set to paused |
| AgentResumed | ResumeAgentUseCase | Agent status set to deployed |
| AgentDeleted | DeleteAgentUseCase | Agent archived |
Execution Events
| Event | Published By | When |
|---|---|---|
| ExecutionStarted | ExecutionSupervisor | Container started |
| IterationStarted | ExecutionSupervisor | Iteration n begins |
| IterationCompleted | ExecutionSupervisor | Inner loop returns final response |
| IterationFailed | ExecutionSupervisor | Inner loop exits with error |
| RefinementApplied | ExecutionSupervisor | Error context injected, next iteration beginning |
| ExecutionCompleted | ExecutionSupervisor | All validators passed |
| ExecutionFailed | ExecutionSupervisor | Max iterations reached without success |
| ExecutionCancelled | CancelExecutionUseCase | Execution explicitly cancelled |
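These events might be modeled as enum variants carrying the data subscribers need. The sketch below is an assumption: only the `ExecutionCompleted` fields are taken from the publishing example later on this page, the other variants' fields are guesses, and timestamps are simplified to epoch seconds:

```rust
// Hedged sketch of ExecutionEvent. Fields on variants other than
// ExecutionCompleted are assumptions for illustration.
#[derive(Clone, Debug)]
pub enum ExecutionEvent {
    ExecutionStarted { execution_id: u64 },
    IterationStarted { execution_id: u64, iteration: u8 },
    IterationCompleted { execution_id: u64, iteration: u8 },
    IterationFailed { execution_id: u64, iteration: u8, error: String },
    RefinementApplied { execution_id: u64, iteration: u8 },
    ExecutionCompleted {
        execution_id: u64,
        final_output: String,
        total_iterations: u8,
        completed_at: u64, // epoch seconds; the real field is a timestamp
    },
    ExecutionFailed { execution_id: u64, total_iterations: u8 },
    ExecutionCancelled { execution_id: u64 },
}
```

Carrying the payload inside each variant means subscribers can react without a follow-up repository read.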
Volume Events
| Event | Published By | When |
|---|---|---|
| VolumeCreated | VolumeManager | Volume provisioned in SeaweedFS |
| VolumeAttached | VolumeManager | NFS mount active in container |
| VolumeDetached | VolumeManager | Container stopped, mount released |
| VolumeDeleted | VolumeManager | Volume deleted from SeaweedFS |
| VolumeExpired | TTL reaper task | Ephemeral volume TTL elapsed |
| VolumeQuotaExceeded | AegisFSAL | Write rejected due to size limit |
Storage Events (per-operation)
Published by AegisFSAL for every file operation:
FileOpened, FileRead, FileWritten, FileClosed, DirectoryListed, FileCreated, FileDeleted, PathTraversalBlocked, FilesystemPolicyViolation, QuotaExceeded, UnauthorizedVolumeAccess
MCP Tool Events
Published by the SmcpMiddleware and tool routing layer:
InvocationRequested, InvocationStarted, InvocationCompleted, InvocationFailed, ToolPolicyViolation
Command Execution Events
Published by the BuiltinDispatcher:
CommandExecutionStarted, CommandExecutionCompleted, CommandExecutionFailed, CommandPolicyViolation
SMCP Events
Published by AttestationService and SmcpMiddleware:
SessionCreated, AttestationSucceeded, AttestationFailed, ToolCallAuthorized, PolicyViolationBlocked, SignatureVerificationFailed, SecurityTokenExpired, SecurityTokenRefreshed, SessionRevoked, SessionExpired
Swarm Events
Published by StandardSwarmService:
SwarmCreated, SwarmDissolved, ChildAgentSpawned, ChildAgentCompleted, SpawnRejected, MessageSent, BroadcastSent, LockAcquired, LockReleased, LockExpired, CascadeCancellationInitiated, ChildCancelled
Active Subscribers
| Subscriber | Events Consumed | Purpose |
|---|---|---|
| gRPC streaming handler | All Execution, Swarm events | Push to connected SDK/UI clients in real time |
| Audit logger | All SMCP, Policy, Command events | Structured log entries with signatures |
| PostgreSQL event recorder | All events | Persist to domain_events table for history |
| Metrics collector | Execution, Iteration, Tool events | Prometheus counters/histograms |
| TTL reaper | VolumeCreated | Register ephemeral volumes for auto-deletion |
Publishing Events
Within the domain layer, events are published via the injected EventBus handle:
```rust
impl ExecutionSupervisor {
    async fn complete_execution(&mut self, output: String) -> Result<()> {
        self.execution.status = ExecutionStatus::Completed;
        self.execution_repo.save(&self.execution).await?;
        self.event_bus.publish(DomainEvent::Execution(
            ExecutionEvent::ExecutionCompleted {
                execution_id: self.execution.id,
                final_output: output,
                total_iterations: self.execution.iterations.len() as u8,
                completed_at: Utc::now(),
            },
        ))?;
        Ok(())
    }
}
```

Subscribing to Events
Each subscriber obtains its own Receiver<DomainEvent> handle by calling subscribe() (a broadcast channel creates a fresh receiver per subscription rather than cloning an existing one):
```rust
let mut rx = event_bus.subscribe();
tokio::spawn(async move {
    // Note: this loop exits on any receive error, including
    // RecvError::Lagged (see "Lagged receiver" under Phase 1 Constraints).
    while let Ok(event) = rx.recv().await {
        match event {
            DomainEvent::Execution(ExecutionEvent::ExecutionCompleted { execution_id, .. }) => {
                // React to execution completion
            }
            DomainEvent::Smcp(SmcpEvent::PolicyViolationBlocked { agent_id, tool_name, .. }) => {
                // Log security violation
            }
            _ => {}
        }
    }
});
```

Phase 1 Constraints
In-memory only: The event bus does not persist events. If the orchestrator restarts, all in-flight events are lost. The PostgreSQL event recorder subscriber durably writes all events to the domain_events table — use this table for historical event queries.
No cross-node delivery: Each orchestrator node has its own event bus. In multi-node deployments (future), events are not automatically shared across nodes. Cross-node event streaming via a message broker (e.g., NATS, Kafka) is planned for post-Phase 1.
Lagged receiver: If a subscriber's receive loop is slow and the ring buffer fills (1000 events), the subscriber receives a RecvError::Lagged(n) indicating n events were dropped. Design subscribers to be fast — offload expensive processing to background tasks.