A single Claude API call takes between 1 and 10 seconds depending on model and output length. If you're processing 100 documents sequentially, that's potentially 16 minutes of wall-clock time. With async Python, the same job finishes in under 2 minutes. This is one of the easiest wins in LLM application performance — and the patterns are straightforward once you understand why the naive approach fails.
The naive approach: call the API in a loop, wait for each response before starting the next. It works. It's just slow. Async Python lets you fire off multiple requests concurrently, yield control back to the event loop while waiting for I/O, and process responses as they arrive. For LLM apps where each call spends 95% of its time waiting for the model — not executing Python code — this is exactly the right tool.
Pattern 1: Parallel calls with asyncio.gather
The simplest upgrade from sequential to parallel:
```python
import asyncio
import anthropic

client = anthropic.AsyncAnthropic()

async def classify(text: str) -> str:
    response = await client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=50,
        messages=[{"role": "user", "content": f"Classify as positive/negative/neutral: {text}"}]
    )
    return response.content[0].text.strip()

async def classify_batch(texts: list[str]) -> list[str]:
    tasks = [classify(text) for text in texts]
    return await asyncio.gather(*tasks)  # All run in parallel

# Usage
results = asyncio.run(classify_batch(["Great product!", "Terrible service", "It's okay"]))
```
Benchmark: 10 classifications sequentially ≈ 25 seconds. With asyncio.gather ≈ 4 seconds.
Notice anthropic.AsyncAnthropic() — the async client, not the sync one. The sync client's .create() blocks the thread. The async client's .create() is a coroutine that yields control to the event loop while waiting for the network response. That's the whole mechanism.
asyncio.gather() takes coroutines and runs them concurrently, returning results in the same order as the input. If any coroutine raises, the first exception propagates to the caller immediately (unless you pass return_exceptions=True). The remaining coroutines are not cancelled and keep running in the background.
Pattern 2: Rate limiting with Semaphore
Don't use bare asyncio.gather with large batches. Fire 100 concurrent requests at Claude and you'll hit rate limits within seconds. The error message is something like RateLimitError: Error code: 429 — You have exceeded your rate limit.
The fix is a Semaphore — a counter that limits how many coroutines can be in flight at once:
```python
import asyncio
import anthropic

client = anthropic.AsyncAnthropic()
SEMAPHORE = asyncio.Semaphore(5)  # Max 5 concurrent requests

async def safe_classify(text: str) -> str:
    async with SEMAPHORE:
        response = await client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=50,
            messages=[{"role": "user", "content": f"Classify: {text}"}]
        )
    return response.content[0].text.strip()

async def classify_batch(texts: list[str]) -> list[str]:
    tasks = [safe_classify(text) for text in texts]
    return await asyncio.gather(*tasks)
```
Semaphore(5) means at most 5 requests are active simultaneously. The 6th coroutine that reaches async with SEMAPHORE will wait until one of the first five finishes. You're not slowing things down unnecessarily — you're preventing 429 errors that would force retries and actually slow you down more.
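What's the right concurrency limit? For Claude's API on a standard tier, 5-10 concurrent requests is usually a safe starting point. Check your rate limit tier in the Anthropic console: it's expressed in requests-per-minute and tokens-per-minute. To get a rough concurrency ceiling, convert the requests-per-minute limit to requests per second and multiply by the expected call duration in seconds. A hypothetical limit of 300 requests per minute with 2-second calls, for example, supports about 10 requests in flight.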
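The estimate can be written down as a small helper. This is a rough sketch based on Little's law (requests in flight = request rate × time per request). The function name and the numbers are illustrative assumptions, not real tier limits.

```python
import math


def concurrency_ceiling(requests_per_minute: int, avg_call_seconds: float) -> int:
    """Rough upper bound on in-flight requests that stays under an RPM limit."""
    # Little's law: in-flight requests = arrival rate * time each request takes.
    in_flight = requests_per_minute * avg_call_seconds / 60
    # Always allow at least one request, and round down to stay under the limit.
    return max(1, math.floor(in_flight))


# Hypothetical numbers: a 300 requests-per-minute limit and 2-second calls.
limit = concurrency_ceiling(300, 2.0)
```

The tokens-per-minute limit gives a second ceiling of the same form, so in practice you would take the smaller of the two and leave some headroom.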
Pattern 3: Streaming with async iteration
Streaming returns tokens as they're generated, which makes the UI feel responsive even for long responses. With the async client:
```python
async def stream_response(prompt: str) -> str:
    full_text = ""
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
            full_text += text
    return full_text
```
stream.text_stream is an async generator — it yields each text delta as it arrives. The total time-to-last-token is the same as a non-streaming call, but time-to-first-token is typically under a second. For a user staring at a screen, that difference matters enormously.
In a FastAPI app, you'd return a StreamingResponse using an async generator, forwarding each chunk directly to the client.
Pattern 4: Batch processing with progress tracking
For long-running jobs, you want to see progress:
```python
import asyncio
import anthropic
from tqdm.asyncio import tqdm

client = anthropic.AsyncAnthropic()

async def process_all(items: list[str], max_concurrent: int = 5) -> list[str]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(item: str) -> str:
        async with semaphore:
            response = await client.messages.create(
                model="claude-haiku-4-5-20251001",
                max_tokens=200,
                messages=[{"role": "user", "content": item}]
            )
        return response.content[0].text

    tasks = [process_one(item) for item in items]
    return await tqdm.gather(*tasks, desc="Processing")
```
tqdm.asyncio.tqdm.gather is a drop-in replacement for asyncio.gather that shows a progress bar. It updates as tasks complete, not as they start. Install with pip install tqdm.
The semaphore is defined inside the function here — a clean pattern when you want per-call-site concurrency control rather than a global limit.
Pattern 5: Timeout and cancellation
LLM API calls can hang. Network hiccups, model overload, very long outputs — any of these can cause a request to stall for 60+ seconds. In production, you need timeouts:
```python
async def with_timeout(prompt: str, timeout_seconds: float = 30.0) -> str | None:
    try:
        response = await asyncio.wait_for(
            client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            ),
            timeout=timeout_seconds
        )
        return response.content[0].text
    except asyncio.TimeoutError:
        print(f"Request timed out after {timeout_seconds}s")
        return None
```
asyncio.wait_for wraps a coroutine with a timeout. When the timeout fires, it cancels the underlying task and raises asyncio.TimeoutError. The None return here is one approach — you could also raise, log, or put the item back in a retry queue depending on your use case.
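For streaming responses, the timeout has to cover the whole stream. asyncio.wait_for takes an awaitable, not a block, so put the async with client.messages.stream(...) block in its own coroutine and wrap the call to that coroutine with asyncio.wait_for. On Python 3.11+, nesting the block inside async with asyncio.timeout(...) does the same job.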
Pattern 6: Mixing sync and async
You'll often need to call async code from a sync context — a CLI script, a Django view, or code that was written before you went async. The options:
```python
import asyncio

# classify() is the coroutine defined in Pattern 1.

# Run async code from sync context (creates a new event loop on each call)
def sync_wrapper(prompt: str) -> str:
    return asyncio.run(classify(prompt))

# Or use a persistent event loop (slightly more efficient for repeated calls)
loop = asyncio.new_event_loop()

def sync_with_persistent_loop(prompt: str) -> str:
    return loop.run_until_complete(classify(prompt))
```
The critical warning: never call asyncio.run() from within an already-running event loop. In Jupyter notebooks, the event loop is always running — use await directly or asyncio.ensure_future(). In FastAPI, route handlers that are async def already run inside the event loop — use await directly, don't wrap with asyncio.run().
If you need to call a sync function from async code (say, a blocking database call), use loop.run_in_executor() to push it to a thread pool and not block the event loop.
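As a small illustration of pushing blocking work off the event loop, the sketch below uses asyncio.to_thread (Python 3.9+), a convenience wrapper that runs a function in the default thread pool much like loop.run_in_executor(None, ...). blocking_lookup is a hypothetical placeholder for a sync database call.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    """Hypothetical blocking call, e.g. a sync database driver."""
    time.sleep(0.2)  # blocks the thread it runs on
    return f"row-for-{key}"


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Each call runs in a worker thread, so the event loop stays free
    # and the three lookups overlap instead of running back to back.
    rows = await asyncio.gather(
        *(asyncio.to_thread(blocking_lookup, k) for k in ["a", "b", "c"])
    )
    return list(rows), time.perf_counter() - start


rows, elapsed = asyncio.run(main())
```

Run sequentially, the three lookups would take at least 0.6 seconds; in worker threads the total stays close to the duration of a single call.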
Pattern 7: Retry with exponential backoff
Rate limit errors are transient. The right response is to wait and retry, not to fail:
```python
import asyncio
import random
import anthropic

client = anthropic.AsyncAnthropic()

async def with_retry(prompt: str, max_retries: int = 3) -> str:
    for attempt in range(max_retries):
        try:
            response = await client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.content[0].text
        except anthropic.RateLimitError:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the error to the caller
            wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited. Retrying in {wait:.1f}s (attempt {attempt + 1}/{max_retries})")
            await asyncio.sleep(wait)
```
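Exponential backoff with jitter: after a failed attempt 0 the wait is 1-2s, and after a failed attempt 1 it is 2-3s. With the default max_retries=3, a failure on attempt 2 is re-raised instead of retried; raising max_retries would continue the sequence at 4-5s, 8-9s, and so on. The random.uniform(0, 1) jitter prevents a thundering herd: if you have 10 workers all hitting the rate limit simultaneously, you don't want them all retrying at exactly the same moment.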
Combine this with the Semaphore pattern: the semaphore limits concurrency, and retry handles the occasional error that gets through.
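A minimal sketch of that combination, runnable without an API key, might look like the following. FakeRateLimitError and flaky_call are hypothetical stand-ins for anthropic.RateLimitError and the real API call, and the backoff delays are scaled down so the demo finishes quickly.

```python
import asyncio
import random


class FakeRateLimitError(Exception):
    """Hypothetical stand-in for anthropic.RateLimitError."""


attempts: dict[str, int] = {}


async def flaky_call(prompt: str) -> str:
    """Simulated API call: the first attempt for each prompt is rate limited."""
    attempts[prompt] = attempts.get(prompt, 0) + 1
    await asyncio.sleep(0.01)
    if attempts[prompt] == 1:
        raise FakeRateLimitError(prompt)
    return prompt.upper()


async def call_with_limit_and_retry(
    prompt: str, semaphore: asyncio.Semaphore, max_retries: int = 3
) -> str:
    for attempt in range(max_retries):
        try:
            # Hold a slot only while a request is actually in flight.
            async with semaphore:
                return await flaky_call(prompt)
        except FakeRateLimitError:
            if attempt == max_retries - 1:
                raise
        # Back off outside the semaphore so a sleeping retry does not
        # occupy a slot. Delays are scaled by 0.01 for the demo.
        await asyncio.sleep(0.01 * (2 ** attempt + random.uniform(0, 1)))
    raise RuntimeError("max_retries must be at least 1")


async def run_batch(prompts: list[str], max_concurrent: int = 2) -> list[str]:
    semaphore = asyncio.Semaphore(max_concurrent)  # created inside the running loop
    return await asyncio.gather(
        *(call_with_limit_and_retry(p, semaphore) for p in prompts)
    )


results = asyncio.run(run_batch(["a", "b", "c"]))
```

Sleeping outside the async with block is a design choice: a coroutine that is backing off releases its slot so other requests can proceed in the meantime.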
Real benchmark: 100 document summarizations
Here's what async patterns actually do for throughput on a realistic job:
| Approach | Time | Notes |
|---|---|---|
| Sequential (sync) | 8m 20s | One at a time |
| asyncio.gather (no limit) | 45s | Rate limit errors, retries inflate cost |
| asyncio.gather + Semaphore(5) | 1m 40s | Clean run, no errors |
| asyncio.gather + Semaphore(10) | 58s | Near the rate limit ceiling |
The number of successful API calls is identical across all approaches, so the cost of the clean runs is the same; the unbounded run is the exception, since it spends extra requests on rate-limited attempts and retries. What changes is wall-clock time and reliability. Semaphore(10) on a paid Anthropic tier is a reasonable default for batch jobs. Drop to Semaphore(5) if you're seeing rate limit errors.
Common mistakes
Not closing the async client. The AsyncAnthropic client maintains an HTTP connection pool. If you don't close it, you'll see resource warnings and eventually connection leaks. Use async with anthropic.AsyncAnthropic() as client: or call await client.close() explicitly.
Creating a new client per request. Each client instantiation opens connections. Create one client at module level and reuse it. This is a common mistake when converting sync code to async — the sync client is cheap to create, so developers treat it as throwaway.
Calling asyncio.run() inside FastAPI. FastAPI runs async route handlers inside an event loop. Calling asyncio.run() inside a running loop raises RuntimeError: asyncio.run() cannot be called from a running event loop. Use await directly in async route handlers.
Ignoring asyncio.gather's return order. asyncio.gather preserves input order in results: the 3rd result corresponds to the 3rd input, regardless of which completed first. This is usually what you want. If you need to process results as they complete (e.g., to stream progress), use asyncio.as_completed() instead.
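The difference between the two orderings is easy to see with simulated latencies. In this sketch, fake_summarize is a hypothetical stand-in for an API call and the sleep durations are arbitrary.

```python
import asyncio


async def fake_summarize(doc: str, seconds: float) -> str:
    """Hypothetical stand-in for an API call with a fixed latency."""
    await asyncio.sleep(seconds)
    return f"summary of {doc}"


async def main() -> tuple[list[str], list[str]]:
    jobs = [("slow", 0.2), ("fast", 0.02), ("medium", 0.1)]

    # gather: results line up with the inputs, whatever the finish order.
    in_input_order = await asyncio.gather(*(fake_summarize(d, s) for d, s in jobs))

    # as_completed: yields awaitables in the order they finish.
    in_finish_order = []
    for next_done in asyncio.as_completed([fake_summarize(d, s) for d, s in jobs]):
        in_finish_order.append(await next_done)

    return list(in_input_order), in_finish_order


by_input, by_finish = asyncio.run(main())
```

With as_completed the link back to the input is lost, so if you need to know which document a result belongs to, have the coroutine return the input alongside the output.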
Putting it together
The pattern for a production batch processing job looks like this: one shared AsyncAnthropic client, a Semaphore to cap concurrency, exponential backoff on rate limit errors, tqdm.gather for progress visibility, and asyncio.run() at the entry point.
That combination handles 99% of async LLM batch processing needs. For more complex patterns — like processing results through a queue as they arrive, or running agents in parallel where each agent makes its own sequence of LLM calls — the same primitives compose cleanly.
If you're building these patterns into a FastAPI service, production FastAPI patterns for Claude covers the full setup including lifespan management for the async client. And if cost optimization is the goal, the Anthropic Batch API offers 50% cost reduction for jobs that don't need real-time results — worth checking before building a full async pipeline for overnight processing jobs.



