From a8c9ee431192174b5ed1f6ef55f1132f2d061980 Mon Sep 17 00:00:00 2001 From: Nizar Izzuddin Yatim Fadlan Date: Sat, 6 Jun 2026 19:29:49 +0700 Subject: [PATCH 1/6] feat(telemetry): integrate OpenTelemetry for observability Added OpenTelemetry support to the application, including metrics and tracing. New telemetry configuration options have been added to the settings. Logging has been enhanced to redact sensitive information, and various routes have been instrumented to track payment and streaming events. Also updated dependencies in requirements.txt to include OpenTelemetry packages. --- .env.example | 7 ++ core/log.py | 105 +++++++++++++++++++ core/mayar_service.py | 26 ++--- core/otel_metrics.py | 24 +++++ core/telemetry.py | 169 +++++++++++++++++++++++++++++++ main.py | 40 +++++--- requirements.txt | 17 ++++ routes/payment.py | 228 ++++++++++++++++++++++++++---------------- routes/streaming.py | 20 +++- settings.py | 7 ++ 10 files changed, 523 insertions(+), 120 deletions(-) create mode 100644 core/otel_metrics.py create mode 100644 core/telemetry.py diff --git a/.env.example b/.env.example index 69717c7..4fa2205 100644 --- a/.env.example +++ b/.env.example @@ -57,3 +57,10 @@ STREAM_TOKEN_EXPIRE_MINUTES=15 FILE_STORAGE_PATH="./storage" MAX_FILE_SIZE_MB=5 + +# OpenTelemetry (optional observability) +OTEL_ENABLED=false +OTEL_SERVICE_NAME=pyconid25-be +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 +# OTEL_EXPORTER_OTLP_HEADERS= +# OTEL_LOG_LEVEL=INFO diff --git a/core/log.py b/core/log.py index 60a0376..10bc025 100644 --- a/core/log.py +++ b/core/log.py @@ -1,3 +1,108 @@ +import json import logging +import logging.config +import os +import re +from datetime import datetime, timezone +from typing import Any + +_REDACTION_PATTERNS = ( + re.compile( + r"(?i)(authorization|x-callback-token|mux-signature)" + r"([\s:=]+)(bearer\s+)?([^\s,;}]+)" + ), + re.compile(r"(?i)bearer\s+[a-z0-9._~+/=-]+"), + re.compile( + 
r"(?i)(mayar_api_key|mayar_webhook_secret|mux_token_secret|" + r"mux_webhook_secret|mux_signing_key_private|secret_key)" + r"([\s:=]+)([^\s,;}]+)" + ), + re.compile( + r"(?i)(payment_link|link|redirect_url|redirecturl|url)" + r"([\s:=]+)(https?://[^\s,;}]+)" + ), + re.compile(r"(?i)(transactionid|transaction_id)([\s:=]+)([^\s,;}]+)"), + re.compile(r"(?i)(email)([\s:=]+)([^\s,;}]+@[^\s,;}]+)"), + re.compile(r"(?i)(phone|mobile)([\s:=]+)(\+?[0-9][0-9\s().-]{5,})"), + re.compile(r"eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+"), +) + + +def _redact_sensitive(value: str) -> str: + redacted = value + for pattern in _REDACTION_PATTERNS: + redacted = pattern.sub(_replace_sensitive_match, redacted) + return redacted + + +def _replace_sensitive_match(match: re.Match[str]) -> str: + if match.re.pattern.startswith("(?i)bearer"): + return "Bearer [REDACTED]" + if match.re.pattern.startswith("eyJ"): + return "[REDACTED]" + if match.lastindex and match.lastindex >= 2: + return f"{match.group(1)}{match.group(2)}[REDACTED]" + return "[REDACTED]" + + +class JsonFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + payload: dict[str, Any] = { + "timestamp": datetime.fromtimestamp( + record.created, timezone.utc + ).isoformat(), + "level": record.levelname, + "message": _redact_sensitive(record.getMessage()), + "logger": record.name, + "trace_id": getattr(record, "otelTraceID", "0"), + "span_id": getattr(record, "otelSpanID", "0"), + "service": getattr( + record, + "otelServiceName", + os.environ.get("OTEL_SERVICE_NAME", "pyconid25-be"), + ), + } + if record.exc_info: + payload["exception"] = _redact_sensitive( + self.formatException(record.exc_info) + ) + return json.dumps(payload, default=str) + + +def configure_logging() -> None: + log_level = os.environ.get("OTEL_LOG_LEVEL", "INFO").upper() + logging.config.dictConfig( + { + "version": 1, + "disable_existing_loggers": False, + "formatters": {"json": {"()": JsonFormatter}}, + "handlers": { + 
"console": { + "class": "logging.StreamHandler", + "formatter": "json", + "level": log_level, + } + }, + "root": {"handlers": ["console"], "level": log_level}, + "loggers": { + "uvicorn": { + "handlers": ["console"], + "level": log_level, + "propagate": False, + }, + "uvicorn.error": { + "handlers": ["console"], + "level": log_level, + "propagate": False, + }, + "uvicorn.access": { + "handlers": ["console"], + "level": log_level, + "propagate": False, + }, + }, + } + ) + logger = logging.getLogger("uvicorn.error") diff --git a/core/mayar_service.py b/core/mayar_service.py index f0ddfaa..28a4736 100644 --- a/core/mayar_service.py +++ b/core/mayar_service.py @@ -90,17 +90,13 @@ async def create_payment( timeout=30.0, ) response.raise_for_status() - print(response.status_code) result = response.json() - print( - f"Mayar create payment response: status:{response.status_code} {result}" + logger.info( + f"Payment created successfully in Mayar with status {response.status_code}" ) - logger.info(f"Payment created successfully: {result}") return result except httpx.HTTPStatusError as e: - logger.error( - f"Mayar API returned error {e.response.status_code}: {e.response.text}" - ) + logger.error(f"Mayar API returned error {e.response.status_code}") logger.debug(f"Request URL: {e.request.url}") raise except httpx.RequestError as e: @@ -136,12 +132,12 @@ async def get_payment_status(self, payment_id: str) -> Dict[str, Any]: response.raise_for_status() result = response.json() - logger.info(f"Payment status retrieved: {result}") + logger.info( + f"Payment status retrieved from Mayar for payment {payment_id}" + ) return result except httpx.HTTPStatusError as e: - logger.error( - f"Mayar API returned error {e.response.status_code}: {e.response.text}" - ) + logger.error(f"Mayar API returned error {e.response.status_code}") logger.debug(f"Request URL: {e.request.url}") raise except httpx.RequestError as e: @@ -184,15 +180,11 @@ async def close_payment(self, payment_id: str) -> 
Dict[str, Any]: if result.get("messages") == "success": logger.info(f"Payment {payment_id} closed successfully on Mayar") else: - logger.warning( - f"Failed to close payment {payment_id} on Mayar: {result}" - ) + logger.warning(f"Failed to close payment {payment_id} on Mayar") return result except httpx.HTTPStatusError as e: - logger.error( - f"Mayar API returned error {e.response.status_code}: {e.response.text}" - ) + logger.error(f"Mayar API returned error {e.response.status_code}") logger.debug(f"Request URL: {e.request.url}") raise except httpx.RequestError as e: diff --git a/core/otel_metrics.py b/core/otel_metrics.py new file mode 100644 index 0000000..17f2120 --- /dev/null +++ b/core/otel_metrics.py @@ -0,0 +1,24 @@ +from core.telemetry import get_meter + +meter = get_meter("pyconid25-be") + +payment_created_counter = meter.create_counter( + "payment.created", + description="Total payments created", + unit="1", +) +payment_status_counter = meter.create_counter( + "payment.status_change", + description="Payment status transitions", + unit="1", +) +payment_webhook_counter = meter.create_counter( + "payment.webhook_received", + description="Payment webhooks received", + unit="1", +) +streaming_webhook_counter = meter.create_counter( + "streaming.webhook_received", + description="Streaming webhooks received", + unit="1", +) diff --git a/core/telemetry.py b/core/telemetry.py new file mode 100644 index 0000000..0dfd7e6 --- /dev/null +++ b/core/telemetry.py @@ -0,0 +1,169 @@ +import os +from typing import Any + +from opentelemetry import metrics, trace +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import ReadableSpan, 
TracerProvider +from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + SpanExporter, + SpanProcessor, +) + +from core.log import configure_logging + +_initialized = False +_enabled = False + + +class PCISafeSpanProcessor(SpanProcessor): + SCRUB_KEYS = { + "http.request.body", + "http.response.body", + "http.request.header.authorization", + "http.request.header.x-callback-token", + "http.request.header.mux-signature", + "http.response.header.authorization", + "http.response.header.x-callback-token", + "http.response.header.mux-signature", + "payment_link", + "payment.link", + "mayar_api_key", + "net.peer.name", + "net.sock.peer.addr", + "db.statement", + } + SCRUB_PREFIXES = ( + "card", + "account", + "payment_link", + "mayar_api_key", + "customer_email", + "customer_phone", + "customer_mobile", + "customer_name", + "mayar_payment_id", + "mayar_transaction_id", + "mux_stream_id", + "mux_asset_id", + "mux_live_stream_id", + "mux_playback_id", + "exception_message", + "exception_stacktrace", + "user_email", + "user_phone", + "user_mobile", + ) + + def __init__(self, exporter: SpanExporter): + self._delegate = BatchSpanProcessor(exporter) + + def on_start(self, span, parent_context=None) -> None: + self._delegate.on_start(span, parent_context=parent_context) + + def on_end(self, span: ReadableSpan) -> None: + span._attributes = self._scrub_attributes(span.attributes) # noqa: SLF001 + self._delegate.on_end(span) + + def shutdown(self) -> None: + self._delegate.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return self._delegate.force_flush(timeout_millis) + + def _scrub_attributes(self, attributes: Any) -> dict[str, Any]: + scrubbed = {} + for key, value in dict(attributes or {}).items(): + normalized_key = key.lower().replace(".", "_").replace("-", "_") + should_scrub = key in self.SCRUB_KEYS or any( + normalized_key.startswith(prefix) for prefix in self.SCRUB_PREFIXES + ) + scrubbed[key] = "[REDACTED]" if should_scrub else 
value + return scrubbed + + +def setup_telemetry() -> bool: + global _enabled, _initialized + if _initialized: + return _enabled + + configure_logging() + + if not _env_bool("OTEL_ENABLED", default=False): + _initialized = True + return False + + resource = Resource.create( + { + SERVICE_NAME: os.environ.get("OTEL_SERVICE_NAME", "pyconid25-be"), + "deployment.environment": os.environ.get("ENVIRONTMENT", "file"), + } + ) + _setup_traces(resource) + _setup_metrics(resource) + _setup_auto_instrumentation() + + _enabled = True + _initialized = True + return True + + +def get_tracer(name: str = "pyconid25-be"): + return trace.get_tracer(name) + + +def get_meter(name: str = "pyconid25-be"): + return metrics.get_meter(name) + + +def amount_bucket(amount: int | None) -> str: + if not amount or amount <= 0: + return "free" + if amount <= 100_000: + return "0-100k" + if amount <= 500_000: + return "100k-500k" + if amount <= 1_000_000: + return "500k-1m" + return "1m+" + + +def _setup_traces(resource: Resource) -> None: + provider = TracerProvider(resource=resource) + provider.add_span_processor(PCISafeSpanProcessor(OTLPSpanExporter())) + trace.set_tracer_provider(provider) + + +def _setup_metrics(resource: Resource) -> None: + reader = PeriodicExportingMetricReader( + OTLPMetricExporter(), + export_interval_millis=int( + os.environ.get("OTEL_METRIC_EXPORT_INTERVAL_MS", "30000") + ), + ) + metrics.set_meter_provider( + MeterProvider(resource=resource, metric_readers=[reader]) + ) + + +def _setup_auto_instrumentation() -> None: + from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor + from opentelemetry.instrumentation.logging import LoggingInstrumentor + from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor + + from models import engine + + SQLAlchemyInstrumentor().instrument(engine=engine) + HTTPXClientInstrumentor().instrument() + LoggingInstrumentor().instrument(set_logging_format=False) + + +def _env_bool(name: str, default: bool) 
-> bool: + value = os.environ.get(name) + if value is None: + return default + return value.lower() in {"1", "true", "yes", "on"} diff --git a/main.py b/main.py index 18b5db5..c236a1e 100644 --- a/main.py +++ b/main.py @@ -1,26 +1,13 @@ from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from pydantic import ValidationError + from core.health_check import health_check from core.log import logger from core.rate_limiter.memory import InMemoryRateLimiter from core.rate_limiter.middleware import RateLimitMiddleware -from routes.auth import router as auth_router -from routes.user_profile import router as user_profile_router -from routes.locations import router as locations_router -from routes.ticket import router as ticket_router -from routes.room import router as room_router -from routes.schedule import router as schedule_router -from routes.speaker import router as speaker_router -from routes.payment import router as payment_router -from routes.streaming import router as streaming_router -from routes.voucher import router as voucher_router -from routes.speaker_type import router as speaker_type_router -from routes.organizer_type import router as organizer_type_router -from routes.organizer import router as organizer_router -from routes.schedule_type import router as schedule_type_router -from routes.volunteer import router as volunteer_router from settings import ( RATE_LIMIT_ENABLED, @@ -29,6 +16,10 @@ RATE_LIMIT_WINDOW, ) +from core.telemetry import setup_telemetry + +otel_enabled = setup_telemetry() + health_check() app = FastAPI(title="PyconId 2025 BE") @@ -50,6 +41,25 @@ exclude_paths=RATE_LIMIT_EXCLUDED_PATHS, ) +if otel_enabled: + FastAPIInstrumentor.instrument_app(app) + +from routes.auth import router as auth_router # noqa: E402 +from routes.locations import router as locations_router # noqa: E402 +from 
routes.organizer import router as organizer_router # noqa: E402 +from routes.organizer_type import router as organizer_type_router # noqa: E402 +from routes.payment import router as payment_router # noqa: E402 +from routes.room import router as room_router # noqa: E402 +from routes.schedule import router as schedule_router # noqa: E402 +from routes.schedule_type import router as schedule_type_router # noqa: E402 +from routes.speaker import router as speaker_router # noqa: E402 +from routes.speaker_type import router as speaker_type_router # noqa: E402 +from routes.streaming import router as streaming_router # noqa: E402 +from routes.ticket import router as ticket_router # noqa: E402 +from routes.user_profile import router as user_profile_router # noqa: E402 +from routes.volunteer import router as volunteer_router # noqa: E402 +from routes.voucher import router as voucher_router # noqa: E402 + app.include_router(auth_router) app.include_router(user_profile_router) app.include_router(locations_router) diff --git a/requirements.txt b/requirements.txt index cf75885..f132229 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ aiosmtplib==3.0.2 alembic==1.15.1 annotated-types==0.7.0 anyio==4.8.0 +asgiref==3.11.1 Authlib==1.6.0 bcrypt==4.3.0 blinker==1.9.0 @@ -25,12 +26,26 @@ httpx==0.28.1 idna==3.10 iniconfig==2.0.0 itsdangerous==2.2.0 +googleapis-common-protos==1.75.0 Jinja2==3.1.6 Mako==1.3.9 markdown-it-py==3.0.0 MarkupSafe==3.0.2 mdurl==0.1.2 mux-python==5.1.0 +opentelemetry-api==1.42.1 +opentelemetry-exporter-otlp-proto-common==1.42.1 +opentelemetry-exporter-otlp-proto-http==1.42.1 +opentelemetry-instrumentation==0.63b1 +opentelemetry-instrumentation-asgi==0.63b1 +opentelemetry-instrumentation-fastapi==0.63b1 +opentelemetry-instrumentation-httpx==0.63b1 +opentelemetry-instrumentation-logging==0.63b1 +opentelemetry-instrumentation-sqlalchemy==0.63b1 +opentelemetry-proto==1.42.1 +opentelemetry-sdk==1.42.1 +opentelemetry-semantic-conventions==0.63b1 
+opentelemetry-util-http==0.63b1 mypy==1.15.0 mypy-extensions==1.0.0 numpy==2.3.3 @@ -44,6 +59,7 @@ pycparser==2.22 pydantic==2.10.6 pydantic-settings==2.11.0 pydantic_core==2.27.2 +protobuf==6.33.6 Pygments==2.19.1 PyJWT==2.10.1 pytest==8.3.5 @@ -73,3 +89,4 @@ uvicorn==0.34.0 uvloop==0.21.0 watchfiles==1.0.4 websockets==15.0.1 +wrapt==2.2.1 diff --git a/routes/payment.py b/routes/payment.py index 2b2a3c8..d109859 100644 --- a/routes/payment.py +++ b/routes/payment.py @@ -2,10 +2,17 @@ from typing import Optional from fastapi import APIRouter, Depends, Header +from opentelemetry import trace from sqlalchemy.orm import Session from core.log import logger from core.mayar_service import MayarService +from core.otel_metrics import ( + payment_created_counter, + payment_status_counter, + payment_webhook_counter, +) +from core.telemetry import amount_bucket, get_tracer from core.responses import ( BadRequest, Forbidden, @@ -61,6 +68,8 @@ router = APIRouter(prefix="/payment", tags=["Payment"]) +tracer = get_tracer("payment") + async def close_unpaid_payments_with_mayar( db: Session, @@ -68,33 +77,35 @@ async def close_unpaid_payments_with_mayar( exclude_payment_id: str, mayar_service: MayarService, ) -> int: - payments_to_close = paymentRepo.get_payments_by_user_id( - db=db, - user_id=user_id, - status=PaymentStatus.UNPAID, - exclude_payment_id=exclude_payment_id, - ) - - closed_count = 0 - for payment in payments_to_close: - if payment.mayar_id: - try: - await mayar_service.close_payment(payment_id=payment.mayar_id) - logger.info( - f"Closed payment {payment.id} in Mayar (mayar_id: {payment.mayar_id})" - ) - except Exception as e: - logger.error(f"Failed to close payment {payment.id} in Mayar: {e}") - - paymentRepo.update_payment( + with tracer.start_as_current_span("payment.close_unpaid_batch") as span: + payments_to_close = paymentRepo.get_payments_by_user_id( db=db, - payment=payment, - status=PaymentStatus.CLOSED, - is_commit=False, + user_id=user_id, + 
status=PaymentStatus.UNPAID, + exclude_payment_id=exclude_payment_id, ) - closed_count += 1 - return closed_count + closed_count = 0 + for payment in payments_to_close: + if payment.mayar_id: + try: + await mayar_service.close_payment(payment_id=payment.mayar_id) + logger.info( + f"Closed payment {payment.id} in Mayar (mayar_id: {payment.mayar_id})" + ) + except Exception as e: + logger.error(f"Failed to close payment {payment.id} in Mayar: {e}") + + paymentRepo.update_payment( + db=db, + payment=payment, + status=PaymentStatus.CLOSED, + is_commit=False, + ) + closed_count += 1 + + span.set_attribute("payment.close_count", closed_count) + return closed_count @router.get( @@ -181,6 +192,9 @@ async def create_payment( db: Session = Depends(get_db_sync), token: str = Depends(oauth2_scheme), ): + span = tracer.start_span("payment.create") + span_context = trace.use_span(span, end_on_exit=True) + span_context.__enter__() try: user = get_user_from_token(db=db, token=token) if user is None: @@ -251,6 +265,10 @@ async def create_payment( voucher_id=str(voucher.id) if voucher else None, is_commit=False, ) + span.set_attribute("payment.id", str(payment.id)) + span.set_attribute("ticket.id", str(ticket.id)) + span.set_attribute("payment.amount_bucket", amount_bucket(amount)) + span.set_attribute("payment.has_voucher", voucher is not None) if amount <= 0: user.participant_type = ( @@ -265,6 +283,14 @@ async def create_payment( db.commit() db.refresh(payment) + span.set_attribute("payment.status", str(payment.status)) + payment_created_counter.add( + 1, + { + "ticket.name": ticket.name, + "has_voucher": str(voucher is not None), + }, + ) return common_response( Ok( data=CreatePaymentResponse( @@ -298,14 +324,15 @@ async def create_payment( f"{user.first_name or ''} {user.last_name or ''}".strip() ) - mayar_response = await mayar_service.create_payment( - ticket=ticket, - customer_email=user.email, - customer_name=customer_name, - customer_phone=user.phone, - 
tx_internal_id=str(payment.id), - voucher=voucher, - ) + with tracer.start_as_current_span("mayar.create_payment"): + mayar_response = await mayar_service.create_payment( + ticket=ticket, + customer_email=user.email, + customer_name=customer_name, + customer_phone=user.phone, + tx_internal_id=str(payment.id), + voucher=voucher, + ) data = mayar_response.get("data", {}) payment_link = data.get("link", "") @@ -331,6 +358,14 @@ async def create_payment( db.commit() db.refresh(payment) + span.set_attribute("payment.status", str(payment.status)) + payment_created_counter.add( + 1, + { + "ticket.name": ticket.name, + "has_voucher": str(voucher is not None), + }, + ) return common_response( Ok( data=CreatePaymentResponse( @@ -359,6 +394,8 @@ async def create_payment( traceback.print_exc() logger.error(f"Error in create_payment: {e}") return common_response(InternalServerError(error="Internal Server Error")) + finally: + span_context.__exit__(None, None, None) @router.get( @@ -478,9 +515,10 @@ async def get_payment_detail( mayar_service = MayarService(api_key=MAYAR_API_KEY, base_url=MAYAR_BASE_URL) try: if payment.mayar_id and payment.status == PaymentStatus.UNPAID: - mayar_status_response = await mayar_service.get_payment_status( - payment_id=payment.mayar_id - ) + with tracer.start_as_current_span("mayar.get_payment_status"): + mayar_status_response = await mayar_service.get_payment_status( + payment_id=payment.mayar_id + ) data = mayar_status_response.get("data", {}) transaction_status = data.get("status", "").lower() @@ -603,69 +641,85 @@ async def payment_webhook( event = request.get("event") data = request.get("data", {}) if event == "payment.received" and data: - mayar_id = data.get("id") - mayar_transaction_id = data.get("transactionId") - transaction_status = data.get("status", "").lower() + with tracer.start_as_current_span("payment.webhook") as webhook_span: + payment_webhook_counter.add(1, {"event": event}) + webhook_span.set_attribute("webhook.event", event) + 
mayar_id = data.get("id") + mayar_transaction_id = data.get("transactionId") + transaction_status = data.get("status", "").lower() - if not mayar_id and not mayar_transaction_id: - return common_response( - BadRequest(message="id or transactionId is required") - ) + if not mayar_id and not mayar_transaction_id: + return common_response( + BadRequest(message="id or transactionId is required") + ) - status_mapping = {"success": PaymentStatus.PAID} + status_mapping = {"success": PaymentStatus.PAID} - status = status_mapping.get(transaction_status, PaymentStatus.UNPAID) + status = status_mapping.get(transaction_status, PaymentStatus.UNPAID) - payment = None - if mayar_transaction_id: - payment = paymentRepo.get_payment_by_mayar_transaction_id( - db=db, mayar_transaction_id=mayar_transaction_id - ) + payment = None + if mayar_transaction_id: + payment = paymentRepo.get_payment_by_mayar_transaction_id( + db=db, mayar_transaction_id=mayar_transaction_id + ) - if not payment and mayar_id: - payment = paymentRepo.get_payment_by_mayar_id(db=db, mayar_id=mayar_id) + if not payment and mayar_id: + payment = paymentRepo.get_payment_by_mayar_id( + db=db, mayar_id=mayar_id + ) - if not payment: - logger.warning( - f"Payment not found for mayar_id: {mayar_id}, transactionId: {mayar_transaction_id}" - ) - return common_response(BadRequest(message="Payment not found")) + if not payment: + logger.warning( + f"Payment not found for mayar_id: {mayar_id}, transactionId: {mayar_transaction_id}" + ) + return common_response(BadRequest(message="Payment not found")) - paymentRepo.update_payment( - db=db, - payment=payment, - status=status, - mayar_id=mayar_id, - mayar_transaction_id=mayar_transaction_id, - is_commit=False, - ) - user: UserModel = payment.user - ticket: TicketModel = payment.ticket - voucher: VoucherModel = payment.voucher - - if status == PaymentStatus.PAID: - if voucher and voucher.type: - user.participant_type = voucher.type - else: - user.participant_type = 
ticket.user_participant_type - db.add(user) - - mayar_service = MayarService( - api_key=MAYAR_API_KEY, base_url=MAYAR_BASE_URL - ) - closed_count = await close_unpaid_payments_with_mayar( + status_before = str(payment.status) + webhook_span.set_attribute("payment.id", str(payment.id)) + webhook_span.set_attribute("payment.status_before", status_before) + + paymentRepo.update_payment( db=db, - user_id=str(user.id), - exclude_payment_id=str(payment.id), - mayar_service=mayar_service, + payment=payment, + status=status, + mayar_id=mayar_id, + mayar_transaction_id=mayar_transaction_id, + is_commit=False, ) - if closed_count > 0: - logger.info( - f"Closed {closed_count} other unpaid payment(s) for user {user.id}" + user: UserModel = payment.user + ticket: TicketModel = payment.ticket + voucher: VoucherModel = payment.voucher + + if status == PaymentStatus.PAID: + if voucher and voucher.type: + user.participant_type = voucher.type + else: + user.participant_type = ticket.user_participant_type + db.add(user) + + mayar_service = MayarService( + api_key=MAYAR_API_KEY, base_url=MAYAR_BASE_URL + ) + closed_count = await close_unpaid_payments_with_mayar( + db=db, + user_id=str(user.id), + exclude_payment_id=str(payment.id), + mayar_service=mayar_service, ) + if closed_count > 0: + logger.info( + f"Closed {closed_count} other unpaid payment(s) for user {user.id}" + ) - db.commit() - logger.info(f"Payment {payment.id} updated to status {status} via webhook") + db.commit() + webhook_span.set_attribute("payment.status_after", str(status)) + payment_status_counter.add( + 1, + {"from_status": status_before, "to_status": str(status)}, + ) + logger.info( + f"Payment {payment.id} updated to status {status} via webhook" + ) return common_response(Ok(data={"message": "Webhook processed successfully"})) except Exception as e: diff --git a/routes/streaming.py b/routes/streaming.py index c424ae2..e8a7083 100644 --- a/routes/streaming.py +++ b/routes/streaming.py @@ -4,10 +4,13 @@ from uuid 
import UUID from fastapi import APIRouter, Depends, HTTPException, Request +from opentelemetry import trace from pytz import timezone from sqlalchemy.orm import Session from core.log import logger +from core.otel_metrics import streaming_webhook_counter +from core.telemetry import get_tracer from core.mux_service import mux_service from core.responses import ( BadRequest, @@ -138,8 +141,12 @@ async def get_stream_playback( return common_response(InternalServerError(error=str(e))) +tracer = get_tracer("streaming") + + @router.post("/webhook") async def mux_webhook(request: Request, db: Session = Depends(get_db_sync)): + stream_span = None try: payload = await request.body() signature = request.headers.get("Mux-Signature", "") @@ -148,9 +155,17 @@ async def mux_webhook(request: Request, db: Session = Depends(get_db_sync)): return common_response(Unauthorized(message="Invalid webhook signature")) webhook_data = json.loads(payload) - logger.info(f"Webhook Mux Data: {webhook_data}") event_type = webhook_data.get("type") data = webhook_data.get("data", {}) + logger.info(f"Mux webhook received: {event_type or 'unknown'}") + + stream_id = data.get("id", "") + streaming_webhook_counter.add(1, {"event_type": event_type or "unknown"}) + stream_span = tracer.start_span("streaming.webhook") + stream_context = trace.use_span(stream_span, end_on_exit=True) + stream_context.__enter__() + stream_span.set_attribute("webhook.event_type", event_type or "unknown") + stream_span.set_attribute("mux.stream_id", stream_id) if event_type == "video.asset.ready": asset_id = data.get("id") @@ -237,3 +252,6 @@ async def mux_webhook(request: Request, db: Session = Depends(get_db_sync)): raise except Exception as e: return common_response(InternalServerError(error=str(e))) + finally: + if stream_span is not None: + stream_context.__exit__(None, None, None) diff --git a/settings.py b/settings.py index 4a43eb2..d2c9584 100644 --- a/settings.py +++ b/settings.py @@ -93,6 +93,13 @@ def 
str_to_bool(string: str) -> bool: os.environ.get("STREAM_TOKEN_EXPIRE_MINUTES", default="15") ) +# OpenTelemetry +OTEL_ENABLED = str_to_bool(os.environ.get("OTEL_ENABLED", "False")) +OTEL_SERVICE_NAME = os.environ.get("OTEL_SERVICE_NAME", "pyconid25-be") +OTEL_EXPORTER_OTLP_ENDPOINT = os.environ.get( + "OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318" +) + # File upload FILE_STORAGE_PATH = os.environ.get("FILE_STORAGE_PATH", "./storage") MAX_FILE_SIZE_MB = int(os.environ.get("MAX_FILE_SIZE_MB", "5")) From 6d82ae946a631474b6c7abda348670245d0e27e8 Mon Sep 17 00:00:00 2001 From: Nizar Izzuddin Yatim Fadlan Date: Sat, 6 Jun 2026 20:02:39 +0700 Subject: [PATCH 2/6] feat(observability): add OpenTelemetry collector configuration - Introduced a new otel_collector service in docker-compose.yaml - Added environment variables for Datadog integration - Created otel-collector-config.yaml for OpenTelemetry configuration --- .env.example | 4 ++++ docker-compose.yaml | 22 +++++++++++++++++++++- otel-collector-config.yaml | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 otel-collector-config.yaml diff --git a/.env.example b/.env.example index 4fa2205..1b90c7c 100644 --- a/.env.example +++ b/.env.example @@ -64,3 +64,7 @@ OTEL_SERVICE_NAME=pyconid25-be OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 # OTEL_EXPORTER_OTLP_HEADERS= # OTEL_LOG_LEVEL=INFO + +# Datadog — only needed for otel_collector +# DD_API_KEY= +# DD_SITE=us5.datadoghq.com diff --git a/docker-compose.yaml b/docker-compose.yaml index 9033022..6f704fe 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -24,6 +24,26 @@ services: max-size: "10m" max-file: "3" + otel_collector: + image: otel/opentelemetry-collector-contrib:latest + container_name: otel_collector + restart: always + command: ["--config=/conf/otel-collector-config.yaml"] + env_file: + - ./.env + environment: + - DD_API_KEY=${DD_API_KEY} + - DD_SITE=${DD_SITE:-datadoghq.com} + 
volumes: + - ./otel-collector-config.yaml:/conf/otel-collector-config.yaml:ro + networks: + - ${NETWORK_NAME} + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" + networks: pycon_project: - external: ${USE_EXTERNAL_NETWORK} \ No newline at end of file + external: ${USE_EXTERNAL_NETWORK} diff --git a/otel-collector-config.yaml b/otel-collector-config.yaml new file mode 100644 index 0000000..46c98d1 --- /dev/null +++ b/otel-collector-config.yaml @@ -0,0 +1,35 @@ +receivers: + otlp: + protocols: + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: + send_batch_max_size: 100 + send_batch_size: 10 + timeout: 10s + +connectors: + datadog/connector: + +exporters: + datadog/exporter: + api: + site: ${env:DD_SITE:-datadoghq.com} + key: ${env:DD_API_KEY} + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [datadog/connector, datadog/exporter] + metrics: + receivers: [otlp, datadog/connector] + processors: [batch] + exporters: [datadog/exporter] + logs: + receivers: [otlp] + processors: [batch] + exporters: [datadog/exporter] From d63241135a17a0fe1f294940acc2747bd9365cfd Mon Sep 17 00:00:00 2001 From: Nizar Izzuddin Yatim Fadlan Date: Sat, 6 Jun 2026 20:46:02 +0700 Subject: [PATCH 3/6] feat(logging): add request logging middleware and enhance log formatting - Implemented middleware to log request details including method, path, status code, and duration. - Enhanced JsonFormatter to include OpenTelemetry trace and span IDs. - Updated OpenTelemetry collector image version in docker-compose. - Added memory limiter processor to the OpenTelemetry collector config. 
--- core/log.py | 17 ++++++++++++++--- docker-compose.yaml | 2 +- main.py | 18 ++++++++++++++++++ otel-collector-config.yaml | 16 ++++++++++------ 4 files changed, 43 insertions(+), 10 deletions(-) diff --git a/core/log.py b/core/log.py index 10bc025..08f79c0 100644 --- a/core/log.py +++ b/core/log.py @@ -6,6 +6,8 @@ from datetime import datetime, timezone from typing import Any +from opentelemetry import trace + _REDACTION_PATTERNS = ( re.compile( r"(?i)(authorization|x-callback-token|mux-signature)" @@ -47,6 +49,15 @@ def _replace_sensitive_match(match: re.Match[str]) -> str: class JsonFormatter(logging.Formatter): def format(self, record: logging.LogRecord) -> str: + trace_id = getattr(record, "otelTraceID", "0") + span_id = getattr(record, "otelSpanID", "0") + + if trace_id == "0": + span_context = trace.get_current_span().get_span_context() + if span_context.is_valid: + trace_id = format(span_context.trace_id, "032x") + span_id = format(span_context.span_id, "016x") + payload: dict[str, Any] = { "timestamp": datetime.fromtimestamp( record.created, timezone.utc @@ -54,8 +65,8 @@ def format(self, record: logging.LogRecord) -> str: "level": record.levelname, "message": _redact_sensitive(record.getMessage()), "logger": record.name, - "trace_id": getattr(record, "otelTraceID", "0"), - "span_id": getattr(record, "otelSpanID", "0"), + "trace_id": trace_id, + "span_id": span_id, "service": getattr( record, "otelServiceName", @@ -97,7 +108,7 @@ def configure_logging() -> None: }, "uvicorn.access": { "handlers": ["console"], - "level": log_level, + "level": "WARNING", "propagate": False, }, }, diff --git a/docker-compose.yaml b/docker-compose.yaml index 6f704fe..1aa6208 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -25,7 +25,7 @@ services: max-file: "3" otel_collector: - image: otel/opentelemetry-collector-contrib:latest + image: otel/opentelemetry-collector-contrib:0.153.0 container_name: otel_collector restart: always command: 
["--config=/conf/otel-collector-config.yaml"] diff --git a/main.py b/main.py index c236a1e..d559bcb 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,5 @@ +import time + from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse @@ -24,6 +26,22 @@ app = FastAPI(title="PyconId 2025 BE") + +@app.middleware("http") +async def request_logging_middleware(request: Request, call_next): + start = time.perf_counter() + response = await call_next(request) + duration_ms = (time.perf_counter() - start) * 1000 + logger.info( + "%s %s %d %.2fms", + request.method, + request.url.path, + response.status_code, + duration_ms, + ) + return response + + app.add_middleware( CORSMiddleware, allow_origins=["*"], diff --git a/otel-collector-config.yaml b/otel-collector-config.yaml index 46c98d1..01ea750 100644 --- a/otel-collector-config.yaml +++ b/otel-collector-config.yaml @@ -1,3 +1,6 @@ +extensions: + health_check: + receivers: otlp: protocols: @@ -5,6 +8,10 @@ receivers: endpoint: 0.0.0.0:4318 processors: + memory_limiter: + check_interval: 1s + limit_percentage: 80 + spike_limit_percentage: 25 batch: send_batch_max_size: 100 send_batch_size: 10 @@ -20,16 +27,13 @@ exporters: key: ${env:DD_API_KEY} service: + extensions: [health_check] pipelines: traces: receivers: [otlp] - processors: [batch] + processors: [memory_limiter, batch] exporters: [datadog/connector, datadog/exporter] metrics: receivers: [otlp, datadog/connector] - processors: [batch] - exporters: [datadog/exporter] - logs: - receivers: [otlp] - processors: [batch] + processors: [memory_limiter, batch] exporters: [datadog/exporter] From 2efaa4220e1124243cbfe8ff6186760c0a2e8b4f Mon Sep 17 00:00:00 2001 From: Nizar Izzuddin Yatim Fadlan Date: Sat, 6 Jun 2026 21:01:03 +0700 Subject: [PATCH 4/6] feat(logging): enhance request logging with trace and span IDs Added support for capturing request trace and span IDs in the logging middleware. 
This allows for better observability and tracing of requests through the application, improving debugging and monitoring capabilities. --- core/log.py | 8 ++++++-- main.py | 11 +++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/core/log.py b/core/log.py index 08f79c0..8308f7d 100644 --- a/core/log.py +++ b/core/log.py @@ -49,8 +49,12 @@ def _replace_sensitive_match(match: re.Match[str]) -> str: class JsonFormatter(logging.Formatter): def format(self, record: logging.LogRecord) -> str: - trace_id = getattr(record, "otelTraceID", "0") - span_id = getattr(record, "otelSpanID", "0") + trace_id = getattr(record, "requestTraceID", None) or getattr( + record, "otelTraceID", "0" + ) + span_id = getattr(record, "requestSpanID", None) or getattr( + record, "otelSpanID", "0" + ) if trace_id == "0": span_context = trace.get_current_span().get_span_context() diff --git a/main.py b/main.py index d559bcb..6c1b3c1 100644 --- a/main.py +++ b/main.py @@ -3,6 +3,7 @@ from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse +from opentelemetry import trace from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from pydantic import ValidationError @@ -32,12 +33,22 @@ async def request_logging_middleware(request: Request, call_next): start = time.perf_counter() response = await call_next(request) duration_ms = (time.perf_counter() - start) * 1000 + + span_context = trace.get_current_span().get_span_context() + log_extra = {} + if span_context.is_valid: + log_extra = { + "requestTraceID": format(span_context.trace_id, "032x"), + "requestSpanID": format(span_context.span_id, "016x"), + } + logger.info( "%s %s %d %.2fms", request.method, request.url.path, response.status_code, duration_ms, + extra=log_extra, ) return response From fcee3e98390b5fe0cd0ed50df4fded4a1765a8ee Mon Sep 17 00:00:00 2001 From: Nizar Izzuddin Yatim Fadlan Date: Sat, 6 Jun 2026 22:01:08 +0700 Subject: 
[PATCH 5/6] feat(observability): add OpenTelemetry collector configuration This commit introduces a new service for the OpenTelemetry collector in the docker-compose setup. It includes configuration for the collector, environment variables, and logging options to enhance observability in the application. --- docker-compose.otel.yaml | 24 ++++++++++++++++++++++++ docker-compose.yaml | 20 -------------------- 2 files changed, 24 insertions(+), 20 deletions(-) create mode 100644 docker-compose.otel.yaml diff --git a/docker-compose.otel.yaml b/docker-compose.otel.yaml new file mode 100644 index 0000000..3733625 --- /dev/null +++ b/docker-compose.otel.yaml @@ -0,0 +1,24 @@ +services: + otel_collector: + image: otel/opentelemetry-collector-contrib:0.153.0 + container_name: otel_collector + restart: always + command: ["--config=/conf/otel-collector-config.yaml"] + env_file: + - ./.env + environment: + - DD_API_KEY=${DD_API_KEY} + - DD_SITE=${DD_SITE:-datadoghq.com} + volumes: + - ./otel-collector-config.yaml:/conf/otel-collector-config.yaml:ro + networks: + - ${NETWORK_NAME} + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" + +networks: + pycon_project: + external: ${USE_EXTERNAL_NETWORK} diff --git a/docker-compose.yaml b/docker-compose.yaml index 1aa6208..cd32af4 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -24,26 +24,6 @@ services: max-size: "10m" max-file: "3" - otel_collector: - image: otel/opentelemetry-collector-contrib:0.153.0 - container_name: otel_collector - restart: always - command: ["--config=/conf/otel-collector-config.yaml"] - env_file: - - ./.env - environment: - - DD_API_KEY=${DD_API_KEY} - - DD_SITE=${DD_SITE:-datadoghq.com} - volumes: - - ./otel-collector-config.yaml:/conf/otel-collector-config.yaml:ro - networks: - - ${NETWORK_NAME} - logging: - driver: "json-file" - options: - max-size: "10m" - max-file: "3" - networks: pycon_project: external: ${USE_EXTERNAL_NETWORK} From 
b497f374bffa301569cce5c6fa06260f347564c7 Mon Sep 17 00:00:00 2001 From: maliqpotter09 Date: Sun, 7 Jun 2026 01:55:38 +0700 Subject: [PATCH 6/6] feat(observability): add Datadog log correlation fields and config --- .env.example | 11 +++++++- core/log.py | 62 ++++++++++++++++++++++++++++++++++++++------- docker-compose.yaml | 2 ++ 3 files changed, 65 insertions(+), 10 deletions(-) diff --git a/.env.example b/.env.example index 1b90c7c..b660ff4 100644 --- a/.env.example +++ b/.env.example @@ -61,10 +61,19 @@ MAX_FILE_SIZE_MB=5 # OpenTelemetry (optional observability) OTEL_ENABLED=false OTEL_SERVICE_NAME=pyconid25-be +# When using Datadog Agent Fleet, point to the agent's OTLP receiver: +# OTEL_EXPORTER_OTLP_ENDPOINT=http://dd-agent:4318 +# When using standalone OTel Collector: +# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 # OTEL_EXPORTER_OTLP_HEADERS= # OTEL_LOG_LEVEL=INFO -# Datadog — only needed for otel_collector +# Datadog +# Set to true to enable Datadog-specific log fields (dd.trace_id, dd.span_id, dd.env) +# When false, logs use generic OTel-compatible fields (trace_id, span_id in hex) +DD_ENABLED=false # DD_API_KEY= # DD_SITE=us5.datadoghq.com +# DD_ENV overrides ENVIRONTMENT for Datadog tagging (e.g. production, staging) +# DD_ENV=production diff --git a/core/log.py b/core/log.py index 8308f7d..29020c1 100644 --- a/core/log.py +++ b/core/log.py @@ -48,35 +48,79 @@ def _replace_sensitive_match(match: re.Match[str]) -> str: class JsonFormatter(logging.Formatter): + """JSON log formatter with optional Datadog-compatible field names. + + When ``DD_ENABLED=true``, outputs ``dd.trace_id`` and ``dd.span_id`` + as **decimal** strings (lower 64-bit of the 128-bit OTel trace-id) + so that Datadog Agent can correlate logs ↔ APM traces automatically. + + When ``DD_ENABLED=false`` (default), outputs standard ``trace_id`` + and ``span_id`` in hex format for generic OTel-compatible backends. 
+ """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self._dd_enabled = os.environ.get("DD_ENABLED", "false").lower() in { + "1", + "true", + "yes", + "on", + } + + @staticmethod + def _hex_to_dd_trace_id(hex_trace_id: str) -> str: + """Convert a 128-bit hex trace-id to Datadog's 64-bit decimal.""" + try: + return str(int(hex_trace_id[-16:], 16)) + except (ValueError, IndexError): + return "0" + + @staticmethod + def _hex_to_dd_span_id(hex_span_id: str) -> str: + """Convert a hex span-id to Datadog's decimal format.""" + try: + return str(int(hex_span_id, 16)) + except (ValueError, IndexError): + return "0" + def format(self, record: logging.LogRecord) -> str: - trace_id = getattr(record, "requestTraceID", None) or getattr( + hex_trace_id = getattr(record, "requestTraceID", None) or getattr( record, "otelTraceID", "0" ) - span_id = getattr(record, "requestSpanID", None) or getattr( + hex_span_id = getattr(record, "requestSpanID", None) or getattr( record, "otelSpanID", "0" ) - if trace_id == "0": + if hex_trace_id == "0": span_context = trace.get_current_span().get_span_context() if span_context.is_valid: - trace_id = format(span_context.trace_id, "032x") - span_id = format(span_context.span_id, "016x") + hex_trace_id = format(span_context.trace_id, "032x") + hex_span_id = format(span_context.span_id, "016x") payload: dict[str, Any] = { "timestamp": datetime.fromtimestamp( record.created, timezone.utc ).isoformat(), - "level": record.levelname, + "status": record.levelname, "message": _redact_sensitive(record.getMessage()), - "logger": record.name, - "trace_id": trace_id, - "span_id": span_id, + "logger.name": record.name, "service": getattr( record, "otelServiceName", os.environ.get("OTEL_SERVICE_NAME", "pyconid25-be"), ), } + + if self._dd_enabled: + payload["dd.trace_id"] = self._hex_to_dd_trace_id(hex_trace_id) + payload["dd.span_id"] = self._hex_to_dd_span_id(hex_span_id) + payload["dd.env"] = os.environ.get( + 
"DD_ENV", os.environ.get("ENVIRONTMENT", "") + ) + else: + payload["trace_id"] = hex_trace_id + payload["span_id"] = hex_span_id + if record.exc_info: payload["exception"] = _redact_sensitive( self.formatException(record.exc_info) diff --git a/docker-compose.yaml b/docker-compose.yaml index cd32af4..454d396 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -12,6 +12,8 @@ services: - ./storage/:/app/storage/ networks: - ${NETWORK_NAME} + labels: + com.datadoghq.ad.logs: '[{"source": "python", "service": "pyconid25-be"}]' healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8000/health"] interval: 30s