Fedora Messaging Notifications

fmn is a family of systems to manage end-user notifications triggered by fedora-messaging, it provides a single place for all applications using fedora-messaging to notify users of events.

Documentation

Installation

User Guide

fmn package

Subpackages

fmn.api package

Subpackages
fmn.api.handlers package
Submodules
fmn.api.handlers.admin module
async fmn.api.handlers.admin.get_rules(disabled: bool | None = None, username: str | None = None, identity: Identity = Depends(get_identity_admin), db_session: AsyncSession = Depends(gen_db_session))[source]
async fmn.api.handlers.admin.get_users(search: str | None = None, identity: Identity = Depends(get_identity_admin), db_session: AsyncSession = Depends(gen_db_session))[source]
async fmn.api.handlers.admin.patch_rule(id: int, rule: RulePatch, identity: Identity = Depends(get_identity_admin), db_session: AsyncSession = Depends(gen_db_session))[source]
fmn.api.handlers.misc module
fmn.api.handlers.misc.get_applications()[source]
async fmn.api.handlers.misc.get_artifacts(names: list[str] = Query([]), users: list[str] = Query([]), groups: list[str] = Query([]), distgit_proxy: PagureAsyncProxy = Depends(get_distgit_proxy))[source]

This handler queries artifacts from Pagure

Proxying Pagure queries lets the API cache results to reduce load on the backend service.

Parameters:
  • names – Name patterns of artifacts which should be returned

  • users – Names of users whose artifacts should be returned

  • groups – Names of groups whose artifacts should be returned

async fmn.api.handlers.misc.liveness_check()[source]
async fmn.api.handlers.misc.readiness_check(db_session: AsyncSession = Depends(gen_db_session))[source]
fmn.api.handlers.users module
async fmn.api.handlers.users.create_user_rule(username, rule: NewRule, identity: Identity = Depends(IdentityFactory), db_session: AsyncSession = Depends(gen_db_session))[source]
async fmn.api.handlers.users.delete_user_rule(username: str, id: int, identity: Identity = Depends(IdentityFactory), db_session: AsyncSession = Depends(gen_db_session))[source]
async fmn.api.handlers.users.edit_user_rule(username: str, id: int, rule: Rule, identity: Identity = Depends(IdentityFactory), db_session: AsyncSession = Depends(gen_db_session))[source]
async fmn.api.handlers.users.get_me(identity: Identity = Depends(IdentityFactory), db_session: AsyncSession = Depends(gen_db_session))[source]
async fmn.api.handlers.users.get_user_destinations(username, fasjson_proxy: FASJSONAsyncProxy = Depends(get_fasjson_proxy))[source]
async fmn.api.handlers.users.get_user_groups(username, fasjson_proxy: FASJSONAsyncProxy = Depends(get_fasjson_proxy))[source]
async fmn.api.handlers.users.get_user_info(username, fasjson_proxy: FASJSONAsyncProxy = Depends(get_fasjson_proxy))[source]
async fmn.api.handlers.users.get_user_rule(username: str, id: int, identity: Identity = Depends(IdentityFactory), db_session: AsyncSession = Depends(gen_db_session))[source]
async fmn.api.handlers.users.get_user_rules(username, identity: Identity = Depends(IdentityFactory), db_session: AsyncSession = Depends(gen_db_session))[source]
async fmn.api.handlers.users.get_users(search: str | None = None, identity: Identity = Depends(IdentityFactory), fasjson_proxy: FASJSONAsyncProxy = Depends(get_fasjson_proxy))[source]
fmn.api.handlers.utils module
fmn.api.handlers.utils.db_rule_from_api_rule(rule, user)[source]
Submodules
fmn.api.api_models module
class fmn.api.api_models.Artifact(*, type: ArtifactType, name: str)[source]

Bases: BaseModel

name: str
type: ArtifactType
class fmn.api.api_models.ArtifactOptionsGroup(*, label: str, options: list[fmn.api.api_models.Option[Artifact]])[source]

Bases: BaseModel

label: str
options: list[fmn.api.api_models.Option[Artifact]]
class fmn.api.api_models.ArtifactsFollowedTrackingRule(*, name: Literal['artifacts-followed'], params: list[dict[Literal['name', 'type'], str]])[source]

Bases: BaseModel

name: Literal['artifacts-followed']
params: list[dict[Literal['name', 'type'], str]]
class fmn.api.api_models.BaseModel[source]

Bases: BaseModel

class Config[source]

Bases: object

orm_mode = True
class fmn.api.api_models.Destination(*, protocol: str, address: str)[source]

Bases: BaseModel

address: str
classmethod address_format(v, values)[source]
protocol: str
class fmn.api.api_models.Filters(*, applications: list[str] = [], severities: list[str] = [], topic: str | None = None, my_actions: bool = False)[source]

Bases: BaseModel

applications: list[str]
my_actions: bool
severities: list[str]
topic: str | None
class fmn.api.api_models.GRGetterDict(obj: Any)[source]

