大家好,我在项目中使用了Django、Celery和RabbitMQ。我注意到在重负载下,处理的数量不等于成功+失败的数量。
比如现在
Processed: 1598741
Succeeded: 1598591
Failed : 1
Retried : 0
每个任务都会在弹性体中创建一个对象,因此任务的丢失也通过发送的请求数与弹性体中的对象数之间的差异来确认。损失约为 1.5%。同时,我无法在任务中的某处明确捕获错误。我将日志写入文件(一般)和 Centry(用于错误)。例如,这里我在哨兵中看到了这个 Failed,但我不知道还有 150 条消息在哪里。
在任务内部,我设置了日志记录以记录每个任务的启动并跟踪它们丢失的时间点。例如,我们有这样的任务:
task1 - парсинг запроса
task2 - основная работа
task3 - запись результата
Task1 从视图中启动,该任务获取一组对象并循环运行 N 个 task2 任务,其中 N 是集合中对象的数量。每个task2任务,在工作完成后,启动task3。
在这里,task2 和 task3 之间的消息丢失了。此外,我可以计算并确保 task2 收到了正确数量的对象,但 task3 的启动次数较少。
我怎样才能找出原因?
PS:不说的就写吧,我写完)
芹菜设置
CELERY_QUEUES = (
Queue('network', Exchange('network'), routing_key='network', queue_arguments={'x-max-priority': 10}),
Queue('process_item', Exchange('prepare_objects'), routing_key='process_item', queue_arguments={'x-max-priority': 10}),
Queue('dispatch', Exchange('dispatch'), routing_key='dispatch', queue_arguments={'x-max-priority': 10}),
Queue('default', Exchange('default'), routing_key='default', queue_arguments={'x-max-priority': 10}),
)
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_TASK_ROUTES = {
'api.tasks.task3': {'queue': 'network'},
'api.tasks.task2': {'queue': 'process_item'},
'api.tasks.task1': {'queue': 'dispatch'},
}
# Setup Celery Logger
CELERYD_HIJACK_ROOT_LOGGER = False
LOGGING['handlers']['celery_file_handler'] = {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'formatter': 'verbose',
'filename': os.path.join(BASE_DIR, 'logs', 'celery', '{}.log'.format(datetime.now().date())),
'maxBytes': 1024000,
'backupCount': 3,
}
LOGGING['loggers']['celery.tasks'] = {
'handlers': ['celery_file_handler', 'console'],
'level': 'INFO',
'propagate': False
}
CELERY_BROKER_URL = 'amqp://@rabbit:'
CELERY_IGNORE_RESULT = True
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
# CELERY_TIMEZONE = 'Europe/Moscow'
CELERY_TASK_DEFAULT_RETRY_DELAY = 30
CELERY_ENABLE_UTC = True
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 60_000 # 60 MB
CELERY_TASK_SERIALIZER = 'json'
CELERY_TASK_TIME_LIMIT = 120
CELERY_TASK_MAX_RETRIES = 10
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
启动芹菜
python -m celery multi start --app=APP --pidfile=/tmp/%n.pid --logfile=/dev/null 50 -c 4 -Q:1-4 dispatch -Q:5-49 process_item,network -c:50 1 -Q:50 celery -E &&
在运行中,服务器的负载大约为 80-95%。
所以,问题的本质是工人太多。一旦我将他们的数量减少到 20,一切都开始完美运行。不知道是什么,如果有知道的人告诉我就好了。或者我稍后会找时间弄清楚。