FastAPI Background Tasks with Celery Integration Guide
Building modern web applications often requires handling time-consuming operations without blocking the main request-response cycle. Whether you’re processing large datasets, sending emails, or generating reports, background tasks are essential for maintaining responsive user experiences. This guide explores how to integrate Celery with FastAPI for robust asynchronous task processing, with practical examples you can apply immediately.
Why Background Tasks Matter in FastAPI
FastAPI’s async capabilities handle I/O-bound operations efficiently, but CPU-intensive tasks still block the event loop. When your endpoint needs to process data that takes seconds or minutes, users face frustrating wait times or timeouts. Background tasks solve this by moving heavy processing to separate workers that run independently of HTTP requests.
Common use cases include:
- Image/video processing: Resizing, transcoding, or applying filters
- Data exports: Generating large CSV or Excel files
- Email notifications: Sending welcome emails, password resets, or newsletters
- Web scraping: Collecting data from external sources
- Machine learning inference: Running model predictions asynchronously
Celery provides a battle-tested solution for managing these distributed task queues, with support for task scheduling, retries, and result persistence.
Setting Up Celery with FastAPI
Prerequisites and Installation
First, install the required dependencies:
pip install fastapi celery redis uvicorn
You’ll also need Redis as the message broker (though Celery supports other brokers like RabbitMQ):
# Using Docker
docker run -d -p 6379:6379 redis:7-alpine
Project Structure
A well-organized structure separates your FastAPI app from Celery configuration:
project/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI application
│ ├── tasks.py # Celery tasks
│ └── config.py # Configuration
├── celery_app/
│ └── celery_config.py # Celery configuration
└── requirements.txt
Basic Celery Configuration
Create your Celery configuration file:
# celery_app/celery_config.py
from celery import Celery
celery_app = Celery(
"worker",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_time_limit=3600, # 1 hour max
task_soft_time_limit=3000, # Soft limit at 50 minutes
)
Creating the FastAPI Application
Now build your FastAPI app with task endpoints:
# app/main.py
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import Optional
import asyncio
app = FastAPI(title="FastAPI Celery Integration")
class TaskRequest(BaseModel):
user_id: int
email: str
action: str
# Import tasks after they're defined
from app.tasks import process_user_task, send_notification
@app.post("/tasks/submit")
async def submit_task(request: TaskRequest, background_tasks: BackgroundTasks):
"""Submit a task for background processing."""
# Queue the task with Celery
task = process_user_task.delay(
user_id=request.user_id,
email=request.email,
action=request.action
)
return {
"task_id": task.id,
"status": "submitted",
"message": "Task queued successfully"
}
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
"""Check the status of a background task."""
from celery_app.celery_config import celery_app
result = celery_app.AsyncResult(task_id)
return {
"task_id": task_id,
"status": result.status,
"result": result.result if result.ready() else None
}
Defining Celery Tasks
Create your task definitions:
# app/tasks.py
from celery_app.celery_config import celery_app
import time
import logging
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=3)
def process_user_task(self, user_id: int, email: str, action: str):
"""Process a user action in the background."""
try:
logger.info(f"Processing task for user {user_id}, action: {action}")
# Simulate processing time
time.sleep(5)
# Your business logic here
result = {
"user_id": user_id,
"email": email,
"action": action,
"processed": True,
"timestamp": time.time()
}
return result
except Exception as exc:
logger.error(f"Task failed: {exc}")
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
@celery_app.task
def send_notification(user_id: int, message: str):
"""Send notification email (mock implementation)."""
# In production, integrate with SendGrid, AWS SES, etc.
logger.info(f"Sending notification to user {user_id}: {message}")
return {"sent": True, "user_id": user_id}
FastAPI BackgroundTasks vs Celery
FastAPI includes a built-in BackgroundTasks class for simple use cases. Understanding when to use each option helps you choose the right tool:
| Feature | FastAPI BackgroundTasks | Celery |
|---|---|---|
| Complexity | Low | High |
| Scalability | Single process | Distributed workers |
| Persistence | In-memory | Redis/database |
| Scheduling | No | Yes (celery beat) |
| Retry logic | Manual | Built-in |
| Monitoring | Limited | Flower dashboard |
Use FastAPI’s BackgroundTasks for simple, single-instance applications. Choose Celery when you need horizontal scaling, persistent queues, scheduled tasks, or robust retry mechanisms.
Advanced Patterns
Task Chaining and Chords
Chain multiple tasks together for complex workflows:
from celery import chain, chord
# Sequential execution
workflow = chain(
process_user_task.s(user_id=1, email="test@example.com", action="signup"),
send_notification.s(message="Welcome!")
)
result = workflow.apply_async()
# Parallel execution with callback
workflow = chord(
[process_user_task.s(user_id=i, email=f"user{i}@example.com", action="batch")
for i in range(10)],
send_notification.s(message="Batch processing complete")
)
result = workflow.apply_async()
Scheduled Tasks with Celery Beat
For recurring tasks, integrate Celery Beat:
# celery_app/celery_config.py additions
celery_app.conf.beat_schedule = {
"cleanup-old-data-daily": {
"task": "app.tasks.cleanup_old_data",
"schedule": 86400.00, # Daily
},
"generate-reports-hourly": {
"task": "app.tasks.generate_daily_report",
"schedule": 3600.00, # Hourly
},
}
Error Handling and Monitoring
Implement comprehensive error handling:
@celery_app.task(bind=True, max_retries=3, autoretry_for=(Exception,),
retry_backoff=True, retry_backoff_max=600)
def robust_task(self, data: dict):
"""Task with automatic retry logic."""
try:
# Process data
result = process_data_safely(data)
return {"status": "success", "result": result}
except ValidationError as exc:
# Don't retry validation errors
return {"status": "failed", "error": str(exc)}
Best Practices
-
Keep tasks idempotent: Design tasks that can run safely multiple times without side effects
-
Use descriptive task names: Enable easy identification in monitoring tools
-
Implement proper logging: Track task execution for debugging
-
Set appropriate timeouts: Prevent runaway tasks from consuming resources
-
Use task routing: Route specific tasks to dedicated queues based on resource needs
-
Monitor with Flower: Install Flower for real-time Celery cluster monitoring
pip install flower
celery -A celery_app.celery_config flower --port=5555
Common Pitfalls to Avoid
-
Blocking the event loop: Never use synchronous I/O within Celery tasks; it’s designed for separate processes
-
Forgetting task serialization: Ensure all task arguments are JSON-serializable
-
Ignoring result backend: Set up a result backend to track task status and retrieve results
-
Resource exhaustion: Configure prefetch limits to prevent worker overload
-
Missing error handling: Always wrap task logic in try-except blocks
Running the Application
Start the components separately:
# Terminal 1: Redis
redis-server
# Terminal 2: Celery worker
celery -A celery_app.celery_config worker --loglevel=info
# Terminal 3: Celery Beat (if using scheduled tasks)
celery -A celery_app.celery_config beat --loglevel=info
# Terminal 4: FastAPI
uvicorn app.main:app --reload
Conclusion
Integrating Celery with FastAPI provides a powerful foundation for handling background processing at scale. Start with simple tasks and progressively adopt advanced patterns like task chaining, scheduling, and comprehensive monitoring as your needs grow. The combination of FastAPI’s async capabilities and Celery’s robust task queue management enables you to build responsive applications that handle demanding workloads efficiently.
Related Reading
- Claude Code for Beginners: Complete Getting Started Guide
- Best Claude Skills for Developers in 2026
- Claude Skills Guides Hub
Built by theluckystrike — More at zovo.one