Bases: GetterDict

get(key: str, default: Any) Any[source]
class fmn.api.api_models.GenerationRule(*, id: int | None = None, destinations: list[fmn.api.api_models.Destination], filters: Filters)[source]

Bases: BaseModel

class Config[source]

Bases: object

getter_dict

alias of GRGetterDict

destinations: list[fmn.api.api_models.Destination]
filters: Filters
id: int | None
class fmn.api.api_models.ListParamTrackingRule(*, name: Literal['artifacts-owned', 'artifacts-group-owned', 'users-followed'], params: list[str])[source]

Bases: BaseModel

name: Literal['artifacts-owned', 'artifacts-group-owned', 'users-followed']
params: list[str]
class fmn.api.api_models.NewRule(*, name: str | None = None, disabled: bool = False, tracking_rule: ListParamTrackingRule | NoParamTrackingRule | ArtifactsFollowedTrackingRule, generation_rules: list[fmn.api.api_models.GenerationRule])[source]

Bases: BaseModel

disabled: bool
generation_rules: list[fmn.api.api_models.GenerationRule]
name: str | None
tracking_rule: ListParamTrackingRule | NoParamTrackingRule | ArtifactsFollowedTrackingRule
class fmn.api.api_models.NoParamTrackingRule(*, name: Literal['related-events'], params: str | None = None)[source]

Bases: BaseModel

name: Literal['related-events']
params: str | None
class fmn.api.api_models.Option(*, label: str, value: T = None)[source]

Bases: GenericModel, Generic[T]

label: str
value: T
class fmn.api.api_models.Rule(*, name: str | None = None, disabled: bool = False, tracking_rule: ListParamTrackingRule | NoParamTrackingRule | ArtifactsFollowedTrackingRule, generation_rules: list[fmn.api.api_models.GenerationRule], id: int, user: User, generated_last_week: int = 0)[source]

Bases: NewRule

generated_last_week: int
id: int
user: User
class fmn.api.api_models.RulePatch(*, disabled: bool | None = None)[source]

Bases: BaseModel

disabled: bool | None
class fmn.api.api_models.User(*, id: int | None = None, name: str, is_admin: bool = False)[source]

Bases: BaseModel

id: int | None
is_admin: bool
name: str
fmn.api.auth module
class fmn.api.auth.Identity(*, name: str, admin: bool, expires_at: float, user_info: dict[str, Any])[source]

Bases: BaseModel

class Config[source]

Bases: object

extra = 'ignore'
admin: bool
classmethod client() AsyncClient[source]
expires_at: float
async classmethod from_oidc_token(token: str) Identity[source]
name: str
user_info: dict[str, Any]
class fmn.api.auth.IdentityFactory(optional=False)[source]

Bases: object

async process_oidc_auth(creds: HTTPAuthorizationCredentials | None) Identity | None[source]
exception fmn.api.auth.TokenExpired[source]

Bases: ValueError

async fmn.api.auth.get_identity_admin(identity: Identity = Depends(IdentityFactory))[source]
fmn.api.cli module
fmn.api.database module
async fmn.api.database.gen_db_session() Iterator[AsyncSession][source]

Generate database sessions for FastAPI request handlers.

This lets users declare the session as a dependency in request handler functions, e.g.:

@app.get("/path")
def process_path(db_session: AsyncSession = Depends(gen_db_session)):
    query = select(Model).filter_by(...)
    result = await db_session.execute(query)
    ...
Returns:

A sqlalchemy.ext.asyncio.AsyncSession object for the current request

fmn.api.main module
fmn.api.main.configure_cache_on_startup()[source]
async fmn.api.main.global_execution_handler(request: Request, exc: Exception) Callable[[MutableMapping[str, Any], Callable[[], Awaitable[MutableMapping[str, Any]]], Callable[[MutableMapping[str, Any]], Awaitable[None]]], Awaitable[None]][source]
async fmn.api.main.init_model()[source]
fmn.api.messaging module
fmn.api.messaging.backoff_hdlr(details)[source]
fmn.api.messaging.giveup_hdlr(details)[source]
async fmn.api.messaging.publish(message)[source]

fmn.backends package

Submodules
fmn.backends.base module
class fmn.backends.base.APIClient(base_url: str | None = None, **kwargs)[source]

Bases: ABC

property api_url: str | None
property base_url_with_trailing_slash: str
abstract determine_next_page_params(url: str, params: dict, result: dict) tuple[str, dict] | tuple[None, None][source]

Determine parameters for next page.

Parameters:
  • url – API endpoint URL

  • params – Query parameters (can be modified)

  • result – Result dictionary of previous query

Returns:

Tuple of (new URL, new params dict) or (None, None) if last page

extract_payload(result: dict, payload_field: str | None = None) Any[source]
async get(url: str, **kwargs) Any[source]

Query the API for a single result.

async get_paginated(url: str, *, params: dict | None = None, payload_field: str | None = None, **kwargs) AsyncIterator[source]

