Source: docs/manual/event-bus.md
This page is generated by site/scripts/sync-manual-docs.mjs.
Event Bus and NATS JetStream Guide
Cruvero runtime events are carried on NATS and persisted with JetStream when CRUVERO_EVENTS_BACKEND=nats.
Source: internal/events/*, cmd/event-bus/, cmd/event-replay/, cmd/event-bench/
Architecture
Cruvero creates and manages these JetStream streams on startup (internal/events/streams.go):
| Stream | Subject Pattern | Retention | Typical Payloads |
|---|---|---|---|
| CRUVERO_EVENTS | <prefix>.events.> | LimitsPolicy (72h) | Run lifecycle, step events, tool execution events |
| CRUVERO_AUDIT | <prefix>.audit.> | InterestPolicy (30d) | Audit records and verification events |
| CRUVERO_EMBED | <prefix>.embed.> | WorkQueuePolicy (24h) | Async embedding requests/results/DLQ |
| CRUVERO_MCP | <prefix>.mcp.> | LimitsPolicy (1h, memory) | MCP announce/heartbeat/deregister |
| CRUVERO_MCPGW | mcpgw.> | LimitsPolicy (24h) | MCP gateway events and config subjects |
<prefix> is CRUVERO_EVENTS_SUBJECT_PREFIX (default: cruvero).
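To make the subject patterns in the table concrete, here is a minimal sketch of NATS wildcard matching (`*` matches exactly one token, `>` matches one or more trailing tokens) applied to the stream list above. The helper names `matchSubject` and `streamFor` and the sample subjects are hypothetical; this is not the code in internal/events/streams.go.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches a NATS subject pattern.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one token to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

// streamFor returns the first stream whose pattern matches subject, or "" if none does.
// The stream names and patterns mirror the table above.
func streamFor(prefix, subject string) string {
	streams := []struct{ name, pattern string }{
		{"CRUVERO_EVENTS", prefix + ".events.>"},
		{"CRUVERO_AUDIT", prefix + ".audit.>"},
		{"CRUVERO_EMBED", prefix + ".embed.>"},
		{"CRUVERO_MCP", prefix + ".mcp.>"},
		{"CRUVERO_MCPGW", "mcpgw.>"},
	}
	for _, st := range streams {
		if matchSubject(st.pattern, subject) {
			return st.name
		}
	}
	return ""
}

func main() {
	// Sample subjects are illustrative, not taken from the Cruvero source.
	fmt.Println(streamFor("cruvero", "cruvero.events.run.started"))
	fmt.Println(streamFor("cruvero", "mcpgw.gw1.config.reload"))
}
```

Note that a `>` pattern also captures deeper subjects such as cruvero.events.run.started, whereas a `*` pattern stops at a single token.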
event-bus CLI
cmd/event-bus is the operator utility for live inspection and control.
Global flags
| Flag | Description | Default |
|---|---|---|
| --nats-url | NATS server URL | CRUVERO_NATS_URL or nats://localhost:4222 |
| --format | Output format (table or json) | table |
Subcommands
| Subcommand | Purpose | Example |
|---|---|---|
| status | Connectivity + stream health summary | go run ./cmd/event-bus --format table status |
| streams | List stream configuration and state | go run ./cmd/event-bus streams |
| consumers | List consumers across streams | go run ./cmd/event-bus consumers --format json |
| subscribe <subject> | Tail live events on a subject/pattern | go run ./cmd/event-bus subscribe 'cruvero.events.>' |
| publish <subject> <json> | Publish test event payload | go run ./cmd/event-bus publish cruvero.events.test '{"ok":true}' |
| purge <stream> | Purge all messages from a stream | go run ./cmd/event-bus purge CRUVERO_EVENTS |
Note: status currently checks CRUVERO_EVENTS, CRUVERO_AUDIT, CRUVERO_EMBED, and CRUVERO_MCP directly. CRUVERO_MCPGW is not in that list; use streams to inspect it.
Related Operational CLIs
event-replay
Replay historical events from JetStream by subject:
go run ./cmd/event-replay \
--subject 'cruvero.events.run.>' \
--since 2h \
--format table
Key flags:
- --subject (required)
- --since or --start-time
- --stream (optional override)
- --limit
- --format json|table
event-bench
Load-test the event path end-to-end (publisher -> NATS -> UI SSE stream):
go run ./cmd/event-bench \
--ui-url http://localhost:8080 \
--workflow-id bench-run-1 \
--steady-events 10000 \
--steady-rate 100
This command verifies delivery ratio and latency percentiles (p50, p95, p99).
Configuration
Required baseline
| Variable | Purpose | Typical Value |
|---|---|---|
| CRUVERO_EVENTS_BACKEND | Select event backend | nats |
| CRUVERO_EVENTS_SUBJECT_PREFIX | Subject namespace | cruvero |
| CRUVERO_NATS_URL | NATS endpoint | nats://localhost:4222 |
NATS connection tuning
| Variable | Purpose |
|---|---|
| CRUVERO_NATS_CREDS_FILE | NATS creds file for auth |
| CRUVERO_NATS_TLS | TLS mode (auto or false) |
| CRUVERO_NATS_CONNECT_TIMEOUT | Initial connect timeout |
| CRUVERO_NATS_RECONNECT_WAIT | Delay between reconnect attempts |
| CRUVERO_NATS_MAX_RECONNECTS | Max reconnect attempts (-1 = unlimited) |
| CRUVERO_NATS_STREAM_REPLICAS | Stream replica count for JetStream |
Common Event Subjects
| Domain | Subject Pattern |
|---|---|
| Runtime events | <prefix>.events.* |
| Audit events | <prefix>.audit.* |
| Embedding pipeline | <prefix>.embed.requests, <prefix>.embed.results.*, <prefix>.embed.dlq |
| MCP discovery | <prefix>.mcp.announce, <prefix>.mcp.heartbeat, <prefix>.mcp.deregister |
| MCP gateway | mcpgw.<gateway_id>.events.*, mcpgw.<gateway_id>.config.* |
Troubleshooting
- Verify NATS and JetStream health:
go run ./cmd/event-bus status
- Confirm stream topology:
go run ./cmd/event-bus streams --format json
- Watch a live subject while exercising the system:
go run ./cmd/event-bus subscribe 'cruvero.events.>'
- Replay recent events for post-incident analysis:
go run ./cmd/event-replay --subject 'cruvero.audit.>' --since 30m --format table
Integration Notes
- Audit logging can consume and persist event-bus traffic.
- MCP discovery uses event-bus subjects for server lifecycle announcements.
- Embedding workers consume embed.requests and publish results/DLQ on the same bus.