Celery + Django: Production Task Queue Patterns
Task design, retry strategies, chaining with canvas, beat scheduling, monitoring with Flower, and deployment patterns for production Celery setups.
Celery is the standard task queue for Django. Getting it running in development is easy — making it reliable in production requires understanding task design, failure modes, and operational tooling.
Setup
pip install celery redis django-celery-beat django-celery-results
# myproject/celery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ("celery_app",)
# settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Kathmandu"
# Prevent tasks from running too long
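CELERY_TASK_SOFT_TIME_LIMIT = 60  # soft limit: SoftTimeLimitExceeded is raised inside the task
CELERY_TASK_TIME_LIMIT = 120  # hard limit: the worker child process running the task is killed
# Acknowledge a task only after it completes, not when it is received. If the worker
# goes away before acking, the message is redelivered, so tasks must be idempotent.
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # reserve one message per process (fair dispatch for long tasks)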
Writing Tasks Correctly
# orders/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@shared_task(bind=True, max_retries=3)
def process_order(self, order_id: int) -> dict:
    """Process a placed order. Retries up to 3 times on failure."""
    from orders.models import Order  # local import avoids circular deps
    try:
        # select_related only follows forward FKs (user); items is a related
        # set, so it needs prefetch_related instead
        order = (
            Order.objects.select_related("user")
            .prefetch_related("items")
            .get(pk=order_id)
        )
        order.process()
        logger.info("Processed order %s", order_id)
        return {"status": "ok", "order_id": order_id}
    except Order.DoesNotExist:
        # Don't retry: the order is gone
        logger.error("Order %s not found, skipping", order_id)
        return {"status": "not_found"}
    except Exception as exc:
        # Exponential backoff: 30s, 60s, 120s
        logger.warning("Order %s failed, retrying: %s", order_id, exc)
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 30)
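The policy in process_order (permanent errors return immediately, everything else retries with a doubling countdown) can be unit-tested without a broker. The sketch below is a pure-Python model of that control flow, not Celery's implementation: Celery re-queues a new message with a countdown rather than sleeping in a loop. PermanentError and run_with_retries are hypothetical names; PermanentError plays the role of Order.DoesNotExist.

```python
class PermanentError(Exception):
    """Hypothetical stand-in for Order.DoesNotExist: retrying cannot help."""


def run_with_retries(work, max_retries=3, base=30, sleep=lambda seconds: None):
    """Model of process_order's retry policy (not Celery's implementation).

    Calls work() up to max_retries + 1 times, "sleeping" 2**retries * base
    seconds between attempts, and re-raises the last error when retries run
    out, which is what self.retry(exc=exc) does once max_retries is exceeded.
    """
    retries = 0
    while True:
        try:
            return work()
        except PermanentError:
            return {"status": "not_found"}  # permanent failure: no retry
        except Exception:
            if retries >= max_retries:
                raise  # surface the original error
            sleep(2 ** retries * base)
            retries += 1
```

Note that max_retries=3 means up to four executions in total: the first attempt plus three retries.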
Key Design Rules
Pass IDs, not objects.
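# Wrong: a model instance is not JSON-serializable, so with the JSON serializer
# configured above this fails when the message is encoded; with pickle it would
# ship a snapshot that is stale by the time the task runs
process_order.delay(order=order_obj)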
# Correct — fresh DB lookup inside the task
process_order.delay(order_id=order.pk)
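Celery's JSON serializer, like Python's standard json module, only encodes plain data, so the "pass IDs" rule is enforced by the serializer itself. A minimal sketch using the stdlib json module as a stand-in for the message encoder; Order here is a hypothetical dataclass standing in for a Django model instance.

```python
import json
from dataclasses import dataclass


@dataclass
class Order:
    """Hypothetical stand-in for a Django model instance."""
    pk: int
    status: str


def json_payload(**kwargs):
    """Encode task kwargs the way a JSON serializer must: plain types only."""
    return json.dumps(kwargs)


order = Order(pk=42, status="placed")
```

json_payload(order_id=order.pk) yields '{"order_id": 42}', while json_payload(order=order) raises TypeError because the object has no JSON representation.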
Idempotency. Tasks can run more than once (retries, duplicate delivery). Design them so that running twice has the same effect as running once:
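@shared_task(bind=True)
def send_welcome_email(self, user_id: int):
    from django.core.mail import send_mail
    from accounts.models import User, EmailLog
    # Guard against duplicate sends. Check-then-create is not atomic: two
    # concurrent runs can both pass this check unless (user_id, type) is unique.
    if EmailLog.objects.filter(user_id=user_id, type="welcome").exists():
        return {"status": "already_sent"}
    user = User.objects.get(pk=user_id)
    # send_mail(subject, message, from_email, recipient_list);
    # from_email=None falls back to settings.DEFAULT_FROM_EMAIL
    send_mail("Welcome!", "...", None, [user.email])
    EmailLog.objects.create(user_id=user_id, type="welcome")
    return {"status": "sent"}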
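The exists() guard above narrows duplicates but cannot stop two concurrent runs. A stronger variant lets a unique constraint arbitrate: claim the row and send inside one transaction, so a failed send rolls the claim back and a second run hits the constraint. The sketch below uses stdlib sqlite3 as a stand-in for the Django database; the email_log table, make_db, and send_welcome_once are hypothetical names, and send is any callable that delivers the email.

```python
import sqlite3


def make_db():
    """In-memory stand-in for the EmailLog table, unique on (user_id, type)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE email_log (user_id INTEGER, type TEXT, UNIQUE (user_id, type))"
    )
    return conn


def send_welcome_once(conn, user_id, send):
    """Send at most once per user; the unique constraint decides who wins."""
    try:
        with conn:  # one transaction: commit on success, roll back on any exception
            conn.execute(
                "INSERT INTO email_log (user_id, type) VALUES (?, ?)",
                (user_id, "welcome"),
            )
            send(user_id)  # if this raises, the claim row is rolled back
    except sqlite3.IntegrityError:
        return "already_sent"
    return "sent"
```

In Django the same shape would be a UniqueConstraint on EmailLog plus transaction.atomic() around the create and the send. A small window remains if the send succeeds but the commit fails, so the guarantee is "almost always once", not "exactly once".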
Retry Strategies
Exponential Backoff
import requests
from celery import shared_task

@shared_task(bind=True, max_retries=5)
def call_external_api(self, payload: dict):
    try:
        response = requests.post("https://api.example.com/webhook", json=payload, timeout=10)
        response.raise_for_status()
    except requests.RequestException as exc:
        # Retry with exponential backoff: 30s, 60s, 120s, 240s, 480s
        raise self.retry(exc=exc, countdown=30 * (2 ** self.request.retries))
Retry on Specific Exceptions Only
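import requests
from celery import shared_task
from requests.exceptions import ConnectionError as RequestsConnectionError, Timeout

@shared_task(
    bind=True,
    autoretry_for=(Timeout, RequestsConnectionError),
    retry_kwargs={"max_retries": 4},
    retry_backoff=True,  # exponential: 1s, 2s, 4s, 8s before jitter
    retry_backoff_max=600,  # cap at 10 minutes
    retry_jitter=True,  # randomize to avoid thundering herd
)
def fetch_data(self, url: str):
    # Only Timeout and ConnectionError trigger a retry; any other error fails the task
    return requests.get(url, timeout=15).json()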
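To see what these options produce, the sketch below recomputes the delay schedule from the behavior Celery documents for them: retry_backoff=True uses a factor of 1 second that doubles per retry, retry_backoff_max caps it, and retry_jitter replaces the value with a random number between zero and that cap. backoff_delay is a hypothetical helper, not a Celery API; with factor=30 and no jitter it also reproduces the manual countdown in call_external_api.

```python
import random


def backoff_delay(retries, factor=1, maximum=600, jitter=False, rng=random):
    """Delay in seconds before retry number `retries` (0-based).

    factor * 2**retries, capped at `maximum`; with jitter, a random whole
    number of seconds between 0 and that capped value (inclusive).
    """
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        delay = rng.randint(0, delay)
    return delay


# Manual schedule from call_external_api: 30s, 60s, 120s, 240s, 480s
manual = [backoff_delay(r, factor=30) for r in range(5)]
```

Summing the manual schedule gives 930 seconds, so a call that fails all five retries has spent about 15.5 minutes waiting before the task is marked failed. Full jitter trades a predictable schedule for spreading retries from many workers across the whole window.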
Canvas: Chaining, Groups, and Chords
chain — Sequential Tasks
from celery import chain
# Task B receives the return value of Task A as its first argument
pipeline = chain(
validate_order.s(order_id),
charge_payment.s(), # receives validated order data
send_confirmation_email.s(), # receives payment result
)
pipeline.delay()
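The argument-passing rule is the part of chain that most often surprises people: Celery prepends the parent's return value to each mutable signature's arguments, while immutable signatures created with .si() ignore it. A minimal in-process model of that rule; Sig and run_chain are hypothetical names, and the three task functions are placeholders for validate_order, charge_payment, and send_confirmation_email.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Sig:
    """Hypothetical model of a signature: a function plus its own args."""
    func: Callable[..., Any]
    args: tuple = ()
    immutable: bool = False  # like .si(): do not receive the parent's result


def run_chain(*sigs):
    """Run signatures in order, feeding each result into the next mutable one."""
    result = None
    for i, sig in enumerate(sigs):
        if i == 0 or sig.immutable:
            result = sig.func(*sig.args)
        else:
            result = sig.func(result, *sig.args)  # parent result goes first
    return result


def validate_order(order_id):
    return {"order_id": order_id, "valid": True}


def charge_payment(validated):
    return {"order_id": validated["order_id"], "charged": True}


def send_confirmation_email(payment):
    return f"emailed order {payment['order_id']}"
```

run_chain(Sig(validate_order, (7,)), Sig(charge_payment), Sig(send_confirmation_email)) returns "emailed order 7". If a later step should run with only its own arguments, make it immutable.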
group — Parallel Tasks
from celery import group
# All tasks run in parallel
job = group(
send_email.s(user_id)
for user_id in User.objects.values_list("id", flat=True)
)
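result = job.delay()
# Blocks until every task finishes and needs a result backend. Call it from web or
# CLI code only: Celery raises RuntimeError if you call .get() inside a task.
results = result.get()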
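result.get() on a group returns the values in the order the signatures appeared in the group, not the order in which they finished. The sketch below models that with a thread pool from the standard library; run_group is a hypothetical helper, and threads stand in for worker processes.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_group(func, arg_list, max_workers=4):
    """Run func(*args) for each args tuple in parallel; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, *args) for args in arg_list]
        # Collect in submission order, regardless of which call finished first
        return [f.result() for f in futures]


def slow_square(x, delay):
    """Placeholder task: sleeps, then returns x squared."""
    time.sleep(delay)
    return x * x
```

run_group(slow_square, [(1, 0.05), (2, 0.0), (3, 0.02)]) returns [1, 4, 9] even though the second call finishes first.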
chord — Parallel + Callback
from celery import chord
# Run scrape tasks in parallel, then aggregate results (chords require a result backend)
job = chord(
[scrape_page.s(url) for url in urls],
aggregate_results.s(), # called with list of all results
)
job.delay()
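A chord's callback runs once, with the header results in header order, and only if every header task succeeds; if one fails, the callback is skipped and the chord fails (Celery reports this with celery.exceptions.ChordError). The sketch below models that contract in-process; run_chord and the local ChordError are hypothetical stand-ins, with threads in place of workers.

```python
from concurrent.futures import ThreadPoolExecutor


class ChordError(Exception):
    """Local stand-in named after Celery's ChordError."""


def run_chord(header, callback, max_workers=4):
    """header: list of (func, args) pairs run in parallel.

    Calls callback(results) once, in header order, only if all calls succeeded.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, *args) for func, args in header]
    # Leaving the with-block waits for every future to finish
    results, errors = [], []
    for fut in futures:
        exc = fut.exception()
        if exc is not None:
            errors.append(exc)
        else:
            results.append(fut.result())
    if errors:
        raise ChordError(f"{len(errors)} header task(s) failed") from errors[0]
    return callback(results)


def scrape_page(url):
    """Placeholder for a real scrape: returns the URL length."""
    return len(url)
```

This is why aggregate_results should tolerate an empty or partial world only if you make the header tasks catch their own errors and return a sentinel; otherwise one failed page blocks the whole aggregation.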
Real-World Pipeline
from celery import chain, group, chord
def process_bulk_order(order_ids: list[int]):
    return chain(
        # Step 1: validate all orders in parallel, then collect the results
        chord(
            group(validate_order.s(oid) for oid in order_ids),
            collect_validation_results.s(),
        ),
        # Step 2: charge the batch once validation has completed
        charge_batch_payment.s(),
        # Step 3: notify all users in parallel; .si() makes these signatures
        # immutable so they don't receive charge_batch_payment's return value
        group(send_confirmation.si(oid) for oid in order_ids),
    ).delay()
Beat Scheduler (Periodic Tasks)
# settings.py
INSTALLED_APPS += ["django_celery_beat"]
# Static schedule (in settings)
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
"send-daily-digest": {
"task": "newsletter.tasks.send_daily_digest",
"schedule": crontab(hour=8, minute=0),  # 8:00 AM daily, in CELERY_TIMEZONE
},
"cleanup-expired-sessions": {
"task": "accounts.tasks.cleanup_expired_sessions",
"schedule": crontab(minute=0), # every hour
},
"sync-external-inventory": {
"task": "inventory.tasks.sync_inventory",
"schedule": 300.0, # every 5 minutes
},
}
# Run the beat scheduler (separate process)
celery -A myproject beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
Dynamic schedules (editable from the Django admin) require the DatabaseScheduler and the django_celery_beat tables, so run python manage.py migrate after adding the app.
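crontab hours are read on the CELERY_TIMEZONE clock, so with Asia/Kathmandu (UTC+05:45) the 8:00 AM digest fires at 02:15 UTC. The sketch below does that arithmetic with the standard library. It assumes a fixed +05:45 offset, which holds because Nepal does not observe daylight saving; Celery itself resolves the zone through the tz database. next_daily_run is a hypothetical helper, not part of Celery.

```python
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Kathmandu modeled as a fixed UTC+05:45 offset (no DST in Nepal)
KATHMANDU = timezone(timedelta(hours=5, minutes=45), "Asia/Kathmandu")


def next_daily_run(now, hour, minute, tz=KATHMANDU):
    """Next instant strictly after `now` that reads hour:minute on the tz clock."""
    local_now = now.astimezone(tz)
    candidate = local_now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= local_now:
        candidate += timedelta(days=1)
    return candidate
```

For example, at 2024-01-01 00:00 UTC (05:45 in Kathmandu) the next 8:00 run is 2024-01-01 02:15 UTC; at 03:00 UTC it has already passed, so the next run is the following day at 02:15 UTC.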
Queues and Task Routing
Separate fast (email, notifications) from slow (report generation, bulk processing):
# settings.py
CELERY_TASK_ROUTES = {
"orders.tasks.process_order": {"queue": "orders"},
"reports.tasks.*": {"queue": "reports"},
"notifications.*": {"queue": "default"},
}
CELERY_TASK_DEFAULT_QUEUE = "default"
# Start one worker per queue; -n gives each a unique node name on the same host
celery -A myproject worker -Q default -c 4 -n default@%h --loglevel=INFO
celery -A myproject worker -Q orders -c 2 -n orders@%h --loglevel=INFO
celery -A myproject worker -Q reports -c 1 -n reports@%h --loglevel=INFO
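When a task lands on an unexpected queue, it helps to check which route matches its name. The sketch below approximates Celery's lookup for a dict of routes: an exact task name wins, then patterns containing * are tried in order, and anything unmatched goes to the default queue. It uses fnmatchcase as a stand-in for Celery's own glob handling (fnmatch also understands ? and [...], which the sketch ignores by only treating patterns with * as globs); queue_for is a hypothetical helper.

```python
from fnmatch import fnmatchcase

# Same routes as the settings above
TASK_ROUTES = {
    "orders.tasks.process_order": {"queue": "orders"},
    "reports.tasks.*": {"queue": "reports"},
    "notifications.*": {"queue": "default"},
}


def queue_for(task_name, routes=TASK_ROUTES, default_queue="default"):
    """Exact task name first, then glob patterns in order, else the default queue."""
    if task_name in routes:
        return routes[task_name]["queue"]
    for pattern, options in routes.items():
        if "*" in pattern and fnmatchcase(task_name, pattern):
            return options["queue"]
    return default_queue
```

One consequence worth noticing: only process_order is routed to the orders queue, so a sibling such as orders.tasks.refund_order falls through to default unless you add a pattern like "orders.tasks.*".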
Monitoring with Flower
pip install flower
celery -A myproject flower --port=5555
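Open http://localhost:5555 to track task success and failure rates, retries, worker status, and queue depths. In production, never expose it unauthenticated: at minimum enable basic auth (ideally behind a reverse proxy serving HTTPS) and replace the placeholder credentials:
celery -A myproject flower --basic_auth=admin:secretpassword
Setting FLOWER_BASIC_AUTH in the environment instead keeps the password out of the process list.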
Deployment with Supervisor
; /etc/supervisor/conf.d/celery.conf
[program:celery_worker_default]
command=/venv/bin/celery -A myproject worker -Q default -c 4 --loglevel=INFO
directory=/app
user=deploy
autostart=true
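autorestart=true
; let running tasks finish on stop; keep this above CELERY_TASK_TIME_LIMIT (120s)
stopwaitsecs=600
; send the stop signal to the whole process group, not just the main process
stopasgroup=true
killasgroup=true
stderr_logfile=/var/log/celery/worker.err.log
stdout_logfile=/var/log/celery/worker.out.log
environment=DJANGO_SETTINGS_MODULE="myproject.settings.production"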
[program:celery_beat]
command=/venv/bin/celery -A myproject beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
directory=/app
user=deploy
autostart=true
autorestart=true
stderr_logfile=/var/log/celery/beat.err.log
stdout_logfile=/var/log/celery/beat.out.log
environment=DJANGO_SETTINGS_MODULE="myproject.settings.production"
Never run more than one beat process: a second beat instance fires every scheduled task twice. Enable the beat program on exactly one host (not on every app server), guard its startup with a lock, or deploy beat as a single-replica container.
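A startup lock is one way to make "only one beat" hold on a single host. The sketch below uses fcntl.flock (Unix only) on a lock file; the lock path and acquire_singleton_lock are hypothetical, and the lock is per host, so it does nothing to stop a second beat on another machine.

```python
import fcntl
import os


def acquire_singleton_lock(path):
    """Return a locked file descriptor, or None if another process holds the lock.

    flock() locks are released automatically when the descriptor is closed or
    the owning process exits, including after a crash, so no stale-lock cleanup
    is needed.
    """
    fd = os.open(path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # fail fast, don't wait
    except BlockingIOError:
        os.close(fd)
        return None
    return fd
```

A wrapper script would call this before starting beat and exit if it returns None. If the wrapper then replaces itself with celery beat via os.execvp, call os.set_inheritable(fd, True) first: Python opens descriptors as non-inheritable, and closing the descriptor on exec would release the lock.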
Common Pitfalls
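Storing large results in the backend. The result backend (Redis) is meant for small return values. Don’t return large datasets; write them to the DB or S3 and return a reference such as a primary key or object key. For tasks whose return value nobody reads, set ignore_result=True.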
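Database connections in tasks. Each worker child process opens its own database connection, so the three workers above (-c 4, -c 2, -c 1) can hold up to seven connections on top of your web processes. Celery's Django integration closes unusable or expired connections around each task, honoring CONN_MAX_AGE; CONN_MAX_AGE = 0 (Django's default) closes the connection after every task, which avoids failures on connections the database server has already dropped. As worker counts grow, put a connection pooler such as PgBouncer in front of the database.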
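Importing models too early. Model imports at the top of an app's tasks.py are normally fine, because autodiscover_tasks() loads those modules after Django's app registry is ready. Trouble starts when models are imported from modules loaded earlier (such as myproject/celery.py) or when tasks.py and models.py import each other; in those cases, import the models inside the task function body to avoid circular imports and app-not-ready errors.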
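For the large-results pitfall, the pattern is: write the payload somewhere durable and return only a handle. A minimal sketch, assuming a local directory as a stand-in for S3 or a database table; export_rows is a hypothetical function whose body would sit inside a @shared_task in a real project.

```python
import json
import os
import uuid


def export_rows(rows, out_dir):
    """Write a large payload to storage and return only a small reference.

    Hypothetical: out_dir stands in for S3 or a database table.
    """
    path = os.path.join(out_dir, f"export-{uuid.uuid4().hex}.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(rows, fh)
    # This small dict is all that ends up in the result backend
    return {"path": path, "row_count": len(rows)}
```

Whatever reads the result (a view polling AsyncResult, or a chord callback) then fetches the data from storage using the returned path or key, keeping Redis memory flat no matter how large the export is.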