Query the API and iterate over paginated results if applicable.

async get_payload(url: str, *, payload_field: str | None = None, **kwargs) Any[source]
payload_field: str | None

The payload field in a paginated response.

exception fmn.backends.base.PaginationRecursionError[source]

Bases: RuntimeError

fmn.backends.base.handle_http_error(default_factory)[source]
fmn.backends.fasjson module
class fmn.backends.fasjson.FASJSONAsyncProxy(base_url: str)[source]

Bases: APIClient

Proxy for the FASJSON API endpoints used in FMN

API_VERSION = 'v1'
FAS_TOPIC_RE = re.compile('fas\\.(?P<usergroup>user|group)\\.(?P<event>member\\.sponsor|create|update)$')
property api_url: str
determine_next_page_params(url: str, params: dict, result: dict) tuple[str, dict] | tuple[None, None][source]

Determine parameters for next page.

Parameters:
  • url – API endpoint URL

  • params – Query parameters (can be modified)

  • result – Result dictionary of previous query

Returns:

Tuple of (new URL, new params dict) or (None, None) if last page

async get_user(*, username: str) dict[source]
async get_user_groups(*, username: str) dict[source]
async invalidate_on_message(message: Message) None[source]
payload_field: str | None = 'result'

The payload field in a paginated response.

async search_users(username: str | None = None, username__exact: str | None = None, **params: dict[str, Any]) list[dict][source]
fmn.backends.fasjson.get_fasjson_proxy() FASJSONAsyncProxy[source]
fmn.backends.pagure module
class fmn.backends.pagure.PagureAsyncProxy(base_url: str | None = None, **kwargs)[source]

Bases: APIClient

Proxy for the FASJSON API endpoints used in FMN

API_VERSION = '0'
PROJECT_TOPIC_RE = re.compile('pagure\\.project\\.(?P<usergroup>user|group)\\.(?P<action>access\\.updated|added|removed)$')
property api_url: str
determine_next_page_params(url: str, params: dict, result: dict) tuple[str, dict] | tuple[None, None][source]

Determine parameters for next page.

Parameters:
  • url – API endpoint URL

  • params – Query parameters (can be modified)

  • result – Result dictionary of previous query

Returns:

Tuple of (new URL, new params dict) or (None, None) if last page

async get_group_projects(*, name: str, acl: PagureRole | None = None) list[dict[str, Any]][source]
async get_project_groups(*, project_path: str, roles: PagureRole = PagureRole.GROUP_ROLES_MAINTAINER) list[str][source]
async get_project_users(*, project_path: str, roles: PagureRole = PagureRole.USER_ROLES_MAINTAINER) list[str][source]
async get_projects(*, namespace: str | None = None, pattern: str | None = None, username: str | None = None, owner: str | None = None, short: bool = True, fork: bool = False) list[dict[str, Any]][source]
async get_user_projects(*, username: str) list[dict[str, Any]][source]
async invalidate_on_message(message: Message) None[source]
class fmn.backends.pagure.PagureRole(value)[source]

Bases: IntFlag

An enumeration.

ADMIN = 2
COLLABORATOR = 8
COMMIT = 4
GROUP_ROLES = 30
GROUP_ROLES_MAINTAINER = 14
GROUP_ROLES_MAINTAINER_SET = {PagureRole.ADMIN, PagureRole.COMMIT, PagureRole.COLLABORATOR}
GROUP_ROLES_SET = {PagureRole.ADMIN, PagureRole.COMMIT, PagureRole.COLLABORATOR, PagureRole.TICKET}
OWNER = 1
TICKET = 16
USER_ROLES = 31
USER_ROLES_MAINTAINER = 15
USER_ROLES_MAINTAINER_SET = {PagureRole.OWNER, PagureRole.ADMIN, PagureRole.COMMIT, PagureRole.COLLABORATOR}
USER_ROLES_SET = {PagureRole.OWNER, PagureRole.ADMIN, PagureRole.COMMIT, PagureRole.COLLABORATOR, PagureRole.TICKET}
fmn.backends.pagure.get_distgit_proxy() PagureAsyncProxy[source]

fmn.cache package

Submodules
fmn.cache.base module
class fmn.cache.base.CachedValue[source]

Bases: object

Manage a cached value.

async compute_value(*args, **kwargs)[source]
async get_value(*args, **kwargs)[source]
async invalidate()[source]
async invalidate_on_message(message: Message)[source]
name = None
async refresh(*args, **kwargs)[source]
fmn.cache.cli module
fmn.cache.rules module
class fmn.cache.rules.RulesCache[source]

Bases: CachedValue

Cache the rules currently in the database.

cached_method = '_get_value'
async get_rules(db: AsyncSession)[source]
async get_value(db: AsyncSession)[source]
async invalidate_on_message(message: Message)[source]
name = 'rules'
fmn.cache.tracked module
class fmn.cache.tracked.Tracked(packages: set = <factory>, containers: set = <factory>, modules: set = <factory>, flatpaks: set = <factory>, usernames: set = <factory>, agent_name: set = <factory>)[source]

