Source code for fmn.backends.fasjson
# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT
import asyncio
import logging
import re
from functools import cache as ft_cache
from functools import cached_property as ft_cached_property
from itertools import chain
from typing import TYPE_CHECKING, Any
from cashews import cache
from httpx_gssapi import HTTPSPNEGOAuth
from ..cache.util import cache_ttl, get_pattern_for_cached_calls
from ..core.config import get_settings
from .base import APIClient, NextPageParams, handle_http_error
if TYPE_CHECKING:
from fedora_messaging.message import Message
log = logging.getLogger(__name__)
[docs]class FASJSONAsyncProxy(APIClient):
"""Proxy for the FASJSON API endpoints used in FMN"""
API_VERSION = "v1"
FAS_TOPIC_RE = re.compile(
r"fas\.(?P<usergroup>user|group)\.(?P<event>member\.sponsor|create|update)$"
)
payload_field = "result"
def __init__(self, base_url: str) -> None:
super().__init__(base_url=base_url, auth=HTTPSPNEGOAuth())
@ft_cached_property
def api_url(self) -> str:
return f"{self.base_url.rstrip('/')}/{self.API_VERSION}"
[docs] def determine_next_page_params(self, url: str, params: dict, result: dict) -> NextPageParams:
if "page" in result and "page_number" in result["page"] and "total_pages" in result["page"]:
page_number = result["page"]["page_number"]
if page_number < result["page"]["total_pages"]:
params["page_number"] = page_number + 1
return url, params
return None, None
[docs] @cache(ttl=cache_ttl("fasjson"), prefix="v1")
async def search_users(
self,
username: str | None = None,
username__exact: str | None = None,
**params: dict[str, Any],
) -> list[dict]:
if username:
params["username"] = username
if username__exact:
params["username__exact"] = username__exact
return [user async for user in self.get_paginated("/search/users/", params=params)]
[docs] @cache(ttl=cache_ttl("fasjson"), prefix="v1")
async def get_user(self, *, username: str) -> dict:
return await self.get_payload(f"/users/{username}/")
[docs] @handle_http_error(list)
@cache(ttl=cache_ttl("fasjson"), prefix="v1")
async def get_user_groups(self, *, username: str) -> dict:
return await self.get_payload(f"/users/{username}/groups/")
[docs] async def invalidate_on_message(self, message: "Message") -> None:
if not self.FAS_TOPIC_RE.search(message.topic):
# Bail out early
log.debug("Skipping message with topic %s", message.topic)
return
if not (msg_user := message.body.get("user")):
log.warning("No information found about affected user")
return
del_keys = [
key
for pattern in chain(
get_pattern_for_cached_calls(self.search_users, self=self, username__exact=None),
get_pattern_for_cached_calls(
self.search_users, self=self, username__exact=msg_user
),
get_pattern_for_cached_calls(self.get_user, self=self, username=msg_user),
get_pattern_for_cached_calls(self.get_user_groups, self=self, username=msg_user),
)
async for key, _ in cache.get_match(pattern)
]
# Delete the things in parallel
del_tasks = [asyncio.create_task(cache.delete(key)) for key in del_keys]
del_results = await asyncio.gather(*del_tasks, return_exceptions=True)
# Follow-up care
if exceptions_in_results := [
result for result in del_results if isinstance(result, Exception)
]:
log.warning(
"Deleting %d cache entries yielded %d exception(s):",
len(del_results),
len(exceptions_in_results),
)
for exc in exceptions_in_results:
log.warning("\t%r", exc)
[docs]@ft_cache
def get_fasjson_proxy() -> FASJSONAsyncProxy:
return FASJSONAsyncProxy(get_settings().services.fasjson_url)