Architecture Blueprint: Queue-Based Generation That Doesn’t Time Out (Retries, Idempotency, Rate Limits)
Processing AI storybook generation in synchronous HTTP requests guarantees timeout failures and terrible user experience. A robust queue-based architecture combines asynchronous task processing, retry logic with exponential backoff, idempotency guarantees, and rate limit coordination to deliver reliable multi-minute generation workflows. This is essential whether you’re building a SaaS platform, scaling beyond MVP, or handling production traffic where 30-45 second generation times exceed every web server timeout.
Key takeaways
A robust queue architecture blends message queuing, worker processes, state management, and failure recovery.
Queue-based processing enables production reliability; synchronous requests cause timeout failures at scale.
Multi-layer retry systems ensure transient failures recover automatically, rate limit backoff prevents API blocking, and idempotency prevents duplicate charges.
Task visibility, dead letter queues, and monitoring matter as much as the queue technology itself.
Musketeers Tech helps design production-grade async generation pipelines that handle retries, coordinate rate limits, and guarantee exactly-once processing, delivering storybooks users trust without timeout errors.
Why synchronous AI generation fails in production
Request-response thinking is the problem. Most AI storybook prototypes generate books synchronously in HTTP request handlers. This works for demos but fails in production: image generation takes 8-15 seconds per image, full books require 5-12 minutes total, web servers time out after 30-60 seconds, and users close browsers, terminating incomplete work. Without async processing, 60-80% of generation attempts fail with timeout errors.
Pure synchronous generation is simple but, without queue-based decoupling, it produces systems that can’t deliver completed books at scale.
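The contrast can be sketched in a few lines. This is a minimal illustration, not production code: the deque and the helper names (`run_full_generation`, `handle_generate_async`) are hypothetical stand-ins for a real broker and pipeline.

```python
import uuid
from collections import deque

# Stand-ins for a real broker and state store (Redis, RabbitMQ, a database)
task_queue = deque()
task_status = {}

def run_full_generation(params):
    # Placeholder for the 5-12 minute pipeline (text + images + PDF)
    return {'status': 'complete'}

def handle_generate_sync(params):
    """Synchronous anti-pattern: the HTTP thread blocks for minutes."""
    return run_full_generation(params)  # exceeds every web server timeout

def handle_generate_async(params):
    """Queue-based pattern: enqueue and return immediately (HTTP 202)."""
    task_id = str(uuid.uuid4())
    task_status[task_id] = 'queued'
    task_queue.append((task_id, params))  # a worker picks this up later
    return {'status': 'queued', 'task_id': task_id}  # responds in milliseconds
```

The async handler's response time no longer depends on generation time at all; the user polls (or receives a push) for completion.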
What queue-based generation architecture actually means
Queue-based AI generation combines multiple architectural patterns to handle long-running async workflows:
Task queuing from message broker systems (Redis, RabbitMQ, SQS) decoupling API requests from worker execution.
Retry logic based on exponential backoff, jitter, and failure classification distinguishing transient from permanent errors.
Idempotency guarantees ensuring duplicate requests don’t charge users twice or regenerate identical content.
These are integrated into a production pipeline so generation workflows complete reliably despite API failures, rate limits, and user disconnections.
Core components of queue-based generation systems
1. Message queue and task broker
Receives generation requests from API and stores them durably.
Routes tasks to available worker processes based on priority and capacity.
Provides visibility into queue depth, processing rates, and failure counts.
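The broker contract above can be sketched with a toy in-memory class; a real deployment uses Redis, RabbitMQ, or SQS, and all names here are illustrative.

```python
from collections import deque

class InMemoryBroker:
    """Toy broker showing the contract: store, route, and expose metrics."""

    def __init__(self):
        self.queues = {}          # queue name -> deque of tasks
        self.failure_counts = {}  # queue name -> failed task count

    def publish(self, queue, task):
        """Store a task durably (here: in memory) on a named queue."""
        self.queues.setdefault(queue, deque()).append(task)

    def consume(self, queue):
        """Hand the oldest task to a worker, or None if queue is empty."""
        q = self.queues.get(queue)
        return q.popleft() if q else None

    def record_failure(self, queue):
        self.failure_counts[queue] = self.failure_counts.get(queue, 0) + 1

    def depth(self, queue):
        """Queue depth -- the key signal for autoscaling and alerting."""
        return len(self.queues.get(queue, deque()))
```

Whatever technology you pick, the depth and failure metrics are what your monitoring and worker autoscaling hang off.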
2. Worker pool and execution engine
Spawns multiple worker processes handling generation concurrently.
Executes long-running AI API calls without blocking HTTP request threads.
Scales horizontally adding workers as queue depth increases.
3. State management and result storage
Tracks generation progress through multi-step workflows (planning → text → images → assembly).
Stores partial results enabling resume-from-checkpoint on failures.
Provides status API for frontend polling or webhook notifications.
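The status API for polling can be as simple as a keyed lookup over checkpoint data. A minimal sketch, assuming a dict-backed store (`STATE_STORE`) standing in for the database or Redis hash workers write to:

```python
# Hypothetical state store keyed by book_id; in production this is a
# database row or Redis hash updated by workers at each checkpoint.
STATE_STORE = {}

def record_checkpoint(book_id, status, progress, step):
    """Called by workers after each completed step."""
    STATE_STORE[book_id] = {
        'status': status,        # queued / processing / complete / failed
        'progress': progress,    # 0.0 - 1.0
        'current_step': step,
    }

def get_generation_status(book_id):
    """What a GET /api/books/<id>/status handler would return."""
    state = STATE_STORE.get(book_id)
    if state is None:
        return {'status': 'not_found'}
    return {
        'status': state['status'],
        'progress_percent': round(state['progress'] * 100, 1),
        'current_step': state['current_step'],
    }
```

The frontend polls this every few seconds until status reaches a terminal state; webhooks or WebSockets replace polling at larger scale.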
How queue architecture improves production reliability
1. Better timeout handling and higher completion rates
Async processing ensures generation continues regardless of user connection status.
Checkpoint storage enables recovery by resuming from the last successful step.
Combined, they increase completion rates from 40-60% (synchronous) to 95-99% (queue-based).
2. Handling API failures and rate limits efficiently
Retry logic with exponential backoff automatically recovers transient failures.
Coordinated backoff on rate limit errors prevents worker pile-on from amplifying the problem.
Flexible failure classification lets your system distinguish temporary from permanent errors.
3. More robust cost control and user experience
Idempotency prevents duplicate API calls when users refresh or retry requests.
Priority queues surface urgent requests (paid users) ahead of background tasks.
This reduces wasted API costs by 20-40% and improves perceived latency for priority traffic.
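One straightforward way to implement the priority routing mentioned above is dedicated queues per user tier, with dedicated workers on the priority queue. A sketch with illustrative tier and queue names:

```python
# Map user tiers to dedicated queues. Workers consuming 'generation_priority'
# are never starved by free-tier backlog. Names are illustrative.
TIER_QUEUES = {
    'paid': 'generation_priority',
    'free': 'generation_default',
}

def route_generation_task(user_tier):
    """Pick the queue a generation task should be published to."""
    return TIER_QUEUES.get(user_tier, 'generation_default')

# With Celery this becomes, e.g.:
#   generate_storybook.apply_async(args=[...], queue=route_generation_task(tier))
# and priority workers are started with --queues=generation_priority.
```

Separate queues are simpler to reason about than per-message priority fields, and they let you scale the priority worker pool independently.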
Designing a queue-based generation architecture
1. Five-layer architecture stack
Maintain API layer, message queue, worker pool, state store, and result storage as distinct components.
Use standard protocols (HTTP, AMQP, Redis protocol) enabling component swapping.
Design for horizontal scaling at worker layer without API or queue changes.
2. Task workflow and state machine
Define generation states: queued → processing → text_complete → images_processing → assembling → complete, with failed reachable from any active state.
Specify valid transitions and rollback procedures for each failure point.
Implement checkpointing storing intermediate results at each state transition.
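The transition rules can be encoded as a table so workers can only move a book along valid paths. A minimal sketch of the state machine described above (the table itself is an assumption based on the states listed):

```python
# Valid forward transitions; 'failed' is reachable from any active state.
VALID_TRANSITIONS = {
    'queued': {'processing', 'failed'},
    'processing': {'text_complete', 'failed'},
    'text_complete': {'images_processing', 'failed'},
    'images_processing': {'assembling', 'failed'},
    'assembling': {'complete', 'failed'},
    'complete': set(),   # terminal
    'failed': set(),     # terminal
}

def transition(current, new):
    """Validate a state change before persisting it and its checkpoint."""
    if new not in VALID_TRANSITIONS.get(current, set()):
        raise ValueError(f"Invalid transition: {current} -> {new}")
    return new
```

Rejecting invalid transitions at write time catches bugs like a retried worker trying to re-process an already completed book.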
3. Failure classification and retry policies
Categorize errors: transient (network, rate limit), permanent (invalid input, moderation failure), retriable (API timeout).
Apply retry logic only to transient and retriable categories with exponential backoff.
Route permanent failures to dead letter queue for investigation without retry loops.
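The routing rule above can be sketched as a small dispatcher: transient and retriable errors go back for retry, permanent ones are parked. The in-memory list here stands in for a real dead letter queue (a dedicated broker queue or table).

```python
dead_letter_queue = []  # stand-in for a durable DLQ (dedicated queue or table)

def dispatch_failure(task, error_category):
    """Decide what happens to a failed task, by error category."""
    if error_category in ('transient', 'retriable'):
        return 'retry'  # caller re-enqueues with exponential backoff
    # Permanent failures are parked for human investigation, never retried
    dead_letter_queue.append({'task': task, 'reason': error_category})
    return 'dead_letter'
```

Alerting on DLQ growth surfaces systemic problems (bad prompts, moderation failures) without burning API credits on retry loops.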
Message queue selection and implementation
If you are building production systems, queue technology choice affects reliability, scaling, and operational complexity.
Comparing queue technologies
Different queue systems offer varying trade-offs on durability, throughput, and operations.
Redis (with Celery or Bull)
- Throughput: 100k+ tasks/sec
- Durability: In-memory (optional persistence)
- Complexity: Low (single binary, simple ops)
- Cost: Low ($15-50/month managed)
- Best for: Moderate scale, low latency, simple setup
RabbitMQ (with Celery or Pika)
- Throughput: 10k-50k messages/sec
- Durability: Disk-backed guaranteed delivery
- Complexity: Medium (clustering, management)
- Cost: Medium ($30-100/month managed)
- Best for: High reliability, complex routing
AWS SQS (with Celery-SQS or native SDK)
- Throughput: Unlimited (managed service)
- Durability: Distributed, redundant storage
- Complexity: Low (fully managed)
- Cost: Usage-based ($0.40-0.80 per million requests)
- Best for: AWS-native, serverless, variable load
Google Cloud Tasks
- Throughput: 500 tasks/sec per queue
- Durability: Managed, distributed
- Complexity: Low (fully managed)
- Cost: $0.40 per million operations
- Best for: GCP-native, HTTP-based workers
Technology selection matrix:
QUEUE_COMPARISON = {
'redis_celery': {
'setup_time': '1-2 hours',
'monthly_cost': 20, # Managed Redis
'max_throughput': 100000,
'durability': 'medium',
'operational_burden': 'low',
'best_for': 'Fast iteration, moderate scale'
},
'rabbitmq_celery': {
'setup_time': '3-5 hours',
'monthly_cost': 50, # Managed RabbitMQ
'max_throughput': 30000,
'durability': 'high',
'operational_burden': 'medium',
'best_for': 'High reliability requirements'
},
'sqs_celery': {
'setup_time': '2-3 hours',
'monthly_cost': 15, # At 1M tasks/month
'max_throughput': 'unlimited',
'durability': 'very_high',
'operational_burden': 'very_low',
'best_for': 'AWS ecosystem, variable load'
}
}
def recommend_queue(monthly_tasks, reliability_tier, team_size):
"""Recommend queue technology based on requirements."""
if monthly_tasks < 100000: # Low volume
return 'redis_celery' # Simplest setup
if reliability_tier == 'critical' and team_size >= 3:
return 'rabbitmq_celery' # Can handle ops burden
if monthly_tasks > 500000: # High volume
return 'sqs_celery' # Managed scaling
return 'redis_celery' # Default for most use cases
Risk factor: MEDIUM. Wrong queue choice causes operational burden or reliability issues at scale.
Implementing Redis + Celery (recommended starting point)
Celery with a Redis broker provides the best balance of simplicity and capability for most AI generation workloads.
Installation and configuration:
# requirements.txt
celery==5.3.4
redis==5.0.1
flower==2.0.1 # Monitoring dashboard
# celery_config.py
from celery import Celery
# Initialize Celery with Redis broker
celery_app = Celery(
'storybook_generator',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1' # Result storage
)
# Configuration
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
# Task routing
task_routes={
'tasks.generate_storybook': {'queue': 'generation'},
'tasks.generate_image': {'queue': 'images'},
'tasks.generate_text': {'queue': 'text'}
},
# Retry configuration
task_acks_late=True, # Acknowledge after completion
task_reject_on_worker_lost=True, # Requeue if worker dies
# Rate limiting
task_annotations={
'tasks.generate_image': {'rate_limit': '10/m'}, # 10 per minute
'tasks.generate_text': {'rate_limit': '60/m'}
},
# Result expiration
result_expires=3600, # 1 hour
# Worker configuration
worker_prefetch_multiplier=1, # One task at a time (for long tasks)
worker_max_tasks_per_child=50 # Restart worker after 50 tasks
)
Task definition with retry logic:
from celery import Task
import random
class BaseGenerationTask(Task):
"""Base task class with retry logic."""
autoretry_for = ( # Auto-retry on these exceptions
ConnectionError,
TimeoutError
)
retry_kwargs = {
'max_retries': 5,
'countdown': 5 # Initial delay in seconds
}
retry_backoff = True # Exponential backoff
retry_backoff_max = 600 # Max 10 minutes
retry_jitter = True # Add randomness to prevent thundering herd
@celery_app.task(base=BaseGenerationTask, bind=True)
def generate_storybook(self, book_id, user_id, params):
"""
Main task orchestrating full book generation.
Args:
self: Task instance (for retry access)
book_id: Unique book identifier
user_id: User requesting generation
params: Generation parameters
"""
try:
# 1. Update status
update_book_status(book_id, 'processing')
# 2. Generate story structure
        # Note: .get() inside a task blocks the worker and requires
        # disable_sync_subtasks=False; a Celery chord is the cleaner pattern
        story = generate_story_structure.apply_async(
            args=[book_id, params],
            queue='text'
        ).get(timeout=120, disable_sync_subtasks=False)
# 3. Generate page text in parallel
text_tasks = [
generate_page_text.apply_async(
args=[book_id, page_num, story],
queue='text'
)
for page_num in range(1, params['page_count'] + 1)
]
        page_texts = [task.get(timeout=60, disable_sync_subtasks=False) for task in text_tasks]
# 4. Generate images in parallel (with rate limiting)
image_tasks = [
generate_image.apply_async(
args=[book_id, page_num, page_text],
queue='images'
)
for page_num, page_text in enumerate(page_texts, 1)
]
# Wait for images with progress tracking
images = []
for i, task in enumerate(image_tasks):
try:
                image_url = task.get(timeout=300, disable_sync_subtasks=False)  # 5 min per image
images.append(image_url)
update_progress(book_id, (i + 1) / len(image_tasks))
except Exception as e:
# Image failed, mark and continue
images.append(None)
log_error(book_id, f"Image {i+1} failed: {e}")
# 5. Assemble final PDF
pdf_url = assemble_pdf.apply_async(
args=[book_id, page_texts, images],
queue='generation'
        ).get(timeout=180, disable_sync_subtasks=False)
# 6. Mark complete
update_book_status(book_id, 'complete', pdf_url=pdf_url)
return {'book_id': book_id, 'pdf_url': pdf_url}
except Exception as exc:
# Retry logic
if self.request.retries < self.max_retries:
# Exponential backoff: 5s, 10s, 20s, 40s, 80s
countdown = min(
5 * (2 ** self.request.retries) + random.randint(0, 5),
600 # Max 10 minutes
)
raise self.retry(exc=exc, countdown=countdown)
else:
# Max retries exceeded, mark as failed
update_book_status(book_id, 'failed', error=str(exc))
raise
Worker process startup:
# Start Celery worker with concurrency
celery -A celery_config worker \
--loglevel=info \
--concurrency=4 \
--queues=generation,images,text \
--max-tasks-per-child=50
# Start Flower monitoring dashboard
celery -A celery_config flower --port=5555
Total implementation time: 8-12 hours for basic setup including retry logic and monitoring.
Risk factor: LOW. Celery + Redis is battle-tested with extensive documentation.
Retry logic and exponential backoff
If you need reliable generation, retry logic is absolutely essential for handling transient API failures.
Classifying failures for retry decisions
Not all errors should trigger retries. Permanent failures waste API costs.
Retriable errors:
- Network timeouts (requests.exceptions.Timeout)
- Connection errors (ConnectionError, ConnectionResetError)
- API rate limit errors (HTTP 429)
- Temporary server errors (HTTP 502, 503, 504)
- Provider-specific transient errors (OpenAI RateLimitError, APIConnectionError)
Non-retriable errors:
- Invalid API keys (HTTP 401)
- Malformed requests (HTTP 400)
- Content moderation failures
- Insufficient credits
- Invalid parameters
Error classification:
from openai import OpenAI, APIError, RateLimitError, APIConnectionError
import requests
def classify_error(exception):
"""
Classify errors into retriable vs permanent.
Returns: 'retriable', 'permanent', or 'rate_limit'
"""
# OpenAI-specific errors
if isinstance(exception, RateLimitError):
return 'rate_limit' # Special handling
if isinstance(exception, APIConnectionError):
return 'retriable' # Network issues
if isinstance(exception, APIError):
# Check HTTP status code
if hasattr(exception, 'status_code'):
if exception.status_code in [429]:
return 'rate_limit'
if exception.status_code in [500, 502, 503, 504]:
return 'retriable'
if exception.status_code in [400, 401, 403]:
return 'permanent'
# Generic network errors
if isinstance(exception, (ConnectionError, TimeoutError,
requests.exceptions.Timeout)):
return 'retriable'
# Unknown errors - treat as permanent to avoid infinite retry
return 'permanent'
# Usage in task
@celery_app.task(bind=True)
def generate_image_with_classification(self, book_id, page_num, prompt):
"""Generate image with smart retry logic."""
try:
image_url = call_dalle_api(prompt)
return image_url
except Exception as exc:
error_type = classify_error(exc)
if error_type == 'permanent':
# Don't retry, fail immediately
raise
elif error_type == 'rate_limit':
# Longer backoff for rate limits
countdown = 60 * (2 ** self.request.retries) # 1m, 2m, 4m...
raise self.retry(exc=exc, countdown=countdown)
elif error_type == 'retriable':
# Standard exponential backoff
countdown = 5 * (2 ** self.request.retries) # 5s, 10s, 20s...
raise self.retry(exc=exc, countdown=countdown, max_retries=5)
Implementing exponential backoff with jitter
Exponential backoff prevents overwhelming APIs during recovery periods.
Jitter prevents thundering herd when multiple workers retry simultaneously.
Backoff calculation:
import random
import time
def calculate_backoff(
attempt: int,
base_delay: float = 1.0,
max_delay: float = 300.0,
jitter: bool = True
) -> float:
"""
Calculate exponential backoff delay with optional jitter.
Args:
attempt: Current retry attempt (0-indexed)
base_delay: Initial delay in seconds
max_delay: Maximum delay cap
jitter: Add randomness to prevent thundering herd
Returns:
Delay in seconds before next retry
"""
# Exponential component: base_delay * 2^attempt
delay = min(base_delay * (2 ** attempt), max_delay)
# Add jitter: random value between 50-100% of delay
if jitter:
delay = delay * random.uniform(0.5, 1.0)
return delay
# Example progression
for attempt in range(6):
delay = calculate_backoff(attempt, base_delay=2.0, max_delay=120.0)
print(f"Attempt {attempt}: wait {delay:.1f}s")
"""
Output with jitter:
Attempt 0: wait 1.2s (base: 2s)
Attempt 1: wait 3.1s (base: 4s)
Attempt 2: wait 5.8s (base: 8s)
Attempt 3: wait 13.2s (base: 16s)
Attempt 4: wait 28.7s (base: 32s)
Attempt 5: wait 51.3s (base: 64s, cap of 120s not yet reached)
"""
Backoff implementation in tasks:
@celery_app.task(bind=True, max_retries=7)
def generate_with_backoff(self, resource_id, params):
"""Task with custom exponential backoff."""
try:
result = call_expensive_api(params)
return result
except Exception as exc:
error_type = classify_error(exc)
if error_type == 'permanent':
raise # Don't retry
# Calculate backoff
attempt = self.request.retries
if error_type == 'rate_limit':
# Longer backoff for rate limits
base_delay = 30.0
max_delay = 600.0
else:
# Standard backoff for transient errors
base_delay = 2.0
max_delay = 120.0
countdown = calculate_backoff(
attempt,
base_delay=base_delay,
max_delay=max_delay,
jitter=True
)
logger.info(
f"Retry attempt {attempt + 1}/{self.max_retries} "
f"after {countdown:.1f}s delay"
)
raise self.retry(exc=exc, countdown=countdown)
Total retry implementation time: 4-6 hours including error classification and testing.
Risk factor: MEDIUM. Improper backoff amplifies load during outages; test thoroughly.
Idempotency and duplicate prevention
If you are charging users or consuming API credits, idempotency is non-negotiable to prevent duplicate processing.
Understanding idempotency requirements
Idempotent operations produce the same result regardless of how many times they are executed.
Critical for AI generation because:
- Users may click “Generate” multiple times
- Network failures cause automatic retries
- Browser refresh resubmits form data
- Each duplicate wastes $2-8 in API costs
Without idempotency, duplicate requests charge users multiple times for identical books.
Implementing idempotency keys
Use unique idempotency keys to detect and reject duplicate requests.
Idempotency key strategy:
import hashlib
import json
import uuid
from datetime import datetime, timedelta
class IdempotencyManager:
"""Manage idempotency keys and duplicate detection."""
def __init__(self, redis_client):
self.redis = redis_client
self.key_ttl = 86400 # 24 hours
def generate_key(self, user_id, request_params):
"""
Generate idempotency key from request parameters.
Args:
user_id: User making request
request_params: Dict of generation parameters
Returns:
Idempotency key string
"""
# Create deterministic hash from params
params_str = json.dumps(request_params, sort_keys=True)
content_hash = hashlib.sha256(params_str.encode()).hexdigest()[:16]
# Combine user + params hash
key = f"idempotency:{user_id}:{content_hash}"
return key
def check_duplicate(self, idempotency_key):
"""
Check if request with this key was already processed.
Returns:
(is_duplicate, existing_book_id)
"""
existing = self.redis.get(idempotency_key)
if existing:
return True, existing.decode('utf-8')
else:
return False, None
    def register_request(self, idempotency_key, book_id):
        """
        Register new request to prevent duplicates.

        Uses SET NX so two concurrent requests cannot both register;
        returns False/None if another request won the race.
        """
        return self.redis.set(
            idempotency_key,
            book_id.encode('utf-8'),
            ex=self.key_ttl,
            nx=True
        )
def clear_key(self, idempotency_key):
"""Clear idempotency key (on generation failure)."""
self.redis.delete(idempotency_key)
# Usage in API endpoint
from flask import Flask, request, jsonify
from redis import Redis
app = Flask(__name__)
redis_client = Redis(host='localhost', port=6379, db=2)
idempotency = IdempotencyManager(redis_client)
@app.route('/api/generate', methods=['POST'])
def generate_book_api():
"""API endpoint with idempotency protection."""
user_id = request.user.id # From authentication
params = request.json
# Generate idempotency key
idem_key = idempotency.generate_key(user_id, params)
# Check for duplicate
is_duplicate, existing_book_id = idempotency.check_duplicate(idem_key)
if is_duplicate:
# Return existing book
return jsonify({
'status': 'success',
'book_id': existing_book_id,
'message': 'Using existing generation',
'duplicate': True
}), 200
# Generate new book ID
book_id = str(uuid.uuid4())
# Register idempotency key
idempotency.register_request(idem_key, book_id)
try:
# Queue generation task
task = generate_storybook.apply_async(
args=[book_id, user_id, params],
task_id=book_id # Use book_id as task_id for tracking
)
return jsonify({
'status': 'queued',
'book_id': book_id,
'task_id': task.id
}), 202 # Accepted
except Exception as e:
# Clear idempotency key on queue failure
idempotency.clear_key(idem_key)
raise
Handling task retries with idempotency
Tasks themselves must be idempotent to safely retry on failures.
Task-level idempotency:
@celery_app.task(bind=True, max_retries=5)
def generate_image_idempotent(self, book_id, page_num, prompt):
"""
Generate image with idempotency at task level.
Checks if image already generated before calling API.
"""
# Check if already generated
existing_url = get_existing_image(book_id, page_num)
if existing_url:
logger.info(f"Image {page_num} already exists, skipping generation")
return existing_url
# Generate new image
try:
image_url = call_dalle_api(prompt)
# Store result atomically
store_image_result(book_id, page_num, image_url)
return image_url
except Exception as exc:
# Retry logic (safe because we check existing above)
raise self.retry(exc=exc, countdown=calculate_backoff(self.request.retries))
def store_image_result(book_id, page_num, image_url):
"""
Store image result atomically to prevent race conditions.
"""
# Use Redis SET NX (set if not exists) for atomic operation
key = f"book:{book_id}:image:{page_num}"
success = redis_client.setnx(key, image_url)
if success:
redis_client.expire(key, 3600) # 1 hour expiration
return True
else:
# Another worker already stored result
return False
Total idempotency implementation time: 6-8 hours including key management and race condition handling.
Risk factor: HIGH. Missing idempotency causes duplicate charges; test edge cases thoroughly.
Rate limit coordination across workers
If you are running multiple workers, coordinated rate limiting prevents API blocking.
Understanding rate limit requirements
AI APIs enforce rate limits per account/API key:
- OpenAI DALL-E 3: 50 requests/minute (Tier 1)
- OpenAI GPT-4: 10,000 requests/minute, 150,000 tokens/minute
- Anthropic Claude: 1,000 requests/minute (Tier 1)
Multiple workers sharing the same API key must coordinate to stay within limits.
Implementing distributed rate limiting
Use Redis-backed rate limiting to coordinate across worker processes.
Distributed rate limiter:
import time
from redis import Redis
class DistributedRateLimiter:
"""
Token bucket rate limiter using Redis.
Coordinates rate limiting across multiple worker processes.
"""
def __init__(self, redis_client, namespace='rate_limit'):
self.redis = redis_client
self.namespace = namespace
def acquire(self, key, max_tokens, refill_rate, refill_period=60):
"""
Acquire a token from the rate limiter.
Args:
key: Rate limit key (e.g., 'dalle3', 'gpt4')
max_tokens: Maximum tokens in bucket
refill_rate: Tokens added per refill_period
refill_period: Seconds between refills (default 60)
Returns:
(acquired, wait_time)
acquired: bool, whether token was acquired
wait_time: seconds to wait before retry if not acquired
"""
now = time.time()
bucket_key = f"{self.namespace}:{key}"
# Lua script for atomic token bucket operation
script = """
local bucket_key = KEYS[1]
local now = tonumber(ARGV[1])
local max_tokens = tonumber(ARGV[2])
local refill_rate = tonumber(ARGV[3])
local refill_period = tonumber(ARGV[4])
-- Get current bucket state
local bucket = redis.call('HMGET', bucket_key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or max_tokens
local last_refill = tonumber(bucket[2]) or now
-- Calculate refill
local time_passed = now - last_refill
local refills = math.floor(time_passed / refill_period)
if refills > 0 then
tokens = math.min(max_tokens, tokens + (refills * refill_rate))
last_refill = last_refill + (refills * refill_period)
end
-- Try to acquire token
if tokens >= 1 then
tokens = tokens - 1
redis.call('HMSET', bucket_key, 'tokens', tokens, 'last_refill', last_refill)
redis.call('EXPIRE', bucket_key, refill_period * 2)
return {1, 0} -- Success, no wait
else
-- Calculate wait time until next token
local wait = refill_period - (now - last_refill)
return {0, wait} -- Failed, return wait time
end
"""
result = self.redis.eval(
script,
1, # Number of keys
bucket_key, # KEYS[1]
now, max_tokens, refill_rate, refill_period # ARGV
)
acquired = bool(result[0])
wait_time = float(result[1])
return acquired, wait_time
def wait_and_acquire(self, key, max_tokens, refill_rate, refill_period=60,
max_wait=300):
"""
Block until token can be acquired.
Args:
max_wait: Maximum seconds to wait before giving up
Returns:
True if acquired, False if timed out
"""
start_time = time.time()
while True:
acquired, wait_time = self.acquire(
key, max_tokens, refill_rate, refill_period
)
if acquired:
return True
# Check timeout
if time.time() - start_time > max_wait:
return False
# Wait before retry
sleep_time = min(wait_time, max_wait - (time.time() - start_time))
if sleep_time > 0:
time.sleep(sleep_time)
# Usage in task
redis_client = Redis(host='localhost', port=6379, db=3)
rate_limiter = DistributedRateLimiter(redis_client)
@celery_app.task(bind=True, max_retries=10)
def generate_image_rate_limited(self, book_id, page_num, prompt):
"""Generate image with distributed rate limiting."""
# Acquire rate limit token
# DALL-E 3 Tier 1: 50 requests/minute
acquired = rate_limiter.wait_and_acquire(
key='dalle3',
max_tokens=50,
refill_rate=50,
refill_period=60,
max_wait=180 # Wait up to 3 minutes
)
if not acquired:
# Rate limit wait exceeded, retry task
raise self.retry(countdown=60, exc=Exception("Rate limit timeout"))
# Token acquired, make API call
try:
image_url = call_dalle_api(prompt)
return image_url
except RateLimitError as e:
# Hit rate limit despite our limiting (API limit changed?)
# Back off significantly
raise self.retry(exc=e, countdown=120)
Rate limiting for multiple API keys:
class APIKeyPool:
"""
Manage multiple API keys with load balancing.
Distributes requests across keys to maximize throughput.
"""
def __init__(self, api_keys, redis_client):
self.api_keys = api_keys
self.redis = redis_client
self.rate_limiter = DistributedRateLimiter(redis_client)
def get_available_key(self, service='dalle3', max_wait=60):
"""
Get an available API key respecting rate limits.
Args:
service: Service identifier (dalle3, gpt4, etc.)
max_wait: Max seconds to wait for available key
Returns:
API key string or None if all keys rate limited
"""
start_time = time.time()
# Try each key in round-robin
for key_index in range(len(self.api_keys)):
# Calculate which key to try
current_key_idx = (int(time.time()) + key_index) % len(self.api_keys)
key = self.api_keys[current_key_idx]
# Check rate limit for this specific key
rate_key = f"{service}:key_{current_key_idx}"
acquired, wait_time = self.rate_limiter.acquire(
key=rate_key,
max_tokens=50, # DALL-E 3 limit
refill_rate=50,
refill_period=60
)
if acquired:
return key
# Check timeout
if time.time() - start_time > max_wait:
break
return None # All keys rate limited
# Usage
import os
api_pool = APIKeyPool(
api_keys=[
os.getenv('OPENAI_KEY_1'),
os.getenv('OPENAI_KEY_2'),
os.getenv('OPENAI_KEY_3')
],
redis_client=redis_client
)
@celery_app.task
def generate_with_key_pool(book_id, page_num, prompt):
"""Generate using API key pool."""
api_key = api_pool.get_available_key('dalle3', max_wait=120)
if not api_key:
raise Exception("All API keys rate limited")
# Use selected key
from openai import OpenAI
client = OpenAI(api_key=api_key)
response = client.images.generate(model="dall-e-3", prompt=prompt)
return response.data[0].url
Total rate limiting implementation time: 8-12 hours including Lua scripts and key pooling.
Risk factor: HIGH. Rate limit violations can cause temporary API blocks; thoroughly test coordination.
State tracking and progress monitoring
If you want users to see generation progress, robust state management is essential.
Database schema for task state
Track generation state and progress in database for reliable status queries.
State schema:
-- PostgreSQL schema
CREATE TABLE book_generations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id),
status VARCHAR(50) NOT NULL DEFAULT 'queued',
-- States: queued, processing, text_complete, images_processing,
-- assembling, complete, failed
-- Progress tracking
progress_percent DECIMAL(5,2) DEFAULT 0.00,
current_step VARCHAR(100),
steps_completed INTEGER DEFAULT 0,
total_steps INTEGER,
-- Generation parameters
page_count INTEGER NOT NULL,
theme VARCHAR(200),
age_group VARCHAR(50),
-- Results
pdf_url TEXT,
error_message TEXT,
-- Metadata
task_id UUID,
started_at TIMESTAMP,
completed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Index for status queries
CREATE INDEX idx_book_gen_user_status ON book_generations(user_id, status);
CREATE INDEX idx_book_gen_task_id ON book_generations(task_id);
-- Page-level tracking
CREATE TABLE book_generation_pages (
id SERIAL PRIMARY KEY,
book_id UUID NOT NULL REFERENCES book_generations(id) ON DELETE CASCADE,
page_num INTEGER NOT NULL,
-- Text generation
text_status VARCHAR(50) DEFAULT 'pending',
page_text TEXT,
text_generated_at TIMESTAMP,
-- Image generation
image_status VARCHAR(50) DEFAULT 'pending',
image_url TEXT,
image_prompt TEXT,
image_generated_at TIMESTAMP,
retry_count INTEGER DEFAULT 0,
UNIQUE(book_id, page_num)
);
State update functions:
from sqlalchemy import create_engine, Column, String, Integer, Numeric, DateTime, Text
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import uuid

Base = declarative_base()
engine = create_engine('postgresql://localhost/storybooks')  # adjust connection string
Session = sessionmaker(bind=engine)

class BookGeneration(Base):
    __tablename__ = 'book_generations'

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), nullable=False)
    status = Column(String(50), default='queued')
    progress_percent = Column(Numeric(5, 2), default=0.00)
current_step = Column(String(100))
steps_completed = Column(Integer, default=0)
total_steps = Column(Integer)
page_count = Column(Integer, nullable=False)
pdf_url = Column(Text)
error_message = Column(Text)
task_id = Column(UUID(as_uuid=True))
started_at = Column(DateTime)
completed_at = Column(DateTime)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def update_book_status(book_id, status, **kwargs):
"""Update book generation status and metadata."""
session = Session()
try:
book = session.query(BookGeneration).filter_by(id=book_id).first()
if not book:
raise ValueError(f"Book {book_id} not found")
# Update status
book.status = status
book.updated_at = datetime.utcnow()
# Update additional fields
for key, value in kwargs.items():
if hasattr(book, key):
setattr(book, key, value)
# Set timestamps
if status == 'processing' and not book.started_at:
book.started_at = datetime.utcnow()
elif status in ['complete', 'failed']:
book.completed_at = datetime.utcnow()
session.commit()
finally:
session.close()
def update_progress(book_id, progress_percent, current_step=None):
"""Update generation progress."""
session = Session()
try:
book = session.query(BookGeneration).filter_by(id=book_id).first()
if book:
book.progress_percent = min(100.00, progress_percent * 100)
book.updated_at = datetime.utcnow()
if current_step:
book.current_step = current_step
book.steps_completed = (book.steps_completed or 0) + 1
session.commit()
finally:
session.close()
Real-time progress via WebSocket
Push progress updates to frontend for live status tracking.
WebSocket implementation:
from flask import Flask
from flask_socketio import SocketIO, emit, join_room

app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")

@socketio.on('subscribe_book')
def handle_subscribe(data):
    """Subscribe client to book generation updates."""
    book_id = data['book_id']
    join_room(book_id)

    # Send current status
    book = get_book_status(book_id)
    emit('book_status', book, room=book_id)

def broadcast_progress(book_id, progress_data):
    """Broadcast progress update to subscribed clients."""
    socketio.emit('book_progress', progress_data, room=book_id)

# Call from Celery task
def update_progress_with_broadcast(book_id, progress_percent, current_step):
    """Update progress and broadcast to clients."""
    # Update database
    update_progress(book_id, progress_percent, current_step)

    # Broadcast to WebSocket clients
    broadcast_progress(book_id, {
        'book_id': str(book_id),
        'progress': progress_percent * 100,
        'step': current_step
    })
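One practical caveat: a Celery task that reports progress per image can flood the Socket.IO channel. A minimal throttle sketch that drops updates which are too small or too frequent; the ProgressThrottle class and its default thresholds are illustrative, not part of the pipeline above:

```python
import time

class ProgressThrottle:
    """Suppress progress broadcasts that arrive too frequently.

    min_interval and min_delta are illustrative defaults; tune them
    to your generation pipeline.
    """
    def __init__(self, min_interval=1.0, min_delta=0.02):
        self.min_interval = min_interval  # seconds between emits
        self.min_delta = min_delta        # minimum progress change (0-1)
        self._last_time = 0.0
        self._last_progress = -1.0
        self._last_step = None

    def should_emit(self, progress, step, now=None):
        """Return True when an update is worth broadcasting."""
        now = time.monotonic() if now is None else now
        step_changed = step != self._last_step
        big_jump = abs(progress - self._last_progress) >= self.min_delta
        elapsed = now - self._last_time
        if step_changed or (big_jump and elapsed >= self.min_interval):
            self._last_time = now
            self._last_progress = progress
            self._last_step = step
            return True
        return False
```

Wrapping broadcast_progress with a should_emit check keeps the database authoritative while the WebSocket stream stays light.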
Frontend WebSocket client:
import io from 'socket.io-client';

class GenerationTracker {
  constructor(bookId) {
    this.bookId = bookId;
    this.socket = io('http://localhost:5000');

    // Subscribe to book updates
    this.socket.emit('subscribe_book', { book_id: bookId });

    // Listen for progress
    this.socket.on('book_progress', (data) => {
      this.handleProgress(data);
    });

    this.socket.on('book_status', (data) => {
      this.handleStatus(data);
    });
  }

  handleProgress(data) {
    // Update UI with progress
    console.log(`Progress: ${data.progress}% - ${data.step}`);

    // Update progress bar
    document.getElementById('progress-bar').style.width = `${data.progress}%`;
    document.getElementById('current-step').textContent = data.step;
  }

  handleStatus(data) {
    if (data.status === 'complete') {
      // Navigate to the finished book
      window.location.href = `/books/${this.bookId}`;
    } else if (data.status === 'failed') {
      // Show error message
      alert(`Generation failed: ${data.error_message}`);
    }
  }

  disconnect() {
    this.socket.disconnect();
  }
}

// Usage
const tracker = new GenerationTracker('book-uuid-here');
Total state tracking implementation time: 10-14 hours including database schema, WebSocket setup, and frontend integration.
Risk factor: MEDIUM. State synchronization bugs can show incorrect progress; thorough testing is needed.
Where Musketeers Tech fits into queue architecture
If you are starting from scratch
Help you move from synchronous prototypes to production-ready async systems with queue infrastructure, retry logic, and rate limit coordination.
Design task workflows, state machines, and checkpoint strategies matching your generation pipeline stages.
Implement monitoring dashboards, dead letter queues, and alerting that catches failures before users notice.
If you already have basic queuing but are experiencing failures
Diagnose timeout issues, identify where retry logic fails or rate limits get exceeded, and pinpoint idempotency gaps.
Add exponential backoff, distributed rate limiting, and error classification on top of your existing queue without re-architecting.
Tune worker concurrency, queue routing, and resource allocation for cost-optimal throughput.
So what should you do next?
Audit current architecture: track synchronous request timeout rates, measure end-to-end generation completion percentage, identify failure points and bottlenecks.
Introduce basic queuing: set up Redis + Celery, migrate one generation endpoint to async processing, and implement a status polling API for the frontend.
Pilot retry and idempotency: apply exponential backoff to transient errors, use idempotency keys to prevent duplicates, and collect metrics on retry success rates before expanding to all endpoints.
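The status polling API from step 2 can be as small as one route. A minimal Flask sketch, where the in-memory BOOKS dict is a hypothetical stand-in for the real BookGeneration table:

```python
from flask import Flask, jsonify

app = Flask(__name__)

# Hypothetical stand-in for the BookGeneration table.
BOOKS = {'book-1': {'status': 'processing', 'progress': 42.0, 'step': 'images'}}

@app.route('/api/books/<book_id>/status')
def book_status(book_id):
    """Return current generation state for frontend polling."""
    book = BOOKS.get(book_id)
    if book is None:
        return jsonify({'error': 'not found'}), 404
    return jsonify(book)
```

The frontend polls this endpoint every few seconds until status becomes complete or failed; it also serves as the fallback when a WebSocket connection drops.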
Frequently Asked Questions (FAQs)
1. Should I use Redis, RabbitMQ, or AWS SQS for my queue?
For most AI generation workloads, start with Redis + Celery. It provides the best balance of simplicity (single binary), performance (100k+ ops/sec), and cost ($15-20/month). Upgrade to RabbitMQ only if you need guaranteed delivery for critical workflows. Use SQS if you’re AWS-native and want zero operational burden.
2. How do I prevent users from submitting the same generation request multiple times?
Implement idempotency keys combining a user ID + request parameters hash. Store keys in Redis for 24 hours. When a duplicate is detected, return the existing book_id instead of queuing a new task. This prevents duplicate API costs and ensures consistent results across retries.
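A minimal sketch of the key derivation (the storybook: namespace prefix is an assumption): hash the canonicalized parameters so dict ordering doesn't change the key, then store the result in Redis with SET key book_id NX EX 86400.

```python
import hashlib
import json

def idempotency_key(user_id, params):
    """Derive a stable idempotency key from user ID + request parameters.

    Sorting keys makes the hash insensitive to dict ordering; the
    'storybook:' prefix is an illustrative namespace.
    """
    canonical = json.dumps(params, sort_keys=True, separators=(',', ':'))
    digest = hashlib.sha256(canonical.encode('utf-8')).hexdigest()
    return f"storybook:{user_id}:{digest}"
```

The Redis SET ... NX EX call is atomic, so two concurrent identical submissions cannot both win: the second sees the existing key and returns the first book_id.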
3. What’s the right retry strategy for AI API calls?
Use error classification: permanent errors (invalid key, bad request) don’t retry. Transient errors (network, timeouts) retry with exponential backoff starting at 2-5 seconds. Rate limit errors need longer backoff (60+ seconds). Always add jitter to prevent thundering herd. Max retries: 5-7 attempts.
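That strategy reduces to a small delay function. A sketch using full jitter, with illustrative base and cap values:

```python
import random

def retry_delay(attempt, base=2.0, cap=300.0, rate_limited=False):
    """Compute backoff delay in seconds for a retry attempt (0-based).

    base/cap are illustrative defaults. Full jitter (uniform over the
    exponential window) spreads retries to avoid a thundering herd;
    rate-limit errors start from a 60-second floor.
    """
    floor = 60.0 if rate_limited else 0.0
    window = min(cap, base * (2 ** attempt))
    return floor + random.uniform(0, window)  # full jitter
```

Permanent errors never reach this function: classify first, then retry only what can plausibly succeed.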
4. How many Celery workers should I run?
Start with worker count = 2× CPU cores for I/O-bound tasks (AI API calls). Monitor queue depth and task wait time. Scale workers horizontally when queue depth consistently exceeds 100 tasks or wait time exceeds 2 minutes. Each worker handles 1 long-running task at a time (set prefetch_multiplier=1).
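That rule of thumb can be captured in a few lines; the helper name and the Celery settings shown in comments are assumptions for illustration:

```python
import os

def suggested_worker_count(cpu_cores=None, io_bound=True):
    """Rule of thumb: 2x CPU cores for I/O-bound tasks (AI API calls),
    1x cores for CPU-bound work."""
    cores = cpu_cores or os.cpu_count() or 1
    return 2 * cores if io_bound else cores

# Celery settings to pair with this (sketch; assumes app = Celery(...)):
#   app.conf.worker_prefetch_multiplier = 1  # one long task per worker slot
#   app.conf.task_acks_late = True           # requeue if a worker dies mid-task
```

prefetch_multiplier=1 matters because long-running generation tasks should not be hoarded by a busy worker while idle workers wait.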
5. How does Musketeers Tech help implement production-grade queue systems?
Musketeers Tech designs and implements robust async generation architectures, including Redis/RabbitMQ/SQS queue setup and configuration, Celery task workflows with retry logic and exponential backoff, distributed rate limiting preventing API blocks, idempotency systems preventing duplicate processing, monitoring dashboards and dead letter queues, and horizontal scaling strategies, so your AI storybook platform handles production traffic reliably without timeout failures or duplicate charges.
← Back