diff --git a/.env.example b/.env.example index 69717c7..b660ff4 100644 --- a/.env.example +++ b/.env.example @@ -57,3 +57,23 @@ STREAM_TOKEN_EXPIRE_MINUTES=15 FILE_STORAGE_PATH="./storage" MAX_FILE_SIZE_MB=5 + +# OpenTelemetry (optional observability) +OTEL_ENABLED=false +OTEL_SERVICE_NAME=pyconid25-be +# When using Datadog Agent Fleet, point to the agent's OTLP receiver: +# OTEL_EXPORTER_OTLP_ENDPOINT=http://dd-agent:4318 +# When using standalone OTel Collector: +# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 +# OTEL_EXPORTER_OTLP_HEADERS= +# OTEL_LOG_LEVEL=INFO + +# Datadog +# Set to true to enable Datadog-specific log fields (dd.trace_id, dd.span_id, dd.env) +# When false, logs use generic OTel-compatible fields (trace_id, span_id in hex) +DD_ENABLED=false +# DD_API_KEY= +# DD_SITE=us5.datadoghq.com +# DD_ENV overrides ENVIRONTMENT for Datadog tagging (e.g. production, staging) +# DD_ENV=production diff --git a/core/log.py b/core/log.py index 60a0376..29020c1 100644 --- a/core/log.py +++ b/core/log.py @@ -1,3 +1,167 @@ +import json import logging +import logging.config +import os +import re +from datetime import datetime, timezone +from typing import Any + +from opentelemetry import trace + +_REDACTION_PATTERNS = ( + re.compile( + r"(?i)(authorization|x-callback-token|mux-signature)" + r"([\s:=]+)(bearer\s+)?([^\s,;}]+)" + ), + re.compile(r"(?i)bearer\s+[a-z0-9._~+/=-]+"), + re.compile( + r"(?i)(mayar_api_key|mayar_webhook_secret|mux_token_secret|" + r"mux_webhook_secret|mux_signing_key_private|secret_key)" + r"([\s:=]+)([^\s,;}]+)" + ), + re.compile( + r"(?i)(payment_link|link|redirect_url|redirecturl|url)" + r"([\s:=]+)(https?://[^\s,;}]+)" + ), + re.compile(r"(?i)(transactionid|transaction_id)([\s:=]+)([^\s,;}]+)"), + re.compile(r"(?i)(email)([\s:=]+)([^\s,;}]+@[^\s,;}]+)"), + re.compile(r"(?i)(phone|mobile)([\s:=]+)(\+?[0-9][0-9\s().-]{5,})"), + re.compile(r"eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+"), +) + + +def _redact_sensitive(value: str) -> str: + redacted = value + for pattern in _REDACTION_PATTERNS: + redacted = pattern.sub(_replace_sensitive_match, redacted) + return redacted + + +def _replace_sensitive_match(match: re.Match[str]) -> str: + if match.re.pattern.startswith("(?i)bearer"): + return "Bearer [REDACTED]" + if match.re.pattern.startswith("eyJ"): + return "[REDACTED]" + if match.lastindex and match.lastindex >= 2: + return f"{match.group(1)}{match.group(2)}[REDACTED]" + return "[REDACTED]" + + +class JsonFormatter(logging.Formatter): + """JSON log formatter with optional Datadog-compatible field names. + + When ``DD_ENABLED=true``, outputs ``dd.trace_id`` and ``dd.span_id`` + as **decimal** strings (lower 64-bit of the 128-bit OTel trace-id) + so that Datadog Agent can correlate logs ↔ APM traces automatically. + + When ``DD_ENABLED=false`` (default), outputs standard ``trace_id`` + and ``span_id`` in hex format for generic OTel-compatible backends. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self._dd_enabled = os.environ.get("DD_ENABLED", "false").lower() in { + "1", + "true", + "yes", + "on", + } + + @staticmethod + def _hex_to_dd_trace_id(hex_trace_id: str) -> str: + """Convert a 128-bit hex trace-id to Datadog's 64-bit decimal.""" + try: + return str(int(hex_trace_id[-16:], 16)) + except (ValueError, IndexError): + return "0" + + @staticmethod + def _hex_to_dd_span_id(hex_span_id: str) -> str: + """Convert a hex span-id to Datadog's decimal format.""" + try: + return str(int(hex_span_id, 16)) + except (ValueError, IndexError): + return "0" + + def format(self, record: logging.LogRecord) -> str: + hex_trace_id = getattr(record, "requestTraceID", None) or getattr( + record, "otelTraceID", "0" + ) + hex_span_id = getattr(record, "requestSpanID", None) or getattr( + record, "otelSpanID", "0" + ) + + if hex_trace_id == "0": + span_context = trace.get_current_span().get_span_context() + if span_context.is_valid: + hex_trace_id = format(span_context.trace_id, "032x") + hex_span_id = format(span_context.span_id, "016x") + + payload: dict[str, Any] = { + "timestamp": datetime.fromtimestamp( + record.created, timezone.utc + ).isoformat(), + "status": record.levelname, + "message": _redact_sensitive(record.getMessage()), + "logger.name": record.name, + "service": getattr( + record, + "otelServiceName", + os.environ.get("OTEL_SERVICE_NAME", "pyconid25-be"), + ), + } + + if self._dd_enabled: + payload["dd.trace_id"] = self._hex_to_dd_trace_id(hex_trace_id) + payload["dd.span_id"] = self._hex_to_dd_span_id(hex_span_id) + payload["dd.env"] = os.environ.get( + "DD_ENV", os.environ.get("ENVIRONTMENT", "") + ) + else: + payload["trace_id"] = hex_trace_id + payload["span_id"] = hex_span_id + + if record.exc_info: + payload["exception"] = _redact_sensitive( + self.formatException(record.exc_info) + ) + return json.dumps(payload, default=str) + + +def configure_logging() -> None: + log_level = os.environ.get("OTEL_LOG_LEVEL", "INFO").upper() + logging.config.dictConfig( + { + "version": 1, + "disable_existing_loggers": False, + "formatters": {"json": {"()": JsonFormatter}}, + "handlers": { + "console": { + "class": "logging.StreamHandler", + "formatter": "json", + "level": log_level, + } + }, + "root": {"handlers": ["console"], "level": log_level}, + "loggers": { + "uvicorn": { + "handlers": ["console"], + "level": log_level, + "propagate": False, + }, + "uvicorn.error": { + "handlers": ["console"], + "level": log_level, + "propagate": False, + }, + "uvicorn.access": { + "handlers": ["console"], + "level": "WARNING", + "propagate": False, + }, + }, + } + ) + logger = logging.getLogger("uvicorn.error") diff --git a/core/mayar_service.py b/core/mayar_service.py index f0ddfaa..28a4736 100644 --- a/core/mayar_service.py +++ b/core/mayar_service.py @@ -90,17 +90,13 @@ async def create_payment( timeout=30.0, ) response.raise_for_status() - print(response.status_code) result = response.json() - print( - f"Mayar create payment response: status:{response.status_code} {result}" + logger.info( + f"Payment created successfully in Mayar with status {response.status_code}" ) - logger.info(f"Payment created successfully: {result}") return result except httpx.HTTPStatusError as e: - logger.error( - f"Mayar API returned error {e.response.status_code}: {e.response.text}" - ) + logger.error(f"Mayar API returned error {e.response.status_code}") logger.debug(f"Request URL: {e.request.url}") raise except httpx.RequestError as e: @@ -136,12 +132,12 @@ async def get_payment_status(self, payment_id: str) -> Dict[str, Any]: response.raise_for_status() result = response.json() - logger.info(f"Payment status retrieved: {result}") + logger.info( + f"Payment status retrieved from Mayar for payment {payment_id}" + ) return result except httpx.HTTPStatusError as e: - logger.error( - f"Mayar API returned error {e.response.status_code}: {e.response.text}" - ) + logger.error(f"Mayar API returned error {e.response.status_code}") logger.debug(f"Request URL: {e.request.url}") raise except httpx.RequestError as e: @@ -184,15 +180,11 @@ async def close_payment(self, payment_id: str) -> Dict[str, Any]: if result.get("messages") == "success": logger.info(f"Payment {payment_id} closed successfully on Mayar") else: - logger.warning( - f"Failed to close payment {payment_id} on Mayar: {result}" - ) + logger.warning(f"Failed to close payment {payment_id} on Mayar") return result except httpx.HTTPStatusError as e: - logger.error( - f"Mayar API returned error {e.response.status_code}: {e.response.text}" - ) + logger.error(f"Mayar API returned error {e.response.status_code}") logger.debug(f"Request URL: {e.request.url}") raise except httpx.RequestError as e: diff --git a/core/otel_metrics.py b/core/otel_metrics.py new file mode 100644 index 0000000..17f2120 --- /dev/null +++ b/core/otel_metrics.py @@ -0,0 +1,24 @@ +from core.telemetry import get_meter + +meter = get_meter("pyconid25-be") + +payment_created_counter = meter.create_counter( + "payment.created", + description="Total payments created", + unit="1", +) +payment_status_counter = meter.create_counter( + "payment.status_change", + description="Payment status transitions", + unit="1", +) +payment_webhook_counter = meter.create_counter( + "payment.webhook_received", + description="Payment webhooks received", + unit="1", +) +streaming_webhook_counter = meter.create_counter( + "streaming.webhook_received", + description="Streaming webhooks received", + unit="1", +) diff --git a/core/telemetry.py b/core/telemetry.py new file mode 100644 index 0000000..0dfd7e6 --- /dev/null +++ b/core/telemetry.py @@ -0,0 +1,169 @@ +import os +from typing import Any + +from opentelemetry import metrics, trace +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + SpanExporter, + SpanProcessor, +) + +from core.log import configure_logging + +_initialized = False +_enabled = False + + +class PCISafeSpanProcessor(SpanProcessor): + SCRUB_KEYS = { + "http.request.body", + "http.response.body", + "http.request.header.authorization", + "http.request.header.x-callback-token", + "http.request.header.mux-signature", + "http.response.header.authorization", + "http.response.header.x-callback-token", + "http.response.header.mux-signature", + "payment_link", + "payment.link", + "mayar_api_key", + "net.peer.name", + "net.sock.peer.addr", + "db.statement", + } + SCRUB_PREFIXES = ( + "card", + "account", + "payment_link", + "mayar_api_key", + "customer_email", + "customer_phone", + "customer_mobile", + "customer_name", + "mayar_payment_id", + "mayar_transaction_id", + "mux_stream_id", + "mux_asset_id", + "mux_live_stream_id", + "mux_playback_id", + "exception_message", + "exception_stacktrace", + "user_email", + "user_phone", + "user_mobile", + ) + + def __init__(self, exporter: SpanExporter): + self._delegate = BatchSpanProcessor(exporter) + + def on_start(self, span, parent_context=None) -> None: + self._delegate.on_start(span, parent_context=parent_context) + + def on_end(self, span: ReadableSpan) -> None: + span._attributes = self._scrub_attributes(span.attributes) # noqa: SLF001 + self._delegate.on_end(span) + + def shutdown(self) -> None: + self._delegate.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return self._delegate.force_flush(timeout_millis) + + def _scrub_attributes(self, attributes: Any) -> dict[str, Any]: + scrubbed = {} + for key, value in dict(attributes or {}).items(): + normalized_key = key.lower().replace(".", "_").replace("-", "_") + should_scrub = key in self.SCRUB_KEYS or any( + normalized_key.startswith(prefix) for prefix in self.SCRUB_PREFIXES + ) + scrubbed[key] = "[REDACTED]" if should_scrub else value + return scrubbed + + +def setup_telemetry() -> bool: + global _enabled, _initialized + if _initialized: + return _enabled + + configure_logging() + + if not _env_bool("OTEL_ENABLED", default=False): + _initialized = True + return False + + resource = Resource.create( + { + SERVICE_NAME: os.environ.get("OTEL_SERVICE_NAME", "pyconid25-be"), + "deployment.environment": os.environ.get("ENVIRONTMENT", "file"), + } + ) + _setup_traces(resource) + _setup_metrics(resource) + _setup_auto_instrumentation() + + _enabled = True + _initialized = True + return True + + +def get_tracer(name: str = "pyconid25-be"): + return trace.get_tracer(name) + + +def get_meter(name: str = "pyconid25-be"): + return metrics.get_meter(name) + + +def amount_bucket(amount: int | None) -> str: + if not amount or amount <= 0: + return "free" + if amount <= 100_000: + return "0-100k" + if amount <= 500_000: + return "100k-500k" + if amount <= 1_000_000: + return "500k-1m" + return "1m+" + + +def _setup_traces(resource: Resource) -> None: + provider = TracerProvider(resource=resource) + provider.add_span_processor(PCISafeSpanProcessor(OTLPSpanExporter())) + trace.set_tracer_provider(provider) + + +def _setup_metrics(resource: Resource) -> None: + reader = PeriodicExportingMetricReader( + OTLPMetricExporter(), + export_interval_millis=int( + os.environ.get("OTEL_METRIC_EXPORT_INTERVAL_MS", "30000") + ), + ) + metrics.set_meter_provider( + MeterProvider(resource=resource, metric_readers=[reader]) + ) + + +def _setup_auto_instrumentation() -> None: + from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor + from opentelemetry.instrumentation.logging import LoggingInstrumentor + from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor + + from models import engine + + SQLAlchemyInstrumentor().instrument(engine=engine) + HTTPXClientInstrumentor().instrument() + LoggingInstrumentor().instrument(set_logging_format=False) + + +def _env_bool(name: str, default: bool) -> bool: + value = os.environ.get(name) + if value is None: + return default + return value.lower() in {"1", "true", "yes", "on"} diff --git a/docker-compose.otel.yaml b/docker-compose.otel.yaml new file mode 100644 index 0000000..3733625 --- /dev/null +++ b/docker-compose.otel.yaml @@ -0,0 +1,24 @@ +services: + otel_collector: + image: otel/opentelemetry-collector-contrib:0.153.0 + container_name: otel_collector + restart: always + command: ["--config=/conf/otel-collector-config.yaml"] + env_file: + - ./.env + environment: + - DD_API_KEY=${DD_API_KEY} + - DD_SITE=${DD_SITE:-datadoghq.com} + volumes: + - ./otel-collector-config.yaml:/conf/otel-collector-config.yaml:ro + networks: + - ${NETWORK_NAME} + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" + +networks: + pycon_project: + external: ${USE_EXTERNAL_NETWORK} diff --git a/docker-compose.yaml b/docker-compose.yaml index 9033022..454d396 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -12,6 +12,8 @@ services: - ./storage/:/app/storage/ networks: - ${NETWORK_NAME} + labels: + com.datadoghq.ad.logs: '[{"source": "python", "service": "pyconid25-be"}]' healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8000/health"] interval: 30s @@ -26,4 +28,4 @@ services: networks: pycon_project: - external: ${USE_EXTERNAL_NETWORK} \ No newline at end of file + external: ${USE_EXTERNAL_NETWORK} diff --git a/main.py b/main.py index 18b5db5..6c1b3c1 100644 --- a/main.py +++ b/main.py @@ -1,26 +1,16 @@ +import time + from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse +from opentelemetry import trace +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from pydantic import ValidationError + from core.health_check import health_check from core.log import logger from core.rate_limiter.memory import InMemoryRateLimiter from core.rate_limiter.middleware import RateLimitMiddleware -from routes.auth import router as auth_router -from routes.user_profile import router as user_profile_router -from routes.locations import router as locations_router -from routes.ticket import router as ticket_router -from routes.room import router as room_router -from routes.schedule import router as schedule_router -from routes.speaker import router as speaker_router -from routes.payment import router as payment_router -from routes.streaming import router as streaming_router -from routes.voucher import router as voucher_router -from routes.speaker_type import router as speaker_type_router -from routes.organizer_type import router as organizer_type_router -from routes.organizer import router as organizer_router -from routes.schedule_type import router as schedule_type_router -from routes.volunteer import router as volunteer_router from settings import ( RATE_LIMIT_ENABLED, @@ -29,10 +19,40 @@ RATE_LIMIT_WINDOW, ) +from core.telemetry import setup_telemetry + +otel_enabled = setup_telemetry() + health_check() app = FastAPI(title="PyconId 2025 BE") + +@app.middleware("http") +async def request_logging_middleware(request: Request, call_next): + start = time.perf_counter() + response = await call_next(request) + duration_ms = (time.perf_counter() - start) * 1000 + + span_context = trace.get_current_span().get_span_context() + log_extra = {} + if span_context.is_valid: + log_extra = { + "requestTraceID": format(span_context.trace_id, "032x"), + "requestSpanID": format(span_context.span_id, "016x"), + } + + logger.info( + "%s %s %d %.2fms", + request.method, + request.url.path, + response.status_code, + duration_ms, + extra=log_extra, + ) + return response + + app.add_middleware( CORSMiddleware, allow_origins=["*"], @@ -50,6 +70,25 @@ exclude_paths=RATE_LIMIT_EXCLUDED_PATHS, ) +if otel_enabled: + FastAPIInstrumentor.instrument_app(app) + +from routes.auth import router as auth_router # noqa: E402 +from routes.locations import router as locations_router # noqa: E402 +from routes.organizer import router as organizer_router # noqa: E402 +from routes.organizer_type import router as organizer_type_router # noqa: E402 +from routes.payment import router as payment_router # noqa: E402 +from routes.room import router as room_router # noqa: E402 +from routes.schedule import router as schedule_router # noqa: E402 +from routes.schedule_type import router as schedule_type_router # noqa: E402 +from routes.speaker import router as speaker_router # noqa: E402 +from routes.speaker_type import router as speaker_type_router # noqa: E402 +from routes.streaming import router as streaming_router # noqa: E402 +from routes.ticket import router as ticket_router # noqa: E402 +from routes.user_profile import router as user_profile_router # noqa: E402 +from routes.volunteer import router as volunteer_router # noqa: E402 +from routes.voucher import router as voucher_router # noqa: E402 + app.include_router(auth_router) app.include_router(user_profile_router) app.include_router(locations_router) diff --git a/otel-collector-config.yaml b/otel-collector-config.yaml new file mode 100644 index 0000000..01ea750 --- /dev/null +++ b/otel-collector-config.yaml @@ -0,0 +1,39 @@ +extensions: + health_check: + +receivers: + otlp: + protocols: + http: + endpoint: 0.0.0.0:4318 + +processors: + memory_limiter: + check_interval: 1s + limit_percentage: 80 + spike_limit_percentage: 25 + batch: + send_batch_max_size: 100 + send_batch_size: 10 + timeout: 10s + +connectors: + datadog/connector: + +exporters: + datadog/exporter: + api: + site: ${env:DD_SITE:-datadoghq.com} + key: ${env:DD_API_KEY} + +service: + extensions: [health_check] + pipelines: + traces: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [datadog/connector, datadog/exporter] + metrics: + receivers: [otlp, datadog/connector] + processors: [memory_limiter, batch] + exporters: [datadog/exporter] diff --git a/requirements.txt b/requirements.txt index cf75885..f132229 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ aiosmtplib==3.0.2 alembic==1.15.1 annotated-types==0.7.0 anyio==4.8.0 +asgiref==3.11.1 Authlib==1.6.0 bcrypt==4.3.0 blinker==1.9.0 @@ -25,12 +26,26 @@ httpx==0.28.1 idna==3.10 iniconfig==2.0.0 itsdangerous==2.2.0 +googleapis-common-protos==1.75.0 Jinja2==3.1.6 Mako==1.3.9 markdown-it-py==3.0.0 MarkupSafe==3.0.2 mdurl==0.1.2 mux-python==5.1.0 +opentelemetry-api==1.42.1 +opentelemetry-exporter-otlp-proto-common==1.42.1 +opentelemetry-exporter-otlp-proto-http==1.42.1 +opentelemetry-instrumentation==0.63b1 +opentelemetry-instrumentation-asgi==0.63b1 +opentelemetry-instrumentation-fastapi==0.63b1 +opentelemetry-instrumentation-httpx==0.63b1 +opentelemetry-instrumentation-logging==0.63b1 +opentelemetry-instrumentation-sqlalchemy==0.63b1 +opentelemetry-proto==1.42.1 +opentelemetry-sdk==1.42.1 +opentelemetry-semantic-conventions==0.63b1 +opentelemetry-util-http==0.63b1 mypy==1.15.0 mypy-extensions==1.0.0 numpy==2.3.3 @@ -44,6 +59,7 @@ pycparser==2.22 pydantic==2.10.6 pydantic-settings==2.11.0 pydantic_core==2.27.2 +protobuf==6.33.6 Pygments==2.19.1 PyJWT==2.10.1 pytest==8.3.5 @@ -73,3 +89,4 @@ uvicorn==0.34.0 uvloop==0.21.0 watchfiles==1.0.4 websockets==15.0.1 +wrapt==2.2.1 diff --git a/routes/payment.py b/routes/payment.py index 2b2a3c8..d109859 100644 --- a/routes/payment.py +++ b/routes/payment.py @@ -2,10 +2,17 @@ from typing import Optional from fastapi import APIRouter, Depends, Header +from opentelemetry import trace from sqlalchemy.orm import Session from core.log import logger from core.mayar_service import MayarService +from core.otel_metrics import ( + payment_created_counter, + payment_status_counter, + payment_webhook_counter, +) +from core.telemetry import amount_bucket, get_tracer from core.responses import ( BadRequest, Forbidden, @@ -61,6 +68,8 @@ router = APIRouter(prefix="/payment", tags=["Payment"]) +tracer = get_tracer("payment") + async def close_unpaid_payments_with_mayar( db: Session, @@ -68,33 +77,35 @@ async def close_unpaid_payments_with_mayar( exclude_payment_id: str, mayar_service: MayarService, ) -> int: - payments_to_close = paymentRepo.get_payments_by_user_id( - db=db, - user_id=user_id, - status=PaymentStatus.UNPAID, - exclude_payment_id=exclude_payment_id, - ) - - closed_count = 0 - for payment in payments_to_close: - if payment.mayar_id: - try: - await mayar_service.close_payment(payment_id=payment.mayar_id) - logger.info( - f"Closed payment {payment.id} in Mayar (mayar_id: {payment.mayar_id})" - ) - except Exception as e: - logger.error(f"Failed to close payment {payment.id} in Mayar: {e}") - - paymentRepo.update_payment( + with tracer.start_as_current_span("payment.close_unpaid_batch") as span: + payments_to_close = paymentRepo.get_payments_by_user_id( db=db, - payment=payment, - status=PaymentStatus.CLOSED, - is_commit=False, + user_id=user_id, + status=PaymentStatus.UNPAID, + exclude_payment_id=exclude_payment_id, ) - closed_count += 1 - return closed_count + closed_count = 0 + for payment in payments_to_close: + if payment.mayar_id: + try: + await mayar_service.close_payment(payment_id=payment.mayar_id) + logger.info( + f"Closed payment {payment.id} in Mayar (mayar_id: {payment.mayar_id})" + ) + except Exception as e: + logger.error(f"Failed to close payment {payment.id} in Mayar: {e}") + + paymentRepo.update_payment( + db=db, + payment=payment, + status=PaymentStatus.CLOSED, + is_commit=False, + ) + closed_count += 1 + + span.set_attribute("payment.close_count", closed_count) + return closed_count @router.get( @@ -181,6 +192,9 @@ async def create_payment( db: Session = Depends(get_db_sync), token: str = Depends(oauth2_scheme), ): + span = tracer.start_span("payment.create") + span_context = trace.use_span(span, end_on_exit=True) + span_context.__enter__() try: user = get_user_from_token(db=db, token=token) if user is None: @@ -251,6 +265,10 @@ async def create_payment( voucher_id=str(voucher.id) if voucher else None, is_commit=False, ) + span.set_attribute("payment.id", str(payment.id)) + span.set_attribute("ticket.id", str(ticket.id)) + span.set_attribute("payment.amount_bucket", amount_bucket(amount)) + span.set_attribute("payment.has_voucher", voucher is not None) if amount <= 0: user.participant_type = ( @@ -265,6 +283,14 @@ async def create_payment( db.commit() db.refresh(payment) + span.set_attribute("payment.status", str(payment.status)) + payment_created_counter.add( + 1, + { + "ticket.name": ticket.name, + "has_voucher": str(voucher is not None), + }, + ) return common_response( Ok( data=CreatePaymentResponse( @@ -298,14 +324,15 @@ async def create_payment( f"{user.first_name or ''} {user.last_name or ''}".strip() ) - mayar_response = await mayar_service.create_payment( - ticket=ticket, - customer_email=user.email, - customer_name=customer_name, - customer_phone=user.phone, - tx_internal_id=str(payment.id), - voucher=voucher, - ) + with tracer.start_as_current_span("mayar.create_payment"): + mayar_response = await mayar_service.create_payment( + ticket=ticket, + customer_email=user.email, + customer_name=customer_name, + customer_phone=user.phone, + tx_internal_id=str(payment.id), + voucher=voucher, + ) data = mayar_response.get("data", {}) payment_link = data.get("link", "") @@ -331,6 +358,14 @@ async def create_payment( db.commit() db.refresh(payment) + span.set_attribute("payment.status", str(payment.status)) + payment_created_counter.add( + 1, + { + "ticket.name": ticket.name, + "has_voucher": str(voucher is not None), + }, + ) return common_response( Ok( data=CreatePaymentResponse( @@ -359,6 +394,8 @@ async def create_payment( traceback.print_exc() logger.error(f"Error in create_payment: {e}") return common_response(InternalServerError(error="Internal Server Error")) + finally: + span_context.__exit__(None, None, None) @router.get( @@ -478,9 +515,10 @@ async def get_payment_detail( mayar_service = MayarService(api_key=MAYAR_API_KEY, base_url=MAYAR_BASE_URL) try: if payment.mayar_id and payment.status == PaymentStatus.UNPAID: - mayar_status_response = await mayar_service.get_payment_status( - payment_id=payment.mayar_id - ) + with tracer.start_as_current_span("mayar.get_payment_status"): + mayar_status_response = await mayar_service.get_payment_status( + payment_id=payment.mayar_id + ) data = mayar_status_response.get("data", {}) transaction_status = data.get("status", "").lower() @@ -603,69 +641,85 @@ async def payment_webhook( event = request.get("event") data = request.get("data", {}) if event == "payment.received" and data: - mayar_id = data.get("id") - mayar_transaction_id = data.get("transactionId") - transaction_status = data.get("status", "").lower() + with tracer.start_as_current_span("payment.webhook") as webhook_span: + payment_webhook_counter.add(1, {"event": event}) + webhook_span.set_attribute("webhook.event", event) + mayar_id = data.get("id") + mayar_transaction_id = data.get("transactionId") + transaction_status = data.get("status", "").lower() - if not mayar_id and not mayar_transaction_id: - return common_response( - BadRequest(message="id or transactionId is required") - ) + if not mayar_id and not mayar_transaction_id: + return common_response( + BadRequest(message="id or transactionId is required") + ) - status_mapping = {"success": PaymentStatus.PAID} + status_mapping = {"success": PaymentStatus.PAID} - status = status_mapping.get(transaction_status, PaymentStatus.UNPAID) + status = status_mapping.get(transaction_status, PaymentStatus.UNPAID) - payment = None - if mayar_transaction_id: - payment = paymentRepo.get_payment_by_mayar_transaction_id( - db=db, mayar_transaction_id=mayar_transaction_id - ) + payment = None + if mayar_transaction_id: + payment = paymentRepo.get_payment_by_mayar_transaction_id( + db=db, mayar_transaction_id=mayar_transaction_id + ) - if not payment and mayar_id: - payment = paymentRepo.get_payment_by_mayar_id(db=db, mayar_id=mayar_id) + if not payment and mayar_id: + payment = paymentRepo.get_payment_by_mayar_id( + db=db, mayar_id=mayar_id + ) - if not payment: - logger.warning( - f"Payment not found for mayar_id: {mayar_id}, transactionId: {mayar_transaction_id}" - ) - return common_response(BadRequest(message="Payment not found")) + if not payment: + logger.warning( + f"Payment not found for mayar_id: {mayar_id}, transactionId: {mayar_transaction_id}" + ) + return common_response(BadRequest(message="Payment not found")) - paymentRepo.update_payment( - db=db, - payment=payment, - status=status, - mayar_id=mayar_id, - mayar_transaction_id=mayar_transaction_id, - is_commit=False, - ) - user: UserModel = payment.user - ticket: TicketModel = payment.ticket - voucher: VoucherModel = payment.voucher - - if status == PaymentStatus.PAID: - if voucher and voucher.type: - user.participant_type = voucher.type - else: - user.participant_type = ticket.user_participant_type - db.add(user) - - mayar_service = MayarService( - api_key=MAYAR_API_KEY, base_url=MAYAR_BASE_URL - ) - closed_count = await close_unpaid_payments_with_mayar( + status_before = str(payment.status) + webhook_span.set_attribute("payment.id", str(payment.id)) + webhook_span.set_attribute("payment.status_before", status_before) + + paymentRepo.update_payment( db=db, - user_id=str(user.id), - exclude_payment_id=str(payment.id), - mayar_service=mayar_service, + payment=payment, + status=status, + mayar_id=mayar_id, + mayar_transaction_id=mayar_transaction_id, + is_commit=False, ) - if closed_count > 0: - logger.info( - f"Closed {closed_count} other unpaid payment(s) for user {user.id}" + user: UserModel = payment.user + ticket: TicketModel = payment.ticket + voucher: VoucherModel = payment.voucher + + if status == PaymentStatus.PAID: + if voucher and voucher.type: + user.participant_type = voucher.type + else: + user.participant_type = ticket.user_participant_type + db.add(user) + + mayar_service = MayarService( + api_key=MAYAR_API_KEY, base_url=MAYAR_BASE_URL + ) + closed_count = await close_unpaid_payments_with_mayar( + db=db, + user_id=str(user.id), + exclude_payment_id=str(payment.id), + mayar_service=mayar_service, ) + if closed_count > 0: + logger.info( + f"Closed {closed_count} other unpaid payment(s) for user {user.id}" + ) - db.commit() - logger.info(f"Payment {payment.id} updated to status {status} via webhook") + db.commit() + webhook_span.set_attribute("payment.status_after", str(status)) + payment_status_counter.add( + 1, + {"from_status": status_before, "to_status": str(status)}, + ) + logger.info( + f"Payment {payment.id} updated to status {status} via webhook" + ) return common_response(Ok(data={"message": "Webhook processed successfully"})) except Exception as e: diff --git a/routes/streaming.py b/routes/streaming.py index c424ae2..e8a7083 100644 --- a/routes/streaming.py +++ b/routes/streaming.py @@ -4,10 +4,13 @@ from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Request +from opentelemetry import trace from pytz import timezone from sqlalchemy.orm import Session from core.log import logger +from core.otel_metrics import streaming_webhook_counter +from core.telemetry import get_tracer from core.mux_service import mux_service from core.responses import ( BadRequest, @@ -138,8 +141,12 @@ async def get_stream_playback( return common_response(InternalServerError(error=str(e))) +tracer = get_tracer("streaming") + + @router.post("/webhook") async def mux_webhook(request: Request, db: Session = Depends(get_db_sync)): + stream_span = None try: payload = await request.body() signature = request.headers.get("Mux-Signature", "") @@ -148,9 +155,17 @@ async def mux_webhook(request: Request, db: Session = Depends(get_db_sync)): return common_response(Unauthorized(message="Invalid webhook signature")) webhook_data = json.loads(payload) - logger.info(f"Webhook Mux Data: {webhook_data}") event_type = webhook_data.get("type") data = webhook_data.get("data", {}) + logger.info(f"Mux webhook received: {event_type or 'unknown'}") + + stream_id = data.get("id", "") + streaming_webhook_counter.add(1, {"event_type": event_type or "unknown"}) + stream_span = tracer.start_span("streaming.webhook") + stream_context = trace.use_span(stream_span, end_on_exit=True) + stream_context.__enter__() + stream_span.set_attribute("webhook.event_type", event_type or "unknown") + stream_span.set_attribute("mux.stream_id", stream_id) if event_type == "video.asset.ready": asset_id = data.get("id") @@ -237,3 +252,6 @@ async def mux_webhook(request: Request, db: Session = Depends(get_db_sync)): raise except Exception as e: return common_response(InternalServerError(error=str(e))) + finally: + if stream_span is not None: + stream_context.__exit__(None, None, None) diff --git a/settings.py b/settings.py index 4a43eb2..d2c9584 100644 --- a/settings.py +++ b/settings.py @@ -93,6 +93,13 @@ def str_to_bool(string: str) -> bool: os.environ.get("STREAM_TOKEN_EXPIRE_MINUTES", default="15") ) +# OpenTelemetry +OTEL_ENABLED = str_to_bool(os.environ.get("OTEL_ENABLED", "False")) +OTEL_SERVICE_NAME = os.environ.get("OTEL_SERVICE_NAME", "pyconid25-be") +OTEL_EXPORTER_OTLP_ENDPOINT = os.environ.get( + "OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318" +) + # File upload FILE_STORAGE_PATH = os.environ.get("FILE_STORAGE_PATH", "./storage") MAX_FILE_SIZE_MB = int(os.environ.get("MAX_FILE_SIZE_MB", "5"))