Streaming AI Agent with FastAPI & LangGraph (2025‑26 Guide)


Build real‑time, file‑uploading AI agents that stream progress via SSE

Real‑world AI workflows—data pipelines, planning bots, ETL orchestration—often leave users staring at a loading spinner for 10–60 seconds. In 2025‑26 the expectation is instant feedback: step transitions, progress bars, intermediate results, and especially human‑in‑the‑loop (HITL) pauses.

In this post I’ll walk you through a production‑ready stack that:

  • Accepts a user message plus an optional file upload via POST
  • Runs a LangGraph agent with planning → data‑model resolution → HITL review
  • Streams every meaningful step and progress update to the frontend using Server‑Sent Events (SSE)
  • Uses get_stream_writer() so any node can emit clean, structured progress messages without manual callbacks
  • Persists sessions via thread_id / session_id
  • Comes with a minimal React front‑end that consumes the stream

Let’s dive in.

Tech Stack

| Component | Version / Notes |
| --- | --- |
| Python | 3.11+ (required for reliable async get_stream_writer() propagation) |
| FastAPI | 0.115+ (built‑in streaming & file upload support) |
| LangGraph | 0.2.70+ (latest as of Feb 2026) |
| LangChain ecosystem | 0.1.x+ |
| python-multipart | pip install python-multipart (for UploadFile) |
| React | 18+ (TypeScript optional, shown with plain JS for brevity) |
| Uvicorn | For running the FastAPI app locally |

Gotcha: Use Python ≥ 3.11 to avoid ContextVar issues when propagating the stream writer across async tasks.
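
If you want to pin things up front, a minimal requirements.txt might look like the following (the version pins are illustrative; align them with the table above):

```
fastapi>=0.115
uvicorn[standard]
langgraph>=0.2.70
langchain-core
python-multipart
```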

Project Structure

```
streaming-ai-agent/
├── app/
│   ├── __init__.py
│   ├── main.py               # FastAPI app entry point
│   ├── api/
│   │   └── router.py         # /api/stream endpoint
│   ├── graph/
│   │   ├── __init__.py
│   │   ├── state.py          # TypedDict for AgentState
│   │   ├── nodes/
│   │   │   ├── __init__.py
│   │   │   ├── plan_data_model.py
│   │   │   └── hitl_review.py
│   │   └── service.py        # Graph builder + streaming logic
│   └── dependencies.py       # (optional) shared checkpointer
├── frontend/                 # React app (Vite / CRA)
│   └── src/
│       └── App.jsx           # Streaming consumer example
├── requirements.txt
└── README.md
```

Step 1: Define the State

We’ll use a TypedDict with Annotated to let LangGraph know how to merge message lists.

```python
# app/graph/state.py
from typing import Annotated, TypedDict, List
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    user_message: str
    file_content: str | None
    data_model_plan: dict | None
    resolved_model: dict | None
    human_feedback: str | None
    status: str
```
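
Before moving on, here's a standalone sketch (not part of the app) of what the add_messages reducer does: it appends incoming messages and deduplicates by message id rather than overwriting the list:

```python
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph.message import add_messages

existing = [HumanMessage(content="hi", id="1")]
update = [AIMessage(content="hello", id="2")]

# add_messages(left, right) merges the two lists: new ids are appended,
# matching ids replace the old entry instead of duplicating it
merged = add_messages(existing, update)
print([m.content for m in merged])  # ['hi', 'hello']
```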

Step 2: Create Nodes with Progress Emission

Each node pulls the stream writer, emits a structured event (JSON‑serialisable dict), and updates state. No real LLM calls are needed for this demo.

```python
# app/graph/nodes/plan_data_model.py
from langgraph.config import get_stream_writer
from ..state import AgentState

def plan_data_model_node(state: AgentState) -> dict:
    writer = get_stream_writer()

    writer({
        "type": "step_start",
        "step": "plan_data_model",
        "message": "Analyzing user request and file (if any)..."
    })

    # Simulated planning logic
    plan = {
        "entities": ["user", "order", "product"],
        "relationships": ["user → order", "order → product"]
    }
    if state.get("file_content"):
        plan["note"] = "CSV detected → adding transaction fields"

    writer({
        "type": "step_update",
        "step": "plan_data_model",
        "progress": 0.6,
        "message": "Generated initial data model plan"
    })

    # Return only the keys this node changed; LangGraph merges them into state
    return {
        "data_model_plan": plan,
        "status": "planning_complete"
    }
```
```python
# app/graph/nodes/hitl_review.py
from langgraph.config import get_stream_writer
from langgraph.types import interrupt
from ..state import AgentState

def hitl_review_node(state: AgentState) -> dict:
    writer = get_stream_writer()

    writer({
        "type": "waiting_human",
        "step": "human_review",
        "message": "Please review the proposed data model",
        "plan": state["data_model_plan"]
    })

    # interrupt() is called, not raised: it pauses the graph at this point.
    # When the client resumes with Command(resume=...), the node re-runs from
    # the top and interrupt() returns the resume value.
    feedback = interrupt({"needs_review": True})

    return {
        "human_feedback": feedback,
        "status": "review_complete"
    }
```

Step 3: Build & Stream the Graph (service.py)

```python
# app/graph/service.py
import json
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from .state import AgentState
from .nodes.plan_data_model import plan_data_model_node
from .nodes.hitl_review import hitl_review_node

checkpointer = MemorySaver()

graph = StateGraph(state_schema=AgentState)

# Register nodes
graph.add_node("plan_data_model", plan_data_model_node)
graph.add_node("human_review", hitl_review_node)

# Define edges
graph.add_edge(START, "plan_data_model")
graph.add_edge("plan_data_model", "human_review")
graph.add_edge("human_review", END)  # or loop back if resumed

compiled_graph = graph.compile(checkpointer=checkpointer)

async def stream_agent_process(
    inputs: dict,
    thread_id: str,
    file_content: str | None = None
):
    config = {"configurable": {"thread_id": thread_id}}

    initial_state = {
        "messages": [],
        "user_message": inputs.get("user_message", ""),
        "file_content": file_content,
        "status": "starting"
    }

    async for event in compiled_graph.astream(
        initial_state,
        config=config,
        stream_mode=["custom", "updates", "messages"],
        subgraphs=True
    ):
        # With a list of stream modes AND subgraphs=True, every event is a
        # (namespace, mode, payload) triple; namespace is () for the root graph
        namespace, mode, payload = event

        # default=str keeps non-JSON-serialisable objects (e.g. message
        # chunks) from crashing json.dumps
        yield f"data: {json.dumps({'mode': mode, 'payload': payload}, default=str)}\n\n"
```
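
The snippet above starts a fresh run, but it doesn't show how to continue after the HITL interrupt. Here's a hedged sketch of a companion helper (the resume_agent_process name and its wiring are mine, not from the original code): you stream again on the same thread_id, passing Command(resume=...) instead of an initial state.

```python
# app/graph/service.py (continued) — hypothetical resume helper
from langgraph.types import Command

async def resume_agent_process(feedback: str, thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}

    # Command(resume=feedback) makes interrupt() inside hitl_review_node
    # return `feedback`, and the run continues from the paused node
    async for event in compiled_graph.astream(
        Command(resume=feedback),
        config=config,
        stream_mode=["custom", "updates"],
        subgraphs=True,
    ):
        namespace, mode, payload = event
        yield f"data: {json.dumps({'mode': mode, 'payload': payload}, default=str)}\n\n"
```

You'd expose this behind a second endpoint (e.g. a POST /api/resume) that accepts the same session_id the frontend used to start the run.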

Important gotchas

  • subgraphs=True combined with a list of stream modes → events become (namespace, mode, data) triples.
  • Combine modes (["custom","updates","messages"]) to get progress events, state updates, and token chunks in one stream.
  • Use json.dumps() (with default=str for non‑serialisable objects) so the SSE payload is safe and parseable on the client.
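
For reference, each frame the endpoint emits looks roughly like this on the wire (the exact payload shapes depend on the stream mode and your LangGraph version, so treat this as illustrative):

```
data: {"mode": "custom", "payload": {"type": "step_start", "step": "plan_data_model", "message": "Analyzing user request and file (if any)..."}}

data: {"mode": "updates", "payload": {"plan_data_model": {"data_model_plan": {...}, "status": "planning_complete"}}}
```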

Step 4: FastAPI Streaming Endpoint with File Upload

```python
# app/api/router.py
from fastapi import APIRouter, UploadFile, Form, HTTPException
from fastapi.responses import StreamingResponse
import json
from ..graph.service import stream_agent_process

router = APIRouter(prefix="/api")

@router.post("/stream")
async def stream_agent(
    user_message: str = Form(...),
    session_id: str = Form(...),
    file: UploadFile | None = None
):
    file_content = None
    if file:
        content = await file.read()
        file_content = content.decode("utf-8", errors="ignore")

    async def event_generator():
        try:
            async for chunk in stream_agent_process(
                {"user_message": user_message},
                thread_id=session_id,
                file_content=file_content
            ):
                yield chunk
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Important for nginx proxy
        }
    )
```

Add router to your FastAPI app in main.py.
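
A minimal main.py sketch, assuming the React dev server runs on http://localhost:5173 (adjust the origin to your setup):

```python
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .api.router import router

app = FastAPI(title="Streaming AI Agent")

# Allow the React dev server to call the streaming endpoint cross-origin
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(router)
```

Run it locally with uvicorn app.main:app --reload.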

Do not set a Content-Type header manually on the frontend fetch: the browser has to generate the multipart boundary itself, and the response comes back as text/event-stream.
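
You can smoke-test the stream from the command line before touching the frontend; curl's -N flag disables output buffering so events print as they arrive (sample.csv here is just a placeholder file):

```bash
curl -N http://localhost:8000/api/stream \
  -F "user_message=Design a data model for orders" \
  -F "session_id=test-123" \
  -F "file=@sample.csv"
```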

Step 5: React Frontend – Consume the Stream


```jsx
// frontend/src/App.jsx
import { useState } from "react";

function App() {
  const [logs, setLogs] = useState([]);
  const [status, setStatus] = useState("Ready");
  const [sessionId] = useState(() => crypto.randomUUID()); // stable per mount, not per render

  const handleSubmit = async (e) => {
    e.preventDefault();
    const form = e.target;
    const message = form.message.value;
    const fileInput = form.file.files?.[0];

    const formData = new FormData();
    formData.append("user_message", message);
    formData.append("session_id", sessionId);
    if (fileInput) formData.append("file", fileInput);

    const response = await fetch("/api/stream", {
      method: "POST",
      body: formData
    });

    if (!response.body) return;

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      const chunk = decoder.decode(value, { stream: true });
      const lines = chunk.split("\n");

      for (const line of lines) {
        if (!line.startsWith("data: ")) continue;
        try {
          const data = JSON.parse(line.slice(6));
          const { mode, payload } = data;

          if (mode === "custom") {
            switch (payload.type) {
              case "step_start":
                setLogs((l) => [...l, `→ Starting: ${payload.step}`]);
                break;
              case "step_update":
                setLogs((l) => [
                  ...l,
                  `  Update: ${payload.message} (${Math.round(payload.progress * 100)}%)`
                ]);
                break;
              case "waiting_human":
                setStatus("Waiting for your review…");
                setLogs((l) => [...l
Enter fullscreen mode Exit fullscreen mode