Celery Task Lifecycle and Dead Letter Queue in Rabbitmq

When configuring the DLQ support:

app = Celery(
  ...
  task_cls=BaseTask,
class BaseTask
  Request = 'celery.worker.request:Request'
  request: Request

  def on_failure(self, exc, task_id, args, kwargs, einfo):  # pylint: disable=too-many-arguments
    headers = self.request.get('headers')
    if not headers:
      return
    ... 
    if not isinstance(exc, TaskPredicate):
      # TODO find a way to send this to DLQ without raising a warning in the worker
      raise Reject(str(exc), requeue=False) from exc  
default_exchange = Exchange(
  app.conf.task_default_exchange,
  type=app.conf.task_default_exchange_type)
default_queue = Queue(
  app.conf.task_default_queue,
  default_exchange,
  routing_key=app.conf.task_default_routing_key,
  queue_arguments={
    'x-dead-letter-exchange'   : app.conf.dead_letter_exchange,
    'x-dead-letter-routing-key': app.conf.dead_letter_routing_key
  })
class DeclareDLXnDLQ(bootsteps.StartStopStep):
  """
  Celery Bootstep to declare the DL exchange and queues before the worker starts
      processing tasks
  """
  requires = {'celery.worker.components:Pool'}

  def start(self, worker):
    app = worker.app

    # Declare DLX and DLQ
    dlx = Exchange(
      app.conf.dead_letter_exchange,
      type=app.conf.dead_letter_exchange_type)

    dead_letter_queue = Queue(
      app.conf.dead_letter_queue,
      dlx,
      routing_key=app.conf.dead_letter_routing_key)

    with worker.app.pool.acquire() as conn:
      dead_letter_queue.bind(conn).declare()
# Inject the default queue in celery application
app.conf.task_queues = (default_queue,)

# Inject extra bootstep that declares DLX and DLQ
app.steps['worker'].add(DeclareDLXnDLQ)

Don’t do this, this will essentially remove the on_failure lifecycle event:

def onfailure_reject(requeue=False):
    """
    When a task has failed it will raise a Reject exception so
    that the message will be requeued or marked for insertation in Dead Letter Exchange
    """

    def _decorator(f):
        @wraps(f)
        def _wrapper(*args, **kwargs):

            try:
                return f(*args, **kwargs)
            except TaskPredicate:
                raise   # Do not handle TaskPredicate like Retry or Reject
            except Exception as e:
                print("Rejecting")
                raise Reject(str(e), requeue=requeue)
        return _wrapper

    return _decorator