Source code for fmn.api.handlers.users

# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT

import logging
from datetime import datetime, timedelta
from urllib.parse import urlparse

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import func, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.ext.asyncio import AsyncSession

from fmn_messages.rule import RuleCreateV1, RuleDeleteV1, RuleUpdateV1

from ...backends import FASJSONAsyncProxy, get_fasjson_proxy
from ...core.constants import DEFAULT_MATRIX_DOMAIN
from ...database.model import Destination, Filter, Generated, GenerationRule, Rule, User
from .. import api_models
from ..auth import Identity, get_identity, get_identity_optional
from ..database import gen_db_session
from ..messaging import publish
from .utils import db_rule_from_api_rule

log = logging.getLogger(__name__)

router = APIRouter(prefix="/users")


[docs] @router.get("/me", response_model=api_models.User, tags=["users"]) async def get_me( identity: Identity = Depends(get_identity), db_session: AsyncSession = Depends(gen_db_session), ): user, created = await User.get_or_create(db_session, name=identity.name) user.is_admin = identity.admin return user
[docs] @router.get("", response_model=list[str], tags=["users"]) async def get_users( search: str | None = None, identity: Identity = Depends(get_identity_optional), fasjson_proxy: FASJSONAsyncProxy = Depends(get_fasjson_proxy), ): if not search: if identity and identity.name: return [identity.name] else: return [] return [u["username"] for u in await fasjson_proxy.search_users(username=search)]
[docs] @router.get("/{username}/info", tags=["users"]) async def get_user_info(username, fasjson_proxy: FASJSONAsyncProxy = Depends(get_fasjson_proxy)): return await fasjson_proxy.get_user(username=username)
[docs] @router.get("/{username}/groups", response_model=list[str], tags=["users"]) async def get_user_groups(username, fasjson_proxy: FASJSONAsyncProxy = Depends(get_fasjson_proxy)): return [g["groupname"] for g in await fasjson_proxy.get_user_groups(username=username)]
[docs] @router.get("/{username}/destinations", response_model=list[api_models.Destination], tags=["users"]) async def get_user_destinations( username, fasjson_proxy: FASJSONAsyncProxy = Depends(get_fasjson_proxy) ): user = await fasjson_proxy.get_user(username=username) result = [{"protocol": "email", "address": email} for email in user["emails"]] nicks = user.get("ircnicks") or [] for nick in nicks: url = urlparse(nick) address = url.path.lstrip("/") protocol = url.scheme if not protocol: protocol = "irc" if protocol == "matrix": domain = url.hostname or DEFAULT_MATRIX_DOMAIN address = f"@{address}:{domain}" result.append({"protocol": protocol, "address": address}) return result
[docs] @router.get("/{username}/rules", response_model=list[api_models.Rule], tags=["users/rules"]) async def get_user_rules( username, identity: Identity = Depends(get_identity), db_session: AsyncSession = Depends(gen_db_session), ): if username != identity.name: raise HTTPException(status_code=403, detail="Not allowed to see someone else's rules") db_result = await db_session.execute( Rule.select_related().filter(Rule.user.has(name=username)).order_by(Rule.id) ) rules = db_result.scalars().all() # Collect the number of notifications sent last_week = datetime.now() - timedelta(days=7) db_result = await db_session.execute( select(Rule.id, func.sum(Generated.count)) .join(Rule) .group_by(Rule) .filter(Rule.id.in_([r.id for r in rules])) .filter(Generated.when >= last_week) ) counts = dict(db_result.all()) for rule in rules: rule.generated_last_week = counts.get(rule.id, 0) return rules
[docs] @router.get("/{username}/rules/{id}", response_model=api_models.Rule, tags=["users/rules"]) async def get_user_rule( username: str, id: int, identity: Identity = Depends(get_identity), db_session: AsyncSession = Depends(gen_db_session), ): if username != identity.name: raise HTTPException(status_code=403, detail="Not allowed to see someone else's rules") try: return ( await db_session.execute( Rule.select_related().filter(Rule.id == id, Rule.user.has(name=username)) ) ).scalar_one() except NoResultFound as e: raise HTTPException(status_code=404, detail=f"No rule with ID {id}.") from e
[docs] @router.put("/{username}/rules/{id}", response_model=api_models.Rule, tags=["users/rules"]) async def edit_user_rule( username: str, id: int, rule: api_models.Rule, identity: Identity = Depends(get_identity), db_session: AsyncSession = Depends(gen_db_session), ): if username != identity.name: raise HTTPException(status_code=403, detail="Not allowed to edit someone else's rules") rule_db = ( await db_session.execute( Rule.select_related().filter(Rule.id == id, Rule.user.has(name=username)) ) ).scalar_one() rule_db.name = rule.name rule_db.disabled = rule.disabled rule_db.tracking_rule.name = rule.tracking_rule.name rule_db.tracking_rule.params = rule.tracking_rule.params for to_delete in rule_db.generation_rules[len(rule.generation_rules) :]: await db_session.delete(to_delete) for index, gr in enumerate(rule.generation_rules): try: gr_db = rule_db.generation_rules[index] except IndexError: gr_db = GenerationRule(rule=rule_db) rule_db.generation_rules.append(gr_db) for to_delete in gr_db.destinations[len(gr.destinations) :]: await db_session.delete(to_delete) for index, dst in enumerate(gr.destinations): try: dst_db = gr_db.destinations[index] except IndexError: dst_db = Destination( generation_rule=gr_db, protocol=dst.protocol, address=dst.address ) gr_db.destinations.append(dst_db) else: dst_db.protocol = dst.protocol dst_db.address = dst.address to_delete = [ f for f in gr_db.filters if f.name not in gr.filters.model_dump(exclude_unset=True) ] for f in to_delete: await db_session.delete(f) existing_filters = {f.name: f for f in gr_db.filters} for f_name, f_params in gr.filters.model_dump(exclude_unset=True).items(): try: f_db = existing_filters[f_name] except KeyError: f_db = Filter(generation_rule=gr_db, name=f_name, params=f_params) gr_db.filters.append(f_db) else: f_db.name = f_name f_db.params = f_params await db_session.flush() await db_session.commit() # Refresh using the full query to get relationships db_session.expire(rule_db) rule_db = ( await db_session.execute( Rule.select_related().filter(Rule.id == id, Rule.user.has(name=username)) ) ).scalar_one() message = RuleUpdateV1( body={ "rule": api_models.Rule.model_validate(rule_db).model_dump(), "user": api_models.User.model_validate(rule_db.user).model_dump(), } ) await publish(message) return rule_db
[docs] @router.delete("/{username}/rules/{id}", tags=["users/rules"]) async def delete_user_rule( username: str, id: int, identity: Identity = Depends(get_identity), db_session: AsyncSession = Depends(gen_db_session), ): if username != identity.name: raise HTTPException(status_code=403, detail="Not allowed to delete someone else's rules") # We need the full query to populate the outgoing message rule = ( await db_session.execute( Rule.select_related().filter(Rule.id == id, Rule.user.has(name=username)) ) ).scalar_one() message = RuleDeleteV1( body={ "rule": api_models.Rule.model_validate(rule).model_dump(), "user": api_models.User.model_validate(rule.user).model_dump(), } ) await db_session.delete(rule) await db_session.commit() await publish(message)
[docs] @router.post("/{username}/rules", response_model=api_models.Rule, tags=["users/rules"]) async def create_user_rule( username, rule: api_models.NewRule, identity: Identity = Depends(get_identity), db_session: AsyncSession = Depends(gen_db_session), ): if username != identity.name: raise HTTPException(status_code=403, detail="Not allowed to edit someone else's rules") log.info("Creating rule: %s", rule) user = await User.get_one(db_session, name=username) rule_db = db_rule_from_api_rule(rule, user) db_session.add(rule_db) await db_session.commit() # Refresh using the full query to get relationships rule_db = ( await db_session.execute( Rule.select_related().filter(Rule.id == rule_db.id, Rule.user.has(name=username)) ) ).scalar_one() message = RuleCreateV1( body={ "rule": api_models.Rule.model_validate(rule_db).model_dump(), "user": api_models.User.model_validate(rule_db.user).model_dump(), } ) await publish(message) return rule_db