Architecture Blueprint: Queue-Based Generation That Doesn’t Time Out (Retries, Idempotency, Rate Limits)

Processing AI storybook generation in synchronous HTTP requests guarantees timeout failures and terrible user experience. A robust queue-based architecture combines asynchronous task processing, retry logic with exponential backoff, idempotency guarantees, and rate limit coordination to deliver reliable multi-minute generation workflows. This is essential whether you’re building a SaaS platform, scaling beyond MVP, or handling production traffic where 30-45 second generation times exceed every web server timeout.

Key takeaways

A robust queue architecture blends message queuing, worker processes, state management, and failure recovery.

Queue-based processing enables production reliability; synchronous requests cause timeout failures at scale.

Multi-layer retry systems ensure transient failures recover automatically, rate limit backoff prevents API blocking, and idempotency prevents duplicate charges.

Task visibility, dead letter queues, and monitoring matter as much as the queue technology itself.

Musketeers Tech helps design production-grade async generation pipelines that handle retries, coordinate rate limits, and enforce effectively-once processing through idempotency, delivering storybooks users trust without timeout errors.

Why synchronous AI generation fails in production

Request-response thinking is the problem. Most AI storybook prototypes generate books synchronously in HTTP request handlers. This works for demos but fails in production because image generation takes 8-15 seconds per image, full books require 5-12 minutes total, web servers time out after 30-60 seconds, and users close browsers, terminating incomplete work. Without async processing, 60-80% of generation attempts fail with timeout errors.

Pure synchronous generation is simple but, without queue-based decoupling, it produces systems that can’t deliver completed books at scale.

What queue-based generation architecture actually means

Queue-based AI generation combines multiple architectural patterns to handle long-running async workflows:

Task queuing from message broker systems (Redis, RabbitMQ, SQS) decoupling API requests from worker execution.

Retry logic based on exponential backoff, jitter, and failure classification distinguishing transient from permanent errors.

Idempotency guarantees ensuring duplicate requests don’t charge users twice or regenerate identical content.

These are integrated into a production pipeline so generation workflows complete reliably despite API failures, rate limits, and user disconnections.

Core components of queue-based generation systems

1. Message queue and task broker

Receives generation requests from API and stores them durably.

Routes tasks to available worker processes based on priority and capacity.

Provides visibility into queue depth, processing rates, and failure counts.
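Queue visibility is easy to act on with a Redis-backed broker: by default, Celery stores each queue as a Redis list named after the queue, so a depth check is a single command. A minimal sketch (helper names and the scaling threshold are our own, not part of any library API):

```python
def queue_depths(redis_client, queues=('generation', 'images', 'text')):
    """Report pending-task counts per queue for dashboards or alerts.

    Assumes Celery's default Redis broker layout, where each queue
    is stored as a Redis list keyed by the queue name.
    """
    return {q: redis_client.llen(q) for q in queues}

def should_scale_up(depths, threshold=100):
    """Crude scaling signal: any queue deeper than the threshold."""
    return any(depth > threshold for depth in depths.values())
```

Polling these depths from a metrics exporter or cron job gives early warning before backlogs become user-visible.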

2. Worker pool and execution engine

Spawns multiple worker processes handling generation concurrently.

Executes long-running AI API calls without blocking HTTP request threads.

Scales horizontally adding workers as queue depth increases.

3. State management and result storage

Tracks generation progress through multi-step workflows (planning → text → images → assembly).

Stores partial results enabling resume-from-checkpoint on failures.

Provides status API for frontend polling or webhook notifications.
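For the status API, the response shape matters more than the framework serving it. A sketch of a payload builder; the field names are assumptions mirroring the state schema later in this post:

```python
def build_status_payload(record):
    """Shape a generation record into the polling response.

    `record` is assumed to be a dict mirroring the book_generations
    row: status, progress_percent, current_step, pdf_url, error_message.
    """
    payload = {
        'status': record['status'],
        'progress': record.get('progress_percent', 0),
        'current_step': record.get('current_step'),
    }
    # Terminal states carry their result or error; in-flight states don't
    if record['status'] == 'complete':
        payload['pdf_url'] = record.get('pdf_url')
    elif record['status'] == 'failed':
        payload['error'] = record.get('error_message')
    return payload
```

Keeping this shaping logic in one function means polling endpoints and webhook payloads stay consistent.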

How queue architecture improves production reliability

1. Better timeout handling and higher completion rates

Async processing ensures generation continues regardless of user connection status.

Checkpoint storage speeds recovery by resuming from the last successful step.

Combined, they increase completion rates from 40-60% (synchronous) to 95-99% (queue-based).
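Resume-from-checkpoint can be sketched as a comparison between an ordered step list and the set of steps whose results were already persisted (the step names here are illustrative, not from any specific pipeline):

```python
# Ordered workflow steps for a generation run (illustrative names)
STEPS = ['plan', 'text', 'images', 'assemble']

def remaining_steps(checkpoints):
    """Return the steps still to run, skipping checkpointed ones.

    `checkpoints` is the set of step names whose results were stored
    by a previous attempt. Resume at the first step without a stored
    result; everything after it reruns so downstream steps see fresh
    inputs.
    """
    for i, step in enumerate(STEPS):
        if step not in checkpoints:
            return STEPS[i:]
    return []
```

A retried task then iterates only over `remaining_steps(...)` instead of redoing paid API calls for work that already succeeded.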

2. Handling API failures and rate limits efficiently

Transient failures recover automatically through retry logic with exponential backoff.

Rate limit errors trigger coordinated backoff, preventing worker pile-on from amplifying the problem.

Flexible failure classification lets your system distinguish temporary from permanent errors.

3. More robust cost control and user experience

Idempotency prevents duplicate API calls when users refresh or retry requests.

Priority queues surface urgent requests (paid users) ahead of background tasks.

This reduces wasted API costs by 20-40% and improves perceived latency for priority traffic.
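Priority handling needs no exotic broker features; routing paid traffic to a dedicated queue drained by its own workers is often enough. A sketch (the queue names and `is_paid_user` flag are assumptions for illustration):

```python
def enqueue_generation(task, book_id, user_id, params, is_paid_user):
    """Route a generation request to the right queue.

    `task` is a Celery task object; a worker started with
    --queues=priority serves paid jobs without competing with
    background traffic on the default queue.
    """
    queue = 'priority' if is_paid_user else 'generation'
    result = task.apply_async(args=[book_id, user_id, params], queue=queue)
    return queue, result
```

Dedicated workers per queue make the latency guarantee structural rather than best-effort.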

Designing a queue-based generation architecture

1. Five-layer architecture stack

Maintain API layer, message queue, worker pool, state store, and result storage as distinct components.

Use standard protocols (HTTP, AMQP, Redis protocol) enabling component swapping.

Design for horizontal scaling at worker layer without API or queue changes.

2. Task workflow and state machine

Define generation states: queued → processing → text_complete → images_processing → complete, with failed reachable from any active state.

Specify valid transitions and rollback procedures for each failure point.

Implement checkpointing storing intermediate results at each state transition.
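The transitions above can be encoded as a small lookup table checked before every status write. This is one plausible version; the re-queue edge out of `failed` is our own assumption:

```python
# Allowed transitions for the generation state machine; anything
# else indicates a logic bug rather than a normal failure.
VALID_TRANSITIONS = {
    'queued': {'processing'},
    'processing': {'text_complete', 'failed'},
    'text_complete': {'images_processing', 'failed'},
    'images_processing': {'complete', 'failed'},
    'complete': set(),
    'failed': {'queued'},  # manual re-queue after investigation
}

def transition(current, target):
    """Validate a state change before persisting it."""
    if target not in VALID_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition: {current} -> {target}")
    return target
```

Guarding writes this way catches out-of-order task execution (e.g. a stale retry marking a completed book as processing) before it corrupts user-visible state.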

3. Failure classification and retry policies

Categorize errors: transient (network, rate limit), permanent (invalid input, moderation failure), retriable (API timeout).

Apply retry logic only to transient and retriable categories with exponential backoff.

Route permanent failures to dead letter queue for investigation without retry loops.
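A dead letter queue can be as simple as a Redis list of JSON envelopes; the key name and envelope fields below are assumptions for this sketch:

```python
import json
import time

def send_to_dead_letter(redis_client, task_name, payload, error):
    """Park a permanently failed task for manual inspection.

    Pushes a JSON envelope onto a Redis list acting as the dead
    letter queue instead of letting the task loop through retries.
    """
    envelope = json.dumps({
        'task': task_name,
        'payload': payload,
        'error': str(error),
        'failed_at': time.time(),
    })
    redis_client.lpush('dead_letter', envelope)
    return envelope
```

An operator (or a periodic audit job) can then inspect, fix, and re-queue these entries without any retry storm ever hitting the API.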

Message queue selection and implementation

If you are building production systems, queue technology choice affects reliability, scaling, and operational complexity.

Comparing queue technologies

Different queue systems offer varying trade-offs on durability, throughput, and operations.

Redis (with Celery or Bull)

RabbitMQ (with Celery or Pika)

AWS SQS (with Celery-SQS or native SDK)

Google Cloud Tasks

Technology selection matrix:

QUEUE_COMPARISON = {
    'redis_celery': {
        'setup_time': '1-2 hours',
        'monthly_cost': 20,  # Managed Redis
        'max_throughput': 100000,
        'durability': 'medium',
        'operational_burden': 'low',
        'best_for': 'Fast iteration, moderate scale'
    },
    'rabbitmq_celery': {
        'setup_time': '3-5 hours',
        'monthly_cost': 50,  # Managed RabbitMQ
        'max_throughput': 30000,
        'durability': 'high',
        'operational_burden': 'medium',
        'best_for': 'High reliability requirements'
    },
    'sqs_celery': {
        'setup_time': '2-3 hours',
        'monthly_cost': 15,  # At 1M tasks/month
        'max_throughput': 'unlimited',
        'durability': 'very_high',
        'operational_burden': 'very_low',
        'best_for': 'AWS ecosystem, variable load'
    }
}

def recommend_queue(monthly_tasks, reliability_tier, team_size):
    """Recommend queue technology based on requirements."""
    
    if monthly_tasks < 100000:  # Low volume
        return 'redis_celery'  # Simplest setup
    
    if reliability_tier == 'critical' and team_size >= 3:
        return 'rabbitmq_celery'  # Can handle ops burden
    
    if monthly_tasks > 500000:  # High volume
        return 'sqs_celery'  # Managed scaling
    
    return 'redis_celery'  # Default for most use cases

Risk factor: MEDIUM. Wrong queue choice causes operational burden or reliability issues at scale.

Celery with Redis broker provides best balance of simplicity and capability for most AI generation workloads.

Installation and configuration:

# requirements.txt
celery==5.3.4
redis==5.0.1
flower==2.0.1  # Monitoring dashboard

# celery_config.py
from celery import Celery

# Initialize Celery with Redis broker
celery_app = Celery(
    'storybook_generator',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'  # Result storage
)

# Configuration
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    
    # Task routing
    task_routes={
        'tasks.generate_storybook': {'queue': 'generation'},
        'tasks.generate_image': {'queue': 'images'},
        'tasks.generate_text': {'queue': 'text'}
    },
    
    # Retry configuration
    task_acks_late=True,  # Acknowledge after completion
    task_reject_on_worker_lost=True,  # Requeue if worker dies
    
    # Rate limiting
    task_annotations={
        'tasks.generate_image': {'rate_limit': '10/m'},  # 10/minute, enforced per worker (not globally)
        'tasks.generate_text': {'rate_limit': '60/m'}
    },
    
    # Result expiration
    result_expires=3600,  # 1 hour
    
    # Worker configuration
    worker_prefetch_multiplier=1,  # One task at a time (for long tasks)
    worker_max_tasks_per_child=50  # Restart worker after 50 tasks
)

Task definition with retry logic:

from celery import Task
import random

class BaseGenerationTask(Task):
    """Base task class with retry logic."""
    
    autoretry_for = (  # Auto-retry on these exceptions
        ConnectionError,
        TimeoutError
    )
    retry_kwargs = {
        'max_retries': 5,
        'countdown': 5  # Initial delay in seconds
    }
    retry_backoff = True  # Exponential backoff
    retry_backoff_max = 600  # Max 10 minutes
    retry_jitter = True  # Add randomness to prevent thundering herd

@celery_app.task(base=BaseGenerationTask, bind=True)
def generate_storybook(self, book_id, user_id, params):
    """
    Main task orchestrating full book generation.
    
    Args:
        self: Task instance (for retry access)
        book_id: Unique book identifier
        user_id: User requesting generation
        params: Generation parameters
    """
    try:
        # 1. Update status
        update_book_status(book_id, 'processing')
        
        # 2. Generate story structure
        story = generate_story_structure.apply_async(
            args=[book_id, params],
            queue='text'
        ).get(timeout=120, disable_sync_subtasks=False)  # waiting inside a task requires this flag
        
        # 3. Generate page text in parallel
        text_tasks = [
            generate_page_text.apply_async(
                args=[book_id, page_num, story],
                queue='text'
            )
            for page_num in range(1, params['page_count'] + 1)
        ]
        
        page_texts = [task.get(timeout=60, disable_sync_subtasks=False) for task in text_tasks]
        
        # 4. Generate images in parallel (with rate limiting)
        image_tasks = [
            generate_image.apply_async(
                args=[book_id, page_num, page_text],
                queue='images'
            )
            for page_num, page_text in enumerate(page_texts, 1)
        ]
        
        # Wait for images with progress tracking
        images = []
        for i, task in enumerate(image_tasks):
            try:
                image_url = task.get(timeout=300, disable_sync_subtasks=False)  # 5 min per image
                images.append(image_url)
                update_progress(book_id, (i + 1) / len(image_tasks))
            except Exception as e:
                # Image failed, mark and continue
                images.append(None)
                log_error(book_id, f"Image {i+1} failed: {e}")
        
        # 5. Assemble final PDF
        pdf_url = assemble_pdf.apply_async(
            args=[book_id, page_texts, images],
            queue='generation'
        ).get(timeout=180, disable_sync_subtasks=False)
        
        # 6. Mark complete
        update_book_status(book_id, 'complete', pdf_url=pdf_url)
        
        return {'book_id': book_id, 'pdf_url': pdf_url}
        
    except Exception as exc:
        # Retry logic
        if self.request.retries < self.max_retries:
            # Exponential backoff: 5s, 10s, 20s, 40s, 80s
            countdown = min(
                5 * (2 ** self.request.retries) + random.randint(0, 5),
                600  # Max 10 minutes
            )
            raise self.retry(exc=exc, countdown=countdown)
        else:
            # Max retries exceeded, mark as failed
            update_book_status(book_id, 'failed', error=str(exc))
            raise

Worker process startup:

# Start Celery worker with concurrency
celery -A celery_config worker \
    --loglevel=info \
    --concurrency=4 \
    --queues=generation,images,text \
    --max-tasks-per-child=50

# Start Flower monitoring dashboard
celery -A celery_config flower --port=5555

Total implementation time: 8-12 hours for basic setup including retry logic and monitoring.

Risk factor: LOW. Celery + Redis is battle-tested with extensive documentation.

Retry logic and exponential backoff

If you need reliable generation, retry logic is absolutely essential for handling transient API failures.

Classifying failures for retry decisions

Not all errors should trigger retries. Permanent failures waste API costs.

Retriable errors: network failures, API timeouts, rate limits (429), and server errors (500, 502, 503, 504).

Non-retriable errors: invalid input (400), authentication failures (401, 403), and content moderation rejections.

Error classification:

from openai import OpenAI, APIError, RateLimitError, APIConnectionError
import requests

def classify_error(exception):
    """
    Classify errors into retriable vs permanent.
    
    Returns: 'retriable', 'permanent', or 'rate_limit'
    """
    
    # OpenAI-specific errors
    if isinstance(exception, RateLimitError):
        return 'rate_limit'  # Special handling
    
    if isinstance(exception, APIConnectionError):
        return 'retriable'  # Network issues
    
    if isinstance(exception, APIError):
        # Check HTTP status code
        if hasattr(exception, 'status_code'):
            if exception.status_code in [429]:
                return 'rate_limit'
            if exception.status_code in [500, 502, 503, 504]:
                return 'retriable'
            if exception.status_code in [400, 401, 403]:
                return 'permanent'
    
    # Generic network errors
    if isinstance(exception, (ConnectionError, TimeoutError, 
                             requests.exceptions.Timeout)):
        return 'retriable'
    
    # Unknown errors - treat as permanent to avoid infinite retry
    return 'permanent'

# Usage in task
@celery_app.task(bind=True)
def generate_image_with_classification(self, book_id, page_num, prompt):
    """Generate image with smart retry logic."""
    
    try:
        image_url = call_dalle_api(prompt)
        return image_url
        
    except Exception as exc:
        error_type = classify_error(exc)
        
        if error_type == 'permanent':
            # Don't retry, fail immediately
            raise
        
        elif error_type == 'rate_limit':
            # Longer backoff for rate limits
            countdown = 60 * (2 ** self.request.retries)  # 1m, 2m, 4m...
            raise self.retry(exc=exc, countdown=countdown)
        
        elif error_type == 'retriable':
            # Standard exponential backoff
            countdown = 5 * (2 ** self.request.retries)  # 5s, 10s, 20s...
            raise self.retry(exc=exc, countdown=countdown, max_retries=5)

Implementing exponential backoff with jitter

Exponential backoff prevents overwhelming APIs during recovery periods.

Jitter prevents thundering herd when multiple workers retry simultaneously.

Backoff calculation:

import random
import time

def calculate_backoff(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 300.0,
    jitter: bool = True
) -> float:
    """
    Calculate exponential backoff delay with optional jitter.
    
    Args:
        attempt: Current retry attempt (0-indexed)
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap
        jitter: Add randomness to prevent thundering herd
    
    Returns:
        Delay in seconds before next retry
    """
    # Exponential component: base_delay * 2^attempt
    delay = min(base_delay * (2 ** attempt), max_delay)
    
    # Add jitter: random value between 50-100% of delay
    if jitter:
        delay = delay * random.uniform(0.5, 1.0)
    
    return delay

# Example progression
for attempt in range(6):
    delay = calculate_backoff(attempt, base_delay=2.0, max_delay=120.0)
    print(f"Attempt {attempt}: wait {delay:.1f}s")

"""
Example output with jitter (values vary between runs):
Attempt 0: wait 1.2s   (base: 2s)
Attempt 1: wait 3.1s   (base: 4s)
Attempt 2: wait 5.8s   (base: 8s)
Attempt 3: wait 13.2s  (base: 16s)
Attempt 4: wait 28.7s  (base: 32s)
Attempt 5: wait 51.3s  (base: 64s)
"""

Backoff implementation in tasks:

@celery_app.task(bind=True, max_retries=7)
def generate_with_backoff(self, resource_id, params):
    """Task with custom exponential backoff."""
    
    try:
        result = call_expensive_api(params)
        return result
        
    except Exception as exc:
        error_type = classify_error(exc)
        
        if error_type == 'permanent':
            raise  # Don't retry
        
        # Calculate backoff
        attempt = self.request.retries
        
        if error_type == 'rate_limit':
            # Longer backoff for rate limits
            base_delay = 30.0
            max_delay = 600.0
        else:
            # Standard backoff for transient errors
            base_delay = 2.0
            max_delay = 120.0
        
        countdown = calculate_backoff(
            attempt,
            base_delay=base_delay,
            max_delay=max_delay,
            jitter=True
        )
        
        logger.info(
            f"Retry attempt {attempt + 1}/{self.max_retries} "
            f"after {countdown:.1f}s delay"
        )
        
        raise self.retry(exc=exc, countdown=countdown)

Total retry implementation time: 4-6 hours including error classification and testing.

Risk factor: MEDIUM. Improper backoff amplifies load during outages; test thoroughly.

Idempotency and duplicate prevention

If you are charging users or consuming API credits, idempotency is non-negotiable to prevent duplicate processing.

Understanding idempotency requirements

Idempotent operations produce same result regardless of how many times executed.

Critical for AI generation because every request consumes paid API credits, users refresh or retry requests when generation feels slow, and retried tasks can re-execute already-completed work.

Without idempotency, duplicate requests charge users multiple times for identical books.

Implementing idempotency keys

Use unique idempotency keys to detect and reject duplicate requests.

Idempotency key strategy:

import hashlib
import json
import uuid
from datetime import datetime, timedelta

class IdempotencyManager:
    """Manage idempotency keys and duplicate detection."""
    
    def __init__(self, redis_client):
        self.redis = redis_client
        self.key_ttl = 86400  # 24 hours
    
    def generate_key(self, user_id, request_params):
        """
        Generate idempotency key from request parameters.
        
        Args:
            user_id: User making request
            request_params: Dict of generation parameters
        
        Returns:
            Idempotency key string
        """
        # Create deterministic hash from params
        params_str = json.dumps(request_params, sort_keys=True)
        content_hash = hashlib.sha256(params_str.encode()).hexdigest()[:16]
        
        # Combine user + params hash
        key = f"idempotency:{user_id}:{content_hash}"
        return key
    
    def check_duplicate(self, idempotency_key):
        """
        Check if request with this key was already processed.
        
        Returns:
            (is_duplicate, existing_book_id)
        """
        existing = self.redis.get(idempotency_key)
        
        if existing:
            return True, existing.decode('utf-8')
        else:
            return False, None
    
    def register_request(self, idempotency_key, book_id):
        """
        Register new request to prevent duplicates.
        
        Args:
            idempotency_key: Unique request identifier
            book_id: Book ID being generated
        """
        self.redis.setex(
            idempotency_key,
            self.key_ttl,
            book_id.encode('utf-8')
        )
    
    def clear_key(self, idempotency_key):
        """Clear idempotency key (on generation failure)."""
        self.redis.delete(idempotency_key)

# Usage in API endpoint
from flask import Flask, request, jsonify
from redis import Redis

app = Flask(__name__)
redis_client = Redis(host='localhost', port=6379, db=2)
idempotency = IdempotencyManager(redis_client)

@app.route('/api/generate', methods=['POST'])
def generate_book_api():
    """API endpoint with idempotency protection."""
    
    user_id = request.user.id  # From authentication
    params = request.json
    
    # Generate idempotency key
    idem_key = idempotency.generate_key(user_id, params)
    
    # Check for duplicate
    is_duplicate, existing_book_id = idempotency.check_duplicate(idem_key)
    
    if is_duplicate:
        # Return existing book
        return jsonify({
            'status': 'success',
            'book_id': existing_book_id,
            'message': 'Using existing generation',
            'duplicate': True
        }), 200
    
    # Generate new book ID
    book_id = str(uuid.uuid4())
    
    # Register idempotency key
    idempotency.register_request(idem_key, book_id)
    
    try:
        # Queue generation task
        task = generate_storybook.apply_async(
            args=[book_id, user_id, params],
            task_id=book_id  # Use book_id as task_id for tracking
        )
        
        return jsonify({
            'status': 'queued',
            'book_id': book_id,
            'task_id': task.id
        }), 202  # Accepted
        
    except Exception as e:
        # Clear idempotency key on queue failure
        idempotency.clear_key(idem_key)
        raise

Handling task retries with idempotency

Tasks themselves must be idempotent to safely retry on failures.

Task-level idempotency:

@celery_app.task(bind=True, max_retries=5)
def generate_image_idempotent(self, book_id, page_num, prompt):
    """
    Generate image with idempotency at task level.
    
    Checks if image already generated before calling API.
    """
    
    # Check if already generated
    existing_url = get_existing_image(book_id, page_num)
    if existing_url:
        logger.info(f"Image {page_num} already exists, skipping generation")
        return existing_url
    
    # Generate new image
    try:
        image_url = call_dalle_api(prompt)
        
        # Store result atomically
        store_image_result(book_id, page_num, image_url)
        
        return image_url
        
    except Exception as exc:
        # Retry logic (safe because we check existing above)
        raise self.retry(exc=exc, countdown=calculate_backoff(self.request.retries))

def store_image_result(book_id, page_num, image_url):
    """
    Store image result atomically to prevent race conditions.
    """
    # Use Redis SET NX (set if not exists) for atomic operation
    key = f"book:{book_id}:image:{page_num}"
    success = redis_client.setnx(key, image_url)
    
    if success:
        redis_client.expire(key, 3600)  # 1 hour expiration
        return True
    else:
        # Another worker already stored result
        return False

Total idempotency implementation time: 6-8 hours including key management and race condition handling.

Risk factor: HIGH. Missing idempotency causes duplicate charges; test edge cases thoroughly.

Rate limit coordination across workers

If you are running multiple workers, coordinated rate limiting prevents API blocking.

Understanding rate limit requirements

AI APIs enforce rate limits per account or API key; for example, DALL-E 3 at Tier 1 allows 50 requests per minute.

Multiple workers sharing same API key must coordinate to stay within limits.

Implementing distributed rate limiting

Use Redis-backed rate limiting to coordinate across worker processes.

Distributed rate limiter:

import time
from redis import Redis

class DistributedRateLimiter:
    """
    Token bucket rate limiter using Redis.
    
    Coordinates rate limiting across multiple worker processes.
    """
    
    def __init__(self, redis_client, namespace='rate_limit'):
        self.redis = redis_client
        self.namespace = namespace
    
    def acquire(self, key, max_tokens, refill_rate, refill_period=60):
        """
        Acquire a token from the rate limiter.
        
        Args:
            key: Rate limit key (e.g., 'dalle3', 'gpt4')
            max_tokens: Maximum tokens in bucket
            refill_rate: Tokens added per refill_period
            refill_period: Seconds between refills (default 60)
        
        Returns:
            (acquired, wait_time)
                acquired: bool, whether token was acquired
                wait_time: seconds to wait before retry if not acquired
        """
        now = time.time()
        bucket_key = f"{self.namespace}:{key}"
        
        # Lua script for atomic token bucket operation
        script = """
        local bucket_key = KEYS[1]
        local now = tonumber(ARGV[1])
        local max_tokens = tonumber(ARGV[2])
        local refill_rate = tonumber(ARGV[3])
        local refill_period = tonumber(ARGV[4])
        
        -- Get current bucket state
        local bucket = redis.call('HMGET', bucket_key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1]) or max_tokens
        local last_refill = tonumber(bucket[2]) or now
        
        -- Calculate refill
        local time_passed = now - last_refill
        local refills = math.floor(time_passed / refill_period)
        
        if refills > 0 then
            tokens = math.min(max_tokens, tokens + (refills * refill_rate))
            last_refill = last_refill + (refills * refill_period)
        end
        
        -- Try to acquire token
        if tokens >= 1 then
            tokens = tokens - 1
            redis.call('HMSET', bucket_key, 'tokens', tokens, 'last_refill', last_refill)
            redis.call('EXPIRE', bucket_key, refill_period * 2)
            return {1, 0}  -- Success, no wait
        else
            -- Calculate wait time until next token
            local wait = refill_period - (now - last_refill)
            return {0, wait}  -- Failed, return wait time
        end
        """
        
        result = self.redis.eval(
            script,
            1,  # Number of keys
            bucket_key,  # KEYS[1]
            now, max_tokens, refill_rate, refill_period  # ARGV
        )
        
        acquired = bool(result[0])
        wait_time = float(result[1])
        
        return acquired, wait_time
    
    def wait_and_acquire(self, key, max_tokens, refill_rate, refill_period=60, 
                        max_wait=300):
        """
        Block until token can be acquired.
        
        Args:
            max_wait: Maximum seconds to wait before giving up
        
        Returns:
            True if acquired, False if timed out
        """
        start_time = time.time()
        
        while True:
            acquired, wait_time = self.acquire(
                key, max_tokens, refill_rate, refill_period
            )
            
            if acquired:
                return True
            
            # Check timeout
            if time.time() - start_time > max_wait:
                return False
            
            # Wait before retry
            sleep_time = min(wait_time, max_wait - (time.time() - start_time))
            if sleep_time > 0:
                time.sleep(sleep_time)

# Usage in task
redis_client = Redis(host='localhost', port=6379, db=3)
rate_limiter = DistributedRateLimiter(redis_client)

@celery_app.task(bind=True, max_retries=10)
def generate_image_rate_limited(self, book_id, page_num, prompt):
    """Generate image with distributed rate limiting."""
    
    # Acquire rate limit token
    # DALL-E 3 Tier 1: 50 requests/minute
    acquired = rate_limiter.wait_and_acquire(
        key='dalle3',
        max_tokens=50,
        refill_rate=50,
        refill_period=60,
        max_wait=180  # Wait up to 3 minutes
    )
    
    if not acquired:
        # Rate limit wait exceeded, retry task
        raise self.retry(countdown=60, exc=Exception("Rate limit timeout"))
    
    # Token acquired, make API call
    try:
        image_url = call_dalle_api(prompt)
        return image_url
    except RateLimitError as e:
        # Hit rate limit despite our limiting (API limit changed?)
        # Back off significantly
        raise self.retry(exc=e, countdown=120)

Rate limiting for multiple API keys:

class APIKeyPool:
    """
    Manage multiple API keys with load balancing.
    
    Distributes requests across keys to maximize throughput.
    """
    
    def __init__(self, api_keys, redis_client):
        self.api_keys = api_keys
        self.redis = redis_client
        self.rate_limiter = DistributedRateLimiter(redis_client)
    
    def get_available_key(self, service='dalle3', max_wait=60):
        """
        Get an available API key respecting rate limits.
        
        Args:
            service: Service identifier (dalle3, gpt4, etc.)
            max_wait: Max seconds to wait for available key
        
        Returns:
            API key string or None if all keys rate limited
        """
        start_time = time.time()
        
        # Try each key in round-robin
        for key_index in range(len(self.api_keys)):
            # Calculate which key to try
            current_key_idx = (int(time.time()) + key_index) % len(self.api_keys)
            key = self.api_keys[current_key_idx]
            
            # Check rate limit for this specific key
            rate_key = f"{service}:key_{current_key_idx}"
            
            acquired, wait_time = self.rate_limiter.acquire(
                key=rate_key,
                max_tokens=50,  # DALL-E 3 limit
                refill_rate=50,
                refill_period=60
            )
            
            if acquired:
                return key
            
            # Check timeout
            if time.time() - start_time > max_wait:
                break
        
        return None  # All keys rate limited

# Usage
import os

api_pool = APIKeyPool(
    api_keys=[
        os.getenv('OPENAI_KEY_1'),
        os.getenv('OPENAI_KEY_2'),
        os.getenv('OPENAI_KEY_3')
    ],
    redis_client=redis_client
)

@celery_app.task
def generate_with_key_pool(book_id, page_num, prompt):
    """Generate using API key pool."""
    
    api_key = api_pool.get_available_key('dalle3', max_wait=120)
    
    if not api_key:
        raise Exception("All API keys rate limited")
    
    # Use selected key
    from openai import OpenAI
    client = OpenAI(api_key=api_key)
    response = client.images.generate(model="dall-e-3", prompt=prompt)
    
    return response.data[0].url

Total rate limiting implementation time: 8-12 hours including Lua scripts and key pooling.

Risk factor: HIGH. Rate limit violations can cause temporary API blocks; thoroughly test coordination.

State tracking and progress monitoring

If you want users to see generation progress, robust state management is essential.

Database schema for task state

Track generation state and progress in a database for reliable status queries.

State schema:

-- PostgreSQL schema
CREATE TABLE book_generations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    status VARCHAR(50) NOT NULL DEFAULT 'queued',
    -- States: queued, processing, text_complete, images_processing, 
    --         assembling, complete, failed
    
    -- Progress tracking
    progress_percent DECIMAL(5,2) DEFAULT 0.00,
    current_step VARCHAR(100),
    steps_completed INTEGER DEFAULT 0,
    total_steps INTEGER,
    
    -- Generation parameters
    page_count INTEGER NOT NULL,
    theme VARCHAR(200),
    age_group VARCHAR(50),
    
    -- Results
    pdf_url TEXT,
    error_message TEXT,
    
    -- Metadata
    task_id UUID,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- Index for status queries
CREATE INDEX idx_book_gen_user_status ON book_generations(user_id, status);
CREATE INDEX idx_book_gen_task_id ON book_generations(task_id);

-- Page-level tracking
CREATE TABLE book_generation_pages (
    id SERIAL PRIMARY KEY,
    book_id UUID NOT NULL REFERENCES book_generations(id) ON DELETE CASCADE,
    page_num INTEGER NOT NULL,
    
    -- Text generation
    text_status VARCHAR(50) DEFAULT 'pending',
    page_text TEXT,
    text_generated_at TIMESTAMP,
    
    -- Image generation
    image_status VARCHAR(50) DEFAULT 'pending',
    image_url TEXT,
    image_prompt TEXT,
    image_generated_at TIMESTAMP,
    retry_count INTEGER DEFAULT 0,
    
    UNIQUE(book_id, page_num)
);

State update functions:

from sqlalchemy import create_engine, Column, String, Integer, Numeric, DateTime, Text
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import uuid

engine = create_engine('postgresql://localhost/storybooks')  # adjust the DSN for your environment
Session = sessionmaker(bind=engine)
Base = declarative_base()

class BookGeneration(Base):
    __tablename__ = 'book_generations'
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), nullable=False)
    status = Column(String(50), default='queued')
    progress_percent = Column(Numeric(5, 2), default=0.00)
    current_step = Column(String(100))
    steps_completed = Column(Integer, default=0)
    total_steps = Column(Integer)
    page_count = Column(Integer, nullable=False)
    pdf_url = Column(Text)
    error_message = Column(Text)
    task_id = Column(UUID(as_uuid=True))
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

def update_book_status(book_id, status, **kwargs):
    """Update book generation status and metadata."""
    session = Session()
    
    try:
        book = session.query(BookGeneration).filter_by(id=book_id).first()
        
        if not book:
            raise ValueError(f"Book {book_id} not found")
        
        # Update status
        book.status = status
        book.updated_at = datetime.utcnow()
        
        # Update additional fields
        for key, value in kwargs.items():
            if hasattr(book, key):
                setattr(book, key, value)
        
        # Set timestamps
        if status == 'processing' and not book.started_at:
            book.started_at = datetime.utcnow()
        elif status in ['complete', 'failed']:
            book.completed_at = datetime.utcnow()
        
        session.commit()
        
    finally:
        session.close()

def update_progress(book_id, progress_percent, current_step=None):
    """Update generation progress."""
    session = Session()
    
    try:
        book = session.query(BookGeneration).filter_by(id=book_id).first()
        
        if book:
            book.progress_percent = min(100.00, progress_percent * 100)
            book.updated_at = datetime.utcnow()
            
            if current_step:
                book.current_step = current_step
                book.steps_completed = (book.steps_completed or 0) + 1
            
            session.commit()
    
    finally:
        session.close()
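
Note that update_progress expects a 0.0-1.0 fraction (it multiplies by 100 before storing). A small helper keeps that convention explicit and guards against a zero step count; this is a sketch, and compute_progress is not part of the schema above:

```python
def compute_progress(steps_completed, total_steps):
    """Return a 0.0-1.0 progress fraction; clamp at 1.0 and guard against zero totals."""
    if not total_steps:
        return 0.0
    return min(1.0, steps_completed / total_steps)
```

Callers would then pass the result straight through, e.g. `update_progress(book_id, compute_progress(3, 12), 'generating images')`.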

Real-time progress via WebSocket

Push progress updates to frontend for live status tracking.

WebSocket implementation:

from flask import Flask
from flask_socketio import SocketIO, emit, join_room

app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")

@socketio.on('subscribe_book')
def handle_subscribe(data):
    """Subscribe client to book generation updates."""
    book_id = data['book_id']
    join_room(book_id)
    
    # Send current status (get_book_status: your own DB lookup returning a serializable dict)
    book = get_book_status(book_id)
    emit('book_status', book, room=book_id)

def broadcast_progress(book_id, progress_data):
    """Broadcast progress update to subscribed clients."""
    socketio.emit('book_progress', progress_data, room=book_id)

# Call from Celery task
def update_progress_with_broadcast(book_id, progress_percent, current_step):
    """Update progress and broadcast to clients."""
    
    # Update database
    update_progress(book_id, progress_percent, current_step)
    
    # Broadcast to WebSocket clients
    broadcast_progress(book_id, {
        'book_id': str(book_id),
        'progress': progress_percent * 100,
        'step': current_step
    })

Frontend WebSocket client:

import { io } from 'socket.io-client';

class GenerationTracker {
    constructor(bookId) {
        this.bookId = bookId;
        this.socket = io('http://localhost:5000');
        
        // Subscribe to book updates
        this.socket.emit('subscribe_book', { book_id: bookId });
        
        // Listen for progress
        this.socket.on('book_progress', (data) => {
            this.handleProgress(data);
        });
        
        this.socket.on('book_status', (data) => {
            this.handleStatus(data);
        });
    }
    
    handleProgress(data) {
        // Update UI with progress
        console.log(`Progress: ${data.progress}% - ${data.step}`);
        
        // Update progress bar
        document.getElementById('progress-bar').style.width = `${data.progress}%`;
        document.getElementById('current-step').textContent = data.step;
    }
    
    handleStatus(data) {
        if (data.status === 'complete') {
            // Show download button
            window.location.href = `/books/${this.bookId}`;
        } else if (data.status === 'failed') {
            // Show error message
            alert(`Generation failed: ${data.error_message}`);
        }
    }
    
    disconnect() {
        this.socket.disconnect();
    }
}

// Usage
const tracker = new GenerationTracker('book-uuid-here');

Total state tracking implementation time: 10-14 hours including database schema, WebSocket setup, and frontend integration.

Risk factor: MEDIUM. State synchronization bugs can show incorrect progress; thorough testing needed.

Where Musketeers Tech fits into queue architecture

If you are starting from scratch

We help you move from synchronous prototypes to production-ready async systems with queue infrastructure, retry logic, and rate limit coordination.

We design task workflows, state machines, and checkpoint strategies that match your generation pipeline stages.

We implement monitoring dashboards, dead letter queues, and alerting that catches failures before users notice.

If you already have basic queuing but are experiencing failures

We diagnose timeout issues, identify where retry logic fails or rate limits are exceeded, and pinpoint idempotency gaps.

We add exponential backoff, distributed rate limiting, and error classification on top of your existing queue without re-architecting.

We tune worker concurrency, queue routing, and resource allocation for cost-optimal throughput.

So what should you do next?

Audit your current architecture: track synchronous request timeout rates, measure end-to-end generation completion percentage, and identify failure points and bottlenecks.

Introduce basic queuing: set up Redis + Celery, migrate one generation endpoint to async processing, and implement a status polling API for the frontend.

Pilot retry and idempotency: apply exponential backoff to transient errors, add idempotency keys to prevent duplicates, and collect metrics on retry success rates before expanding to all endpoints.
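
The status polling API is essentially a thin read layer over the book_generations table. A minimal sketch of the response shaping, assuming the row is available as a dict with the field names from the schema above:

```python
def build_status_payload(book):
    """Shape a book_generations row (as a dict) into a polling API response."""
    payload = {
        "book_id": str(book["id"]),
        "status": book["status"],
        "progress": float(book["progress_percent"]),
        "current_step": book["current_step"],
    }
    # Only expose result fields relevant to the terminal states
    if book["status"] == "complete":
        payload["pdf_url"] = book["pdf_url"]
    elif book["status"] == "failed":
        payload["error"] = book["error_message"]
    return payload
```

A frontend can poll a GET endpoint returning this payload every few seconds as a fallback for clients without WebSocket support.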

Frequently Asked Questions (FAQs)

1. Should I use Redis, RabbitMQ, or AWS SQS for my queue?

For most AI generation workloads, start with Redis + Celery. It provides the best balance of simplicity (single binary), performance (100k+ ops/sec), and cost ($15-20/month). Upgrade to RabbitMQ only if you need guaranteed delivery for critical workflows. Use SQS if you're AWS-native and want zero operational burden.

2. How do I prevent users from submitting the same generation request multiple times?

Implement idempotency keys combining the user ID with a hash of the request parameters. Store the keys in Redis for 24 hours. When a duplicate is detected, return the existing book_id instead of queuing a new task. This prevents duplicate API costs and ensures consistent results across retries.
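
A minimal key-derivation sketch: canonicalize the parameters (sorted keys, compact separators) so that logically identical requests always hash to the same value. The key prefix and function name are illustrative:

```python
import hashlib
import json

def make_idempotency_key(user_id, params):
    """Derive a stable key from the user ID plus canonicalized request parameters."""
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(f"{user_id}:{canonical}".encode()).hexdigest()
    return f"idem:{digest}"
```

The key would then be claimed atomically with Redis, e.g. `redis_client.set(key, book_id, nx=True, ex=86400)`: a falsy return means the key already exists, so you look up and return the stored book_id instead of enqueuing.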

3. What’s the right retry strategy for AI API calls?

Use error classification: permanent errors (invalid key, bad request) don’t retry. Transient errors (network, timeouts) retry with exponential backoff starting at 2-5 seconds. Rate limit errors need longer backoff (60+ seconds). Always add jitter to prevent thundering herd. Max retries: 5-7 attempts.
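
The delay calculation described above can be sketched as a single function; the parameter defaults are illustrative choices, not prescriptions:

```python
import random

def backoff_delay(attempt, base=2.0, cap=300.0, rate_limited=False):
    """Exponential backoff with full jitter; rate-limit errors get a 60s floor."""
    floor = 60.0 if rate_limited else 0.0
    delay = min(cap, base * (2 ** attempt))   # cap prevents unbounded waits
    return floor + random.uniform(0.0, delay)  # full jitter avoids thundering herd
```

In a Celery task this value would feed `self.retry(countdown=backoff_delay(self.request.retries, rate_limited=is_429))`.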

4. How many Celery workers should I run?

Start with worker count = 2× CPU cores for I/O-bound tasks (AI API calls). Monitor queue depth and task wait time. Scale workers horizontally when queue depth consistently exceeds 100 tasks or wait time exceeds 2 minutes. Each worker handles 1 long-running task at a time (set prefetch_multiplier=1).
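
For a 4-core box, that guidance translates to settings like the following (the dict itself is a sketch; the keys are Celery's lowercase config names, applied with `celery_app.conf.update(worker_settings)`):

```python
# Hypothetical Celery settings for long-running, I/O-bound generation tasks
worker_settings = {
    "worker_concurrency": 8,          # ~2x CPU cores on a 4-core machine
    "worker_prefetch_multiplier": 1,  # each worker reserves one task at a time
    "task_acks_late": True,           # re-queue the task if a worker dies mid-run
}
```

`task_acks_late` pairs naturally with `prefetch_multiplier=1`: a crashed worker's in-flight task goes back on the queue instead of being lost.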

5. How does Musketeers Tech help implement production-grade queue systems?

Musketeers Tech designs and implements robust async generation architectures: Redis/RabbitMQ/SQS queue setup and configuration, Celery task workflows with retry logic and exponential backoff, distributed rate limiting that prevents API blocks, idempotency systems that prevent duplicate processing, monitoring dashboards and dead letter queues, and horizontal scaling strategies. The result is an AI storybook platform that handles production traffic reliably, without timeout failures or duplicate charges.

January 20, 2026 · Musketeers Tech