Streaming from Workflows - Python SDK

Support, stability, and dependency info

Streaming is currently in Public Preview.

Temporal's message-passing primitives — Signals, Updates, and Queries — provide the building blocks for real-time streaming from durable Workflows without additional infrastructure. The temporalio.contrib.pubsub module provides a reusable pub/sub implementation on top of these primitives.

See Streaming from Workflows for a conceptual overview of the streaming problem and the architecture patterns this page implements.

Prerequisites

  • Python 3.12 or later
  • temporalio SDK installed
  • An LLM provider with a streaming API (the examples use the OpenAI Responses API)

Use PubSubMixin in your Workflow

info

The code that follows is part of a working streaming agents sample.

Add PubSubMixin as a base class to your Workflow. The mixin registers the Signal handler that receives batched events from Activities, the Update handler that external clients use to long-poll for new events, and the Query handler that returns the current stream offset.

Call self.init_pubsub() in your __init__ method to initialize the mixin's internal state. Pass prior_state if your Workflow accepts previously accumulated pub/sub state, such as when continuing from a prior session.

from temporalio import workflow
from temporalio.contrib.pubsub import PubSubMixin

@workflow.defn
class AnalyticsWorkflow(PubSubMixin):
    @workflow.init
    def __init__(self, state: WorkflowState) -> None:
        self.init_pubsub(prior_state=state.pubsub_state)

The Workflow can publish events directly for lifecycle events that originate inside the Workflow rather than inside an Activity, such as tool call start and complete:

import json

EVENTS_TOPIC = "events"

# Inside the Workflow class:
def _emit(self, event_type: str, **data) -> None:
    event = {
        "type": event_type,
        "timestamp": workflow.now().isoformat(),
        "data": data,
    }
    self.publish(EVENTS_TOPIC, json.dumps(event).encode())

self.publish() is provided by PubSubMixin and appends the event to the durable in-Workflow buffer, incrementing the global offset. Any blocked poll Update handlers are woken up immediately.
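The buffer-and-offset mechanics can be pictured with a pure-Python sketch (illustrative only, not the mixin's actual internals): each publish appends to the buffer and advances a global offset, and a poll delivers every event at or beyond the client's last known offset.

```python
class EventBuffer:
    """Illustrative model of the durable in-Workflow event buffer."""

    def __init__(self) -> None:
        self.events: list[tuple[int, str, bytes]] = []  # (offset, topic, data)
        self.offset = 0  # global offset, incremented per event

    def publish(self, topic: str, data: bytes) -> None:
        self.events.append((self.offset, topic, data))
        self.offset += 1

    def events_from(self, from_offset: int) -> list[tuple[int, str, bytes]]:
        # What a blocked poll Update handler delivers once woken.
        return [e for e in self.events if e[0] >= from_offset]

buf = EventBuffer()
buf.publish("events", b'{"type": "tool_call_started"}')
buf.publish("events", b'{"type": "tool_call_completed"}')
new = buf.events_from(1)  # only events the client has not yet seen
```

Because the buffer lives in Workflow state, it is replayed with the rest of the Workflow's history, which is what makes reconnection lossless.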

Publish events from an Activity

Inside an Activity, create a PubSubClient and use it to batch and publish streaming events to the Workflow via Signal.

from temporalio import activity
from temporalio.contrib.pubsub import PubSubClient

@activity.defn
async def model_call(input: ModelCallInput) -> ModelCallResult:
    pubsub = PubSubClient.create(batch_interval=2.0)

    async with pubsub:
        async with openai_client.responses.stream(**kwargs) as stream:
            async for event in stream:
                activity.heartbeat()
                payload = translate(event)
                # Priority flush for significant events such as the end of a reasoning block
                if is_thinking_complete(event):
                    pubsub.publish(EVENTS_TOPIC, payload, priority=True)
                else:
                    pubsub.publish(EVENTS_TOPIC, payload)

    return ModelCallResult(...)

The async with pubsub context manager starts a background flush timer and guarantees a final flush on exit. No manual asyncio.wait() or cancellation logic is needed.

How batching works:

  • Events are buffered and flushed to the Workflow via Signal at the batch_interval (default: 2 seconds). This is a Nagle-like strategy: accumulate small events, send in batches.
  • Calling pubsub.publish(..., priority=True) triggers an immediate flush for the current batch. Use this for events that are significant on their own, like at the end of a reasoning block or an error.
  • The async with pubsub block guarantees a final flush on exit so no events are dropped when the Activity completes.
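The batching behavior described above can be sketched in plain asyncio (an illustrative model, not PubSubClient's internals): events accumulate in a buffer, a background timer flushes on an interval, and a priority publish flushes immediately.

```python
import asyncio

class Batcher:
    """Nagle-like batching sketch: illustrative, not PubSubClient's internals."""

    def __init__(self, flush, batch_interval: float = 2.0) -> None:
        self._flush = flush                 # called with each accumulated batch
        self._interval = batch_interval
        self._buffer: list[bytes] = []

    def publish(self, data: bytes, priority: bool = False) -> None:
        self._buffer.append(data)
        if priority:
            self.flush_now()                # significant event: flush immediately

    def flush_now(self) -> None:
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._flush(batch)

    async def run(self, stop: asyncio.Event) -> None:
        # Background timer: flush every interval, with a final flush on exit.
        while not stop.is_set():
            try:
                await asyncio.wait_for(stop.wait(), timeout=self._interval)
            except asyncio.TimeoutError:
                pass
            self.flush_now()

batches: list[list[bytes]] = []

async def demo() -> None:
    stop = asyncio.Event()
    batcher = Batcher(batches.append, batch_interval=0.05)
    timer = asyncio.create_task(batcher.run(stop))
    batcher.publish(b"delta-1")
    batcher.publish(b"delta-2")
    batcher.publish(b"thinking-done", priority=True)  # flushes all three at once
    stop.set()
    await timer

asyncio.run(demo())
```

The trade-off is the usual one for Nagle-style batching: a longer interval means fewer Signals to the Workflow, at the cost of added latency for low-priority events.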

The pattern generalizes to any LLM provider with a streaming API. The translate() function in the example converts provider-specific stream events into your application's event schema.
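translate() is application-defined. A minimal sketch for OpenAI Responses stream events follows; the event-type strings and the output schema here are illustrative assumptions, not part of the pubsub module.

```python
import json
from types import SimpleNamespace

def translate(event) -> bytes:
    """Map a provider-specific stream event to the app's event schema (illustrative)."""
    etype = getattr(event, "type", "unknown")
    if etype == "response.output_text.delta":
        payload = {"type": "TEXT_DELTA", "data": {"text": event.delta}}
    elif etype == "response.completed":
        payload = {"type": "AGENT_COMPLETE", "data": {}}
    else:
        payload = {"type": "PROVIDER_EVENT", "data": {"provider_type": etype}}
    return json.dumps(payload).encode()

# Stand-in for a provider event object:
done = json.loads(translate(SimpleNamespace(type="response.completed")).decode())
```

Swapping providers means swapping only this function; the Workflow and subscribers see a stable schema.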

Subscribe from an external client

The BFF (or any external caller) subscribes to the Workflow's event stream using PubSubClient.subscribe(). This is an async iterator that long-polls the Workflow via Updates internally.

import json

from fastapi.responses import StreamingResponse
from temporalio.contrib.pubsub import PubSubClient

pubsub = PubSubClient.create(client, session_id)
start_offset = await pubsub.get_offset()

async def event_stream():
    async for item in pubsub.subscribe(
        topics=[EVENTS_TOPIC], from_offset=start_offset
    ):
        event = json.loads(item.data)
        yield f"data: {json.dumps(event)}\n\n"
        if event.get("type") == "AGENT_COMPLETE":
            return

return StreamingResponse(event_stream(), media_type="text/event-stream")

How the long-poll works:

  • Each call to pubsub.subscribe() sends an Update to the Workflow with the client's current offset.
  • The Update handler inside PubSubMixin calls workflow.wait_condition() to block until new events are available at or beyond that offset.
  • When events arrive via a Signal from an Activity or via self.publish() in the Workflow, the wait_condition returns and the Update handler delivers the new batch.
  • The subscribe iterator re-polls automatically, tracking the offset across iterations.
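The wait/wake flow in the bullets above can be modeled with a plain asyncio analogue (a sketch of the semantics only, not the mixin's implementation):

```python
import asyncio

class LongPollBuffer:
    """Plain-asyncio analogue of the mixin's long-poll wait/wake flow (illustrative)."""

    def __init__(self) -> None:
        self.events: list[bytes] = []
        self.cond = asyncio.Condition()

    async def publish(self, data: bytes) -> None:
        async with self.cond:
            self.events.append(data)
            self.cond.notify_all()  # wake any blocked poll handlers

    async def poll(self, from_offset: int) -> list[bytes]:
        async with self.cond:
            # Blocks until events exist at or beyond the requested offset,
            # mirroring workflow.wait_condition() in the Update handler.
            await self.cond.wait_for(lambda: len(self.events) > from_offset)
            return self.events[from_offset:]

async def demo() -> list[bytes]:
    buf = LongPollBuffer()
    poller = asyncio.create_task(buf.poll(0))
    await asyncio.sleep(0)  # let the poller block first
    await buf.publish(b"event-1")
    return await poller

batch = asyncio.run(demo())
```

In the real system the "condition" lives inside the Workflow, so the blocked poll also survives Worker restarts: the Update is retried and the wait is reconstructed on replay.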

Because events are stored durably inside the Workflow, the client can reconnect at any time, even after a BFF restart, and resume from its last known offset without losing events.

Per-turn event indexing

Events use a global offset that increments across all turns within a session. Capturing the offset before signaling start_turn ensures the SSE stream delivers only events from the current turn.

pubsub = PubSubClient.create(client, session_id)
start_offset = await pubsub.get_offset()

# Signal the Workflow to start a new turn
await handle.signal(AnalyticsWorkflow.start_turn, StartTurnInput(message=text))

# Subscribe from start_offset onward: only events from this turn
async for item in pubsub.subscribe(topics=[EVENTS_TOPIC], from_offset=start_offset):
    ...

On reconnect, pass the client's last known offset to subscribe(). The Workflow replays events from that point forward, so no events are lost even if the BFF restarts mid-stream.

Sample application

The streaming agents sample is a complete chat-based analytics agent that demonstrates this pattern end-to-end. The agent queries a Chinook music store database (SQLite) using SQL, Python, and shell tools. It reasons about results, recovers from errors, and streams its progress to a React frontend via SSE.

The sample includes:

  • backend-temporal/ — FastAPI proxy and Temporal Worker using PubSubMixin
  • backend-ephemeral/ — A non-Temporal backend for direct comparison
  • frontend/ — React frontend that renders streamed events in real time

To test it out, clone the repo and run the app.