
Controlling 20,000 Requests Without Burning Money or Getting Banned

Tags: LLM, Redis, Celery, RabbitMQ, Cost Control, Architecture

The platform had three services that cost money per request:

  1. Web scraping API — ~$0.01 per search
  2. LLM API (OpenAI/Claude) — $0.01-0.06 per 1K tokens
  3. Embedding API — $0.0001 per 1K tokens

First version: fire requests as fast as possible.

# What I wrote first (don't do this)
tasks = [process_lead(lead) for lead in leads]
results = await asyncio.gather(*tasks)  # 5000 parallel requests

What happened:

  • Hit rate limits on all three services simultaneously
  • Retries created a retry storm
  • $47 bill from one afternoon of "testing"
  • Half the work failed anyway

I needed proper request control. Not just "slow down" — actual orchestration.


Architecture: Why I Needed Both Redis AND RabbitMQ

This took me a while to understand. Everyone says "just use Redis" or "just use RabbitMQ." I needed both, for different things.

┌─────────────────────────────────────────────────────────┐
│                     Coordinator                          │
└─────────────────┬───────────────────────────────────────┘
                  │
    ┌─────────────┼─────────────┐
    ▼             ▼             ▼
┌────────┐  ┌──────────┐  ┌──────────┐
│ Redis  │  │ RabbitMQ │  │ Postgres │
└────────┘  └──────────┘  └──────────┘
    │             │
    │             └──→ Celery Workers
    │                    - Scraping (2 workers)
    │                    - LLM Batch (4 workers)
    │                    - LLM Preview (2 workers)
    │
    └──→ Semaphores, Circuit Breakers, Rate Limits, Pub/Sub

Redis for: Real-time coordination. Semaphores, rate limits, circuit breakers, caching, pub/sub for progress updates.

RabbitMQ for: Reliable task queuing. Acknowledgments, retries, dead letter queues, priority routing.

Why not Redis for queuing? I tried. Lost tasks when workers crashed. No built-in acknowledgment. Had to build retry logic myself. Switched to RabbitMQ and those problems disappeared.

Why not RabbitMQ for rate limiting? Too slow for real-time checks. I need sub-millisecond "can I make this request?" decisions. Redis handles that.
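The "can I make this request?" check can be as small as one counter per service per minute. Here is a minimal sketch, assuming a fixed one-minute window. The function name `allow_request`, the key format, and the in-memory `FakeRedis` stand-in are hypothetical and not taken from the original system. A real redis-py client exposes the same `incr` and `expire` calls.

```python
import time


class FakeRedis:
    """In-memory stand-in exposing only INCR and EXPIRE, for demonstration."""

    def __init__(self):
        self._values = {}

    def incr(self, key):
        self._values[key] = self._values.get(key, 0) + 1
        return self._values[key]

    def expire(self, key, seconds):
        return True  # Expiry is not simulated in this stand-in


def allow_request(redis, service, limit_per_minute, now=None):
    """Fixed-window limiter: count requests per service per minute."""
    now = time.time() if now is None else now
    window = int(now // 60)  # Index of the current minute
    key = f"ratelimit:{service}:{window}"
    count = redis.incr(key)
    if count == 1:
        # First hit in this window: let the key clean itself up later
        redis.expire(key, 120)
    return count <= limit_per_minute


r = FakeRedis()
print([allow_request(r, "scraping", 3, now=0) for _ in range(4)])
```

Because the window index is part of the key, a new minute starts from a fresh counter without any reset logic.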


Pattern 1: Redis Semaphore with Lua Scripts

Problem: I have 20 API slots total. Preview requests (user waiting) should get priority over bulk batch jobs.

First attempt — Python semaphore:

semaphore = asyncio.Semaphore(20)

Problems:

  • Not shared across workers
  • No priority support
  • Lost state on restart
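For reference, this is roughly what the per-process version looks like when it replaces the unbounded `gather`. It is a minimal sketch: `run_bounded` and `worker` are hypothetical names. It caps concurrency inside one process only, which is exactly the limitation listed above.

```python
import asyncio


async def run_bounded(items, worker, limit=20):
    """Run worker(item) for every item, with at most `limit` in flight."""
    sem = asyncio.Semaphore(limit)

    async def guarded(item):
        async with sem:  # Waits here once `limit` calls are already running
            return await worker(item)

    # gather preserves input order in its results
    return await asyncio.gather(*(guarded(item) for item in items))
```

Usage would look like `results = await run_bounded(leads, process_lead, limit=20)`.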

Solution: Redis-based semaphore with atomic Lua operations

import asyncio
import time

class RedisSemaphore:
    def __init__(self, redis, name: str, max_concurrent: int = 20):
        self.redis = redis
        self.name = name
        self.max_concurrent = max_concurrent
 
        # Lua script for atomic acquire
        self.acquire_script = """
        local current = tonumber(redis.call('GET', KEYS[1]) or '0')
        local max_slots = tonumber(ARGV[1])
        local priority = tonumber(ARGV[2])
        local reserved_high_priority = tonumber(ARGV[3])
 
        -- Low priority (<8) may not use the slots reserved for high priority
        local available = max_slots - current
        if priority < 8 then
            available = available - reserved_high_priority
        end
 
        if available > 0 then
            redis.call('INCR', KEYS[1])
            redis.call('EXPIRE', KEYS[1], 300)
            return 1
        end
        return 0
        """
 
    async def acquire(self, priority: int = 5, timeout: float = 30.0):
        """
        priority: 1-10 (10 = highest, preview requests)
        Reserved slots: priority >= 8 gets 5 reserved slots
        """
        start = time.time()
        while time.time() - start < timeout:
            acquired = await self.redis.eval(
                self.acquire_script,
                1,
                f"semaphore:{self.name}",
                self.max_concurrent,
                priority,
                5  # Reserved high-priority slots
            )
            if acquired:
                return True
 
            # Back off based on priority (high priority retries faster)
            await asyncio.sleep(0.1 if priority >= 8 else 0.5)
 
        raise TimeoutError(f"Could not acquire semaphore within {timeout}s")
 
    async def release(self):
        await self.redis.decr(f"semaphore:{self.name}")
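The class leaves releasing the slot to the caller, so an exception between `acquire` and `release` would leak a slot until the key expires. One way to guard against that is a small async context manager. This is a sketch: `held_slot` is a hypothetical helper, not part of the original code, and it only assumes an object with the `acquire` and `release` coroutines shown above.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def held_slot(semaphore, priority: int = 5, timeout: float = 30.0):
    """Acquire a slot, run the body, and always release, even on error."""
    await semaphore.acquire(priority=priority, timeout=timeout)
    try:
        yield
    finally:
        await semaphore.release()
```

Usage would look like `async with held_slot(semaphore, priority=9): ...`.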

Why Lua? Atomic operations. Check-and-increment happens in one Redis call. No race conditions between workers.

Why reserved slots? Without them, a flood of batch jobs would starve preview requests. Users would wait 30+ seconds for a preview because 20 batch jobs got there first.

Configuration I landed on:

  • Max concurrent: 20
  • Reserved for preview (priority ≥ 8): 5 slots
  • Batch jobs (priority 5): compete for remaining 15
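The admission rule implied by this configuration can be written as a pure function, which makes it easy to check by hand. A minimal sketch, with `can_acquire` and `high_priority_floor` as hypothetical names:

```python
def can_acquire(current: int, max_slots: int = 20, priority: int = 5,
                reserved: int = 5, high_priority_floor: int = 8) -> bool:
    """Return True if a request at this priority may take a slot."""
    if priority >= high_priority_floor:
        limit = max_slots             # Preview traffic may use all 20 slots
    else:
        limit = max_slots - reserved  # Batch traffic stops at 15
    return current < limit


print(can_acquire(15, priority=5))  # False: batch is capped at 15
print(can_acquire(15, priority=9))  # True: preview can use reserved slots
```

With 15 slots already taken, a batch job is refused while a preview request still gets in. Nobody gets a slot beyond the 20th.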

Pattern 2: Circuit Breaker

Problem: Scraping API went down. The system kept hammering it. Every request failed. Retry logic made it worse — now I'm paying for retries that also fail.

Circuit breaker pattern: When failures exceed threshold, stop trying. Wait. Test occasionally. Resume when healthy.

CLOSED ──(failures ≥ 5)──→ OPEN ──(30s timeout)──→ HALF_OPEN
   ▲                         │                         │
   │                         │                         │
   └────────(success)────────┴────────(failure)────────┘

class RedisCircuitBreaker:
    """
    States stored in Redis (shared across all workers):
    - circuit:{name}:state = CLOSED|OPEN|HALF_OPEN
    - circuit:{name}:failures = count
    - circuit:{name}:last_failure = timestamp
    """
 
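    FAILURE_THRESHOLD = 5
    RESET_TIMEOUT = 30  # seconds

    def __init__(self, redis, name: str):
        # Assumes a client created with decode_responses=True, so GET returns str
        self.redis = redis
        self.name = name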
 
    async def can_proceed(self) -> bool:
        state = await self.redis.get(
            f"circuit:{self.name}:state"
        ) or "CLOSED"
 
        if state == "CLOSED":
            return True
 
        if state == "OPEN":
            last_failure = await self.redis.get(
                f"circuit:{self.name}:last_failure"
            )
            # A missing timestamp is treated as "long ago" so the circuit can recover
            if time.time() - float(last_failure or 0) > self.RESET_TIMEOUT:
                # Try half-open
                await self.redis.set(
                    f"circuit:{self.name}:state",
                    "HALF_OPEN"
                )
                return True
            return False  # Still open, reject immediately
 
        if state == "HALF_OPEN":
            return True  # Allow test traffic (as written, every caller passes, not just one)
 
        return False
 
    async def record_success(self):
        await self.redis.set(f"circuit:{self.name}:state", "CLOSED")
        await self.redis.set(f"circuit:{self.name}:failures", 0)
 
    async def record_failure(self):
        failures = await self.redis.incr(f"circuit:{self.name}:failures")
        await self.redis.set(
            f"circuit:{self.name}:last_failure",
            time.time()
        )
 
        if failures >= self.FAILURE_THRESHOLD:
            await self.redis.set(f"circuit:{self.name}:state", "OPEN")
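As written, `can_proceed` lets every caller through while the circuit is HALF_OPEN. One way to limit the test traffic to a single request is a short-lived probe token claimed with `SET ... NX EX`. This is a sketch under assumptions: the `try_claim_probe` helper, the `circuit:{name}:probe` key, and the 10-second TTL are hypothetical, and `InMemoryRedis` only mimics the single command used. redis-py's `set(name, value, nx=True, ex=...)` behaves the same way, returning `True` on success and `None` when the key already exists.

```python
import time


class InMemoryRedis:
    """Stand-in for the one Redis command used below (SET with NX/EX)."""

    def __init__(self):
        self._data = {}  # key -> (value, expires_at or None)

    def set(self, key, value, nx=False, ex=None):
        now = time.monotonic()
        entry = self._data.get(key)
        if entry is not None and entry[1] is not None and entry[1] <= now:
            entry = None  # Key has expired
        if nx and entry is not None:
            return None  # NX: refuse to overwrite a live key
        expires_at = now + ex if ex is not None else None
        self._data[key] = (value, expires_at)
        return True


def try_claim_probe(redis, name: str, ttl_seconds: int = 10) -> bool:
    """Return True for only one caller per TTL window; the rest get False."""
    return bool(redis.set(f"circuit:{name}:probe", "1", nx=True, ex=ttl_seconds))
```

In the HALF_OPEN branch, `can_proceed` would return the result of `try_claim_probe` instead of `True`. The winner's outcome then decides whether the circuit closes or reopens.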

Real scenario this saved me:

Scraping API had a 10-minute outage. Without circuit breaker: 2,000 failed requests × $0.01 = $20 wasted, plus retry storm.

With circuit breaker: 5 failed requests, circuit opens, waits 30 seconds, tests, still failing, stays open. Total waste: ~$0.10.


Pattern 3: Celery Queue Priorities

Not all tasks are equal. User clicks "Preview" → needs response in 2 seconds. Background batch processing 5,000 leads → can take an hour.

Queue configuration:

# celery_config.py
 
task_queues = {
    'scraping': {
        'exchange': 'scraping',
        'exchange_type': 'direct',
        'routing_key': 'scraping',
    },
    'llm_preview': {
        'exchange': 'llm',
        'exchange_type': 'direct',
        'routing_key': 'llm.preview',
        'queue_arguments': {'x-max-priority': 10}
    },
    'llm_batch': {
        'exchange': 'llm',
        'exchange_type': 'direct',
        'routing_key': 'llm.batch',
        'queue_arguments': {'x-max-priority': 10}
    },
}
 
task_routes = {
    'tasks.generate_preview': {'queue': 'llm_preview'},
    'tasks.generate_batch': {'queue': 'llm_batch'},
    'tasks.scrape_business': {'queue': 'scraping'},
}
 
# Rate limits per task type (Celery enforces these per worker, not globally)
task_annotations = {
    'tasks.scrape_business': {'rate_limit': '45/m'},
    'tasks.generate_batch': {'rate_limit': '50/m'},
    'tasks.generate_preview': {'rate_limit': '100/m'},
}

Worker configuration:

# docker-compose.yml
 
scraper_worker:
  command: celery -A config worker -Q scraping --concurrency=2
  # Only 2 concurrent - scraping is slow and expensive
 
llm_worker:
  command: celery -A config worker -Q llm_preview,llm_batch --concurrency=4
  # 4 concurrent - LLM calls are the bottleneck

Why separate queues instead of one queue with priorities?

Tried that first. Problem: One worker type handles everything. If I want 2 scraping workers and 4 LLM workers, I can't configure that with a single queue. Separate queues = separate scaling.
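Two worker-side settings interact with the queue setup above and are worth checking. They are real Celery settings, but the original config does not show them, so treating them as part of this setup is an assumption. By default Celery acknowledges a message before running the task, so a worker crash mid-task loses it. Prefetched messages have already left the broker queue, so a later high-priority message cannot jump ahead of them.

```python
# celery_config.py (possible additions, not shown in the original config)

# Acknowledge the message only after the task finishes, so a task that was
# running when a worker crashed can be redelivered by RabbitMQ.
# Tasks should be idempotent, since they may run more than once.
task_acks_late = True

# Reserve one message per worker process at a time, so x-max-priority
# ordering is applied by the broker rather than defeated by prefetching.
worker_prefetch_multiplier = 1
```

A per-message priority can then be passed when publishing, for example `generate_batch.apply_async(args=[lead_id], priority=3)`, where `lead_id` is a placeholder argument.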


Pattern 4: Retry with Exponential Backoff + Jitter

Naive retry: Wait 5 seconds, try again.

Problem: 100 requests fail at the same time. All retry at exactly 5 seconds. All fail again. Retry storm.

from celery import shared_task

# circuit_breaker and semaphore are assumed to be synchronous variants of the
# classes above (Celery tasks are plain functions); RateLimitError and
# scraping_api come from the scraping client.

@shared_task(
    bind=True,
    max_retries=3,
    autoretry_for=(ConnectionError, TimeoutError, RateLimitError),
    retry_backoff=True,        # Exponential: 1s, 2s, 4s, 8s...
    retry_backoff_max=600,     # Cap at 10 minutes
    retry_jitter=True,         # Randomize delays to prevent thundering herd
)
def scrape_business(self, business_name: str, search_type: str = 'full'):
    # Check circuit breaker first (outside the try, so the Retry it raises
    # is not counted as an API failure)
    if not circuit_breaker.can_proceed():
        raise self.retry(countdown=30)  # Circuit open, wait

    # Acquire semaphore slot
    priority = 9 if search_type == 'preview' else 5
    semaphore.acquire(priority=priority)
    try:
        result = scraping_api.search(business_name)
    except (ConnectionError, TimeoutError, RateLimitError):
        circuit_breaker.record_failure()
        raise  # Let autoretry handle it with backoff + jitter
    except Exception as e:
        circuit_breaker.record_failure()
        # Don't retry on permanent errors
        if "invalid_api_key" in str(e):
            raise  # Fail permanently
        raise self.retry(exc=e)
    finally:
        semaphore.release()  # Free the slot on success and on failure

    circuit_breaker.record_success()
    return result

The jitter is crucial. Without it:

t=0:  100 requests fail
t=1:  100 requests retry → all fail
t=3:  100 requests retry → all fail

With jitter (each delay drawn at random between 0 and the backoff value):

t=0:    100 requests fail
t=0.8:  10 requests retry → some succeed
t=1.1:  15 requests retry → some succeed
t=1.3:  12 requests retry → some succeed
...spread out, API recovers
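The delay calculation behind this is short. Here is a minimal sketch of exponential backoff with full jitter. `backoff_delay` is a hypothetical helper that mirrors the idea behind `retry_backoff` and `retry_jitter`; it is not Celery's own code.

```python
import random


def backoff_delay(retries: int, base: float = 1.0, cap: float = 600.0,
                  jitter: bool = True) -> float:
    """Seconds to wait before retry number `retries` (0-based)."""
    ceiling = min(cap, base * (2 ** retries))  # 1, 2, 4, 8, ... capped
    if jitter:
        # Full jitter: anywhere between 0 and the exponential ceiling
        return random.uniform(0, ceiling)
    return ceiling


print([backoff_delay(n, jitter=False) for n in range(4)])  # [1.0, 2.0, 4.0, 8.0]
```

Without jitter every failed request computes the same delay. With it, 100 requests that fail together come back spread across the whole interval.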

Pattern 5: Cost Tracking and Budgets

Every API call gets logged:

class CostTracker:
    # USD per unit: per search for scraping, per token for LLM services
    COSTS = {
        'scraping': 0.01,
        'openai_gpt4': 0.00003,
        'openai_gpt35': 0.000002,
        'claude_sonnet': 0.00001,
    }
 
    async def track(self, service: str, units: int, metadata: dict | None = None):
        cost = self.COSTS.get(service, 0) * units
 
        await self.redis.hincrby(
            f"costs:{today()}",
            service,
            round(cost * 10000)  # Hundredths of a cent; round() avoids float truncation
        )
 
        # Check budget
        total_today = await self.get_today_total()
        if total_today >= self.daily_budget:
            raise BudgetExceeded()
 
        if total_today >= self.daily_budget * 0.8:
            await self.send_alert("80% of daily budget used")

Dashboard query:

async def get_cost_breakdown():
    costs = await redis.hgetall(f"costs:{today()}")
    return {
        service: int(cost) / 10000
        for service, cost in costs.items()
    }
 
# Example result: {'scraping': 4.50, 'openai_gpt4': 12.30, 'claude_sonnet': 2.10}
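One detail of the integer encoding is worth checking. A per-token price such as 0.000002 scaled by 10,000 comes to 0.02 units, so very small calls are recorded as 0. A finer unit avoids that. This is a sketch assuming micro-dollars (USD × 1,000,000) are acceptable as the stored integer. `cost_in_micro_dollars` and `PRICES` are hypothetical names, and the prices are copied from the `COSTS` table.

```python
from decimal import Decimal, ROUND_HALF_UP

MICRO = Decimal(1_000_000)  # Micro-dollars per dollar

# Prices as Decimal strings, so no binary float error creeps in
PRICES = {
    'scraping': Decimal('0.01'),
    'openai_gpt4': Decimal('0.00003'),
    'openai_gpt35': Decimal('0.000002'),
    'claude_sonnet': Decimal('0.00001'),
}


def cost_in_micro_dollars(service: str, units: int) -> int:
    """Integer cost suitable for HINCRBY, in millionths of a dollar."""
    price = PRICES.get(service, Decimal(0))
    exact = price * units * MICRO
    return int(exact.quantize(Decimal(1), rounding=ROUND_HALF_UP))


print(cost_in_micro_dollars('openai_gpt35', 1))    # 2
print(cost_in_micro_dollars('openai_gpt4', 1000))  # 30000
```

The dashboard query would then divide by 1,000,000 instead of 10,000.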

The Full Request Flow

Putting it all together:

User clicks "Generate Emails"
           │
           ▼
┌─────────────────────┐
│    Coordinator      │
│  Groups leads by    │
│  business name      │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  Check business     │◄──── Cache hit? Skip scraping
│  cache (Redis)      │
└──────────┬──────────┘
           │ Cache miss
           ▼
┌─────────────────────┐
│  Circuit breaker    │◄──── Open? Fail fast
│  check              │
└──────────┬──────────┘
           │ Closed
           ▼
┌─────────────────────┐
│  Acquire semaphore  │◄──── Full? Wait in queue
│  slot (priority)    │
└──────────┬──────────┘
           │ Acquired
           ▼
┌─────────────────────┐
│  Rate limit check   │◄──── Over limit? Backoff
│  (per-minute)       │
└──────────┬──────────┘
           │ OK
           ▼
┌─────────────────────┐
│  Make API request   │
│  Track cost         │
└──────────┬──────────┘
           │
     ┌─────┴─────┐
     ▼           ▼
  Success     Failure
     │           │
     ▼           ▼
  Release    Record failure
  semaphore  Check circuit
  Cache it   Retry w/ backoff
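The same flow can be read as one function. This is a minimal sketch in which every collaborator (`cache`, `breaker`, `semaphore`, `rate_limiter`, `call_api`) is a hypothetical synchronous object passed in by the caller. Cost tracking and the retry itself are left out to keep the order of checks visible.

```python
def handle_lookup(name, cache, breaker, semaphore, rate_limiter, call_api,
                  priority=5):
    """Run one lookup through the checks in the order shown in the diagram."""
    if name in cache:
        return cache[name]                    # Cache hit: skip scraping

    if not breaker.can_proceed():
        raise RuntimeError("circuit open")    # Fail fast

    semaphore.acquire(priority=priority)      # Wait for a slot
    try:
        if not rate_limiter.allow():
            raise RuntimeError("rate limited")  # Caller backs off

        try:
            result = call_api(name)
        except Exception:
            breaker.record_failure()          # Only API errors count
            raise

        breaker.record_success()
        cache[name] = result
        return result
    finally:
        semaphore.release()                   # Released on every path
```

Only a failed API call counts against the circuit breaker here. A cache hit, an open circuit, or a rate-limit refusal never reaches the paid request.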

Results

BEFORE vs AFTER:
 
    Metric                    Before      After
    ─────────────────────────────────────────────
    Failed requests           ~40%        <3%
    Cost per 1,000 leads      ~$12        ~$2
    Preview response time     5-30s       <3s
    API bans/rate limits      Weekly      None in 3 months
    Lost tasks on crash       Common      Zero

Key Takeaways

WHAT I LEARNED:
 
Semaphores need to      Python's asyncio.Semaphore is per-process.
be shared               Redis-based semaphores work across workers.
 
Lua scripts for         Check-then-act in Python has race conditions.
atomicity               Lua in Redis is atomic.
 
Circuit breakers        When an API is down, stop paying to
save money              learn that fact.
 
Priority queues need    Without them, high-priority tasks
reserved slots          starve during load spikes.
 
Jitter prevents         Random backoff spreads retry load.
thundering herd
 
Redis for speed,        Different tools, different jobs.
RabbitMQ for reliability



The real insight: request control isn't about slowing down. It's about making sure the important requests get through first, the failing services don't waste money, and the system recovers gracefully when things go wrong.

Aamir Shahzad

Author

Software Engineer with 7+ years of experience building scalable data systems. Specializing in Django, Python, and applied AI.