import os
import pika
username = os.getenv('RABBITMQ_USER')
password = os.getenv('RABBITMQ_PASS')
rabbitmq = os.getenv('RABBITMQ_HOST')
credentials = pika.PlainCredentials(username=username, password=password)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=rabbitmq,
credentials=credentials,
))
channel = connection.channel()
dlq_queue = 'dlq'
task_queue = 'celery'
import os
import sys
from pika import BasicProperties
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic
from rabbitmq_connection import connection, task_queue
def main():
from rabbitmq_connection import channel, dlq_queue
def callback(ch: BlockingChannel, method: Basic.Deliver, properties: BasicProperties, body: bytes):
send_to_task_queue(body=body, properties=properties)
ch.basic_ack(delivery_tag=method.delivery_tag)
info = {
'task_name': properties.headers['task'],
'date': properties.headers['x-death'][0]['time'].isoformat(),
'retry_count': properties.headers['x-death'][0]['count'],
}
print(f'message requeued: {info}')
def send_to_task_queue(body: bytes, properties: BasicProperties):
channel.basic_publish(
exchange='',
routing_key=task_queue,
body=body,
properties=properties
)
channel.basic_consume(
queue=dlq_queue,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()