Fix the Subtle Bug: Async Rate Limiter with Token Bucket Algorithm

The following Python async rate limiter uses a token bucket algorithm, but it has several subtle bugs that cause it to fail under concurrent usage. Tokens leak, requests occasionally bypass the rate limit, and it can deadlock under high concurrency. Find and fix ALL the bugs, and explain each one.

```python
import asyncio
import time

class TokenBucketRateLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # tokens per second
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens += elapsed * self.rate
        self.tokens = min(self.tokens, self.capacity)
        self.last_refill = now

    async def acquire(self, tokens: int = 1):
        while True:
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                wait_time = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait_time)

    async def __aenter__(self):
        self.acquire()
        return self

    async def __aexit__(self, *args):
        pass

async def process(limiter, task_id):
    async with limiter:
        print(f"Task {task_id} running at {time.monotonic():.2f}")
        await asyncio.sleep(0.1)

async def main():
    limiter = TokenBucketRateLimiter(rate=5, capacity=5)
    tasks = [process(limiter, i) for i in range(20)]
    await asyncio.gather(*tasks)

asyncio.run(main())
```

Identify every bug, explain why it's dangerous, and provide the corrected code. Bonus: suggest any design improvements for production use.


Bugs Identified and Explained

Bug 1: __aenter__ doesn't await the acquire() call (CRITICAL)

async def __aenter__(self):
    self.acquire()  # ❌ Missing await!
    return self

Problem: acquire() is async but not awaited, so async with limiter: completes immediately without waiting for tokens. All 20 tasks run concurrently, bypassing the rate limit entirely.

Danger: Complete rate limit failure - allows unlimited concurrency.
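A two-line experiment makes the failure mode concrete. The class below is a stripped-down stand-in for the buggy one, not the original: calling an `async def` without `await` merely constructs a coroutine object, and none of its body ever runs.

```python
import asyncio

class Limiter:
    # Minimal stand-in: acquire is a coroutine function, as in the buggy class.
    async def acquire(self):
        return True

    async def __aenter__(self):
        self.acquire()  # bug reproduced: coroutine created but never awaited
        return self

    async def __aexit__(self, *exc):
        return False

async def main():
    lim = Limiter()
    # Calling an async def without await only builds a coroutine object;
    # acquire's body never executes, so __aenter__ returns immediately.
    coro = lim.acquire()
    print(asyncio.iscoroutine(coro))  # True
    coro.close()  # silence the "coroutine was never awaited" RuntimeWarning

asyncio.run(main())
```

Python does emit a `RuntimeWarning: coroutine ... was never awaited` for this, which is often the first clue in production logs.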

Bug 2: Stale wait_time across the sleep (UNFAIRNESS / LIMIT RACES)

async def acquire(self, tokens: int = 1):
    while True:
        async with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            wait_time = (tokens - self.tokens) / self.rate  # Snapshot of current tokens
        await asyncio.sleep(wait_time)  # ❌ Snapshot is stale by the time we wake

Problem: wait_time is computed from a snapshot of self.tokens, but the lock is released before sleeping. While a waiter sleeps, other tasks can consume the tokens it was waiting for, so it wakes, fails the check, and sleeps again. Under contention, many waiters compute overlapping wait times, wake at the same moment, and race for the same refill. (Note that _refill() itself is always called under the lock, so tokens are not literally double-spent; the hazard is in the unprotected wait.)

Danger: Thundering-herd wakeups and unbounded waiting: some requests repeatedly lose the race and can starve.

Bug 3: time.time() (wall clock) used for refill bookkeeping (SUBTLE)

Problem: _refill() measures elapsed time with time.time(), a wall clock that can jump backwards or forwards under NTP corrections, DST changes, or manual clock-setting. (The demo even prints timestamps with time.monotonic(), mixing the two clocks.) Elapsed-time measurement should use time.monotonic(), which is guaranteed never to go backwards.

Danger: Negative elapsed time subtracts tokens, the bucket goes negative, and the limiter throttles artificially or stalls.
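To see why the clock source matters, here is a pure-function sketch of the refill arithmetic. The function names and the 10-second clock step are illustrative, not from the original class:

```python
def naive_refill(tokens, capacity, rate, last, now):
    # Mirrors the buggy _refill: trusts 'now - last' even if it is negative.
    return min(tokens + (now - last) * rate, capacity)

def safe_refill(tokens, capacity, rate, last, now):
    # Clamping elapsed to >= 0 (or using time.monotonic) avoids the problem.
    elapsed = max(0.0, now - last)
    return min(tokens + elapsed * rate, capacity)

# Suppose the wall clock is stepped back 10 s by NTP between refills:
print(naive_refill(tokens=3.0, capacity=5, rate=5.0, last=100.0, now=90.0))  # -47.0
print(safe_refill(tokens=3.0, capacity=5, rate=5.0, last=100.0, now=90.0))   # 3.0
```

With the naive version the bucket drops to -47 tokens and every request is throttled until the deficit is paid back; the clamped version simply skips the bad interval.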

Bug 4: Potential livelock under extreme contention

Problem: When the token deficit is tiny, wait_time approaches zero, so waiters spin in a tight loop: acquire the lock, fail, sleep for microseconds, repeat. This hammers the lock and burns CPU without making progress.

Danger: Degraded performance under high load.
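One common mitigation is a sleep floor. The sketch below is one way you might bound the wait; `MIN_SLEEP` is a made-up tuning constant, not something from the original code:

```python
MIN_SLEEP = 0.005  # hypothetical floor, 5 ms; tune for your workload

def bounded_wait(deficit: float, rate: float) -> float:
    """Wait long enough to cover the deficit, but never a near-zero spin."""
    if rate <= 0:
        return 1.0  # fallback when no refill is possible
    return max(deficit / rate, MIN_SLEEP)

print(bounded_wait(0.0001, 1000.0))  # 0.005 (clamped, not 1e-7)
print(bounded_wait(2.0, 5.0))        # 0.4
```

The trade-off is a small amount of added latency on near-miss acquisitions in exchange for bounded wakeup frequency.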

Fixed Code

import asyncio
import time

class TokenBucketRateLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # tokens per second
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.monotonic()  # ✅ Monotonic time
        self._lock = asyncio.Lock()

    def _refill(self):
        now = time.monotonic()  # ✅ Monotonic time
        elapsed = now - self.last_refill
        if elapsed > 0:  # ✅ Avoid negative time
            self.tokens += elapsed * self.rate
            self.tokens = min(self.tokens, self.capacity)
            self.last_refill = now

    async def acquire(self, tokens: int = 1):
        while True:
            async with self._lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                # ✅ Compute the deficit while still holding the lock, and
                # recompute it on every loop iteration so stale snapshots
                # can never be trusted across a sleep
                deficit = tokens - self.tokens
                wait_time = deficit / self.rate if self.rate > 0 else 1.0
                # ✅ Enforce a minimum sleep to avoid near-zero spin (livelock)
                wait_time = max(wait_time, 0.005)

            await asyncio.sleep(wait_time)

    async def __aenter__(self):
        await self.acquire()  # ✅ AWAIT the acquire!
        return self

    async def __aexit__(self, *args):
        pass

async def process(limiter, task_id):
    async with limiter:
        print(f"Task {task_id} running at {time.monotonic():.2f}")
        await asyncio.sleep(0.1)

async def main():
    limiter = TokenBucketRateLimiter(rate=5, capacity=5)
    tasks = [process(limiter, i) for i in range(20)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

🔧 Key Fixes Summary

| Bug | Fix | Impact |
| --- | --- | --- |
| Missing `await` in `__aenter__` | `await self.acquire()` | ✅ Rate limiting now works |
| Stale wait-time snapshot | Recompute deficit under the lock each loop | ✅ No limit bypass |
| Time-source mismatch | `time.monotonic()` everywhere | ✅ Correct refill math |
| Livelock risk | Minimum sleep floor | ✅ Better CPU usage |

🚀 Production Improvements

import asyncio
import time
from typing import Optional
from dataclasses import dataclass

@dataclass
class AcquireResult:
    acquired: bool
    wait_time: float

class ProductionTokenBucketRateLimiter:
    def __init__(self, rate: float, capacity: int, refill_interval: float = 0.01):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()
        self._refill_interval = refill_interval  # ✅ Background refill
        self._refill_task: Optional[asyncio.Task] = None

    async def __aenter__(self):
        # ✅ acquire() is non-blocking here, so loop until a token is granted,
        # backing off by the predicted wait time
        while True:
            result = await self.acquire()
            if result.acquired:
                return self
            await asyncio.sleep(result.wait_time)

    async def __aexit__(self, *args):
        pass

    async def acquire(self, tokens: int = 1) -> AcquireResult:
        """Non-blocking version with wait time prediction"""
        async with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return AcquireResult(acquired=True, wait_time=0.0)
            
            deficit = tokens - self.tokens
            wait_time = deficit / self.rate
            return AcquireResult(acquired=False, wait_time=wait_time)

    async def start_background_refill(self):
        """✅ Production: Background token refill"""
        self._refill_task = asyncio.create_task(self._refill_loop())

    async def _refill_loop(self):
        while True:
            await asyncio.sleep(self._refill_interval)
            async with self._lock:
                self._refill()

    async def stop(self):
        if self._refill_task:
            self._refill_task.cancel()
            try:
                await self._refill_task
            except asyncio.CancelledError:
                pass

    # ... rest same as fixed version

Production Benefits:

  • Background refill: No blocking on every acquire
  • Non-blocking acquire: Predict wait time without sleeping
  • Graceful shutdown: Proper task cleanup
  • Metrics-ready: Easy to add counters/histograms
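Because the production `acquire()` returns an `AcquireResult` instead of blocking, callers need an explicit retry loop. The sketch below uses a toy stub limiter (fixed capacity, no refill, invented wait hint, all made up here) purely to show the calling pattern:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class AcquireResult:
    acquired: bool
    wait_time: float

class StubLimiter:
    """Toy stand-in for the production limiter: fixed tokens, no refill."""
    def __init__(self, capacity: int):
        self.tokens = capacity

    async def acquire(self, tokens: int = 1) -> AcquireResult:
        if self.tokens >= tokens:
            self.tokens -= tokens
            return AcquireResult(True, 0.0)
        return AcquireResult(False, 0.25)  # hint: how long the caller might wait

async def acquire_with_retry(limiter, attempts: int = 3) -> bool:
    # The retry loop a caller would wrap around the non-blocking acquire()
    for _ in range(attempts):
        result = await limiter.acquire()
        if result.acquired:
            return True
        await asyncio.sleep(result.wait_time)  # back off by the predicted wait
    return False

async def main():
    limiter = StubLimiter(capacity=2)
    outcomes = [await acquire_with_retry(limiter, attempts=1) for _ in range(3)]
    print(outcomes)  # [True, True, False]: third request finds no tokens

asyncio.run(main())
```

Returning a wait-time prediction lets callers choose between sleeping, shedding load, or surfacing a 429-style error instead of always blocking.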

Test it: Run the fixed version. You'll see an initial burst of 5 tasks (the bucket starts full), then one new task every ~0.2s (rate=5 tokens/sec means one token per 0.2s); the last of the 20 tasks starts about 3s in.
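The expected timing follows from simple arithmetic (burst from the full bucket, then steady-state refill):

```python
rate, capacity, n_tasks = 5.0, 5, 20

burst = min(n_tasks, capacity)   # first 5 tasks start immediately
remaining = n_tasks - burst      # 15 tasks must wait for refills
spacing = 1.0 / rate             # one new token every 0.2 s
last_start = remaining / rate    # last task starts 3.0 s after launch

print(spacing)     # 0.2
print(last_start)  # 3.0
```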


Grok 4.1 Fast on: Fix the Subtle Bug: Async Rate Limiter with Token Bucket Algorithm | ModSlap