Celery Task Lifecycle and Dead Letter Queue in Rabbitmq
When configuring the DLQ support:
app = Celery(
...
task_cls=BaseTask,
class BaseTask
Request = 'celery.worker.request:Request'
request: Request
def on_failure(self, exc, task_id, args, kwargs, einfo): # pylint: disable=too-many-arguments
headers = self.request.get('headers')
if not headers:
return
...
if not isinstance(exc, TaskPredicate):
# TODO find a way to send this to DLQ without raising a warning in the worker
raise Reject(str(exc), requeue=False) from exc
default_exchange = Exchange(
app.conf.task_default_exchange,
type=app.conf.task_default_exchange_type)
default_queue = Queue(
app.conf.task_default_queue,
default_exchange,
routing_key=app.conf.task_default_routing_key,
queue_arguments={
'x-dead-letter-exchange' : app.conf.dead_letter_exchange,
'x-dead-letter-routing-key': app.conf.dead_letter_routing_key
})
class DeclareDLXnDLQ(bootsteps.StartStopStep):
"""
Celery Bootstep to declare the DL exchange and queues before the worker starts
processing tasks
"""
requires = {'celery.worker.components:Pool'}
def start(self, worker):
app = worker.app
# Declare DLX and DLQ
dlx = Exchange(
app.conf.dead_letter_exchange,
type=app.conf.dead_letter_exchange_type)
dead_letter_queue = Queue(
app.conf.dead_letter_queue,
dlx,
routing_key=app.conf.dead_letter_routing_key)
with worker.app.pool.acquire() as conn:
dead_letter_queue.bind(conn).declare()
# Inject the default queue in celery application
app.conf.task_queues = (default_queue,)
# Inject extra bootstep that declares DLX and DLQ
app.steps['worker'].add(DeclareDLXnDLQ)
Don’t do this, this will essentially remove the on_failure
lifecycle event:
def onfailure_reject(requeue=False):
"""
When a task has failed it will raise a Reject exception so
that the message will be requeued or marked for insertation in Dead Letter Exchange
"""
def _decorator(f):
@wraps(f)
def _wrapper(*args, **kwargs):
try:
return f(*args, **kwargs)
except TaskPredicate:
raise # Do not handle TaskPredicate like Retry or Reject
except Exception as e:
print("Rejecting")
raise Reject(str(e), requeue=requeue)
return _wrapper
return _decorator