Source code for fmn.backends.pagure

# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT

import bisect
import logging
import re
from enum import IntFlag, auto
from functools import cache as ft_cache
from functools import cached_property as ft_cached_property
from typing import TYPE_CHECKING, Any

import sqlalchemy as sa
from cashews import cache
from httpx import URL, QueryParams
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

from ..cache.util import cache_ttl
from ..core.config import Settings, get_settings
from .base import APIClient, NextPageParams, handle_http_error
from .pagure_models import PagureGroup, PagureUserGroup, Project, ProjectGroup, ProjectUser, User

if TYPE_CHECKING:
    from fedora_messaging.message import Message
    from sqlalchemy.ext.asyncio import AsyncSession

log = logging.getLogger(__name__)


class PagureRole(IntFlag):
    """Bit flags for the access levels a user or group can hold on a Pagure project.

    The first five members are single-bit roles; the remaining names are
    composite masks built from them.
    """

    # Individual roles, one bit each.
    OWNER = auto()
    ADMIN = auto()
    COMMIT = auto()
    COLLABORATOR = auto()
    TICKET = auto()

    # Roles which make a user a maintainer of a project.
    USER_ROLES_MAINTAINER = OWNER | ADMIN | COMMIT | COLLABORATOR
    # Every role a user can hold.
    USER_ROLES = USER_ROLES_MAINTAINER | TICKET

    # The group masks are the user masks without OWNER.
    GROUP_ROLES_MAINTAINER = USER_ROLES_MAINTAINER & ~OWNER
    GROUP_ROLES = USER_ROLES & ~OWNER
