Skip to content

Python SDK

The Python SDK enables developers to build custom surfaces and interact with streams programmatically.

Requirements: Python 3.10+

Terminal window
pip install wheelhouse-sdk
# or
uv add wheelhouse-sdk

These four examples cover the full SDK surface. Each is self-contained and copy-pasteable.

# 1. Define a type — no broker needed
import wheelhouse
from dataclasses import dataclass
@wheelhouse.register_type("myapp.Greeting")
@dataclass
class Greeting:
text: str = ""
msg = Greeting(text="hello")
print(msg) # Greeting(text='hello')
# 2. Connect and publish — broker required
import asyncio, wheelhouse
from wheelhouse.types import TextMessage
async def main():
async with await wheelhouse.connect() as conn:
await conn.publish("main", TextMessage(content="hello"))
asyncio.run(main())
# 3. Test without a broker — works in CI without Podman or ZMQ
import asyncio, wheelhouse
from wheelhouse.types import TextMessage
async def main():
conn = await wheelhouse.connect(mock=True)
received = []
async def handler(msg):
received.append(msg)
await conn.subscribe("main", handler)
await conn.publish("main", TextMessage(content="hello"))
assert received[0].content == "hello"
asyncio.run(main())
# 4. Error handling — catch by type, not by string
from wheelhouse.errors import PublishTimeout
import wheelhouse
async def main():
async with await wheelhouse.connect() as conn:
try:
await conn.publish_confirmed("main", TextMessage(content="hi"), timeout=5.0)
except PublishTimeout:
# handle timeout — retry or publish to an error stream
pass
asyncio.run(main())

All SDK operations require a connection. The connect() function is async and returns a Connection object:

import asyncio
import wheelhouse
async def main():
conn = await wheelhouse.connect()
# ... use conn ...
await conn.close()
asyncio.run(main())

Use the async with pattern for automatic cleanup:

import asyncio
import wheelhouse
async def main():
async with await wheelhouse.connect() as conn:
pass # connection auto-closes when block exits
asyncio.run(main())
ParameterTypeDefaultDescription
endpointstr | NoneNoneWheelhouse endpoint URL
mockboolFalseUse in-memory mock (no Wheelhouse needed)

Endpoint resolution priority:

  1. Explicit endpoint= parameter
  2. WH_URL environment variable
  3. Default: tcp://127.0.0.1:5555
# Explicit endpoint
conn = await wheelhouse.connect(endpoint="tcp://192.168.1.10:5555")
# Environment variable
# export WH_URL=tcp://host.docker.internal:5555
conn = await wheelhouse.connect()

If Wheelhouse is not running, connect() raises wheelhouse.ConnectionError with an actionable message.

import asyncio
import wheelhouse
from wheelhouse.types import TextMessage
async def main():
async with await wheelhouse.connect() as conn:
await conn.publish("my-stream", TextMessage(content="Hello!"))
asyncio.run(main())

For confirmed delivery (waits for WAL acknowledgement):

async with await wheelhouse.connect() as conn:
try:
await conn.publish_confirmed("my-stream", msg, timeout=5.0)
except wheelhouse.PublishTimeout:
print("Delivery not confirmed within timeout")

Register an async handler function for a stream:

import asyncio
import wheelhouse
from wheelhouse.types import TextMessage
async def main():
async with await wheelhouse.connect() as conn:
async def on_message(msg):
print(f"Received: {msg.content}")
await conn.subscribe("notifications", on_message)
# Keep running to receive messages
await asyncio.sleep(60)
asyncio.run(main())

Multiple handlers can be registered on the same stream. Each receives every message.

Custom types are Python dataclasses with a namespace prefix. The @register_type decorator validates the type at decoration time:

import wheelhouse
from dataclasses import dataclass
@wheelhouse.register_type("myapp.SensorReading")
@dataclass
class SensorReading:
sensor_id: str = ""
value: float = 0.0
unit: str = ""

Namespace rules:

  • Format: <namespace>.<TypeName> (exactly one dot)
  • The wheelhouse.* namespace is reserved for core types (ADR-004)
  • Classes must have at least one typed annotation

Invalid registrations raise immediately:

ErrorCause
InvalidTypeNameErrorMissing dot, empty namespace, or multiple dots
ReservedNamespaceErrorUsing wheelhouse.* namespace
TypeErrorClass has no typed fields

Import from wheelhouse.types:

