
Basic usage of Celery in Python

Posted on 2023-06-03 20:45


1. Basic introduction

Celery is a simple, flexible, and reliable distributed system, written in Python, for processing large volumes of messages; it also provides the tools needed to operate and maintain such a distributed system. Celery focuses on real-time task processing and also supports task scheduling.

Simply put, it is a distributed queue management tool: using the interfaces Celery provides, you can quickly implement and manage a distributed task queue.

One thing we need to be clear about: Celery is not a task queue itself, but a management tool for distributed task queues. Celery encapsulates the common operations on task queues, such as listening on a queue and fetching data from it for consumption.

2. Usage scenarios

Celery allows task execution to be completely decoupled from the main program, and tasks can even be dispatched to run on other hosts. We usually use it to implement asynchronous tasks (async task) and scheduled tasks (crontab).

  • Asynchronous tasks: submit time-consuming operations to Celery for asynchronous execution, such as sending SMS/email, message push, audio and video processing, etc.
  • Scheduled tasks: do something on a schedule, such as daily data statistics (a minimal beat sketch follows this list).
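Below is a minimal sketch of such a scheduled task using Celery beat; the module name demo_schedule and the task are illustrative, not part of this article's project:

from celery import Celery
from celery.schedules import crontab

app = Celery("demo", broker="redis://127.0.0.1:6379/5")

@app.task
def daily_stats():
    print("running daily statistics...")

# beat submits tasks by name according to the schedule below.
app.conf.beat_schedule = {
    "daily-stats": {
        "task": "demo_schedule.daily_stats",  # full name depends on the module name
        "schedule": crontab(hour=0, minute=30),  # every day at 00:30
    },
}

# Run the scheduler alongside a worker:
#   celery -A demo_schedule beat -l info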

3. Workflow and Components

The workflow is illustrated in the figure below:

(Figure: Celery architecture — producers send tasks to the broker, workers consume them, and results go to the result store.)

Celery's architecture consists of three parts: the message middleware (message broker), the task execution unit (worker), and the task result storage (task result store).

Message middleware

Celery itself does not provide a messaging service, but it integrates easily with third-party message middleware, including RabbitMQ, Redis, etc. RabbitMQ is officially recommended because it is durable and stable.

Task execution unit

The worker is Celery's task execution unit; workers run concurrently on the nodes of the distributed system.

Task result storage

The task result store holds the results of tasks executed by workers. Celery supports storing task results in several backends, including AMQP, Redis, etc.

In addition, Celery supports multiple concurrency models and serialization formats.

  • Concurrency: prefork, eventlet, gevent, threads/single-threaded
  • Serialization: pickle, json, yaml, msgpack; zlib/bzip2 compression; cryptographic message signing, etc.
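For example, the pool implementation and concurrency level can be chosen on the worker command line; -P selects the pool and -c the concurrency (eventlet/gevent must be installed separately; celery_object is the app module defined in section 4):

celery -A celery_object worker -P prefork -c 4 -l info
celery -A celery_object worker -P gevent -c 100 -l info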

Install the required packages first:

pip install celery
pip install redis

4. Celery executes asynchronous tasks

4.1 Basic use

The project structure here is as follows:

celery_object.py    # Celery app and configuration
celery_task.py      # task definitions
produce_result.py   # produces tasks
check_result.py     # checks a task's status by ID

Step 1: create the Celery configuration file celery_object.py:

import celery

# Start the worker with: celery -A celery_object worker -l info

backend = "redis://127.0.0.1:6379/4"  # Redis database 4 stores the task results
broker = "redis://127.0.0.1:6379/5"  # Redis database 5 serves as the message broker
celery_app = celery.Celery(
    "celery_demo",
    backend=backend,
    broker=broker,
    include=[
        "celery_task",
    ],
)

celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]

celery_app.conf.timezone = "Asia/Shanghai"  # time zone
celery_app.conf.enable_utc = False  # whether to use UTC

Parameter Description:

  • backend is where results are stored after an asynchronous task finishes.
  • broker is the message middleware that queues tasks for the workers to execute.
  • celery.Celery() instantiates a Celery application object.
  • include lists all the modules that contain Celery tasks, so the worker can import them.

Step 2: create the task file celery_task.py:

import time

from celery_object import celery_app

@celery_app.task
def send_email(name):
    print("向%s发送邮件..." % name)
    time.sleep(5)
    print("向%s发送邮件完成" % name)
    return f"成功拿到{name}发送的邮件!"

@celery_app.task
def send_msg(name):
    print("向%s发送短信..." % name)
    time.sleep(5)
    print("向%s发送短信完成" % name)
    return f"成功拿到{name}发送的短信!"

The @celery_app.task decorator turns an ordinary function into a Celery task that a worker can execute asynchronously.
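Note that calling the function directly still runs it synchronously in the current process; only .delay() or .apply_async() sends it to the queue:

send_email("张三")        # runs locally and blocks for about 5 seconds
send_email.delay("张三")  # enqueued; a worker executes it asynchronously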

Then we execute the following command in the project directory:

celery -A celery_object worker -l info
  • -A specifies the application, i.e. the module where the Celery app object lives
  • celery_object is the name of that module (note: Celery is picky about directory/path/module names)
  • worker is the sub-command that starts a worker (fixed wording)
  • -l sets the log level; here info-level logs are printed

Output like the following means Celery started successfully:

(Figure: worker startup log)
Next, create produce_result.py to produce tasks for Celery:

from celery_task import send_email, send_msg

if __name__ == "__main__":
    for i in range(10):
        result = send_email.delay(f"张三{i}")
        print(result.id)
        result2 = send_msg.delay(f"李四{i}")
        print(result2.id)
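delay() is just a shortcut for apply_async(); when more control is needed, apply_async() can be called directly. The countdown option below is one illustrative example:

result = send_email.apply_async(args=["张三"], countdown=10)  # start about 10 seconds later
print(result.id)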

When you run the producer program, you will see output like the following, with the task IDs printed:

(Figure: producer output showing task IDs)

On the worker's terminal you will then see output showing that Celery has successfully fetched the tasks from the queue and consumed them:

(Figure: worker log while consuming tasks)
Open Redis and you can see the corresponding result records:

(Figure: result records stored in Redis)

We can also check the status of a task by its ID. check_result.py is written as follows:

from celery.result import AsyncResult
from celery_object import celery_app

async_result = AsyncResult(id="d1c722fa-4ebf-432e-967e-a462bdefeac4", app=celery_app)
print("任务状态:", async_result.status)
if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget()  # delete the result from the backend
elif async_result.failed():
    print("执行失败")
elif async_result.status == "PENDING":
    print("任务等待中被执行")
elif async_result.status == "RETRY":
    print("任务异常后正在重试")
elif async_result.status == "STARTED":
    print("任务已经开始被执行")

Output:

任务状态: SUCCESS
成功拿到李四0发送的短信!
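Besides polling by ID, the AsyncResult returned by delay() can also be waited on directly; note that get() blocks the caller until the task finishes:

result = send_email.delay("张三")
print(result.get(timeout=30))  # blocks up to 30 seconds waiting for the result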

4.2 Specifying a task queue

By default, Celery stores tasks in a queue named celery. I found the following way of writing this on the Internet:

from kombu import Queue

# Define the task queues.
CELERY_QUEUES = (
    # Messages whose routing key starts with "task." go into the default queue.
    Queue('default', routing_key='task.#'),
    # Messages whose routing key starts with "web." go into the web_tasks queue.
    Queue('web_tasks', routing_key='web.#'),
)

# The default exchange is named tasks.
CELERY_DEFAULT_EXCHANGE = 'tasks'

# The default exchange type is topic.
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'

# The default routing key is task.default, which matches the default queue above.
CELERY_DEFAULT_ROUTING_KEY = 'task.default'

CELERY_ROUTES = {
    'proj.tasks.add': {'queue': 'web_tasks', 'routing_key': 'web.add'},
}

# Start a consumer for the specified queue:
#   $ celery -A proj worker -Q web_tasks -l info
# That worker only executes tasks from web_tasks; by sizing the number of
# consumers appropriately, tasks in web_tasks effectively get higher priority.

But this doesn't feel very elegant, so the blogger recommends another way of writing it.

The directory structure is slightly adjusted:

celery_object.py       # Celery app and configuration
celery_email_task.py   # email task (class-based)
celery_msg_task.py     # SMS task (class-based)
produce_task.py        # produces tasks
Create a celery_object.py file and write the following:

import celery

backend = "redis://127.0.0.1:6379/4"  # Redis database 4 stores the task results
broker = "amqp://guest:guest@127.0.0.1:5672"  # RabbitMQ serves as the message broker
celery_app = celery.Celery(
    "celery_demo",
    backend=backend,
    broker=broker,
    include=[
        "celery_email_task",
        "celery_msg_task",
    ],
)

celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]

celery_app.conf.timezone = "Asia/Shanghai"  # time zone
celery_app.conf.enable_utc = False  # whether to use UTC

celery_app.conf.ONCE = {
    "backend": "celery_once.backends.Redis",
    "settings": {"url": "redis://localhost:6379/8", "default_timeout": 60 * 60},
}


celery_app.conf.task_routes = {
    "send_email": "send_email_queue",
    "send_msg": "send_msg_queue",
}
# Start the workers with:
#   celery -A celery_object worker -l info -Q send_email_queue
#   celery -A celery_object worker -l info -Q send_msg_queue

Compared with the previous Celery configuration file, two attributes have been added: ONCE (required) and task_routes (required). The task_routes configuration routes different tasks into different worker task queues.
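This approach depends on the celery_once package, so install it first if needed:

pip install celery_once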

Then write celery_email_task.py as follows:

import time
from celery_once import QueueOnce

from celery_object import celery_app

class SendEmailClass(QueueOnce):
    name = "send_email"
    once = {"graceful": True}
    ignore_result = True

    def __init__(self, *args, **kwargs):
        super(SendEmailClass, self).__init__(*args, **kwargs)

    def run(self, name):
        print("向%s发送邮件..." % name)
        time.sleep(5)
        print("向%s发送邮件完成" % name)
        return f"成功拿到{name}发送的邮件!"


send_email = celery_app.register_task(SendEmailClass())

Write celery_msg_task.py as follows:

import time
from celery_once import QueueOnce

from celery_object import celery_app


class SendMsgClass(QueueOnce):
    name = "send_msg"
    once = {"graceful": True}
    ignore_result = True

    def __init__(self, *args, **kwargs):
        super(SendMsgClass, self).__init__(*args, **kwargs)

    def run(self, name):
        print("向%s发送短信..." % name)
        time.sleep(5)
        print("向%s发送短信完成" % name)
        return f"成功拿到{name}发送的短信!"


send_msg = celery_app.register_task(SendMsgClass())

Parameter explanation:

  • register_task() registers the function or class instance as a Celery task; the effect is the same as the @celery_app.task decorator.
  • QueueOnce is a base class provided by celery_once. Custom task classes must inherit from it; when the task executes, its run() method is called by default, with the task's arguments passed through (here, name).

Celery Once is likewise implemented with Redis locks. Celery Once provides the QueueOnce task class, which implements task deduplication. So when using the decorator style, the task we implement needs to set QueueOnce as its base:

@task(base=QueueOnce, once={'graceful': True}) 

The once parameter specifies how a duplicate task is handled. By default graceful is False, and Celery raises an AlreadyQueued exception; if it is manually set to True, the duplicate is handled silently.
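For example, when graceful stays False, the duplicate submission can be caught explicitly. A minimal sketch, where example_task stands for any task whose base is QueueOnce:

from celery_once import AlreadyQueued

try:
    example_task.delay("张三")
except AlreadyQueued:
    print("task already queued, skipping")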

Then create produce_task.py and write the following code:

from celery_object import celery_app

if __name__ == "__main__":
    for i in range(10):
        result3 = celery_app.send_task("send_email", args=[f"王五{i}"])
        print(result3.id)

send_task() sends a task by name to the broker for a worker to consume; it supports passing arguments via args and can also target a specific queue via queue. The effect is similar to the delay() method.
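For example, a specific queue can be targeted explicitly; the queue name here comes from the task_routes configuration above:

result = celery_app.send_task("send_msg", args=["王五"], queue="send_msg_queue")
print(result.id)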

Then run the command:

celery -A celery_object worker -l info -Q send_email_queue
  • The -Q option makes this worker listen to and consume only the specified task queue

Output like the following means Celery started successfully:

(Figure: worker startup log)
Run produce_task.py to start producing tasks:

(Figure: producer output)

On the worker's terminal you can see Celery consuming data from the specified queue send_email_queue:

(Figure: worker consuming from send_email_queue)

Then check RabbitMQ to confirm the queue was created successfully:

(Figure: queues in the RabbitMQ management UI)

With these steps, we have Celery monitoring the specified queue and consuming its data.




Author: Abstract

Link: http://www.pythonblackhole.com/blog/article/78476/a6ab7a541c4b5b260616/

Source: python black hole net

Please indicate the source when reprinting in any form. Any infringement discovered will be pursued legally.
