Source code for fmn.backends.pagure

# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT

import asyncio
import bisect
import logging
import re
from enum import IntFlag, auto
from functools import cache as ft_cache
from functools import cached_property as ft_cached_property
from itertools import chain
from typing import TYPE_CHECKING, Any

from cashews import cache
from httpx import URL, QueryParams

from ..cache.util import cache_ttl, get_pattern_for_cached_calls
from ..core.config import get_settings
from .base import APIClient, NextPageParams, handle_http_error

if TYPE_CHECKING:
    from fedora_messaging.message import Message

log = logging.getLogger(__name__)


class PagureRole(IntFlag):
    OWNER = auto()
    ADMIN = auto()
    COMMIT = auto()
    COLLABORATOR = auto()
    TICKET = auto()

    USER_ROLES_MAINTAINER = OWNER | ADMIN | COMMIT | COLLABORATOR
    USER_ROLES = USER_ROLES_MAINTAINER | TICKET
    GROUP_ROLES_MAINTAINER = ADMIN | COMMIT | COLLABORATOR
    GROUP_ROLES = GROUP_ROLES_MAINTAINER | TICKET


# Python < 3.11 doesn’t allow iterating over combined flag values
PagureRole.USER_ROLES_MAINTAINER_SET = {
    role
    for role in PagureRole
    if role.bit_count() == 1 and role & PagureRole.USER_ROLES_MAINTAINER
}
PagureRole.USER_ROLES_SET = {
    role for role in PagureRole if role.bit_count() == 1 and role & PagureRole.USER_ROLES
}
PagureRole.GROUP_ROLES_MAINTAINER_SET = {
    role
    for role in PagureRole
    if role.bit_count() == 1 and role & PagureRole.GROUP_ROLES_MAINTAINER
}
PagureRole.GROUP_ROLES_SET = {
    role for role in PagureRole if role.bit_count() == 1 and role & PagureRole.GROUP_ROLES
}
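
# Example (illustrative): with the definitions above, the single-bit filter leaves
# PagureRole.USER_ROLES_MAINTAINER_SET == {OWNER, ADMIN, COMMIT, COLLABORATOR},
# while PagureRole.USER_ROLES_SET additionally contains TICKET.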


class PagureAsyncProxy(APIClient):
    """Proxy for the Pagure API endpoints used in FMN"""

    API_VERSION = "0"

    PROJECT_TOPIC_RE = re.compile(
        r"pagure\.project\.(?P<usergroup>user|group)\.(?P<action>access\.updated|added|removed)$"
    )

    @ft_cached_property
    def api_url(self) -> str:
        return f"{self.base_url.rstrip('/')}/api/{self.API_VERSION}"

    def determine_next_page_params(self, url: str, params: dict, result: dict) -> NextPageParams:
        next_url = result.get("pagination", {}).get("next")
        if next_url:
            parsed_url = URL(next_url)
            parsed_query_params = QueryParams(parsed_url.query)
            next_params = {**params, **parsed_query_params}
            next_url = str(parsed_url.copy_with(query=None))

            return next_url, next_params

        return None, None
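
    # Example (illustrative): if result["pagination"]["next"] is
    # "https://pagure.example.org/api/0/projects?page=2", this returns
    # ("https://pagure.example.org/api/0/projects", {**params, "page": "2"}),
    # i.e. the query string is folded back into the params for the next request.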

    @handle_http_error(list)
    @cache(ttl=cache_ttl("pagure"), prefix="v1")
    async def get_projects(
        self,
        *,
        namespace: str | None = None,
        pattern: str | None = None,
        username: str | None = None,
        owner: str | None = None,
        short: bool = True,
        fork: bool = False,
    ) -> list[dict[str, Any]]:
        params = {"short": short, "fork": fork}
        if namespace:
            params["namespace"] = namespace
        if pattern:
            params["pattern"] = pattern
        if username:
            params["username"] = username
        if owner:
            params["owner"] = owner

        return [
            project
            async for project in self.get_paginated(
                "/projects", params=params, payload_field="projects"
            )
        ]
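
    # Usage sketch (illustrative): `await proxy.get_projects(username="alice", short=False)`
    # passes username, short and fork as query parameters to /projects and drains every
    # page of the paginated response; results are cached with the "pagure" cache TTL.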

    @handle_http_error(list)
    @cache(ttl=cache_ttl("pagure"), prefix="v1")
    async def get_user_projects(self, *, username: str) -> list[dict[str, Any]]:
        return [
            p
            for p in await self.get_projects(username=username, short=False)
            if any(
                username in p.get("access_users", {}).get(role.name.lower(), [])
                for role in PagureRole.USER_ROLES_MAINTAINER_SET
            )
        ]

    @handle_http_error(list)
    @cache(ttl=cache_ttl("pagure"), prefix="v1")
    async def get_project_users(
        self, *, project_path: str, roles: PagureRole = PagureRole.USER_ROLES_MAINTAINER
    ) -> list[str]:
        project = await self.get(project_path)
        access_users = project.get("access_users", {})
        usernames = {
            username
            for role in PagureRole
            for username in access_users.get(role.name.lower(), ())
            if role in roles
        }
        return sorted(usernames)
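
    # Example (illustrative): `await proxy.get_project_users(project_path="/rpms/bash")`
    # fetches the project record and returns the usernames listed under the access_users
    # keys whose roles fall into USER_ROLES_MAINTAINER (owner, admin, commit, collaborator).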

    @handle_http_error(list)
    @cache(ttl=cache_ttl("pagure"), prefix="v1")
    async def get_project_groups(
        self, *, project_path: str, roles: PagureRole = PagureRole.GROUP_ROLES_MAINTAINER
    ) -> list[str]:
        project = await self.get(project_path)
        access_groups = project.get("access_groups", {})
        groupnames = {
            groupname
            for role in PagureRole
            for groupname in access_groups.get(role.name.lower(), ())
            if role in roles
        }
        return sorted(groupnames)

    @handle_http_error(list)
    @cache(ttl=cache_ttl("pagure"), prefix="v1")
    async def get_group_projects(
        self, *, name: str, acl: PagureRole | None = None
    ) -> list[dict[str, Any]]:
        if not acl:
            params_seq = ({"projects": True},)
        else:
            params_seq = [
                {"projects": True, "acl": role.name.lower()}
                for role in PagureRole.GROUP_ROLES_SET
                if role & acl
            ]

        seen_fullnames = set()
        sorted_projects = []

        for params in params_seq:
            async for project in self.get_paginated(
                f"/group/{name}", params=params, payload_field="projects"
            ):
                if (fullname := project["fullname"]) in seen_fullnames:
                    continue
                seen_fullnames.add(fullname)
                bisect.insort(sorted_projects, project, key=lambda p: p["fullname"])

        return sorted_projects
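
    # Usage sketch (illustrative): `await proxy.get_group_projects(name="infra-sig",
    # acl=PagureRole.COMMIT)` queries /group/infra-sig with acl=commit and returns the
    # matching projects, de-duplicated by fullname and kept sorted via bisect.insort().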

    async def invalidate_on_message(self, message: "Message") -> None:
        topic = message.topic
        topic_match = self.PROJECT_TOPIC_RE.search(topic)
        if not topic_match:
            # Bail out early
            log.debug("Skipping message with topic %s", topic)
            return

        # Quick access
        body = message.body
        usergroup = topic_match.group("usergroup")
        action = topic_match.group("action")

        if not (msg_project := body.get("project")):
            log.warning("No project info found when processing message")
            return

        if not (fullname := msg_project.get("fullname")):
            log.warning("No full name found for affected project when processing message")
            return

        if not (full_url := msg_project.get("full_url")):
            log.warning("No URL found for affected project when processing message")
            return

        if not full_url.startswith(self.base_url_with_trailing_slash):
            # Different Pagure instance
            log.debug("Skipping message for different Pagure instance %s", full_url)
            return

        # These keys describe cache entries to be deleted
        del_keys = []

        # Identify cache entries to be invalidated and create tasks for their deletion
        if usergroup == "user":
            # Messages about changes to project users
            if action == "removed":
                user = body.get("removed_user")
            else:
                user = body.get("new_user")

            if not user:
                log.warning("No affected user found when processing message")
                return

            del_keys = [
                key
                for pattern in chain(
                    get_pattern_for_cached_calls(
                        self.get_project_users, self=self, project_path=fullname
                    ),
                    get_pattern_for_cached_calls(
                        self.get_projects, self=self, username=None, owner=None
                    ),
                    get_pattern_for_cached_calls(self.get_projects, self=self, username=user),
                    get_pattern_for_cached_calls(self.get_projects, self=self, owner=user),
                )
                async for key, _ in cache.get_match(pattern)
            ]
        else:  # usergroup == "group"
            # Messages about changes to project groups
            if action == "removed":
                # Messages with topic "project.group.removed" can send a list of groups, but the
                # code in Pagure sending them guarantees it can be at most one 🤔.
                group = body.get("removed_groups", [None])[0]
            else:
                group = body.get("new_group")

            if not group:
                log.warning("No affected group found when processing message")
                return

            del_keys = [
                key
                for pattern in chain(
                    get_pattern_for_cached_calls(
                        self.get_project_groups, self=self, project_path=fullname
                    ),
                    get_pattern_for_cached_calls(self.get_group_projects, self=self, name=group),
                )
                async for key, _ in cache.get_match(pattern)
            ]

        # Delete the things in parallel
        del_tasks = [asyncio.create_task(cache.delete(key)) for key in del_keys]
        del_results = await asyncio.gather(*del_tasks, return_exceptions=True)

        # Follow-up care
        if exceptions_in_results := [
            result for result in del_results if isinstance(result, Exception)
        ]:
            log.warning(
                "Deleting %d cache entries yielded %d exception(s):",
                len(del_results),
                len(exceptions_in_results),
            )
            for exc in exceptions_in_results:
                log.warning("\t%r", exc)
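
    # Example (illustrative): a message with topic
    # "org.fedoraproject.prod.pagure.project.user.added" matches PROJECT_TOPIC_RE
    # (usergroup == "user", action == "added"), so cached get_project_users() results for
    # the project and cached get_projects() results involving the new user are dropped.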


@ft_cache
def get_distgit_proxy() -> PagureAsyncProxy:
    return PagureAsyncProxy(get_settings().services.distgit_url)
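
# Usage sketch (illustrative): ft_cache memoizes the call, so the proxy behaves as a
# process-wide singleton:
#
#     proxy = get_distgit_proxy()
#     assert proxy is get_distgit_proxy()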