Run multiple Claude API calls concurrently with asyncio in Python. Use AsyncAnthropic, run parallel requests with asyncio.gather, and cap concurrency with a semaphore to stay within rate limits in async code.
The AsyncAnthropic client integrates natively with asyncio — no threads or run_in_executor needed.
import asyncio
import anthropic
# Shared async client used by the coroutines below. It is constructed with no
# arguments, so credentials presumably come from the SDK's defaults (e.g. an
# environment variable) — confirm against the SDK documentation.
client = anthropic.AsyncAnthropic()
async def summarize(text: str) -> str:
    """Ask the model for a two-sentence summary of ``text``.

    Sends a single user message through the module-level ``client`` and
    returns the ``text`` attribute of the first content block in the reply.

    Args:
        text: The passage to summarize; interpolated into the prompt as-is.

    Returns:
        The text of the first content block of the response.
    """
    prompt = f"Summarize in 2 sentences: {text}"
    request = {
        "model": "claude-sonnet-4-6",
        "max_tokens": 512,
        "messages": [{"role": "user", "content": prompt}],
    }
    reply = await client.messages.create(**request)
    first_block = reply.content[0]
    return first_block.text
# Drive the coroutine to completion with asyncio.run(). The returned summary
# is discarded here; assign or print the result of asyncio.run() to use it.
asyncio.run(summarize("...your text..."))
async def batch_summarize(texts: list[str]) -> list[str]:
    """Summarize every entry of ``texts`` concurrently.

    One ``summarize`` coroutine is created per input and all of them are
    awaited together with ``asyncio.gather``, which returns results in the
    same order as the inputs. No concurrency limit is applied here.

    Args:
        texts: The passages to summarize.

    Returns:
        The summaries, positionally aligned with ``texts``.
    """
    pending = map(summarize, texts)
    summaries = await asyncio.gather(*pending)
    return summaries
# Summarize three placeholder articles concurrently; `results` receives the
# value returned by batch_summarize() once every request has finished.
results = asyncio.run(batch_summarize(["article 1...", "article 2...", "article 3..."]))
import asyncio
import anthropic
# Shared async client for the concurrency-limited calls below.
client = anthropic.AsyncAnthropic()
# Module-level limiter shared by every safe_call() invocation.
# NOTE(review): created at import time, before any event loop is running; on
# Python < 3.10 asyncio primitives bind to a loop at construction — confirm
# the target Python version.
semaphore = asyncio.Semaphore(10) # max 10 concurrent requests
async def safe_call(text: str) -> str:
    """Send ``text`` as a single user message while holding ``semaphore``.

    The request is only issued once a slot on the module-level semaphore is
    acquired, so the number of in-flight requests made through this function
    never exceeds the semaphore's capacity.

    Args:
        text: The user message content to send.

    Returns:
        The text of the first content block of the response.
    """
    conversation = [{"role": "user", "content": text}]
    async with semaphore:
        reply = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=256,
            messages=conversation,
        )
    return reply.content[0].text
async def process_all(items: list[str]) -> list[str]:
    """Run ``safe_call`` over every item and collect the replies.

    All coroutines are scheduled together via ``asyncio.gather``; each one
    goes through ``safe_call``, which is where concurrency is limited.

    Args:
        items: The user messages to send.

    Returns:
        The replies, in the same order as ``items``.
    """
    calls = []
    for item in items:
        calls.append(safe_call(item))
    replies = await asyncio.gather(*calls)
    return replies
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
# Application object; routes are registered on it via decorators.
app = FastAPI()
# Shared async client used by the streaming endpoint.
client = anthropic.AsyncAnthropic()
@app.post("/chat")
async def chat(body: dict):
    """Stream the model's reply to ``body["prompt"]`` as plain text.

    Args:
        body: Parsed JSON request body; must contain a ``"prompt"`` entry,
            which is sent as the content of a single user message.

    Returns:
        A ``StreamingResponse`` (``text/plain``) that yields text chunks as
        they arrive from the model.

    Raises:
        HTTPException: 422 if ``"prompt"`` is missing or null.
    """
    # Local import keeps this block self-contained; fastapi is already a
    # dependency of this module.
    from fastapi import HTTPException

    # Read the prompt eagerly. The generator below only runs once the
    # response has started streaming, so a missing key detected there would
    # surface after the 200 status line had already been sent.
    prompt = body.get("prompt")
    if prompt is None:
        raise HTTPException(
            status_code=422,
            detail="Request body must include a 'prompt' field.",
        )

    async def generate():
        # The stream context manager is held open for the lifetime of the
        # response and closed when the generator finishes.
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                yield text

    return StreamingResponse(generate(), media_type="text/plain")
For error handling in async code, see the error handling example. To measure costs of concurrent workloads, use the Claude Cost Calculator.