Source code for fmn.database.model.destination
# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT
import logging
from datetime import datetime, timezone
from email.utils import format_datetime
from typing import TYPE_CHECKING
from httpx import AsyncClient, HTTPStatusError
from sqlalchemy import Column, ForeignKey, Integer, String, UnicodeText, select
from sqlalchemy.ext.asyncio import async_object_session
from sqlalchemy.orm import relationship
from ...core.config import get_settings
from ..main import Base
from .generation_rule import GenerationRule
if TYPE_CHECKING:
from fedora_messaging.message import Message
from ...rules.notification import Notification
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
[docs]
class Destination(Base):
    """ORM model for a row of the ``destinations`` table.

    Each destination belongs to exactly one :class:`GenerationRule` (the
    foreign key is declared with ``ondelete="CASCADE"``) and stores a
    ``protocol`` together with the ``address`` to deliver to. The protocols
    understood by :meth:`generate` are ``"email"``, ``"irc"`` and
    ``"matrix"``.
    """

    __tablename__ = "destinations"

    id = Column(Integer, primary_key=True, nullable=False)
    generation_rule_id = Column(
        Integer,
        ForeignKey(GenerationRule.id, ondelete="CASCADE"),
        nullable=False,
    )
    generation_rule = relationship(GenerationRule, back_populates="destinations")
    protocol = Column(String(length=255), nullable=False)
    address = Column(UnicodeText, nullable=False)

    async def generate(self, message: "Message") -> "Notification.content":
        """Build the notification content for ``message`` for this destination.

        Args:
            message: The message to turn into a notification.

        Returns:
            For ``"irc"`` and ``"matrix"``: a dict with the keys ``"to"`` and
            ``"message"``. For ``"email"``: a dict with the keys
            ``"headers"``, ``"body"`` and ``"footer"``.

        Raises:
            ValueError: If ``self.protocol`` is none of the protocols above.
        """
        prefix = f"[{message.app_name}] " if message.app_name else ""
        link = message.url or ""
        settings = get_settings()

        # The two chat protocols share the same single-line payload.
        if self.protocol in ("irc", "matrix"):
            return {"to": self.address, "message": f"{prefix}{message.summary} {link}"}

        if self.protocol != "email":
            raise ValueError(f"Unknown destination protocol: {self.protocol}")

        # Email: the message text followed by its URL, plus whatever extra
        # text get_extra() returns for this message (if any).
        text = f"{message!s}\n{link}"
        extra = await get_extra(message)
        if extra:
            text = f"{text}\n{extra}"

        # Find the URL of the Rule that generated this notification: load the
        # GenerationRule through the session this object is attached to and
        # take the id of its parent rule.
        db = async_object_session(self)
        query = select(GenerationRule).where(GenerationRule.id == self.generation_rule_id)
        found = await db.execute(query)
        rule_id = found.scalar_one().rule_id

        headers = {
            "To": self.address,
            "Subject": f"{prefix}{message.summary}",
            "Date": format_datetime(get_sent_datetime(message)),
            "X-FMN-Message-Id": (
                f"{message.id} <{settings.services.datagrepper_url}"
                f"/v2/id?id={message.id}&size=extra-large>"
            ),
            "X-FMN-Message-Topic": message.topic,
        }
        return {
            "headers": headers,
            "body": text,
            "footer": f"Sent by Fedora Notifications: {settings.public_url}/rules/{rule_id}",
        }
[docs]
def get_sent_datetime(message: "Message") -> datetime:
    """Return the time at which ``message`` was sent.

    The value is read from the ``sent-at`` entry of ``message._headers`` and
    parsed as an ISO 8601 string. This is best-effort: when the header is
    missing or empty, the current UTC time is returned; when it cannot be
    parsed (malformed text, or a value that is not a string at all), the
    failure is logged and the current UTC time is returned as well.

    Args:
        message: The message whose headers are inspected.

    Returns:
        The parsed timestamp (timezone-aware only if the header value carried
        a UTC offset or a ``Z`` suffix), or the current time in UTC.
    """
    sent_at = message._headers.get("sent-at")
    if sent_at:
        # Older versions of fromisoformat() don't parse the Z suffix, see:
        # https://discuss.python.org/t/parse-z-timezone-suffix-in-datetime/2220
        try:
            return datetime.fromisoformat(sent_at.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            # AttributeError/TypeError cover header values that are not
            # strings (e.g. numbers, bytes or datetime objects); they must
            # not abort the caller any more than a malformed string does.
            log.exception("Failed to parse sent-at timestamp value")
    # Default to now
    return datetime.now(tz=timezone.utc)