A non-streaming research agent that takes 15 seconds looks broken. The user sees nothing, clicks the button again, and you now have two parallel agent runs.
A streaming agent that shows "Searching the web..." → "Found 3 sources" → "Writing your answer..." feels responsive even if it takes the same 15 seconds. Users wait for progress indicators. They abandon blank screens.
The streaming code is three lines of Python. Getting the UI right took a day. This post covers both.
Two types of streaming in agents
Token streaming — stream the final LLM response word by word, like the typing effect in ChatGPT. Users see the answer forming.
Event streaming — stream the agent's intermediate steps as they happen: which tool was called, what it returned, when the model started thinking. This is more useful for agentic systems where the user wants to see the process, not just the output.
Most production agents want both: event streaming during tool calls, token streaming for the final response.
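One way to carry both kinds of streaming over a single channel is to tag every item with a type and let the consumer route it. The sketch below is illustrative only: `fake_agent_run` is a hypothetical stand-in for a real agent, and the event shapes (`tool_start`, `tool_end`, `token`) are assumptions chosen to match the payloads used later in this post.

```python
from typing import Iterator


def fake_agent_run() -> Iterator[dict]:
    """Hypothetical stand-in for an agent: emits step events, then answer tokens."""
    yield {"type": "tool_start", "tool": "search_web"}
    yield {"type": "tool_end", "tool": "search_web"}
    for word in ["Streaming ", "keeps ", "users ", "informed."]:
        yield {"type": "token", "content": word}


def render(events: Iterator[dict]) -> tuple[list[str], str]:
    """Route events: step events become status lines, tokens build the answer."""
    status_lines: list[str] = []
    answer_parts: list[str] = []
    for event in events:
        if event["type"] == "tool_start":
            status_lines.append(f"Calling {event['tool']}...")
        elif event["type"] == "tool_end":
            status_lines.append(f"{event['tool']} finished")
        elif event["type"] == "token":
            answer_parts.append(event["content"])
    return status_lines, "".join(answer_parts)


status, answer = render(fake_agent_run())
print(status)
print(answer)
```

Keeping one typed stream means the transport (SSE, WebSocket, stdout) does not need to know which kind of streaming it is carrying.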
Claude token streaming
The Anthropic SDK's streaming API uses a context manager:
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1000,
    messages=[{"role": "user", "content": "Explain quantum entanglement simply"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Access the final message after streaming completes
    final_message = stream.get_final_message()

print(f"\n\nTotal tokens: {final_message.usage.input_tokens + final_message.usage.output_tokens}")
For tool use with streaming:
with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1000,
    tools=tools,
    messages=messages,
) as stream:
    for event in stream:
        # Tool use events
        if hasattr(event, "type"):
            if event.type == "content_block_start":
                if hasattr(event.content_block, "type") and event.content_block.type == "tool_use":
                    print(f"\n[Calling: {event.content_block.name}]")
            elif event.type == "content_block_delta":
                if hasattr(event.delta, "type") and event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)

    final = stream.get_final_message()
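The loop above prints the tool name as soon as the block starts, but the tool's arguments arrive afterwards as `input_json_delta` events whose `partial_json` fragments only parse once they are concatenated. The sketch below shows that accumulation on hand-written dictionaries that imitate the event sequence. The sample events and the `collect_tool_inputs` helper are hypothetical, and real SDK events are objects rather than dicts.

```python
import json


def collect_tool_inputs(events: list[dict]) -> list[dict]:
    """Rebuild each tool call's arguments from streamed JSON fragments."""
    pending: dict[int, dict] = {}  # content block index -> name + fragments so far
    completed: list[dict] = []
    for event in events:
        kind = event["type"]
        if kind == "content_block_start" and event["content_block"]["type"] == "tool_use":
            pending[event["index"]] = {"name": event["content_block"]["name"], "fragments": []}
        elif kind == "content_block_delta" and event["delta"]["type"] == "input_json_delta":
            if event["index"] in pending:
                pending[event["index"]]["fragments"].append(event["delta"]["partial_json"])
        elif kind == "content_block_stop" and event["index"] in pending:
            call = pending.pop(event["index"])
            raw = "".join(call["fragments"])
            # A tool with no arguments may stream no fragments at all.
            completed.append({"name": call["name"], "input": json.loads(raw) if raw else {}})
    return completed


# Simulated events (not real API output): one tool call split into two fragments.
sample_events = [
    {"type": "content_block_start", "index": 0,
     "content_block": {"type": "tool_use", "name": "search_web"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "input_json_delta", "partial_json": '{"query": "pyd'}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "input_json_delta", "partial_json": 'antic ai"}'}},
    {"type": "content_block_stop", "index": 0},
]
print(collect_tool_inputs(sample_events))
```

If you only need the arguments once the turn is over, `stream.get_final_message()` already returns them parsed; manual accumulation matters when you want to display the arguments before the message completes.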
LangGraph event streaming
LangGraph's astream_events gives you granular visibility into every node execution, tool call, and model response in the graph:
from langgraph.graph import StateGraph, MessagesState
from langgraph.prebuilt import ToolNode
from langchain_anthropic import ChatAnthropic
import asyncio

# Build a simple research graph.
# `tools` is assumed to be a list of LangChain tools defined elsewhere
# (for example search_web and fetch_page).
llm = ChatAnthropic(model="claude-sonnet-4-6")
llm_with_tools = llm.bind_tools(tools)


def agent_node(state: MessagesState):
    return {"messages": [llm_with_tools.invoke(state["messages"])]}


graph = StateGraph(MessagesState)
graph.add_node("agent", agent_node)
graph.add_node("tools", ToolNode(tools))
graph.set_entry_point("agent")
# Route to the tool node while the model keeps requesting tools, otherwise finish
graph.add_conditional_edges("agent", lambda s: "tools" if s["messages"][-1].tool_calls else "__end__")
graph.add_edge("tools", "agent")
app = graph.compile()
async def stream_agent(question: str):
    print(f"Question: {question}\n")

    async for event in app.astream_events(
        {"messages": [("user", question)]},
        version="v2",
    ):
        event_type = event["event"]

        if event_type == "on_chat_model_stream":
            # Token by token for the model's output
            chunk = event["data"]["chunk"]
            if hasattr(chunk, "content") and isinstance(chunk.content, str):
                print(chunk.content, end="", flush=True)

        elif event_type == "on_tool_start":
            # Tool is about to be called
            tool_name = event["name"]
            inputs = event["data"].get("input", {})
            print(f"\n\n⚙️ {tool_name}({list(inputs.values())[0] if inputs else ''})")

        elif event_type == "on_tool_end":
            # Tool finished
            output = event["data"].get("output", "")
            if isinstance(output, str) and len(output) > 100:
                output = output[:100] + "..."
            print(f" ✓ Got: {output}")

        elif event_type == "on_chain_start" and event["name"] == "agent":
            print("\n🤔 Thinking...", end="")

    print("\n")


asyncio.run(stream_agent("What are the latest developments in Pydantic AI?"))
Output looks like:
Question: What are the latest developments in Pydantic AI?
🤔 Thinking...
⚙️ search_web(pydantic-ai 2026 updates)
✓ Got: Pydantic AI v0.0.30 released with improved streaming...
🤔 Thinking...
⚙️ fetch_page(https://docs.pydantic.ai/latest/changelog/)
✓ Got: ## Changelog\n### v0.0.30\n- Added support for...
Pydantic AI has seen significant development in early 2026. The v0.0.30 release introduced...
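The branching inside `stream_agent` can be pulled into a pure function, which makes the event-to-status mapping testable without running a graph or calling a model. This is a sketch: `describe_event` is a hypothetical helper, and the sample dictionaries only imitate the `event`, `name`, and `data` keys that the handler above reads.

```python
from typing import Optional


def describe_event(event: dict, max_len: int = 100) -> Optional[str]:
    """Turn one astream_events-style dict into a status line, or None to skip it."""
    kind = event.get("event")
    data = event.get("data", {})

    if kind == "on_tool_start":
        inputs = data.get("input") or {}
        # Show only the first argument, mirroring the handler above.
        first_arg = next(iter(inputs.values()), "") if isinstance(inputs, dict) else inputs
        return f"{event['name']}({first_arg})"

    if kind == "on_tool_end":
        output = data.get("output", "")
        # Assumption: some runtimes return a message object; prefer its .content if present.
        text = str(getattr(output, "content", output))
        if len(text) > max_len:
            text = text[:max_len] + "..."
        return f"Got: {text}"

    return None


# Hand-written sample events, not captured from a real run.
samples = [
    {"event": "on_tool_start", "name": "search_web", "data": {"input": {"query": "pydantic-ai"}}},
    {"event": "on_tool_end", "name": "search_web", "data": {"output": "x" * 150}},
    {"event": "on_chain_end", "name": "agent", "data": {}},
]
for sample in samples:
    line = describe_event(sample)
    if line is not None:
        print(line)
```

The same function can then feed a terminal printer, an SSE endpoint, or a log line without duplicating the branching.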
Server-sent events (SSE) for a web API
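To consume a streaming agent from a browser, expose it as an SSE endpoint. FastAPI handles this with StreamingResponse wrapped around an async generator. Because the endpoint below is a POST, the browser has to read it with fetch and a stream reader rather than EventSource, which only issues GET requests: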
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

# `app_graph` is assumed to be the compiled LangGraph graph from the previous
# section, renamed so it does not clash with the FastAPI `app`.


async def agent_event_generator(question: str):
    """Yields SSE-formatted events from the streaming agent."""
    async for event in app_graph.astream_events(
        {"messages": [("user", question)]},
        version="v2",
    ):
        event_type = event["event"]

        if event_type == "on_tool_start":
            payload = json.dumps({
                "type": "tool_start",
                "tool": event["name"],
                "input": event["data"].get("input", {}),
            })
            yield f"data: {payload}\n\n"

        elif event_type == "on_tool_end":
            payload = json.dumps({
                "type": "tool_end",
                "tool": event["name"],
            })
            yield f"data: {payload}\n\n"

        elif event_type == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if hasattr(chunk, "content") and isinstance(chunk.content, str) and chunk.content:
                payload = json.dumps({
                    "type": "token",
                    "content": chunk.content,
                })
                yield f"data: {payload}\n\n"

        await asyncio.sleep(0)  # yield control to event loop

    yield f"data: {json.dumps({'type': 'done'})}\n\n"


@app.post("/api/agent")
async def run_agent(request: dict):
    return StreamingResponse(
        agent_event_generator(request["question"]),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable nginx buffering
        },
    )
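Each `yield` in the generator produces one SSE frame: a `data:` line terminated by a blank line. Network chunks do not respect those boundaries, so a client has to buffer until it sees the blank-line separator before parsing. The sketch below shows the framing and a buffering parser in plain Python. `format_sse` and `SSEParser` are hypothetical helpers written for illustration, not part of FastAPI, and the parser only handles the single-line `data:` frames used in this post.

```python
import json


def format_sse(payload: dict) -> str:
    """Serialize one event as an SSE frame (json.dumps escapes any newlines)."""
    return f"data: {json.dumps(payload)}\n\n"


class SSEParser:
    """Buffers raw text and returns only fully received events."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: str) -> list[dict]:
        self._buffer += chunk
        events: list[dict] = []
        # A frame is complete once its terminating blank line has arrived.
        while "\n\n" in self._buffer:
            frame, self._buffer = self._buffer.split("\n\n", 1)
            for line in frame.split("\n"):
                if line.startswith("data: "):
                    events.append(json.loads(line[len("data: "):]))
        return events


# Simulate a network that delivers the stream in arbitrary 7-character pieces.
wire = format_sse({"type": "token", "content": "Hel"}) + format_sse({"type": "done"})
parser = SSEParser()
received: list[dict] = []
for i in range(0, len(wire), 7):
    received.extend(parser.feed(wire[i:i + 7]))
print(received)
```

Any client that reads the response chunk by chunk has the same concern: parsing each chunk in isolation silently drops events that straddle a chunk boundary.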
Next.js frontend
The React component consumes the SSE stream and updates state incrementally:
"use client";
import { useState, useCallback } from "react";

interface AgentEvent {
  type: "tool_start" | "tool_end" | "token" | "done";
  tool?: string;
  input?: Record<string, unknown>;
  content?: string;
}

interface ToolStatus {
  name: string;
  status: "running" | "done";
}

export default function AgentChat() {
  const [answer, setAnswer] = useState("");
  const [toolCalls, setToolCalls] = useState<ToolStatus[]>([]);
  const [isStreaming, setIsStreaming] = useState(false);

  const runAgent = useCallback(async (question: string) => {
    setAnswer("");
    setToolCalls([]);
    setIsStreaming(true);

    try {
      const response = await fetch("/api/agent", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ question }),
      });
      if (!response.ok || !response.body) return;

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // stream: true keeps multi-byte characters intact across chunk boundaries
        buffer += decoder.decode(value, { stream: true });

        // A network chunk can end mid-event, so process only complete lines
        // and keep the unfinished tail in the buffer for the next read.
        const lines = buffer.split("\n");
        buffer = lines.pop() ?? "";

        for (const line of lines) {
          if (!line.startsWith("data: ")) continue;
          try {
            const event: AgentEvent = JSON.parse(line.slice(6));
            if (event.type === "token") {
              setAnswer(prev => prev + (event.content ?? ""));
            } else if (event.type === "tool_start") {
              setToolCalls(prev => [...prev, { name: event.tool!, status: "running" }]);
            } else if (event.type === "tool_end") {
              setToolCalls(prev => prev.map(t =>
                t.name === event.tool && t.status === "running"
                  ? { ...t, status: "done" }
                  : t
              ));
            } else if (event.type === "done") {
              setIsStreaming(false);
            }
          } catch {
            // Malformed JSON line: skip
          }
        }
      }
    } finally {
      // Runs on success, early return, and network errors alike
      setIsStreaming(false);
    }
  }, []);

  return (
    <div className="max-w-2xl mx-auto p-4">
      {/* Tool call status badges */}
      {toolCalls.length > 0 && (
        <div className="mb-4 flex flex-wrap gap-2">
          {toolCalls.map((tool, i) => (
            <span
              key={i}
              className={`text-xs px-2 py-1 rounded-full ${
                tool.status === "running"
                  ? "bg-yellow-100 text-yellow-800"
                  : "bg-green-100 text-green-800"
              }`}
            >
              {tool.status === "running" ? "⚙️" : "✓"} {tool.name}
            </span>
          ))}
        </div>
      )}

      {/* Streaming answer */}
      {answer && (
        <div className="prose">
          {answer}
          {isStreaming && <span className="animate-pulse">▊</span>}
        </div>
      )}

      {/* Input */}
      <input
        className="mt-4 w-full border rounded p-2"
        placeholder="Ask anything..."
        onKeyDown={(e) => {
          if (e.key === "Enter" && e.currentTarget.value) {
            runAgent(e.currentTarget.value);
            e.currentTarget.value = "";
          }
        }}
      />
    </div>
  );
}
Claude's streaming API directly (without LangGraph)
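If you're not using LangGraph, you have to run the tool loop yourself with the Anthropic SDK: stream one model turn, execute any tools it requested, append the results to the conversation, and call the model again until it stops asking for tools: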
import json

from anthropic import AsyncAnthropic

# The async client keeps the event loop free while tokens arrive
client = AsyncAnthropic()


async def run_streaming_agent(question: str):
    # `tools` (tool schemas) and `execute_tool` (your dispatcher) are assumed
    # to be defined elsewhere.
    messages = [{"role": "user", "content": question}]

    while True:
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1000,
            tools=tools,
            messages=messages,
        ) as stream:
            # Stream tokens as they arrive
            async for text in stream.text_stream:
                yield {"type": "token", "content": text}

            final = await stream.get_final_message()

        # Stop unless the model asked for a tool. This also covers max_tokens
        # and other stop reasons, which would otherwise loop with no tool results.
        if final.stop_reason != "tool_use":
            break

        # Handle tool calls
        tool_results = []
        for block in final.content:
            if block.type == "tool_use":
                yield {"type": "tool_start", "tool": block.name}
                result = execute_tool(block.name, block.input)
                yield {"type": "tool_end", "tool": block.name}
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": json.dumps(result),
                })

        messages.append({"role": "assistant", "content": final.content})
        messages.append({"role": "user", "content": tool_results})

    yield {"type": "done"}
What NOT to stream
Tool call results should show as a completed event, not token by token. If your search tool returns 2,000 tokens of text, streaming those tokens to the user is noise — they don't need to watch JSON being parsed. Show "✓ Search complete" as a status badge. Stream only the model's own reasoning and response.
Also don't stream error states. If a tool fails, show the error as a complete message, not character by character.
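Both rules can be enforced at the point where the tool runs: execute it, then emit exactly one completed event, either a short summary or a whole error message. The sketch below assumes tools are plain Python callables. `run_tool_events`, the `summary` field, and the `tool_error` event type are hypothetical additions, so a frontend would need a matching case to display them.

```python
from typing import Any, Callable, Iterator


def run_tool_events(name: str, tool: Callable[..., Any], **kwargs: Any) -> Iterator[dict]:
    """Run one tool and emit start plus a single completed event (end or error)."""
    yield {"type": "tool_start", "tool": name}
    try:
        result = tool(**kwargs)
    except Exception as exc:
        # The error goes out as one complete message, never piece by piece.
        yield {"type": "tool_error", "tool": name, "message": f"{name} failed: {exc}"}
        return
    # Send a short summary instead of the raw (possibly huge) tool output.
    yield {"type": "tool_end", "tool": name,
           "summary": f"{name} complete ({len(str(result))} chars)"}


def search_web(query: str) -> str:
    """Hypothetical tool returning a large blob of text."""
    return "result " * 500


def broken_tool() -> str:
    """Hypothetical tool that always fails."""
    raise TimeoutError("upstream timed out")


ok_events = list(run_tool_events("search_web", search_web, query="sse"))
failed_events = list(run_tool_events("fetch_page", broken_tool))
print(ok_events)
print(failed_events)
```

The full tool output still goes back to the model; only the user-facing stream is reduced to a status line.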
Cost impact
Streaming has zero effect on token count. You pay exactly the same whether you stream or not — the same input and output tokens. The streaming is just how you receive the response, not how it's generated.
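Put as arithmetic, the bill is a function of the two token counts and nothing else, so the delivery mode never enters the calculation. A rough sketch, with per-million-token prices passed in as parameters because actual rates vary by model (`estimate_cost` is a hypothetical helper and the numbers in the usage line are placeholders, not real pricing):

```python
def estimate_cost(
    input_tokens: int,
    output_tokens: int,
    input_price_per_mtok: float,
    output_price_per_mtok: float,
) -> float:
    """Cost in dollars from token counts; nothing here depends on streaming."""
    return (
        input_tokens * input_price_per_mtok
        + output_tokens * output_price_per_mtok
    ) / 1_000_000


# Placeholder prices for illustration only.
cost = estimate_cost(1_200, 800, input_price_per_mtok=2.0, output_price_per_mtok=10.0)
print(f"${cost:.4f}")
```

The token counts come from the `usage` field on the final message, which is available in both the streaming and non-streaming paths.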
The LangGraph stateful agents post covers how to persist agent state across sessions — useful when you want streaming agents that can pick up interrupted conversations.



