Subscription Commands
Commands for real-time event streaming.
WATCH_INSTANCE
Subscribes to events on a specific instance.
Request
{
"op": "WATCH_INSTANCE",
"params": {
"instance_id": "order-001"
}
}
| Parameter | Type | Required | Description |
|---|---|---|---|
instance_id | string | Yes | Instance to watch |
Response
{
"status": "ok",
"result": {
"subscription_id": "sub-abc123",
"instance_id": "order-001",
"current_state": "pending"
}
}
Events
After subscribing, events are pushed when the instance changes:
{
"type": "event",
"subscription_id": "sub-abc123",
"event": {
"instance_id": "order-001",
"machine": "order",
"version": 1,
"event": "PAY",
"from_state": "pending",
"to_state": "paid",
"payload": {"amount": 99.99},
"timestamp": "2024-01-15T10:30:00Z",
"wal_offset": 12345
}
}
Errors
| Code | Description |
|---|---|
INSTANCE_NOT_FOUND | Instance doesn't exist |
WATCH_ALL
Subscribes to events across all instances with optional filtering.
Request
{
"op": "WATCH_ALL",
"params": {
"machines": ["order", "payment"],
"events": ["PAY", "REFUND"],
"from_states": ["pending"],
"to_states": ["paid", "refunded"],
"from_offset": 0
}
}
| Parameter | Type | Required | Description |
|---|---|---|---|
machines | string[] | No | Filter by machine names |
events | string[] | No | Filter by event names |
from_states | string[] | No | Filter by source state |
to_states | string[] | No | Filter by target state |
from_offset | integer | No | Start from WAL offset |
All filters are optional. If omitted, all events are delivered.
Filter Behavior
Filters use AND logic between different filter types:
{
"machines": ["order"], // AND
"to_states": ["shipped"] // = orders that transition TO shipped
}
Within a filter, values use OR logic:
{
"to_states": ["shipped", "delivered"] // shipped OR delivered
}
Response
{
"status": "ok",
"result": {
"subscription_id": "sub-xyz789",
"filters": {
"machines": ["order", "payment"],
"to_states": ["paid", "refunded"]
}
}
}
Events
{
"type": "event",
"subscription_id": "sub-xyz789",
"event": {
"instance_id": "order-001",
"machine": "order",
"version": 1,
"event": "PAY",
"from_state": "pending",
"to_state": "paid",
"payload": {"amount": 99.99},
"timestamp": "2024-01-15T10:30:00Z",
"wal_offset": 12345
}
}
Replay from Offset
Use from_offset to replay events from a specific point:
{
"op": "WATCH_ALL",
"params": {
"from_offset": 10000
}
}
This delivers:
- All events from offset 10000 to current (historical)
- New events as they occur (live)
Useful for:
- Catching up after disconnect
- Building read models
- Event synchronization
UNWATCH
Cancels a subscription.
Request
{
"op": "UNWATCH",
"params": {
"subscription_id": "sub-abc123"
}
}
| Parameter | Type | Required | Description |
|---|---|---|---|
subscription_id | string | Yes | Subscription to cancel |
Response
{
"status": "ok",
"result": {
"cancelled": true
}
}
Errors
| Code | Description |
|---|---|
NOT_FOUND | Subscription doesn't exist |
Event Message Format
All subscription events have this format:
{
"type": "event",
"subscription_id": "sub-abc123",
"event": {
"instance_id": "order-001",
"machine": "order",
"version": 1,
"event": "PAY",
"from_state": "pending",
"to_state": "paid",
"payload": {"amount": 99.99},
"context": {"customer": "alice", "amount": 99.99},
"timestamp": "2024-01-15T10:30:00Z",
"wal_offset": 12345
}
}
| Field | Description |
|---|---|
instance_id | Affected instance |
machine | Machine name |
version | Machine version |
event | Event that was applied |
from_state | Previous state |
to_state | New state |
payload | Event payload |
context | Full context after event |
timestamp | Event timestamp |
wal_offset | WAL position |
Examples
Watch Shipped Orders
{
"op": "WATCH_ALL",
"params": {
"machines": ["order"],
"to_states": ["shipped"]
}
}
Watch All Failures
{
"op": "WATCH_ALL",
"params": {
"events": ["FAIL", "ERROR", "REJECT"]
}
}
Replay and Follow
// Get current WAL position
{"op": "WAL_STATS"}
// Response: {"result": {"current_offset": 50000}}
// Subscribe from beginning
{"op": "WATCH_ALL", "params": {"from_offset": 0}}
// Or from a saved checkpoint
{"op": "WATCH_ALL", "params": {"from_offset": 45000}}
Multiple Subscriptions
A single connection can have multiple subscriptions:
// Watch orders
{"op": "WATCH_ALL", "params": {"machines": ["order"]}}
// Response: subscription_id = "sub-1"
// Watch payments
{"op": "WATCH_ALL", "params": {"machines": ["payment"]}}
// Response: subscription_id = "sub-2"
// Events arrive with their subscription_id
{"type": "event", "subscription_id": "sub-1", ...}
{"type": "event", "subscription_id": "sub-2", ...}
Best Practices
Track Offsets for Reliability
Store the last processed offset for recovery:
let lastOffset = loadCheckpoint();
subscribe({from_offset: lastOffset}, (event) => {
processEvent(event);
lastOffset = event.wal_offset;
saveCheckpoint(lastOffset);
});
Use Specific Filters
Narrow filters reduce network traffic and processing:
// Good - specific
{"machines": ["order"], "to_states": ["shipped"]}
// Avoid - too broad
{} // All events
Handle Disconnects
Re-subscribe from last known offset:
async function reliableSubscribe(filters, handler) {
let offset = loadOffset();
while (true) {
try {
await subscribe({...filters, from_offset: offset}, (event) => {
handler(event);
offset = event.wal_offset;
saveOffset(offset);
});
} catch (error) {
if (isDisconnect(error)) {
await sleep(1000);
continue; // Reconnect
}
throw error;
}
}
}