class PagureAsyncProxy(APIClient):
    """Proxy for the Pagure API endpoints used in FMN.

    TODO: Drop this implementation once the direct-db implementation has proven itself.
    (in a few months? let's say sept 2025)
    """

    API_VERSION = "0"

    # Matches the tail of message topics about user/group access changes on a project.
    PROJECT_TOPIC_RE = re.compile(
        r"pagure\.project\.(?P<usergroup>user|group)\.(?P<action>access\.updated|added|removed)$"
    )

    @ft_cached_property
    def api_url(self) -> str:
        """Return the versioned API root: ``<base_url>/api/<API_VERSION>``."""
        return f"{self.base_url.rstrip('/')}/api/{self.API_VERSION}"

    def determine_next_page_params(self, url: str, params: dict, result: dict) -> NextPageParams:
        """Work out URL and query parameters for the next page of a paginated result.

        :param url: URL of the request which produced ``result`` (not used here)
        :param params: Query parameters of the request which produced ``result``
        :param result: Decoded response body, its ``pagination.next`` entry is evaluated
        :return: ``(next_url, next_params)``, or ``(None, None)`` if there is no next page
        """
        next_url = result.get("pagination", {}).get("next")
        if not next_url:
            return None, None

        parsed_url = URL(next_url)
        # Parameters carried in the "next" link take precedence over the current ones.
        next_params = {**params, **QueryParams(parsed_url.query)}
        # The query is handed back separately, so strip it off the URL.
        return str(parsed_url.copy_with(query=None)), next_params

    @cache(
        ttl=cache_ttl("pagure"),
        prefix="v1",
        tags=[
            "pagure:get_projects:username={username}:owner={owner}",
            "pagure:get_projects:username={username}",
            "pagure:get_projects:owner={owner}",
        ],
    )
    async def get_projects(
        self,
        *,
        namespace: str | None = None,
        pattern: str | None = None,
        username: str | None = None,
        owner: str | None = None,
        short: bool = True,
        fork: bool = False,
    ) -> list[dict[str, Any]]:
        """Collect all pages of the ``/projects`` endpoint.

        ``short`` and ``fork`` are always sent, the other filters only if they are truthy.
        Results are cached and tagged so that :meth:`invalidate_on_message` can drop them.

        :return: The items of the ``projects`` field of all result pages
        """
        params = {"short": short, "fork": fork}
        optional_params = {
            "namespace": namespace,
            "pattern": pattern,
            "username": username,
            "owner": owner,
        }
        params.update({key: value for key, value in optional_params.items() if value})

        return [
            project
            async for project in self.get_paginated(
                "/projects", params=params, payload_field="projects"
            )
        ]

    @cache(ttl=cache_ttl("pagure"), prefix="v1")
    async def get_user_projects(self, *, username: str) -> list[dict[str, Any]]:
        """Return the projects on which ``username`` holds a maintainer role.

        The project list is narrowed down to those which list the user under any of the
        roles in :attr:`PagureRole.USER_ROLES_MAINTAINER` in their ``access_users``.
        """
        maintainer_roles = [role.name.lower() for role in PagureRole.USER_ROLES_MAINTAINER]
        return [
            project
            for project in await self.get_projects(username=username, short=False)
            if any(
                username in project.get("access_users", {}).get(role_name, [])
                for role_name in maintainer_roles
            )
        ]

    @handle_http_error(list)
    @cache(
        ttl=cache_ttl("pagure"),
        prefix="v1",
        tags=["pagure:get_project_users:project_path={project_path}"],
    )
    async def get_project_users(
        self, *, project_path: str, roles: PagureRole = PagureRole.USER_ROLES_MAINTAINER
    ) -> list[str]:
        """Return the sorted, de-duplicated names of users holding any of ``roles``.

        :param project_path: Path of the project, requested via ``self.get()``
        :param roles: Mask of the roles to take into account
        """
        project = await self.get(project_path)
        access_users = project.get("access_users", {})
        usernames = {
            username
            for role in PagureRole
            if role in roles
            for username in access_users.get(role.name.lower(), ())
        }
        return sorted(usernames)

    @handle_http_error(list)
    @cache(
        ttl=cache_ttl("pagure"),
        prefix="v1",
        tags=["pagure:get_project_groups:project_path={project_path}"],
    )
    async def get_project_groups(
        self, *, project_path: str, roles: PagureRole = PagureRole.GROUP_ROLES_MAINTAINER
    ) -> list[str]:
        """Return the sorted, de-duplicated names of groups holding any of ``roles``.

        :param project_path: Path of the project, requested via ``self.get()``
        :param roles: Mask of the roles to take into account
        """
        project = await self.get(project_path)
        access_groups = project.get("access_groups", {})
        groupnames = {
            groupname
            for role in PagureRole
            if role in roles
            for groupname in access_groups.get(role.name.lower(), ())
        }
        return sorted(groupnames)

    @cache(ttl=cache_ttl("pagure"), prefix="v1", tags=["pagure:get_group_projects:name={name}"])
    async def get_group_projects(
        self, *, name: str, acl: PagureRole | None = None
    ) -> list[dict[str, Any]]:
        """Return the projects of group ``name``, sorted by their ``fullname``.

        :param name: Name of the group
        :param acl: If set, only query projects for the group roles contained in this mask
            (one request series per role); otherwise query without an ``acl`` filter
        """
        if not acl:
            params_seq = ({"projects": True},)
        else:
            params_seq = [
                {"projects": True, "acl": role.name.lower()}
                for role in PagureRole.GROUP_ROLES
                if role & acl
            ]

        # A project can show up for more than one role, keep only its first occurrence.
        seen_fullnames = set()
        sorted_projects = []

        for params in params_seq:
            async for project in self.get_paginated(
                f"/group/{name}", params=params, payload_field="projects"
            ):
                fullname = project["fullname"]
                if fullname in seen_fullnames:
                    continue
                seen_fullnames.add(fullname)
                bisect.insort(sorted_projects, project, key=lambda p: p["fullname"])

        return sorted_projects

    async def invalidate_on_message(self, message: "Message", db: "AsyncSession") -> None:
        """Drop cached results which are affected by a project access change message.

        Messages whose topic does not match :attr:`PROJECT_TOPIC_RE`, which lack the
        needed project information, or which concern a different Pagure instance are
        skipped. Failures while deleting cache entries are logged, not raised.

        :param message: The incoming message
        :param db: Not used by this implementation
        """
        topic = message.topic
        topic_match = self.PROJECT_TOPIC_RE.search(topic)
        if not topic_match:
            # Bail out early
            log.debug("Skipping message with topic %s", topic)
            return

        # Quick access
        body = message.body
        usergroup = topic_match.group("usergroup")
        action = topic_match.group("action")

        if not (msg_project := body.get("project")):
            log.warning("No project info found when processing message")
            return

        if not (fullname := msg_project.get("fullname")):
            log.warning("No full name found for affected project when processing message")
            return

        if not (full_url := msg_project.get("full_url")):
            log.warning("No URL found for affected project when processing message")
            return

        if not full_url.startswith(self.base_url_with_trailing_slash):
            # Different Pagure instance
            log.debug("Skipping message for different Pagure instance %s", full_url)
            return

        # Identify cache entries to be invalidated
        if usergroup == "user":
            # Messages about changes to project users
            if action == "removed":
                user = body.get("removed_user")
            else:
                user = body.get("new_user")

            if not user:
                log.warning("No affected user found when processing message")
                return

            del_tags = [
                f"pagure:get_project_users:project_path={fullname}",
                "pagure:get_projects:username=:owner=",
                f"pagure:get_projects:username={user}",
                f"pagure:get_projects:owner={user}",
            ]
        else:  # usergroup == "group"
            # Messages about changes to project groups
            if action == "removed":
                # Messages with topic "project.group.removed" can send a list of groups, but the
                # code in Pagure sending them guarantees it can be at most one 🤔.
                # An empty or null list must not blow up here, treat it as "no group".
                removed_groups = body.get("removed_groups") or [None]
                group = removed_groups[0]
            else:
                group = body.get("new_group")

            if not group:
                log.warning("No affected group found when processing message")
                return

            del_tags = [
                f"pagure:get_project_groups:project_path={fullname}",
                f"pagure:get_group_projects:name={group}",
            ]

        try:
            await cache.delete_tags(*del_tags)
        except Exception as exc:
            # Best effort: stale entries expire with their TTL anyway.
            log.warning("Deleting cache entries yielded an exception: %s", exc)
