Streaming from Workflows - Python SDK
Streaming is currently in Public Preview.
Temporal's message-passing primitives — Signals, Updates, and Queries — provide the building blocks for real-time streaming from durable Workflows without additional infrastructure.
The temporalio.contrib.pubsub module provides a reusable pub/sub implementation on top of these primitives.
See Streaming from Workflows for a conceptual overview of the streaming problem and the architecture patterns this page implements.
Prerequisites
- Python 3.12 or later
- `temporalio` SDK installed
- An LLM provider with a streaming API (the examples use the OpenAI Responses API)
Use PubSubMixin in your Workflow
The code that follows is part of a working streaming agents sample.
Add `PubSubMixin` as a base class to your Workflow. The mixin registers the Signal handler that receives batched events from Activities, the Update handler that external clients use to long-poll for new events, and the Query handler that returns the current stream offset.
Call `self.init_pubsub()` in your `__init__` method to initialize the mixin's internal state.
Pass `prior_state` if your Workflow accepts previously accumulated pub/sub state, for example when continuing from a prior session.
```python
from temporalio import workflow
from temporalio.contrib.pubsub import PubSubMixin

@workflow.defn
class AnalyticsWorkflow(PubSubMixin):
    @workflow.init
    def __init__(self, state: WorkflowState) -> None:
        self.init_pubsub(prior_state=state.pubsub_state)
```
The Workflow can publish events directly for lifecycle events that originate inside the Workflow rather than inside an Activity, such as tool call start and complete:
```python
import json

EVENTS_TOPIC = "events"

def _emit(self, event_type: str, **data) -> None:
    event = {
        "type": event_type,
        "timestamp": workflow.now().isoformat(),
        "data": data,
    }
    self.publish(EVENTS_TOPIC, json.dumps(event).encode())
```
`self.publish()` is provided by `PubSubMixin`. It appends the event to the durable in-Workflow buffer and increments the global offset. Any blocked poll Update handlers are woken immediately.
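For intuition, the buffer-and-offset semantics can be modeled in a few lines of plain Python. This is an illustrative sketch only, not the mixin's implementation; the real mixin keeps this state durably in Workflow history, and `StreamBuffer` is a made-up name:

```python
class StreamBuffer:
    """Illustrative model of the mixin's buffer: publish appends and bumps the offset."""

    def __init__(self) -> None:
        self._events: list[tuple[str, bytes]] = []  # (topic, payload) pairs

    @property
    def offset(self) -> int:
        # Global offset = number of events published so far, across all topics
        return len(self._events)

    def publish(self, topic: str, payload: bytes) -> None:
        self._events.append((topic, payload))

    def read_from(self, offset: int, topics: set[str]) -> list[bytes]:
        # What a poll at `offset` would deliver for the requested topics
        return [p for t, p in self._events[offset:] if t in topics]
```

Because the offset is global and events are never removed, any client can re-read from any earlier offset.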
Publish events from an Activity
Inside an Activity, create a `PubSubClient` and use it to batch and publish streaming events to the Workflow via Signal.
```python
from temporalio import activity
from temporalio.contrib.pubsub import PubSubClient

@activity.defn
async def model_call(input: ModelCallInput) -> ModelCallResult:
    pubsub = PubSubClient.create(batch_interval=2.0)
    async with pubsub:
        async with openai_client.responses.stream(**kwargs) as stream:
            async for event in stream:
                activity.heartbeat()
                pubsub.publish(EVENTS_TOPIC, translate(event))
                # Priority flush for significant events such as end of reasoning block
                if is_thinking_complete(event):
                    pubsub.publish(EVENTS_TOPIC, translate(event), priority=True)
    return ModelCallResult(...)
```
The `async with pubsub` context manager starts a background flush timer and guarantees a final flush on exit. No manual `asyncio.wait()` or cancellation logic is needed.
How batching works:
- Events are buffered and flushed to the Workflow via Signal at the `batch_interval` (default: 2 seconds). This is a Nagle-like strategy: accumulate small events, send in batches.
- Calling `pubsub.publish(..., priority=True)` triggers an immediate flush of the current batch. Use this for events that are significant on their own, like the end of a reasoning block or an error.
- The `async with pubsub` block guarantees a final flush on exit so no events are dropped when the Activity completes.
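The batching strategy can be pictured as a simple asyncio timer loop. This is a toy model, not the actual `PubSubClient` internals; `send_signal` stands in for the Signal delivery to the Workflow:

```python
import asyncio

async def flush_loop(buffer: list[bytes], send_signal, interval: float = 2.0) -> None:
    """Toy model of the batch flush timer: drain the buffer every `interval` seconds."""
    while True:
        await asyncio.sleep(interval)
        if buffer:
            batch, buffer[:] = list(buffer), []  # take the current batch, clear the buffer
            await send_signal(batch)
```

A priority publish simply performs this drain-and-send step immediately rather than waiting for the next timer tick.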
The pattern generalizes to any LLM provider with a streaming API. The `translate()` function in the example converts provider-specific stream events into your application's event schema.
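A `translate()` for the OpenAI Responses API might look like the following. The mapping and the application event names (`TEXT_DELTA`, `MODEL_COMPLETE`, `PROVIDER_EVENT`) are assumptions for illustration; adapt the cases to your provider's event types and your own schema:

```python
import json

def translate(event) -> bytes:
    """Convert a provider stream event into the application's event schema."""
    if event.type == "response.output_text.delta":
        app_event = {"type": "TEXT_DELTA", "data": {"delta": event.delta}}
    elif event.type == "response.completed":
        app_event = {"type": "MODEL_COMPLETE", "data": {}}
    else:
        # Pass anything unrecognized through with its provider type, for debugging
        app_event = {"type": "PROVIDER_EVENT", "data": {"kind": event.type}}
    return json.dumps(app_event).encode()
```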
Subscribe from an external client
The BFF (or any external caller) subscribes to the Workflow's event stream using `PubSubClient.subscribe()`. This is an async iterator that internally long-polls the Workflow via Updates.
```python
import json

from fastapi.responses import StreamingResponse
from temporalio.contrib.pubsub import PubSubClient

# Inside a FastAPI route handler:
pubsub = PubSubClient.create(client, session_id)
start_offset = await pubsub.get_offset()

async def event_stream():
    async for item in pubsub.subscribe(
        topics=[EVENTS_TOPIC], from_offset=start_offset
    ):
        event = json.loads(item.data)
        yield f"data: {json.dumps(event)}\n\n"
        if event.get("type") == "AGENT_COMPLETE":
            return

return StreamingResponse(event_stream(), media_type="text/event-stream")
```
How the long-poll works:
- Each call to `pubsub.subscribe()` sends an Update to the Workflow with the client's current offset.
- The Update handler inside `PubSubMixin` calls `workflow.wait_condition()` to block until new events are available at or beyond that offset.
- When events arrive, via a Signal from an Activity or via `self.publish()` in the Workflow, the `wait_condition` returns and the Update handler delivers the new batch.
- The subscribe iterator re-polls automatically, tracking the offset across iterations.
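The blocking behavior of the poll handler can be modeled with plain asyncio. This is a toy model of the protocol, not the mixin's code; the `asyncio.Event` here plays the role that `workflow.wait_condition()` plays inside the Workflow:

```python
import asyncio

class LongPollBuffer:
    """Toy model of the poll handler: block until events exist past `offset`."""

    def __init__(self) -> None:
        self.events: list[bytes] = []
        self._new = asyncio.Event()

    def publish(self, payload: bytes) -> None:
        self.events.append(payload)
        self._new.set()  # wake blocked pollers, like wait_condition becoming true

    async def poll(self, offset: int) -> tuple[list[bytes], int]:
        # Block until at least one event exists past the client's offset
        while len(self.events) <= offset:
            self._new.clear()
            await self._new.wait()
        batch = self.events[offset:]
        return batch, len(self.events)
```

A client loops on `poll()`, feeding the returned offset into the next call, which is exactly what the subscribe iterator automates.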
Because events are stored durably inside the Workflow, the client can reconnect at any time, even after a BFF restart, and resume from its last known offset without losing events.
Per-turn event indexing
Events use a global offset that increments across all turns within a session.
Capturing the offset before signaling `start_turn` ensures the SSE stream delivers only events from the current turn.
```python
pubsub = PubSubClient.create(client, session_id)
start_offset = await pubsub.get_offset()

# Signal the Workflow to start a new turn
await handle.signal(AnalyticsWorkflow.start_turn, StartTurnInput(message=text))

# Subscribe from start_offset onward — only events from this turn
async for item in pubsub.subscribe(topics=[EVENTS_TOPIC], from_offset=start_offset):
    ...
```
On reconnect, pass the client's last known offset to `subscribe()`.
The Workflow replays events from that point forward, so no events are lost even if the BFF restarts mid-stream.
Sample application
The streaming agents sample is a complete chat-based analytics agent that demonstrates this pattern end-to-end. The agent queries a Chinook music store database (SQLite) using SQL, Python, and shell tools. It reasons about results, recovers from errors, and streams its progress to a React frontend via SSE.
The sample includes:
- `backend-temporal/` — FastAPI proxy and Temporal Worker using `PubSubMixin`
- `backend-ephemeral/` — A non-Temporal backend for direct comparison
- `frontend/` — React frontend that renders streamed events in real time
To test it out, clone the repo and run the app.