Celery + Django: Production Task Queue Patterns

Task design, retry strategies, chaining with canvas, beat scheduling, monitoring with Flower, and deployment patterns for production Celery setups.

Celery is the de facto standard task queue for Django. Getting it running in development is easy; making it reliable in production requires understanding task design, failure modes, and operational tooling.

Setup

pip install celery redis django-celery-beat django-celery-results

# myproject/celery.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

# myproject/__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)

# settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Kathmandu"

# Prevent tasks from running too long (values are in seconds)
CELERY_TASK_SOFT_TIME_LIMIT = 60   # SoftTimeLimitExceeded is raised inside the task
CELERY_TASK_TIME_LIMIT = 120       # the worker child process is killed and replaced

# Acknowledge task only after it completes, not when received
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # fair dispatch for long tasks

Writing Tasks Correctly

# orders/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_order(self, order_id: int) -> dict:
    """Process a placed order. Retries up to 3 times on failure."""
    from orders.models import Order  # local import avoids circular deps

    try:
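        # select_related only follows single-valued relations; "items" is
        # assumed to be a to-many relation, so it is prefetched instead
        order = Order.objects.select_related("user").prefetch_related("items").get(pk=order_id)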
        order.process()
        logger.info("Processed order %s", order_id)
        return {"status": "ok", "order_id": order_id}

    except Order.DoesNotExist:
        # Don't retry — the order is gone
        logger.error("Order %s not found, skipping", order_id)
        return {"status": "not_found"}

    except Exception as exc:
        logger.warning("Order %s failed, retrying: %s", order_id, exc)
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 30)

Key Design Rules

Pass IDs, not objects.

# Wrong: a model instance is not JSON-serializable, and with pickle it
# would be a stale snapshot by the time the task runs
process_order.delay(order=order_obj)

# Correct: fresh DB lookup inside the task
process_order.delay(order_id=order.pk)
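
To see why passing the object fails under the JSON serializer configured in settings.py, the sketch below tries to JSON-encode a stand-in object. `FakeOrder` and `can_json_encode` are hypothetical names used only for illustration; Celery's real serialization goes through kombu, which reports a similar encoding error.

```python
import json


class FakeOrder:
    """Hypothetical stand-in for a Django model instance."""

    def __init__(self, pk: int, total: str):
        self.pk = pk
        self.total = total


def can_json_encode(kwargs: dict) -> bool:
    """Return True if the task arguments survive JSON encoding."""
    try:
        json.dumps(kwargs)
        return True
    except TypeError:
        return False


order = FakeOrder(pk=42, total="19.99")
object_ok = can_json_encode({"order": order})    # arbitrary objects are rejected
id_ok = can_json_encode({"order_id": order.pk})  # a plain int encodes fine
```

Only the primary key crosses the broker, so the task always reads the current row rather than whatever the object looked like when it was enqueued.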

Idempotency. Tasks can run more than once (retries, duplicate delivery, or redelivery after a worker crash when CELERY_TASK_ACKS_LATE is enabled). Design them so that running twice produces the same result as running once:

@shared_task(bind=True)
def send_welcome_email(self, user_id: int):
    from django.core.mail import send_mail
    from accounts.models import User, EmailLog

    # Guard against duplicate sends
    if EmailLog.objects.filter(user_id=user_id, type="welcome").exists():
        return {"status": "already_sent"}

    user = User.objects.get(pk=user_id)
    # send_mail(subject, message, from_email, recipient_list);
    # from_email=None falls back to DEFAULT_FROM_EMAIL
    send_mail("Welcome!", "...", None, [user.email])
    EmailLog.objects.create(user_id=user_id, type="welcome")
    return {"status": "sent"}
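
The exists() check followed by create() leaves a small window in which two concurrent runs can both pass the guard. One way to close it is to let a uniqueness constraint decide which run sends. The sketch below illustrates the idea with the standard library's sqlite3 rather than the Django ORM; the `email_log` table, `claim_send`, and the `sent` list are hypothetical stand-ins.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE email_log (user_id INTEGER, type TEXT, UNIQUE (user_id, type))"
)

sent = []  # stand-in for the real mail backend


def claim_send(user_id: int, kind: str) -> bool:
    """Atomically record the send; only the first caller gets True."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO email_log (user_id, type) VALUES (?, ?)",
        (user_id, kind),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 means the row already existed


def send_welcome_email(user_id: int) -> dict:
    if not claim_send(user_id, "welcome"):
        return {"status": "already_sent"}
    sent.append(user_id)  # the real task would send the email here
    return {"status": "sent"}


first = send_welcome_email(7)
second = send_welcome_email(7)  # simulated retry or duplicate delivery
```

In the ORM a comparable approach would be a unique constraint on (user, type) combined with get_or_create. Claiming before sending trades duplicate emails for the possibility of a lost one if the worker dies between the claim and the send, so the ordering is a design choice rather than a free win.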

Retry Strategies

Exponential Backoff

import requests
from celery import shared_task


@shared_task(bind=True, max_retries=5)
def call_external_api(self, payload: dict):
    try:
        response = requests.post("https://api.example.com/webhook", json=payload, timeout=10)
        response.raise_for_status()
    except requests.RequestException as exc:
        # Retry with exponential backoff: 30s, 60s, 120s, 240s, 480s
        raise self.retry(exc=exc, countdown=30 * (2 ** self.request.retries))
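
Before deploying a backoff formula it can help to print the schedule it produces. This small helper mirrors the expression `30 * (2 ** self.request.retries)` used above; `backoff_delays` is a hypothetical name, not part of Celery.

```python
def backoff_delays(base: int, max_retries: int) -> list[int]:
    """Delay in seconds before each retry; retries are numbered from 0."""
    return [base * (2 ** attempt) for attempt in range(max_retries)]


delays = backoff_delays(base=30, max_retries=5)
total_wait = sum(delays)  # worst-case time spent waiting across all retries
```

With five retries the waits add up to 930 seconds, so a task that keeps failing stays in flight for more than 15 minutes before it is finally marked as failed.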

Retry on Specific Exceptions Only

import requests
from celery import shared_task
from requests.exceptions import ConnectionError, Timeout

@shared_task(
    bind=True,
    autoretry_for=(Timeout, ConnectionError),
    retry_kwargs={"max_retries": 4},
    retry_backoff=True,          # exponential: 1s, 2s, 4s, ... (pass a number to scale the base delay)
    retry_backoff_max=600,       # cap at 10 minutes
    retry_jitter=True,           # randomize to avoid thundering herd
)
def fetch_data(self, url: str):
    return requests.get(url, timeout=15).json()
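
As documented, `retry_backoff=True` starts at one second and doubles on each retry, `retry_backoff_max` caps the delay, and `retry_jitter` picks a random delay between zero and the computed value. The function below is a rough standalone model of that documented behaviour for experimenting with settings. It is an approximation written for illustration, not Celery's own code, and `modelled_delay` is a hypothetical name.

```python
import random
from typing import Optional


def modelled_delay(retries: int, factor: int = 1, maximum: int = 600,
                   jitter: bool = True,
                   rng: Optional[random.Random] = None) -> int:
    """Approximate delay in seconds before retry number `retries` (0-based)."""
    ceiling = min(maximum, factor * (2 ** retries))
    if not jitter:
        return ceiling
    rng = rng or random.Random()
    return rng.randint(0, ceiling)  # full jitter: anywhere from 0 to the ceiling


no_jitter = [modelled_delay(n, jitter=False) for n in range(4)]
capped = modelled_delay(20, jitter=False)
jittered = [modelled_delay(3, rng=random.Random(seed)) for seed in range(50)]
```

Under this model the four retries configured above are used up within about 15 seconds, which may be too short for an upstream outage; a numeric value such as `retry_backoff=30` scales the base delay.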

Canvas: Chaining, Groups, and Chords

chain — Sequential Tasks

from celery import chain

# Task B receives the return value of Task A as its first argument
pipeline = chain(
    validate_order.s(order_id),
    charge_payment.s(),           # receives validated order data
    send_confirmation_email.s(),  # receives payment result
)
pipeline.delay()
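
The data flow of a chain can be pictured with plain functions: each step is a partial call whose first argument is filled in by the previous step's return value. This is only a mental model in ordinary Python, with no broker and no workers; the three functions here are hypothetical stand-ins with made-up return values.

```python
from functools import reduce


def validate_order(order_id: int) -> dict:
    return {"order_id": order_id, "valid": True}


def charge_payment(order: dict) -> dict:
    return {**order, "charged": True}


def send_confirmation_email(payment: dict) -> str:
    return f"confirmation sent for order {payment['order_id']}"


def run_chain(first_result, *steps):
    """Feed each step's return value into the next step, left to right."""
    return reduce(lambda previous, step: step(previous), steps, first_result)


outcome = run_chain(validate_order(42), charge_payment, send_confirmation_email)
```

In real Celery the intermediate values travel through the broker, so each return value must be serializable with the configured serializer (JSON here). A step that should ignore the previous result can be declared with an immutable signature, `.si()`, instead of `.s()`.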

group — Parallel Tasks

from celery import group

# All tasks run in parallel
job = group(
    send_email.s(user_id)
    for user_id in User.objects.values_list("id", flat=True)
)
result = job.delay()
# Blocks until every task finishes. Requires a result backend;
# avoid calling .get() from inside another task.
results = result.get()

chord — Parallel + Callback

from celery import chord

# Run scrape tasks in parallel, then aggregate results
job = chord(
    [scrape_page.s(url) for url in urls],
    aggregate_results.s(),  # called with list of all results
)
job.delay()
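
A chord is a fan-out followed by a single callback that receives the list of results. The sketch below imitates that shape with a thread pool so it runs without a broker; `scrape_page`, `aggregate_results`, and `run_chord` are hypothetical stand-ins that do no real network I/O.

```python
from concurrent.futures import ThreadPoolExecutor


def scrape_page(url: str) -> dict:
    return {"url": url, "length": len(url)}  # placeholder for real scraping


def aggregate_results(results: list[dict]) -> dict:
    return {
        "pages": len(results),
        "total_length": sum(r["length"] for r in results),
    }


def run_chord(header_args: list[str]) -> dict:
    """Run the header calls in parallel, then pass all results to the callback."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(scrape_page, header_args))
    return aggregate_results(results)


urls = ["https://example.com/a", "https://example.com/bb"]
summary = run_chord(urls)
```

Unlike this toy version, a real chord depends on the result backend to detect when every header task has finished, and the callback does not run if one of the header tasks fails.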

Real-World Pipeline

from celery import chain, group, chord

def process_bulk_order(order_ids: list[int]):
    return chain(
        # Step 1: validate all in parallel
        chord(
            group(validate_order.s(oid) for oid in order_ids),
            collect_validation_results.s(),
        ),
        # Step 2: charge payment sequentially
        charge_batch_payment.s(),
        # Step 3: notify all users in parallel; .si() makes each signature
        # immutable so the charge result is not prepended to its arguments
        group(send_confirmation.si(oid) for oid in order_ids),
    ).delay()

Beat Scheduler (Periodic Tasks)

# settings.py
INSTALLED_APPS += ["django_celery_beat"]

# Static schedule (in settings)
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "send-daily-digest": {
        "task": "newsletter.tasks.send_daily_digest",
        "schedule": crontab(hour=8, minute=0),  # 8:00 AM daily
    },
    "cleanup-expired-sessions": {
        "task": "accounts.tasks.cleanup_expired_sessions",
        "schedule": crontab(minute=0),  # every hour
    },
    "sync-external-inventory": {
        "task": "inventory.tasks.sync_inventory",
        "schedule": 300.0,  # every 5 minutes
    },
}

# Run the beat scheduler (separate process)
celery -A myproject beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

Dynamic schedules (editable from the Django admin) require the DatabaseScheduler and the django_celery_beat migrations, applied with python manage.py migrate.

Queues and Task Routing

Route fast tasks (email, notifications) and slow tasks (report generation, bulk processing) to separate queues so that a backlog of slow work cannot delay the fast ones:

# settings.py
CELERY_TASK_ROUTES = {
    "orders.tasks.process_order": {"queue": "orders"},
    "reports.tasks.*": {"queue": "reports"},
    "notifications.*": {"queue": "default"},
}

CELERY_TASK_DEFAULT_QUEUE = "default"

# Start workers for specific queues
celery -A myproject worker -Q default -c 4 --loglevel=INFO
celery -A myproject worker -Q orders -c 2 --loglevel=INFO
celery -A myproject worker -Q reports -c 1 --loglevel=INFO
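
Glob-style route keys such as `reports.tasks.*` are matched against the full task name. The helper below approximates that lookup with the standard library's fnmatch, to check which queue a given task name would land in. `resolve_queue` is a hypothetical function for illustration, and Celery's own router may differ in details.

```python
from fnmatch import fnmatchcase

TASK_ROUTES = {
    "orders.tasks.process_order": {"queue": "orders"},
    "reports.tasks.*": {"queue": "reports"},
    "notifications.*": {"queue": "default"},
}
DEFAULT_QUEUE = "default"


def resolve_queue(task_name: str) -> str:
    """Exact names win; otherwise the first matching glob; else the default."""
    if task_name in TASK_ROUTES:
        return TASK_ROUTES[task_name]["queue"]
    for pattern, route in TASK_ROUTES.items():
        if fnmatchcase(task_name, pattern):
            return route["queue"]
    return DEFAULT_QUEUE


queues = {
    name: resolve_queue(name)
    for name in (
        "orders.tasks.process_order",
        "reports.tasks.build_monthly",
        "orders.tasks.refund",
    )
}
```

Note that `orders.tasks.refund` falls through to the default queue because only `process_order` is routed explicitly. A typo in a route key fails just as quietly, so a small test like this over the real route table can be worth keeping.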

Monitoring with Flower

pip install flower
celery -A myproject flower --port=5555

The dashboard is then available at http://localhost:5555. It tracks task success/failure rates, retry counts, worker status, and queue depths. In production, gate it behind basic auth at a minimum:

celery -A myproject flower --basic_auth=admin:secretpassword

Deployment with Supervisor

; /etc/supervisor/conf.d/celery.conf
[program:celery_worker_default]
command=/venv/bin/celery -A myproject worker -Q default -c 4 --loglevel=INFO
directory=/app
user=deploy
autostart=true
autorestart=true
stderr_logfile=/var/log/celery/worker.err.log
stdout_logfile=/var/log/celery/worker.out.log
environment=DJANGO_SETTINGS_MODULE="myproject.settings.production"

[program:celery_beat]
command=/venv/bin/celery -A myproject beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
directory=/app
user=deploy
autostart=true
autorestart=true
stderr_logfile=/var/log/celery/beat.err.log
stdout_logfile=/var/log/celery/beat.out.log
environment=DJANGO_SETTINGS_MODULE="myproject.settings.production"

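Never run more than one beat process for the same schedule. Each beat instance enqueues every scheduled task independently, so a second instance fires each task twice. Run beat under a single supervisor entry, guard its startup with a lock, or deploy it as a single-replica container.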
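
One way to enforce a single beat instance on a host is an exclusive file lock taken at startup, for example in a wrapper script that launches beat only if the lock is obtained. The sketch below assumes a Unix system (it uses `fcntl`); the lock path and `try_acquire_lock` are hypothetical, and a file lock does nothing across multiple hosts, where a shared lock service would be needed instead.

```python
import fcntl
import os
import tempfile


def try_acquire_lock(path: str):
    """Return an open file holding an exclusive lock, or None if already held."""
    handle = open(path, "w")
    try:
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        handle.close()
        return None
    return handle  # keep this object alive for as long as beat runs


lock_path = os.path.join(tempfile.mkdtemp(), "celery-beat.lock")
first = try_acquire_lock(lock_path)
second = try_acquire_lock(lock_path)  # simulates a second beat starting up
```

The operating system releases the lock when the holding process exits, so a crashed beat does not leave a stale lock behind.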

Common Pitfalls

Storing large results in the backend. The result backend (Redis) is for small return values. Don’t return large datasets — write to the DB or S3 and return a reference.
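
A minimal sketch of the "return a reference" pattern, using a local file in place of S3 or a database table. `export_rows` and the file name are hypothetical; in a real task the function body would sit inside a `@shared_task`.

```python
import json
import os
import tempfile


def export_rows(rows: list[dict], directory: str) -> dict:
    """Write the rows to a file and return a small, JSON-friendly reference."""
    path = os.path.join(directory, "export.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(rows, fh)
    return {"status": "ok", "path": path, "row_count": len(rows)}


rows = [{"id": i, "value": i * i} for i in range(10_000)]
reference = export_rows(rows, tempfile.mkdtemp())
payload_size = len(json.dumps(rows))            # what the backend would have stored
reference_size = len(json.dumps(reference))     # what it stores instead
```

The result backend then holds a payload of a few dozen bytes regardless of how large the export grows.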

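Database connections in tasks. Each worker process holds its own DB connections. Keep CONN_MAX_AGE at 0 (Django's default, which disables persistent connections) to avoid reusing connections that went stale after a database restart or idle timeout, or put a connection pooler (PgBouncer) in front of the database.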
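
Note that CONN_MAX_AGE is a key inside each DATABASES entry rather than a top-level setting. A settings fragment might look like the following; the engine, database name, user, host, and port are placeholder values, not taken from a real project.

```python
# settings.py (fragment); connection details below are placeholders
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "myproject",
        "USER": "myproject",
        "HOST": "localhost",
        "PORT": "5432",
        "CONN_MAX_AGE": 0,  # 0 disables persistent connections
    }
}
```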

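Importing models at module level. Importing models at the top of a module that is loaded before Django's app registry is ready (such as myproject/celery.py), or that takes part in an import cycle, raises AppRegistryNotReady or circular-import errors. Importing models inside the task function body, as in the examples above, sidesteps both.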