Celery is a simple, flexible, and reliable distributed system written in Python for processing large volumes of messages, and it also provides the tools needed to operate and maintain such a system. Celery focuses on real-time task processing and also supports task scheduling.

Simply put, it is a distributed queue management tool: with the interface Celery provides, you can quickly implement and manage a distributed task queue.

One thing we need to be clear about: Celery is not a task queue itself, but a management tool for distributed task queues. It encapsulates the operations commonly performed on task queues, such as listening to a queue and pulling data from it for consumption.

Celery allows task execution to be completely decoupled from the main program; tasks can even be dispatched to run on other hosts. We usually use it to implement asynchronous tasks (async task) and timed tasks (crontab).
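As an illustration of a timed task, a schedule can be declared with Celery's standard beat_schedule setting and a crontab expression. This is only a minimal sketch, assuming the celery_app and send_email task that are defined later in this post:

from celery.schedules import crontab

from celery_object import celery_app

celery_app.conf.beat_schedule = {
    "send-email-every-morning": {
        "task": "celery_task.send_email",       # default task name: module.function
        "schedule": crontab(hour=8, minute=0),  # every day at 08:00
        "args": ("张三",),
    },
}
# start the scheduler with: celery -A celery_object beat -l info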
Typical uses of Celery are asynchronous jobs such as sending SMS/email, message pushing, and audio/video processing. Here is a picture to illustrate:
Celery's architecture consists of three parts: the message middleware (message broker), the task execution unit (worker), and the task result storage (task result store).
Message middleware

Celery itself does not provide a message service, but it integrates easily with third-party message middleware, including RabbitMQ, Redis, and others. RabbitMQ is officially recommended because it is durable and stable.
Task execution unit

The worker is the task execution unit provided by Celery; workers run concurrently on the nodes of the distributed system.
Task result storage

The task result store holds the results of tasks executed by workers. Celery supports storing task results in several backends, including AMQP, Redis, and others.
In addition, Celery supports several concurrency models and serialization formats.
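For example, the worker's concurrency model and degree are chosen on the command line. These are standard Celery CLI options (prefork is the default pool; the gevent pool additionally requires pip install gevent), shown here against the celery_object app defined below:

# 8 prefork (multiprocessing) worker processes
celery -A celery_object worker -l info --concurrency=8
# an I/O-oriented gevent pool with 100 greenlets
celery -A celery_object worker -l info -P gevent --concurrency=100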
pip install celery
pip install redis
The project structure here is as follows:
Step 1: create the Celery configuration file, celery_object.py:
import celery

# Start the worker with: celery -A celery_object worker -l info
backend = "redis://127.0.0.1:6379/4"  # Redis database 4 stores the task results
broker = "redis://127.0.0.1:6379/5"  # Redis database 5 serves as the message middleware

celery_app = celery.Celery(
    "celery_demo",
    backend=backend,
    broker=broker,
    include=[
        "celery_task",
    ],
)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]
celery_app.conf.timezone = "Asia/Shanghai"  # time zone
celery_app.conf.enable_utc = False  # whether to use UTC
Parameter description: backend is where task results are stored; broker is the message middleware the worker listens to; include lists the modules that contain task definitions so the worker can import them.
Step 2: create the task file, celery_task.py:
import time

from celery_object import celery_app


@celery_app.task
def send_email(name):
    print("Sending email to %s..." % name)
    time.sleep(5)
    print("Finished sending email to %s" % name)
    return f"Successfully received the email sent by {name}!"


@celery_app.task
def send_msg(name):
    print("Sending SMS to %s..." % name)
    time.sleep(5)
    print("Finished sending SMS to %s" % name)
    return f"Successfully received the SMS sent by {name}!"
The @celery_app.task decorator turns each of these functions into a Celery task that workers can execute asynchronously.
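Besides delay(), a decorated task also exposes apply_async(), which accepts additional options such as a countdown. A minimal sketch (the 10-second delay is just an illustration):

# run send_email roughly 10 seconds from now
send_email.apply_async(args=["张三"], countdown=10)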
Then execute the following command in the project directory:

celery -A celery_object worker -l info

The -l info flag sets the log level to info. Output like the following means the Celery worker started successfully:
Next, create a file to produce Celery tasks, produce_result.py:
from celery_task import send_email, send_msg

if __name__ == "__main__":
    for i in range(10):
        result = send_email.delay(f"张三{i}")
        print(result.id)
        result2 = send_msg.delay(f"李四{i}")
        print(result2.id)
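Note that delay() returns an AsyncResult immediately without waiting for the worker. If you do want to block until a task finishes, you can call get() on the result (a sketch; the 10-second timeout is an arbitrary choice):

result = send_email.delay("张三")
print(result.get(timeout=10))  # blocks until the worker finishes, or raises on timeout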
When you run the producer, you will see output like the following, with each task ID printed:
On the worker's terminal you can then see the following, which means Celery has successfully fetched the tasks from the queue and consumed them:
Open Redis, and you can see the corresponding result records:
We can also check the status of a Celery task by its ID. Create check_result.py as follows:
from celery.result import AsyncResult

from celery_object import celery_app

async_result = AsyncResult(id="d1c722fa-4ebf-432e-967e-a462bdefeac4", app=celery_app)
print("Task status:", async_result.status)
if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget()  # delete the result
elif async_result.failed():
    print("Execution failed")
elif async_result.status == "PENDING":
    print("Task is waiting to be executed")
elif async_result.status == "RETRY":
    print("Task is being retried after an exception")
elif async_result.status == "STARTED":
    print("Task has started executing")
Result:

Task status: SUCCESS
Successfully received the SMS sent by 李四0!
By default, Celery stores tasks in a single queue named celery. I found the following style of queue configuration online:
from kombu import Queue

# Define the task queues
CELERY_QUEUES = (
    Queue('default', routing_key='task.#'),   # messages whose routing key starts with "task." go to the default queue
    Queue('web_tasks', routing_key='web.#'),  # messages whose routing key starts with "web." go to the web_tasks queue
)
CELERY_DEFAULT_EXCHANGE = 'tasks'            # the default exchange is named tasks
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'       # the default exchange type is topic
CELERY_DEFAULT_ROUTING_KEY = 'task.default'  # the default routing key is task.default, which matches the default queue above
CELERY_ROUTES = {
    'proj.tasks.add': {
        'queue': 'web_tasks',
        'routing_key': 'web.add',
    },
}

# Start a consumer bound to a specific queue:
# $ celery -A proj worker -Q web_tasks -l info
# This worker only executes tasks from web_tasks; by arranging the number of
# such consumers we can effectively give web_tasks tasks higher priority.
But it does not feel very elegant, so here is the approach I recommend instead. The directory structure is adjusted slightly:
Create a file celery_object.py and write the following:
import celery

backend = "redis://127.0.0.1:6379/4"  # Redis database 4 stores the task results
broker = "amqp://guest:guest@127.0.0.1:5672"  # RabbitMQ serves as the message middleware

celery_app = celery.Celery(
    "celery_demo",
    backend=backend,
    broker=broker,
    include=[
        "celery_email_task",
        "celery_msg_task",
    ],
)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]
celery_app.conf.timezone = "Asia/Shanghai"  # time zone
celery_app.conf.enable_utc = False  # whether to use UTC
celery_app.conf.ONCE = {
    "backend": "celery_once.backends.Redis",
    "settings": {"url": "redis://localhost:6379/8", "default_timeout": 60 * 60},
}
celery_app.conf.task_routes = {
    "send_email": "send_email_queue",
    "send_msg": "send_msg_queue",
}
# Start the workers with:
# celery -A celery_object worker -l info -Q send_email_queue
# celery -A celery_object worker -l info -Q send_msg_queue
This is a slight adjustment of the earlier configuration file: it adds an ONCE setting (required by celery_once) and a task_routes setting (also required here). task_routes routes each task to its own worker queue.
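If you prefer the more explicit dict form used in the Celery documentation, the same routing can, to my knowledge, be written equivalently as:

celery_app.conf.task_routes = {
    "send_email": {"queue": "send_email_queue"},
    "send_msg": {"queue": "send_msg_queue"},
}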
Then write celery_email_task.py as follows (this part uses the celery_once package, so install it first with pip install celery_once):
import time

from celery_once import QueueOnce

from celery_object import celery_app


class SendEmailClass(QueueOnce):
    name = "send_email"
    once = {"graceful": True}
    ignore_result = True

    def __init__(self, *args, **kwargs):
        super(SendEmailClass, self).__init__(*args, **kwargs)

    def run(self, name):
        print("Sending email to %s..." % name)
        time.sleep(5)
        print("Finished sending email to %s" % name)
        return f"Successfully received the email sent by {name}!"


send_email = celery_app.register_task(SendEmailClass())
Write celery_msg_task.py as follows:
import time

from celery_once import QueueOnce

from celery_object import celery_app


class SendMsgClass(QueueOnce):
    name = "send_msg"
    once = {"graceful": True}
    ignore_result = True

    def __init__(self, *args, **kwargs):
        super(SendMsgClass, self).__init__(*args, **kwargs)

    def run(self, name):
        print("Sending SMS to %s..." % name)
        time.sleep(5)
        print("Finished sending SMS to %s" % name)
        return f"Successfully received the SMS sent by {name}!"


send_msg = celery_app.register_task(SendMsgClass())
Parameter explanation:

register_task() registers the given function or class as a Celery task object; the effect is the same as the @celery_app.task decorator.

QueueOnce is a base class provided by celery_once. Custom task classes must inherit from it; such a class executes its run() method by default and supports argument passing (here, the name parameter).

Celery Once also implements its locking with Redis. A Celery Once task inherits from the QueueOnce class, which provides task deduplication. When using the decorator style, the task must set QueueOnce as its base:

@task(base=QueueOnce, once={'graceful': True})

The once parameter controls what happens when a duplicate task is encountered. By default graceful is False and Celery raises an AlreadyQueued exception; if graceful is manually set to True, the duplicate is handled silently.
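For completeness, here is a decorator-style sketch in the spirit of the celery_once README; sync_data and its user_id argument are hypothetical names for illustration, and it assumes the same celery_app as above:

from celery_once import AlreadyQueued, QueueOnce

from celery_object import celery_app


@celery_app.task(base=QueueOnce, once={"graceful": False})
def sync_data(user_id):
    print("syncing data for user %s" % user_id)  # hypothetical task body


try:
    sync_data.apply_async(args=[42])
    sync_data.apply_async(args=[42])  # duplicate submitted while the first is still queued
except AlreadyQueued:
    print("duplicate task rejected")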
Then create produce_task.py and write the following code:
from celery_object import celery_app

if __name__ == "__main__":
    for i in range(10):
        result3 = celery_app.send_task("send_email", args=[f"王五{i}"])
        print(result3.id)
send_task() dispatches a task by name to the appropriate worker. It supports passing arguments via args and can also target a specific queue via the queue parameter; the effect is similar to calling delay() on the task.
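A minimal sketch of targeting a specific queue explicitly with send_task (the queue name matches the task_routes configuration above):

result = celery_app.send_task("send_msg", args=["王五0"], queue="send_msg_queue")
print(result.id)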
Then run the command:

celery -A celery_object worker -l info -Q send_email_queue

Output like the following means the Celery worker started successfully:
Run produce_task.py to produce the tasks:

Open the worker's terminal to see Celery consuming data from the specified queue, send_email_queue:
Then check RabbitMQ to confirm that the queues were created successfully. With this setup, Celery listens on the queue we specified and consumes its data.
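You can also verify the binding from the Celery side: the standard inspect subcommand reports which queues each running worker is consuming from:

celery -A celery_object inspect active_queues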
Author: Abstract
Link: http://www.pythonblackhole.com/blog/article/78476/a6ab7a541c4b5b260616/
Source: python black hole net