fmn.backends.pagure module

class fmn.backends.pagure.PagureAsyncProxy(base_url: str | None = None, **kwargs)[source]

Bases: APIClient

Proxy for the Pagure API endpoints used in FMN.

TODO: Drop this implementation once the direct-DB implementation has proven itself (in a few months; tentatively September 2025).

API_VERSION = '0'
PROJECT_TOPIC_RE = re.compile('pagure\\.project\\.(?P<usergroup>user|group)\\.(?P<action>access\\.updated|added|removed)$')
property api_url: str
determine_next_page_params(url: str, params: dict, result: dict) → tuple[str, dict] | tuple[None, None][source]

Determine parameters for next page.

Parameters:
  • url – API endpoint URL

  • params – Query parameters (can be modified)

  • result – Result dictionary of previous query

Returns:

Tuple of (new URL, new params dict) or (None, None) if last page

async get_group_projects(*, name: str, acl: PagureRole | None = None) → list[dict[str, Any]][source]
async get_project_groups(*, project_path: str, roles: PagureRole = <PagureRole.GROUP_ROLES_MAINTAINER: 14>) → list[str][source]
async get_project_users(*, project_path: str, roles: PagureRole = <PagureRole.USER_ROLES_MAINTAINER: 15>) → list[str][source]
async get_projects(*, namespace: str | None = None, pattern: str | None = None, username: str | None = None, owner: str | None = None, short: bool = True, fork: bool = False) → list[dict[str, Any]][source]
async get_user_projects(*, username: str) → list[dict[str, Any]][source]
async invalidate_on_message(message: Message, db: AsyncSession) → None[source]
class fmn.backends.pagure.PagureDBProxy(engine: AsyncEngine, base_url: str | None = None)[source]

Bases: object

Proxy for the Pagure DB queries used in FMN.

PROJECT_TOPIC_RE = re.compile('pagure\\.project\\.(?P<usergroup>user|group)\\.(?P<action>access\\.updated|added|removed)$')
async get_group_projects(*, name: str, acl: PagureRole | None = None) → list[dict[str, Any]][source]
async get_project_groups(*, project_path: str, roles: PagureRole = <PagureRole.GROUP_ROLES_MAINTAINER: 14>) → list[str][source]
async get_project_users(*, project_path: str, roles: PagureRole = <PagureRole.USER_ROLES_MAINTAINER: 15>) → list[str][source]
async get_projects(*, namespace: str | None = None, pattern: str | None = None, maintainer: str | None = None, owner: str | None = None, fork: bool = False) → list[dict[str, Any]][source]
async get_user_projects(*, username: str) → list[dict[str, Any]][source]
async invalidate_on_message(message: Message, db: AsyncSession) → None[source]
async start()[source]
async stop()[source]
class fmn.backends.pagure.PagureRole(*values)[source]

Bases: IntFlag

ADMIN = 2
COLLABORATOR = 8
COMMIT = 4
GROUP_ROLES = 30
GROUP_ROLES_MAINTAINER = 14
OWNER = 1
TICKET = 16
USER_ROLES = 31
USER_ROLES_MAINTAINER = 15
fmn.backends.pagure.get_distgit_proxy(settings: Settings | None = None) → PagureAsyncProxy[source]