Source code for fmn.consumer.send_queue

# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT

import json
import logging
import sys
import traceback

import backoff
from aio_pika import Message, connect_robust
from aio_pika.exceptions import AMQPConnectionError

from ..core.amqp import get_url_from_config
from ..rules.notification import Notification

log = logging.getLogger(__name__)


async def backoff_hdlr(details):
    """Log a failed publish attempt and reconnect before the next try.

    Registered as the ``on_backoff`` handler of :meth:`SendQueue.send`.

    Args:
        details: The mapping handed over by ``backoff``. The first item of
            ``details["args"]`` is taken to be the object whose ``connect()``
            coroutine must be awaited (the ``self`` of the decorated method).
    """
    # Only the traceback of the exception currently being handled is logged.
    _exc_type, _exc_value, current_tb = sys.exc_info()
    formatted_tb = traceback.format_tb(current_tb)
    log.warning("Publishing message failed. Retrying. %s", formatted_tb)

    send_queue = details["args"][0]
    await send_queue.connect()


def giveup_hdlr(details):
    """Log that publishing failed and will not be attempted again.

    Registered as the ``on_giveup`` handler of :meth:`SendQueue.send`.

    Args:
        details: The mapping handed over by ``backoff``; it is not used here.
    """
    # Only the traceback of the exception currently being handled is logged.
    current_tb = sys.exc_info()[2]
    formatted_tb = traceback.format_tb(current_tb)
    log.error("Publishing message failed. Giving up. %s", formatted_tb)


class SendQueue:
    """Publish notifications to the ``amq.direct`` exchange.

    Each notification's content is serialized to JSON and published with the
    routing key ``send.<protocol>``. :meth:`connect` must be awaited before
    :meth:`send` is used, since the exchange is only looked up there.
    """

    def __init__(self, config: dict):
        """Store the configuration and build the broker URL; no I/O is done.

        Args:
            config: Configuration passed to ``get_url_from_config`` to build
                the broker URL. It is also kept as ``self.config``.
        """
        self.config = config
        # The connection name is passed as a query argument of the URL.
        self._url = get_url_from_config(config).update_query(
            connection_name="FMN consumer to sender"
        )
        self._connection = None
        self._channel = None
        self._exchange = None

    async def connect(self):
        """Open a connection and a channel, and look up ``amq.direct``.

        This is also awaited by ``backoff_hdlr`` between two tries of
        :meth:`send`, so it may run while an earlier connection is still
        referenced. That earlier connection is closed once the new one is in
        place instead of being silently dropped.
        """
        previous = self._connection

        self._connection = await connect_robust(self._url)
        self._channel = await self._connection.channel()
        self._exchange = await self._channel.get_exchange("amq.direct")

        if previous is not None:
            await self._discard(previous)

    @staticmethod
    async def _discard(connection):
        """Close a replaced connection, best effort."""
        # A robust connection is documented to reconnect on its own, so an
        # abandoned one would presumably stay alive in the background.
        # The connection being replaced is most likely already broken; a
        # failure to close it must not prevent using the new one.
        try:
            await connection.close()
        except Exception:
            log.debug("Could not close the previous connection", exc_info=True)

    @backoff.on_exception(
        backoff.expo,
        AMQPConnectionError,
        max_tries=3,
        on_backoff=backoff_hdlr,
        on_giveup=giveup_hdlr,
    )
    async def send(self, notification: Notification):
        """Publish a notification with the routing key ``send.<protocol>``.

        On ``AMQPConnectionError`` the call is tried up to 3 times with an
        exponential backoff. ``backoff_hdlr`` runs between tries and
        ``giveup_hdlr`` runs when the tries are exhausted.

        Args:
            notification: The notification to publish. Its ``content.dict()``
                is JSON-encoded as the message body and its ``protocol`` is
                used in the routing key.
        """
        body = json.dumps(notification.content.dict())
        await self._exchange.publish(
            Message(body=body.encode("utf-8")),
            routing_key=f"send.{notification.protocol}",
        )

    async def close(self):
        """Close the current connection, if one was opened."""
        if self._connection:
            await self._connection.close()