Skip to main content

Python Client

The official Python client library for rstmdb.

Repository: github.com/rstmdb/rstmdb-py

Installation

pip install rstmdb

For development (installs the optional `dev` extras):

pip install rstmdb[dev]

Requirements: Python 3.9+

Features

  • Async-first design using asyncio
  • Full feature parity with the Rust client
  • Complete type hints with Pydantic models
  • TLS/mTLS support
  • Event streaming via async iterators

Quick Start

import asyncio
from rstmdb import Client

async def main():
    """Connect, register the "order" machine, create an instance, and apply PAY."""
    # Connect to server
    client = Client("localhost", 7401, token="my-secret-token")
    await client.connect()

    try:
        # Define a state machine
        await client.put_machine("order", 1, {
            "states": ["pending", "paid", "shipped", "delivered"],
            "initial": "pending",
            "transitions": [
                {"from": "pending", "event": "PAY", "to": "paid"},
                {"from": "paid", "event": "SHIP", "to": "shipped"},
                {"from": "shipped", "event": "DELIVER", "to": "delivered"},
            ],
        })

        # Create an instance
        instance = await client.create_instance(
            machine="order",
            version=1,
            instance_id="order-001",
            context={"customer": "alice", "total": 99.99},
        )
        print(f"Created: {instance.id} in state {instance.state}")

        # Apply events
        result = await client.apply_event(
            instance_id="order-001",
            event="PAY",
            payload={"payment_id": "pay-123"},
        )
        print(f"Transitioned: {result.previous_state} -> {result.current_state}")
    finally:
        # Close the connection even if one of the calls above raises.
        await client.close()

asyncio.run(main())

Connection

Basic Connection

from rstmdb import Client

client = Client("localhost", 7401, token="my-secret-token")
await client.connect()

TLS Connection

client = Client(
host="secure.example.com",
port=7401,
token="my-secret-token",
tls=True,
ca_cert="/path/to/ca.pem"
)
await client.connect()

Mutual TLS (mTLS)

client = Client(
host="secure.example.com",
port=7401,
token="my-secret-token",
tls=True,
ca_cert="/path/to/ca.pem",
client_cert="/path/to/client.pem",
client_key="/path/to/client-key.pem"
)
await client.connect()

Development Mode (Insecure)

# Skip TLS verification - development only!
client = Client(
host="localhost",
port=7401,
tls=True,
insecure=True
)

API Reference

Machine Operations

put_machine

Register a state machine definition.

await client.put_machine(
name="order",
version=1,
definition={
"states": ["pending", "paid", "shipped"],
"initial": "pending",
"transitions": [
{"from": "pending", "event": "PAY", "to": "paid"},
{"from": "paid", "event": "SHIP", "to": "shipped"}
]
}
)

get_machine

Retrieve a machine definition.

machine = await client.get_machine("order", version=1)
print(machine.definition.states)
print(machine.definition.initial)

list_machines

List all machines along with their versions.

machines = await client.list_machines()
for m in machines:
print(f"{m.name}: {m.versions}")

Instance Operations

create_instance

Create a new instance.

instance = await client.create_instance(
machine="order",
version=1,
instance_id="order-001",
context={"customer": "alice"}
)

get_instance

Get instance state and context.

instance = await client.get_instance("order-001")
print(f"State: {instance.state}")
print(f"Context: {instance.context}")

delete_instance

Delete an instance.

await client.delete_instance("order-001")

Event Operations

apply_event

Apply an event to trigger a state transition.

result = await client.apply_event(
instance_id="order-001",
event="PAY",
payload={"amount": 99.99}
)

print(f"Previous: {result.previous_state}")
print(f"Current: {result.current_state}")

Streaming

watch_all

Subscribe to events, filtered by machine (`machines`) and by the state being transitioned to (`to_states`).

async with client.watch_all(
machines=["order"],
to_states=["shipped", "delivered"]
) as stream:
async for event in stream.events():
print(f"{event.instance_id}: {event.event} -> {event.to_state}")

watch_instance

Watch a specific instance.

async with client.watch_instance("order-001") as stream:
async for event in stream.events():
print(f"Event: {event.event}, New state: {event.to_state}")

System Operations

ping

Health check.

await client.ping()

info

Get server information.

info = await client.info()
print(f"Version: {info.version}")
print(f"Instances: {info.stats.instances}")

Error Handling

from rstmdb import (
Client,
NotFoundError,
InvalidTransitionError,
AuthenticationError,
ConnectionError
)

try:
await client.apply_event("order-001", "PAY")
except NotFoundError:
print("Instance not found")
except InvalidTransitionError as e:
print(f"Cannot apply event from current state: {e}")
except AuthenticationError:
print("Authentication failed")
except ConnectionError:
print("Connection lost")

Examples

Order Processing

import asyncio
from rstmdb import Client

async def process_order(client: Client, order_id: str):
    """Create an order instance, apply PAY and SHIP, then print its state.

    Args:
        client: A client on which ``connect()`` has already been awaited.
        order_id: Instance ID to create and drive through the workflow.

    NOTE(review): presumably the "order" machine (version 1) must already be
    registered, e.g. via ``put_machine``; this example does not register it.
    """
    # Create order
    await client.create_instance(
        machine="order",
        version=1,
        instance_id=order_id,
        context={"items": ["item-1", "item-2"], "total": 149.99},
    )

    # Process payment
    await client.apply_event(order_id, "PAY", {"payment_id": "pay-123"})

    # Ship order
    await client.apply_event(order_id, "SHIP", {"tracking": "1Z999"})

    # Get final state
    order = await client.get_instance(order_id)
    print(f"Order {order_id} is now: {order.state}")

async def main():
    """Connect to the local server and process order "order-001"."""
    client = Client("localhost", 7401)
    await client.connect()
    try:
        await process_order(client, "order-001")
    finally:
        # Close the connection even if processing the order fails.
        await client.close()

asyncio.run(main())

Event Consumer

import asyncio
from rstmdb import Client

async def consume_events():
    """Watch the "order" machine and print each transition to "shipped"."""
    client = Client("localhost", 7401)
    await client.connect()

    try:
        print("Listening for shipped orders...")

        async with client.watch_all(
            machines=["order"],
            to_states=["shipped"],
        ) as stream:
            async for event in stream.events():
                print(f"Order {event.instance_id} shipped!")
                # Send notification, update external system, etc.
    finally:
        # Close the connection when the stream ends or is interrupted
        # by an error or cancellation.
        await client.close()

asyncio.run(consume_events())

Resources