Bases: object

agent_name: set
containers: set
flatpaks: set
modules: set
packages: set
usernames: set
class fmn.cache.tracked.TrackedCache(requester: Requester, rules_cache: RulesCache)[source]

Bases: CachedValue

Used to quickly know whether we want to process an incoming message.

It can be called outside of the message-processing loop to refresh the cache.

Cases when the cache should be refreshed: - a rule is changed - a user is added or removed to/from a group - an artifact has their owners (users or groups) changed

The Consumer listens to those events as messages on the bus.

If this happens too frequently, we can just refresh after X minutes have passed and tell users that their changes will take X minutes to be active.

async get_value(db: AsyncSession)[source]
async invalidate_on_message(message: Message)[source]
name = 'tracked'
fmn.cache.util module
fmn.cache.util.cache_arg(arg: str, scope: str | None = None) Callable[[str, str | None], Any][source]

Generate a cached function for cashews decorator arguments.

The purpose of this is to evaluate the settings late (i.e. the first time a decorated callable is called), so customized settings can be applied effectively.

fmn.cache.util.cache_ttl(scope: str | None = None) Callable[[str, str | None], Any]

Generate a cached function for cashews decorator arguments.

The purpose of this is to evaluate the settings late (i.e. the first time a decorated callable is called), so customized settings can be applied effectively.

fmn.cache.util.configure_cache(**kwargs)[source]
fmn.cache.util.get_pattern_for_cached_calls(func: Callable, **kwargs: dict[str, Any]) list[str][source]
fmn.cache.util.lock_ttl(scope: str | None = None) Callable[[str, str | None], Any]

Generate a cached function for cashews decorator arguments.

The purpose of this is to evaluate the settings late (i.e. the first time a decorated callable is called), so customized settings can be applied effectively.

fmn.consumer package

Submodules
fmn.consumer.consumer module
class fmn.consumer.consumer.Consumer[source]

Bases: object

async handle_or_rollback(message: Message)[source]
async is_tracked(message: Message, db: AsyncSession)[source]
async refresh_cache_if_needed(message: Message)[source]
async setup()[source]
fmn.consumer.send_queue module
class fmn.consumer.send_queue.SendQueue(config: dict)[source]

Bases: object

async close()[source]
async connect()[source]
async send(notification: Notification)[source]
async fmn.consumer.send_queue.backoff_hdlr(details)[source]
fmn.consumer.send_queue.giveup_hdlr(details)[source]

fmn.core package

Submodules
fmn.core.amqp module
fmn.core.amqp.get_url_from_config(config: dict)[source]
fmn.core.cli module
fmn.core.config module
class fmn.core.config.CacheArgsModel(*, ttl: int | float | str | timedelta | None = None, lock_ttl: int | float | str | timedelta | None = None, early_ttl: int | float | str | timedelta | None = None)[source]

Bases: BaseModel

early_ttl: int | float | str | timedelta | None
lock_ttl: int | float | str | timedelta | None
ttl: int | float | str | timedelta | None
class fmn.core.config.CacheModel(*, url: UrlValue = 'mem://', setup_args: dict[str, Any] | None = None, default_args: CacheArgsModel = CacheArgsModel(ttl='1h', lock_ttl=None, early_ttl=None), scoped_args: CacheScopedArgsModel = CacheScopedArgsModel(tracked=CacheArgsModel(ttl='1d', lock_ttl='1h', early_ttl='20h'), rules=CacheArgsModel(ttl='1d', lock_ttl='5m', early_ttl='20h'), pagure=None, fasjson=None))[source]

Bases: BaseModel

default_args: CacheArgsModel
scoped_args: CacheScopedArgsModel
setup_args: dict[str, Any] | None
url: UrlValue
class fmn.core.config.CacheScopedArgsModel(*, tracked: CacheArgsModel = CacheArgsModel(ttl='1d', lock_ttl='1h', early_ttl='20h'), rules: CacheArgsModel = CacheArgsModel(ttl='1d', lock_ttl='5m', early_ttl='20h'), pagure: CacheArgsModel | None = None, fasjson: CacheArgsModel | None = None)[source]

Bases: BaseModel

fasjson: CacheArgsModel | None
pagure: CacheArgsModel | None
rules: CacheArgsModel
tracked: CacheArgsModel
class fmn.core.config.DBModel(*, sqlalchemy: SQLAlchemyModel = SQLAlchemyModel(url='sqlite:///:memory:', echo=False))[source]

Bases: BaseModel

sqlalchemy: SQLAlchemyModel
class fmn.core.config.SQLAlchemyModel(*, url: UrlValue = 'sqlite:///:memory:', echo: bool = False, **extra_data: Any)[source]

Bases: BaseModel

class Config[source]

Bases: object

