diff --git a/.ai/REST_API.md b/.ai/REST_API.md index f89a0190..6324b522 100644 --- a/.ai/REST_API.md +++ b/.ai/REST_API.md @@ -1571,4 +1571,18 @@ GET /api/v1/groups/{id}, POST /api/v1/groups/join 응답의 `data` 필드: - `deleted`: briefing_subscriptions row 삭제 (notification_hour NULL 또는 Pro 상실) - 에러: 401 (X-Scheduler-Secret 불일치) -*마지막 업데이트: 2026-04-26* +### POST /api/v1/internal/live-activity/jobs/{job_id}/dispatch + +- 설명: Cloud Tasks가 예약 시각에 호출하는 Live Activity job dispatch. 지정된 `job_id` 하나만 claim 후 처리 +- 인증: `X-Scheduler-Secret` 헤더 (SCHEDULER_SECRET 환경변수와 대조). 불일치 시 401 +- 요청 바디: 없음 +- 응답 200: + ```json + { + "processed": true + } + ``` + - `processed`: 해당 job을 claim/처리했으면 true. 이미 처리됐거나 due가 아니면 false +- 에러: 401 (X-Scheduler-Secret 불일치), 500 (SCHEDULER_SECRET 미설정 또는 처리 실패) + +*마지막 업데이트: 2026-05-21* diff --git a/.github/workflows/deploy-rust.yml b/.github/workflows/deploy-rust.yml index 806b731b..281884bf 100644 --- a/.github/workflows/deploy-rust.yml +++ b/.github/workflows/deploy-rust.yml @@ -25,6 +25,7 @@ jobs: apns_environment: ${{ steps.env.outputs.apns_environment }} apns_bundle_id: ${{ steps.env.outputs.apns_bundle_id }} gcs_bucket: ${{ steps.env.outputs.gcs_bucket }} + rust_api_url: ${{ steps.env.outputs.rust_api_url }} env_label: ${{ steps.env.outputs.env_label }} emoji: ${{ steps.env.outputs.emoji }} @@ -40,6 +41,7 @@ jobs: APNS_ENV="development" APNS_BUNDLE="com.promiso.dev" GCS_BUCKET="promiso-dev-media" + RUST_API_URL="https://promiso-api-7z5hrxvm5q-du.a.run.app" EMOJI="🟢" LABEL="Dev" ;; @@ -48,6 +50,7 @@ jobs: APNS_ENV="development" APNS_BUNDLE="com.promiso.stage" GCS_BUCKET="promiso-stage-media" + RUST_API_URL="https://promiso-api-2re4udjdkq-du.a.run.app" EMOJI="🟡" LABEL="Stage" ;; @@ -56,6 +59,7 @@ jobs: APNS_ENV="production" APNS_BUNDLE="com.promiso" GCS_BUCKET="promiso-prod-media" + RUST_API_URL="https://promiso-api-maen7botya-du.a.run.app" EMOJI="🚀" LABEL="Production" ;; @@ -71,6 +75,7 @@ jobs: echo "apns_environment=$APNS_ENV" echo "apns_bundle_id=$APNS_BUNDLE" echo "gcs_bucket=$GCS_BUCKET" + echo "rust_api_url=$RUST_API_URL" echo "env_label=$LABEL" echo "emoji=$EMOJI" } >> "$GITHUB_OUTPUT" @@ -184,7 +189,7 @@ jobs: --project ${{ needs.setup.outputs.project_id }} \ --region asia-northeast3 \ --allow-unauthenticated \ - --set-env-vars "FIREBASE_PROJECT_ID=${{ needs.setup.outputs.project_id }},RUST_LOG=info,APNS_KEY_ID=4CHBHPYY75,APNS_TEAM_ID=BAC795627G,APNS_BUNDLE_ID=${{ needs.setup.outputs.apns_bundle_id }},APNS_ENVIRONMENT=${{ needs.setup.outputs.apns_environment }},GCS_UPLOAD_BUCKET=${{ needs.setup.outputs.gcs_bucket }}" \ + --set-env-vars "FIREBASE_PROJECT_ID=${{ needs.setup.outputs.project_id }},RUST_LOG=info,APNS_KEY_ID=4CHBHPYY75,APNS_TEAM_ID=BAC795627G,APNS_BUNDLE_ID=${{ needs.setup.outputs.apns_bundle_id }},APNS_ENVIRONMENT=${{ needs.setup.outputs.apns_environment }},GCS_UPLOAD_BUCKET=${{ needs.setup.outputs.gcs_bucket }},LIVE_ACTIVITY_TASK_PROJECT_ID=${{ needs.setup.outputs.project_id }},LIVE_ACTIVITY_TASK_LOCATION=asia-northeast3,LIVE_ACTIVITY_TASK_QUEUE=live-activity-jobs,LIVE_ACTIVITY_TASK_TARGET_BASE_URL=${{ needs.setup.outputs.rust_api_url }}" \ --set-secrets "DATABASE_URL=DATABASE_URL:latest,DATABASE_POOL_URL=DATABASE_POOL_URL:latest,AUTH_JWT_SECRET=AUTH_JWT_SECRET:latest,FIREBASE_SERVICE_ACCOUNT_JSON=FIREBASE_SERVICE_ACCOUNT_JSON:latest,GEMINI_API_KEY=GEMINI_API_KEY:latest,WIDGET_JWT_SECRET=WIDGET_JWT_SECRET:latest,APNS_AUTH_KEY=APNS_AUTH_KEY:latest,KAKAO_REST_API_KEY=KAKAO_REST_API_KEY:latest,ODSAY_API_KEY=ODSAY_API_KEY:latest,SCHEDULER_SECRET=SCHEDULER_SECRET:latest,KMA_API_KEY=KMA_API_KEY:latest" \ --quiet diff --git a/infra/rust-backend/.env.example b/infra/rust-backend/.env.example index e288476d..19b5503d 100644 --- a/infra/rust-backend/.env.example +++ b/infra/rust-backend/.env.example @@ -40,5 +40,14 @@ APNS_AUTH_KEY_PATH=/absolute/path/to/AuthKey_ABC123XYZ.p8 # optional override; default is derived from FIREBASE_PROJECT_ID # APNS_BUNDLE_ID=com.promiso.dev WIDGET_JWT_SECRET=replace-with-widget-jwt-secret +SCHEDULER_SECRET=replace-with-scheduler-secret +# Live Activity Cloud Tasks dispatch +# optional override; default is derived from FIREBASE_PROJECT_ID +# LIVE_ACTIVITY_TASK_PROJECT_ID=promiso-dev +# optional override +# LIVE_ACTIVITY_TASK_LOCATION=asia-northeast3 +# optional override +# LIVE_ACTIVITY_TASK_QUEUE=live-activity-jobs +LIVE_ACTIVITY_TASK_TARGET_BASE_URL=https://promiso-api-xxxxx.a.run.app GEMINI_API_KEY=replace-with-gemini-api-key RUST_LOG=info diff --git a/infra/rust-backend/src/config/mod.rs b/infra/rust-backend/src/config/mod.rs index 5ccd7d42..e720c8be 100644 --- a/infra/rust-backend/src/config/mod.rs +++ b/infra/rust-backend/src/config/mod.rs @@ -20,6 +20,11 @@ pub struct Config { pub widget_jwt_secret: Option, pub odsay_api_key: Option, pub kakao_rest_api_key: Option, + pub scheduler_secret: Option, + pub live_activity_task_project_id: String, + pub live_activity_task_location: String, + pub live_activity_task_queue: String, + pub live_activity_task_target_base_url: Option, } impl Config { @@ -77,6 +82,17 @@ impl Config { widget_jwt_secret: std::env::var("WIDGET_JWT_SECRET").ok(), odsay_api_key: std::env::var("ODSAY_API_KEY").ok(), kakao_rest_api_key: std::env::var("KAKAO_REST_API_KEY").ok(), + scheduler_secret: std::env::var("SCHEDULER_SECRET").ok(), + live_activity_task_project_id: std::env::var("LIVE_ACTIVITY_TASK_PROJECT_ID") + .unwrap_or_else(|_| { + std::env::var("FIREBASE_PROJECT_ID").expect("FIREBASE_PROJECT_ID must be set") + }), + live_activity_task_location: std::env::var("LIVE_ACTIVITY_TASK_LOCATION") + .unwrap_or_else(|_| "asia-northeast3".to_string()), + live_activity_task_queue: std::env::var("LIVE_ACTIVITY_TASK_QUEUE") + .unwrap_or_else(|_| "live-activity-jobs".to_string()), + live_activity_task_target_base_url: std::env::var("LIVE_ACTIVITY_TASK_TARGET_BASE_URL") + .ok(), } } } diff --git a/infra/rust-backend/src/main.rs b/infra/rust-backend/src/main.rs index febef223..7b63e350 100644 --- a/infra/rust-backend/src/main.rs +++ b/infra/rust-backend/src/main.rs @@ -1,7 +1,5 @@ use promiso_backend::config::Config; -use promiso_backend::push::{build_live_activity_sender, build_push_sender}; use promiso_backend::routes; -use promiso_backend::services::live_activity_service; use tokio::net::TcpListener; use tracing_subscriber::EnvFilter; @@ -31,12 +29,6 @@ async fn main() { .await .expect("Failed to connect to database pool"); - let _live_activity_worker = live_activity_service::spawn_worker( - pool.clone(), - build_live_activity_sender(&config), - build_push_sender(&config), - ); - let app = routes::create_router(pool, &config); // [::]:port → IPv4 + IPv6 동시 바인딩 (iOS 시뮬레이터 호환) diff --git a/infra/rust-backend/src/routes/internal.rs b/infra/rust-backend/src/routes/internal.rs index 4ea86cfb..a9063d07 100644 --- a/infra/rust-backend/src/routes/internal.rs +++ b/infra/rust-backend/src/routes/internal.rs @@ -1,24 +1,32 @@ use std::sync::Arc; -use axum::extract::State; +use axum::extract::{Path, State}; use axum::http::HeaderMap; use axum::routing::post; use axum::{Extension, Json, Router}; use chrono::Utc; use serde::Serialize; use sqlx::PgPool; +use uuid::Uuid; use crate::errors::AppError; +use crate::models::live_activity::LiveActivitySender; use crate::models::notification::PushSender; use crate::services::briefing_scheduler_service::{ self, dispatch_due_briefings, verify_scheduler_secret, }; +use crate::services::live_activity_service::{self, LiveActivityJobScheduler}; pub fn router() -> Router { - Router::new().route( - "/api/v1/internal/briefing/dispatch", - post(dispatch_briefings_handler), - ) + Router::new() + .route( + "/api/v1/internal/briefing/dispatch", + post(dispatch_briefings_handler), + ) + .route( + "/api/v1/internal/live-activity/jobs/{job_id}/dispatch", + post(dispatch_live_activity_job_handler), + ) } #[derive(Debug, Serialize)] @@ -27,6 +35,12 @@ struct DispatchResponse { summary: briefing_scheduler_service::DispatchSummary, } +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct LiveActivityJobDispatchResponse { + processed: bool, +} + /// POST /api/v1/internal/briefing/dispatch /// /// 인증: X-Scheduler-Secret 헤더 vs SCHEDULER_SECRET 환경변수 @@ -62,3 +76,43 @@ async fn dispatch_briefings_handler( // 5. 응답 Ok(Json(DispatchResponse { summary })) } + +async fn dispatch_live_activity_job_handler( + State(pool): State, + Extension(push_sender): Extension>, + Extension(live_activity_sender): Extension>, + Extension(live_activity_job_scheduler): Extension>, + headers: HeaderMap, + Path(job_id): Path, +) -> Result, AppError> { + let provided = headers + .get("x-scheduler-secret") + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + + let expected = match std::env::var("SCHEDULER_SECRET") { + Ok(v) if !v.is_empty() => v, + _ => { + return Err(AppError::Internal( + "SCHEDULER_SECRET not configured".to_string(), + )) + } + }; + + if !verify_scheduler_secret(provided, &expected) { + return Err(AppError::Unauthorized("Invalid scheduler secret".into())); + } + + let result = live_activity_service::dispatch_live_activity_job_with_scheduler( + &pool, + live_activity_sender.as_ref(), + push_sender.as_ref(), + live_activity_job_scheduler.as_ref(), + job_id, + ) + .await?; + + Ok(Json(LiveActivityJobDispatchResponse { + processed: result.processed, + })) +} diff --git a/infra/rust-backend/src/routes/mod.rs b/infra/rust-backend/src/routes/mod.rs index aa2cdf33..ffaea010 100644 --- a/infra/rust-backend/src/routes/mod.rs +++ b/infra/rust-backend/src/routes/mod.rs @@ -28,6 +28,7 @@ use crate::middleware::auth::{require_auth, ServerAuth, WidgetAuth}; use crate::push::{build_live_activity_sender, build_push_sender}; use crate::routes::transportation::TransportationKeys; use crate::services::app_store_service::{RealAppStoreVerifier, SharedAppStoreVerifier}; +use crate::services::live_activity_service::build_live_activity_job_scheduler; use crate::services::provider_verifier::{RealProviderVerifier, SharedProviderVerifier}; use crate::services::storage_service::GcsUploadSigner; @@ -40,8 +41,10 @@ pub fn create_router(pool: PgPool, config: &Config) -> Router { let widget_auth = WidgetAuth::new(config.widget_jwt_secret.clone()); let push_sender = build_push_sender(config); let live_activity_sender = build_live_activity_sender(config); + let live_activity_job_scheduler = build_live_activity_job_scheduler(config); let public_push_sender = push_sender.clone(); let public_live_activity_sender = live_activity_sender.clone(); + let public_live_activity_job_scheduler = live_activity_job_scheduler.clone(); let app_store_verifier: SharedAppStoreVerifier = Arc::new(RealAppStoreVerifier::new(config)); let provider_verifier: SharedProviderVerifier = Arc::new(RealProviderVerifier::new(config)); let gcs_upload_signer = match GcsUploadSigner::from_config(config) { @@ -72,6 +75,7 @@ pub fn create_router(pool: PgPool, config: &Config) -> Router { .merge(emoji::router()) .layer(axum::Extension(app_store_verifier.clone())) .layer(axum::Extension(live_activity_sender.clone())) + .layer(axum::Extension(live_activity_job_scheduler.clone())) .layer(axum::Extension(gcs_upload_signer.clone())) .layer(axum::Extension(push_sender)) .layer(axum::Extension(transportation_keys)) @@ -82,10 +86,14 @@ pub fn create_router(pool: PgPool, config: &Config) -> Router { .merge(subscriptions::public_router()) .merge(widget::widget_snapshot_router()) .layer(axum::Extension(app_store_verifier)) - .layer(axum::Extension(public_live_activity_sender)) + .layer(axum::Extension(public_live_activity_sender.clone())) + .layer(axum::Extension(public_live_activity_job_scheduler.clone())) .layer(axum::Extension(public_push_sender.clone())); - let internal_routes = internal::router().layer(axum::Extension(public_push_sender)); + let internal_routes = internal::router() + .layer(axum::Extension(public_push_sender)) + .layer(axum::Extension(public_live_activity_sender)) + .layer(axum::Extension(public_live_activity_job_scheduler)); Router::new() .merge(health::router()) // /health — 인증 불필요 diff --git a/infra/rust-backend/src/routes/schedules.rs b/infra/rust-backend/src/routes/schedules.rs index 17548649..404a86f3 100644 --- a/infra/rust-backend/src/routes/schedules.rs +++ b/infra/rust-backend/src/routes/schedules.rs @@ -17,13 +17,17 @@ use crate::models::live_activity::{ use crate::models::notification::PushSender; use crate::models::schedule::*; use crate::response::ApiResponse; +use crate::services::live_activity_service::LiveActivityJobScheduler; use crate::services::{live_activity_service, schedule_service, vote_live_activity_service}; pub fn router() -> Router { let schedule_routes = Router::new() .route("/", post(create_schedule)) .route("/home", get(get_home_schedules)) - .route("/personal/active", get(get_personal_active_schedules_handler)) + .route( + "/personal/active", + get(get_personal_active_schedules_handler), + ) .route("/personal/past", get(get_personal_past_schedules)) .route("/calendar", get(get_calendar_schedules)) .route("/calendar-sync", get(get_calendar_sync)) @@ -75,11 +79,13 @@ async fn create_schedule( State(pool): State, Extension(claims): Extension, Extension(push_sender): Extension>, + Extension(live_activity_job_scheduler): Extension>, Json(req): Json, ) -> Result, AppError> { - let result = schedule_service::create_schedule_with_push_sender( + let result = schedule_service::create_schedule_with_push_sender_and_live_activity_scheduler( &pool, push_sender.as_ref(), + live_activity_job_scheduler.as_ref(), &claims.uid, req, ) @@ -101,13 +107,15 @@ async fn update_schedule( Extension(claims): Extension, Extension(push_sender): Extension>, Extension(live_activity_sender): Extension>, + Extension(live_activity_job_scheduler): Extension>, Path(id): Path, Json(req): Json, ) -> Result, AppError> { let should_end_vote_activity = req.start_at.is_some(); - schedule_service::update_schedule_with_push_sender( + schedule_service::update_schedule_with_push_sender_and_live_activity_scheduler( &pool, push_sender.as_ref(), + live_activity_job_scheduler.as_ref(), &claims.uid, id, req, @@ -162,12 +170,14 @@ async fn respond_schedule( Extension(claims): Extension, Extension(push_sender): Extension>, Extension(live_activity_sender): Extension>, + Extension(live_activity_job_scheduler): Extension>, Path(id): Path, Json(req): Json, ) -> Result, AppError> { - let result = schedule_service::respond_schedule_with_push_sender( + let result = schedule_service::respond_schedule_with_push_sender_and_live_activity_scheduler( &pool, push_sender.as_ref(), + live_activity_job_scheduler.as_ref(), &claims.uid, id, req, @@ -195,11 +205,13 @@ async fn start_live_activity( State(pool): State, Extension(claims): Extension, Extension(live_activity_sender): Extension>, + Extension(live_activity_job_scheduler): Extension>, Path(id): Path, ) -> Result, AppError> { - let result = live_activity_service::start_schedule_live_activity( + let result = live_activity_service::start_schedule_live_activity_with_scheduler( &pool, live_activity_sender.as_ref(), + live_activity_job_scheduler.as_ref(), id, &claims.uid, ) @@ -212,12 +224,14 @@ async fn start_vote_live_activity( Extension(claims): Extension, Extension(push_sender): Extension>, Extension(live_activity_sender): Extension>, + Extension(live_activity_job_scheduler): Extension>, Path(id): Path, ) -> Result, AppError> { - let result = vote_live_activity_service::start_vote_live_activity( + let result = vote_live_activity_service::start_vote_live_activity_with_scheduler( &pool, live_activity_sender.as_ref(), Some(push_sender.as_ref()), + live_activity_job_scheduler.as_ref(), id, &claims.uid, ) @@ -245,12 +259,14 @@ async fn update_live_activity_eta( State(pool): State, Extension(claims): Extension, Extension(live_activity_sender): Extension>, + Extension(live_activity_job_scheduler): Extension>, Path(id): Path, Json(req): Json, ) -> Result, AppError> { - let result = live_activity_service::update_schedule_live_activity( + let result = live_activity_service::update_schedule_live_activity_with_scheduler( &pool, live_activity_sender.as_ref(), + live_activity_job_scheduler.as_ref(), id, &claims.uid, req, @@ -264,6 +280,7 @@ async fn widget_update_live_activity_eta( Extension(server_auth): Extension, Extension(widget_auth): Extension, Extension(live_activity_sender): Extension>, + Extension(live_activity_job_scheduler): Extension>, headers: HeaderMap, Json(req): Json, ) -> Result, AppError> { @@ -284,21 +301,17 @@ async fn widget_update_live_activity_eta( .and_then(|value| value.to_str().ok()) .filter(|value| !value.trim().is_empty()); - let claims = verify_widget_or_server_token( - &server_auth, - &widget_auth, - &pool, - auth_token, - device_id, - ) - .await?; + let claims = + verify_widget_or_server_token(&server_auth, &widget_auth, &pool, auth_token, device_id) + .await?; if claims.uid != user_id { return Err(AppError::Unauthorized("Token uid mismatch".to_string())); } - let result = live_activity_service::update_schedule_live_activity_from_widget( + let result = live_activity_service::update_schedule_live_activity_from_widget_with_scheduler( &pool, live_activity_sender.as_ref(), + live_activity_job_scheduler.as_ref(), req.schedule_id, user_id, UpdateScheduleLiveActivityRequest { @@ -317,6 +330,7 @@ async fn widget_vote_live_activity( Extension(widget_auth): Extension, Extension(push_sender): Extension>, Extension(live_activity_sender): Extension>, + Extension(live_activity_job_scheduler): Extension>, headers: HeaderMap, Json(req): Json, ) -> Result, AppError> { @@ -337,14 +351,9 @@ async fn widget_vote_live_activity( .and_then(|value| value.to_str().ok()) .filter(|value| !value.trim().is_empty()); - let claims = verify_widget_or_server_token( - &server_auth, - &widget_auth, - &pool, - auth_token, - device_id, - ) - .await?; + let claims = + verify_widget_or_server_token(&server_auth, &widget_auth, &pool, auth_token, device_id) + .await?; if claims.uid != user_id { return Err(AppError::Unauthorized("Token uid mismatch".to_string())); } @@ -355,9 +364,10 @@ async fn widget_vote_live_activity( )); } - schedule_service::respond_schedule_with_push_sender( + schedule_service::respond_schedule_with_push_sender_and_live_activity_scheduler( &pool, push_sender.as_ref(), + live_activity_job_scheduler.as_ref(), user_id, req.schedule_id, RespondScheduleRequest { diff --git a/infra/rust-backend/src/services/live_activity_service.rs b/infra/rust-backend/src/services/live_activity_service.rs index 10d9793e..4ded5201 100644 --- a/infra/rust-backend/src/services/live_activity_service.rs +++ b/infra/rust-backend/src/services/live_activity_service.rs @@ -1,10 +1,13 @@ use std::sync::Arc; -use chrono::{Duration, Utc}; +use base64::Engine; +use chrono::{Duration, SecondsFormat, Utc}; +use serde::Deserialize; use serde_json::json; use sqlx::PgPool; use uuid::Uuid; +use crate::config::Config; use crate::errors::AppError; use crate::models::live_activity::{ LiveActivityJob, LiveActivityJobPayload, LiveActivityJobStatus, LiveActivityJobType, @@ -21,6 +24,8 @@ const JOB_BATCH_SIZE: i64 = 10; const JOB_LOCK_SECONDS: i64 = 60; const JOB_MAX_ATTEMPTS: i32 = 3; const WORKER_POLL_INTERVAL_SECONDS: u64 = 5; +const METADATA_TOKEN_URL: &str = + "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"; #[derive(Debug, Clone, sqlx::FromRow)] struct GroupSummary { @@ -40,7 +45,214 @@ struct PushToStartTarget { push_to_start_token: String, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ScheduledLiveActivityJob { + pub id: Uuid, + pub schedule_id: Uuid, + pub job_type: LiveActivityJobType, + pub scheduled_at: chrono::DateTime, +} + +#[async_trait::async_trait] +pub trait LiveActivityJobScheduler: Send + Sync { + async fn enqueue_live_activity_job( + &self, + job: ScheduledLiveActivityJob, + ) -> Result<(), AppError>; +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LiveActivityJobDispatchResult { + pub processed: bool, +} + +#[derive(Clone)] +pub struct CloudTasksLiveActivityJobScheduler { + client: reqwest::Client, + project_id: String, + location: String, + queue: String, + target_base_url: String, + scheduler_secret: String, +} + +#[derive(Clone)] +pub struct MisconfiguredLiveActivityJobScheduler { + reason: String, +} + +#[derive(Debug, Deserialize)] +struct MetadataTokenResponse { + access_token: String, +} + +impl CloudTasksLiveActivityJobScheduler { + pub fn new( + project_id: String, + location: String, + queue: String, + target_base_url: String, + scheduler_secret: String, + ) -> Self { + Self { + client: reqwest::Client::new(), + project_id, + location, + queue, + target_base_url: target_base_url.trim_end_matches('/').to_string(), + scheduler_secret, + } + } + + async fn access_token(&self) -> Result { + let response = self + .client + .get(METADATA_TOKEN_URL) + .header("Metadata-Flavor", "Google") + .send() + .await + .map_err(|error| { + AppError::Internal(format!("Failed to request metadata access token: {error}")) + })?; + + if !response.status().is_success() { + return Err(AppError::Internal(format!( + "Metadata access token request failed with status {}", + response.status() + ))); + } + + response + .json::() + .await + .map(|token| token.access_token) + .map_err(|error| { + AppError::Internal(format!("Failed to decode metadata access token: {error}")) + }) + } + + fn queue_endpoint(&self) -> String { + format!( + "https://cloudtasks.googleapis.com/v2/projects/{}/locations/{}/queues/{}/tasks", + self.project_id, self.location, self.queue + ) + } + + fn target_url(&self, job_id: Uuid) -> String { + format!( + "{}/api/v1/internal/live-activity/jobs/{}/dispatch", + self.target_base_url, job_id + ) + } +} + +#[async_trait::async_trait] +impl LiveActivityJobScheduler for CloudTasksLiveActivityJobScheduler { + async fn enqueue_live_activity_job( + &self, + job: ScheduledLiveActivityJob, + ) -> Result<(), AppError> { + let token = self.access_token().await?; + let schedule_time = job.scheduled_at.to_rfc3339_opts(SecondsFormat::Secs, true); + let body = json!({ + "task": { + "scheduleTime": schedule_time, + "httpRequest": { + "httpMethod": "POST", + "url": self.target_url(job.id), + "headers": { + "Content-Type": "application/json", + "X-Scheduler-Secret": self.scheduler_secret, + }, + "body": base64::engine::general_purpose::STANDARD.encode( + format!(r#"{{"jobId":"{}"}}"#, job.id) + ), + } + } + }); + + let response = self + .client + .post(self.queue_endpoint()) + .bearer_auth(token) + .json(&body) + .send() + .await + .map_err(|error| { + AppError::Internal(format!( + "Failed to enqueue Live Activity Cloud Task: {error}" + )) + })?; + + if response.status().is_success() { + return Ok(()); + } + + let status = response.status(); + let text = response.text().await.unwrap_or_default(); + Err(AppError::Internal(format!( + "Live Activity Cloud Task enqueue failed with status {status}: {text}" + ))) + } +} + +impl MisconfiguredLiveActivityJobScheduler { + pub fn new(reason: impl Into) -> Self { + Self { + reason: reason.into(), + } + } +} + +#[async_trait::async_trait] +impl LiveActivityJobScheduler for MisconfiguredLiveActivityJobScheduler { + async fn enqueue_live_activity_job( + &self, + _job: ScheduledLiveActivityJob, + ) -> Result<(), AppError> { + Err(AppError::Internal(self.reason.clone())) + } +} + +pub fn build_live_activity_job_scheduler(config: &Config) -> Arc { + match ( + config.live_activity_task_target_base_url.clone(), + config.scheduler_secret.clone(), + ) { + (Some(target_base_url), Some(scheduler_secret)) + if !target_base_url.trim().is_empty() && !scheduler_secret.trim().is_empty() => + { + Arc::new(CloudTasksLiveActivityJobScheduler::new( + config.live_activity_task_project_id.clone(), + config.live_activity_task_location.clone(), + config.live_activity_task_queue.clone(), + target_base_url, + scheduler_secret, + )) + } + _ => Arc::new(MisconfiguredLiveActivityJobScheduler::new( + "Live Activity Cloud Tasks scheduler is not configured", + )), + } +} + +pub async fn sync_schedule_jobs_with_scheduler( + pool: &PgPool, + scheduler: &dyn LiveActivityJobScheduler, + schedule_id: Uuid, +) -> Result<(), AppError> { + sync_schedule_jobs_internal(pool, Some(scheduler), schedule_id).await +} + pub async fn sync_schedule_jobs(pool: &PgPool, schedule_id: Uuid) -> Result<(), AppError> { + sync_schedule_jobs_internal(pool, None, schedule_id).await +} + +async fn sync_schedule_jobs_internal( + pool: &PgPool, + scheduler: Option<&dyn LiveActivityJobScheduler>, + schedule_id: Uuid, +) -> Result<(), AppError> { let schedule = load_schedule(pool, schedule_id).await?; if schedule.schedule_type != ScheduleType::Group { @@ -85,8 +297,9 @@ pub async fn sync_schedule_jobs(pool: &PgPool, schedule_id: Uuid) -> Result<(), } let scheduled_at = schedule.start_at - Duration::minutes(tracking_minutes.unwrap() as i64); - replace_pending_job( + replace_pending_job_internal( pool, + scheduler, schedule_id, LiveActivityJobType::Start, scheduled_at, @@ -121,6 +334,25 @@ pub async fn start_schedule_live_activity( execute_start_job(pool, sender, &schedule).await } +pub async fn start_schedule_live_activity_with_scheduler( + pool: &PgPool, + sender: &dyn LiveActivitySender, + scheduler: &dyn LiveActivityJobScheduler, + schedule_id: Uuid, + user_id: &str, +) -> Result { + let schedule = load_schedule(pool, schedule_id).await?; + ensure_group_schedule(&schedule)?; + + if schedule.user_id != user_id { + return Err(AppError::Forbidden( + "호스트만 Live Activity를 시작할 수 있습니다".to_string(), + )); + } + + execute_start_job_internal(pool, sender, Some(scheduler), &schedule).await +} + pub async fn update_schedule_live_activity( pool: &PgPool, sender: &dyn LiveActivitySender, @@ -128,7 +360,19 @@ pub async fn update_schedule_live_activity( user_id: &str, req: UpdateScheduleLiveActivityRequest, ) -> Result { - update_schedule_live_activity_internal(pool, sender, schedule_id, user_id, req).await + update_schedule_live_activity_internal(pool, sender, None, schedule_id, user_id, req).await +} + +pub async fn update_schedule_live_activity_with_scheduler( + pool: &PgPool, + sender: &dyn LiveActivitySender, + scheduler: &dyn LiveActivityJobScheduler, + schedule_id: Uuid, + user_id: &str, + req: UpdateScheduleLiveActivityRequest, +) -> Result { + update_schedule_live_activity_internal(pool, sender, Some(scheduler), schedule_id, user_id, req) + .await } pub async fn update_schedule_live_activity_from_widget( @@ -138,7 +382,19 @@ pub async fn update_schedule_live_activity_from_widget( user_id: &str, req: UpdateScheduleLiveActivityRequest, ) -> Result { - update_schedule_live_activity_internal(pool, sender, schedule_id, user_id, req).await + update_schedule_live_activity_internal(pool, sender, None, schedule_id, user_id, req).await +} + +pub async fn update_schedule_live_activity_from_widget_with_scheduler( + pool: &PgPool, + sender: &dyn LiveActivitySender, + scheduler: &dyn LiveActivityJobScheduler, + schedule_id: Uuid, + user_id: &str, + req: UpdateScheduleLiveActivityRequest, +) -> Result { + update_schedule_live_activity_internal(pool, sender, Some(scheduler), schedule_id, user_id, req) + .await } pub fn spawn_worker( @@ -185,9 +441,85 @@ pub async fn process_due_jobs( Ok(()) } +pub async fn dispatch_live_activity_job( + pool: &PgPool, + live_activity_sender: &dyn LiveActivitySender, + push_sender: &dyn PushSender, + job_id: Uuid, +) -> Result { + dispatch_live_activity_job_internal(pool, live_activity_sender, push_sender, None, job_id).await +} + +pub async fn dispatch_live_activity_job_with_scheduler( + pool: &PgPool, + live_activity_sender: &dyn LiveActivitySender, + push_sender: &dyn PushSender, + scheduler: &dyn LiveActivityJobScheduler, + job_id: Uuid, +) -> Result { + dispatch_live_activity_job_internal( + pool, + live_activity_sender, + push_sender, + Some(scheduler), + job_id, + ) + .await +} + +async fn dispatch_live_activity_job_internal( + pool: &PgPool, + live_activity_sender: &dyn LiveActivitySender, + push_sender: &dyn PushSender, + scheduler: Option<&dyn LiveActivityJobScheduler>, + job_id: Uuid, +) -> Result { + let Some(job) = claim_job_by_id(pool, job_id).await? else { + return Ok(LiveActivityJobDispatchResult { processed: false }); + }; + + let result = match job.job_type { + LiveActivityJobType::Start => { + let schedule = load_schedule(pool, job.schedule_id).await?; + ensure_group_schedule(&schedule)?; + + if schedule.live_activity_started_at.is_some() + && schedule.live_activity_ended_at.is_none() + { + Ok(()) + } else { + let response = + execute_start_job_internal(pool, live_activity_sender, scheduler, &schedule) + .await?; + if response.success_count == 0 { + Err(AppError::Internal( + "No successful Live Activity push-to-start delivery".to_string(), + )) + } else { + Ok(()) + } + } + } + LiveActivityJobType::End => process_end_job(pool, live_activity_sender, &job).await, + LiveActivityJobType::Nudge => process_nudge_job(pool, push_sender, &job).await, + }; + + match result { + Ok(()) => { + mark_job_succeeded(pool, job.id).await?; + Ok(LiveActivityJobDispatchResult { processed: true }) + } + Err(error) => { + handle_job_failure(pool, &job, &error.to_string()).await?; + Err(error) + } + } +} + async fn update_schedule_live_activity_internal( pool: &PgPool, sender: &dyn LiveActivitySender, + scheduler: Option<&dyn LiveActivityJobScheduler>, schedule_id: Uuid, user_id: &str, req: UpdateScheduleLiveActivityRequest, @@ -288,8 +620,9 @@ async fn update_schedule_live_activity_internal( RuntimeEnvironment::Release => 5, }; - replace_pending_job( + replace_pending_job_internal( pool, + scheduler, schedule_id, LiveActivityJobType::End, Utc::now() + Duration::minutes(delay_minutes), @@ -299,7 +632,14 @@ async fn update_schedule_live_activity_internal( ) .await?; } else { - schedule_default_end_job(pool, schedule_id, schedule.start_at, &req.channel_id).await?; + schedule_default_end_job( + pool, + scheduler, + schedule_id, + schedule.start_at, + &req.channel_id, + ) + .await?; } Ok(UpdateScheduleLiveActivityResponse { @@ -427,6 +767,15 @@ async fn execute_start_job( pool: &PgPool, sender: &dyn LiveActivitySender, schedule: &Schedule, +) -> Result { + execute_start_job_internal(pool, sender, None, schedule).await +} + +async fn execute_start_job_internal( + pool: &PgPool, + sender: &dyn LiveActivitySender, + scheduler: Option<&dyn LiveActivityJobScheduler>, + schedule: &Schedule, ) -> Result { ensure_group_schedule(schedule)?; @@ -539,10 +888,11 @@ async fn execute_start_job( cancel_pending_jobs(pool, schedule.id, &[LiveActivityJobType::Start]).await?; - schedule_default_end_job(pool, schedule.id, schedule.start_at, &channel_id).await?; + schedule_default_end_job(pool, scheduler, schedule.id, schedule.start_at, &channel_id).await?; - replace_pending_job( + replace_pending_job_internal( pool, + scheduler, schedule.id, LiveActivityJobType::Nudge, Utc::now() + Duration::minutes((tracking_duration_minutes / 2) as i64), @@ -585,14 +935,35 @@ async fn claim_due_jobs(pool: &PgPool) -> Result, AppError> .map_err(Into::into) } +async fn claim_job_by_id(pool: &PgPool, job_id: Uuid) -> Result, AppError> { + sqlx::query_as::<_, LiveActivityJob>( + "UPDATE live_activity_jobs \ + SET locked_until = NOW() + ($2::TEXT || ' seconds')::INTERVAL, \ + attempts = attempts + 1, \ + updated_at = NOW() \ + WHERE id = $1 \ + AND status = 'pending'::live_activity_job_status \ + AND scheduled_at <= NOW() \ + AND (locked_until IS NULL OR locked_until < NOW()) \ + RETURNING *", + ) + .bind(job_id) + .bind(JOB_LOCK_SECONDS) + .fetch_optional(pool) + .await + .map_err(Into::into) +} + async fn schedule_default_end_job( pool: &PgPool, + scheduler: Option<&dyn LiveActivityJobScheduler>, schedule_id: Uuid, start_at: chrono::DateTime, channel_id: &str, ) -> Result<(), AppError> { - replace_pending_job( + replace_pending_job_internal( pool, + scheduler, schedule_id, LiveActivityJobType::End, start_at + Duration::minutes(DEFAULT_END_MINUTES_AFTER_START), @@ -640,8 +1011,9 @@ async fn handle_job_failure( Ok(()) } -async fn replace_pending_job( +async fn replace_pending_job_internal( pool: &PgPool, + scheduler: Option<&dyn LiveActivityJobScheduler>, schedule_id: Uuid, job_type: LiveActivityJobType, scheduled_at: chrono::DateTime, @@ -649,9 +1021,10 @@ async fn replace_pending_job( ) -> Result<(), AppError> { cancel_pending_jobs(pool, schedule_id, std::slice::from_ref(&job_type)).await?; - sqlx::query( + let job = sqlx::query_as::<_, LiveActivityJob>( "INSERT INTO live_activity_jobs (schedule_id, job_type, scheduled_at, payload) \ - VALUES ($1, $2, $3, $4)", + VALUES ($1, $2, $3, $4) \ + RETURNING *", ) .bind(schedule_id) .bind(job_type) @@ -666,9 +1039,40 @@ async fn replace_pending_job( )) })?, ) - .execute(pool) + .fetch_one(pool) .await?; + if let Some(scheduler) = scheduler { + let scheduled_job = ScheduledLiveActivityJob { + id: job.id, + schedule_id: job.schedule_id, + job_type: job.job_type, + scheduled_at: job.scheduled_at, + }; + + if let Err(error) = scheduler.enqueue_live_activity_job(scheduled_job).await { + mark_job_cancelled(pool, job.id, &error.to_string()).await?; + return Err(error); + } + } + + Ok(()) +} + +async fn mark_job_cancelled(pool: &PgPool, job_id: Uuid, reason: &str) -> Result<(), AppError> { + sqlx::query( + "UPDATE live_activity_jobs \ + SET status = 'cancelled'::live_activity_job_status, \ + locked_until = NULL, \ + last_error = $2, \ + updated_at = NOW() \ + WHERE id = $1 \ + AND status = 'pending'::live_activity_job_status", + ) + .bind(job_id) + .bind(reason) + .execute(pool) + .await?; Ok(()) } diff --git a/infra/rust-backend/src/services/schedule_service.rs b/infra/rust-backend/src/services/schedule_service.rs index 2d518e5f..2dbf4393 100644 --- a/infra/rust-backend/src/services/schedule_service.rs +++ b/infra/rust-backend/src/services/schedule_service.rs @@ -10,6 +10,7 @@ use crate::errors::AppError; use crate::models::notification::{FieldChange, PushSender, VoteInfo}; use crate::models::schedule::*; use crate::services::live_activity_service; +use crate::services::live_activity_service::LiveActivityJobScheduler; use crate::services::notification_service; // ============================================================ @@ -421,7 +422,7 @@ pub async fn create_schedule( user_id: &str, req: CreateScheduleRequest, ) -> Result { - create_schedule_impl(pool, None, user_id, req).await + create_schedule_impl(pool, None, None, user_id, req).await } pub async fn create_schedule_with_push_sender( @@ -430,12 +431,30 @@ pub async fn create_schedule_with_push_sender( user_id: &str, req: CreateScheduleRequest, ) -> Result { - create_schedule_impl(pool, Some(push_sender), user_id, req).await + create_schedule_impl(pool, Some(push_sender), None, user_id, req).await +} + +pub async fn create_schedule_with_push_sender_and_live_activity_scheduler( + pool: &PgPool, + push_sender: &dyn PushSender, + live_activity_job_scheduler: &dyn LiveActivityJobScheduler, + user_id: &str, + req: CreateScheduleRequest, +) -> Result { + create_schedule_impl( + pool, + Some(push_sender), + Some(live_activity_job_scheduler), + user_id, + req, + ) + .await } async fn create_schedule_impl( pool: &PgPool, push_sender: Option<&dyn PushSender>, + live_activity_job_scheduler: Option<&dyn LiveActivityJobScheduler>, user_id: &str, req: CreateScheduleRequest, ) -> Result { @@ -573,7 +592,16 @@ async fn create_schedule_impl( } } - if let Err(error) = live_activity_service::sync_schedule_jobs(pool, schedule.id).await { + if let Some(scheduler) = live_activity_job_scheduler { + live_activity_service::sync_schedule_jobs_with_scheduler( + pool, + scheduler, + schedule.id, + ) + .await?; + } else if let Err(error) = + live_activity_service::sync_schedule_jobs(pool, schedule.id).await + { tracing::error!( "Failed to sync live activity jobs after schedule create {}: {}", schedule.id, @@ -597,9 +625,7 @@ async fn create_schedule_impl( )); } - if req.minimum_participants.is_some() - || req.tracking_start_minutes_before.is_some() - { + if req.minimum_participants.is_some() || req.tracking_start_minutes_before.is_some() { return Err(AppError::BadRequest( "개인 일정에는 그룹 일정 전용 필드를 설정할 수 없습니다".to_string(), )); @@ -706,7 +732,7 @@ pub async fn update_schedule( schedule_id: Uuid, req: UpdateScheduleRequest, ) -> Result<(), AppError> { - update_schedule_impl(pool, None, user_id, schedule_id, req).await + update_schedule_impl(pool, None, None, user_id, schedule_id, req).await } pub async fn update_schedule_with_push_sender( @@ -716,12 +742,32 @@ pub async fn update_schedule_with_push_sender( schedule_id: Uuid, req: UpdateScheduleRequest, ) -> Result<(), AppError> { - update_schedule_impl(pool, Some(push_sender), user_id, schedule_id, req).await + update_schedule_impl(pool, Some(push_sender), None, user_id, schedule_id, req).await +} + +pub async fn update_schedule_with_push_sender_and_live_activity_scheduler( + pool: &PgPool, + push_sender: &dyn PushSender, + live_activity_job_scheduler: &dyn LiveActivityJobScheduler, + user_id: &str, + schedule_id: Uuid, + req: UpdateScheduleRequest, +) -> Result<(), AppError> { + update_schedule_impl( + pool, + Some(push_sender), + Some(live_activity_job_scheduler), + user_id, + schedule_id, + req, + ) + .await } async fn update_schedule_impl( pool: &PgPool, push_sender: Option<&dyn PushSender>, + live_activity_job_scheduler: Option<&dyn LiveActivityJobScheduler>, user_id: &str, schedule_id: Uuid, req: UpdateScheduleRequest, @@ -815,9 +861,7 @@ async fn update_schedule_impl( } } ScheduleType::Personal => { - if req.minimum_participants.is_some() - || req.tracking_start_minutes_before.is_some() - { + if req.minimum_participants.is_some() || req.tracking_start_minutes_before.is_some() { return Err(AppError::BadRequest( "개인 일정에는 그룹 일정 전용 필드를 설정할 수 없습니다".to_string(), )); @@ -1038,7 +1082,12 @@ async fn update_schedule_impl( } if schedule.schedule_type == ScheduleType::Group { - if let Err(error) = live_activity_service::sync_schedule_jobs(pool, schedule_id).await { + if let Some(scheduler) = live_activity_job_scheduler { + live_activity_service::sync_schedule_jobs_with_scheduler(pool, scheduler, schedule_id) + .await?; + } else if let Err(error) = + live_activity_service::sync_schedule_jobs(pool, schedule_id).await + { tracing::error!( "Failed to sync live activity jobs after schedule update {}: {}", schedule_id, @@ -1122,7 +1171,7 @@ pub async fn respond_schedule( schedule_id: Uuid, req: RespondScheduleRequest, ) -> Result { - respond_schedule_impl(pool, None, user_id, schedule_id, req).await + respond_schedule_impl(pool, None, None, user_id, schedule_id, req).await } pub async fn respond_schedule_with_push_sender( @@ -1132,12 +1181,32 @@ pub async fn respond_schedule_with_push_sender( schedule_id: Uuid, req: RespondScheduleRequest, ) -> Result { - respond_schedule_impl(pool, Some(push_sender), user_id, schedule_id, req).await + respond_schedule_impl(pool, Some(push_sender), None, user_id, schedule_id, req).await +} + +pub async fn respond_schedule_with_push_sender_and_live_activity_scheduler( + pool: &PgPool, + push_sender: &dyn PushSender, + live_activity_job_scheduler: &dyn LiveActivityJobScheduler, + user_id: &str, + schedule_id: Uuid, + req: RespondScheduleRequest, +) -> Result { + respond_schedule_impl( + pool, + Some(push_sender), + Some(live_activity_job_scheduler), + user_id, + schedule_id, + req, + ) + .await } async fn respond_schedule_impl( pool: &PgPool, push_sender: Option<&dyn PushSender>, + live_activity_job_scheduler: Option<&dyn LiveActivityJobScheduler>, user_id: &str, schedule_id: Uuid, req: RespondScheduleRequest, @@ -1275,7 +1344,10 @@ async fn respond_schedule_impl( } } - if let Err(error) = live_activity_service::sync_schedule_jobs(pool, schedule_id).await { + if let Some(scheduler) = live_activity_job_scheduler { + live_activity_service::sync_schedule_jobs_with_scheduler(pool, scheduler, schedule_id) + .await?; + } else if let Err(error) = live_activity_service::sync_schedule_jobs(pool, schedule_id).await { tracing::error!( "Failed to sync live activity jobs after schedule response {}: {}", schedule_id, diff --git a/infra/rust-backend/src/services/vote_live_activity_service.rs b/infra/rust-backend/src/services/vote_live_activity_service.rs index 21898e1b..a9e4f271 100644 --- a/infra/rust-backend/src/services/vote_live_activity_service.rs +++ b/infra/rust-backend/src/services/vote_live_activity_service.rs @@ -10,6 +10,7 @@ use crate::models::live_activity::{ }; use crate::models::notification::{PushSender, VoteInfo}; use crate::models::schedule::{Schedule, ScheduleType}; +use crate::services::live_activity_service::LiveActivityJobScheduler; use crate::services::{live_activity_service, notification_service}; const DEFAULT_VOTE_DEADLINE_MINUTES_BEFORE: i16 = 30; @@ -45,6 +46,36 @@ pub async fn start_vote_live_activity( push_sender: Option<&dyn PushSender>, schedule_id: Uuid, user_id: &str, +) -> Result { + start_vote_live_activity_internal(pool, sender, push_sender, None, schedule_id, user_id).await +} + +pub async fn start_vote_live_activity_with_scheduler( + pool: &PgPool, + sender: &dyn LiveActivitySender, + push_sender: Option<&dyn PushSender>, + live_activity_job_scheduler: &dyn LiveActivityJobScheduler, + schedule_id: Uuid, + user_id: &str, +) -> Result { + start_vote_live_activity_internal( + pool, + sender, + push_sender, + Some(live_activity_job_scheduler), + schedule_id, + user_id, + ) + .await +} + +async fn start_vote_live_activity_internal( + pool: &PgPool, + sender: &dyn LiveActivitySender, + push_sender: Option<&dyn PushSender>, + live_activity_job_scheduler: Option<&dyn LiveActivityJobScheduler>, + schedule_id: Uuid, + user_id: &str, ) -> Result { let schedule = load_schedule(pool, schedule_id).await?; ensure_group_schedule(&schedule)?; @@ -167,7 +198,10 @@ pub async fn start_vote_live_activity( } } - if let Err(error) = live_activity_service::sync_schedule_jobs(pool, schedule.id).await { + if let Some(scheduler) = live_activity_job_scheduler { + live_activity_service::sync_schedule_jobs_with_scheduler(pool, scheduler, schedule.id) + .await?; + } else if let Err(error) = live_activity_service::sync_schedule_jobs(pool, schedule.id).await { tracing::error!( "Failed to sync live activity jobs after vote live activity start {}: {}", schedule.id, @@ -476,7 +510,8 @@ async fn load_vote_start_content_state( ); } - let responded_count = content_state.accepted_members.len() + content_state.declined_members.len(); + let responded_count = + content_state.accepted_members.len() + content_state.declined_members.len(); content_state.pending_count = total_member_count.saturating_sub(responded_count) as i32; content_state.is_finalized = responded_count >= total_member_count; diff --git a/infra/rust-backend/tests/live_activity_test.rs b/infra/rust-backend/tests/live_activity_test.rs index 91d7d9c2..8b187d09 100644 --- a/infra/rust-backend/tests/live_activity_test.rs +++ b/infra/rust-backend/tests/live_activity_test.rs @@ -1,11 +1,15 @@ use chrono::{DateTime, Duration, Utc}; use http_body_util::BodyExt; +use promiso_backend::errors::AppError; use promiso_backend::models::live_activity::{ LiveActivityJob, LiveActivityJobPayload, LiveActivityJobStatus, LiveActivityJobType, LiveActivityParticipant, LiveActivitySender, UpdateScheduleLiveActivityRequest, }; use promiso_backend::models::notification::{FcmMessage, NotificationType, PushResult, PushSender}; use promiso_backend::models::schedule::Schedule; +use promiso_backend::services::live_activity_service::{ + LiveActivityJobScheduler, ScheduledLiveActivityJob, +}; use promiso_backend::services::{live_activity_service, vote_live_activity_service}; use sqlx::PgPool; use std::sync::{Arc, Mutex}; @@ -187,6 +191,70 @@ async fn insert_job( .expect("Failed to insert job"); } +async fn insert_job_returning_id( + pool: &PgPool, + schedule_id: Uuid, + job_type: LiveActivityJobType, + scheduled_at: DateTime, + payload: Option, +) -> Uuid { + let row: (Uuid,) = sqlx::query_as( + "INSERT INTO live_activity_jobs (schedule_id, job_type, scheduled_at, payload) \ + VALUES ($1, $2, $3, $4) \ + RETURNING id", + ) + .bind(schedule_id) + .bind(job_type) + .bind(scheduled_at) + .bind(payload.map(|payload| serde_json::to_value(payload).expect("payload encode"))) + .fetch_one(pool) + .await + .expect("Failed to insert job"); + + row.0 +} + +#[derive(Clone)] +struct MockLiveActivityJobScheduler { + calls: Arc>>, + should_fail: bool, +} + +impl MockLiveActivityJobScheduler { + fn succeed() -> Self { + Self { + calls: Arc::new(Mutex::new(Vec::new())), + should_fail: false, + } + } + + fn fail() -> Self { + Self { + calls: Arc::new(Mutex::new(Vec::new())), + should_fail: true, + } + } + + fn calls(&self) -> Vec { + self.calls.lock().unwrap().clone() + } +} + +#[async_trait::async_trait] +impl LiveActivityJobScheduler for MockLiveActivityJobScheduler { + async fn enqueue_live_activity_job( + &self, + job: ScheduledLiveActivityJob, + ) -> Result<(), AppError> { + if self.should_fail { + return Err(AppError::Internal("cloud task enqueue failed".to_string())); + } + + self.calls.lock().unwrap().push(job); + Ok(()) + } +} + #[derive(Default)] struct MockLiveActivitySender { state: Arc>, @@ -362,6 +430,90 @@ async fn sync_schedule_jobs_creates_pending_start_job(pool: PgPool) { assert!(jobs[0].scheduled_at <= Utc::now()); } +#[sqlx::test(migrations = "./migrations")] +async fn sync_schedule_jobs_enqueues_cloud_task_for_created_start_job(pool: PgPool) { + insert_test_user(&pool, "host_task", "호스트").await; + insert_test_user(&pool, "member_task", "멤버").await; + let group_id = create_test_group(&pool, "host_task", "태스크 그룹").await; + add_member_to_group(&pool, group_id, "member_task").await; + + let schedule_id = insert_group_schedule( + &pool, + "host_task", + group_id, + "태스크 예약 일정", + Utc::now() + Duration::minutes(90), + Some(30), + 2, + ) + .await; + accept_schedule(&pool, schedule_id, "host_task").await; + accept_schedule(&pool, schedule_id, "member_task").await; + + let scheduler = MockLiveActivityJobScheduler::succeed(); + live_activity_service::sync_schedule_jobs_with_scheduler(&pool, &scheduler, schedule_id) + .await + .expect("sync should enqueue cloud task"); + + let jobs = load_jobs(&pool, schedule_id).await; + let pending_start_jobs: Vec<_> = jobs + .iter() + .filter(|job| { + job.job_type == LiveActivityJobType::Start + && job.status == LiveActivityJobStatus::Pending + }) + .collect(); + assert_eq!(pending_start_jobs.len(), 1); + + let calls = scheduler.calls(); + assert_eq!(calls.len(), 1); + assert_eq!(calls[0].id, pending_start_jobs[0].id); + assert_eq!(calls[0].schedule_id, schedule_id); + assert_eq!(calls[0].job_type, LiveActivityJobType::Start); + assert_eq!(calls[0].scheduled_at, pending_start_jobs[0].scheduled_at); +} + +#[sqlx::test(migrations = "./migrations")] +async fn sync_schedule_jobs_does_not_leave_pending_job_when_cloud_task_enqueue_fails(pool: PgPool) { + insert_test_user(&pool, "host_task_fail", "호스트").await; + insert_test_user(&pool, "member_task_fail", "멤버").await; + let group_id = create_test_group(&pool, "host_task_fail", "태스크 실패 그룹").await; + add_member_to_group(&pool, group_id, "member_task_fail").await; + + let schedule_id = insert_group_schedule( + &pool, + "host_task_fail", + group_id, + "태스크 실패 일정", + Utc::now() + Duration::minutes(90), + Some(30), + 2, + ) + .await; + accept_schedule(&pool, schedule_id, "host_task_fail").await; + accept_schedule(&pool, schedule_id, "member_task_fail").await; + + let scheduler = MockLiveActivityJobScheduler::fail(); + let error = + live_activity_service::sync_schedule_jobs_with_scheduler(&pool, &scheduler, schedule_id) + .await + .expect_err("sync should fail when cloud task enqueue fails"); + assert!(matches!(error, AppError::Internal(_))); + + let pending_start_count: (i64,) = sqlx::query_as( + "SELECT COUNT(*) \ + FROM live_activity_jobs \ + WHERE schedule_id = $1 \ + AND job_type = 'start'::live_activity_job_type \ + AND status = 'pending'::live_activity_job_status", + ) + .bind(schedule_id) + .fetch_one(&pool) + .await + .expect("Failed to count pending start jobs"); + assert_eq!(pending_start_count.0, 0); +} + #[sqlx::test(migrations = "./migrations")] async fn process_due_start_job_stores_channel_and_schedules_followups(pool: PgPool) { insert_test_user(&pool, "host_start", "호스트").await; @@ -422,6 +574,67 @@ async fn process_due_start_job_stores_channel_and_schedules_followups(pool: PgPo assert_eq!(sender_state.push_to_start_calls.len(), 2); } +#[sqlx::test(migrations = "./migrations")] +async fn dispatch_live_activity_job_processes_only_requested_due_job(pool: PgPool) { + insert_test_user(&pool, "host_dispatch", "호스트").await; + insert_test_user(&pool, "member_dispatch", "멤버").await; + let group_id = create_test_group(&pool, "host_dispatch", "디스패치 그룹").await; + add_member_to_group(&pool, group_id, "member_dispatch").await; + + let schedule_id = insert_group_schedule( + &pool, + "host_dispatch", + group_id, + "디스패치 일정", + Utc::now() + Duration::minutes(20), + Some(30), + 2, + ) + .await; + accept_schedule(&pool, schedule_id, "host_dispatch").await; + accept_schedule(&pool, schedule_id, "member_dispatch").await; + register_push_to_start_token(&pool, "host_dispatch", "host-dispatch-device", "token-host") + .await; + register_push_to_start_token( + &pool, + "member_dispatch", + "member-dispatch-device", + "token-member", + ) + .await; + + let job_id = insert_job_returning_id( + &pool, + schedule_id, + LiveActivityJobType::Start, + Utc::now() - Duration::minutes(1), + None, + ) + .await; + + let live_sender = MockLiveActivitySender::new(); + let push_sender = MockPushSender::new(); + let result = live_activity_service::dispatch_live_activity_job( + &pool, + &live_sender, + &push_sender, + job_id, + ) + .await + .expect("requested job dispatch should succeed"); + + assert!(result.processed); + let schedule = load_schedule(&pool, schedule_id).await; + assert_eq!( + schedule.live_activity_channel_id.as_deref(), + Some("channel-test-1") + ); + + let jobs = load_jobs(&pool, schedule_id).await; + let dispatched_job = jobs.iter().find(|job| job.id == job_id).unwrap(); + assert_eq!(dispatched_job.status, LiveActivityJobStatus::Succeeded); +} + #[sqlx::test(migrations = "./migrations")] async fn update_schedule_live_activity_all_arrived_schedules_end_job(pool: PgPool) { insert_test_user(&pool, "host_eta", "호스트").await; @@ -584,7 +797,11 @@ async fn regression_update_schedule_live_activity_reverts_end_job_when_all_arriv job.job_type == LiveActivityJobType::End && job.status == LiveActivityJobStatus::Pending }) .collect(); - assert_eq!(pending_end_jobs.len(), 1, "pending end job should be restored"); + assert_eq!( + pending_end_jobs.len(), + 1, + "pending end job should be restored" + ); assert!( pending_end_jobs[0].scheduled_at >= start_at + Duration::minutes(30) - Duration::minutes(1), "all-arrived override should be reverted to the default end time" @@ -776,7 +993,10 @@ async fn regression_start_vote_live_activity_without_targets_does_not_mutate_hos .fetch_one(&pool) .await .expect("Failed to count host votes"); - assert_eq!(host_vote_count.0, 0, "host vote must not be persisted on no-target return"); + assert_eq!( + host_vote_count.0, 0, + "host vote must not be persisted on no-target return" + ); } #[sqlx::test(migrations = "./migrations")] @@ -898,7 +1118,10 @@ async fn regression_start_vote_live_activity_failure_does_not_persist_host_vote( ) .await .expect_err("vote live activity should fail when every push-to-start send fails"); - assert!(matches!(error, promiso_backend::errors::AppError::Internal(_))); + assert!(matches!( + error, + promiso_backend::errors::AppError::Internal(_) + )); let schedule = load_schedule(&pool, schedule_id).await; assert_eq!(schedule.is_confirmed, Some(false)); @@ -911,8 +1134,7 @@ async fn regression_start_vote_live_activity_failure_does_not_persist_host_vote( .await .expect("Failed to count host votes"); assert_eq!( - host_vote_count.0, - 0, + host_vote_count.0, 0, "host vote must not be persisted when all push-to-start sends fail" ); } diff --git a/infra/rust-backend/tests/rust_migration_regression_test.rs b/infra/rust-backend/tests/rust_migration_regression_test.rs index 2ea14b3c..a20d82d4 100644 --- a/infra/rust-backend/tests/rust_migration_regression_test.rs +++ b/infra/rust-backend/tests/rust_migration_regression_test.rs @@ -30,6 +30,11 @@ fn test_config() -> Config { widget_jwt_secret: Some("test-secret".to_string()), odsay_api_key: None, kakao_rest_api_key: None, + scheduler_secret: None, + live_activity_task_project_id: "promiso-dev".to_string(), + live_activity_task_location: "asia-northeast3".to_string(), + live_activity_task_queue: "live-activity-jobs".to_string(), + live_activity_task_target_base_url: None, } }