Claude Skills Guide

FastAPI Background Tasks with Celery Integration Guide

Building modern web applications often requires handling time-consuming operations without blocking the main request-response cycle. Whether you’re processing large datasets, sending emails, or generating reports, background tasks are essential for maintaining responsive user experiences. This guide explores how to integrate Celery with FastAPI for robust asynchronous task processing, with practical examples you can apply immediately.

Why Background Tasks Matter in FastAPI

FastAPI’s async capabilities handle I/O-bound operations efficiently, but CPU-intensive tasks still block the event loop. When your endpoint needs to process data that takes seconds or minutes, users face frustrating wait times or timeouts. Background tasks solve this by moving heavy processing to separate workers that run independently of HTTP requests.

Common use cases include:

Celery provides a battle-tested solution for managing these distributed task queues, with support for task scheduling, retries, and result persistence.

Setting Up Celery with FastAPI

Prerequisites and Installation

First, install the required dependencies:

pip install fastapi celery redis uvicorn

You’ll also need Redis as the message broker (though Celery supports other brokers like RabbitMQ):

# Using Docker
docker run -d -p 6379:6379 redis:7-alpine

Project Structure

A well-organized structure separates your FastAPI app from Celery configuration:

project/
├── app/
│   ├── __init__.py
│   ├── main.py           # FastAPI application
│   ├── tasks.py          # Celery tasks
│   └── config.py         # Configuration
├── celery_app/
│   └── celery_config.py  # Celery configuration
└── requirements.txt

Basic Celery Configuration

Create your Celery configuration file:

# celery_app/celery_config.py
from celery import Celery

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,  # 1 hour max
    task_soft_time_limit=3000,  # Soft limit at 50 minutes
)

Creating the FastAPI Application

Now build your FastAPI app with task endpoints:

# app/main.py
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import Optional
import asyncio

app = FastAPI(title="FastAPI Celery Integration")

class TaskRequest(BaseModel):
    user_id: int
    email: str
    action: str

# Import tasks after they're defined
from app.tasks import process_user_task, send_notification

@app.post("/tasks/submit")
async def submit_task(request: TaskRequest, background_tasks: BackgroundTasks):
    """Submit a task for background processing."""
    
    # Queue the task with Celery
    task = process_user_task.delay(
        user_id=request.user_id,
        email=request.email,
        action=request.action
    )
    
    return {
        "task_id": task.id,
        "status": "submitted",
        "message": "Task queued successfully"
    }

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Check the status of a background task."""
    from celery_app.celery_config import celery_app
    
    result = celery_app.AsyncResult(task_id)
    
    return {
        "task_id": task_id,
        "status": result.status,
        "result": result.result if result.ready() else None
    }

Defining Celery Tasks

Create your task definitions:

# app/tasks.py
from celery_app.celery_config import celery_app
import time
import logging

logger = logging.getLogger(__name__)

@celery_app.task(bind=True, max_retries=3)
def process_user_task(self, user_id: int, email: str, action: str):
    """Process a user action in the background."""
    try:
        logger.info(f"Processing task for user {user_id}, action: {action}")
        
        # Simulate processing time
        time.sleep(5)
        
        # Your business logic here
        result = {
            "user_id": user_id,
            "email": email,
            "action": action,
            "processed": True,
            "timestamp": time.time()
        }
        
        return result
        
    except Exception as exc:
        logger.error(f"Task failed: {exc}")
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

@celery_app.task
def send_notification(user_id: int, message: str):
    """Send notification email (mock implementation)."""
    # In production, integrate with SendGrid, AWS SES, etc.
    logger.info(f"Sending notification to user {user_id}: {message}")
    return {"sent": True, "user_id": user_id}

FastAPI BackgroundTasks vs Celery

FastAPI includes a built-in BackgroundTasks class for simple use cases. Understanding when to use each option helps you choose the right tool:

Feature FastAPI BackgroundTasks Celery
Complexity Low High
Scalability Single process Distributed workers
Persistence In-memory Redis/database
Scheduling No Yes (celery beat)
Retry logic Manual Built-in
Monitoring Limited Flower dashboard

Use FastAPI’s BackgroundTasks for simple, single-instance applications. Choose Celery when you need horizontal scaling, persistent queues, scheduled tasks, or robust retry mechanisms.

Advanced Patterns

Task Chaining and Chords

Chain multiple tasks together for complex workflows:

from celery import chain, chord

# Sequential execution
workflow = chain(
    process_user_task.s(user_id=1, email="test@example.com", action="signup"),
    send_notification.s(message="Welcome!")
)
result = workflow.apply_async()

# Parallel execution with callback
workflow = chord(
    [process_user_task.s(user_id=i, email=f"user{i}@example.com", action="batch") 
     for i in range(10)],
    send_notification.s(message="Batch processing complete")
)
result = workflow.apply_async()

Scheduled Tasks with Celery Beat

For recurring tasks, integrate Celery Beat:

# celery_app/celery_config.py additions
celery_app.conf.beat_schedule = {
    "cleanup-old-data-daily": {
        "task": "app.tasks.cleanup_old_data",
        "schedule": 86400.00,  # Daily
    },
    "generate-reports-hourly": {
        "task": "app.tasks.generate_daily_report",
        "schedule": 3600.00,  # Hourly
    },
}

Error Handling and Monitoring

Implement comprehensive error handling:

@celery_app.task(bind=True, max_retries=3, autoretry_for=(Exception,), 
                 retry_backoff=True, retry_backoff_max=600)
def robust_task(self, data: dict):
    """Task with automatic retry logic."""
    try:
        # Process data
        result = process_data_safely(data)
        return {"status": "success", "result": result}
    except ValidationError as exc:
        # Don't retry validation errors
        return {"status": "failed", "error": str(exc)}

Best Practices

  1. Keep tasks idempotent: Design tasks that can run safely multiple times without side effects

  2. Use descriptive task names: Enable easy identification in monitoring tools

  3. Implement proper logging: Track task execution for debugging

  4. Set appropriate timeouts: Prevent runaway tasks from consuming resources

  5. Use task routing: Route specific tasks to dedicated queues based on resource needs

  6. Monitor with Flower: Install Flower for real-time Celery cluster monitoring

pip install flower
celery -A celery_app.celery_config flower --port=5555

Common Pitfalls to Avoid

Running the Application

Start the components separately:

# Terminal 1: Redis
redis-server

# Terminal 2: Celery worker
celery -A celery_app.celery_config worker --loglevel=info

# Terminal 3: Celery Beat (if using scheduled tasks)
celery -A celery_app.celery_config beat --loglevel=info

# Terminal 4: FastAPI
uvicorn app.main:app --reload

Conclusion

Integrating Celery with FastAPI provides a powerful foundation for handling background processing at scale. Start with simple tasks and progressively adopt advanced patterns like task chaining, scheduling, and comprehensive monitoring as your needs grow. The combination of FastAPI’s async capabilities and Celery’s robust task queue management enables you to build responsive applications that handle demanding workloads efficiently.

Built by theluckystrike — More at zovo.one