extra = 'allow'
echo: bool
url: UrlValue
class fmn.core.config.ServicesModel(*, fasjson_url: UrlValue = 'https://fasjson.fedoraproject.org', distgit_url: UrlValue = 'https://src.fedoraproject.org')[source]

Bases: BaseModel

distgit_url: UrlValue
fasjson_url: UrlValue
class fmn.core.config.Settings(_env_file: str | PathLike | List[str | PathLike] | Tuple[str | PathLike, ...] | None = '<object object>', _env_file_encoding: str | None = None, _env_nested_delimiter: str | None = None, _secrets_dir: str | PathLike | None = None, *, cors_origins: str = 'https://notifications.fedoraproject.org', oidc_provider_url: str = 'https://id.fedoraproject.org/openidc', oidc_conf_endpoint: str = '/.well-known/openid-configuration', oidc_token_info_endpoint: str = '/TokenInfo', oidc_user_info_endpoint: str = '/UserInfo', oidc_client_id: str = '0123456789abcdef0123456789abcdef', oidc_client_secret: str = '0123456789abcdef0123456789abcdef', admin_groups: list[str] = ['sysadmin-main'], oidc_conf_url: str | None = None, oidc_token_info_url: str | None = None, id_cache_gc_interval: int = 300, database: DBModel = DBModel(sqlalchemy=SQLAlchemyModel(url='sqlite:///:memory:', echo=False)), cache: CacheModel = CacheModel(url='mem://', setup_args=None, default_args=CacheArgsModel(ttl='1h', lock_ttl=None, early_ttl=None), scoped_args=CacheScopedArgsModel(tracked=CacheArgsModel(ttl='1d', lock_ttl='1h', early_ttl='20h'), rules=CacheArgsModel(ttl='1d', lock_ttl='5m', early_ttl='20h'), pagure=None, fasjson=None)), services: ServicesModel = ServicesModel(fasjson_url='https://fasjson.fedoraproject.org', distgit_url='https://src.fedoraproject.org'))[source]

Bases: BaseSettings

class Config[source]

Bases: object

env_file = 'fmn.cfg'
env_nested_delimiter = '__'
admin_groups: list[str]
cache: CacheModel
classmethod compute_compound_fields(values) dict[source]
cors_origins: str
database: DBModel
id_cache_gc_interval: int
oidc_client_id: str
oidc_client_secret: str
oidc_conf_endpoint: str
oidc_conf_url: str | None
oidc_provider_url: str
oidc_token_info_endpoint: str
oidc_token_info_url: str | None
oidc_user_info_endpoint: str
services: ServicesModel
fmn.core.config.get_settings() Settings[source]
fmn.core.config.set_settings_file(path: str) None[source]
fmn.core.constants module
class fmn.core.constants.ArtifactType(value)[source]

Bases: Enum

An enumeration.

containers = 'containers'
flatpaks = 'flatpaks'
classmethod has_value(value)[source]
modules = 'modules'
packages = 'rpms'
fmn.core.version module

fmn.database package

Subpackages
fmn.database.model package
Submodules
fmn.database.model.destination module
class fmn.database.model.destination.Destination(**kwargs)[source]

Bases: Base

address
generate(message: Message) Notification.content[source]
generation_rule
generation_rule_id
id
protocol
fmn.database.model.filter module
class fmn.database.model.filter.Filter(**kwargs)[source]

Bases: Base

generation_rule
generation_rule_id
get_implementation(requester: Requester)[source]
id
matches(message: Message, requester: Requester)[source]
name
params
fmn.database.model.generated module
class fmn.database.model.generated.Generated(**kwargs)[source]

Bases: Base

count
id
rule
rule_id
when
fmn.database.model.generation_rule module
class fmn.database.model.generation_rule.GenerationRule(**kwargs)[source]

Bases: Base

destinations
filters
async handle(message: Message, requester: Requester) AsyncIterator[Notification][source]
id
rule
rule_id
fmn.database.model.rule module
class fmn.database.model.rule.Rule(**kwargs)[source]

Bases: Base

disabled
generated
generation_rules
async handle(message: Message, requester: Requester)[source]
id
name

Convenience method to query rules and related property objects.

This tells SQLAlchemy to query ORM objects related to a Rule right in the query which is necessary when accessing their respective properties in an async context.

tracking_rule
user
user_id
fmn.database.model.tracking_rule module
class fmn.database.model.tracking_rule.TrackingRule(**kwargs)[source]

Bases: Base

get_implementation(requester: Requester)[source]
id
async matches(message: Message, requester: Requester)[source]
name
params
async prime_cache(cache, requester: Requester)[source]
rule
rule_id
fmn.database.model.user module
class fmn.database.model.user.User(**kwargs)[source]

Bases: Base

id
name
rules
Submodules
fmn.database.cli module
fmn.database.cli.verify_db_url_not_default()[source]

Verify the DB URL is set to a valid value.

fmn.database.main module
class fmn.database.main.CustomBase[source]

Bases: object

