Fix the Subtle Bug: Async Rate Limiter with Token Bucket Algorithm

The following Python async rate limiter uses a token bucket algorithm, but it has several subtle bugs that cause it to fail under concurrent usage. Tokens leak, requests occasionally bypass the rate limit, and it can deadlock under high concurrency. Find and fix ALL the bugs, and explain each one.

```python
import asyncio
import time

class TokenBucketRateLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # tokens per second
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens += elapsed * self.rate
        self.tokens = min(self.tokens, self.capacity)
        self.last_refill = now

    async def acquire(self, tokens: int = 1):
        while True:
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                wait_time = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait_time)

    async def __aenter__(self):
        self.acquire()
        return self

    async def __aexit__(self, *args):
        pass

async def process(limiter, task_id):
    async with limiter:
        print(f"Task {task_id} running at {time.monotonic():.2f}")
        await asyncio.sleep(0.1)

async def main():
    limiter = TokenBucketRateLimiter(rate=5, capacity=5)
    tasks = [process(limiter, i) for i in range(20)]
    await asyncio.gather(*tasks)

asyncio.run(main())
```

Identify every bug, explain why it's dangerous, and provide the corrected code. Bonus: suggest any design improvements for production use.


Bug Analysis and Fix: Async Token Bucket Rate Limiter

Bugs Found

Bug 1: Missing await in __aenter__ (Critical — bypasses rate limiting entirely)

```python
async def __aenter__(self):
    self.acquire()  # BUG: missing await!
    return self
```

Why it's dangerous: self.acquire() without await creates a coroutine object that is immediately discarded. The method returns self without ever waiting for a token. Every single request bypasses the rate limiter. Python only emits a "coroutine ... was never awaited" RuntimeWarning when the discarded coroutine is garbage collected, which is easy to miss in logs — it's a silent fire-and-forget that completely defeats the purpose of the class.
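The failure mode is easy to reproduce in isolation. This sketch uses a hypothetical stand-in `acquire` coroutine to show that calling it without `await` just produces a coroutine object that never runs:

```python
import asyncio

async def acquire():
    """Stand-in for the real token acquisition."""
    await asyncio.sleep(0)
    return True

async def broken_enter():
    result = acquire()  # missing await: creates a coroutine object, never runs it
    # result is a coroutine object, NOT True -- no acquisition ever happened
    assert asyncio.iscoroutine(result)
    result.close()  # close it explicitly to silence the "never awaited" warning

asyncio.run(broken_enter())
```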


Bug 2: time.time() vs time.monotonic() inconsistency (Token leak / time travel)

```python
self.last_refill = time.time()        # in __init__ and _refill
now = time.time()                     # in _refill
print(f"... {time.monotonic():.2f}")  # in process()
```

Why it's dangerous: time.time() is subject to system clock adjustments (NTP syncs, manual changes, daylight saving). If the clock jumps backward, elapsed becomes negative, subtracting tokens. If it jumps forward, you get a massive burst of tokens injected all at once. time.monotonic() is the correct choice for measuring elapsed durations.
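A minimal illustration of the elapsed-time math the refill logic depends on (the rate of 5 tokens/sec matches the example; the sleep duration is illustrative):

```python
import time

start = time.monotonic()
time.sleep(0.01)
elapsed = time.monotonic() - start

# time.monotonic() never goes backwards, so elapsed is guaranteed
# non-negative; time.time() can jump either way under clock adjustments.
assert elapsed > 0

tokens_refilled = elapsed * 5.0  # rate = 5 tokens/sec, as in the example
assert tokens_refilled > 0
```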


Bug 3: wait_time captured outside lock can be stale (Race condition / thundering herd)

```python
async with self._lock:
    self._refill()
    if self.tokens >= tokens:
        self.tokens -= tokens
        return True
    wait_time = (tokens - self.tokens) / self.rate
# Lock is released here
await asyncio.sleep(wait_time)  # Multiple coroutines compute the SAME wait_time
```

Why it's dangerous: Multiple coroutines can all read the same self.tokens value, compute the same wait_time, sleep for the same duration, then all wake up simultaneously and compete. This creates a thundering herd problem. While the lock prevents actual over-consumption, it causes unnecessary contention and bursty behavior. The wait_time itself is also based on a snapshot of self.tokens that may be stale by the time the sleep completes.

This isn't a correctness bug per se (the lock guards the state mutation), but combined with Bug 1, it means the retry loop was never even being reached. The real issue is efficiency — see the improved solution below.
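One standard mitigation is to add random jitter so waiters wake at staggered times instead of simultaneously. A minimal sketch (the helper name `jittered_wait` and the 10% jitter fraction are illustrative choices, not part of the original code):

```python
import random

def jittered_wait(wait_time: float, jitter_frac: float = 0.1) -> float:
    # Extend each waiter's sleep by a random 0-10% so wake-ups are staggered
    return wait_time * (1.0 + random.uniform(0.0, jitter_frac))

# Ten waiters that would all have slept exactly 1.0s now wake at slightly
# different times instead of contending for the lock at the same instant.
waits = [jittered_wait(1.0) for _ in range(10)]
assert all(1.0 <= w < 1.2 for w in waits)
```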


Bug 4: No validation that requested tokens ≤ capacity (Potential deadlock)

```python
async def acquire(self, tokens: int = 1):
    while True:  # loops forever if tokens > capacity
```

Why it's dangerous: If someone calls acquire(tokens=10) on a bucket with capacity=5, the condition self.tokens >= tokens can never be true. The while True loop spins forever — effectively a livelock for that coroutine — and it periodically grabs the lock, adding needless contention for every other coroutine too.
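The guard is a one-line precondition check. Sketched in isolation (the free function `check_request` is a hypothetical stand-in for the check inside `acquire`):

```python
def check_request(tokens: int, capacity: int) -> None:
    # A request larger than the bucket can ever hold is never satisfiable
    if tokens > capacity:
        raise ValueError(f"Requested {tokens} tokens exceeds capacity {capacity}")

check_request(3, 5)  # fine: within capacity
try:
    check_request(10, 5)  # would otherwise spin forever inside acquire()
except ValueError as exc:
    print(exc)
```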


Bug 5: Clock and lock are bound at construction time (cross-loop hazard)

This is a variant of Bug 2 but worth calling out: __init__ sets self.last_refill = time.time(), and on Python versions before 3.10 an asyncio.Lock binds to the event loop that exists when it is created. If the limiter is constructed outside asyncio.run() and used inside it, or shared across threads, older Pythons can raise "attached to a different loop" errors (3.10+ binds the lock lazily, but cross-thread sharing is still unsupported). Using time.monotonic() consistently and documenting single-loop usage is important.


Corrected Code

```python
import asyncio
import random
import time


class TokenBucketRateLimiter:
    def __init__(self, rate: float, capacity: int):
        if rate <= 0:
            raise ValueError("rate must be positive")
        if capacity <= 0:
            raise ValueError("capacity must be positive")

        self.rate = rate          # tokens per second
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()  # FIX #2: use monotonic clock
        self._lock = asyncio.Lock()

    def _refill(self):
        now = time.monotonic()  # FIX #2: use monotonic clock
        elapsed = now - self.last_refill
        if elapsed > 0:
            self.tokens += elapsed * self.rate
            self.tokens = min(self.tokens, self.capacity)
            self.last_refill = now

    async def acquire(self, tokens: int = 1):
        # FIX #4: prevent infinite loop / livelock
        if tokens > self.capacity:
            raise ValueError(
                f"Requested {tokens} tokens exceeds bucket capacity {self.capacity}"
            )

        while True:
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                # Calculate wait time while still holding accurate state
                deficit = tokens - self.tokens
                wait_time = deficit / self.rate

            # FIX #3: up to 10% random jitter to reduce thundering herd
            await asyncio.sleep(wait_time * (1 + random.uniform(0, 0.1)))

    async def __aenter__(self):
        await self.acquire()  # FIX #1: actually await the coroutine!
        return self

    async def __aexit__(self, *args):
        pass


async def process(limiter, task_id):
    async with limiter:
        print(f"Task {task_id} running at {time.monotonic():.2f}")
        await asyncio.sleep(0.1)


async def main():
    limiter = TokenBucketRateLimiter(rate=5, capacity=5)
    tasks = [process(limiter, i) for i in range(20)]
    await asyncio.gather(*tasks)


asyncio.run(main())
```

Summary Table

| Bug | Severity | Symptom | Fix |
| --- | --- | --- | --- |
| Missing `await` in `__aenter__` | 🔴 Critical | Rate limiter completely bypassed | `await self.acquire()` |
| `time.time()` instead of `time.monotonic()` | 🟠 High | Token count corrupted on clock skew | Use `time.monotonic()` everywhere |
| Thundering herd on wake-up | 🟡 Medium | Burst contention after sleep | Recompute wait under the lock; jitter the sleep |
| No capacity validation | 🔴 Critical | Infinite loop / livelock | Raise `ValueError` if `tokens > capacity` |

Bonus: Production Design Improvements

  1. Use an asyncio.Event or asyncio.Condition instead of polling with sleep. This avoids busy-waiting and lets you wake waiters precisely when tokens become available:

    ```python
    # Sketch: condition-based approach (assumes __init__ creates
    # self._condition = asyncio.Condition(), whose lock replaces self._lock)
    async def acquire(self, tokens: int = 1):
        async with self._condition:
            while True:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                deficit = tokens - self.tokens
                wait_time = deficit / self.rate
                try:
                    # wait() releases the lock and re-acquires it on wake;
                    # the timeout caps the wait until enough tokens refill
                    await asyncio.wait_for(
                        self._condition.wait(), timeout=wait_time
                    )
                except asyncio.TimeoutError:
                    pass  # timed out -- loop around and re-check the bucket
    ```
  2. Add a timeout parameter to acquire() to prevent indefinite blocking:

    ```python
    async def acquire(self, tokens: int = 1, timeout: float | None = None):
        deadline = time.monotonic() + timeout if timeout is not None else None
        # ... raise TimeoutError if the deadline is exceeded while waiting
    ```
  3. Add try_acquire() (non-blocking) for callers that want to fail fast rather than wait.

  4. Track metrics — count of granted/denied/timed-out requests for observability.

  5. Support FIFO fairness — a queue of waiters ensures earlier requests are served first, preventing starvation under high concurrency.

  6. Thread safety — asyncio.Lock is not thread-safe. If other threads need the limiter (e.g., in a mixed sync/async app), have them submit work to the limiter's loop via asyncio.run_coroutine_threadsafe, or use a dedicated thread-safe variant built on threading primitives.
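Item 3's non-blocking path is the simplest of these to sketch. The minimal `TokenBucket` class below is a hypothetical cut-down version of the corrected limiter, carrying just enough state to show how `try_acquire` fails fast instead of sleeping:

```python
import asyncio
import time


class TokenBucket:
    """Minimal bucket with just enough state to demonstrate try_acquire."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self):
        now = time.monotonic()
        self.tokens = min(
            self.capacity, self.tokens + (now - self.last_refill) * self.rate
        )
        self.last_refill = now

    async def try_acquire(self, tokens: int = 1) -> bool:
        """Take tokens if available right now; never sleeps."""
        async with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False


async def demo():
    bucket = TokenBucket(rate=5, capacity=2)
    assert await bucket.try_acquire(2) is True    # bucket starts full
    assert await bucket.try_acquire(1) is False   # drained; caller fails fast

asyncio.run(demo())
```

Callers that would rather drop or defer work than queue behind the limiter can branch on the boolean instead of awaiting `acquire()`.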
