diff --git a/.github/workflows/catalog-update.yml b/.github/workflows/catalog-update.yml index ed7395e9f..b90deba60 100644 --- a/.github/workflows/catalog-update.yml +++ b/.github/workflows/catalog-update.yml @@ -134,6 +134,8 @@ jobs: ENVIRONMENT: ${{ vars.DEV_MOBILITY_FEEDS_ENVIRONMENT }} # dev uses the QA sql instance DB_ENVIRONMENT: ${{ vars.QA_MOBILITY_FEEDS_ENVIRONMENT }} + # User DB lives on the shared QA instance but uses the DEV-suffixed name (MobilityDatabaseUsersDEV). + USER_DB_ENVIRONMENT: ${{ vars.DEV_MOBILITY_FEEDS_ENVIRONMENT }} # With repository_dispatch, DRY_RUN is always false. # With workflow_dispatch we take the specified values of DRY_RUN. DRY_RUN: ${{ (github.event_name != 'repository_dispatch' && inputs.DRY_RUN) }} diff --git a/.github/workflows/db-update-content.yml b/.github/workflows/db-update-content.yml index 69e1be340..3eb058ec6 100644 --- a/.github/workflows/db-update-content.yml +++ b/.github/workflows/db-update-content.yml @@ -41,6 +41,15 @@ on: description: GCP ENVIRONMENT where DB is deployed. required: true type: string + USER_DB_ENVIRONMENT: + description: >- + Logical environment for the users DB (issue #1683). Determines the user DB name + suffix (MobilityDatabaseUsers) used to build USERS_DATABASE_URL so the + populate scripts can emit notification events. Defaults to DB_ENVIRONMENT when empty. + For DEV (which shares QA infra), set this to 'dev' while DB_ENVIRONMENT stays 'qa'. + required: false + default: '' + type: string REGION: description: GCP region required: true @@ -88,15 +97,24 @@ jobs: - name: Update .env file run: | + # Resolve the users DB name (MobilityDatabaseUsers) the same way the + # schema workflow does, so populate scripts can emit notification events + # into the users DB. Defaults to DB_ENVIRONMENT when USER_DB_ENVIRONMENT is empty. + USER_ENV="${{ inputs.USER_DB_ENVIRONMENT }}" + if [ -z "${USER_ENV}" ]; then + USER_ENV="${{ inputs.DB_ENVIRONMENT }}" + fi + ENV_UPPER=$(echo "${USER_ENV}" | tr '[:lower:]' '[:upper:]') + USER_DB_NAME="MobilityDatabaseUsers${ENV_UPPER}" echo "PGUSER=${{ secrets.DB_USER_NAME }}" > config/.env.local echo "POSTGRES_USER=${{ secrets.DB_USER_NAME }}" >> config/.env.local echo "POSTGRES_PASSWORD=${{ secrets.DB_USER_PASSWORD }}" >> config/.env.local echo "POSTGRES_DB=${{ inputs.DB_NAME }}" >> config/.env.local echo "FEEDS_DATABASE_URL=postgresql://${{ secrets.DB_USER_NAME }}:${{ secrets.DB_USER_PASSWORD }}@localhost:5432/${{ inputs.DB_NAME }}" >> config/.env.local + echo "USERS_DATABASE_URL=postgresql://${{ secrets.DB_USER_NAME }}:${{ secrets.DB_USER_PASSWORD }}@localhost:5432/${USER_DB_NAME}" >> config/.env.local echo "POSTGRES_PORT=5432" >> config/.env.local echo "POSTGRES_HOST=localhost" >> config/.env.local echo "ENVIRONMENT=${{ inputs.ENVIRONMENT }}" >> config/.env.local - cat config/.env.local - name: Load secrets from 1Password uses: 1password/load-secrets-action@v2.0.0 diff --git a/api/src/scripts/populate_db_gbfs.py b/api/src/scripts/populate_db_gbfs.py index 9edfa22c1..ae61cd95f 100644 --- a/api/src/scripts/populate_db_gbfs.py +++ b/api/src/scripts/populate_db_gbfs.py @@ -17,6 +17,7 @@ from shared.common.license_utils import assign_license_by_url from shared.database.database import generate_unique_id, configure_polymorphic_mappers from shared.database_gen.sqlacodegen_models import Gbfsfeed, Location, Externalid +from shared.notifications.notification_event_service import emit_url_replaced, urls_differ GBFS_PUBSUB_TOPIC_NAME = "validate-gbfs-feed" @@ -108,9 +109,18 @@ def populate_db(self, session, fetch_url=True): gbfs_feed.operator = row["Name"] gbfs_feed.provider = row["Name"] gbfs_feed.operator_url = row["URL"] - gbfs_feed.producer_url = row["Auto-Discovery URL"] - gbfs_feed.auto_discovery_url = row["Auto-Discovery URL"] + old_producer_url = gbfs_feed.producer_url + new_producer_url = row["Auto-Discovery URL"] + gbfs_feed.producer_url = new_producer_url + gbfs_feed.auto_discovery_url = new_producer_url gbfs_feed.updated_at = datetime.now(pytz.utc) + if not is_new_feed and old_producer_url and urls_differ(old_producer_url, new_producer_url): + emit_url_replaced( + feed_stable_id=stable_id, + old_url=old_producer_url, + new_url=new_producer_url, + source="populate_db_gbfs", + ) if not gbfs_feed.locations: # If locations are empty, create a new location (no overwrite) country_code = self.get_safe_value(row, "Country Code", "") diff --git a/api/src/scripts/populate_db_gtfs.py b/api/src/scripts/populate_db_gtfs.py index 528aa7dbc..99f0edf44 100644 --- a/api/src/scripts/populate_db_gtfs.py +++ b/api/src/scripts/populate_db_gtfs.py @@ -16,6 +16,11 @@ Location, Redirectingid, ) +from shared.notifications.notification_event_service import ( + emit_feed_redirected, + emit_url_replaced, + urls_differ, +) from utils.data_utils import set_up_defaults if TYPE_CHECKING: @@ -200,6 +205,14 @@ def process_redirects(self, session: "Session"): ) # Flush to avoid FK violation session.flush() + emit_feed_redirected( + source_stable_id=stable_id, + target_stable_id=target_stable_id, + old_url=getattr(feed, "producer_url", None), + new_url=getattr(target_feed, "producer_url", None), + source="populate_db_gtfs", + extra_data={"redirect_comment": comment} if comment else None, + ) def populate_db(self, session: "Session", fetch_url: bool = True): """ @@ -252,7 +265,15 @@ def populate_db(self, session: "Session", fetch_url: bool = True): feed.note = self.get_safe_value(row, "note", "") producer_url = self.get_safe_value(row, "urls.direct_download", "") if "transitfeeds" not in producer_url: # Avoid setting transitfeeds as producer_url + old_producer_url = feed.producer_url feed.producer_url = producer_url + if not is_new_feed and old_producer_url and urls_differ(old_producer_url, producer_url): + emit_url_replaced( + feed_stable_id=stable_id, + old_url=old_producer_url, + new_url=producer_url, + source="populate_db_gtfs", + ) feed.authentication_type = str(int(float(self.get_safe_value(row, "urls.authentication_type", "0")))) feed.authentication_info_url = self.get_safe_value(row, "urls.authentication_info", "") feed.api_key_parameter_name = self.get_safe_value(row, "urls.api_key_parameter_name", "") diff --git a/api/src/shared/common/rate_limiter.py b/api/src/shared/common/rate_limiter.py new file mode 100644 index 000000000..e313cb044 --- /dev/null +++ b/api/src/shared/common/rate_limiter.py @@ -0,0 +1,152 @@ +# +# MobilityData 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Generic, reusable client-side rate limiting. + +Provides a thread-safe token-bucket :class:`RateLimiter` and a small named +registry (:func:`get_rate_limiter`) so any outbound API caller can share a +single process-wide bucket keyed by a logical name (e.g. ``"brevo"``, +``"tdg"``). The algorithm is API-agnostic; callers only choose a name and rate. + +.. note:: + Scope is **per process**. Each Cloud Function instance / worker process keeps + its own bucket, so the effective aggregate rate against an external API is + ``configured_rate * number_of_concurrent_instances``. Size per-process rates + accordingly (or run a single-instance/serialized caller) when an external + provider enforces a hard global limit. + +Example:: + + limiter = get_rate_limiter("tdg", rate=10) # 10 requests/second + limiter.acquire() # blocks if necessary + response = requests.get(url) +""" + +from __future__ import annotations + +import threading +import time +from typing import Callable, Dict, Optional + + +class RateLimiter: + """Thread-safe token-bucket rate limiter. + + Tokens refill continuously at ``rate`` tokens per second up to ``capacity`` + (the maximum burst). :meth:`acquire` blocks just long enough to keep the + effective call rate at or below ``rate``. + + ``clock`` and ``sleep`` are injectable so the limiter can be unit-tested + deterministically without real time passing. + """ + + def __init__( + self, + rate: float, + capacity: Optional[float] = None, + clock: Callable[[], float] = time.monotonic, + sleep: Callable[[float], None] = time.sleep, + ) -> None: + if rate <= 0: + raise ValueError("rate must be greater than 0") + if capacity is not None and capacity <= 0: + raise ValueError("capacity must be greater than 0") + self._rate = float(rate) + self._capacity = float(capacity if capacity is not None else rate) + self._clock = clock + self._sleep = sleep + self._tokens = self._capacity + self._timestamp = clock() + self._lock = threading.Lock() + + @property + def rate(self) -> float: + return self._rate + + @property + def capacity(self) -> float: + return self._capacity + + def _refill(self) -> None: + now = self._clock() + elapsed = now - self._timestamp + if elapsed > 0: + self._tokens = min(self._capacity, self._tokens + elapsed * self._rate) + self._timestamp = now + + def acquire(self, n: float = 1) -> float: + """Consume ``n`` tokens, blocking until they are available. + + Returns the number of seconds spent waiting (``0`` when tokens were + immediately available). + + Tokens are *reserved* atomically under the lock (the bucket is allowed + to go negative), and the wait that corresponds to a reservation is slept + **outside** the lock. This keeps the shared bucket consistent while still + allowing concurrent callers to make progress instead of being serialized + behind one another's sleep. + """ + if n <= 0: + return 0.0 + with self._lock: + self._refill() + # Reserve the tokens now; a negative balance represents tokens that + # future refill will repay, and determines how long this caller waits. + self._tokens -= n + waited = 0.0 if self._tokens >= 0 else (-self._tokens) / self._rate + if waited > 0: + self._sleep(waited) + return waited + + def __enter__(self) -> "RateLimiter": + self.acquire() + return self + + def __exit__(self, exc_type, exc, tb) -> None: + return None + + +_registry: Dict[str, RateLimiter] = {} +_registry_lock = threading.Lock() + + +def get_rate_limiter( + name: str, + rate: float, + capacity: Optional[float] = None, +) -> RateLimiter: + """Return a process-wide :class:`RateLimiter` shared under ``name``. + + The first caller for a given ``name`` configures the limiter; subsequent + calls return the same instance and ignore their ``rate``/``capacity`` + arguments. Use :func:`reset_rate_limiter` in tests to reconfigure. + """ + limiter = _registry.get(name) + if limiter is None: + with _registry_lock: + limiter = _registry.get(name) + if limiter is None: + limiter = RateLimiter(rate, capacity=capacity) + _registry[name] = limiter + return limiter + + +def reset_rate_limiter(name: Optional[str] = None) -> None: + """Drop the cached limiter for ``name`` (or all when ``name`` is None).""" + with _registry_lock: + if name is None: + _registry.clear() + else: + _registry.pop(name, None) diff --git a/api/src/shared/notifications/__init__.py b/api/src/shared/notifications/__init__.py new file mode 100644 index 000000000..09db850ee --- /dev/null +++ b/api/src/shared/notifications/__init__.py @@ -0,0 +1,6 @@ +"""Shared notification utilities. + +Packages exported from here: + notification_event_service — emit_feed_redirected / emit_url_replaced + brevo_notification_sender — send_single / send_digest +""" diff --git a/api/src/shared/notifications/brevo_notification_sender.py b/api/src/shared/notifications/brevo_notification_sender.py new file mode 100644 index 000000000..7944a992b --- /dev/null +++ b/api/src/shared/notifications/brevo_notification_sender.py @@ -0,0 +1,508 @@ +# +# MobilityData 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""brevo_notification_sender — transactional email delivery via Brevo. + +Brevo (formerly Sendinblue) is already used by this project for contact +management (``shared.common.brevo``). This module extends that integration +to **transactional email** using ``sib_api_v3_sdk.TransactionalEmailsApi``. + +Environment variables +--------------------- +BREVO_API_KEY + Brevo API key (required). +BREVO_SENDER_EMAIL + From-address for outgoing emails (default: ``noreply@mobilitydatabase.org``). +BREVO_SENDER_NAME + From-name (default: ``Mobility Database``). +BREVO_TEMPLATE_FEED_URL_UPDATED + Integer Brevo template ID for ``feed.url_updated`` single-event emails. + When not set, an inline HTML fallback is used. +BREVO_TEMPLATE_FEED_URL_UPDATED_DIGEST + Integer Brevo template ID for ``feed.url_updated`` digest emails. + When not set, an inline HTML fallback is used. +BREVO_TEMPLATE_ADMIN_EVENT_SUMMARY + Integer Brevo template ID for ``admin.event_summary`` emails. + When not set, an inline HTML fallback is used. +BREVO_MAX_RPS + Maximum Brevo API requests per second (default: ``900``). Stays below + Brevo's hard limit of 1000 rps. Enforced by a shared token-bucket limiter. + +Design +------ +* ``send_single`` sends one email for one notification_event. +* ``send_digest`` sends one email batching multiple notification_events. +* Both raise ``BrevoSendError`` on failure so the caller can update + ``notification_log.status`` and ``retry_count`` accordingly. +* Template params are passed as ``params`` to the Brevo API; Brevo renders + them via its template engine. When no template ID is configured, a minimal + HTML fallback is built inline. +""" + +from __future__ import annotations + +import logging +import os +from dataclasses import dataclass +from html import escape as _html_escape +from typing import Any, Dict, List, Optional + +from shared.common.rate_limiter import RateLimiter, get_rate_limiter +from shared.notifications.notification_constants import ( + NotificationFeedRole, + NotificationTypeId, +) +from shared.users_database_gen.sqlacodegen_models import NotificationEvent + +logger = logging.getLogger(__name__) + +_DEFAULT_SENDER_EMAIL = "noreply@mobilitydatabase.org" +_DEFAULT_SENDER_NAME = "Mobility Database" + +# Brevo's transactional email API allows up to 1000 requests/second. Default +# below that to leave headroom for clock skew and other API consumers. +BREVO_MAX_RPS_ENV = "BREVO_MAX_RPS" +DEFAULT_BREVO_MAX_RPS = 900.0 +_BREVO_RATE_LIMITER_NAME = "brevo" + + +def _configured_brevo_rps() -> float: + raw = os.getenv(BREVO_MAX_RPS_ENV) + if not raw: + return DEFAULT_BREVO_MAX_RPS + try: + value = float(raw) + except ValueError: + logger.warning( + "Invalid %s=%r; falling back to %.0f rps", + BREVO_MAX_RPS_ENV, + raw, + DEFAULT_BREVO_MAX_RPS, + ) + return DEFAULT_BREVO_MAX_RPS + if value <= 0: + logger.warning( + "%s must be > 0 (got %r); falling back to %.0f rps", + BREVO_MAX_RPS_ENV, + raw, + DEFAULT_BREVO_MAX_RPS, + ) + return DEFAULT_BREVO_MAX_RPS + return value + + +def get_brevo_rate_limiter() -> RateLimiter: + """Return the process-wide Brevo rate limiter (token bucket). + + Configured from ``BREVO_MAX_RPS`` (default :data:`DEFAULT_BREVO_MAX_RPS`). + Callers should ``acquire()`` before each Brevo API request. + """ + return get_rate_limiter(_BREVO_RATE_LIMITER_NAME, _configured_brevo_rps()) + + +_DIGEST_EMAIL_SUBJECT_DICTIONARY = { + NotificationTypeId.FEED_URL_UPDATED: "[Mobility Database] %s feed URL update%s", + NotificationTypeId.ADMIN_EVENT_SUMMARY: "[Mobility Database] Daily notification dispatch summary", +} + +_SINGLE_EMAIL_SUBJECT_DICTIONARY = { + NotificationTypeId.FEED_URL_UPDATED: "[Mobility Database] Feed %s has been updated", + NotificationTypeId.ADMIN_EVENT_SUMMARY: "[Mobility Database] Daily notification dispatch summary", +} + + +class BrevoSendError(Exception): + """Raised when a Brevo API call fails. Callers catch this to record failure.""" + + +@dataclass +class EmailRecipient: + email: str + name: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + d: Dict[str, Any] = {"email": self.email} + if self.name: + d["name"] = self.name + return d + + +def get_template_id_by_notification( + notification_type_id: str, + *, + digest: bool = False, +) -> Optional[int]: + match notification_type_id: + case NotificationTypeId.FEED_URL_UPDATED: + if digest: + return _int_env("BREVO_TEMPLATE_FEED_URL_UPDATED_DIGEST") + return _int_env("BREVO_TEMPLATE_FEED_URL_UPDATED") + case NotificationTypeId.ADMIN_EVENT_SUMMARY: + return _int_env("BREVO_TEMPLATE_ADMIN_EVENT_SUMMARY") + case _: + return None + + +# --------------------------------------------------------------------------- +# Event accessors — read feeds (from notification_event_feed) and payload +# --------------------------------------------------------------------------- + + +def _feeds_with_role(event, role: str) -> List[str]: + """Return the stable_ids of feeds attached to ``event`` with the given role.""" + return [f.feed_stable_id for f in (getattr(event, "notification_event_feeds", None) or []) if f.role == role] + + +def subject_feed(event) -> Optional[str]: + """First feed in the 'subject' role, or None.""" + feeds = _feeds_with_role(event, NotificationFeedRole.SUBJECT) + return feeds[0] if feeds else None + + +def target_feed(event) -> Optional[str]: + """First feed in the 'target' role, or None.""" + feeds = _feeds_with_role(event, NotificationFeedRole.TARGET) + return feeds[0] if feeds else None + + +def event_payload(event) -> Dict[str, Any]: + """Type-specific payload dict for ``event`` (never None).""" + return event.payload or {} + + +# --------------------------------------------------------------------------- +# Email content builders (plain-HTML fallback when no template is configured) +# --------------------------------------------------------------------------- + + +def build_single_subject(event) -> str: + template = _SINGLE_EMAIL_SUBJECT_DICTIONARY.get(event.notification_type_id) + if template is None: + return f"[Mobility Database] Notification for {event.notification_type_id}" + + if "%s" in template: + return template % (subject_feed(event) or "unknown") + return template + + +def build_digest_subject(events: List) -> str: + count = len(events) + type_id = events[0].notification_type_id if events else "notification" + template = _DIGEST_EMAIL_SUBJECT_DICTIONARY.get(type_id) + if template is None: + return f"[Mobility Database] {count} notification{'s' if count != 1 else ''}" + + placeholder_count = template.count("%s") + if placeholder_count == 2: + return template % (count, "s" if count != 1 else "") + if placeholder_count == 1: + return template % count + return template + + +def build_params_feed_url_updated(events: List, subscription): + return { + "event_count": len(events), + "subscription_id": subscription.id, + "events": [ + { + "feed_stable_id": subject_feed(e), + "target_feed_stable_id": target_feed(e), + "event_subtype": e.event_subtype, + "old_url": event_payload(e).get("old_url") or "", + "new_url": event_payload(e).get("new_url") or "", + "source": e.source or "", + "created_at": e.created_at.isoformat() if e.created_at else "", + "payload": event_payload(e), + } + for e in events + ], + } + + +def build_params_admin_event_summary(events: List, subscription): + summary_event = events[0] if events else None + return { + "event_count": len(events), + "subscription_id": subscription.id, + "summary": event_payload(summary_event) if summary_event else {}, + } + + +def build_params_by_notification( + notification_type_id: str, events: List[NotificationEvent], subscription +) -> Dict[str, Any]: + match notification_type_id: + case NotificationTypeId.FEED_URL_UPDATED: + return build_params_feed_url_updated(events, subscription) + case NotificationTypeId.ADMIN_EVENT_SUMMARY: + return build_params_admin_event_summary(events, subscription) + case _: + raise ValueError(f"Unsupported notification type for Brevo params: {notification_type_id}") + + +def build_single_html(event) -> str: + payload = event_payload(event) + if event.notification_type_id == NotificationTypeId.ADMIN_EVENT_SUMMARY: + return build_admin_summary_html(event) + if event.event_subtype == "feed_redirected": + return ( + f"

