The platform had three services that cost money per request:
- Web scraping API — ~$0.01 per search
- LLM API (OpenAI/Claude) — $0.01-0.06 per 1K tokens
- Embedding API — $0.0001 per 1K tokens
First version: fire requests as fast as possible.
# What I wrote first (don't do this)
tasks = [process_lead(lead) for lead in leads]
results = await asyncio.gather(*tasks)  # 5000 parallel requests
What happened:
- Hit rate limits on all three services simultaneously
- Retries created a retry storm
- $47 bill from one afternoon of "testing"
- Half the work failed anyway
I needed proper request control. Not just "slow down" — actual orchestration.
Architecture: Why I Needed Both Redis AND RabbitMQ
This took me a while to understand. Everyone says "just use Redis" or "just use RabbitMQ." I needed both, for different things.
┌─────────────────────────────────────────────────────────┐
│                       Coordinator                        │
└─────────────────┬───────────────────────────────────────┘
                  │
    ┌─────────────┼──────────────┐
    ▼             ▼              ▼
┌────────┐   ┌──────────┐   ┌──────────┐
│ Redis  │   │ RabbitMQ │   │ Postgres │
└────────┘   └──────────┘   └──────────┘
    │             │
    │             └──→ Celery Workers
    │                    - Scraping (2 workers)
    │                    - LLM Batch (4 workers)
    │                    - LLM Preview (2 workers)
    │
    └──→ Semaphores, Circuit Breakers, Rate Limits, Pub/Sub
Redis for: Real-time coordination. Semaphores, rate limits, circuit breakers, caching, pub/sub for progress updates.
RabbitMQ for: Reliable task queuing. Acknowledgments, retries, dead letter queues, priority routing.
Why not Redis for queuing? I tried. Lost tasks when workers crashed. No built-in acknowledgment. Had to build retry logic myself. Switched to RabbitMQ and those problems disappeared.
Why not RabbitMQ for rate limiting? Too slow for real-time checks. I need sub-millisecond "can I make this request?" decisions. Redis handles that.
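To give a sense of the kind of check Redis is good at here: a counter with a TTL answers "can I make this request?" in one round trip. A minimal fixed-window sketch (the function name and the limit below are illustrative, not the project's actual code):
import time

async def within_rate_limit(redis, service: str, limit_per_minute: int) -> bool:
    """Fixed-window rate check: one counter per service per minute."""
    key = f"ratelimit:{service}:{int(time.time() // 60)}"
    count = await redis.incr(key)
    if count == 1:
        await redis.expire(key, 60)  # window cleans itself up after a minute
    return count <= limit_per_minute

# Example: if not await within_rate_limit(redis, "scraping", 45): back off before calling the API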
Pattern 1: Redis Semaphore with Lua Scripts
Problem: I have 20 API slots total. Preview requests (user waiting) should get priority over bulk batch jobs.
First attempt — Python semaphore:
semaphore = asyncio.Semaphore(20)
Problems:
- Not shared across workers
- No priority support
- Lost state on restart
Solution: Redis-based semaphore with atomic Lua operations
class RedisSemaphore:
    def __init__(self, redis, name: str, max_concurrent: int = 20):
        self.redis = redis
        self.name = name
        self.max_concurrent = max_concurrent
        # Lua script for atomic acquire
        self.acquire_script = """
        local current = tonumber(redis.call('GET', KEYS[1]) or '0')
        local max_slots = tonumber(ARGV[1])
        local priority = tonumber(ARGV[2])
        local reserved_high_priority = tonumber(ARGV[3])

        -- High priority (>= 8) may use any slot; lower priority must
        -- leave the reserved slots free for preview requests
        local available = max_slots - current
        if priority < 8 then
            available = available - reserved_high_priority
        end

        if available > 0 then
            redis.call('INCR', KEYS[1])
            redis.call('EXPIRE', KEYS[1], 300)
            return 1
        end
        return 0
        """
    async def acquire(self, priority: int = 5, timeout: float = 30.0):
        """
        priority: 1-10 (10 = highest, preview requests)
        Reserved slots: priority >= 8 gets 5 reserved slots
        """
        start = time.time()
        while time.time() - start < timeout:
            acquired = await self.redis.eval(
                self.acquire_script,
                1,
                f"semaphore:{self.name}",
                self.max_concurrent,
                priority,
                5  # Reserved high-priority slots
            )
            if acquired:
                return True
            # Back off based on priority (high priority retries faster)
            await asyncio.sleep(0.1 if priority >= 8 else 0.5)
        raise TimeoutError(f"Could not acquire semaphore within {timeout}s")

    async def release(self):
        await self.redis.decr(f"semaphore:{self.name}")
Why Lua? Atomic operations. Check-and-increment happens in one Redis call. No race conditions between workers.
Why reserved slots? Without them, a flood of batch jobs would starve preview requests. Users would wait 30+ seconds for a preview because 20 batch jobs got there first.
Configuration I landed on:
- Max concurrent: 20
- Reserved for preview (priority ≥ 8): 5 slots
- Batch jobs (priority 5): compete for remaining 15
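To make the priorities concrete, here's a minimal usage sketch — the try/finally wrapper and the llm_client call are illustrative, not taken from the project:
semaphore = RedisSemaphore(redis, name="llm_api", max_concurrent=20)

async def call_llm(prompt: str, is_preview: bool):
    priority = 9 if is_preview else 5  # preview can use the reserved slots
    await semaphore.acquire(priority=priority, timeout=30.0)
    try:
        return await llm_client.complete(prompt)  # hypothetical LLM client call
    finally:
        await semaphore.release()  # always free the slot, even on failure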
Pattern 2: Circuit Breaker
Problem: Scraping API went down. The system kept hammering it. Every request failed. Retry logic made it worse — now I'm paying for retries that also fail.
Circuit breaker pattern: When failures exceed threshold, stop trying. Wait. Test occasionally. Resume when healthy.
CLOSED ──(failures > 5)──→ OPEN ──(30s timeout)──→ HALF_OPEN
   ▲                        ▲                          │
   │                        │                          │
   └───────(success)────────┴────────(failure)─────────┘
class RedisCircuitBreaker:
"""
States stored in Redis (shared across all workers):
- circuit:{name}:state = CLOSED|OPEN|HALF_OPEN
- circuit:{name}:failures = count
- circuit:{name}:last_failure = timestamp
"""
FAILURE_THRESHOLD = 5
RESET_TIMEOUT = 30 # seconds
async def can_proceed(self) -> bool:
state = await self.redis.get(
f"circuit:{self.name}:state"
) or "CLOSED"
if state == "CLOSED":
return True
if state == "OPEN":
last_failure = await self.redis.get(
f"circuit:{self.name}:last_failure"
)
if time.time() - float(last_failure) > self.RESET_TIMEOUT:
# Try half-open
await self.redis.set(
f"circuit:{self.name}:state",
"HALF_OPEN"
)
return True
return False # Still open, reject immediately
if state == "HALF_OPEN":
return True # Allow one request to test
return False
async def record_success(self):
await self.redis.set(f"circuit:{self.name}:state", "CLOSED")
await self.redis.set(f"circuit:{self.name}:failures", 0)
async def record_failure(self):
failures = await self.redis.incr(f"circuit:{self.name}:failures")
await self.redis.set(
f"circuit:{self.name}:last_failure",
time.time()
)
if failures >= self.FAILURE_THRESHOLD:
await self.redis.set(f"circuit:{self.name}:state", "OPEN")Real scenario this saved me:
Scraping API had a 10-minute outage. Without circuit breaker: 2,000 failed requests × $0.01 = $20 wasted, plus retry storm.
With circuit breaker: 5 failed requests, circuit opens, waits 30 seconds, tests, still failing, stays open. Total waste: ~$0.10.
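A rough sketch of how the breaker wraps a paid call — the constructor and the scraping client aren't shown above, so both are assumptions here:
class CircuitOpenError(Exception):
    """Raised instead of paying for a request that will almost certainly fail."""

breaker = RedisCircuitBreaker(redis, name="scraping")  # assumed constructor

async def scrape_with_breaker(business_name: str):
    if not await breaker.can_proceed():
        raise CircuitOpenError("scraping circuit is open")  # fail fast, no API charge
    try:
        result = await scraping_api.search(business_name)  # hypothetical async client
    except Exception:
        await breaker.record_failure()  # may trip the circuit for all workers
        raise
    await breaker.record_success()
    return result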
Pattern 3: Celery Queue Priorities
Not all tasks are equal. User clicks "Preview" → needs response in 2 seconds. Background batch processing 5,000 leads → can take an hour.
Queue configuration:
# celery_config.py
task_queues = {
    'scraping': {
        'exchange': 'scraping',
        'exchange_type': 'direct',
        'routing_key': 'scraping',
    },
    'llm_preview': {
        'exchange': 'llm',
        'exchange_type': 'direct',
        'routing_key': 'llm.preview',
        'queue_arguments': {'x-max-priority': 10}
    },
    'llm_batch': {
        'exchange': 'llm',
        'exchange_type': 'direct',
        'routing_key': 'llm.batch',
        'queue_arguments': {'x-max-priority': 10}
    },
}

task_routes = {
    'tasks.generate_preview': {'queue': 'llm_preview'},
    'tasks.generate_batch': {'queue': 'llm_batch'},
    'tasks.scrape_business': {'queue': 'scraping'},
}

# Rate limits per task type
task_annotations = {
    'tasks.scrape_business': {'rate_limit': '45/m'},
    'tasks.generate_batch': {'rate_limit': '50/m'},
    'tasks.generate_preview': {'rate_limit': '100/m'},
}
Worker configuration:
# docker-compose.yml
scraper_worker:
  command: celery -A config worker -Q scraping --concurrency=2
  # Only 2 concurrent - scraping is slow and expensive

llm_worker:
  command: celery -A config worker -Q llm_preview,llm_batch --concurrency=4
  # 4 concurrent - LLM calls are the bottleneck
Why separate queues instead of one queue with priorities?
Tried that first. Problem: One worker type handles everything. If I want 2 scraping workers and 4 LLM workers, I can't configure that with a single queue. Separate queues = separate scaling.
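Priorities within a queue still matter when the same worker consumes both LLM queues. A sketch of how tasks might be enqueued — the argument names are illustrative, while the task and queue names match the routing above:
# User is waiting: preview queue, high priority
generate_preview.apply_async(
    args=[lead_id],
    queue='llm_preview',
    priority=9,  # honored by RabbitMQ because of x-max-priority
)

# Background work: batch queue, default priority
generate_batch.apply_async(args=[batch_id], queue='llm_batch', priority=5)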
Pattern 4: Retry with Exponential Backoff + Jitter
Naive retry: Wait 5 seconds, try again.
Problem: 100 requests fail at the same time. All retry at exactly 5 seconds. All fail again. Retry storm.
@shared_task(
    bind=True,
    max_retries=3,
    autoretry_for=(ConnectionError, TimeoutError, RateLimitError),
    retry_backoff=True,      # Exponential: 1s, 2s, 4s, 8s...
    retry_backoff_max=600,   # Cap at 10 minutes
    retry_jitter=True,       # Random offset to prevent thundering herd
)
def scrape_business(self, business_name: str, search_type: str = 'full'):
    # Celery tasks are synchronous, so this assumes sync, context-manager-style
    # wrappers around the async Redis semaphore and circuit breaker shown earlier.
    try:
        # Check circuit breaker first
        if not circuit_breaker.can_proceed():
            raise self.retry(countdown=30)  # Circuit open, wait
        # Acquire semaphore slot
        priority = 9 if search_type == 'preview' else 5
        with semaphore.acquire(priority=priority):
            result = scraping_api.search(business_name)
            circuit_breaker.record_success()
            return result
    except RateLimitError:
        circuit_breaker.record_failure()
        raise  # Let autoretry handle it
    except Exception as e:
        circuit_breaker.record_failure()
        # Don't retry on permanent errors
        if "invalid_api_key" in str(e):
            raise  # Fail permanently
        raise self.retry(exc=e)
The jitter is crucial. Without it:
t=0: 100 requests fail
t=1: 100 requests retry → all fail
t=2: 100 requests retry → all fail
With jitter (±random offset):
t=0: 100 requests fail
t=0.8: 10 requests retry → some succeed
t=1.1: 15 requests retry → some succeed
t=1.3: 12 requests retry → some succeed
...spread out, API recovers
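Celery computes the delay for you via retry_backoff and retry_jitter. As a standalone illustration of the idea (not Celery's exact internals), full-jitter backoff looks roughly like this:
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 600.0) -> float:
    ceiling = min(cap, base * (2 ** attempt))  # 1s, 2s, 4s, ... capped at 10 minutes
    return random.uniform(0, ceiling)          # full jitter: spread retries across the window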
Pattern 5: Cost Tracking and Budgets
Every API call gets logged:
class CostTracker:
    COSTS = {
        'scraping': 0.01,
        'openai_gpt4': 0.00003,
        'openai_gpt35': 0.000002,
        'claude_sonnet': 0.00001,
    }

    async def track(self, service: str, units: int, metadata: dict = None):
        cost = self.COSTS.get(service, 0) * units
        await self.redis.hincrby(
            f"costs:{today()}",
            service,
            int(cost * 10000)  # Store as an integer (hundredths of a cent) for precision
        )
        # Check budget
        total_today = await self.get_today_total()
        if total_today >= self.daily_budget:
            raise BudgetExceeded()
        if total_today >= self.daily_budget * 0.8:
            await self.send_alert("80% of daily budget used")
Dashboard query:
async def get_cost_breakdown():
    costs = await redis.hgetall(f"costs:{today()}")
    return {
        service: int(cost) / 10000
        for service, cost in costs.items()
    }
# Returns: {'scraping': 4.50, 'openai_gpt4': 12.30, 'claude_sonnet': 2.10}
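The get_today_total and daily_budget used in track() aren't shown above; one plausible shape, assuming the same hash layout:
# Sketch: another CostTracker method, summing today's counters back into dollars
async def get_today_total(self) -> float:
    costs = await self.redis.hgetall(f"costs:{today()}")
    return sum(int(c) for c in costs.values()) / 10000

# self.daily_budget would then just be a float, e.g. 25.0 (dollars per day)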
The Full Request Flow
Putting it all together:
User clicks "Generate Emails"
           │
           ▼
┌─────────────────────┐
│  Coordinator        │
│  Groups leads by    │
│  business name      │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  Check business     │◄──── Cache hit? Skip scraping
│  cache (Redis)      │
└──────────┬──────────┘
           │ Cache miss
           ▼
┌─────────────────────┐
│  Circuit breaker    │◄──── Open? Fail fast
│  check              │
└──────────┬──────────┘
           │ Closed
           ▼
┌─────────────────────┐
│  Acquire semaphore  │◄──── Full? Wait in queue
│  slot (priority)    │
└──────────┬──────────┘
           │ Acquired
           ▼
┌─────────────────────┐
│  Rate limit check   │◄──── Over limit? Backoff
│  (per-minute)       │
└──────────┬──────────┘
           │ OK
           ▼
┌─────────────────────┐
│  Make API request   │
│  Track cost         │
└──────────┬──────────┘
           │
     ┌─────┴─────┐
     ▼           ▼
  Success     Failure
     │           │
     ▼           ▼
  Release     Record failure
  semaphore   Check circuit
  Cache it    Retry w/ backoff
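The same flow, condensed into code. This reuses the sketched helpers from earlier sections (within_rate_limit, breaker, CircuitOpenError, cost_tracker) plus illustrative cache keys and client names — treat it as a sketch, not the project's actual coordinator:
import asyncio
import json

async def process_business(name: str, priority: int):
    cached = await redis.get(f"business:{name}")           # cache hit? skip scraping
    if cached:
        return json.loads(cached)

    if not await breaker.can_proceed():                    # circuit open? fail fast
        raise CircuitOpenError("scraping API is unhealthy")

    await semaphore.acquire(priority=priority)             # full? wait for a slot
    try:
        while not await within_rate_limit(redis, "scraping", 45):
            await asyncio.sleep(1)                         # over limit? back off

        result = await scraping_api.search(name)           # the paid call
        await cost_tracker.track("scraping", units=1)      # log the cost
        await breaker.record_success()
        await redis.set(f"business:{name}", json.dumps(result), ex=86400)  # cache it
        return result
    except Exception:
        await breaker.record_failure()                     # Celery retries with backoff
        raise
    finally:
        await semaphore.release()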
Results
BEFORE vs AFTER:
Metric                   Before    After
─────────────────────────────────────────────
Failed requests          ~40%      <3%
Cost per 1,000 leads     ~$12      ~$2
Preview response time    5-30s     <3s
API bans/rate limits     Weekly    None in 3 months
Lost tasks on crash      Common    Zero
Key Takeaways
WHAT I LEARNED:
- Semaphores need to be shared. Python's asyncio.Semaphore is per-process; Redis-based semaphores work across workers.
- Lua scripts for atomicity. Check-then-act in Python has race conditions; Lua in Redis is atomic.
- Circuit breakers save money. When an API is down, stop paying to learn that fact.
- Priority queues need reserved slots. Without them, high-priority tasks starve during load spikes.
- Jitter prevents thundering herd. Random backoff spreads retry load.
- Redis for speed, RabbitMQ for reliability. Different tools, different jobs.
Related Reading
- Which LLM For Which Task - Model selection based on task requirements
The real insight: request control isn't about slowing down. It's about making sure the important requests get through first, that failing services don't keep wasting money, and that the system recovers gracefully when things go wrong.