class PagureDBProxy:
    """Proxy for the Pagure DB queries used in FMN"""

    # Matches the tail of message topics about user/group access changes on a project.
    PROJECT_TOPIC_RE = re.compile(
        r"pagure\.project\.(?P<usergroup>user|group)\.(?P<action>access\.updated|added|removed)$"
    )

    def __init__(self, engine: AsyncEngine, base_url: str | None = None):
        """Set up the proxy.

        :param engine: Engine used for all database sessions
        :param base_url: Base URL of the Pagure instance, stored with exactly one
            trailing slash. If it is ``None``, :meth:`invalidate_on_message` can't tell
            apart messages of different instances and processes all of them.
        """
        self._engine = engine
        self.Session = async_sessionmaker(self._engine, expire_on_commit=False)
        self.base_url = (base_url.rstrip("/") + "/") if base_url is not None else None

    @staticmethod
    def _project_condition(project_path: str) -> tuple:
        """Build the WHERE conditions selecting the non-fork project at ``project_path``.

        The path is split at its first slash into namespace and name. A path without a
        slash is matched against projects with a NULL namespace.
        NOTE(review): assumes Pagure stores un-namespaced projects that way -- confirm.
        """
        namespace, separator, name = project_path.partition("/")
        if separator:
            namespace_condition = Project.namespace == namespace
        else:
            name = project_path
            namespace_condition = Project.namespace.is_(None)
        return (namespace_condition, Project.name == name, Project.is_fork.is_(False))

    async def start(self):
        """Do nothing, there is nothing to set up."""
        pass

    async def stop(self):
        """Dispose of the database engine."""
        await self._engine.dispose()

    @cache(
        ttl=cache_ttl("pagure"),
        prefix="v1",
        tags=[
            "pagure:get_projects:maintainer={maintainer}:owner={owner}",
            "pagure:get_projects:maintainer={maintainer}",
            "pagure:get_projects:owner={owner}",
        ],
    )
    async def get_projects(
        self,
        *,
        namespace: str | None = None,
        pattern: str | None = None,
        maintainer: str | None = None,
        owner: str | None = None,
        fork: bool = False,
    ) -> list[dict[str, Any]]:
        """Query projects matching all of the supplied filters.

        :param namespace: Only projects in this namespace
        :param pattern: Project name to match; if it contains ``*``, these are treated as
            wildcards of a case-insensitive match
        :param maintainer: Only projects this user created or has a maintainer role on,
            either directly or by way of a group
        :param owner: Only projects created by this user
        :param fork: Whether to query forks or non-forks
        :return: The ``as_dict()`` representation of the matching projects
        """
        filters = [Project.is_fork == fork]
        if namespace:
            filters.append(Project.namespace == namespace)
        if pattern:
            if "*" in pattern:
                filters.append(Project.name.ilike(pattern.replace("*", "%")))
            else:
                filters.append(Project.name == pattern)
        if owner:
            # Expressed as a non-correlated sub-select rather than a join so it can be
            # part of the filters shared by every branch of the maintainer union below
            # without clashing with their own joins to the users table.
            owner_ids = sa.select(User.id).where(User.user == owner).correlate(None)
            filters.append(Project.user_id.in_(owner_ids))

        query = sa.select(Project.id).where(*filters)

        if maintainer:
            # User created the project
            query = query.join(User, User.id == Project.user_id).where(User.user == maintainer)

            permissions = [r.name.lower() for r in PagureRole.USER_ROLES_MAINTAINER]

            # User got admin or commit right
            sub_q2 = (
                sa.select(Project.id)
                .join(ProjectUser)
                .join(User, ProjectUser.user_id == User.id)
                .where(User.user == maintainer, ProjectUser.access.in_(permissions), *filters)
            )

            # User created a group that has admin or commit right
            sub_q3 = (
                sa.select(Project.id)
                .join(ProjectGroup, ProjectGroup.project_id == Project.id)
                .join(PagureGroup, PagureGroup.id == ProjectGroup.group_id)
                .join(User, PagureGroup.user_id == User.id)
                .where(User.user == maintainer, ProjectGroup.access.in_(permissions), *filters)
            )

            # User is part of a group that has admin or commit right
            sub_q4 = (
                sa.select(Project.id)
                .join(ProjectGroup, ProjectGroup.project_id == Project.id)
                .join(PagureGroup, PagureGroup.id == ProjectGroup.group_id)
                .join(PagureUserGroup, PagureUserGroup.group_id == PagureGroup.id)
                .join(User, PagureUserGroup.user_id == User.id)
                .where(
                    User.user == maintainer,
                    PagureGroup.group_type == "user",
                    ProjectGroup.access.in_(permissions),
                    *filters,
                )
            )

            query = sa.union(query, sub_q2, sub_q3, sub_q4)

        async with self.Session() as session:
            result = await session.scalars(
                sa.select(Project).where(Project.id.in_(query.scalar_subquery()))
            )
            return [p.as_dict() for p in result]

    @cache(ttl=cache_ttl("pagure"), prefix="v1")
    async def get_user_projects(self, *, username: str) -> list[dict[str, Any]]:
        """Return the non-fork projects maintained by ``username``."""
        return await self.get_projects(maintainer=username)

    @cache(
        ttl=cache_ttl("pagure"),
        prefix="v1",
        tags=["pagure:get_project_users:project_path={project_path}"],
    )
    async def get_project_users(
        self, *, project_path: str, roles: PagureRole = PagureRole.USER_ROLES_MAINTAINER
    ) -> list[str]:
        """Return the sorted, de-duplicated names of users holding any of ``roles``.

        :param project_path: Project path, ``<namespace>/<name>`` or just ``<name>``
        :param roles: Mask of the roles to take into account; if it contains
            :attr:`PagureRole.OWNER`, the user who created the project is included
        """
        project_condition = self._project_condition(project_path)
        permissions = [r.name.lower() for r in roles]

        query = (
            sa.select(User.user)
            .join(ProjectUser, ProjectUser.user_id == User.id)
            .join(Project, Project.id == ProjectUser.project_id)
            .where(*project_condition, ProjectUser.access.in_(permissions))
        )
        if PagureRole.OWNER in roles:
            # The owner is looked up through the project row itself, not the access table.
            query = sa.union(
                query,
                sa.select(User.user)
                .join(Project, Project.user_id == User.id)
                .where(*project_condition),
            )

        async with self.Session() as session:
            usernames = await session.scalars(query)
            return sorted(set(usernames))

    @cache(
        ttl=cache_ttl("pagure"),
        prefix="v1",
        tags=["pagure:get_project_groups:project_path={project_path}"],
    )
    async def get_project_groups(
        self, *, project_path: str, roles: PagureRole = PagureRole.GROUP_ROLES_MAINTAINER
    ) -> list[str]:
        """Return the names of groups holding any of ``roles``, ordered by name.

        :param project_path: Project path, ``<namespace>/<name>`` or just ``<name>``
        :param roles: Mask of the roles to take into account
        """
        project_condition = self._project_condition(project_path)
        permissions = [r.name.lower() for r in roles]

        query = (
            sa.select(PagureGroup.group_name)
            .join(ProjectGroup, ProjectGroup.group_id == PagureGroup.id)
            .join(Project, Project.id == ProjectGroup.project_id)
            .where(*project_condition, ProjectGroup.access.in_(permissions))
            .order_by(PagureGroup.group_name)
        )

        async with self.Session() as session:
            result = await session.scalars(query)
            return list(result)

    @cache(ttl=cache_ttl("pagure"), prefix="v1", tags=["pagure:get_group_projects:name={name}"])
    async def get_group_projects(
        self, *, name: str, acl: PagureRole | None = None
    ) -> list[dict[str, Any]]:
        """Return the non-fork projects of group ``name``, ordered by namespace and name.

        :param name: Name of the group
        :param acl: If set, only projects on which the group holds one of the group roles
            contained in this mask
        :return: The ``as_dict()`` representation of the matching projects
        """
        query = (
            sa.select(Project)
            .join(ProjectGroup, ProjectGroup.project_id == Project.id)
            .join(PagureGroup, PagureGroup.id == ProjectGroup.group_id)
            .where(PagureGroup.group_name == name, Project.is_fork.is_(False))
            .order_by(Project.namespace, Project.name)
        )
        if acl:
            permissions = [role.name.lower() for role in PagureRole.GROUP_ROLES if role & acl]
            query = query.where(ProjectGroup.access.in_(permissions))

        async with self.Session() as session:
            projects = await session.scalars(query)
            return [p.as_dict() for p in projects]

    async def invalidate_on_message(self, message: "Message", db: "AsyncSession") -> None:
        """Drop cached results which are affected by a project access change message.

        Messages whose topic does not match :attr:`PROJECT_TOPIC_RE`, which lack the
        needed project information, or which concern a different Pagure instance are
        skipped. Failures while deleting cache entries are logged, not raised.

        :param message: The incoming message
        :param db: Not used by this implementation
        """
        topic = message.topic
        topic_match = self.PROJECT_TOPIC_RE.search(topic)
        if not topic_match:
            # Bail out early
            log.debug("Skipping message with topic %s", topic)
            return

        # Quick access
        body = message.body
        usergroup = topic_match.group("usergroup")
        action = topic_match.group("action")

        if not (msg_project := body.get("project")):
            log.warning("No project info found when processing message")
            return

        if not (fullname := msg_project.get("fullname")):
            log.warning("No full name found for affected project when processing message")
            return

        if not (full_url := msg_project.get("full_url")):
            log.warning("No URL found for affected project when processing message")
            return

        # Without a configured base URL the instance can't be verified; invalidating too
        # much is harmless, so the message is processed in that case.
        if self.base_url is not None and not full_url.startswith(self.base_url):
            # Different Pagure instance
            log.debug("Skipping message for different Pagure instance %s", full_url)
            return

        # Identify cache entries to be invalidated
        if usergroup == "user":
            # Messages about changes to project users
            if action == "removed":
                user = body.get("removed_user")
            else:
                user = body.get("new_user")

            if not user:
                log.warning("No affected user found when processing message")
                return

            del_tags = [
                f"pagure:get_project_users:project_path={fullname}",
                "pagure:get_projects:maintainer=:owner=",
                f"pagure:get_projects:maintainer={user}",
                f"pagure:get_projects:owner={user}",
            ]
        else:  # usergroup == "group"
            # Messages about changes to project groups
            if action == "removed":
                # Messages with topic "project.group.removed" can send a list of groups, but the
                # code in Pagure sending them guarantees it can be at most one 🤔.
                # An empty or null list must not blow up here, treat it as "no group".
                removed_groups = body.get("removed_groups") or [None]
                group = removed_groups[0]
            else:
                group = body.get("new_group")

            if not group:
                log.warning("No affected group found when processing message")
                return

            del_tags = [
                f"pagure:get_project_groups:project_path={fullname}",
                f"pagure:get_group_projects:name={group}",
            ]

        try:
            await cache.delete_tags(*del_tags)
        except Exception as exc:
            # Best effort: stale entries expire with their TTL anyway.
            log.warning("Deleting cache entries yielded an exception: %s", exc)
@ft_cache
def get_distgit_proxy(
    settings: Settings | None = None,
) -> "PagureAsyncProxy | PagureDBProxy":
    """Return the proxy object for dist-git, memoized per ``settings`` argument.

    :param settings: Settings to use, falls back to ``get_settings()`` if not supplied
    :return: A :class:`PagureDBProxy` if ``settings.services.distgit_db_url`` is set,
        otherwise a :class:`PagureAsyncProxy` talking to ``settings.services.distgit_url``
    """
    settings = settings or get_settings()
    services = settings.services

    if not services.distgit_db_url:
        return PagureAsyncProxy(services.distgit_url)

    engine = create_async_engine(services.distgit_db_url, pool_recycle=600)
    return PagureDBProxy(engine, base_url=services.distgit_url)