Source code for fmn.sender.handler

# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT

import asyncio
from functools import cached_property


class Handler:
    """Base class for message handlers.

    Child classes are expected to override :meth:`handle`; the base
    implementation raises ``NotImplementedError``. :meth:`setup` and
    :meth:`stop` do nothing here and exist to be overridden.
    """

    def __init__(self, config):
        """Keep *config* on the instance as ``_config``."""
        self._config = config

    async def setup(self):
        """Do nothing in the base class.

        Child classes connect to the destination server here if relevant.
        """
        return None

    async def stop(self):
        """Do nothing in the base class; child classes may override."""
        return None

    @cached_property
    def closed(self):
        """Default ``closed`` Future; child classes can override it.

        It should be triggered when there is an error and the app should
        stop. The Future is created on first access from the loop returned
        by ``asyncio.get_event_loop()`` and then cached on the instance, so
        later accesses return the same object.
        """
        loop = asyncio.get_event_loop()
        return loop.create_future()

    async def handle(self, message):
        """Process *message*; must be implemented by child classes.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
class HandlerError(Exception):
    """Exception type for handler errors; adds nothing to ``Exception``."""
class PrintHandler(Handler):
    """Handler that prints every message it is given."""

    async def handle(self, message):
        """Print *message* to standard output, prefixed with ``Received:``."""
        print("Received:", message)