async classmethod async_get(db_session: AsyncSession, **attrs) Base[source]

Get an object from the datbase.

Parameters:

db_session – The SQLAlchemy session to use

Returns:

the object

async classmethod async_get_or_create(db_session: AsyncSession, **attrs) Base[source]

Get an object from the database or create if missing.

Parameters:

db_session – The SQLAlchemy session to use

Returns:

the object

The returned object will have an (ephemeral) boolean attribute _was_created which allows finding out if it existed previously or not.

fmn.database.main.get_async_engine()[source]
fmn.database.main.get_sync_engine()[source]
async fmn.database.main.init_async_model(async_engine: AsyncEngine | None = None)[source]
fmn.database.main.init_sync_model(sync_engine: Engine | None = None)[source]
fmn.database.setup module
fmn.database.setup.setup_db_schema(engine_or_session: Engine | Session | None = None) None[source]

fmn.messages package

Submodules
fmn.messages.base module
class fmn.messages.base.BaseMessage(body=None, headers=None, topic=None, properties=None, severity=None)[source]

Bases: Message

property app_icon

An URL to the icon of the application that generated the message.

Note

Sub-classes should override this method if their application has an icon and they wish that image to appear in applications that consume messages.

Returns:

The URL to the app’s icon.

Return type:

str or None

property app_name

The name of the application that generated the message.

Note

Sub-classes should override this method.

Returns:

The name of the application.

Return type:

str or None

fmn.messages.rule module
class fmn.messages.rule.RuleCreateV1(body=None, headers=None, topic=None, properties=None, severity=None)[source]

Bases: BaseMessage

body_schema = {'$schema': 'http://json-schema.org/draft-04/schema#', 'description': 'A rule was created', 'id': 'http://fedoraproject.org/message-schema/fmn', 'properties': {'rule': {'properties': {'id': {'description': 'The ID of the rule', 'type': 'integer'}, 'name': {'description': 'The name of the rule', 'type': 'string'}}, 'type': 'object'}, 'user': {'properties': {'name': {'description': 'The FAS username', 'type': 'string'}}, 'type': 'object'}}, 'required': ['rule', 'user'], 'type': 'object'}
topic = 'fmn.rule.update.v1'
class fmn.messages.rule.RuleDeleteV1(body=None, headers=None, topic=None, properties=None, severity=None)[source]

Bases: BaseMessage

body_schema = {'$schema': 'http://json-schema.org/draft-04/schema#', 'description': 'A rule was deleted', 'id': 'http://fedoraproject.org/message-schema/fmn', 'properties': {'rule': {'properties': {'id': {'description': 'The ID of the rule', 'type': 'integer'}, 'name': {'description': 'The name of the rule', 'type': 'string'}}, 'type': 'object'}, 'user': {'properties': {'name': {'description': 'The FAS username', 'type': 'string'}}, 'type': 'object'}}, 'required': ['rule', 'user'], 'type': 'object'}
topic = 'fmn.rule.delete.v1'
class fmn.messages.rule.RuleUpdateV1(body=None, headers=None, topic=None, properties=None, severity=None)[source]

Bases: BaseMessage

body_schema = {'$schema': 'http://json-schema.org/draft-04/schema#', 'description': 'A rule was updated', 'id': 'http://fedoraproject.org/message-schema/fmn', 'properties': {'rule': {'properties': {'id': {'description': 'The ID of the rule', 'type': 'integer'}, 'name': {'description': 'The name of the rule', 'type': 'string'}}, 'type': 'object'}, 'user': {'properties': {'name': {'description': 'The FAS username', 'type': 'string'}}, 'type': 'object'}}, 'required': ['rule', 'user'], 'type': 'object'}
topic = 'fmn.rule.update.v1'

fmn.rules package

Submodules
fmn.rules.filter module
class fmn.rules.filter.Applications(requester: Requester, params, username)[source]

Bases: Filter

matches(message)[source]
name: str = 'applications'
class fmn.rules.filter.Filter(requester: Requester, params, username)[source]

Bases: object

matches(message: Message)[source]
name: str
class fmn.rules.filter.MyActions(requester: Requester, params, username)[source]

Bases: Filter

matches(message)[source]
name: str = 'my_actions'
class fmn.rules.filter.Severities(*args, **kwargs)[source]

Bases: Filter

default = (20, 30, 40)
matches(message)[source]
name: str = 'severities'
class fmn.rules.filter.Topic(requester: Requester, params, username)[source]

Bases: Filter

matches(message)[source]
name: str = 'topic'
fmn.rules.notification module
class fmn.rules.notification.EmailNotification(*, protocol: Literal['email'], content: EmailNotificationContent)[source]

Bases: FrozenModel

content: EmailNotificationContent
protocol: Literal['email']
class fmn.rules.notification.EmailNotificationContent(*, headers: EmailNotificationHeaders, body: str)[source]

Bases: FrozenModel

body: str
headers: EmailNotificationHeaders
class fmn.rules.notification.EmailNotificationHeaders(*, To: str, Subject: str)[source]

