Most AI API usage fits into two categories. Real-time calls — user asks, system responds, latency matters. And bulk async workloads — nightly classification, dataset annotation, content enrichment at scale — where you don't need the answer in 2 seconds, you need 100,000 answers by tomorrow morning.
Anthropic's Message Batches API is built for the second category, and it's 50% cheaper than standard pricing. If you're doing significant batch work and not using it, you're paying double for every run.
What the Batch API actually is
You submit up to 10,000 requests in one API call. Anthropic processes them asynchronously, with results available within 24 hours (in practice, most batches complete in 1-6 hours). The 50% discount applies to both input and output tokens. Same models, same parameters, same content policies as the standard endpoint — just a different entry point and a polling loop instead of an immediate response.
There's no extra setup, no separate approval, no minimum volume. If you have an ANTHROPIC_API_KEY, you can use the Batch API today.
When to use it (and when not to)
Good fits:
- Nightly product catalog classification (50K items, runs overnight, results ready before morning sync)
- Legal document summarization for discovery (hundreds of contracts, no time pressure)
- Dataset annotation for fine-tuning (1,000 examples need labels before next training run)
- SEO meta description generation at scale (10K product pages, one batch job)
- Async content moderation pipeline (review queue that processes overnight)
Wrong tool for:
- User-facing chat where the user is waiting for a response
- Real-time agent tool calls where the agent needs the result to take the next action
- Stock alerts or any time-sensitive notification
- Streaming UIs where the user sees tokens as they generate
The decision is simple: if there's a human waiting for the result, use real-time. If there's a cron job waiting for the result, use batch.
Cost comparison
At current Claude Sonnet 4.6 pricing, the 50% discount compounds fast with volume:
| Tokens per day | Real-time cost | Batch cost | Monthly savings |
|---|---|---|---|
| 1M tokens | ~$3/day | ~$1.50/day | ~$45/month |
| 10M tokens | ~$30/day | ~$15/day | ~$450/month |
| 100M tokens | ~$300/day | ~$150/day | ~$4,500/month |
At 10M tokens/day — a reasonable number for a product catalog classification job — you're saving $450/month just by switching the endpoint. The only cost is adding a polling loop and waiting a few hours for results.
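If you want to sanity-check the table against your own volumes, the arithmetic is simple enough to script. Here's a throwaway helper, assuming the same illustrative ~$3 per million input tokens used in the table (check current pricing before relying on the numbers):

```python
def estimate_savings(tokens_per_day: float, price_per_mtok: float = 3.00) -> dict:
    # Same arithmetic as the table: batch is half the standard rate.
    realtime_daily = tokens_per_day / 1_000_000 * price_per_mtok
    batch_daily = realtime_daily * 0.5
    return {
        "realtime_per_day": round(realtime_daily, 2),
        "batch_per_day": round(batch_daily, 2),
        "monthly_savings": round((realtime_daily - batch_daily) * 30, 2),
    }

print(estimate_savings(10_000_000))
# {'realtime_per_day': 30.0, 'batch_per_day': 15.0, 'monthly_savings': 450.0}
```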
Complete Python implementation
This is production-ready code for a product classification batch job. The pattern generalizes to any bulk workload.
```python
import os
import time

import anthropic

client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])


def build_batch_requests(items: list[dict]) -> list[dict]:
    # One request per item; custom_id is the join key back to your own records.
    return [
        {
            "custom_id": item["id"],
            "params": {
                "model": "claude-sonnet-4-6",
                "max_tokens": 200,
                "messages": [
                    {
                        "role": "user",
                        "content": (
                            "Classify this product description into exactly one category "
                            "from [Electronics, Clothing, Food, Home, Sports, Other].\n\n"
                            f"Product: {item['description']}\n\n"
                            "Respond with only the category name."
                        ),
                    }
                ],
            },
        }
        for item in items
    ]


def submit_batch(items: list[dict]) -> str:
    requests = build_batch_requests(items)
    batch = client.messages.batches.create(requests=requests)
    print(f"Batch submitted: {batch.id} ({len(requests)} requests)")
    return batch.id


def wait_for_batch(batch_id: str, poll_interval: int = 60) -> None:
    # Poll until the batch reaches processing_status == "ended".
    while True:
        status = client.messages.batches.retrieve(batch_id)
        counts = status.request_counts
        print(
            f"Status: {status.processing_status} — "
            f"processing: {counts.processing}, "
            f"succeeded: {counts.succeeded}, "
            f"errored: {counts.errored}"
        )
        if status.processing_status == "ended":
            return
        time.sleep(poll_interval)


def collect_results(batch_id: str) -> dict[str, str | None]:
    # Map custom_id -> classification text, or None for failed requests.
    results = {}
    for result in client.messages.batches.results(batch_id):
        if result.result.type == "succeeded":
            results[result.custom_id] = result.result.message.content[0].text.strip()
        else:
            results[result.custom_id] = None
            print(f"Failed: {result.custom_id} — {result.result.error.type}")
    return results


if __name__ == "__main__":
    items = [
        {"id": f"prod-{i}", "description": f"Sample product {i}"}
        for i in range(1000)
    ]
    batch_id = submit_batch(items)
    wait_for_batch(batch_id)
    results = collect_results(batch_id)
    succeeded = sum(1 for v in results.values() if v is not None)
    print(f"Completed: {succeeded}/{len(items)} succeeded")
```
A few things worth noting in this implementation. The `custom_id` field is your responsibility — it's the join key between your input items and the batch results. Use your actual record IDs here, not sequential integers, so you can map results back to your database without a positional lookup.
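Because `custom_id` carries your real record ID, writing results back is a single keyed update rather than a positional merge. A minimal sketch, assuming a SQLite `products` table with `id` and `category` columns (both the table and the schema are hypothetical):

```python
import sqlite3

def write_back(results: dict[str, str | None], db_path: str = "catalog.db") -> None:
    # Update each classified product by its primary key; skip failed items.
    rows = [(category, item_id) for item_id, category in results.items() if category is not None]
    conn = sqlite3.connect(db_path)
    with conn:  # commits on success, rolls back on error
        conn.executemany("UPDATE products SET category = ? WHERE id = ?", rows)
    conn.close()
```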
The `poll_interval` of 60 seconds is conservative. For a 5,000-item batch that will take 2-3 hours, polling every 60 seconds is fine. For a 200-item batch that might complete in 10 minutes, you might drop it to 20 seconds.
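If your batch sizes vary a lot, one option is to poll quickly at first and back off as the job runs longer. Here's a sketch of a backoff variant of `wait_for_batch`; the starting interval, growth factor, and cap are arbitrary choices, not recommendations from Anthropic:

```python
def wait_for_batch_backoff(batch_id: str, initial: int = 15, max_interval: int = 300) -> None:
    # Start at 15s so small batches return promptly, then grow toward a 5-minute cap.
    interval = initial
    while True:
        status = client.messages.batches.retrieve(batch_id)
        if status.processing_status == "ended":
            return
        time.sleep(interval)
        interval = min(int(interval * 1.5), max_interval)
```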
Handling partial failures
Batches don't fail atomically. Individual requests can error while others succeed, and you need to handle both.
The errored results have a `result.error.type` field. The most common errors you'll encounter:
- `invalid_request` — your `params` object has a schema error (wrong field name, missing required field)
- `max_tokens` too low — you set `max_tokens: 50` but the model needed 200 tokens to complete the response
- Content policy — the input triggered a content filter; you'll need to inspect and clean those items
Always collect and log every failed custom_id. Then retry:
```python
def retry_failures(
    original_items: list[dict],
    results: dict[str, str | None],
) -> dict[str, str | None]:
    failed_ids = {k for k, v in results.items() if v is None}
    if not failed_ids:
        return results
    retry_items = [item for item in original_items if item["id"] in failed_ids]
    print(f"Retrying {len(retry_items)} failed items")
    batch_id = submit_batch(retry_items)
    wait_for_batch(batch_id)
    retry_results = collect_results(batch_id)
    return {**results, **retry_results}
```
One retry pass handles most transient failures. If items are still failing after a retry, they're either content policy issues or malformed inputs — log them for manual review rather than retrying indefinitely.
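One way to park those stubborn items is to dump them to a file for a human to look at later. A sketch, with the output path chosen arbitrarily:

```python
import json

def dump_unresolved(original_items: list[dict], results: dict[str, str | None],
                    path: str = "failed_items.jsonl") -> None:
    # Persist items that still have no result after the retry pass.
    unresolved = [item for item in original_items if results.get(item["id"]) is None]
    with open(path, "w") as f:
        for item in unresolved:
            f.write(json.dumps(item) + "\n")
    print(f"Wrote {len(unresolved)} unresolved items to {path}")
```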
The hybrid architecture: batch + real-time together
Here's the pattern I use for production systems that need both:
Real-time user-facing calls go through AICredits.in. You get access to Claude and all major LLMs through a single API key with INR billing and UPI payment — no international card required. This is the right path for chatbots, agents, and any feature where a user is waiting for a response.
Batch async workloads go through the native Anthropic SDK directly to the Batch API endpoint. The 50% discount only applies when you hit `/v1/messages/batches` directly — it's not available through proxy providers.
```python
import os

import anthropic
from openai import OpenAI

# Real-time, user-facing calls: OpenAI-compatible client pointed at AICredits.in.
realtime_client = OpenAI(
    api_key=os.environ["AICREDITS_API_KEY"],
    base_url="https://api.aicredits.in/v1",
)

# Bulk async workloads: native Anthropic client for the Batch API discount.
batch_client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
```
Concrete example: a product catalog SaaS. When a merchant adds a new product, you generate the description in real-time (AICredits.in, low latency, user is watching). Every night at 2am, you re-classify all 50K items in the catalog against your updated taxonomy (Anthropic Batch API, 50% off, results ready before the morning sync job).
Two clients, two use cases, each optimized for what it's doing.
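In code, the split is two small functions built on the clients above; the proxy-side model identifier and the prompt are placeholders, and the batch path just reuses the helpers from the implementation section:

```python
def generate_description(product: dict) -> str:
    # Real-time path: a merchant is watching, latency matters.
    resp = realtime_client.chat.completions.create(
        model="claude-sonnet-4-6",  # assumed identifier; use whatever your provider exposes
        max_tokens=300,
        messages=[{"role": "user", "content": f"Write a product description for: {product['name']}"}],
    )
    return resp.choices[0].message.content

def nightly_reclassification(items: list[dict]) -> dict[str, str | None]:
    # Batch path: nobody is waiting, take the 50% discount.
    batch_id = submit_batch(items)
    wait_for_batch(batch_id)
    return collect_results(batch_id)
```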
Webhook alternative to polling
If you don't want to run a polling loop — or if your batch jobs kick off from a serverless function that can't sleep — Anthropic supports webhook notifications when a batch completes.
Set up a FastAPI endpoint:
```python
from fastapi import FastAPI, Request

app = FastAPI()


@app.post("/batch-complete")
async def handle_batch_complete(request: Request):
    # Pull the completed batch's ID from the notification payload, then fetch results.
    payload = await request.json()
    batch_id = payload["batch_id"]
    print(f"Batch {batch_id} completed — fetching results")
    results = collect_results(batch_id)
    return {"status": "processed", "count": len(results)}
```
Then provide the webhook URL when creating the batch. Anthropic POSTs to it when `processing_status` reaches `ended`. You get zero polling overhead and your handler only runs once.
Constraints and limits
A few hard limits to know before you design around the Batch API:
- 10,000 requests per batch — for larger sets, submit multiple batches sequentially or in parallel
- 29-day expiry — batches and their results expire after 29 days; download results before then
- No streaming — you get the full response for each item when the batch completes, not token-by-token
- Same content policies — the Batch API isn't a way to bypass safety systems; same rules apply
- Standard model access — whatever models your account can access via the standard API are available in batch
For most bulk workloads, the 10,000 request limit per batch is fine — split your 50K items into five batches of 10K each and submit them all at once. They'll process in parallel.
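A chunking helper for that case might look like the sketch below, reusing `submit_batch` from earlier; the sequential wait loop at the end is the simplest option, not the fastest:

```python
def submit_in_chunks(items: list[dict], chunk_size: int = 10_000) -> list[str]:
    # Split a large item list into batch-sized chunks and submit each one.
    return [
        submit_batch(items[start:start + chunk_size])
        for start in range(0, len(items), chunk_size)
    ]

# e.g. `items` holding 50K products becomes five batches that process in parallel
batch_ids = submit_in_chunks(items)
all_results: dict[str, str | None] = {}
for batch_id in batch_ids:
    wait_for_batch(batch_id)
    all_results.update(collect_results(batch_id))
```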
Stack the cost reductions
Batch pricing is one lever. For maximum cost efficiency on high-volume workloads, combine it with prompt caching: if your batch jobs include a large shared system prompt or document, caching that prompt reduces input token costs by another 80-90% on top of the 50% batch discount.
The math compounds: 50% batch discount × 90% cache savings on the shared context = you're paying roughly 5-15% of what you'd pay for naive real-time calls with the same prompts.
See context caching explained for the implementation details — the cache_control parameter works in batch requests exactly the same way it does in real-time requests.
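As a sketch of what that looks like in a batch request, the change to `build_batch_requests` is a shared `system` block marked with `cache_control`; the taxonomy text is a placeholder, and cache hits across a batch are best-effort rather than guaranteed:

```python
SHARED_TAXONOMY = "..."  # placeholder for a large taxonomy document shared by every request

def build_cached_batch_requests(items: list[dict]) -> list[dict]:
    return [
        {
            "custom_id": item["id"],
            "params": {
                "model": "claude-sonnet-4-6",
                "max_tokens": 200,
                # The shared prefix is marked for caching; only the per-item
                # product description changes between requests.
                "system": [
                    {
                        "type": "text",
                        "text": SHARED_TAXONOMY,
                        "cache_control": {"type": "ephemeral"},
                    }
                ],
                "messages": [
                    {"role": "user", "content": f"Classify this product: {item['description']}"}
                ],
            },
        }
        for item in items
    ]
```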
Getting started
If you have an existing bulk workload running through the standard Messages endpoint, the migration looks like this:
- Replace your direct `client.messages.create()` calls with `build_batch_requests()` to construct the request list
- Replace the call with `client.messages.batches.create(requests=requests)`
- Add the `wait_for_batch()` polling loop
- Add the `collect_results()` collector that maps results back by `custom_id`
- Add failure handling and retry logic
Total migration time for a simple classification job: under an hour. The 50% cost reduction is permanent from the first run.
The Batch API is one of the most underused cost levers in production AI systems. If you have nightly jobs running against Claude, there's no reason to be on standard pricing.



