1.背景
Celery 是一個功能強大、靈活可擴展的任務隊列框架,廣泛應用于異步任務處理、定時任務調度和分布式系統中。它簡化了任務的管理和執行,提高了系統的可靠性、性能和可擴展性。無論是構建高性能的 Web 應用程序、處理大量的異步任務還是調度定時任務,Celery 都是一個非常有價值的工具。
在某些場景下,我們可能需要配置多個消費者來處理不同類型的任務,以便更好地分配和利用系統資源。本文將重點介紹如何配置 Celery 多消費者,以及如何確保不同任務由不同消費者進行消費。
2.Celery組件介紹
2.1 Celery Worker
是負責執行任務的后臺工作進程。它從消息代理(如 RabbitMQ 或 Redis)接收任務消息,并按照預定義的任務路由和調度規則執行任務。可以啟動多個 Celery Worker 來實現并發處理和分布式處理任務。
2.2 Celery Beat
是 Celery 的任務調度器。它負責按照預定的時間表和規則觸發定時任務的執行。Celery Beat 使用了底層的消息代理來觸發任務的調度,確保任務在指定的時間點或時間間隔內被執行。
2.3 消息代理Broker
是 Celery 的核心組件之一。它負責接收、存儲和分發任務消息。常用的消息代理包括 RabbitMQ、Redis等。消息代理實現了任務消息的隊列和分發機制,確保任務能夠安全、可靠地傳遞給 Celery Worker 進行執行。
2.4 結果存儲組件Result Backend
是用于存儲任務的執行結果。當任務執行完成后,Celery 將結果存儲到指定的后端,如數據庫、緩存或消息代理。結果存儲允許您輕松地獲取任務的執行結果,進行后續處理或展示。
3.配置多消費者
如果想在同一個應用程序app中配置不同的任務隊列,以便不同的消費者處理不同的隊列,可以按照以下步驟進行配置:
3.1 加載 Celery 配置
在 Django 項目的根目錄中的 celery.py 文件中,創建一個 Celery 實例,并從 Django 的配置中加載 Celery 配置:
from celery import Celery
from django.conf import settings
app = Celery('project')
app.config_from_object(settings, namespace='CELERY')
3.2 配置 Celery 的隊列和路由
在 Django 的 settings.py 文件中,配置 Celery 的隊列和路由:
CELERY_QUEUES = {
Queue('default', Exchange('default'), routing_key='default', exchange_type="topic"),
Queue('long_task', Exchange('long_task'), routing_key='long_task', exchange_type='topic')
# 可以配置更多隊列
}
CELERY_ROUTES = {
'app1.tasks.task1': {'queue': 'default'},
'app1.tasks.task2': {'queue': 'long_task'},
# 配置其他任務和隊列的映射關系
}
CELERY_DEFAULT_QUEUE = 'default' # 設置默認隊列名稱
CELERY_DEFAULT_ROUTING_KEY = 'default' # 設置默認路由鍵
# 配置 Celery Broker(消息代理)
CELERY_BROKER_URL = 'amqp://localhost' # RabbitMQ 的連接 URL 或 Redis 的連接 URL
CELERY_RESULT_BACKEND = 'db+postgresql://your_username:your_password@localhost/your_database' # 結果存儲后端配置
在上面的示例中,我們定義了兩個隊列(queue1 和 queue2),并將任務 app1.tasks.task1 分配給 queue1,將任務 app1.tasks.task2 分配給 queue2。也可以根據實際需求配置更多的隊列和任務的映射關系。
3.3 指定隊列
在任務定義中,為每個任務指定相應的隊列:
from celery import shared_task
@shared_task(queue='default')
def task1():
# 處理隊列1的任務邏輯
@shared_task(queue='long_task')
def task2():
# 處理隊列2的任務邏輯
通過為任務添加queue參數,并指定相應的隊列名稱,將任務與特定的隊列關聯起來。
3.4 啟動Worker
啟動多個 Celery Worker,并為每個 Worker 指定相應的隊列:
celery -A project worker -Q default -c 20
celery -A project worker -Q long_task -c 20
在每個命令中使用-Q參數來指定要處理的隊列名稱。
4.總結
通過以上步驟,我們可以在同一個應用程序app中配置不同的隊列,并由不同的消費者處理不同的隊列任務。這樣可以提高系統的可伸縮性和性能,使任務處理更加高效和靈活,更好地管理任務的分配和利用系統資源。