The following Python async rate limiter uses a token bucket algorithm, but it has several subtle bugs that cause it to fail under concurrent usage. Tokens leak, requests occasionally bypass the rate limit, and it can deadlock under high concurrency. Find and fix ALL the bugs, and explain each one.

```python
import asyncio
import time


class TokenBucketRateLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # tokens per second
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens += elapsed * self.rate
        self.tokens = min(self.tokens, self.capacity)
        self.last_refill = now

    async def acquire(self, tokens: int = 1):
        while True:
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                wait_time = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait_time)

    async def __aenter__(self):
        self.acquire()
        return self

    async def __aexit__(self, *args):
        pass


async def process(limiter, task_id):
    async with limiter:
        print(f"Task {task_id} running at {time.monotonic():.2f}")
        await asyncio.sleep(0.1)


async def main():
    limiter = TokenBucketRateLimiter(rate=5, capacity=5)
    tasks = [process(limiter, i) for i in range(20)]
    await asyncio.gather(*tasks)


asyncio.run(main())
```

Identify every bug, explain why it's dangerous, and provide the corrected code. Bonus: suggest any design improvements for production use.
Bugs Identified and Explained
Bug 1: __aenter__ doesn't await the acquire() call (CRITICAL)
```python
async def __aenter__(self):
    self.acquire()  # ❌ Missing await!
    return self
```
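Problem: acquire() is a coroutine function, so calling it without await only creates a coroutine object that is immediately discarded (Python emits a RuntimeWarning saying the coroutine was never awaited). The body of acquire() never runs, so async with limiter: enters immediately without consuming any tokens, and all 20 tasks run at once, bypassing the rate limit entirely.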
Danger: Complete rate limit failure - allows unlimited concurrency.
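A minimal, self-contained way to see Bug 1. The hypothetical DemoLimiter below stands in for the real limiter, and its acquire() only counts how often its body runs. Entering the broken variant three times never runs acquire() at all, while the awaited variant runs it three times. The warnings filter only silences the "never awaited" RuntimeWarning for the demo.

```python
import asyncio
import warnings


class DemoLimiter:
    """Hypothetical stand-in: acquire() just counts how many times its body runs."""

    def __init__(self, await_acquire: bool):
        self.await_acquire = await_acquire
        self.acquired = 0

    async def acquire(self):
        self.acquired += 1

    async def __aenter__(self):
        if self.await_acquire:
            await self.acquire()  # correct: the coroutine body runs
        else:
            self.acquire()  # bug: coroutine object created and discarded, body never runs
        return self

    async def __aexit__(self, *exc):
        return False


async def enter_n_times(limiter, n):
    for _ in range(n):
        async with limiter:
            pass
    return limiter.acquired


with warnings.catch_warnings():
    warnings.simplefilter("ignore", RuntimeWarning)  # silence "coroutine ... was never awaited"
    broken = asyncio.run(enter_n_times(DemoLimiter(await_acquire=False), 3))
fixed = asyncio.run(enter_n_times(DemoLimiter(await_acquire=True), 3))
print(broken, fixed)  # 0 3
```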
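Bug 2: Oversized requests retry forever (HANG)

```python
async def acquire(self, tokens: int = 1):
    while True:
        async with self._lock:
            self._refill()
            if self.tokens >= tokens:  # ❌ Never true if tokens > capacity
                self.tokens -= tokens
                return True
            wait_time = (tokens - self.tokens) / self.rate  # ❌ ZeroDivisionError if rate == 0
        await asyncio.sleep(wait_time)  # Lock is released while sleeping
```

Problem: The check-and-decrement itself is safe. _refill() and self.tokens -= tokens both run while the lock is held, and asyncio only switches tasks at await points, so two tasks cannot spend the same token. The bug is that nothing validates the arguments. _refill() caps the bucket at capacity, so a request for more than capacity tokens can never satisfy self.tokens >= tokens: the task sleeps, wakes, and retries forever. Likewise, rate=0 raises ZeroDivisionError, and a negative rate drains the bucket while asyncio.sleep() returns immediately for a negative delay, turning the loop into a busy spin.
Danger: The caller hangs with no error. This is the most likely source of the "deadlock" symptom: the task stays alive, but acquire() never returns.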
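The hang is easy to reproduce with the question's acquire() loop. The sketch below copies that loop into a hypothetical OriginalBucket class (using time.monotonic() so the clock issue doesn't interfere) and bounds each call with asyncio.wait_for(). A request that fits in the bucket returns at once, while a request for 10 tokens from a 5-token bucket never completes and only stops because of the timeout.

```python
import asyncio
import time


class OriginalBucket:
    """Hypothetical copy of the question's acquire() loop (monotonic clock)."""

    def __init__(self, rate, capacity):
        self.rate, self.capacity = rate, capacity
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self):
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now

    async def acquire(self, tokens=1):
        while True:  # no argument validation, exactly as in the question
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                wait_time = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait_time)


async def check(tokens):
    bucket = OriginalBucket(rate=100, capacity=5)
    try:
        return await asyncio.wait_for(bucket.acquire(tokens), timeout=0.3)
    except asyncio.TimeoutError:
        return "timed out"


print(asyncio.run(check(3)))   # True
print(asyncio.run(check(10)))  # timed out: the bucket never holds more than 5 tokens
```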
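Bug 3: Wall-clock time.time() instead of time.monotonic() (TOKEN LEAK)
Problem: time.time() is wall-clock time, which jumps whenever NTP or an administrator adjusts the system clock. asyncio schedules sleep() on the event loop's monotonic clock, and the demo prints time.monotonic(), so the limiter is the only component on a different time base.
Danger: A backward jump makes elapsed negative, so _refill() subtracts tokens. The count can go negative and stall callers for about as long as the clock jumped. A forward jump refills an empty bucket instantly (up to capacity), allowing an unearned burst.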
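Because the refill math is plain arithmetic, the effect of a clock jump can be shown without touching the real clock. The hypothetical refill() helper below repeats the question's _refill() formula but takes the clock reading as a parameter. The timestamps are made-up values chosen to mimic a 2 s backward step and a 60 s forward step.

```python
def refill(tokens, last_refill, now, rate, capacity):
    """Same arithmetic as the question's _refill(), with the clock reading passed in."""
    elapsed = now - last_refill
    tokens = min(tokens + elapsed * rate, capacity)
    return tokens, now  # new token count, new last_refill


RATE, CAPACITY = 5, 5

# Full bucket; the wall clock is stepped back 2 s (e.g. by NTP) before the next refill.
after_back_jump, last = refill(5.0, last_refill=1000.0, now=998.0, rate=RATE, capacity=CAPACITY)
print(after_back_jump)  # -5.0: ten tokens leaked

# One real second later, the bucket is still empty instead of full.
after_one_second, last = refill(after_back_jump, last, now=999.0, rate=RATE, capacity=CAPACITY)
print(after_one_second)  # 0.0

# A 60 s forward jump instantly refills an empty bucket to capacity.
after_forward_jump, _ = refill(0.0, last_refill=1000.0, now=1060.0, rate=RATE, capacity=CAPACITY)
print(after_forward_jump)  # 5
```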
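Bug 4: Thundering herd under contention (PERFORMANCE / FAIRNESS)
Problem: Every waiter computes its sleep from the same deficit, so when the bucket is empty all N waiters wake at about the same time. One takes the token, and the other N-1 reacquire the lock, find the bucket empty again, and go back to sleep: O(N) wakeups per token granted. This is not a true livelock, since every retry sleeps for the remaining deficit, but nothing orders the waiters, so an unlucky task can lose the race repeatedly.
Danger: Wasted CPU and lock traffic under heavy load, plus unbounded tail latency for some callers.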
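To make the herd visible, the sketch below instruments the question's acquire() loop with a counter of sleep-and-retry rounds. CountingBucket is a hypothetical illustration class that starts empty with capacity=1 and rate=50, so ten waiters must be served one token at a time. If each waiter slept exactly once, the counter would read 10. It ends up noticeably higher, because every grant sends the losers back to sleep.

```python
import asyncio
import time


class CountingBucket:
    """Hypothetical copy of the question's acquire() loop, plus a retry counter."""

    def __init__(self, rate, capacity):
        self.rate, self.capacity = rate, capacity
        self.tokens = 0.0  # start empty so every waiter has to sleep at least once
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()
        self.sleeps = 0  # total sleep-and-retry rounds across all tasks

    def _refill(self):
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now

    async def acquire(self, tokens=1):
        while True:
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                wait_time = (tokens - self.tokens) / self.rate
            self.sleeps += 1
            await asyncio.sleep(wait_time)


async def main(n=10):
    bucket = CountingBucket(rate=50, capacity=1)
    await asyncio.gather(*(bucket.acquire() for _ in range(n)))
    return bucket.sleeps


sleeps = asyncio.run(main())
print(sleeps)  # well above 10: each grant sends the other waiters back to sleep
```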
✅ Fixed Code
```python
import asyncio
import time


class TokenBucketRateLimiter:
    def __init__(self, rate: float, capacity: int):
        if rate <= 0 or capacity <= 0:  # ✅ rate <= 0 would divide by zero or drain the bucket
            raise ValueError("rate and capacity must be positive")
        self.rate = rate  # tokens per second
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()  # ✅ Monotonic time
        self._lock = asyncio.Lock()

    def _refill(self):
        # Must be called with self._lock held.
        now = time.monotonic()  # ✅ Monotonic time: immune to wall-clock adjustments
        elapsed = now - self.last_refill
        if elapsed > 0:  # ✅ Defensive guard; monotonic time should never go backwards
            self.tokens = min(self.tokens + elapsed * self.rate, self.capacity)
            self.last_refill = now

    async def acquire(self, tokens: int = 1):
        if tokens <= 0 or tokens > self.capacity:  # ✅ Fail fast instead of looping forever
            raise ValueError(f"tokens must be between 1 and {self.capacity}, got {tokens}")
        while True:
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                # Compute the wait under the lock, from the same count we just checked
                wait_time = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait_time)  # Sleep with the lock released so others can proceed

    async def __aenter__(self):
        await self.acquire()  # ✅ AWAIT the acquire!
        return self

    async def __aexit__(self, *args):
        pass  # Tokens are time-based; nothing to release


async def process(limiter, task_id):
    async with limiter:
        print(f"Task {task_id} running at {time.monotonic():.2f}")
        await asyncio.sleep(0.1)


async def main():
    limiter = TokenBucketRateLimiter(rate=5, capacity=5)
    tasks = [process(limiter, i) for i in range(20)]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
```

🔧 Key Fixes Summary

| Bug | Fix | Impact |
|---|---|---|
| Missing await in __aenter__ | await self.acquire() | ✅ Rate limiting now works |
| Oversized request / invalid rate | Validate arguments before looping | ✅ Fails fast with ValueError instead of hanging |
| Wall-clock time source | time.monotonic() everywhere | ✅ Clock adjustments can't drain or overfill the bucket |
| Thundering herd | Not solved by this minimal fix; needs FIFO ordering of waiters | ⚠️ Correct, but waiters still race for each token |
🚀 Production Improvements
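```python
import asyncio
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class AcquireResult:
    acquired: bool
    wait_time: float


class ProductionTokenBucketRateLimiter:
    def __init__(self, rate: float, capacity: int, refill_interval: float = 0.01):
        if rate <= 0 or capacity <= 0:
            raise ValueError("rate and capacity must be positive")
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()
        self._refill_interval = refill_interval  # ✅ Optional background refill
        self._refill_task: Optional[asyncio.Task] = None

    def _refill(self):
        # Same lazy refill as the fixed version; caller must hold self._lock.
        now = time.monotonic()
        elapsed = now - self.last_refill
        if elapsed > 0:
            self.tokens = min(self.tokens + elapsed * self.rate, self.capacity)
            self.last_refill = now

    async def __aenter__(self):
        await self.acquire()  # ✅ Blocking acquire, so `async with` really waits
        return self

    async def __aexit__(self, *args):
        pass

    async def try_acquire(self, tokens: int = 1) -> AcquireResult:
        """Non-blocking: take tokens if available, otherwise report the predicted wait."""
        if tokens <= 0 or tokens > self.capacity:
            raise ValueError(f"tokens must be between 1 and {self.capacity}, got {tokens}")
        async with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return AcquireResult(acquired=True, wait_time=0.0)
            deficit = tokens - self.tokens
            return AcquireResult(acquired=False, wait_time=deficit / self.rate)

    async def acquire(self, tokens: int = 1) -> None:
        """Blocking: retry try_acquire() until it succeeds."""
        while True:
            result = await self.try_acquire(tokens)
            if result.acquired:
                return
            await asyncio.sleep(result.wait_time)

    async def start_background_refill(self):
        """Optional: keep self.tokens current for metrics while the limiter is idle."""
        self._refill_task = asyncio.create_task(self._refill_loop())

    async def _refill_loop(self):
        while True:
            await asyncio.sleep(self._refill_interval)
            async with self._lock:
                self._refill()

    async def stop(self):
        if self._refill_task:
            self._refill_task.cancel()
            try:
                await self._refill_task
            except asyncio.CancelledError:
                pass
            self._refill_task = None
```

Production Benefits:
- Blocking and non-blocking APIs: acquire() waits (and is what async with uses), while try_acquire() returns immediately with a predicted wait time, so callers can shed load or schedule a retry instead of sleeping.
- Argument validation: impossible requests raise ValueError instead of hanging.
- Optional background refill: keeps tokens current for monitoring while the limiter is idle. Correctness still comes from the lazy _refill() inside try_acquire(), so the task is not required.
- Graceful shutdown: stop() cancels and awaits the refill task.
- Metrics-ready: easy to add counters/histograms around try_acquire().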
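One production concern neither limiter above addresses is fairness under contention: waiters race for each token, so arrival order is not respected. A common remedy is to let only the head-of-line waiter poll the bucket. The sketch below is a hypothetical FairTokenBucket, not part of the original answer. It relies on asyncio.Lock's documented fairness (the first coroutine that started waiting is the one that proceeds) to turn the lock into a FIFO queue: one task sleeps on the bucket, and every other task waits on the lock in arrival order.

```python
import asyncio
import time


class FairTokenBucket:
    """Sketch: FIFO-fair token bucket (hypothetical class, not from the original answer)."""

    def __init__(self, rate: float, capacity: int):
        if rate <= 0 or capacity <= 0:
            raise ValueError("rate and capacity must be positive")
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()
        # asyncio.Lock wakes waiters in arrival order, so it doubles as a FIFO queue.
        self._turnstile = asyncio.Lock()

    def _refill(self) -> None:
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now

    async def acquire(self, tokens: int = 1) -> None:
        if tokens <= 0 or tokens > self.capacity:
            raise ValueError(f"tokens must be between 1 and {self.capacity}")
        # Only the task at the head of the queue touches the bucket; the rest wait their turn.
        async with self._turnstile:
            while True:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                await asyncio.sleep((tokens - self.tokens) / self.rate)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *exc):
        return False


async def demo() -> list:
    bucket = FairTokenBucket(rate=50, capacity=2)
    order = []

    async def worker(i: int) -> None:
        async with bucket:
            order.append(i)

    await asyncio.gather(*(worker(i) for i in range(6)))
    return order


print(asyncio.run(demo()))  # [0, 1, 2, 3, 4, 5]
```

The trade-off is head-of-line blocking: a task waiting for several tokens holds up smaller requests queued behind it, which is usually acceptable for API-style rate limiting.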
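Test it: run the fixed version and you'll see the first 5 tasks start immediately (the bucket starts full with capacity=5), then the remaining 15 start about 0.2 s apart (1 token at 5 tokens/sec), so the last task starts roughly 3 s in. The 0.1 s of work inside each task does not affect the spacing, because tokens are consumed on entry and never returned.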
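To check the timing rather than eyeball printed timestamps, the sketch below records when each of the 20 tasks enters the limiter. Limiter is a compact, hypothetical copy of the corrected acquire() logic (monotonic clock, awaited acquire), included so the snippet runs on its own, and measure() is a hypothetical helper. Exact numbers vary slightly with scheduler jitter.

```python
import asyncio
import time


class Limiter:
    """Hypothetical compact copy of the corrected limiter's logic."""

    def __init__(self, rate: float, capacity: int):
        self.rate, self.capacity = rate, capacity
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> None:
        while True:
            async with self._lock:
                now = time.monotonic()
                self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
                self.last_refill = now
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                wait_time = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait_time)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *exc):
        return False


async def measure(rate: float, capacity: int, n: int) -> list:
    limiter = Limiter(rate, capacity)
    t0 = time.monotonic()
    starts = []

    async def process(i: int) -> None:
        async with limiter:
            starts.append(time.monotonic() - t0)  # seconds since the run began
            await asyncio.sleep(0.1)  # the work does not return tokens

    await asyncio.gather(*(process(i) for i in range(n)))
    return sorted(starts)


starts = asyncio.run(measure(rate=5, capacity=5, n=20))
print([round(s, 1) for s in starts])
# Expect roughly: five starts near 0.0, then one start every ~0.2 s up to ~3.0
```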