Source code for fmn.sender.consumer

# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT

import json
import logging

from aio_pika import connect_robust

from ..core.amqp import get_url_from_config

log = logging.getLogger(__name__)


CLOSING = object()


[docs]class Consumer: def __init__(self, config, handler): self._destination = config["queue"] self._url = get_url_from_config(config).update_query( connection_name=f"FMN sender on {self._destination}" ) self._handler = handler self._connection = None self._channel = None self._queue = None self._queue_iter = None
[docs] async def connect(self): self._connection = await connect_robust(self._url) self._channel = await self._connection.channel() self._queue = await self._channel.declare_queue( self._destination, durable=True, auto_delete=False, exclusive=False ) await self._queue.bind("amq.direct", f"send.{self._destination}")
[docs] async def start(self): log.info("Starting consuming messages") async with self._queue.iterator() as self._queue_iter: async for message in self._queue_iter: if message == CLOSING: break async with message.process(): await self._handler.handle(json.loads(message.body)) await self._queue_iter.close()
[docs] async def stop(self): log.info("Stopping messages consumption") if self._queue_iter: await self._queue_iter.on_message(CLOSING) # Close the queue now or closing the connection just below will cancel the iterator # and raise an exception in start() await self._queue_iter.close() if self._connection: await self._connection.close() await self._handler.stop()