Source code for fmn.sender.matrix

# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT

import asyncio
import logging
from contextlib import suppress

from nio import AsyncClient

from .handler import Handler

log = logging.getLogger(__name__)


[docs]class MatrixHandler(Handler):
[docs] async def setup(self): self._user_id = self._config["user_id"] self._client = AsyncClient(self._config["host"], self._user_id) # Token login self._client.user_id = self._user_id self._client.access_token = self._config["token"] self._client.device_id = "FMN" log.debug("Establishing connection to Matrix") await self._client.sync(timeout=30000, full_state=True, set_presence="online") log.debug("Synchronized") self._dm_rooms_cache = {} await self.update_dm_rooms_cache() log.debug("Room list updated") self.loop = asyncio.get_event_loop() # Periodically refresh the DM rooms cache self._dm_rooms_cache_refresh_task = self.loop.create_task(self.refresh_dm_rooms_cache())
[docs] async def stop(self): log.debug("Stopping Matrix handler...") self._dm_rooms_cache_refresh_task.cancel() with suppress(asyncio.TimeoutError, asyncio.CancelledError): await asyncio.wait_for(self._dm_rooms_cache_refresh_task, 1) await self._client.disconnect()
[docs] async def handle(self, message): log.info("Sending message to %s: %s", message["to"], message["message"]) room_id = await self.get_dm_room(message["to"]) await self.send_dm(room_id, message["message"]) await self._client.sync(timeout=30000)
[docs] async def update_dm_rooms_cache(self): dm_rooms = {} resp = await self._client.joined_rooms() for room_id in resp.rooms: resp = await self._client.joined_members(room_id) if resp.members and len(resp.members) == 2: if resp.members[0].user_id == self._user_id: # sndr = resp.members[0] rcvr = resp.members[1] elif resp.members[1].user_id == self._user_id: # sndr = resp.members[1] rcvr = resp.members[0] else: continue dm_rooms[rcvr.user_id] = room_id self._dm_rooms_cache = dm_rooms
[docs] async def get_dm_room(self, dest): with suppress(KeyError): return self._dm_rooms_cache[dest] room_id = await self.create_dm_room(dest) self._dm_rooms_cache[dest] = room_id return room_id
[docs] async def create_dm_room(self, dest): resp = await self._client.room_create(is_direct=True, invite=[dest]) return resp.room_id
[docs] async def send_dm(self, room_id, message): await self._client.room_send( room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": message} )
[docs] async def refresh_dm_rooms_cache(self): while True: # Do it every day. Sleep first because we're already running it in setup() await asyncio.sleep(3600 * 24) await self.update_dm_rooms_cache()