Bases: FrozenModel

Subject: str
To: str
class fmn.rules.notification.FrozenModel[source]

Bases: BaseModel

class Config[source]

Bases: object

frozen = True
class fmn.rules.notification.IRCNotification(*, protocol: Literal['irc'], content: IRCNotificationContent)[source]

Bases: FrozenModel

content: IRCNotificationContent
protocol: Literal['irc']
class fmn.rules.notification.IRCNotificationContent(*, to: str, message: str)[source]

Bases: FrozenModel

message: str
to: str
class fmn.rules.notification.MatrixNotification(*, protocol: Literal['matrix'], content: MatrixNotificationContent)[source]

Bases: FrozenModel

content: MatrixNotificationContent
protocol: Literal['matrix']
class fmn.rules.notification.MatrixNotificationContent(*, to: str, message: str)[source]

Bases: FrozenModel

message: str
to: str
class fmn.rules.notification.Notification(*, __root__: EmailNotification | IRCNotification | MatrixNotification)[source]

Bases: FrozenModel

fmn.rules.requester module
class fmn.rules.requester.Requester(config)[source]

Bases: object

async invalidate_on_message(message: Message)[source]
fmn.rules.tracking_rules module
class fmn.rules.tracking_rules.ArtifactsFollowed(*args, **kwargs)[source]

Bases: TrackingRule

async matches(message)[source]
name: str | None = 'artifacts-followed'
async prime_cache(cache)[source]
class fmn.rules.tracking_rules.ArtifactsGroupOwned(*args, **kwargs)[source]

Bases: TrackingRule

async matches(message)[source]
name: str | None = 'artifacts-group-owned'
async prime_cache(cache)[source]
class fmn.rules.tracking_rules.ArtifactsOwned(*args, **kwargs)[source]

Bases: TrackingRule

async matches(message)[source]
name: str | None = 'artifacts-owned'
async prime_cache(cache)[source]
class fmn.rules.tracking_rules.RelatedEvents(*args, **kwargs)[source]

Bases: TrackingRule

async matches(message)[source]
name: str | None = 'related-events'
async prime_cache(cache)[source]
class fmn.rules.tracking_rules.TrackingRule(requester: Requester, params, owner)[source]

Bases: object

async matches(message: Message)[source]
name: str | None = None
async prime_cache(cache)[source]
class fmn.rules.tracking_rules.UsersFollowed(*args, **kwargs)[source]

Bases: TrackingRule

async matches(message)[source]
name: str | None = 'users-followed'
async prime_cache(cache)[source]

fmn.sender package

Submodules
fmn.sender.cli module
fmn.sender.config module
fmn.sender.config.get_config(path)[source]
fmn.sender.config.get_handler(config)[source]
fmn.sender.config.setup_logging(config)[source]
fmn.sender.consumer module
class fmn.sender.consumer.Consumer(config, handler)[source]

Bases: object

async connect()[source]
async start()[source]
async stop()[source]
fmn.sender.email module
class fmn.sender.email.EmailHandler(config)[source]

Bases: Handler

async handle(message)[source]
async setup()[source]
async stop()[source]
fmn.sender.handler module
class fmn.sender.handler.Handler(config)[source]

Bases: object

property closed

Default closed Future, can be overridden in child classes.

It should be triggered when there is an error and the app should stop.

async handle(message)[source]
async setup()[source]
async stop()[source]
exception fmn.sender.handler.HandlerError[source]

Bases: Exception

class fmn.sender.handler.PrintHandler(config)[source]

Bases: Handler

async handle(message)[source]
fmn.sender.irc module
class fmn.sender.irc.IRCClient(*args, **kwargs)[source]

Bases: AioSimpleIRCClient

async connect(*args, **kwargs)[source]

Connect using the underlying connection

async disconnect()[source]
on_900(connection, event)[source]
on_disconnect(connection, event)[source]
on_error(connection, event)[source]
on_nicknameinuse(connection, event)[source]
async privmsg(*args, **kwargs)[source]
class fmn.sender.irc.IRCHandler(*args, **kwargs)[source]

Bases: Handler

property closed

Default closed Future, can be overridden in child classes.

It should be triggered when there is an error and the app should stop.

async handle(message)[source]
async setup()[source]
async stop()[source]
fmn.sender.matrix module
class fmn.sender.matrix.MatrixHandler(config)[source]

Bases: Handler

async create_dm_room(dest)[source]
async get_dm_room(dest)[source]
async handle(message)[source]
async refresh_dm_rooms_cache()[source]
async send_dm(room_id, message)[source]
async setup()[source]
async stop()[source]
async update_dm_rooms_cache()[source]

Contributor Guide

You need to be legally allowed to submit any contribution to this project. What this means in detail is laid out at the Developer Certificate of Origin website. The mechanism by which you certify this is adding a Signed-off-by trailer to git commit log messages, you can do this by using the --signoff/-s option to git commit.

