Skip to main content

Source: docs/manual/event-bus.md

This page is generated by site/scripts/sync-manual-docs.mjs.

Event Bus and NATS JetStream Guide

Cruvero runtime events are carried on NATS and persisted with JetStream when CRUVERO_EVENTS_BACKEND=nats.

Source: internal/events/*, cmd/event-bus/, cmd/event-replay/, cmd/event-bench/

Architecture

Cruvero creates and manages these JetStream streams on startup (internal/events/streams.go):

StreamSubject PatternRetentionTypical Payloads
CRUVERO_EVENTS<prefix>.events.>LimitsPolicy (72h)Run lifecycle, step events, tool execution events
CRUVERO_AUDIT<prefix>.audit.>InterestPolicy (30d)Audit records and verification events
CRUVERO_EMBED<prefix>.embed.>WorkQueuePolicy (24h)Async embedding requests/results/DLQ
CRUVERO_MCP<prefix>.mcp.>LimitsPolicy (1h, memory)MCP announce/heartbeat/deregister
CRUVERO_MCPGWmcpgw.>LimitsPolicy (24h)MCP gateway events and config subjects

<prefix> is CRUVERO_EVENTS_SUBJECT_PREFIX (default: cruvero).

event-bus CLI

cmd/event-bus is the operator utility for live inspection and control.

Global flags

FlagDescriptionDefault
--nats-urlNATS server URLCRUVERO_NATS_URL or nats://localhost:4222
--formatOutput format (table or json)table

Subcommands

SubcommandPurposeExample
statusConnectivity + stream health summarygo run ./cmd/event-bus --format table status
streamsList stream configuration and statego run ./cmd/event-bus streams
consumersList consumers across streamsgo run ./cmd/event-bus consumers --format json
subscribe <subject>Tail live events on a subject/patterngo run ./cmd/event-bus subscribe 'cruvero.events.>'
publish <subject> <json>Publish test event payloadgo run ./cmd/event-bus publish cruvero.events.test '{"ok":true}'
purge <stream>Purge all messages from a streamgo run ./cmd/event-bus purge CRUVERO_EVENTS

Note: status currently checks CRUVERO_EVENTS, CRUVERO_AUDIT, CRUVERO_EMBED, and CRUVERO_MCP directly.

event-replay

Replay historical events from JetStream by subject:

go run ./cmd/event-replay \
--subject 'cruvero.events.run.>' \
--since 2h \
--format table

Key flags:

  • --subject (required)
  • --since or --start-time
  • --stream (optional override)
  • --limit
  • --format json|table

event-bench

Load-test the event path end-to-end (publisher -> NATS -> UI SSE stream):

go run ./cmd/event-bench \
--ui-url http://localhost:8080 \
--workflow-id bench-run-1 \
--steady-events 10000 \
--steady-rate 100

This command verifies delivery ratio and latency percentiles (p50, p95, p99).

Configuration

Required baseline

VariablePurposeTypical Value
CRUVERO_EVENTS_BACKENDSelect event backendnats
CRUVERO_EVENTS_SUBJECT_PREFIXSubject namespacecruvero
CRUVERO_NATS_URLNATS endpointnats://localhost:4222

NATS connection tuning

VariablePurpose
CRUVERO_NATS_CREDS_FILENATS creds file for auth
CRUVERO_NATS_TLSTLS mode (auto or false)
CRUVERO_NATS_CONNECT_TIMEOUTInitial connect timeout
CRUVERO_NATS_RECONNECT_WAITDelay between reconnect attempts
CRUVERO_NATS_MAX_RECONNECTSMax reconnect attempts (-1 = unlimited)
CRUVERO_NATS_STREAM_REPLICASStream replica count for JetStream

Common Event Subjects

DomainSubject Pattern
Runtime events<prefix>.events.*
Audit events<prefix>.audit.*
Embedding pipeline<prefix>.embed.requests, <prefix>.embed.results.*, <prefix>.embed.dlq
MCP discovery<prefix>.mcp.announce, <prefix>.mcp.heartbeat, <prefix>.mcp.deregister
MCP gatewaymcpgw.<gateway_id>.events.*, mcpgw.<gateway_id>.config.*

Troubleshooting

  1. Verify NATS and JetStream health:
go run ./cmd/event-bus status
  1. Confirm stream topology:
go run ./cmd/event-bus streams --format json
  1. Watch a live subject while exercising the system:
go run ./cmd/event-bus subscribe 'cruvero.events.>'
  1. Replay recent events for post-incident analysis:
go run ./cmd/event-replay --subject 'cruvero.audit.>' --since 30m --format table

Integration Notes

  • Audit logging can consume and persist event-bus traffic.
  • MCP discovery uses event-bus subjects for server lifecycle announcements.
  • Embedding workers consume embed.requests and publish results/DLQ on the same bus.