Real-time Django: WebSockets with Django Channels
Build production-grade real-time features in Django using Channels, channel layers (Redis), group messaging, and JWT authentication over WebSocket.
Django’s request/response cycle is synchronous and stateless: each request produces exactly one response, and the server cannot send anything further once that response is complete. WebSockets keep the connection open, allowing the server to push data to clients at any time. Django Channels extends Django to handle WebSockets, background tasks, and other async protocols.
Architecture Overview
Browser Django Channels
│ │
│ WS handshake │
├──────────────────────► ASGI Server (Daphne / Uvicorn)
│ │
│ ▼
│ URL Router
│ │
│ WebsocketConsumer
│ ◄──────────────────── │ (channel layer → Redis)
│ ◄──────────────────── │ (group broadcast)
Installation
pip install channels channels-redis
# settings.py
# Register the Channels app alongside the project's other apps.
INSTALLED_APPS = ["channels", ...]
# Dotted path to the ASGI application object defined in myproject/asgi.py.
ASGI_APPLICATION = "myproject.asgi.application"
# Channel layer used for group messaging; here backed by a local Redis server.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        # (host, port) pairs of the Redis server(s) backing the layer.
        "CONFIG": {"hosts": [("127.0.0.1", 6379)]},
    }
}
# myproject/asgi.py
# ASGI entry point: plain HTTP goes to Django, WebSocket traffic goes to the
# Channels router (origin check -> session/auth middleware -> URL routing).
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

# Must be set before anything reads Django settings.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

# Initialise Django (and populate the app registry) *before* importing any
# module that may import ORM models, e.g. chat.routing -> chat.consumers.
django_asgi_app = get_asgi_application()

import chat.routing  # noqa: E402  (deliberately imported after Django setup)

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(chat.routing.websocket_urlpatterns)
        )
    ),
})
Defining a Consumer
A Consumer is the WebSocket equivalent of a Django view.
# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
    """Chat-room consumer that relays each client message to the whole room.

    The room comes from the ``room_name`` URL kwarg; every socket connected
    to the same room shares the channel-layer group ``chat_<room_name>``.
    """

    async def connect(self):
        """Join the room's group, then accept the WebSocket handshake."""
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        self.room_group_name = f"chat_{self.room_name}"
        # Join the channel group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name,
        )
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the room's group when the socket closes."""
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name,
        )

    async def receive(self, text_data=None, bytes_data=None):
        """Broadcast the ``message`` field of an incoming JSON text frame.

        Channels calls this with *either* ``text_data=`` or ``bytes_data=``
        as a keyword argument, so both parameters must be accepted; a
        signature without ``bytes_data`` raises ``TypeError`` on the first
        binary frame. Frames that are binary, not valid JSON, not a JSON
        object, or that lack a ``"message"`` key are ignored instead of
        crashing the consumer.
        """
        if text_data is None:
            return  # binary frame: not part of this protocol
        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            return  # malformed payload from the client
        if not isinstance(data, dict) or "message" not in data:
            return
        message = data["message"]
        # Broadcast to all clients in the group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                "type": "chat_message",  # maps to method name below
                "message": message,
                "sender": self.scope["user"].username,
            },
        )

    # Handler called by group_send
    async def chat_message(self, event):
        """Forward a group event to this socket as a JSON text frame."""
        await self.send(text_data=json.dumps({
            "message": event["message"],
            "sender": event["sender"],
        }))
# chat/routing.py
from django.urls import re_path
from . import consumers
# WebSocket routes, mounted by URLRouter in asgi.py. The named group is
# exposed to the consumer as scope["url_route"]["kwargs"]["room_name"];
# \w+ restricts room names to word characters.
websocket_urlpatterns = [
    re_path(r"ws/chat/(?P<room_name>\w+)/$", consumers.ChatConsumer.as_asgi()),
]
The scope Object
self.scope is the ASGI equivalent of Django’s request. Key fields:
self.scope["type"]          # "websocket"
self.scope["user"]          # User, or AnonymousUser when not logged in (via AuthMiddlewareStack)
self.scope["url_route"]     # {"kwargs": {"room_name": "general"}}
self.scope["headers"]       # list of (name, value) byte tuples
self.scope["cookies"]       # dict of cookie values
self.scope["session"]       # Django session (dict-like)
self.scope["query_string"]  # raw bytes, e.g. b"token=abc123"
Authentication Over WebSocket
AuthMiddlewareStack reads Django’s session cookie and populates scope["user"]. This works for browser clients that have already logged in via normal Django views.
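For mobile clients or SPAs using JWT, session cookies aren’t available. Instead, pass the token as a query parameter and authenticate at connect time. Because URLs are commonly written to server and proxy logs, use short-lived tokens and serve the endpoint over wss://: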
# consumers.py
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
import jwt
from django.conf import settings
@database_sync_to_async
def get_user_from_token(token: str):
    """Resolve a JWT to its user, or ``AnonymousUser`` if it cannot be trusted.

    The token is verified with ``settings.SECRET_KEY`` (HS256) and must carry
    a ``user_id`` claim naming an existing user.

    Args:
        token: Encoded JWT as sent by the client; may be an empty string.

    Returns:
        The matching user instance, or ``AnonymousUser()`` when the token is
        invalid or expired, lacks a usable ``user_id``, or the user no
        longer exists.

    Only token and lookup failures are treated as "anonymous". Database
    outages and misconfiguration propagate, so an infrastructure problem is
    not silently reported to clients as an authentication failure.
    """
    from django.contrib.auth import get_user_model

    User = get_user_model()
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        return User.objects.get(id=payload["user_id"])
    except (
        jwt.InvalidTokenError,  # bad signature, expired, malformed, empty
        KeyError,               # no "user_id" claim
        TypeError,              # claim has an unusable type
        ValueError,             # claim cannot be coerced to a primary key
        User.DoesNotExist,      # user deleted since the token was issued
    ):
        return AnonymousUser()
class SecureChatConsumer(AsyncWebsocketConsumer):
    """Consumer that authenticates via a ``?token=<jwt>`` query parameter.

    Authenticated sockets join the per-user group ``user_<id>`` so that
    server-side code can push messages to one specific user.
    """

    async def connect(self):
        """Authenticate from the query string, then join the user's group."""
        from urllib.parse import parse_qs

        # parse_qs URL-decodes values and copes with "=" inside a value;
        # a hand-rolled dict(p.split("=") ...) raises ValueError on such
        # input. Undecodable bytes are replaced rather than raising, since
        # the query string is fully client-controlled.
        raw_query = self.scope["query_string"].decode("utf-8", errors="replace")
        params = parse_qs(raw_query)
        # Last occurrence wins, as with a plain dict built from the pairs.
        token = params.get("token", [""])[-1]

        self.scope["user"] = await get_user_from_token(token)
        if self.scope["user"].is_anonymous:
            # NOTE: closing before accept() rejects the handshake; per the
            # ASGI spec the server answers with HTTP 403, so the client may
            # never observe the 4001 close code.
            await self.close(code=4001)
            return

        self.room_group_name = f"user_{self.scope['user'].id}"
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the per-user group, if connect() got far enough to join it."""
        group = getattr(self, "room_group_name", None)
        if group is not None:
            await self.channel_layer.group_discard(group, self.channel_name)
Sending from Outside a Consumer (Server Push)
The real power: push data to connected clients from anywhere — a Django view, a Celery task, a signal.
# anywhere in Django code
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
channel_layer = get_channel_layer()
def notify_user(user_id: int, payload: dict):
    """Push ``payload`` to every socket in the ``user_<user_id>`` group.

    Safe to call from synchronous code (views, signals, Celery tasks): the
    async ``group_send`` is bridged with ``async_to_sync``. Consumers in the
    group receive it through their ``user_notification`` handler.
    """
    send_to_group = async_to_sync(channel_layer.group_send)
    group_name = f"user_{user_id}"
    event = {
        "type": "user_notification",
        "payload": payload,
    }
    send_to_group(group_name, event)
# From a Celery task
@shared_task
def send_order_update(order_id: int):
    """Celery task: push the current status of an order to its owner."""
    order = Order.objects.get(pk=order_id)
    recipient_id = order.user_id
    update = {
        "event": "order_updated",
        "status": order.status,
        "order_id": order_id,
    }
    notify_user(recipient_id, update)
# In the consumer — handle the pushed message
class NotificationConsumer(AsyncWebsocketConsumer):
    """Consumer side of server push: relays ``user_notification`` events."""

    async def user_notification(self, event):
        """Send the event's ``payload`` to the client as a JSON text frame."""
        body = json.dumps(event["payload"])
        await self.send(text_data=body)
Sync vs Async Consumers
Prefer AsyncWebsocketConsumer for most cases. Use WebsocketConsumer (sync) only when working with sync-only code you can’t refactor.
# Sync — Django ORM calls are safe but block the thread
from channels.generic.websocket import WebsocketConsumer
class SyncConsumer(WebsocketConsumer):
    """Synchronous consumer: answers every frame with the Message row count."""

    def connect(self):
        """Accept every incoming connection."""
        self.accept()

    def receive(self, text_data=None, bytes_data=None):
        """Reply with the current number of ``Message`` rows.

        Channels passes the frame as a keyword (``text_data=`` or
        ``bytes_data=``), so both parameters must exist; otherwise a binary
        frame raises ``TypeError``.
        """
        # Sync ORM call — fine in sync consumer
        count = Message.objects.count()
        self.send(text_data=str(count))
# Async — wrap all ORM calls with database_sync_to_async
from channels.db import database_sync_to_async
class AsyncConsumer(AsyncWebsocketConsumer):
    """Async consumer: answers every frame with the Message row count."""

    async def receive(self, text_data=None, bytes_data=None):
        """Reply with the current number of ``Message`` rows.

        Channels passes the frame as a keyword (``text_data=`` or
        ``bytes_data=``), so both parameters must exist; otherwise a binary
        frame raises ``TypeError``.
        """
        # WRONG — Django ORM is sync, will raise SynchronousOnlyOperation
        # count = Message.objects.count()
        # CORRECT
        count = await database_sync_to_async(Message.objects.count)()
        await self.send(text_data=str(count))
database_sync_to_async shorthand
from channels.db import database_sync_to_async
# Decorator form
@database_sync_to_async
def get_messages(room_name):
    """Return up to 50 of the room's messages, newest first, as a list."""
    newest_first = Message.objects.filter(room=room_name).order_by("-created_at")
    # list() forces the query to run here, inside the sync wrapper.
    return list(newest_first[:50])
# Usage
messages = await get_messages(self.room_name)
Presence Tracking
Track connected users with Redis sorted sets (outside the channel layer):
import aioredis
import time
REDIS_URL = "redis://localhost:6379"
class PresenceConsumer(AsyncWebsocketConsumer):
    """Tracks online users in a Redis sorted set and broadcasts the list.

    Each member of ``online_users`` is a user id scored by the time it was
    last seen. Entries older than ``STALE_AFTER`` seconds are purged on every
    broadcast, so clients must send a frame (any content) at least that
    often as a heartbeat; otherwise they are dropped from the list while
    still connected.

    NOTE(review): the standalone ``aioredis`` package has been folded into
    ``redis.asyncio``; consider migrating when dependencies are next updated.
    """

    PRESENCE_KEY = "online_users"
    PRESENCE_GROUP = "global_presence"
    STALE_AFTER = 30  # seconds without a heartbeat before a user is dropped

    async def connect(self):
        """Register the user as online and announce the new presence list."""
        self.user_id = str(self.scope["user"].id)
        self.redis = await aioredis.from_url(REDIS_URL)
        # Join the broadcast group; without this nobody ever receives the
        # presence updates sent to it.
        await self.channel_layer.group_add(self.PRESENCE_GROUP, self.channel_name)
        # Add to presence set with timestamp as score
        await self._touch()
        await self.accept()
        # Broadcast updated presence list
        await self._broadcast_presence()

    async def disconnect(self, close_code):
        """Remove the user, announce the change, and release resources."""
        redis = getattr(self, "redis", None)
        if redis is None:
            return  # connect() failed before the Redis client was created
        try:
            await self.channel_layer.group_discard(
                self.PRESENCE_GROUP, self.channel_name
            )
            await redis.zrem(self.PRESENCE_KEY, self.user_id)
            await self._broadcast_presence()
        finally:
            # Each consumer opens its own client; close it so connections
            # do not accumulate as sockets come and go.
            await redis.close()

    async def receive(self, text_data=None, bytes_data=None):
        """Treat any client frame as a heartbeat and refresh the timestamp."""
        await self._touch()

    async def presence_update(self, event):
        """Forward a presence broadcast to this socket as JSON."""
        await self.send(text_data=json.dumps({"online": event["online"]}))

    async def _touch(self):
        """Record "now" as this user's last-seen time."""
        await self.redis.zadd(self.PRESENCE_KEY, {self.user_id: time.time()})

    async def _broadcast_presence(self):
        """Purge stale entries and send the online list to the group."""
        # Remove stale connections (no heartbeat within STALE_AFTER seconds)
        cutoff = time.time() - self.STALE_AFTER
        await self.redis.zremrangebyscore(self.PRESENCE_KEY, 0, cutoff)
        online = await self.redis.zrange(self.PRESENCE_KEY, 0, -1)
        await self.channel_layer.group_send(
            self.PRESENCE_GROUP,
            {"type": "presence_update", "online": [uid.decode() for uid in online]},
        )
Production Deployment
# Run with Daphne (ASGI server)
pip install daphne
daphne -b 0.0.0.0 -p 8001 myproject.asgi:application
# Or Uvicorn. The "standard" extra installs a WebSocket implementation;
# a bare `pip install uvicorn` cannot serve WebSocket connections.
pip install "uvicorn[standard]"
uvicorn myproject.asgi:application --host 0.0.0.0 --port 8001 --workers 4
# nginx.conf — proxy WebSocket connections
# ASGI server (Daphne/Uvicorn) listening locally on port 8001.
upstream django_ws {
    server 127.0.0.1:8001;
}
server {
    # Only paths under /ws/ are proxied to the WebSocket backend.
    location /ws/ {
        proxy_pass http://django_ws;
        # The Upgrade mechanism requires HTTP/1.1 towards the upstream.
        proxy_http_version 1.1;
        # Forward the client's upgrade request so the handshake can complete.
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        # Preserve the original Host header for the upstream.
        proxy_set_header Host $host;
        proxy_read_timeout 86400; # 24h — keep WS alive
    }
}
Tip: Channel layers (Redis) broadcast to all server processes. This means a user connected to process A will receive messages sent from process B — exactly what you need in multi-worker deployments.