Build real‑time, file‑uploading AI agents that stream progress via SSE
Real‑world AI workflows—data pipelines, planning bots, ETL orchestration—often leave users staring at a loading spinner for 10–60 seconds. In 2025‑26 the expectation is instant feedback: step transitions, progress bars, intermediate results, and especially human‑in‑the‑loop (HITL) pauses.
In this post I’ll walk you through a production‑ready stack that:
- Streams step-by-step progress from a LangGraph agent to the browser over Server-Sent Events (SSE)
- Uses get_stream_writer() so any node can emit clean, structured progress messages without manual callbacks
- Accepts file uploads alongside the user's message through a single FastAPI endpoint
- Pauses for human-in-the-loop review and ties every run to a thread_id / session_id so it can be resumed
Let’s dive in.
| Component | Version / Notes |
|---|---|
| Python | 3.11+ (required for reliable async get_stream_writer() propagation) |
| FastAPI | 0.115+ (built‑in streaming & file upload support) |
| LangGraph | 0.2.70+ (latest as of Feb 2026) |
| LangChain ecosystem | 0.1.x+ |
| python-multipart | pip install python-multipart (for UploadFile / Form parsing) |
| React | 18+ (TypeScript optional, shown with plain JS for brevity) |
| Uvicorn | For running the FastAPI app locally |
Gotcha: Use Python ≥ 3.11 to avoid ContextVar issues when propagating the stream writer across async tasks.
streaming-ai-agent/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app entry point
│ ├── api/
│ │ └── router.py # /api/stream endpoint
│ ├── graph/
│ │ ├── __init__.py
│ │ ├── state.py # TypedDict for AgentState
│ │ ├── nodes/
│ │ │ ├── __init__.py
│ │ │ ├── plan_data_model.py
│ │ │ └── hitl_review.py
│ │ └── service.py # Graph builder + streaming logic
│ └── dependencies.py # (optional) shared checkpointer
├── frontend/ # React app (Vite / CRA)
│ └── src/
│ └── App.jsx # Streaming consumer example
├── requirements.txt
└── README.md
We’ll use a TypedDict with Annotated to let LangGraph know how to merge message lists.
# app/graph/state.py
from typing import Annotated, TypedDict, List
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    user_message: str
    file_content: str | None
    data_model_plan: dict | None
    resolved_model: dict | None
    human_feedback: str | None
    status: str
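If you're curious what the add_messages reducer actually does, you can call it directly. This tiny check is my own illustration, not part of the app, and just confirms that new messages are appended to the list rather than overwriting it:
# Illustration only: calling the add_messages reducer by hand to see the merge behaviour.
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph.message import add_messages

merged = add_messages(
    [HumanMessage(content="Design a data model for orders")],
    [AIMessage(content="Proposed entities: user, order, product")],
)
print(len(merged))  # 2 -- the reducer appends new messages instead of replacing the list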
Each node pulls the stream writer, emits a structured event (JSON‑serialisable dict), and updates state. No real LLM calls are needed for this demo.
# app/graph/nodes/plan_data_model.py
from langgraph.config import get_stream_writer
from ..state import AgentState
def plan_data_model_node(state: AgentState) -> AgentState:
    writer = get_stream_writer()
    writer({
        "type": "step_start",
        "step": "plan_data_model",
        "message": "Analyzing user request and file (if any)..."
    })

    # Simulated planning logic
    plan = {
        "entities": ["user", "order", "product"],
        "relationships": ["user → order", "order → product"]
    }
    if state.get("file_content"):
        plan["note"] = "CSV detected → adding transaction fields"

    writer({
        "type": "step_update",
        "step": "plan_data_model",
        "progress": 0.6,
        "message": "Generated initial data model plan"
    })

    return {
        **state,
        "data_model_plan": plan,
        "status": "planning_complete"
    }
# app/graph/nodes/hitl_review.py
from langgraph.config import get_stream_writer
from langgraph.types import interrupt
from ..state import AgentState
def hitl_review_node(state: AgentState) -> AgentState:
    writer = get_stream_writer()
    writer({
        "type": "waiting_human",
        "step": "human_review",
        "message": "Please review the proposed data model",
        "plan": state["data_model_plan"]
    })

    # interrupt() pauses the graph here; when the run is resumed with
    # Command(resume=...), it returns the resume payload and execution continues.
    feedback = interrupt({"needs_review": True, "plan": state["data_model_plan"]})

    # Code below runs only after the human resumes the thread
    return {
        **state,
        "human_feedback": feedback,
        "status": "review_complete"
    }
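To continue past the interrupt, you resume the same thread with a Command carrying the reviewer's feedback. Here's a minimal sketch, assuming the compiled graph and checkpointer built in the next section; the resume_with_feedback helper is my own name, not part of the project layout:
# Sketch: resuming an interrupted thread. `compiled_graph` is the graph compiled
# with a checkpointer in service.py; the resume value becomes the return value
# of interrupt() inside hitl_review_node.
from langgraph.types import Command

def resume_with_feedback(compiled_graph, thread_id: str, feedback: str) -> dict:
    config = {"configurable": {"thread_id": thread_id}}
    return compiled_graph.invoke(Command(resume=feedback), config=config)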
Graph builder and streaming logic (service.py)
# app/graph/service.py
import json
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from .state import AgentState
from .nodes.plan_data_model import plan_data_model_node
from .nodes.hitl_review import hitl_review_node
checkpointer = MemorySaver()
graph = StateGraph(state_schema=AgentState)
# Register nodes
graph.add_node("plan_data_model", plan_data_model_node)
graph.add_node("human_review", hitl_review_node)
# Define edges
graph.add_edge(START, "plan_data_model")
graph.add_edge("plan_data_model", "human_review")
graph.add_edge("human_review", END) # or loop back if resumed
compiled_graph = graph.compile(checkpointer=checkpointer)
async def stream_agent_process(
    inputs: dict,
    thread_id: str,
    file_content: str | None = None
):
    config = {"configurable": {"thread_id": thread_id}}
    initial_state = {
        "messages": [],
        "user_message": inputs.get("user_message", ""),
        "file_content": file_content,
        "status": "starting"
    }

    async for event in compiled_graph.astream(
        initial_state,
        config=config,
        stream_mode=["custom", "updates", "messages"],
        subgraphs=True
    ):
        # With subgraphs=True and a list of stream modes, each event is a
        # (namespace, mode, payload) tuple; handle other shapes defensively.
        if isinstance(event, tuple) and len(event) == 3:
            _namespace, mode, payload = event
        elif isinstance(event, tuple) and len(event) == 2:
            mode, payload = event
        else:
            mode, payload = "unknown", event
        # default=str keeps non-JSON-serialisable payloads (e.g. message objects) safe
        yield f"data: {json.dumps({'mode': mode, 'payload': payload}, default=str)}\n\n"
Important gotchas
- subgraphs=True → events carry the subgraph namespace, so with a list of stream modes each event arrives as a (namespace, mode, data) tuple.
- Combine stream modes (["custom", "updates", "messages"]) to get both progress events and state updates in one stream.
- Always json.dumps() the event so the SSE payload is safe and parseable on the client.
# app/api/router.py
from fastapi import APIRouter, UploadFile, Form, HTTPException
from fastapi.responses import StreamingResponse
import json
from ..graph.service import stream_agent_process
router = APIRouter(prefix="/api")
@router.post("/stream")
async def stream_agent(
    user_message: str = Form(...),
    session_id: str = Form(...),
    file: UploadFile | None = None
):
    file_content = None
    if file:
        content = await file.read()
        file_content = content.decode("utf-8", errors="ignore")

    async def event_generator():
        try:
            async for chunk in stream_agent_process(
                {"user_message": user_message},
                thread_id=session_id,
                file_content=file_content
            ):
                yield chunk
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Important for nginx proxy
        }
    )
Add router to your FastAPI app in main.py.
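Something along these lines works; the CORS middleware and the specific origin are assumptions for a local Vite dev setup, not requirements of the stack:
# app/main.py -- minimal wiring sketch (CORS origin below assumes Vite's default dev port)
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from .api.router import router

app = FastAPI(title="Streaming AI Agent")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],  # assumption: Vite dev server
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(router)

# Run with: uvicorn app.main:app --reload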
Do not set the Content-Type: application/json header on the frontend fetch; SSE expects text/event-stream.
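Before wiring the React client, you can smoke-test the stream from Python. Using httpx here is my own choice (any HTTP client that supports streaming responses works), and the sample CSV bytes are made up:
# Smoke test: POST a message plus a small fake CSV and print the SSE lines as they arrive.
import asyncio

import httpx

async def smoke_test() -> None:
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/api/stream",
            data={"user_message": "Plan a data model", "session_id": "smoke-1"},
            files={"file": ("sample.csv", b"id,amount\n1,9.99", "text/csv")},
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    print(line)

asyncio.run(smoke_test())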
// frontend/src/App.jsx
import { useState } from "react";
function App() {
  const [logs, setLogs] = useState([]);
  const [status, setStatus] = useState("Ready");
  // Keep the session id stable across re-renders so HITL resumes hit the same thread
  const [sessionId] = useState(() => crypto.randomUUID());

  const handleSubmit = async (e) => {
    e.preventDefault();
    const form = e.target;
    const message = form.message.value;
    const fileInput = form.file.files?.[0];

    const formData = new FormData();
    formData.append("user_message", message);
    formData.append("session_id", sessionId);
    if (fileInput) formData.append("file", fileInput);

    const response = await fetch("/api/stream", {
      method: "POST",
      body: formData
    });
    if (!response.body) return;

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      // Note: assumes each SSE event arrives within a single chunk (fine for a demo)
      const chunk = decoder.decode(value, { stream: true });
      const lines = chunk.split("\n");

      for (const line of lines) {
        if (!line.startsWith("data: ")) continue;
        try {
          const data = JSON.parse(line.slice(6));
          const { mode, payload } = data;

          if (mode === "custom") {
            switch (payload.type) {
              case "step_start":
                setLogs((l) => [...l, `→ Starting: ${payload.step}`]);
                break;
              case "step_update":
                setLogs((l) => [
                  ...l,
                  ` Update: ${payload.message} (${Math.round(payload.progress * 100)}%)`
                ]);
                break;
              case "waiting_human":
                setStatus("Waiting for your review…");
                setLogs((l) => [...l