The following Python async rate limiter uses a token bucket algorithm, but it has several subtle bugs that cause it to fail under concurrent usage. Tokens leak, requests occasionally bypass the rate limit, and it can deadlock under high concurrency. Find and fix ALL the bugs, and explain each one.

```python
import asyncio
import time

class TokenBucketRateLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # tokens per second
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens += elapsed * self.rate
        self.tokens = min(self.tokens, self.capacity)
        self.last_refill = now

    async def acquire(self, tokens: int = 1):
        while True:
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                wait_time = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait_time)

    async def __aenter__(self):
        self.acquire()
        return self

    async def __aexit__(self, *args):
        pass

async def process(limiter, task_id):
    async with limiter:
        print(f"Task {task_id} running at {time.monotonic():.2f}")
        await asyncio.sleep(0.1)

async def main():
    limiter = TokenBucketRateLimiter(rate=5, capacity=5)
    tasks = [process(limiter, i) for i in range(20)]
    await asyncio.gather(*tasks)

asyncio.run(main())
```

Identify every bug, explain why it's dangerous, and provide the corrected code. Bonus: suggest any design improvements for production use.
What’s broken (all the subtle bugs) and why it matters
1) __aenter__ forgets to await acquire()
Bug:
```python
async def __aenter__(self):
    self.acquire()  # <- not awaited
    return self
```
This creates a coroutine object and discards it. The token is never actually acquired, so tasks can enter the context immediately and bypass the rate limit. You’ll also typically get a RuntimeWarning: coroutine was never awaited.
Fix: await self.acquire().
2) Mixing time.time() and time.monotonic() (and using the wrong one for refills)
Bug: `_refill()` uses `time.time()`:
```python
now = time.time()
elapsed = now - self.last_refill
```
`time.time()` is wall-clock time: it can jump backwards or forwards due to NTP adjustments, VM clock changes, manual corrections, etc. That can produce:
- Negative elapsed → tokens decrease (leak) or wait_time becomes wrong.
- Huge elapsed → bucket refills instantly to capacity (bursts beyond intended).
Meanwhile, logging uses time.monotonic(), so your printed timestamps don’t correspond to the refill clock.
Fix: Use time.monotonic() consistently for rate limiting.
3) Division by zero / invalid configuration can deadlock or crash
Bug: `wait_time = (tokens - self.tokens) / self.rate` with:
- `rate == 0` → division by zero (crash)
- `rate < 0` → negative wait times / nonsense behavior
Also, if capacity <= 0 things break conceptually.
Fix: Validate rate > 0, capacity >= 1, and tokens >= 1.
4) Requests for more tokens than capacity can wait forever
Bug: If someone calls acquire(tokens=10) but capacity=5, the bucket can never hold 10 tokens. Your loop will sleep and retry forever (a “deadlock” from the caller’s perspective).
Fix: Either:
- raise `ValueError` when `tokens > capacity`, or
- implement "borrowing" (usually not desired for token buckets).
5) “Thundering herd” / unfairness under concurrency causes bursts & apparent bypass
This one is subtle: when tokens are insufficient, every waiter computes a wait_time based on the same shared state, releases the lock, sleeps, then they all wake around the same time and race.
While the lock prevents simultaneous decrements, this pattern causes:
- Herd wakeups (lots of tasks waking repeatedly)
- Unfairness (some tasks can starve)
- Jittery bursts (many tasks wake at once; some immediately succeed; others go back to sleep)
It can look like “occasional bypass” because several tasks may run very close together right after a refill.
Fix: Use a condition variable (asyncio.Condition) or an internal “next available time” scheduler so waiters don’t all wake at once. If you keep the sleep approach, at least ensure monotonic time and clamp wait times.
6) _refill() can be called with negative elapsed (token leak)
Even if you keep `time.time()`, if the clock goes backwards:
```python
elapsed = now - self.last_refill    # negative
self.tokens += elapsed * self.rate  # subtracts tokens
```
That’s the “tokens leak” symptom.
Fix: monotonic time, and/or elapsed = max(0, elapsed) as a defensive clamp.
7) Context manager semantics are incomplete
Not a correctness bug for a token bucket (you don't "release" tokens), but:
- `__aexit__` ignores exceptions (fine), and `__aenter__` should return something meaningful (it does), but
- the missing `await` makes the context manager fundamentally broken (covered in #1).
Corrected implementation (robust under concurrency)
Below is a fixed version that:
- uses `time.monotonic()`
- validates inputs
- prevents impossible requests (`tokens > capacity`)
- avoids herd wakeups using `asyncio.Condition`
- clamps `elapsed` defensively
```python
import asyncio
import time
from typing import Optional


class TokenBucketRateLimiter:
    def __init__(self, rate: float, capacity: int):
        if rate <= 0:
            raise ValueError("rate must be > 0 tokens/sec")
        if capacity <= 0:
            raise ValueError("capacity must be >= 1")
        self.rate = float(rate)
        self.capacity = int(capacity)
        self._tokens = float(capacity)
        self._last_refill = time.monotonic()
        self._cond = asyncio.Condition()

    def _refill_unlocked(self, now: Optional[float] = None) -> None:
        """Must be called with the condition lock held."""
        if now is None:
            now = time.monotonic()
        elapsed = now - self._last_refill
        if elapsed <= 0:
            # Defensive: monotonic shouldn't go backwards, but don't
            # leak tokens if it does.
            self._last_refill = now
            return
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._last_refill = now

    async def acquire(self, tokens: int = 1) -> None:
        if tokens <= 0:
            raise ValueError("tokens must be >= 1")
        if tokens > self.capacity:
            raise ValueError("tokens cannot exceed bucket capacity")
        async with self._cond:
            while True:
                now = time.monotonic()
                self._refill_unlocked(now)
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    # Wake one waiter so it can re-check; refill over time
                    # is the main driver, but notifying helps fairness.
                    self._cond.notify(1)
                    return
                # Compute time until enough tokens accumulate.
                needed = tokens - self._tokens
                wait_time = needed / self.rate  # rate > 0 guaranteed
                # asyncio.Condition.wait() has no timeout parameter, so wrap
                # it in asyncio.wait_for(). wait() releases the lock while
                # sleeping and re-acquires it before returning -- even when
                # the timeout cancels it.
                try:
                    await asyncio.wait_for(self._cond.wait(), timeout=wait_time)
                except asyncio.TimeoutError:
                    pass  # enough time has passed; loop and re-check

    async def __aenter__(self):
        await self.acquire(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False  # don't suppress exceptions


# Demo
async def process(limiter, task_id):
    async with limiter:
        print(f"Task {task_id} running at {time.monotonic():.2f}")
        await asyncio.sleep(0.1)


async def main():
    limiter = TokenBucketRateLimiter(rate=5, capacity=5)
    await asyncio.gather(*(process(limiter, i) for i in range(20)))


if __name__ == "__main__":
    asyncio.run(main())
```
Notes on the condition usage
- We use a condition mainly to avoid the "everyone sleeps then wakes together" pattern.
- Unlike `threading.Condition`, `asyncio.Condition` has no timeout parameter on `wait()` or `wait_for()`, so the timed "sleep while releasing the lock" is done by wrapping `self._cond.wait()` in `asyncio.wait_for()`; it times out unless notified earlier, and the lock is re-acquired either way.
- You could also `await asyncio.sleep(wait_time)` outside the lock, but the condition approach reduces herd effects and improves fairness.
Bonus: production design improvements
- Cancellation safety
- If a task is cancelled while waiting, `asyncio.Condition` handles it fine, but you may want to catch `CancelledError` for logging/metrics.
- Fairness / FIFO
- For strict fairness, keep a FIFO queue of waiters (each with requested tokens) and only wake the head when it can proceed.
- Jitter / clock granularity
- Clamp wait times to be non-negative (e.g. `max(wait_time, 0)`) and consider adding jitter to avoid synchronized wakeups across processes.
- Metrics & observability
- Track wait times, number of waiters, tokens available, and rate-limited events.
- Multi-process / distributed limiting
- This implementation is per-event-loop / per-process. For distributed rate limiting, use Redis (Lua script), a DB, or a dedicated limiter service.
- Use a single “next available time” algorithm
- Alternative approach: compute a `next_free_time` and schedule acquisitions based on it (often simpler, very stable under concurrency), especially for "leaky bucket" style limiting.
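To make the fairness point above concrete: `asyncio.Lock` wakes its waiters in arrival order, so holding a lock across the wait gives strict head-of-line FIFO ordering with very little code. A minimal sketch (the class name and structure are illustrative, not part of the code above):

```python
import asyncio
import time


class FifoTokenBucket:
    """Token bucket that serves acquirers strictly in arrival order.

    asyncio.Lock wakes waiters FIFO, so holding it across the wait means
    only the head-of-line task sleeps; everyone else queues behind it.
    """

    def __init__(self, rate: float, capacity: int):
        self.rate = float(rate)
        self.capacity = float(capacity)
        self._tokens = float(capacity)
        self._last = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        now = time.monotonic()
        # Clamp elapsed defensively, same as the main implementation.
        self._tokens = min(self.capacity,
                           self._tokens + max(0.0, now - self._last) * self.rate)
        self._last = now

    async def acquire(self, tokens: int = 1) -> None:
        if not 0 < tokens <= self.capacity:
            raise ValueError("tokens must be in 1..capacity")
        async with self._lock:  # FIFO: waiters queue in arrival order
            while True:
                self._refill()
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return
                # Only the head of the queue sleeps; the lock stays held,
                # so later arrivals cannot jump ahead of it.
                await asyncio.sleep((tokens - self._tokens) / self.rate)
```

The trade-off: holding the lock while sleeping serializes all acquisitions, which is exactly the point (no starvation) but costs throughput when callers request very different token counts.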
If you want, I can provide a strict FIFO token-bucket variant (no starvation, better predictability) or a “next-available-time” limiter that’s even simpler and very concurrency-friendly.
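As a taste of the "next available time" approach, here is a minimal sketch under the same per-process assumptions (the class name is illustrative): each `acquire()` reserves the next free slot on a virtual timeline under a brief lock, then sleeps outside the lock until its slot arrives.

```python
import asyncio
import time


class NextFreeTimeLimiter:
    """Rate limiter that assigns each acquire() the next free slot on a
    virtual timeline, spacing permitted events 1/rate seconds apart."""

    def __init__(self, rate: float):
        if rate <= 0:
            raise ValueError("rate must be > 0")
        self.interval = 1.0 / rate           # seconds between permitted events
        self._next_free = time.monotonic()   # when the next slot opens
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            now = time.monotonic()
            # Reserve a slot: now if the limiter is idle, otherwise the next
            # free point on the timeline. Only this tiny state update happens
            # under the lock.
            slot = max(now, self._next_free)
            self._next_free = slot + self.interval
            delay = slot - now
        if delay > 0:
            await asyncio.sleep(delay)       # wait for our slot, lock released
```

Because every waiter gets its own wake-up time, there is no herd and no re-check loop; arrival order is preserved because `asyncio.Lock` is FIFO. A burst allowance can be added by initializing `_next_free` a few intervals in the past.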