The Python + FastAPI combination is the fastest path from "I want Claude in my backend" to something that doesn't fall over in production. FastAPI's native async support pairs cleanly with Claude's streaming API. The automatic OpenAPI docs mean you can test endpoints without writing a single curl command. And Python's type hints keep the codebase maintainable as it grows.
This isn't a toy example. Everything here — auth, rate limiting, streaming, health checks, Docker — is what you'd actually ship.
Project structure
Keep concerns separated from day one. It's much harder to untangle a monolithic main.py later:
app/
├── main.py # FastAPI app, routes
├── claude.py # Claude client wrapper
├── auth.py # API key validation
├── rate_limit.py # Rate limiting logic
└── models.py # Pydantic request/response models
Install dependencies:
pip install fastapi uvicorn anthropic slowapi python-dotenv
Developers in India: AICredits gives you Claude API access with INR / UPI billing, which is handy for production apps where you want to avoid USD billing.
App initialization with lifespan
FastAPI's lifespan context manager handles startup and shutdown cleanly. Use it to initialize the Anthropic async client once — not on every request:
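# main.py
from contextlib import asynccontextmanager

import anthropic
from fastapi import FastAPI

# Shared client: created once at startup, reused by every request.
claude_client: anthropic.AsyncAnthropic | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global claude_client
    # Reads ANTHROPIC_API_KEY from the environment by default.
    claude_client = anthropic.AsyncAnthropic()
    yield
    # Release the underlying HTTP connections on shutdown.
    await claude_client.close()

app = FastAPI(title="Claude API Backend", lifespan=lifespan)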
AsyncAnthropic is the async client — it uses httpx under the hood and works properly with FastAPI's event loop. Using the synchronous Anthropic client in async endpoints blocks the event loop and kills your throughput under load.
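To see why a blocking call hurts, here is a minimal sketch that uses only the standard library. It does not call Claude at all: time.sleep stands in for a synchronous SDK call and asyncio.sleep for an awaited one, and the function names are made up for this illustration.

```python
import asyncio
import time


async def blocking_call(delay: float) -> None:
    # Stand-in for a synchronous SDK call: time.sleep holds the event loop,
    # so no other coroutine can make progress in the meantime.
    time.sleep(delay)


async def non_blocking_call(delay: float) -> None:
    # Stand-in for an awaited async SDK call: the loop is free while waiting.
    await asyncio.sleep(delay)


async def run_concurrently(call, n: int, delay: float) -> float:
    """Start n calls with asyncio.gather and return elapsed wall-clock seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(call(delay) for _ in range(n)))
    return time.perf_counter() - start


def compare(n: int = 5, delay: float = 0.05) -> tuple[float, float]:
    """Return (blocking_elapsed, non_blocking_elapsed) for n simulated requests."""
    blocking = asyncio.run(run_concurrently(blocking_call, n, delay))
    non_blocking = asyncio.run(run_concurrently(non_blocking_call, n, delay))
    return blocking, non_blocking
```

With five simulated requests, the blocking version takes roughly five times the delay because the calls run one after another, while the awaited version finishes in about one delay. The same effect applies to real HTTP calls made from async endpoints.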
Pydantic models for request validation
Let Pydantic do the validation work before your business logic ever runs:
# models.py
from pydantic import BaseModel, Field

class ChatRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=10000)
    system_prompt: str = Field(default="You are a helpful assistant.")
    model: str = Field(default="claude-sonnet-4-6")
    max_tokens: int = Field(default=1024, ge=1, le=8096)

class ChatResponse(BaseModel):
    text: str
    input_tokens: int
    output_tokens: int
max_length=10000 on the message field prevents someone from sending a 100k-token prompt through your API and running up your bill. Set it based on your actual use case — a customer support bot probably doesn't need more than 2,000 characters per message.
Basic completion endpoint
# main.py (continued)
from fastapi import Depends, HTTPException

from auth import verify_api_key
from models import ChatRequest, ChatResponse

@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    api_key: str = Depends(verify_api_key),
):
    response = await claude_client.messages.create(
        model=request.model,
        max_tokens=request.max_tokens,
        system=request.system_prompt,
        messages=[{"role": "user", "content": request.message}],
    )
    # The response is a list of content blocks; take the first text block.
    text_block = next(
        (b for b in response.content if b.type == "text"), None
    )
    if not text_block:
        raise HTTPException(status_code=502, detail="No text in response")
    return ChatResponse(
        text=text_block.text,
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
    )
Always return input_tokens and output_tokens from your API. Clients need this for cost attribution, and you'll need it for debugging when a request is slower or more expensive than expected.
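As a rough sketch of what cost attribution can look like on the client side, the helper below turns the two token counts into a dollar estimate. The Pricing class, the function name, and the prices are placeholders invented for this example, not real rates; look up current per-model pricing before using anything like this.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Pricing:
    # USD per million tokens. Placeholder values for illustration only,
    # NOT real prices for any model.
    input_per_mtok: float
    output_per_mtok: float


def estimate_cost_usd(input_tokens: int, output_tokens: int, pricing: Pricing) -> float:
    """Estimate the cost of one request from its token usage."""
    total = (
        input_tokens * pricing.input_per_mtok
        + output_tokens * pricing.output_per_mtok
    )
    return total / 1_000_000


# Hypothetical rates: 2 USD per million input tokens, 10 USD per million output.
example_pricing = Pricing(input_per_mtok=2.0, output_per_mtok=10.0)
```

Logging this estimate next to the request ID makes it much easier to spot which callers or prompts drive your bill.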
Streaming with Server-Sent Events
For the FastAPI + Claude API combination, streaming is where the real user experience win happens. A non-streaming endpoint that takes 8 seconds feels broken. The same content streamed feels responsive:
from fastapi.responses import StreamingResponse
import json

@app.post("/chat/stream")
async def chat_stream(
    request: ChatRequest,
    api_key: str = Depends(verify_api_key),
):
    async def generate():
        async with claude_client.messages.stream(
            model=request.model,
            max_tokens=request.max_tokens,
            system=request.system_prompt,
            messages=[{"role": "user", "content": request.message}],
        ) as stream:
            # Forward each text delta as its own SSE event.
            async for text in stream.text_stream:
                yield f"data: {json.dumps({'text': text})}\n\n"
            # Token usage is only known once the stream has completed.
            final = await stream.get_final_message()
            usage = {
                "input": final.usage.input_tokens,
                "output": final.usage.output_tokens,
            }
            yield f"data: {json.dumps({'done': True, 'usage': usage})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
The X-Accel-Buffering: no header matters if you're behind nginx. Without it, nginx buffers the entire response before sending it to the client — which defeats the entire point of streaming. Add it.
On the client side, consume the SSE stream with a manual fetch + ReadableStream reader. The browser's EventSource API only issues GET requests, so it cannot call this POST endpoint directly. The [DONE] sentinel lets clients know the stream has finished cleanly.
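If your client is another Python service rather than a browser, the wire format above is simple enough to parse by hand. The sketch below assumes you already have the response as an iterable of text lines (most HTTP clients offer a line iterator for streamed responses); parse_sse and collect are names made up for this example.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Yield decoded JSON payloads from `data:` lines until the [DONE] sentinel."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators and any non-data fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return  # clean end of stream
        yield json.loads(payload)


def collect(lines: Iterable[str]) -> tuple[str, dict]:
    """Join the text chunks and return them with the usage dict from the done event."""
    chunks: list[str] = []
    usage: dict = {}
    for event in parse_sse(lines):
        if event.get("done"):
            usage = event.get("usage", {})
        elif "text" in event:
            chunks.append(event["text"])
    return "".join(chunks), usage
```

A stream that ends without [DONE] was cut off mid-response, so a real client would want to treat that case as an error rather than a finished message.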
API key authentication
Simple API key auth for protecting your endpoints:
# auth.py
from fastapi import HTTPException, Security
from fastapi.security import APIKeyHeader
import os

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True)

async def verify_api_key(api_key: str = Security(api_key_header)):
    # Re-read the env var on each call so the key list is never cached.
    valid_keys = set(
        k.strip()
        for k in os.environ.get("VALID_API_KEYS", "").split(",")
        if k.strip()
    )
    # An empty key list rejects everyone rather than allowing everyone.
    if not valid_keys or api_key not in valid_keys:
        raise HTTPException(
            status_code=403,
            detail="Invalid or missing API key"
        )
    return api_key
Store keys in VALID_API_KEYS as a comma-separated list: key1,key2,key3. This lets you issue different keys to different clients and rotate them independently without redeploying.
For production, move key storage to a secrets manager (AWS Secrets Manager, HashiCorp Vault) instead of environment variables. But comma-separated env vars get you 90% of the way there for most use cases.
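One optional hardening step, which is not part of the code above, is to compare keys in constant time so response timing reveals less about how close a guess was. This is a minimal sketch using the standard library's hmac.compare_digest; parse_keys and is_valid_key are hypothetical helper names.

```python
import hmac


def parse_keys(raw: str) -> frozenset[str]:
    """Split a comma-separated key list, dropping blanks and surrounding spaces."""
    return frozenset(k.strip() for k in raw.split(",") if k.strip())


def is_valid_key(candidate: str, valid_keys: frozenset[str]) -> bool:
    """Check the candidate against every key without exiting early on a match."""
    candidate_bytes = candidate.encode("utf-8")
    matched = False
    for key in valid_keys:
        # compare_digest avoids the early-exit timing differences of ==.
        if hmac.compare_digest(candidate_bytes, key.encode("utf-8")):
            matched = True
    return matched
```

Whether this matters depends on your threat model. For long random keys behind a rate limiter it is a small extra safeguard rather than a requirement.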
Rate limiting with slowapi
# rate_limit.py
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import Request
from fastapi.responses import JSONResponse

limiter = Limiter(key_func=get_remote_address)

def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    return JSONResponse(
        status_code=429,
        content={"detail": f"Rate limit exceeded: {exc.detail}"},
        headers={"Retry-After": "60"},
    )
In main.py:
from slowapi.errors import RateLimitExceeded
from rate_limit import limiter, rate_limit_handler

app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, rate_limit_handler)
Apply to routes:
@app.post("/chat")
@limiter.limit("10/minute")
# slowapi needs a parameter named `request` to read the client address.
async def chat(request: Request, body: ChatRequest, _=Depends(verify_api_key)):
    ...
10 requests per minute per IP is a reasonable starting point for a chat endpoint. Adjust based on your actual usage patterns and Claude API quotas. For authenticated users (where you know who they are), key on user ID or API key instead of IP: clients behind a shared NAT or proxy all appear as one address, and any IP taken from a forwarded header is easy to spoof.
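Keying on the caller instead of the address could look like the sketch below. It assumes the X-API-Key header from the auth section and only relies on the request exposing headers and client.host, so it can be tried with a stand-in object. rate_limit_key is a name made up for this example; you would pass it as key_func when creating the Limiter.

```python
import hashlib
from types import SimpleNamespace


def rate_limit_key(request) -> str:
    """Prefer the caller's API key (hashed) and fall back to the client IP."""
    api_key = request.headers.get("X-API-Key")
    if api_key:
        # Hash the key so the raw secret never ends up in limiter storage or logs.
        digest = hashlib.sha256(api_key.encode("utf-8")).hexdigest()
        return f"key:{digest[:16]}"
    client = getattr(request, "client", None)
    return f"ip:{client.host}" if client else "ip:unknown"


# Stand-in request objects for trying the function outside a running app.
with_key = SimpleNamespace(
    headers={"X-API-Key": "abc"}, client=SimpleNamespace(host="203.0.113.7")
)
without_key = SimpleNamespace(headers={}, client=SimpleNamespace(host="203.0.113.7"))
```

Two clients that share an office IP now get separate buckets, and one client cannot dodge its limit by switching networks.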
Background tasks for async processing
Some workloads don't need to block the HTTP response. Document analysis, batch summarization, report generation — these can run in the background while you return a task ID immediately:
import uuid
from typing import Dict

from fastapi import BackgroundTasks, Body, HTTPException

task_results: Dict[str, dict] = {}  # Use Redis in production

async def process_document(task_id: str, document: str):
    try:
        response = await claude_client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            messages=[{
                "role": "user",
                "content": f"Summarize this document:\n\n{document}"
            }],
        )
        text = response.content[0].text if response.content else ""
        task_results[task_id] = {"status": "complete", "result": text}
    except Exception as e:
        # Record the failure so the polling client sees it instead of waiting.
        task_results[task_id] = {"status": "failed", "error": str(e)}

@app.post("/analyze")
async def analyze_document(
    background_tasks: BackgroundTasks,
    # embed=True reads the text from a JSON body: {"document": "..."}.
    # A bare `document: str` would be treated as a query parameter.
    document: str = Body(..., embed=True),
    _=Depends(verify_api_key),
):
    task_id = str(uuid.uuid4())
    task_results[task_id] = {"status": "processing"}
    background_tasks.add_task(process_document, task_id, document)
    return {"task_id": task_id, "status": "processing"}

@app.get("/analyze/{task_id}")
async def get_task_result(task_id: str, _=Depends(verify_api_key)):
    result = task_results.get(task_id)
    if not result:
        raise HTTPException(status_code=404, detail="Task not found")
    return result
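The in-memory task_results dict works only when a single process serves every request. With multiple uvicorn workers or multiple instances, the polling request can land on a process that never saw the task, so use Redis with a TTL for those setups and for persistent results. The pattern is identical: swap the dict for redis.set(task_id, json.dumps(result), ex=3600).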
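To make the swap painless later, it can help to hide the storage behind a small set/get interface from the start. The class below is an in-memory stand-in that mimics set-with-expiry semantics. It is not a Redis client, and TTLTaskStore and its clock parameter are invented for this sketch.

```python
import json
import time
from typing import Callable, Optional


class TTLTaskStore:
    """In-memory stand-in for a key-value store with per-key expiry."""

    def __init__(
        self,
        ttl_seconds: float = 3600,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._data: dict[str, tuple[float, str]] = {}

    def set(self, task_id: str, result: dict) -> None:
        # Store JSON text, as you would in Redis, plus the expiry timestamp.
        self._data[task_id] = (self._clock() + self._ttl, json.dumps(result))

    def get(self, task_id: str) -> Optional[dict]:
        entry = self._data.get(task_id)
        if entry is None:
            return None
        expires_at, payload = entry
        if self._clock() >= expires_at:
            del self._data[task_id]  # lazily evict expired entries
            return None
        return json.loads(payload)
```

The endpoints would then call store.set and store.get, and only the class body changes when you move to a shared store.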
Health check endpoint
A health check that actually validates Claude connectivity:
import anthropic
from fastapi.responses import JSONResponse

@app.get("/health")
async def health():
    try:
        # Minimal test call: Haiku is fast and cheap
        await claude_client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=10,
            messages=[{"role": "user", "content": "ping"}],
        )
        return {"status": "healthy", "claude": "connected"}
    except anthropic.AuthenticationError:
        return JSONResponse(
            status_code=503,
            content={"status": "degraded", "claude": "auth_error"},
        )
    except Exception as e:
        return JSONResponse(
            status_code=503,
            content={"status": "degraded", "claude": str(e)},
        )
Use claude-haiku-4-5-20251001 for health checks — it's the cheapest model and responds quickly. Don't use Sonnet or Opus for this. At 10 health checks per minute, Haiku costs almost nothing.
Your load balancer or orchestrator (ECS, Kubernetes) should call /health every 30 seconds and route traffic away from unhealthy instances. Return 503 on degraded state — most load balancers treat anything non-2xx as unhealthy.
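If several probes hit /health at once, each one triggers a paid Claude call. One way to cap that, sketched here under the assumption that a slightly stale answer is acceptable, is to cache the last probe result for a short time. CachedProbe is a hypothetical helper, and the probe you pass in would wrap the Haiku call above.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


class CachedProbe:
    """Run an async health probe at most once per ttl_seconds."""

    def __init__(self, probe: Callable[[], Awaitable[bool]], ttl_seconds: float = 60.0):
        self._probe = probe
        self._ttl = ttl_seconds
        self._checked_at: Optional[float] = None
        self._healthy = False
        # Create the instance inside the running loop (e.g. in lifespan).
        self._lock = asyncio.Lock()

    async def is_healthy(self) -> bool:
        # The lock keeps concurrent requests from firing duplicate probes.
        async with self._lock:
            now = time.monotonic()
            if self._checked_at is None or now - self._checked_at >= self._ttl:
                try:
                    self._healthy = await self._probe()
                except Exception:
                    self._healthy = False  # any probe failure counts as unhealthy
                self._checked_at = time.monotonic()
            return self._healthy
```

The tradeoff is detection latency: with a 60-second cache, an outage can go unnoticed for up to a minute, so pick a TTL that matches how quickly you need traffic rerouted.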
Error handling
Catch Anthropic SDK errors and convert them to meaningful HTTP status codes:
import anthropic
from fastapi import HTTPException

async def safe_claude_call(request: ChatRequest) -> ChatResponse:
    try:
        response = await claude_client.messages.create(
            model=request.model,
            max_tokens=request.max_tokens,
            system=request.system_prompt,
            messages=[{"role": "user", "content": request.message}],
        )
        text = response.content[0].text
        return ChatResponse(
            text=text,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
        )
    except anthropic.RateLimitError:
        raise HTTPException(
            status_code=429,
            detail="Claude rate limit reached. Try again in a moment.",
            headers={"Retry-After": "60"},
        )
    except anthropic.APIConnectionError:
        raise HTTPException(
            status_code=503,
            detail="Could not reach Claude API. Check connectivity.",
        )
    except anthropic.AuthenticationError:
        # Your ANTHROPIC_API_KEY is invalid or expired
        raise HTTPException(status_code=500, detail="Internal configuration error")
    except anthropic.APIStatusError as e:
        # Catch-all for other HTTP errors; the specific subclasses are handled above.
        if e.status_code == 529:
            raise HTTPException(status_code=503, detail="Claude is overloaded. Try again.")
        # Log e.message server-side; keep the client-facing detail generic.
        raise HTTPException(status_code=502, detail="Upstream model error")
Don't expose raw Anthropic error messages to clients — they sometimes include internal details. Map them to clean user-facing strings.
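One way to keep that mapping in a single place is a small lookup table from upstream status code to the status and message you are willing to show. This is only a sketch: ClientError and to_client_error are made-up names, and the entries mirror the handlers above (401 being the status behind an authentication error).

```python
from typing import NamedTuple


class ClientError(NamedTuple):
    status_code: int
    detail: str


# Upstream status code -> what the caller of our API is allowed to see.
_UPSTREAM_TO_CLIENT = {
    401: ClientError(500, "Internal configuration error"),
    429: ClientError(429, "Claude rate limit reached. Try again in a moment."),
    529: ClientError(503, "Claude is overloaded. Try again."),
}

# Anything unrecognized becomes a generic bad-gateway response.
_FALLBACK = ClientError(502, "Upstream model error")


def to_client_error(upstream_status: int) -> ClientError:
    """Translate an upstream status code into a safe client-facing error."""
    return _UPSTREAM_TO_CLIENT.get(upstream_status, _FALLBACK)
```

Because unknown codes fall through to the generic entry, a new upstream error type can never leak its raw message by accident.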
Docker deployment
A 2-vCPU / 8GB RAM VPS handles 50-100 concurrent Claude API requests comfortably. A Hostinger KVM 2 VPS (~₹700/month) is the cheapest way to run this in production — Docker pre-installed, SSD storage, and low-latency connectivity from India.
FROM python:3.12-slim
WORKDIR /app
# Install dependencies first (cached layer)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy app code
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
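--workers 4 runs four uvicorn processes. Claude API calls are IO-bound, so each async worker can keep many requests in flight and CPU is rarely the bottleneck; extra workers mostly add memory overhead. Match the worker count to your container's CPU count, or set it explicitly based on your instance type. A 2-vCPU container should run 2-4 workers. Note that $(nproc) is not expanded in the exec-form CMD above, so computing the count at startup requires the shell form of CMD.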
For production, add --log-level info and pipe logs to your observability stack. uvicorn does not emit JSON logs on its own, but --log-config accepts a logging config file where you can plug in a JSON formatter. It is worth setting up before you need to debug something in production.
Environment variable setup in docker-compose:
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - VALID_API_KEYS=${VALID_API_KEYS}
    restart: unless-stopped
Production checklist
Before you ship:
- Disable debug mode: app = FastAPI(debug=False). Debug mode leaks tracebacks to clients.
- Set CORS correctly: Don't use allow_origins=["*"] in production. List your actual frontend domains.
- Request logging: Add a middleware that logs method, path, status_code, duration_ms, and input_tokens / output_tokens for Claude endpoints.
- Timeouts: Set a request timeout on the Anthropic client, for example anthropic.AsyncAnthropic(timeout=30.0). The SDK's default timeout is much longer (10 minutes at the time of writing), so a single slow request can otherwise tie up a connection for a long time.
- Secrets rotation: Don't hardcode ANTHROPIC_API_KEY anywhere in your codebase. Use env vars or a secrets manager.
- Response size limits: Set max_tokens on every request. Never let a caller request unlimited output.
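For the request logging item, a framework-agnostic starting point is a plain ASGI middleware, which FastAPI accepts through app.add_middleware. The sketch below covers method, path, status code, and duration only; token counts would have to be logged from inside the Claude endpoints. The class and logger names are made up for this example.

```python
import logging
import time

logger = logging.getLogger("request_log")


class RequestLogMiddleware:
    """Pure ASGI middleware that logs method, path, status code, and duration."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Pass lifespan and websocket traffic through untouched.
            await self.app(scope, receive, send)
            return

        start = time.perf_counter()
        status = {"code": 500}  # assumed if the app fails before responding

        async def send_wrapper(message):
            # The status code travels in the first response message.
            if message["type"] == "http.response.start":
                status["code"] = message["status"]
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        finally:
            duration_ms = (time.perf_counter() - start) * 1000
            logger.info(
                "%s %s -> %d in %.1f ms",
                scope["method"], scope["path"], status["code"], duration_ms,
            )
```

For streaming endpoints the duration covers the whole stream, because the log line is written only after the last chunk has been sent.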
The streaming agents guide covers patterns for more complex agentic backends — tool calling loops, multi-step workflows, and the tradeoffs between blocking and streaming responses in agent architectures. If you're building something that goes beyond a simple chat endpoint, that's the next read.
For a comprehensive checklist before you flip the switch to production traffic, the AI agent production checklist covers observability, error budgets, and rollback strategies that apply directly to FastAPI + Claude deployments.
FastAPI and Claude are both well-suited for production AI backends. The patterns here — typed models, async clients, streaming SSE, proper error classes — are the same ones I'd use for anything handling real user traffic. Start with this foundation, then optimize the parts that actually become bottlenecks.



