LLM responses are fundamentally different from traditional API responses. A typical database query returns in under 100ms. A long GPT-4 completion can take 15-30 seconds to finish generating. Users will abandon a blank screen after 2-3 seconds. The solution: stream tokens as they arrive, so users see output immediately.
This article is a complete technical implementation: how to stream LLM completions through a FastAPI backend to a browser or mobile client using Server-Sent Events (SSE) and WebSockets. We cover token-by-token rendering, error handling, multi-turn conversation streaming, and production considerations like backpressure and connection management.
The Streaming Architecture
Client (browser/app)
↕ SSE / WebSocket
FastAPI Backend
↕ HTTP streaming
OpenAI API / local LLM
The backend acts as a streaming proxy: it opens a streaming connection to the LLM provider, receives tokens as they arrive, and forwards them to the connected client. The client renders each token as it arrives, producing the characteristic "typing" effect.
Two transport options:
| Protocol | Use Case |
|---|---|
| Server-Sent Events (SSE) | Unidirectional streams: assistant-only replies, status updates |
| WebSockets | Bidirectional: user sends messages, assistant streams back |
For most chat UIs, SSE per request is simpler and sufficient. WebSockets make sense when you need real-time bidirectional communication (collaborative editing, multi-user sessions, live interruption).
Setup
uv add "fastapi[standard]" openai httpx
You'll also need an OpenAI API key (or swap in any OpenAI-compatible endpoint: Anthropic, local Ollama, etc.).
Streaming with Server-Sent Events
SSE is an HTTP-based protocol: the client makes a GET or POST request, and the server responds with Content-Type: text/event-stream, keeps the connection open, and pushes events as newline-delimited data: lines.
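Concretely, with the JSON payloads produced by the endpoint below, the raw stream the client reads is a sequence of data: lines, each terminated by a blank line (the token values here are invented purely for illustration):

data: {"token": "Hel"}

data: {"token": "lo"}

data: {"done": true, "finish_reason": "stop"}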
Backend: SSE streaming endpoint
# routers/chat.py
import json
import asyncio
from typing import AsyncIterator
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel
router = APIRouter(prefix="/chat", tags=["chat"])
client = AsyncOpenAI() # reads OPENAI_API_KEY from environment
class ChatRequest(BaseModel):
messages: list[dict] # [{"role": "user", "content": "..."}]
model: str = "gpt-4o"
max_tokens: int = 2048
async def token_stream(request: ChatRequest) -> AsyncIterator[str]:
"""Yields SSE-formatted lines from an OpenAI streaming completion."""
try:
stream = await client.chat.completions.create(
model=request.model,
messages=request.messages,
max_tokens=request.max_tokens,
stream=True,
)
async for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
# SSE format: "data: {json}\n\n"
yield f"data: {json.dumps({'token': delta.content})}\n\n"
if chunk.choices[0].finish_reason:
yield f"data: {json.dumps({'done': True, 'finish_reason': chunk.choices[0].finish_reason})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'error': str(e)})}\n\n"
@router.post("/stream")
async def stream_chat(request: ChatRequest):
return StreamingResponse(
token_stream(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Disables Nginx buffering
"Connection": "keep-alive",
},
)
The X-Accel-Buffering: no header is critical when behind Nginx: without it, Nginx buffers the response and clients receive tokens in large batches rather than one at a time.
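To serve this router, a minimal app wiring might look like the sketch below. The module path and the /api/v1 prefix are assumptions; adjust them to your project (the prefix is chosen here so the full path matches the /api/v1/chat/stream URL the frontend examples use).
# main.py: minimal wiring (illustrative; module path and prefix are assumptions)
from fastapi import FastAPI
from routers.chat import router as chat_router
app = FastAPI()
app.include_router(chat_router, prefix="/api/v1")  # exposes POST /api/v1/chat/stream
# Run locally with: uvicorn main:app --reload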
Frontend: consuming SSE
async function streamChat(messages: Message[]) {
const response = await fetch("/api/v1/chat/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ messages }),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n\n");
buffer = lines.pop() ?? ""; // Keep incomplete last chunk
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const data = JSON.parse(line.slice(6));
if (data.token) {
appendToken(data.token); // update your UI
} else if (data.done) {
onStreamComplete(data.finish_reason);
} else if (data.error) {
onStreamError(data.error);
}
}
}
}
React hook for streaming
// hooks/useStreamingChat.ts
import { useState, useCallback } from "react";
export function useStreamingChat() {
const [response, setResponse] = useState("");
const [isStreaming, setIsStreaming] = useState(false);
const sendMessage = useCallback(async (messages: Message[]) => {
setResponse("");
setIsStreaming(true);
try {
const res = await fetch("/api/v1/chat/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ messages }),
});
const reader = res.body!.getReader();
      const decoder = new TextDecoder();
      let buffer = "";
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const events = buffer.split("\n\n");
        buffer = events.pop() ?? ""; // keep any incomplete event for the next read
        for (const line of events) {
          if (!line.startsWith("data: ")) continue;
          const data = JSON.parse(line.slice(6));
          if (data.token) setResponse(prev => prev + data.token);
        }
      }
} finally {
setIsStreaming(false);
}
}, []);
return { response, isStreaming, sendMessage };
}
Streaming with WebSockets
For bidirectional chat (send a message, receive a streamed reply, send another), WebSockets are cleaner:
# routers/ws_chat.py
import json
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI
router = APIRouter()
client = AsyncOpenAI()
@router.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
conversation_history: list[dict] = []
try:
while True:
# Receive the user's message
raw = await websocket.receive_text()
data = json.loads(raw)
user_message = data.get("content", "")
conversation_history.append({"role": "user", "content": user_message})
# Stream the assistant's reply token by token
full_response = ""
stream = await client.chat.completions.create(
model="gpt-4o",
messages=conversation_history,
stream=True,
)
async for chunk in stream:
token = chunk.choices[0].delta.content or ""
if token:
full_response += token
await websocket.send_text(json.dumps({"type": "token", "content": token}))
# Signal end of stream
await websocket.send_text(json.dumps({"type": "done"}))
conversation_history.append({"role": "assistant", "content": full_response})
except WebSocketDisconnect:
pass # Client disconnected cleanly
except Exception as e:
await websocket.send_text(json.dumps({"type": "error", "message": str(e)}))
await websocket.close()
WebSocket client (browser)
const ws = new WebSocket("wss://yourapi.com/ws/chat");
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === "token") appendToken(data.content);
else if (data.type === "done") onComplete();
else if (data.type === "error") onError(data.message);
};
function sendMessage(content: string) {
ws.send(JSON.stringify({ content }));
}
Production Patterns
Timeout and cancellation
Long LLM completions can run for 30+ seconds. You need timeout handling and client disconnection detection (the asyncio.timeout context manager used below requires Python 3.11+):
import asyncio
@router.post("/stream")
async def stream_chat(request: ChatRequest):
async def generate_with_timeout():
try:
async with asyncio.timeout(60): # 60 second max
async for chunk in token_stream(request):
yield chunk
except asyncio.TimeoutError:
yield f"data: {json.dumps({'error': 'Generation timed out'})}\n\n"
return StreamingResponse(
generate_with_timeout(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
Rate limiting per user
import time
from collections import defaultdict
from fastapi import HTTPException
# Simple in-process rate limiter (use Redis in production for multi-instance)
request_counts: dict[str, list[float]] = defaultdict(list)
def check_rate_limit(user_id: str, max_requests: int = 10, window: int = 60):
now = time.time()
timestamps = [t for t in request_counts[user_id] if now - t < window]
if len(timestamps) >= max_requests:
raise HTTPException(status_code=429, detail="Rate limit exceeded")
timestamps.append(now)
request_counts[user_id] = timestamps
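To enforce this before any tokens are generated, one option is a small FastAPI dependency. This is a sketch: the User model and get_current_user dependency are assumed to come from your auth setup (shown in the next section).
from fastapi import Depends
def rate_limit_dependency(current_user: User = Depends(get_current_user)) -> None:
    # Runs before the stream starts; raises HTTPException(429) if over the limit
    check_rate_limit(str(current_user.id))
# Then add `_: None = Depends(rate_limit_dependency)` to the stream_chat signature.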
Streaming with authentication
Wrap the streaming endpoint with your JWT dependency:
@router.post("/stream")
async def stream_chat(
request: ChatRequest,
current_user: User = Depends(get_current_user), # auth check before streaming
):
return StreamingResponse(token_stream(request), media_type="text/event-stream", ...)
Persisting conversations
Store messages after the stream completes:
async def token_stream_with_persistence(
request: ChatRequest,
db: AsyncSession,
user: User,
) -> AsyncIterator[str]:
full_response = ""
async for chunk_text in token_stream(request):
# Parse the token from the SSE chunk
if chunk_text.startswith("data: "):
data = json.loads(chunk_text[6:])
if "token" in data:
full_response += data["token"]
yield chunk_text
# Persist after stream completes
await save_conversation(db, user.id, request.messages, full_response)
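Wiring this into the endpoint might look like the sketch below; Depends, get_db, get_current_user, User, and AsyncSession are assumed to come from your auth and persistence layers (as in the authentication example above):
@router.post("/stream")
async def stream_chat(
    request: ChatRequest,
    db: AsyncSession = Depends(get_db),              # assumed session dependency
    current_user: User = Depends(get_current_user),  # assumed auth dependency
):
    return StreamingResponse(
        token_stream_with_persistence(request, db, current_user),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )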
Backpressure and Client Disconnect Handling
FastAPI and Starlette detect client disconnections and stop the generator automatically. But because every generated token costs money, it's worth checking for disconnects explicitly inside the loop so you stop requesting tokens as soon as the client goes away:
from fastapi import Request
@router.post("/stream")
async def stream_chat(request: ChatRequest, req: Request):
async def generate():
stream = await client.chat.completions.create(...)
async for chunk in stream:
if await req.is_disconnected():
break # Stop generating if client disconnected
token = chunk.choices[0].delta.content or ""
if token:
yield f"data: {json.dumps({'token': token})}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream", ...)
Common Pitfalls
1. Nginx buffering: always set X-Accel-Buffering: no. Without it, tokens are buffered and delivered in large batches. SSE feels broken.
2. Not flushing: StreamingResponse flushes automatically, but if you're behind a proxy, verify end-to-end streaming with curl --no-buffer.
3. Encoding errors in tokens: LLM tokens can split multi-byte Unicode characters. Buffer tokens until you have a complete character before rendering if you see garbled text with emoji or non-Latin scripts.
4. Memory leaks from conversation history: if you store conversation_history in-process per WebSocket connection, it grows indefinitely for long sessions. Cap it or summarise older turns (a trimming sketch follows this list).
5. Not handling model errors: the LLM API can return content policy violations, rate limit errors, or context length exceeded errors mid-stream. Always wrap the stream in try/except and send an error event.
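For pitfall 4, a minimal trimming helper is sketched below (adjust the cap, or swap in summarisation of older turns, to suit your app). It keeps any system messages plus the most recent turns:
def trim_history(history: list[dict], max_messages: int = 20) -> list[dict]:
    """Keep any system messages plus the most recent turns."""
    system = [m for m in history if m["role"] == "system"]
    recent = [m for m in history if m["role"] != "system"][-max_messages:]
    return system + recent
# In the WebSocket handler, after appending the assistant reply:
# conversation_history = trim_history(conversation_history)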
FAQ
Q: Which is better for a chat UI: SSE or WebSockets? SSE for most chat UIs. It's simpler: standard HTTP, works through proxies without special config, and browsers handle reconnection automatically. WebSockets add complexity unless you need the client to keep sending messages over the same connection.
Q: How do I stream from a local LLM (Ollama, LlamaCpp)?
Ollama exposes an OpenAI-compatible API. Point AsyncOpenAI(base_url="http://localhost:11434/v1") at it and the same code works.
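For example (the api_key value is just a placeholder: the client requires one to be set, and Ollama ignores it):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="ollama")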
Q: How do I stream to a React Native mobile app? Use WebSockets. SSE support on React Native requires a polyfill; WebSockets are natively supported and simpler.
Q: How do I handle multiple concurrent streams per user? Each request/WebSocket connection is independent. FastAPI handles concurrency via the async event loop. Track active connections per user if you need to enforce limits.
Q: What if I'm using Anthropic or a different provider?
The Anthropic SDK follows a very similar streaming pattern (iterate over the stream with async for). The token extraction differs slightly; check their docs for the chunk format.
Conclusion
Streaming LLM responses transforms the perceived performance of AI features. Users see output in under a second rather than waiting 15-30 seconds for a complete response, and that UX difference is the difference between a feature that gets used and one that gets abandoned.
SSE is the right default for most chat UIs. WebSockets make sense for real-time bidirectional applications. In both cases, the FastAPI pattern is clean, async, and production-ready.
See also: Python FastAPI: The Complete Guide, the foundation this article builds on.