Source code for fmn.sender.irc
# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT
import asyncio
import logging
from urllib.parse import urlparse
from irc.client import ServerConnectionError
from irc.client_aio import AioSimpleIRCClient
from irc.connection import AioFactory
from .handler import Handler, HandlerError
# Module-level logger, named after this module so its records can be filtered per module.
log = logging.getLogger(__name__)
class IRCHandler(Handler):
    """Handler that delivers messages through an :class:`IRCClient`.

    The server to use is read from the ``irc_url`` entry of ``self._config``
    (presumably populated by the ``Handler`` base class -- confirm there).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._client = IRCClient()

    async def setup(self):
        """Connect the IRC client to the server described by ``irc_url``.

        The URL's hostname, port and username are passed positionally to the
        client, its password as ``password``. TLS is enabled only when the URL
        scheme is ``ircs``.
        """
        parsed = urlparse(self._config["irc_url"])
        use_tls = parsed.scheme == "ircs"
        await self._client.connect(
            parsed.hostname,
            parsed.port,
            parsed.username,
            password=parsed.password,
            connect_factory=AioFactory(ssl=use_tls),
        )
        log.debug("IRC connection established")

    async def stop(self):
        """Ask the IRC client to disconnect."""
        log.debug("Stopping IRC handler...")
        await self._client.disconnect()

    @property
    def closed(self):
        """The client's ``closed`` future."""
        return self._client.closed

    async def handle(self, message):
        """Send one message over IRC.

        Args:
            message: A mapping with a ``to`` entry (the recipient) and a
                ``message`` entry (the text to send).
        """
        recipient, text = message["to"], message["message"]
        log.info("Sending messsage to %s: %s", recipient, text)
        await self._client.privmsg(recipient, text)
class IRCClient(AioSimpleIRCClient):
    """Asyncio IRC client that reports login and shutdown through futures.

    Attributes:
        closed: Future whose result is set to a message when the connection
            fails, or is closed with a message other than the shutdown
            message used by :meth:`disconnect`.
    """

    _shutdown_message = "FMN is shutting down"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Created by connect(); stays None until a connection attempt starts.
        self._connection_future = None
        self.closed = asyncio.get_event_loop().create_future()

    async def connect(self, *args, **kwargs):
        """Connect to the server and wait until the login outcome is known.

        All arguments are forwarded to ``self.connection.connect()``.

        Raises:
            HandlerError: If the connection future fails with a
                ``ServerConnectionError`` (see :meth:`on_nicknameinuse`).
            asyncio.CancelledError: If the wait is cancelled; the connection
                is disconnected before the error is re-raised.
        """
        self._connection_future = asyncio.Future()
        await self.connection.connect(*args, **kwargs)
        try:
            await self._connection_future
        except asyncio.exceptions.CancelledError:
            self.connection.disconnect("Connection cancelled")
            raise
        except ServerConnectionError as e:
            message = e.args[0]
            self._set_closed(message)
            raise HandlerError(f"the handler could not connect: {message}") from e

    async def privmsg(self, *args, **kwargs):
        """Send a PRIVMSG; arguments are forwarded to the connection."""
        # This is not async yet.
        return self.connection.privmsg(*args, **kwargs)

    async def disconnect(self):
        """Disconnect with the shutdown message if currently connected."""
        if self.connection.connected:
            return self.connection.disconnect(self._shutdown_message)

    def _connection_pending(self):
        """Tell whether ``connect()`` is still waiting for the login outcome."""
        return self._connection_future is not None and not self._connection_future.done()

    def _set_closed(self, message):
        """Set the ``closed`` future's result unless it is already done.

        Several events can report the same shutdown (for instance an error
        event followed by a disconnect event). Setting the result of a done
        future raises ``asyncio.InvalidStateError``, so only the first
        message is kept.

        Args:
            message (str): The result for the ``closed`` future.
        """
        if not self.closed.done():
            self.closed.set_result(message)

    def _cancel_or_close(self, message):
        """Cancel or close the futures on shutdown.

        Cancel the ``_connection_future`` if it's not done yet, otherwise set
        the ``closed`` future to signal shutdown. When no connection attempt
        was started (``_connection_future`` is still ``None``) the ``closed``
        future is set as well.

        Args:
            message (str): The message for the cancellation or the closed
                future result.
        """
        if self._connection_pending():
            # This will disconnect and set the closed future
            self._connection_future.cancel(message)
        else:
            self._set_closed(message)

    def on_disconnect(self, connection, event):
        """Signal shutdown unless the disconnection was requested by us."""
        message = event.arguments[0]
        if message != self._shutdown_message:
            self._cancel_or_close(message)

    def on_error(self, connection, event):
        """Signal shutdown with the message carried by the error event."""
        self._cancel_or_close(event.arguments[0])

    def on_nicknameinuse(self, connection, event):
        """Fail the pending connection attempt with the server's message."""
        message = f"{event.arguments[0]}: {event.arguments[1]}"
        # Resolving a future that is already done (or cancelled) raises
        # InvalidStateError, so only a pending attempt is failed.
        if self._connection_pending():
            self._connection_future.set_exception(ServerConnectionError(message))

    def on_900(self, connection, event):
        """Complete the pending connection attempt once logged in."""
        # When logged in.
        # See IRCv3: https://ircv3.net/specs/extensions/sasl-3.1.html#numerics-used-by-this-extension
        if self._connection_pending():
            self._connection_future.set_result(connection)