在本文中,我将在 Django 的一个非常基本的任务中使用 Celery、Redis 和 WebSocket,并告诉你它们的工作原理和设置方法。
首先,请允许我解释一下我们的简单任务,以及我们将从中学到什么。我们将有一个 Student 模型。使用 Celery 和 Redis,我们将尝试每 10 秒删除该模型中的一个对象。然后,我们将使用 WebSocket 在 html 页面中实时显示这些事件。这样,我们就能看到网站上添加或删除的所有数据,而无需刷新页面。
Celery 和 Redis
1. 首先安装 Celery 和 Redis:
pip install celery redis
如果 Redis 没有安装到你的电脑上,你会收到一个错误信息,所以请先安装 Redis:
brew install redis
2. 唤醒沉睡巨人的时候了:
redis-server
运行其他命令:
celery -A myproject worker --loglevel=info
celery -A myproject beat --loglevel=info
一开始,你可能会觉得用几个终端工作既累又没意义,不过别担心,我们会用 Docker 解决这个问题。在接下来的步骤中,我们将学习如何分别安装这些工具。
3. 现在,将创建必要的文件并在其中编写代码:
# home/models.py:
from django.db import models
class Student(models.Model):
name = models.CharField(max_length=50)
surname = models.CharField(max_length=50)
age = models.IntegerField()
def __str__(self):
return self.name
# core/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
app = Celery('core')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(['core'])
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
# core/__init__.py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)
# core/settings.py:
CELERY_BROKER_URL = "redis://redis:6379/0"
CELERY_RESULT_BACKEND = "redis://redis:6379/0"
CELERY_BEAT_SCHEDULE = {
"delete-student-objects-every-10-seconds": {
"task": "core.tasks.delete_student_objects",
"schedule": 10.0, # Every 10 seconds
},
}
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
# core/tasks.py
from celery import shared_task
from home.models import Student
@shared_task
def delete_student_objects():
# Find and delete the most recently added student object
latest_student = Student.objects.order_by('-id').first()
if latest_student:
latest_student.delete()
print(f"Deleted student: {latest_student.name} {latest_student.surname}")
else:
print("No students to delete.")
好了,现在模型中的对象将每 10 秒动态删除一次。Celery 和 Redis 的工作已经完成。现在,让我们使用 WebSocket 在网站上实时查看这些更改。
WEBSOCKET | CHANNELS
️pip install channels channels-redis
安装完成后,我们来看看代码。代码部分可能有点长,但看看还是值得的:
# core/settings.py
INSTALLED_APPS = [
# Other apps
'channels',
'home', # Student model in app
]
ASGI_APPLICATION = 'core.asgi.application'
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [("redis", 6379)],
},
},
}
# core/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import home.routing
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(
URLRouter(
home.routing.websocket_urlpatterns
)
),
})
# core/routing.py
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
from channels.auth import AuthMiddlewareStack
import home.routing
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(
URLRouter(
home.routing.websocket_urlpatterns
)
),
})
# home/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class StudentConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.channel_layer.group_add(
"students",
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(
"students",
self.channel_name
)
async def receive(self, text_data):
data = json.loads(text_data)
message = data['message']
await self.channel_layer.group_send(
"students",
{
'type': 'student_message',
'message': message
}
)
async def student_message(self, event):
message = event['message']
await self.send(text_data=json.dumps({
'message': message
}))
# home/routing.py
from django.urls import path
from . import consumers
websocket_urlpatterns = [
path('ws/students/', consumers.StudentConsumer.as_asgi()),
]
# home/signals.py
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from .models import Student
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
@receiver(post_save, sender=Student)
def student_saved(sender, instance, **kwargs):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
"students",
{
'type': 'student_message',
'message': f'Student {instance.name} {instance.surname} has been added.'
}
)
@receiver(post_delete, sender=Student)
def student_deleted(sender, instance, **kwargs):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
"students",
{
'type': 'student_message',
'message': f'Student {instance.name} {instance.surname} has been deleted.'
}
)
# home/apps.py
from django.apps import AppConfig
class HomeConfig(AppConfig):
name = 'home'
def ready(self):
import home.signals
# home/views.py
from django.shortcuts import render
from .models import Student
def homePage(request):
students = Student.objects.all()
context = {"students": students}
return render(request, 'index.html', context=context)
是的,经过漫长的编码,我们已经为 python 设置好了 WebSocket。现在,让我们在 html 文件中编写代码,然后运行 django 服务器查看结果。请注意,在此过程中,celery 和 redis 应保持运行。
<!-- templates/index.html-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>The Magnificent Four</title>
</head>
<body>
<h2 style="text-align: center;">Celery | Redis | WebSocket | Docker</h2>
<div class="students">
<ul id="students-list">
{% for student in students %}
<li id="student-{{ student.id }}">Student: {{ student.name }} {{ student.surname }} - Age: {{ student.age }}
</li>
{% endfor %}
</ul>
<p id="no-students" {% if students %} style="display:none;" {% endif %}>No students available</p>
</div>
<script>
const studentList = document.getElementById('students-list');
const noStudentsMessage = document.getElementById('no-students');
const ws = new WebSocket('ws://' + window.location.host + '/ws/students/');
ws.onmessage = function (event) {
const data = JSON.parse(event.data);
const message = data.message;
if (message.includes('added')) {
const li = document.createElement('li');
li.textContent = message;
studentList.appendChild(li);
noStudentsMessage.style.display = 'none';
} else if (message.includes('deleted')) {
const li = document.querySelector(`#students-list li:last-child`);
if (li) {
li.remove();
}
if (!studentList.children.length) {
noStudentsMessage.style.display = 'block';
}
}
};
</script>
</body>
</html>
最后,在执行完 “python3 manage.py runserver “命令后,你会看到位于网址 “http://127.0.0.1:8000/”的以列表格式显示的数据库在 10 秒后被添加和删除。我想这是你喜欢的。我们把 Django 变成了一条龙 :D。
我听到你在说:”嘿,伙计,你要去哪里,你不能把我们一个人留在这些终端上”。不必惊慌,让我们来解决这个问题,增加我们的乐趣。
Docker
1. 首先,需要下载 Docker。下载完成后,加入 Docker。进行必要的设置后,每次进入 Docker 时,它就已经开始工作了。
2. 创建Dockerfile和docker-compose.yml:
# Dockerfile
FROM python:3.12-slim
WORKDIR /code
COPY requirements.txt /code/
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install daphne
COPY . /code/
EXPOSE 8000
CMD ["daphne", "-b", "0.0.0.0", "-p", "8000", "core.asgi:application"]
# docker-compose.yml
services:
redis:
image: redis:alpine
ports:
- "6379:6379"
django:
build: .
command: daphne -b 0.0.0.0 -p 8000 core.asgi:application
volumes:
- .:/code
ports:
- "8000:8000"
depends_on:
- redis
- celery_worker
- celery_beat
celery_worker:
build: .
command: celery -A core worker --loglevel=info
volumes:
- .:/code
depends_on:
- redis
celery_beat:
build: .
command: celery -A core beat --loglevel=info
volumes:
- .:/code
depends_on:
- redis
就这样,终于结束了。现在,你只需在终端输入 “docker compose up – build “并运行它。这样,Celery、Redis 和 WebSocket 就都能正常运行了。
希望这篇文章对你有用。我们探讨了如何在不深入主题的情况下使用必要的工具。当然,如果你能研究一下代码,并如我所说,提前准备学习 Celery、Redis、WebSocket、Docker 等知识,那就更好了。
完整的代码请见:https://github.com/TheHormat/The__Magnificent__Four
作者:TheHormat
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/51468.html