Claude Skills Guide

Claude Code API Rate Limiting Implementation Guide

Rate limiting protects your Claude Code API integrations from abuse, ensures fair resource allocation, and prevents unexpected cost overruns. Whether you’re building a skill that orchestrates multiple API calls or a service that handles concurrent requests from multiple users, implementing proper rate limiting is essential for production systems.

This guide covers practical rate limiting patterns you can implement directly in your Claude Code skills and adjacent services.

Understanding Rate Limiting Basics

Rate limiting controls how frequently your application makes requests to the Claude API. The three most common strategies are:

Fixed Window counts requests within a predefined time window. Once the limit resets, users can make requests again. This approach is simple but can cause burst traffic at window boundaries.

Sliding Window provides smoother traffic handling by tracking requests on a rolling basis. It prevents the burst problem but requires more state management.

Token Bucket allows bursts up to a bucket limit while enforcing an average rate over time. This feels most natural to users and handles variable workloads well.

For Claude Code API integrations, token bucket is often the best choice because API usage naturally varies based on task complexity.

Setting Up Your Rate Limiting Project

Before diving into code, ensure your project is properly configured for rate limiting development. Create a dedicated module for rate limiting logic so it remains testable and maintainable.

Define your rate limits as constants or configuration values using a dataclass:

# config/rate_limits.py
from dataclasses import dataclass

@dataclass
class RateLimitConfig:
    requests_per_minute: int
    requests_per_hour: int
    burst_limit: int
    key_prefix: str

API_RATE_LIMIT = RateLimitConfig(
    requests_per_minute=60,
    requests_per_hour=1000,
    burst_limit=10,
    key_prefix="api:rate_limit"
)

Implementing Token Bucket Rate Limiting

Here’s a practical implementation you can use in a Python-based MCP server or skill helper:

import time
import threading
from typing import Callable

class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # tokens per second
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.time()
        self._lock = threading.Lock()

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now

    def consume(self, tokens: int = 1) -> bool:
        with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_for_token(self, tokens=1):
        while not self.consume(tokens):
            time.sleep(0.1)

This implementation is thread-safe and works across multiple concurrent requests. Initialize it with your desired rate (tokens per second) and capacity (maximum burst size):

# Allow 10 requests per second, with bursts up to 20
rate_limiter = TokenBucket(rate=10, capacity=20)

Using Rate Limiting in Your Skills

When building Claude skills that make API calls, wrap your requests with the rate limiter:

def call_claude_api(messages, rate_limiter):
    rate_limiter.wait_for_token()

    response = anthropic.messages.create(
        model="claude-sonnet-4-20250514",
        messages=messages,
        max_tokens=1024
    )
    return response

For skills that coordinate multiple API calls, such as those using the tdd skill for test-driven development or frontend-design for generating UI components, rate limiting ensures consistent performance without hitting API quotas.

Per-User Rate Limiting

In multi-user scenarios, you need isolated rate limiting per user. A RateLimiter wrapper manages per-key buckets and exposes rate limit metadata:

class RateLimiter:
    def __init__(self, bucket: TokenBucket, key_func: Callable):
        self.bucket = bucket
        self.key_func = key_func
        self._buckets: dict[str, TokenBucket] = {}
        self._lock = threading.Lock()

    def check_limit(self, request) -> tuple[bool, dict]:
        key = self.key_func(request)

        with self._lock:
            if key not in self._buckets:
                self._buckets[key] = TokenBucket(
                    rate=self.bucket.rate,
                    capacity=self.bucket.capacity
                )

        allowed = self._buckets[key].consume()

        return allowed, {
            "limit": self.bucket.capacity,
            "remaining": int(self._buckets[key].tokens),
            "reset": int(self._buckets[key].last_refill + self.bucket.capacity / self.bucket.rate)
        }

For simpler cases without request objects, store rate limiter instances in a dictionary keyed by user identifier:

user_rate_limiters = {}

def get_user_limiter(user_id):
    if user_id not in user_rate_limiters:
        user_rate_limiters[user_id] = TokenBucket(rate=5, capacity=10)
    return user_rate_limiters[user_id]

This pattern works well when building services that expose Claude capabilities to multiple users through the supermemory skill or custom MCP tools.

Handling Rate Limit Errors

Even with client-side rate limiting, you should handle API-level rate limit responses gracefully. The Claude API returns a 429 status code when limits are exceeded:

import anthropic

def call_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            return anthropic.messages.create(
                model="claude-sonnet-4-20250514",
                messages=messages
            )
        except anthropic.RateLimitError as e:
            if attempt < max_retries - 1:
                wait_time = int(e.response.headers.get('Retry-After', 60))
                time.sleep(wait_time)
            else:
                raise

The Retry-After header tells you how long to wait before retrying. Always respect this value rather than implementing aggressive retry logic.

Advanced: Distributed Rate Limiting

