通过 Docker 在 Django 中使用 Celery,Redis 和 WebSocket

在本文中,我将在 Django 的一个非常基本的任务中使用 Celery、Redis 和 WebSocket,并告诉你它们的工作原理和设置方法。

首先,请允许我解释一下我们的简单任务,以及我们将从中学到什么。我们将有一个 Student 模型。使用 Celery 和 Redis,我们将尝试每 10 秒删除该模型中的一个对象。然后,我们将使用 WebSocket 在 html 页面中实时显示这些事件。这样,我们就能看到网站上添加或删除的所有数据,而无需刷新页面。

Celery 和 Redis

1. 首先安装 Celery 和 Redis:

pip install celery redis

如果 Redis 没有安装到你的电脑上,你会收到一个错误信息,所以请先安装 Redis:

brew install redis

2. 唤醒沉睡巨人的时候了:

redis-server

运行其他命令:

celery -A myproject worker --loglevel=info 
celery -A myproject beat --loglevel=info

一开始,你可能会觉得用几个终端工作既累又没意义,不过别担心,我们会用 Docker 解决这个问题。在接下来的步骤中,我们将学习如何分别安装这些工具。

3. 现在,将创建必要的文件并在其中编写代码:

# home/models.py:
from django.db import models

class Student(models.Model):
    name = models.CharField(max_length=50)
    surname = models.CharField(max_length=50)
    age = models.IntegerField()

    def __str__(self):
        return self.name




# core/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')

app = Celery('core')
app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks(['core'])

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')




# core/__init__.py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ('celery_app',)



# core/settings.py:
CELERY_BROKER_URL = "redis://redis:6379/0"
CELERY_RESULT_BACKEND = "redis://redis:6379/0"
CELERY_BEAT_SCHEDULE = {
    "delete-student-objects-every-10-seconds": {
        "task": "core.tasks.delete_student_objects",
        "schedule": 10.0,  # Every 10 seconds
    },
}

CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True



# core/tasks.py
from celery import shared_task
from home.models import Student

@shared_task
def delete_student_objects():
    # Find and delete the most recently added student object
    latest_student = Student.objects.order_by('-id').first()
    if latest_student:
        latest_student.delete()
        print(f"Deleted student: {latest_student.name} {latest_student.surname}")
    else:
        print("No students to delete.")

好了,现在模型中的对象将每 10 秒动态删除一次。Celery 和 Redis 的工作已经完成。现在,让我们使用 WebSocket 在网站上实时查看这些更改。

WEBSOCKET | CHANNELS

️pip install channels channels-redis

安装完成后,我们来看看代码。代码部分可能有点长,但看看还是值得的:

# core/settings.py
INSTALLED_APPS = [
    # Other apps
    'channels',
    'home',  # Student model in app
]

ASGI_APPLICATION = 'core.asgi.application'

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("redis", 6379)],
        },
    },
}



# core/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import home.routing

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            home.routing.websocket_urlpatterns
        )
    ),
})



# core/routing.py
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
from channels.auth import AuthMiddlewareStack
import home.routing

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            home.routing.websocket_urlpatterns
        )
    ),
})



# home/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class StudentConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.channel_layer.group_add(
            "students",
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(
            "students",
            self.channel_name
        )

    async def receive(self, text_data):
        data = json.loads(text_data)
        message = data['message']

        await self.channel_layer.group_send(
            "students",
            {
                'type': 'student_message',
                'message': message
            }
        )

    async def student_message(self, event):
        message = event['message']

        await self.send(text_data=json.dumps({
            'message': message
        }))



# home/routing.py
from django.urls import path
from . import consumers

websocket_urlpatterns = [
    path('ws/students/', consumers.StudentConsumer.as_asgi()),
]



# home/signals.py
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from .models import Student
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

@receiver(post_save, sender=Student)
def student_saved(sender, instance, **kwargs):
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        "students",
        {
            'type': 'student_message',
            'message': f'Student {instance.name} {instance.surname} has been added.'
        }
    )

