LangGraph streaming with workflow streams#1500
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds an experimental integration path between LangGraph’s get_stream_writer() streaming mechanism and Temporal “Workflow Streams”, so activity-wrapped LangGraph nodes can publish incremental chunks that external subscribers can tail via WorkflowStreamClient.
Changes:
- Add
streaming_topicandstreaming_batch_intervaloptions toLangGraphPluginand thread them into the activity wrapper. - Extend LangGraph runtime config restoration to accept an optional
stream_writer, enabling activity invocations to install a per-call writer. - Add tests covering (1) activity-side
get_stream_writer()streaming to workflow streams and (2) workflow-side forwarding ofastream()chunks to an external topic.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
tests/contrib/langgraph/test_streaming.py |
Adds workflow-stream-backed streaming tests (activity-side writer + workflow-side republish of astream() chunks). |
temporalio/contrib/langgraph/_plugin.py |
Introduces plugin-level streaming configuration and passes it into the activity wrapper. |
temporalio/contrib/langgraph/_langgraph_config.py |
Adds stream_writer plumbing to set_langgraph_config so activities can inject a writer into the reconstructed Runtime. |
temporalio/contrib/langgraph/_activity.py |
Uses WorkflowStreamClient.from_within_activity() to back the injected writer with workflow-stream topic publishes when streaming is enabled. |
temporalio/contrib/langgraph/__init__.py |
Minor export ordering change (no functional changes). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
The LangGraph interceptor now checks at workflow start that a WorkflowStream has been registered (via the publish signal handler) when the plugin was configured with streaming_topic. Misconfigured workflows fail fast with a clear error pointing at @workflow.init, instead of silently producing no-op streams.
Wrap execute_in='workflow' nodes with wrap_workflow(), which mirrors wrap_activity() and (when streaming_topic is set) overrides the LangGraph Runtime's stream_writer to publish synchronously to the in-workflow WorkflowStream — no signal round-trip. Parametrized the streaming test over execute_in to cover both paths.
Expand the README streaming section with a self-contained snippet (plugin, WorkflowStream in __init__, external subscriber loop), an explicit callout that streaming_topic only covers stream_mode='custom' with an astream() bridge example for other modes, and at-least-once retry semantics. Add an Args section to LangGraphPlugin's docstring covering all constructor parameters.
Pick the raw user function from runnable.func instead of LangGraph's async runnable.afunc adapter, which wraps sync nodes in loop.run_in_executor — that's incompatible with the workflow event loop. wrap_activity now schedules sync funcs on a thread via asyncio.to_thread so the activity loop stays free for the streaming flusher, with stream_writer calls marshaled back to the loop thread to keep the workflow_streams client's asyncio.Event safe. Parametrize the streaming test over (execute_in, sync/async).
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| chunks: list[dict[str, Any]] = [] | ||
| async for item in ws_client.topic("tokens", type=dict).subscribe( | ||
| from_offset=0, | ||
| poll_cooldown=timedelta(0), |
| chunks: list[dict[str, Any]] = [] | ||
| async for item in ws_client.topic("astream", type=dict).subscribe( | ||
| from_offset=0, | ||
| poll_cooldown=timedelta(0), |
| runtime = set_langgraph_config( | ||
| input.langgraph_config, stream_writer=effective_writer | ||
| ) | ||
| kwargs = dict(input.kwargs) | ||
| if "runtime" in signature(func).parameters: | ||
| kwargs["runtime"] = runtime |
DABH
left a comment
There was a problem hiding this comment.
All my critiques have been addressed - thanks!
Test plan