Changelog

Significant changes should appear in the ChangeLog. To that end, contributors must create a changelog entry using Towncrier and the appropriate category.

The format is based on Keep a Changelog.

The syntax to create a changelog entry is the following:

poetry run towncrier create -c "Added a cool feature" issuenumber.category.md

Where issuenumber is the issue number in Github, and category is one of:

  • security in case of vulnerabilities

  • removed for now removed features

  • deprecated for soon-to-be removed features

  • added for new features

  • changed for changes to existing functionality

  • fixed for any bug fixes

For example:

poetry run towncrier create -c "Added a cool feature!" 42.added.md

If the change does not fit into any category, prefix the filename with a “plus”, for example:

poetry run towncrier create -c "A fix without an issue number!" +something-unique.fixed.md

Components

FMN consists of several components, most of which run in Fedora Infrastructure.

Components of FMN

Message Consumer

The Message Consumer reads messages carried on the Message Bus. If a message is matched by a rule, it triggers the appropriate Notification Sender to send an email or chat message to the user who set up the rule.

It queries FASJSON and dist-git (Pagure) for information about users, groups and projects and stores the information in the Shared Service Cache (which is also used by the Configuration Backend) and is responsible for invalidating cached entries when receiving messages that affected objects have changed.

There can be multiple instances of the Message consumer, each will process incoming messages in a round-robin fashion.

Notification Sender

A Notification Sender receives messages over a private message queue from the Message Consumer, triggering it to send notifications to users over various communication channels, such as IRC, email or Matrix. These messages contain all information a sender needs to perform the work, no additional lookups in other services are necessary.

Because of how IRC and Matrix work, there can only be one instance of their respective senders.

Configuration Frontend

The Configuration Frontend is an application running in a web browser and is implemented using Vue.js. It lets users configure rules specifying which bus messages they want to be notified about, e.g.:

  • Regarding certain artifacts they’re interested in,

  • regarding themselves or groups they are a member of,

  • regarding someone else, e.g. a mentee or someone they sponsored.

It communicates with the Configuration Backend over a REST web API which also acts as an intermediary cache to services like FASJSON and dist-git (Pagure).

The Configuration Frontend lets users authenticate themselves with Ipsilon (our OpenID Connect identity provider) and uses the token it receives to establish a user’s identity with the Configuration Backend.

Configuration Backend

The Configuration Backend is a service implementing a REST web API using FastAPI and Python. It lets users manage their notification rules which are stored in the Database. Additionally, it works as a proxy cache to FASJSON and dist-git (Pagure) using the Shared Service Cache which is also used by the Message Consumer.

The Configuration Frontend which uses the web API on behalf of a user establishes their identity to the Configuration Backend using the token it got from the OpenID Connect Identity Provider (Ipsilon).

Shared Service Cache

The Shared Service Cache is used by the Message Consumer and the Configuration Backend. It is implemented using the cashews Python package and uses Redis as a data store.

Database

The Database is used to store the rules describing which messages users want to be notified about, as well as the number of notifications generated by a rule, to produce statistics for users and administrators.

It is implemented as a PostgreSQL RDBMS and accessed by the Message Consumer and the Configuration Backend using the SQLAlchemy object relational mapper.

Rules

Rules in FMN consist of several components: One Tracking Rule, and potentially many Filters and Destinations (grouped into Generation Rules). The Message Consumer looks at all messages transported over the bus and if any match a Rule, Notifications will be sent to the respective Notification Sender, one instance of each is responsible for actually sending out emails and chat messages over IRC or Matrix, respectively.

Tracking Rules

Each Rule has exactly one Tracking Rule which specifies what messages should be tracked, e.g. messages that concern:

  • Artifacts owned by the user or a group the user is member of,

  • specific named artifacts,

  • the user themselves, or

  • followed users.

If a Tracking Rule matches, its Generation Rules will be consulted for further processing.

Generation Rules

Each Rule contains one or more Generation Rules which group together zero or more Filters and one or more Destinations. If no Filters exist, or all of them match, Notifications will be created for each Destination which are processed by the respective Sender.

This lets users e.g. specify that messages of a lower severity should be sent via email, while higher severities should let FMN ping the user via IRC or Matrix.

Filters

Filters further restrict if a message should be matched by a Rule and notifications should be sent. Users can configure filters for these criteria:

  • The name of the application sending a message.

  • The severity(*) of the message.

  • If a message was caused by an action of the user.

  • If the message topic matches a certain glob pattern.

(*): At this point, we don’t know that any app in Fedora infrastructure tags its messages with a severity, which makes them default to INFO.

Destinations

A Destination contains information about how a user should be notified if a Rule matches, e.g. via email, IRC or Matrix. The destinations available to a user are retrieved from their account and can be configured in Noggin.

Changelog

All notable changes to this project will be documented in this file.

The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.

This project uses towncrier and the changes for the upcoming release can be found in https://github.com/fedora-infra/fmn/tree/main/changelog.d/.