Source code for fmn.core.amqp
# SPDX-FileCopyrightText: Contributors to the Fedora Project
#
# SPDX-License-Identifier: MIT
import ssl
from aio_pika.abc import SSLOptions
from aio_pika.connection import URL
[docs]def get_url_from_config(config: dict):
url = URL(config["amqp_url"])
if "tls" in config:
url = url.update_query(auth="EXTERNAL")
url = url.update_query(
SSLOptions(
cafile=config["tls"]["ca_cert"],
certfile=config["tls"]["certfile"],
keyfile=config["tls"]["keyfile"],
no_verify_ssl=ssl.CERT_REQUIRED,
)
)
return url