Streaming
By default, the API waits for the full response before returning it. With streaming enabled (`"stream": true`), tokens are delivered in real time via Server-Sent Events (SSE) as the model generates them.
Streamed responses are stored server-side just like non-streamed ones; you can retrieve them later or continue them with `previous_response_id`.
Basic Streaming
- curl
- Python (OpenAI SDK)
- Python (PydanticAI)
- Python (LangGraph)
```bash
curl -N -X POST $BASE_URL/v1/responses \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $AA_TOKEN" \
  -d '{
    "model": "qwen3-32b-tool",
    "input": "Write a short poem about the ocean.",
    "instructions": "You are a creative writer. Keep responses short.",
    "stream": true
  }'
```
The response is a stream of `data:` lines:
```
data: {"type":"response.created","response":{"id":"resp_abc123",...}}
data: {"type":"response.output_item.added","item":{"type":"message",...}}
data: {"type":"response.output_text.delta","delta":"The "}
data: {"type":"response.output_text.delta","delta":"ocean "}
data: {"type":"response.output_text.delta","delta":"waves..."}
data: {"type":"response.completed","response":{...}}
data: [DONE]
```
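If you consume the stream without an SDK, each `data:` line has to be decoded by hand. The following is a minimal sketch of that step, assuming one JSON payload per `data:` line as in the sample above; the helper names `iter_sse_events` and `collect_text` are hypothetical and not part of any library.

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the decoded JSON payload of each `data:` line until `[DONE]`."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # skip blank separator lines and any non-data fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return  # terminal signal: stop reading
        yield json.loads(payload)


def collect_text(lines: Iterable[str]) -> str:
    """Concatenate the `delta` of every response.output_text.delta event."""
    return "".join(
        event["delta"]
        for event in iter_sse_events(lines)
        if event.get("type") == "response.output_text.delta"
    )


# Sample lines modelled on the stream shown above (elided fields left out).
sample = [
    'data: {"type":"response.created","response":{"id":"resp_abc123"}}',
    "",
    'data: {"type":"response.output_text.delta","delta":"The "}',
    'data: {"type":"response.output_text.delta","delta":"ocean "}',
    'data: {"type":"response.output_text.delta","delta":"waves..."}',
    'data: {"type":"response.completed","response":{"id":"resp_abc123"}}',
    "data: [DONE]",
]

print(collect_text(sample))  # The ocean waves...
```

In practice the `lines` argument would be the line iterator of a streaming HTTP response; the sketch takes any iterable of strings so it can be exercised offline.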
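```python
# `client` is assumed to be an `openai.OpenAI` instance already configured
# with your base URL and token.
stream = client.responses.create(
    model="qwen3-32b-tool",
    input="Write a short poem about the ocean.",
    instructions="You are a creative writer. Keep responses short.",
    stream=True,
)

response_id = None
for event in stream:
    if event.type == "response.output_text.delta":
        # Print each text chunk as it arrives.
        print(event.delta, end="", flush=True)
    elif event.type == "response.completed":
        # Keep the id so later turns can chain from this response.
        response_id = event.response.id
```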
PydanticAI handles streaming internally; agent.run() returns the complete result. You don’t need to manage SSE events yourself:
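```python
import asyncio

from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIResponsesModel

# `provider` is assumed to be an OpenAI-compatible provider already configured
# with your base URL and token.
agent = Agent(
    model=OpenAIResponsesModel("qwen3-32b-tool", provider=provider),
    system_prompt="You are a creative writer. Keep responses short.",
)


async def main() -> None:
    # agent.run() is a coroutine, so it is awaited inside an async function.
    result = await agent.run("Write a short poem about the ocean.")
    print(result.output)


asyncio.run(main())
```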
Note: PydanticAI uses streaming under the hood when communicating with the Responses API, but exposes a simple synchronous-style interface. For fine-grained SSE control, use the OpenAI SDK directly.
Stream tokens directly from the underlying ChatOpenAI and capture the final response id from the last chunk. Use this id to chain the next turn (streamed or not).
```python
from langchain_core.messages import HumanMessage, SystemMessage

# `llm` is assumed to be the ChatOpenAI instance already configured for this API.
messages = [
    SystemMessage("You are a creative writer. Keep responses short."),
    HumanMessage("Write a short poem about the ocean."),
]

response_id = None
for chunk in llm.stream(messages):
    print(chunk.text, end="", flush=True)
    # The final chunk carries the completed response metadata.
    if chunk.response_metadata.get("id"):
        response_id = chunk.response_metadata["id"]
print()  # newline after the stream

# Pass `response_id` as `previous_response_id` on the next call to chain.
```
|
To stream from inside a multi-node graph instead of the LLM directly, use |
SSE Event Types
Core events (always present):
| Event Type | Description |
|---|---|
| | Stream started. Contains the initial response object with |
| | Response processing has begun. |
| | A new output item (message, reasoning, tool call) was added. |
| | A new content part was added to a message. |
| | A text chunk. The |
| | Stream finished. Contains the full response object. |
| | Terminal signal: the stream is closed. |
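A client usually folds these events into a running state as they arrive. The sketch below does that for the event types that appear in the sample stream earlier in this section (`response.created`, `response.output_text.delta`, `response.completed`) and simply records any other type it sees. `StreamState` and `apply_event` are hypothetical names, and the events are assumed to be already decoded into dictionaries.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class StreamState:
    response_id: Optional[str] = None
    text: str = ""
    completed: bool = False
    unhandled: List[str] = field(default_factory=list)


def apply_event(state: StreamState, event: dict) -> StreamState:
    """Update the running state with one decoded SSE event."""
    kind = event.get("type")
    if kind == "response.created":
        state.response_id = event["response"]["id"]
    elif kind == "response.output_text.delta":
        state.text += event["delta"]
    elif kind == "response.completed":
        state.response_id = event["response"]["id"]
        state.completed = True
    else:
        # Not an error: remember the type so callers can decide what to do.
        state.unhandled.append(str(kind))
    return state


events = [
    {"type": "response.created", "response": {"id": "resp_abc123"}},
    {"type": "response.output_item.added", "item": {"type": "message"}},
    {"type": "response.output_text.delta", "delta": "The "},
    {"type": "response.output_text.delta", "delta": "ocean "},
    {"type": "response.output_text.delta", "delta": "waves..."},
    {"type": "response.completed", "response": {"id": "resp_abc123"}},
]

state = StreamState()
for event in events:
    apply_event(state, event)

print(state.text)
print(state.response_id, state.completed, state.unhandled)
```

Recording unknown types instead of raising keeps the loop tolerant of events that only some models or tools emit.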
Additional events (present depending on model capabilities and tool usage):
| Event Type | Description |
|---|---|
| | A reasoning/chain-of-thought chunk (if the model supports it). |
| | Incremental function call arguments (for function tool calling). |
| | Incremental MCP tool call arguments. |
| | An MCP tool call finished executing. |
Streaming with Conversation History
Streaming can be combined with `previous_response_id`: a streamed request can continue an earlier response, and the streamed response is itself stored, so later turns can chain from it.
- curl
- Python (OpenAI SDK)
```bash
# Turn 1 (non-streaming)
curl -X POST $BASE_URL/v1/responses \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $AA_TOKEN" \
  -d '{
    "model": "qwen3-32b-tool",
    "input": "My favorite color is blue.",
    "instructions": "You are a helpful assistant."
  }'
# → {"id": "resp_001", ...}

# Turn 2 (streaming), chains from Turn 1
curl -N -X POST $BASE_URL/v1/responses \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $AA_TOKEN" \
  -d '{
    "model": "qwen3-32b-tool",
    "input": "What is my favorite color?",
    "previous_response_id": "resp_001",
    "stream": true
  }'
# Streams: "Your favorite color is blue."
```
```python
# `client` is assumed to be an `openai.OpenAI` instance already configured
# with your base URL and token.

# Turn 1 (non-streaming)
response1 = client.responses.create(
    model="qwen3-32b-tool",
    input="My favorite color is blue.",
)

# Turn 2 (streaming), chains from Turn 1
stream = client.responses.create(
    model="qwen3-32b-tool",
    input="What is my favorite color?",
    previous_response_id=response1.id,
    stream=True,
)

response_id_2 = None
for event in stream:
    if event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)
    elif event.type == "response.completed":
        # The completed event carries the id of the stored response.
        response_id_2 = event.response.id

# Turn 3 can chain from the streamed response
response3 = client.responses.create(
    model="qwen3-32b-tool",
    input="Can you remind me what we discussed about colors?",
    previous_response_id=response_id_2,
)
```
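The chaining pattern above amounts to remembering the last response id and attaching it to the next request body. Here is a small sketch of that bookkeeping which builds the JSON bodies only and sends nothing; the `Conversation` class is a hypothetical helper, and sending `instructions` only on the first turn mirrors the curl example rather than a documented requirement.

```python
import json
from typing import Optional


class Conversation:
    """Tracks the last response id and builds request bodies for each turn."""

    def __init__(self, model: str, instructions: Optional[str] = None) -> None:
        self.model = model
        self.instructions = instructions
        self.last_response_id: Optional[str] = None

    def next_request(self, user_input: str, stream: bool = False) -> dict:
        body = {"model": self.model, "input": user_input}
        if self.last_response_id is None:
            # First turn: include instructions if any were given.
            if self.instructions is not None:
                body["instructions"] = self.instructions
        else:
            # Later turns: chain from the previous stored response.
            body["previous_response_id"] = self.last_response_id
        if stream:
            body["stream"] = True
        return body

    def record(self, response_id: str) -> None:
        """Call with the id from the response (or the completed event)."""
        self.last_response_id = response_id


conv = Conversation("qwen3-32b-tool", instructions="You are a helpful assistant.")
first = conv.next_request("My favorite color is blue.")
conv.record("resp_001")  # id returned by Turn 1
second = conv.next_request("What is my favorite color?", stream=True)

print(json.dumps(first))
print(json.dumps(second))
```

Whether a turn is streamed or not does not change the bookkeeping: in both cases the id to record is the one on the final response object.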
Retrieving a Streamed Response
Streamed responses are persisted just like non-streamed ones. You can retrieve them as JSON or replay them as SSE:
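```bash
# Retrieve as JSON
curl "$BASE_URL/v1/responses/resp_abc123" \
  -H "Authorization: Bearer $AA_TOKEN"

# Replay as SSE stream (-N disables output buffering; the URL is quoted so
# the shell does not treat `?` as a glob character)
curl -N "$BASE_URL/v1/responses/resp_abc123?stream=true" \
  -H "Authorization: Bearer $AA_TOKEN"
```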
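When building these retrieval URLs in code, it helps to keep the JSON and replay variants in one place. The following sketch assumes the same path layout as the curl commands above; `response_url` is a hypothetical helper and `https://api.example.test` is a placeholder base URL.

```python
from urllib.parse import quote, urlencode


def response_url(base_url: str, response_id: str, replay: bool = False) -> str:
    """Return the URL for a stored response, optionally as an SSE replay."""
    url = f"{base_url.rstrip('/')}/v1/responses/{quote(response_id, safe='')}"
    if replay:
        # Same query parameter as in the curl example: ?stream=true
        url += "?" + urlencode({"stream": "true"})
    return url


print(response_url("https://api.example.test", "resp_abc123"))
print(response_url("https://api.example.test/", "resp_abc123", replay=True))
```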