@receiver(post_delete, sender=Student)
def student_deleted(sender, instance, **kwargs):
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        "students",
        {
            'type': 'student_message',
            'message': f'Student {instance.name} {instance.surname} has been deleted.'
        }
    )




# home/apps.py
from django.apps import AppConfig

class HomeConfig(AppConfig):
    name = 'home'

    def ready(self):
        import home.signals



# home/views.py
from django.shortcuts import render
from .models import Student

def homePage(request):
    students = Student.objects.all()
    context = {"students": students}
    return render(request, 'index.html', context=context)

是的,经过漫长的编码,我们已经为 python 设置好了 WebSocket。现在,让我们在 html 文件中编写代码,然后运行 django 服务器查看结果。请注意,在此过程中,celery 和 redis 应保持运行。

<!-- templates/index.html-->
<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>The Magnificent Four</title>
</head>

<body>
    <h2 style="text-align: center;">Celery | Redis | WebSocket | Docker</h2>

    <div class="students">
        <ul id="students-list">
            {% for student in students %}
                <li id="student-{{ student.id }}">Student: {{ student.name }} {{ student.surname }} - Age: {{ student.age }}
            </li>
            {% endfor %}
        </ul>
        <p id="no-students" {% if students %} style="display:none;" {% endif %}>No students available</p>
    </div>

    <script>
        const studentList = document.getElementById('students-list');
        const noStudentsMessage = document.getElementById('no-students');

        const ws = new WebSocket('ws://' + window.location.host + '/ws/students/');

        ws.onmessage = function (event) {
            const data = JSON.parse(event.data);
            const message = data.message;

            if (message.includes('added')) {
                const li = document.createElement('li');
                li.textContent = message;
                studentList.appendChild(li);
                noStudentsMessage.style.display = 'none';
            } else if (message.includes('deleted')) {
                const li = document.querySelector(`#students-list li:last-child`);
                if (li) {
                    li.remove();
                }
                if (!studentList.children.length) {
                    noStudentsMessage.style.display = 'block';
                }
            }
        };
    </script>
</body>

</html>

最后,在执行完 “python3 manage.py runserver “命令后,你会看到位于网址 “http://127.0.0.1:8000/”的以列表格式显示的数据库在 10 秒后被添加和删除。我想这是你喜欢的。我们把 Django 变成了一条龙 :D。

我听到你在说:”嘿,伙计,你要去哪里,你不能把我们一个人留在这些终端上”。不必惊慌,让我们来解决这个问题,增加我们的乐趣。

Docker

1. 首先,需要下载 Docker。下载完成后,加入 Docker。进行必要的设置后,每次进入 Docker 时,它就已经开始工作了。

2. 创建Dockerfile和docker-compose.yml:

# Dockerfile
FROM python:3.12-slim

WORKDIR /code

COPY requirements.txt /code/
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install daphne

COPY . /code/

EXPOSE 8000

CMD ["daphne", "-b", "0.0.0.0", "-p", "8000", "core.asgi:application"]




# docker-compose.yml
services:
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  django:
    build: .
    command: daphne -b 0.0.0.0 -p 8000 core.asgi:application
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - celery_worker
      - celery_beat

  celery_worker:
    build: .
    command: celery -A core worker --loglevel=info
    volumes:
      - .:/code
    depends_on:
      - redis

  celery_beat:
    build: .
    command: celery -A core beat --loglevel=info
    volumes:
      - .:/code
    depends_on:
      - redis

就这样,终于结束了。现在,你只需在终端输入 “docker compose up – build “并运行它。这样,Celery、Redis 和 WebSocket 就都能正常运行了。

希望这篇文章对你有用。我们探讨了如何在不深入主题的情况下使用必要的工具。当然,如果你能研究一下代码,并如我所说,提前准备学习 Celery、Redis、WebSocket、Docker 等知识,那就更好了。

完整的代码请见:https://github.com/TheHormat/The__Magnificent__Four

作者:TheHormat

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/51468.html

(0)

相关推荐

发表回复

登录后才能评论