# Source code for fmn.sender.handler
# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT
import asyncio
from functools import cached_property
class Handler:
    """Base class for sender handlers.

    Subclasses are expected to override :meth:`handle`; :meth:`setup` and
    :meth:`stop` are no-op hooks that may be overridden as needed.
    """

    def __init__(self, config):
        """Store the handler configuration.

        Args:
            config: Configuration object for this handler. It is kept as-is
                on ``self._config``; this base class does not inspect it.
        """
        self._config = config

    async def setup(self):
        """Prepare the handler before messages are handled.

        Here we connect to the destination server if relevant. The base
        implementation does nothing.
        """

    async def stop(self):
        """Shut the handler down. The base implementation does nothing."""

    @cached_property
    def closed(self):
        """Default `closed` Future, can be overridden in child classes.

        It should be triggered when there is an error and the app should stop.

        The future is created on first access from the current event loop
        and then cached on the instance, so every later access returns the
        same object.
        """
        return asyncio.get_event_loop().create_future()

    async def handle(self, message):
        """Process a single message.

        Args:
            message: The message to process.

        Raises:
            NotImplementedError: Always, in this base class; child classes
                must provide the implementation.
        """
        raise NotImplementedError
class HandlerError(Exception):
    """Exception type for errors raised by handlers."""
class PrintHandler(Handler):
    """Handler that prints every message it receives to standard output."""

    async def handle(self, message):
        """Print ``message`` prefixed with ``Received:``.

        Args:
            message: The message to print; it is passed to ``print`` as-is.
        """
        print("Received:", message)