Understand Anthropic API rate limits: RPM, TPM, and tier thresholds. How to read rate limit headers, request tier upgrades, and implement retry logic.
Rate limits are expressed as requests per minute (RPM) and tokens per minute (TPM). Understanding them helps you architect your application to avoid 429 errors and optimize throughput.
| Tier | Requirement | Sonnet 4.6 RPM | Sonnet 4.6 TPM |
|---|---|---|---|
| Free | Create account | 5 | 25,000 |
| Tier 1 | $5 credit purchase | 50 | 50,000 |
| Tier 2 | $40 total spend | 1,000 | 80,000 |
| Tier 3 | $200 total spend | 2,000 | 160,000 |
| Tier 4 | $400 total spend | 4,000 | 400,000 |
Limits vary by model. Verify current limits at docs.anthropic.com/en/api/rate-limits.
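To see which limit binds first for a given workload, the table rows can be turned into a quick estimate. This is a minimal sketch: `effective_rpm` is a hypothetical helper, and it assumes every request consumes roughly the same number of tokens and that all of those tokens count against the single TPM figure shown in the table.

```python
def effective_rpm(rpm_limit: int, tpm_limit: int, tokens_per_request: int) -> int:
    """Requests per minute sustainable under both the RPM and TPM limits."""
    if tokens_per_request <= 0:
        raise ValueError("tokens_per_request must be positive")
    # The TPM limit allows at most tpm_limit // tokens_per_request requests per minute
    tpm_bound = tpm_limit // tokens_per_request
    # Whichever limit is tighter determines the sustainable rate
    return min(rpm_limit, tpm_bound)


# Tier 2 row from the table: 1,000 RPM and 80,000 TPM
print(effective_rpm(1_000, 80_000, 2_000))  # 40   (TPM-bound)
print(effective_rpm(1_000, 80_000, 50))     # 1000 (RPM-bound)
```

With large prompts the TPM limit is usually reached long before the RPM limit, so token usage is often the figure to budget around.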
import anthropic

client = anthropic.Anthropic()

try:
    # with_raw_response exposes the HTTP headers alongside the parsed message
    response = client.messages.with_raw_response.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{"role": "user", "content": "Hello"}],
    )
    headers = response.headers
    print("Requests remaining:", headers.get("anthropic-ratelimit-requests-remaining"))
    print("Tokens remaining:", headers.get("anthropic-ratelimit-tokens-remaining"))
    print("Resets at:", headers.get("anthropic-ratelimit-requests-reset"))

    # parse() returns the usual Message object
    message = response.parse()
    print(message.content[0].text)
except anthropic.RateLimitError as e:
    # On a 429, retry-after gives the number of seconds to wait (default to 60 if absent)
    retry_after = e.response.headers.get("retry-after", "60")
    print(f"Rate limited. Wait {retry_after}s.")
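Rather than only printing the wait time, a caller can act on it. The following is a minimal sketch of a retry loop that honors a `retry-after` header when one is present and otherwise falls back to exponential backoff with jitter. `backoff_delay` and `call_with_retry` are hypothetical helpers, not part of the Anthropic SDK, and the defaults (5 attempts, 1 s base, 60 s cap) are illustrative assumptions.

```python
import random
import time
from typing import Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 60.0) -> float:
    """Seconds to wait before retrying after failed attempt `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # Prefer the server's hint when it parses as a number of seconds
            return max(0.0, float(retry_after))
        except ValueError:
            pass  # not numeric: fall through to computed backoff
    # Exponential backoff with full jitter, capped at `cap` seconds
    return random.uniform(0.0, min(cap, base * 2 ** attempt))


def call_with_retry(fn: Callable[[], T],
                    retryable: Tuple[Type[BaseException], ...],
                    max_attempts: int = 5,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn(), retrying on `retryable` errors up to max_attempts times."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except retryable as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Use the retry-after header if the exception carries a response
            response = getattr(exc, "response", None)
            headers = getattr(response, "headers", None) or {}
            sleep(backoff_delay(attempt, headers.get("retry-after")))
    raise RuntimeError("max_attempts must be at least 1")
```

With the client shown earlier, one would presumably pass `retryable=(anthropic.RateLimitError,)` and wrap the `client.messages.create(...)` call in a lambda. The official Python SDK also retries some failures on its own (configurable through `max_retries` on the client), so an outer loop like this mainly matters when you need custom waiting behavior.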
import time
from collections import deque


class TokenBudgetThrottler:
    """Sliding-window tracker that keeps token usage under a TPM limit."""

    def __init__(self, tpm_limit: int, window_seconds: int = 60):
        self.tpm_limit = tpm_limit
        self.window_seconds = window_seconds
        # Each entry is (timestamp, tokens used)
        self.usage_log: deque[tuple[float, int]] = deque()

    def _purge(self) -> None:
        # Drop entries that have aged out of the window
        now = time.time()
        while self.usage_log and now - self.usage_log[0][0] > self.window_seconds:
            self.usage_log.popleft()

    def record_usage(self, tokens: int) -> None:
        self.usage_log.append((time.time(), tokens))
        self._purge()

    def tokens_in_window(self) -> int:
        # Purge first so expired entries are never counted
        self._purge()
        return sum(t for _, t in self.usage_log)

    def wait_if_needed(self, estimated_tokens: int) -> None:
        # Block until the estimated request fits within the remaining budget
        while self.tokens_in_window() + estimated_tokens > self.tpm_limit:
            if not self.usage_log:
                # A single request larger than the whole budget can never fit
                raise ValueError("estimated_tokens exceeds tpm_limit")
            oldest = self.usage_log[0][0]
            sleep_for = self.window_seconds - (time.time() - oldest) + 0.1
            print(f"TPM budget exceeded. Sleeping {sleep_for:.1f}s...")
            time.sleep(max(0, sleep_for))
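One way to put the throttler to work is to wrap each request in a wait, send, record sequence. The sketch below is an assumption about how the pieces fit together, not a prescribed pattern: `throttled_call` and the `Throttler` protocol are hypothetical names, and the wrapper accepts any object that offers the `wait_if_needed` and `record_usage` methods shown above.

```python
from typing import Callable, Protocol, TypeVar

T = TypeVar("T")


class Throttler(Protocol):
    """Any object with the two methods used below (hypothetical interface)."""

    def wait_if_needed(self, estimated_tokens: int) -> None: ...

    def record_usage(self, tokens: int) -> None: ...


def throttled_call(throttler: Throttler, estimated_tokens: int,
                   send: Callable[[], T], count_tokens: Callable[[T], int]) -> T:
    """Wait for budget, send the request, then record what it actually used."""
    # Block until the estimate fits in the current window
    throttler.wait_if_needed(estimated_tokens)
    result = send()
    # Record actual usage so later estimates are checked against real numbers
    throttler.record_usage(count_tokens(result))
    return result
```

With the Anthropic client, `send` could be a lambda around `client.messages.create(...)`, and `count_tokens` could return `message.usage.input_tokens + message.usage.output_tokens`. Whether both figures count toward the limit you are tracking depends on how that limit is defined for your model, so check the current rate limit documentation.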
See the error handling example for full retry logic. For pricing and cost estimation across tiers, see the rate limits vs tier explained page.