from wheelhouse.types import TextMessage, FileMessage, TypedMessage
TypeFieldsDescription
TextMessagecontent, user_id, stream_namePlain text message
FileMessagefilename, content, mime_type, user_idFile/binary payload
TypedMessagetype_name, data, raw_bytes, is_knownReceived message wrapper

TypedMessage wraps received messages: if the type is known, data contains the deserialized object and is_known is True. For unknown types, raw_bytes contains the raw payload.

Note: Protobuf types like SkillInvocation, SkillResult, SkillProgress, and CronEvent are defined in proto/wheelhouse/v1/ but are Rust-side only in the current MVP. They will be exposed in the Python SDK in a future release.

The Surface base class wraps a Connection and provides publish, publish_confirmed, and subscribe methods:

import asyncio
import wheelhouse
from wheelhouse.types import TextMessage
class NotificationSurface(wheelhouse.Surface):
async def on_message(self, message):
print(f"Notification: {message}")
async def send(self, text: str):
await self.publish("notifications", TextMessage(content=text))
async def main():
async with await wheelhouse.connect() as conn:
surface = NotificationSurface(conn)
await conn.subscribe("notifications", surface.on_message)
await surface.send("Hello from my surface!")
asyncio.run(main())

All SDK errors inherit from WheelhouseError and include a code attribute referenced in ERRORS.md:

import wheelhouse
try:
conn = await wheelhouse.connect()
except wheelhouse.ConnectionError as e:
print(f"Error [{e.code}]: {e}")

Catch errors by type directly from the wheelhouse namespace:

ExceptionCodeWhen
wheelhouse.ConnectionErrorCONNECTION_ERRORWheelhouse not running or unreachable
wheelhouse.PublishTimeoutPUBLISH_TIMEOUTpublish_confirmed() timed out
wheelhouse.StreamNotFoundSTREAM_NOT_FOUNDRequested stream does not exist

Additional errors from wheelhouse.errors:

ExceptionCodeWhen
InvalidTypeNameErrorINVALID_TYPE_NAMEBad format in @register_type
ReservedNamespaceErrorRESERVED_NAMESPACEUsing wheelhouse.* namespace
RegistryFullErrorREGISTRY_FULLType registry at capacity

Use mock=True to develop and test without a running Wheelhouse instance or Podman installation:

import asyncio
import wheelhouse
from wheelhouse.types import TextMessage
async def main():
conn = await wheelhouse.connect(mock=True)
received = []
async def on_message(msg):
received.append(msg)
await conn.subscribe("test-stream", on_message)
await conn.publish("test-stream", TextMessage(content="hello"))
assert len(received) == 1
assert received[0].content == "hello"
await conn.close()
asyncio.run(main())

In mock mode, published messages are automatically echoed to subscribers in the same session.

conn = await wheelhouse.connect(mock=True)
# Inspect published messages
published = conn.get_published("my-stream") # list of (stream, message) tuples
# Get all messages as TypedMessage objects
messages = conn.get_messages()
# Reset state between test scenarios
conn.reset()

Import from wheelhouse.fixtures for convenient test setup:

from wheelhouse.fixtures import mock_connection
from wheelhouse.types import TextMessage
async def test_my_handler(mock_connection):
received = []
async def handler(msg):
received.append(msg)
await mock_connection.subscribe("stream", handler)
await mock_connection.publish("stream", TextMessage(content="test"))
assert len(received) == 1

Available fixtures: mock_connection, mock_surface (identical, use mock_surface when wrapping in a Surface subclass).

SymbolTypeDescription
connect(endpoint, mock)async functionConnect to Wheelhouse
SurfaceclassBase class for custom surfaces
register_type(name)decoratorRegister a custom Protobuf type
ConnectionErrorexceptionWheelhouse not reachable
PublishTimeoutexceptionConfirmed publish timed out
StreamNotFoundexceptionStream does not exist
MethodDescription
publish(stream, message)Fire-and-forget publish
publish_confirmed(stream, message, timeout)Publish with WAL ack
subscribe(stream, handler)Register async message handler
register_type(type_name, type_class)Instance-level type registration
close()Close connection and release resources

All Connection methods plus:

MethodDescription
get_published(stream)Get published messages, optionally filtered
get_messages()Get all messages as TypedMessage list
simulate_message(stream, message)Inject a message to handlers
reset()Clear all mock state
clear()Clear all mock state (alias)