Skip to main content

Subscription Commands

Commands for real-time event streaming.

WATCH_INSTANCE

Subscribes to events on a specific instance.

Request

{
"op": "WATCH_INSTANCE",
"params": {
"instance_id": "order-001"
}
}
ParameterTypeRequiredDescription
instance_idstringYesInstance to watch

Response

{
"status": "ok",
"result": {
"subscription_id": "sub-abc123",
"instance_id": "order-001",
"current_state": "pending"
}
}

Events

After subscribing, events are pushed when the instance changes:

{
"type": "event",
"subscription_id": "sub-abc123",
"event": {
"instance_id": "order-001",
"machine": "order",
"version": 1,
"event": "PAY",
"from_state": "pending",
"to_state": "paid",
"payload": {"amount": 99.99},
"timestamp": "2024-01-15T10:30:00Z",
"wal_offset": 12345
}
}

Errors

CodeDescription
INSTANCE_NOT_FOUNDInstance doesn't exist

WATCH_ALL

Subscribes to events across all instances with optional filtering.

Request

{
"op": "WATCH_ALL",
"params": {
"machines": ["order", "payment"],
"events": ["PAY", "REFUND"],
"from_states": ["pending"],
"to_states": ["paid", "refunded"],
"from_offset": 0
}
}
ParameterTypeRequiredDescription
machinesstring[]NoFilter by machine names
eventsstring[]NoFilter by event names
from_statesstring[]NoFilter by source state
to_statesstring[]NoFilter by target state
from_offsetintegerNoStart from WAL offset

All filters are optional. If omitted, all events are delivered.

Filter Behavior

Filters use AND logic between different filter types:

{
"machines": ["order"], // AND
"to_states": ["shipped"] // = orders that transition TO shipped
}

Within a filter, values use OR logic:

{
"to_states": ["shipped", "delivered"] // shipped OR delivered
}

Response

{
"status": "ok",
"result": {
"subscription_id": "sub-xyz789",
"filters": {
"machines": ["order", "payment"],
"to_states": ["paid", "refunded"]
}
}
}

Events

{
"type": "event",
"subscription_id": "sub-xyz789",
"event": {
"instance_id": "order-001",
"machine": "order",
"version": 1,
"event": "PAY",
"from_state": "pending",
"to_state": "paid",
"payload": {"amount": 99.99},
"timestamp": "2024-01-15T10:30:00Z",
"wal_offset": 12345
}
}

Replay from Offset

Use from_offset to replay events from a specific point:

{
"op": "WATCH_ALL",
"params": {
"from_offset": 10000
}
}

This delivers:

  1. All events from offset 10000 to current (historical)
  2. New events as they occur (live)

Useful for:

  • Catching up after disconnect
  • Building read models
  • Event synchronization

UNWATCH

Cancels a subscription.

Request

{
"op": "UNWATCH",
"params": {
"subscription_id": "sub-abc123"
}
}
ParameterTypeRequiredDescription
subscription_idstringYesSubscription to cancel

Response

{
"status": "ok",
"result": {
"cancelled": true
}
}

Errors

CodeDescription
NOT_FOUNDSubscription doesn't exist

Event Message Format

All subscription events have this format:

{
"type": "event",
"subscription_id": "sub-abc123",
"event": {
"instance_id": "order-001",
"machine": "order",
"version": 1,
"event": "PAY",
"from_state": "pending",
"to_state": "paid",
"payload": {"amount": 99.99},
"context": {"customer": "alice", "amount": 99.99},
"timestamp": "2024-01-15T10:30:00Z",
"wal_offset": 12345
}
}
FieldDescription
instance_idAffected instance
machineMachine name
versionMachine version
eventEvent that was applied
from_statePrevious state
to_stateNew state
payloadEvent payload
contextFull context after event
timestampEvent timestamp
wal_offsetWAL position

Examples

Watch Shipped Orders

{
"op": "WATCH_ALL",
"params": {
"machines": ["order"],
"to_states": ["shipped"]
}
}

Watch All Failures

{
"op": "WATCH_ALL",
"params": {
"events": ["FAIL", "ERROR", "REJECT"]
}
}

Replay and Follow

// Get current WAL position
{"op": "WAL_STATS"}
// Response: {"result": {"current_offset": 50000}}

// Subscribe from beginning
{"op": "WATCH_ALL", "params": {"from_offset": 0}}

// Or from a saved checkpoint
{"op": "WATCH_ALL", "params": {"from_offset": 45000}}

Multiple Subscriptions

A single connection can have multiple subscriptions:

// Watch orders
{"op": "WATCH_ALL", "params": {"machines": ["order"]}}
// Response: subscription_id = "sub-1"

// Watch payments
{"op": "WATCH_ALL", "params": {"machines": ["payment"]}}
// Response: subscription_id = "sub-2"

// Events arrive with their subscription_id
{"type": "event", "subscription_id": "sub-1", ...}
{"type": "event", "subscription_id": "sub-2", ...}

Best Practices

Track Offsets for Reliability

Store the last processed offset for recovery:

let lastOffset = loadCheckpoint();

subscribe({from_offset: lastOffset}, (event) => {
processEvent(event);
lastOffset = event.wal_offset;
saveCheckpoint(lastOffset);
});

Use Specific Filters

Narrow filters reduce network traffic and processing:

// Good - specific
{"machines": ["order"], "to_states": ["shipped"]}

// Avoid - too broad
{} // All events

Handle Disconnects

Re-subscribe from last known offset:

async function reliableSubscribe(filters, handler) {
let offset = loadOffset();

while (true) {
try {
await subscribe({...filters, from_offset: offset}, (event) => {
handler(event);
offset = event.wal_offset;
saveOffset(offset);
});
} catch (error) {
if (isDisconnect(error)) {
await sleep(1000);
continue; // Reconnect
}
throw error;
}
}
}