When running multiple instances of your service, you need distributed rate limiting that coordinates across processes. Redis provides a reliable implementation. The Lua script approach ensures atomic check-and-consume operations:

# redis_rate_limiter.py
import redis
import time
from typing import Optional

LUA_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)

if count < limit then
    redis.call('ZADD', key, now, now .. '-' .. math.random())
    redis.call('EXPIRE', key, window)
    return {1, limit - count - 1}
end

return {0, 0}
"""

class RedisRateLimiter:
    def __init__(self, redis_client: redis.Redis, key_prefix: str):
        self.client = redis_client
        self.key_prefix = key_prefix
        self._script = self.client.register_script(LUA_SCRIPT)

    def check_rate_limit(
        self,
        identifier: str,
        limit: int,
        window_seconds: int
    ) -> tuple[bool, int]:
        key = f"{self.key_prefix}:{identifier}"
        now = time.time()

        result = self._script(
            keys=[key],
            args=[limit, window_seconds, now]
        )

        allowed = bool(result[0])
        remaining = int(result[1])

        return allowed, remaining

Integrating with Your API Framework

Integrate the rate limiter into your API framework as middleware. Here’s an example with FastAPI, but the pattern applies to any framework:

# main.py
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from redis import Redis
from redis_rate_limiter import RedisRateLimiter
from config import API_RATE_LIMIT

app = FastAPI()
redis_client = Redis.from_env()
rate_limiter = RedisRateLimiter(redis_client, API_RATE_LIMIT.key_prefix)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # Extract identifier (user ID, API key, IP, etc.)
    identifier = get_identifier(request)

    allowed, remaining = rate_limiter.check_rate_limit(
        identifier=identifier,
        limit=API_RATE_LIMIT.requests_per_minute,
        window_seconds=60
    )

    response = await call_next(request)
    response.headers["X-RateLimit-Limit"] = str(API_RATE_LIMIT.requests_per_minute)
    response.headers["X-RateLimit-Remaining"] = str(remaining)

    if not allowed:
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded. Please try again later."
        )

    return response

Graceful Degradation

When Redis becomes unavailable, you have two options: fail open (allow requests) or fail closed (deny requests). Fail open risks exceeding limits during outages but prevents service degradation. A HybridRateLimiter implements automatic fallback to in-memory limiting:

class HybridRateLimiter:
    def __init__(self, redis_limiter: RedisRateLimiter, local_limiter: TokenBucket):
        self.redis_limiter = redis_limiter
        self.local_limiter = local_limiter
        self.use_redis = True

    async def check_limit(self, identifier: str, limit: int, window: int):
        try:
            if self.use_redis:
                return await self.redis_limiter.async_check(identifier, limit, window)
        except redis.RedisError:
            self.use_redis = False

        # Fallback to local limiter
        allowed = self.local_limiter.consume()
        return allowed, self.local_limiter.tokens

Practical Recommendations

For most Claude Code skill implementations, start simple. A single in-memory token bucket handling your expected request volume works for months before you need to consider distributed solutions.

Monitor your actual usage patterns. If you’re building skills that process documents using the pdf skill or analyze code with custom tools, track how many API calls each operation requires. This helps you set appropriate rate limits that balance responsiveness with cost control.

Consider implementing circuit breakers alongside rate limiting. When the Claude API experiences issues, circuit breakers stop making requests temporarily, preventing cascading failures in your application.

Monitor your rate limiter in production — track hit rates, rejected requests, and any fallback activations. This data informs tuning decisions and helps you understand whether your rate limits align with actual usage patterns.

Testing Your Implementation

Verify your rate limiting works correctly before deploying:

import unittest

class TestTokenBucket(unittest.TestCase):
    def test_burst_handling(self):
        limiter = TokenBucket(rate=1, capacity=5)
        # Should allow burst up to capacity
        self.assertTrue(limiter.consume(5))
        # Next request should fail
        self.assertFalse(limiter.consume(1))

    def test_refill_over_time(self):
        limiter = TokenBucket(rate=10, capacity=10)
        limiter.consume(10)
        time.sleep(0.5)
        # Should have refilled 5 tokens
        self.assertTrue(limiter.consume(5))

if __name__ == '__main__':
    unittest.main()

Run these tests as part of your skill’s continuous integration pipeline. Key test scenarios also include verifying concurrent requests are handled atomically and that the system degrades gracefully when Redis fails.

Conclusion

Rate limiting protects your Claude Code integrations from unexpected costs and ensures reliable performance. Start with a token bucket implementation using a typed config dataclass, add per-user isolation for multi-user scenarios, and graduate to distributed limiting with Redis and Lua atomicity only when running multi-server deployments. Handle rate limit errors gracefully with proper retry logic, implement hybrid fallback for Redis outages, and always test your implementation under load before production deployment.

Built by theluckystrike — More at zovo.one