Feed {_esc(subject_feed(event))} has been deprecated " + f"and now redirects to {_esc(target_feed(event))}.

" + f"

New URL: {_link(payload.get('new_url'))}

" + ) + return ( + f"

The URL for feed {_esc(subject_feed(event))} has changed.

" + f"

Old URL: {_esc(payload.get('old_url'))}

" + f"

New URL: {_link(payload.get('new_url'))}

" + ) + + +def build_admin_summary_html(event) -> str: + """Render the dispatch-statistics summary for an ``admin.event_summary`` event.""" + p = event_payload(event) + + def _row(label: str, key: str) -> str: + return f"{_esc(label)}{_esc(p.get(key, 0))}" + + rows = "".join( + [ + _row("Subscriptions processed", "subscriptions_processed"), + _row("Events found", "events_found"), + _row("Emails sent", "emails_sent"), + _row("Emails failed", "emails_failed"), + _row("Permanently failed", "permanently_failed"), + _row("Skipped (max retries)", "skipped_max_retries"), + ] + ) + return ( + "

Notification Dispatch Summary

" + f"

Cadence: {_esc(p.get('cadence', '-'))}

" + "" + "" + f"{rows}
MetricCount
" + ) + + +def build_digest_html(events: List) -> str: + if not events: + return "

No feed URL changes in this period.

" + + if events[0].notification_type_id == NotificationTypeId.ADMIN_EVENT_SUMMARY: + # One run summary per event (most digests contain a single summary). + return "".join(build_admin_summary_html(e) for e in events) + + rows = "".join( + f"{_esc(subject_feed(e))}{_esc(e.event_subtype)}" + f"{_esc(event_payload(e).get('old_url') or '-')}" + f"{_esc(event_payload(e).get('new_url') or '-')}" + f"{_esc(e.source or '-')}" + for e in events + ) + return ( + "

Feed URL Updates

" + "" + "" + f"{rows}
FeedTypeOld URLNew URLSource
" + ) + + +def _esc(value: Any) -> str: + """HTML-escape an arbitrary value for safe inline interpolation. + + Feed stable_ids, URLs and ``source`` originate from provider-supplied data, + so every value rendered into the fallback HTML emails must be escaped to + prevent markup/attribute injection. + """ + if value is None: + return "" + return _html_escape(str(value), quote=True) + + +def _link(url: Optional[str]) -> str: + """Render a safe ```` for a URL, escaping it and only linking http(s). + + Non-http(s) (or empty) values are rendered as escaped text only, so a + crafted ``javascript:`` / malformed value cannot become a live link or + break out of the ``href`` attribute. + """ + if not url: + return "" + safe = _esc(url) + if str(url).strip().lower().startswith(("http://", "https://")): + return f'{safe}' + return safe + + +def send_single( + recipient: EmailRecipient, + notification_event: NotificationEvent, # NotificationEvent ORM object + subscription, # NotificationSubscription ORM object +) -> None: + """Send a single-event notification email. + + Parameters + ---------- + recipient: + Destination email address and name. + notification_event: + The ``NotificationEvent`` ORM instance to notify about. + subscription: + The ``NotificationSubscription`` ORM instance (used for unsubscribe context). + + Raises + ------ + BrevoSendError + When the Brevo API returns an error. + """ + template_id = get_template_id_by_notification( + notification_event.notification_type_id, + digest=False, + ) + params = build_params_by_notification( + notification_event.notification_type_id, + [notification_event], + subscription, + ) + subject = build_single_subject(notification_event) + # This is case the HTML fallback is used, so we don't need to pass html_content + html = build_single_html(notification_event) if template_id is None else None + + _send( + recipient=recipient, + subject=subject, + html_content=html, + template_id=template_id, + params=params, + ) + + +def send_digest( + recipient: EmailRecipient, + notification_events: List, # List[NotificationEvent] + subscription, # NotificationSubscription ORM object +) -> None: + """Send a digest email batching multiple notification events. + + Parameters + ---------- + recipient: + Destination email address and name. + notification_events: + The ``NotificationEvent`` instances to include in the digest. + subscription: + The ``NotificationSubscription`` ORM instance. + + Raises + ------ + BrevoSendError + When the Brevo API returns an error. + """ + if not notification_events: + logger.debug("send_digest called with empty event list; skipping") + return + + notification_type_id = notification_events[0].notification_type_id + + template_id = get_template_id_by_notification(notification_type_id, digest=True) + if template_id is None: + logger.info( + "No Brevo template configured for notification type %s; using HTML fallback", + notification_type_id, + ) + + params = build_params_by_notification( + notification_type_id, + notification_events, + subscription, + ) + subject = build_digest_subject(notification_events) + html = build_digest_html(notification_events) if template_id is None else None + + _send( + recipient=recipient, + subject=subject, + html_content=html, + template_id=template_id, + params=params, + ) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _send( + recipient: EmailRecipient, + subject: str, + html_content: Optional[str], + template_id: Optional[int], + params: Dict[str, Any], +) -> None: + """Low-level send via Brevo TransactionalEmailsApi. + + Raises BrevoSendError on any failure. + """ + try: + import sib_api_v3_sdk + from sib_api_v3_sdk.rest import ApiException + except ImportError as exc: + raise BrevoSendError(f"sib_api_v3_sdk is not installed: {exc}") from exc + + api_key = os.getenv("BREVO_API_KEY") + if not api_key: + raise BrevoSendError("BREVO_API_KEY environment variable is not set") + + configuration = sib_api_v3_sdk.Configuration() + configuration.api_key = {"api-key": api_key} + + client = sib_api_v3_sdk.ApiClient(configuration) + api = sib_api_v3_sdk.TransactionalEmailsApi(client) + + sender_email = os.getenv("BREVO_SENDER_EMAIL", _DEFAULT_SENDER_EMAIL) + sender_name = os.getenv("BREVO_SENDER_NAME", _DEFAULT_SENDER_NAME) + + send_email = sib_api_v3_sdk.SendSmtpEmail( + to=[recipient.to_dict()], + sender={"email": sender_email, "name": sender_name}, + subject=subject if template_id is None else None, + html_content=html_content, + template_id=template_id, + params=params if template_id is not None else None, + ) + + try: + result = api.send_transac_email(send_email) + logger.info( + "Brevo email sent (message_id=%s)", + getattr(result, "message_id", "n/a"), + ) + except ApiException as exc: + raise BrevoSendError(f"Brevo API error {exc.status}: {exc.reason}") from exc + except Exception as exc: + raise BrevoSendError(f"Unexpected error sending email: {exc}") from exc + + +def _int_env(var: str) -> Optional[int]: + """Return an env var as int, or None if not set / not parseable.""" + val = os.getenv(var) + if val is None: + return None + try: + return int(val) + except ValueError: + logger.warning("Environment variable %s=%r is not a valid integer; ignoring", var, val) + return None diff --git a/api/src/shared/notifications/notification_constants.py b/api/src/shared/notifications/notification_constants.py new file mode 100644 index 000000000..4c642e22a --- /dev/null +++ b/api/src/shared/notifications/notification_constants.py @@ -0,0 +1,92 @@ +# +# MobilityData 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Notification system string constants. + +These classes act as namespaced string constants for values stored in the +``notification_type.id``, ``notification_event.event_subtype``, +``notification_subscription.cadence``, ``notification_log.status``, +``notification_event.source``, and ``notification_event_feed.role`` columns. + +Usage +----- + from shared.notifications.notification_constants import ( + NotificationTypeId, + FeedUrlUpdateType, + AdminEventUpdateType, + NotificationCadence, + NotificationLogStatus, + NotificationSource, + NotificationFeedRole, + ) +""" + + +class NotificationTypeId: + """Primary keys of rows in the ``notification_type`` table.""" + + FEED_URL_UPDATED = "feed.url_updated" + ADMIN_EVENT_SUMMARY = "admin.event_summary" + # Delivered via Brevo contact lists/campaigns (managed at opt-in time), NOT by + # the notification dispatcher. The batch planner must skip these subscriptions. + API_ANNOUNCEMENTS = "api.announcements" + + +class FeedUrlUpdateType: + """Allowed values for ``notification_event.event_subtype`` when + ``notification_type_id == NotificationTypeId.FEED_URL_UPDATED``.""" + + URL_REPLACED = "url_replaced" + FEED_REDIRECTED = "feed_redirected" + + +class AdminEventUpdateType: + """Allowed values for ``notification_event.event_subtype`` when + ``notification_type_id == NotificationTypeId.ADMIN_EVENT_SUMMARY``.""" + + DISPATCH_SUMMARY = "dispatch_summary" + + +class NotificationFeedRole: + """Allowed values for ``notification_event_feed.role`` — the role a feed + plays within a notification event.""" + + SUBJECT = "subject" # the feed the event is primarily about + TARGET = "target" # the destination feed (e.g. redirect target) + + +class NotificationCadence: + """Allowed values for ``notification_subscription.cadence``.""" + + IMMEDIATE = "immediate" + DAILY = "daily" + WEEKLY = "weekly" + + +class NotificationLogStatus: + """Allowed values for ``notification_log.status``.""" + + PENDING = "pending" + SENT = "sent" + FAILED = "failed" + PERMANENTLY_FAILED = "permanently_failed" + + +class NotificationSource: + """Human-readable tags for ``notification_event.source``.""" + + DISPATCHER = "dispatcher" + TDG_REDIRECTS = "tdg_redirects" + TDG_IMPORT = "tdg_import" diff --git a/api/src/shared/notifications/notification_event_service.py b/api/src/shared/notifications/notification_event_service.py new file mode 100644 index 000000000..1c3e5d625 --- /dev/null +++ b/api/src/shared/notifications/notification_event_service.py @@ -0,0 +1,301 @@ +# +# MobilityData 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""notification_event_service — best-effort writers for notification_event rows. + +Design principles +----------------- +* Each public function writes a single notification_event row to the users DB. +* All functions are **fire-and-forget**: errors are logged and swallowed so that + the calling feed-change code is never blocked or rolled back. +* If ``USERS_DATABASE_URL`` is not configured (e.g. in the populate_db CI scripts + that only have access to the feeds DB), the call is a no-op with a warning. + +Usage +----- + from shared.notifications.notification_event_service import ( + emit_feed_redirected, + emit_url_replaced, + ) + + # After creating a Redirectingid row: + emit_feed_redirected( + source_stable_id="mdb-1", + target_stable_id="tdg-42", + old_url="https://old.example.com/feed.zip", + new_url="https://new.example.com/feed.zip", + source="tdg_redirects", + ) + + # After detecting a producer_url change: + emit_url_replaced( + feed_stable_id="mdb-7", + old_url="https://old.example.com/feed.zip", + new_url="https://new.example.com/feed.zip", + source="tdg_import", + ) +""" + +from __future__ import annotations + +import logging +import os +import uuid +from typing import Any, Dict, List, Optional, Tuple + +from dotenv import load_dotenv + +from shared.database.users_database import with_users_db_session +from shared.notifications.notification_constants import ( + FeedUrlUpdateType, + NotificationFeedRole, + NotificationTypeId, +) + +logger = logging.getLogger(__name__) + + +def normalize_url_for_strict_compare(url: Optional[str]) -> str: + """Normalize a producer URL for change detection. + + Comparisons should ignore case and leading/trailing whitespace so that + cosmetic differences (e.g. ``" HTTPS://Example.com "`` vs + ``"http://example.com"``) do not trigger a notification event. + This function is different from the previously implemented normalize_url + in that it accounts for protocol changes and www. prefix. + """ + if url is None: + return "" + return url.strip().casefold() + + +def urls_differ(old_url: Optional[str], new_url: Optional[str]) -> bool: + """Return True if two URLs differ after normalization (case/whitespace-insensitive).""" + return normalize_url_for_strict_compare(old_url) != normalize_url_for_strict_compare(new_url) + + +def emit_feed_redirected( + source_stable_id: str, + target_stable_id: str, + old_url: Optional[str], + new_url: Optional[str], + source: str, + extra_data: Optional[Dict[str, Any]] = None, +) -> None: + """Create a ``feed.url_updated / feed_redirected`` notification_event. + + Called when a new ``Redirectingid`` row is created — meaning a feed has been + deprecated and now points users to a different feed. + + Parameters + ---------- + source_stable_id: + stable_id of the feed that is being deprecated (the source of the redirect). + target_stable_id: + stable_id of the feed that subscribers should follow instead. + old_url: + producer_url of the source feed at deprecation time (may be None). + new_url: + producer_url of the target feed (may be None). + source: + Human-readable tag identifying the process that triggered this + (e.g. ``NotificationSource.TDG_REDIRECTS``). + extra_data: + Optional extra free-form JSON merged into the event payload + (e.g. redirect_comment). + + The event is written immediately and best-effort (see :func:`_emit`). It is + fire-and-forget: if the surrounding feed-change transaction is later rolled + back, the event row remains. This is an accepted trade-off — duplicate or + rare spurious events are bounded by the dispatcher, which commits its + delivery log after every send. + """ + payload: Dict[str, Any] = {"old_url": old_url, "new_url": new_url} + if extra_data: + payload.update(extra_data) + _emit( + notification_type_id=NotificationTypeId.FEED_URL_UPDATED, + event_subtype=FeedUrlUpdateType.FEED_REDIRECTED, + source=source, + feeds=[ + (source_stable_id, NotificationFeedRole.SUBJECT), + (target_stable_id, NotificationFeedRole.TARGET), + ], + payload=payload, + ) + + +def emit_url_replaced( + feed_stable_id: str, + old_url: str, + new_url: str, + source: str, + extra_data: Optional[Dict[str, Any]] = None, +) -> None: + """Create a ``feed.url_updated / url_replaced`` notification_event. + + Called when automation changes ``Feed.producer_url`` **in-place** — the feed + keeps the same ``stable_id`` but its source URL has changed. + + Only emit when the URLs differ after normalization (case- and + surrounding-whitespace-insensitive); identical URLs are skipped. + + Parameters + ---------- + feed_stable_id: + stable_id of the feed whose URL changed. + old_url: + The previous producer_url value. + new_url: + The new producer_url value. + source: + Human-readable tag identifying the process (e.g. ``NotificationSource.TDG_IMPORT``). + extra_data: + Optional extra free-form JSON merged into the event payload. + + The event is written immediately and best-effort (see :func:`_emit`). It is + fire-and-forget: if the surrounding feed-change transaction is later rolled + back, the event row remains. This is an accepted trade-off — duplicate or + rare spurious events are bounded by the dispatcher, which commits its + delivery log after every send. + """ + if not urls_differ(old_url, new_url): + logger.debug( + "Skipping url_replaced event for %s: URLs are equivalent after normalization", + feed_stable_id, + ) + return + payload: Dict[str, Any] = {"old_url": old_url, "new_url": new_url} + if extra_data: + payload.update(extra_data) + _emit( + notification_type_id=NotificationTypeId.FEED_URL_UPDATED, + event_subtype=FeedUrlUpdateType.URL_REPLACED, + source=source, + feeds=[(feed_stable_id, NotificationFeedRole.SUBJECT)], + payload=payload, + ) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _emit( + notification_type_id: str, + event_subtype: str, + source: str, + feeds: Optional[List[Tuple[str, str]]] = None, + payload: Optional[Dict[str, Any]] = None, +) -> None: + """Write one notification_event row (plus its notification_event_feed rows) + to the users DB, immediately and best-effort. + + Parameters + ---------- + notification_type_id: + Row id in ``notification_type`` (e.g. ``feed.url_updated``). + event_subtype: + Discriminator within the type (e.g. ``url_replaced``). + source: + Tag identifying the emitting process. + feeds: + Optional list of ``(feed_stable_id, role)`` tuples relating this event to + one-or-more feeds. ``role`` is a ``NotificationFeedRole`` value. + payload: + Optional type-specific JSON payload. + + Fire-and-forget: any failure is logged and swallowed so the calling + feed-change code is never blocked or rolled back. If ``USERS_DATABASE_URL`` + is not configured (e.g. the populate_db CI scripts that only reach the feeds + DB), the call is a no-op with a warning. + """ + load_dotenv() + if not os.getenv("USERS_DATABASE_URL"): + primary_feed = feeds[0][0] if feeds else None + logger.warning( + "notification_event_service: USERS_DATABASE_URL not configured; " "skipping %s/%s for feed=%s", + notification_type_id, + event_subtype, + primary_feed, + ) + return + try: + _persist_event( + notification_type_id=notification_type_id, + event_subtype=event_subtype, + source=source, + feeds=feeds, + payload=payload, + ) + except Exception as exc: + primary_feed = feeds[0][0] if feeds else None + logger.exception( + "notification_event_service: failed to persist event " "type=%s subtype=%s feed=%s: %s", + notification_type_id, + event_subtype, + primary_feed, + exc, + ) + + +@with_users_db_session +def _persist_event( + notification_type_id: str, + event_subtype: str, + source: str, + feeds: Optional[List[Tuple[str, str]]] = None, + payload: Optional[Dict[str, Any]] = None, + db_session=None, +) -> None: + """Insert the notification_event (and its feed rows) on ``db_session``. + + Wrapped by :func:`with_users_db_session`, which opens a users-DB session and + commits when this function returns. Exceptions propagate to :func:`_emit`, + which logs and swallows them. + """ + from shared.users_database_gen.sqlacodegen_models import ( + NotificationEvent, + NotificationEventFeed, + ) + + event_id = str(uuid.uuid4()) + db_session.add( + NotificationEvent( + id=event_id, + notification_type_id=notification_type_id, + event_subtype=event_subtype, + source=source, + payload=payload, + ) + ) + for feed_stable_id, role in feeds or []: + db_session.add( + NotificationEventFeed( + id=str(uuid.uuid4()), + notification_event_id=event_id, + feed_stable_id=feed_stable_id, + role=role, + ) + ) + logger.info( + "notification_event created: type=%s subtype=%s feeds=%s source=%s id=%s", + notification_type_id, + event_subtype, + [f[0] for f in (feeds or [])], + source, + event_id, + ) diff --git a/api/tests/unittest/test_brevo_notification_sender.py b/api/tests/unittest/test_brevo_notification_sender.py new file mode 100644 index 000000000..6b7d3e278 --- /dev/null +++ b/api/tests/unittest/test_brevo_notification_sender.py @@ -0,0 +1,478 @@ +# +# MobilityData 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Unit tests for the pure builders, accessors and ``_send`` of +``brevo_notification_sender`` — the bulk of which had no coverage.""" + +from datetime import datetime, timezone +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import pytest + +import shared.notifications.brevo_notification_sender as bns +from shared.notifications.brevo_notification_sender import ( + BrevoSendError, + EmailRecipient, + build_digest_html, + build_digest_subject, + build_params_admin_event_summary, + build_params_by_notification, + build_params_feed_url_updated, + build_single_html, + build_single_subject, + event_payload, + get_template_id_by_notification, + send_digest, + send_single, + subject_feed, + target_feed, + _int_env, + _send, +) +from shared.notifications.notification_constants import ( + FeedUrlUpdateType, + NotificationFeedRole, + NotificationTypeId, +) + + +def _feed(stable_id, role): + return SimpleNamespace(feed_stable_id=stable_id, role=role) + + +def _event( + *, + type_id=NotificationTypeId.FEED_URL_UPDATED, + subtype=FeedUrlUpdateType.URL_REPLACED, + feeds=None, + payload=None, + source="unit_test", + created_at=None, +): + return SimpleNamespace( + notification_type_id=type_id, + event_subtype=subtype, + notification_event_feeds=feeds or [], + payload=payload, + source=source, + created_at=created_at, + ) + + +_SUBSCRIPTION = SimpleNamespace(id="sub-1") + + +# --------------------------------------------------------------------------- +# EmailRecipient +# --------------------------------------------------------------------------- + + +def test_email_recipient_to_dict_with_and_without_name(): + assert EmailRecipient(email="a@b.com").to_dict() == {"email": "a@b.com"} + assert EmailRecipient(email="a@b.com", name="A B").to_dict() == { + "email": "a@b.com", + "name": "A B", + } + + +# --------------------------------------------------------------------------- +# get_template_id_by_notification +# --------------------------------------------------------------------------- + + +def test_get_template_id_feed_url_updated_digest_and_single(monkeypatch): + monkeypatch.setenv("BREVO_TEMPLATE_FEED_URL_UPDATED", "11") + monkeypatch.setenv("BREVO_TEMPLATE_FEED_URL_UPDATED_DIGEST", "22") + assert get_template_id_by_notification(NotificationTypeId.FEED_URL_UPDATED) == 11 + assert get_template_id_by_notification(NotificationTypeId.FEED_URL_UPDATED, digest=True) == 22 + + +def test_get_template_id_admin_event_summary(monkeypatch): + monkeypatch.setenv("BREVO_TEMPLATE_ADMIN_EVENT_SUMMARY", "33") + assert get_template_id_by_notification(NotificationTypeId.ADMIN_EVENT_SUMMARY) == 33 + + +def test_get_template_id_unknown_type_returns_none(): + assert get_template_id_by_notification("some.unknown.type") is None + + +# --------------------------------------------------------------------------- +# Event accessors +# --------------------------------------------------------------------------- + + +def test_feed_accessors_and_payload(): + event = _event( + feeds=[ + _feed("mdb-1", NotificationFeedRole.SUBJECT), + _feed("mdb-2", NotificationFeedRole.TARGET), + ], + payload={"old_url": "x"}, + ) + assert subject_feed(event) == "mdb-1" + assert target_feed(event) == "mdb-2" + assert event_payload(event) == {"old_url": "x"} + + +def test_feed_accessors_when_missing(): + event = _event(feeds=[], payload=None) + assert subject_feed(event) is None + assert target_feed(event) is None + assert event_payload(event) == {} + + +# --------------------------------------------------------------------------- +# Subject builders +# --------------------------------------------------------------------------- + + +def test_build_single_subject_known_and_unknown(): + known = _event(feeds=[_feed("mdb-9", NotificationFeedRole.SUBJECT)]) + assert build_single_subject(known) == "[Mobility Database] Feed mdb-9 has been updated" + + unknown = _event(type_id="other.type") + assert "other.type" in build_single_subject(unknown) + + +def test_build_single_subject_uses_unknown_feed_placeholder(): + event = _event(feeds=[]) + assert "unknown" in build_single_subject(event) + + +def test_build_digest_subject_pluralization(): + two = [_event(), _event()] + assert build_digest_subject(two) == "[Mobility Database] 2 feed URL updates" + one = [_event()] + assert build_digest_subject(one) == "[Mobility Database] 1 feed URL update" + + +def test_build_digest_subject_unknown_type_fallback(): + events = [_event(type_id="other.type"), _event(type_id="other.type")] + assert build_digest_subject(events) == "[Mobility Database] 2 notifications" + single = [_event(type_id="other.type")] + assert build_digest_subject(single) == "[Mobility Database] 1 notification" + + +# --------------------------------------------------------------------------- +# Params builders +# --------------------------------------------------------------------------- + + +def test_build_params_feed_url_updated(): + created = datetime(2026, 1, 2, 3, 4, tzinfo=timezone.utc) + event = _event( + feeds=[ + _feed("mdb-1", NotificationFeedRole.SUBJECT), + _feed("mdb-2", NotificationFeedRole.TARGET), + ], + payload={"old_url": "old", "new_url": "new"}, + created_at=created, + ) + params = build_params_feed_url_updated([event], _SUBSCRIPTION) + assert params["event_count"] == 1 + assert params["subscription_id"] == "sub-1" + item = params["events"][0] + assert item["feed_stable_id"] == "mdb-1" + assert item["target_feed_stable_id"] == "mdb-2" + assert item["old_url"] == "old" + assert item["new_url"] == "new" + assert item["created_at"] == created.isoformat() + + +def test_build_params_feed_url_updated_handles_missing_created_at_and_urls(): + event = _event(feeds=[], payload={}, created_at=None, source=None) + item = build_params_feed_url_updated([event], _SUBSCRIPTION)["events"][0] + assert item["old_url"] == "" + assert item["new_url"] == "" + assert item["source"] == "" + assert item["created_at"] == "" + + +def test_build_params_admin_event_summary_with_and_without_events(): + event = _event( + type_id=NotificationTypeId.ADMIN_EVENT_SUMMARY, + payload={"emails_sent": 5}, + ) + params = build_params_admin_event_summary([event], _SUBSCRIPTION) + assert params["event_count"] == 1 + assert params["summary"] == {"emails_sent": 5} + + empty = build_params_admin_event_summary([], _SUBSCRIPTION) + assert empty["event_count"] == 0 + assert empty["summary"] == {} + + +def test_build_params_by_notification_dispatch_and_unsupported(): + feed_event = _event() + assert "events" in build_params_by_notification(NotificationTypeId.FEED_URL_UPDATED, [feed_event], _SUBSCRIPTION) + admin_event = _event(type_id=NotificationTypeId.ADMIN_EVENT_SUMMARY) + assert "summary" in build_params_by_notification( + NotificationTypeId.ADMIN_EVENT_SUMMARY, [admin_event], _SUBSCRIPTION + ) + with pytest.raises(ValueError): + build_params_by_notification("other.type", [feed_event], _SUBSCRIPTION) + + +# --------------------------------------------------------------------------- +# HTML builders +# --------------------------------------------------------------------------- + + +def test_build_single_html_redirected_and_replaced(): + redirected = _event( + subtype=FeedUrlUpdateType.FEED_REDIRECTED, + feeds=[ + _feed("mdb-1", NotificationFeedRole.SUBJECT), + _feed("mdb-2", NotificationFeedRole.TARGET), + ], + payload={"new_url": "https://new"}, + ) + html = build_single_html(redirected) + assert "deprecated" in html + assert "mdb-2" in html + + replaced = _event( + subtype=FeedUrlUpdateType.URL_REPLACED, + feeds=[_feed("mdb-1", NotificationFeedRole.SUBJECT)], + payload={"old_url": "https://old", "new_url": "https://new"}, + ) + html = build_single_html(replaced) + assert "has changed" in html + assert "https://old" in html + + +def test_build_single_html_admin_summary(): + event = _event( + type_id=NotificationTypeId.ADMIN_EVENT_SUMMARY, + feeds=[], + payload={ + "subscriptions_processed": 4, + "events_found": 7, + "emails_sent": 5, + "emails_failed": 2, + "cadence": "weekly", + }, + ) + html = build_single_html(event) + assert "Notification Dispatch Summary" in html + assert "weekly" in html + assert "5" in html + assert "2" in html + # Must NOT fall through to the feed url-updated layout. + assert "has changed" not in html + assert "Old URL" not in html + + +def test_build_digest_html_empty(): + assert "No feed URL changes" in build_digest_html([]) + + +def test_build_digest_html_admin_summary(): + event = _event( + type_id=NotificationTypeId.ADMIN_EVENT_SUMMARY, + feeds=[], + payload={"emails_sent": 3, "emails_failed": 1}, + ) + html = build_digest_html([event]) + assert "Notification Dispatch Summary" in html + assert "3" in html + assert "1" in html + + +def test_build_digest_html_feed_url_updates(): + event = _event( + feeds=[_feed("mdb-1", NotificationFeedRole.SUBJECT)], + payload={"old_url": "o", "new_url": "n"}, + ) + html = build_digest_html([event]) + assert "Feed URL Updates" in html + assert "mdb-1" in html + + +# --------------------------------------------------------------------------- +# send_single / send_digest orchestration +# --------------------------------------------------------------------------- + + +@patch.object(bns, "_send") +def test_send_single_uses_html_fallback_when_no_template(mock_send, monkeypatch): + monkeypatch.delenv("BREVO_TEMPLATE_FEED_URL_UPDATED", raising=False) + event = _event( + feeds=[_feed("mdb-1", NotificationFeedRole.SUBJECT)], + payload={"old_url": "o", "new_url": "n"}, + ) + send_single(EmailRecipient(email="a@b.com"), event, _SUBSCRIPTION) + mock_send.assert_called_once() + kwargs = mock_send.call_args.kwargs + assert kwargs["template_id"] is None + assert kwargs["html_content"] is not None + + +@patch.object(bns, "_send") +def test_send_single_uses_template_when_configured(mock_send, monkeypatch): + monkeypatch.setenv("BREVO_TEMPLATE_FEED_URL_UPDATED", "77") + event = _event( + feeds=[_feed("mdb-1", NotificationFeedRole.SUBJECT)], + payload={"old_url": "o", "new_url": "n"}, + ) + send_single(EmailRecipient(email="a@b.com"), event, _SUBSCRIPTION) + kwargs = mock_send.call_args.kwargs + assert kwargs["template_id"] == 77 + assert kwargs["html_content"] is None + + +@patch.object(bns, "_send") +def test_send_digest_empty_is_noop(mock_send): + send_digest(EmailRecipient(email="a@b.com"), [], _SUBSCRIPTION) + mock_send.assert_not_called() + + +@patch.object(bns, "_send") +def test_send_digest_html_fallback(mock_send, monkeypatch): + monkeypatch.delenv("BREVO_TEMPLATE_FEED_URL_UPDATED_DIGEST", raising=False) + events = [ + _event( + feeds=[_feed("mdb-1", NotificationFeedRole.SUBJECT)], + payload={"old_url": "o", "new_url": "n"}, + ) + ] + send_digest(EmailRecipient(email="a@b.com"), events, _SUBSCRIPTION) + kwargs = mock_send.call_args.kwargs + assert kwargs["template_id"] is None + assert kwargs["html_content"] is not None + + +# --------------------------------------------------------------------------- +# _send low-level +# --------------------------------------------------------------------------- + + +def test_send_raises_when_api_key_missing(monkeypatch): + monkeypatch.delenv("BREVO_API_KEY", raising=False) + with pytest.raises(BrevoSendError, match="BREVO_API_KEY"): + _send( + recipient=EmailRecipient(email="a@b.com"), + subject="s", + html_content="

