Source code for fmn.backends.fasjson

# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT

import logging
import re
from functools import cache as ft_cache
from functools import cached_property as ft_cached_property
from typing import TYPE_CHECKING, Any

import httpx
from cashews import cache
from httpx_gssapi import HTTPSPNEGOAuth

from ..cache.util import cache_ttl
from ..core.config import Settings, get_settings
from .base import APIClient, NextPageParams, handle_http_error

if TYPE_CHECKING:
    from fedora_messaging.message import Message
    from sqlalchemy.ext.asyncio import AsyncSession

log = logging.getLogger(__name__)


[docs] class FASJSONAsyncProxy(APIClient): """Proxy for the FASJSON API endpoints used in FMN""" API_VERSION = "v1" FAS_TOPIC_RE = re.compile( r"fas\.(?P<usergroup>user|group)\.(?P<event>member\.sponsor|create|update)$" ) payload_field = "result" def __init__(self, base_url: str) -> None: super().__init__(base_url=base_url, auth=HTTPSPNEGOAuth()) @ft_cached_property def api_url(self) -> str: return f"{self.base_url.rstrip('/')}/{self.API_VERSION}"
[docs] def determine_next_page_params(self, url: str, params: dict, result: dict) -> NextPageParams: if "page" in result and "page_number" in result["page"] and "total_pages" in result["page"]: page_number = result["page"]["page_number"] if page_number < result["page"]["total_pages"]: params["page_number"] = page_number + 1 return url, params return None, None
[docs] @cache( ttl=cache_ttl("fasjson"), prefix="v1", tags=["fasjson:search_users:username__exact={username__exact}"], ) async def search_users( self, username: str | None = None, username__exact: str | None = None, **params: dict[str, Any], ) -> list[dict]: if username: params["username"] = username if username__exact: params["username__exact"] = username__exact return [user async for user in self.get_paginated("/search/users/", params=params)]
[docs] @cache(ttl=cache_ttl("fasjson"), prefix="v1", tags=["get_user:username={username}"]) async def get_user(self, *, username: str) -> dict | None: try: return await self.get_payload(f"/users/{username}/") except httpx.HTTPStatusError as e: if e.response.status_code == 404: return None raise
[docs] @handle_http_error(list) @cache(ttl=cache_ttl("fasjson"), prefix="v1") async def get_user_groups(self, *, username: str) -> dict: return await self.get_payload(f"/users/{username}/groups/")
[docs] async def invalidate_on_message(self, message: "Message", db: "AsyncSession") -> None: if not self.FAS_TOPIC_RE.search(message.topic): # Bail out early log.debug("Skipping message with topic %s", message.topic) return if not (msg_user := message.body.get("msg", {}).get("user")): log.warning("No information found about affected user") return try: await cache.delete_tags( "fasjson:search_users:username__exact=", f"fasjson:search_users:username__exact={msg_user}", f"fasjson:get_user:username={msg_user}", f"fasjson:get_user_groups:username={msg_user}", ) except Exception as exc: log.warning("Deleting cache entries yielded an exception: %s", exc)
[docs] @ft_cache def get_fasjson_proxy(settings: Settings | None = None) -> FASJSONAsyncProxy: settings = settings or get_settings() return FASJSONAsyncProxy(settings.services.fasjson_url)