x

", + template_id=None, + params={}, + ) + + +def test_send_success(monkeypatch): + monkeypatch.setenv("BREVO_API_KEY", "key") + fake_api = MagicMock() + fake_api.send_transac_email.return_value = SimpleNamespace(message_id="mid-1") + with patch("sib_api_v3_sdk.TransactionalEmailsApi", return_value=fake_api): + _send( + recipient=EmailRecipient(email="a@b.com"), + subject="s", + html_content="

x

", + template_id=None, + params={}, + ) + fake_api.send_transac_email.assert_called_once() + + +def test_send_wraps_api_exception(monkeypatch): + monkeypatch.setenv("BREVO_API_KEY", "key") + from sib_api_v3_sdk.rest import ApiException + + fake_api = MagicMock() + fake_api.send_transac_email.side_effect = ApiException(status=429, reason="Too Many") + with patch("sib_api_v3_sdk.TransactionalEmailsApi", return_value=fake_api): + with pytest.raises(BrevoSendError, match="429"): + _send( + recipient=EmailRecipient(email="a@b.com"), + subject="s", + html_content="

x

", + template_id=None, + params={}, + ) + + +def test_send_wraps_unexpected_exception(monkeypatch): + monkeypatch.setenv("BREVO_API_KEY", "key") + fake_api = MagicMock() + fake_api.send_transac_email.side_effect = RuntimeError("boom") + with patch("sib_api_v3_sdk.TransactionalEmailsApi", return_value=fake_api): + with pytest.raises(BrevoSendError, match="Unexpected error"): + _send( + recipient=EmailRecipient(email="a@b.com"), + subject="s", + html_content="

x

", + template_id=None, + params={}, + ) + + +# --------------------------------------------------------------------------- +# _int_env +# --------------------------------------------------------------------------- + + +def test_int_env_valid_invalid_and_unset(monkeypatch): + monkeypatch.setenv("SOME_INT", "42") + assert _int_env("SOME_INT") == 42 + monkeypatch.setenv("SOME_INT", "not-a-number") + assert _int_env("SOME_INT") is None + monkeypatch.delenv("SOME_INT", raising=False) + assert _int_env("SOME_INT") is None + + +# --------------------------------------------------------------------------- +# HTML escaping (injection safety) +# --------------------------------------------------------------------------- + + +def test_build_single_html_escapes_injection(): + event = _event( + subtype=FeedUrlUpdateType.URL_REPLACED, + feeds=[_feed("mdb-