Skip to content

Commit 9d6785d

Browse files
authored
Merge pull request #679 from A5cend-dev/backend-api-performance-enhancement
#438 [BE-API-084] Backend API Performance Enhancement and Storage - Step 84
2 parents f8ad2a0 + d452973 commit 9d6785d

5 files changed

Lines changed: 662 additions & 3 deletions

File tree

backend/src/Main.rs

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
// backend/src/main.rs
2+
//
3+
// Lance — Freelancer Platform with AI Agent Judge
4+
// BE-API-083: Async Processing Queue for Dispute File Analysis
5+
//
6+
// Bootstraps the Axum HTTP server, SQLx connection pool, tracing infrastructure,
7+
// and the background worker pool that processes dispute file analysis tasks.
8+
9+
use std::net::SocketAddr;
10+
use std::sync::Arc;
11+
12+
use axum::{middleware, Router};
13+
use sqlx::postgres::PgPoolOptions;
14+
use tower_http::{
15+
cors::{Any, CorsLayer},
16+
request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer},
17+
timeout::TimeoutLayer,
18+
trace::TraceLayer,
19+
};
20+
use tracing::info;
21+
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
22+
23+
mod db;
24+
mod error;
25+
mod models;
26+
mod queue;
27+
mod routes;
28+
mod state;
29+
30+
use queue::worker::spawn_dispute_workers;
31+
use state::AppState;
32+
33+
/// Application entry point.
34+
///
35+
/// Initialisation order:
36+
/// 1. Tracing subscriber (JSON in production, pretty in dev)
37+
/// 2. Database pool (with validated pool limits for stability under load)
38+
/// 3. Async dispute queue + worker pool
39+
/// 4. Axum router with all middleware layers
40+
/// 5. TCP listener + graceful shutdown signal
41+
#[tokio::main]
42+
async fn main() -> anyhow::Result<()> {
43+
// ── 1. Tracing ──────────────────────────────────────────────────────────
44+
dotenvy::dotenv().ok();
45+
46+
let log_format = std::env::var("LOG_FORMAT").unwrap_or_else(|_| "pretty".into());
47+
48+
let filter = EnvFilter::try_from_default_env()
49+
.unwrap_or_else(|_| "backend=debug,tower_http=debug,sqlx=warn".into());
50+
51+
if log_format == "json" {
52+
tracing_subscriber::registry()
53+
.with(filter)
54+
.with(tracing_subscriber::fmt::layer().json())
55+
.init();
56+
} else {
57+
tracing_subscriber::registry()
58+
.with(filter)
59+
.with(tracing_subscriber::fmt::layer().pretty())
60+
.init();
61+
}
62+
63+
info!(
64+
version = env!("CARGO_PKG_VERSION"),
65+
"Lance backend starting"
66+
);
67+
68+
// ── 2. Database pool ────────────────────────────────────────────────────
69+
let database_url = std::env::var("DATABASE_URL")
70+
.expect("DATABASE_URL must be set");
71+
72+
// Pool tuning: keep max connections bounded so that concurrent load tests
73+
// never exhaust the PostgreSQL max_connections limit (acceptance criterion).
74+
let max_connections: u32 = std::env::var("DB_MAX_CONNECTIONS")
75+
.ok()
76+
.and_then(|v| v.parse().ok())
77+
.unwrap_or(20);
78+
79+
let min_connections: u32 = std::env::var("DB_MIN_CONNECTIONS")
80+
.ok()
81+
.and_then(|v| v.parse().ok())
82+
.unwrap_or(2);
83+
84+
let pool = PgPoolOptions::new()
85+
.max_connections(max_connections)
86+
.min_connections(min_connections)
87+
.acquire_timeout(std::time::Duration::from_secs(5))
88+
.idle_timeout(std::time::Duration::from_secs(600))
89+
.max_lifetime(std::time::Duration::from_secs(1800))
90+
.connect(&database_url)
91+
.await
92+
.expect("Failed to create database pool");
93+
94+
// Run any pending migrations on startup.
95+
sqlx::migrate!("./migrations")
96+
.run(&pool)
97+
.await
98+
.expect("Database migration failed");
99+
100+
info!(
101+
max_connections,
102+
min_connections,
103+
"Database pool initialised"
104+
);
105+
106+
// ── 3. Async queue + workers ────────────────────────────────────────────
107+
let worker_count: usize = std::env::var("DISPUTE_WORKER_COUNT")
108+
.ok()
109+
.and_then(|v| v.parse().ok())
110+
.unwrap_or(4);
111+
112+
let queue_capacity: usize = std::env::var("DISPUTE_QUEUE_CAPACITY")
113+
.ok()
114+
.and_then(|v| v.parse().ok())
115+
.unwrap_or(256);
116+
117+
let (queue_tx, queue_rx) = async_channel::bounded(queue_capacity);
118+
119+
// Spawn N background workers that drain the queue concurrently.
120+
spawn_dispute_workers(worker_count, queue_rx.clone(), pool.clone());
121+
122+
info!(
123+
worker_count,
124+
queue_capacity,
125+
"Dispute file analysis queue initialised"
126+
);
127+
128+
// ── 4. Application state ────────────────────────────────────────────────
129+
let state = Arc::new(AppState {
130+
db: pool,
131+
dispute_queue: queue_tx,
132+
});
133+
134+
// ── 5. Router ───────────────────────────────────────────────────────────
135+
let app = build_router(state);
136+
137+
// ── 6. Serve ────────────────────────────────────────────────────────────
138+
let host = std::env::var("HOST").unwrap_or_else(|_| "0.0.0.0".into());
139+
let port: u16 = std::env::var("PORT")
140+
.ok()
141+
.and_then(|v| v.parse().ok())
142+
.unwrap_or(8080);
143+
144+
let addr: SocketAddr = format!("{host}:{port}").parse()?;
145+
let listener = tokio::net::TcpListener::bind(addr).await?;
146+
147+
info!(%addr, "Listening");
148+
149+
axum::serve(listener, app)
150+
.with_graceful_shutdown(shutdown_signal())
151+
.await?;
152+
153+
Ok(())
154+
}
155+
156+
/// Constructs the full Axum `Router` with all middleware layers attached.
157+
///
158+
/// Middleware stack (outermost → innermost):
159+
/// SetRequestId → PropagateRequestId → TraceLayer → TimeoutLayer → CorsLayer
160+
fn build_router(state: Arc<AppState>) -> Router {
161+
let x_request_id = axum::http::HeaderName::from_static("x-request-id");
162+
163+
Router::new()
164+
.merge(routes::health::router())
165+
.merge(routes::disputes::router())
166+
.with_state(state)
167+
// Emit structured per-request spans that include method, URI, status,
168+
// latency, and the propagated x-request-id.
169+
.layer(TraceLayer::new_for_http())
170+
// Hard request timeout — prevents slow DB queries from starving workers.
171+
.layer(TimeoutLayer::new(std::time::Duration::from_secs(30)))
172+
// CORS — tighten in production via ALLOWED_ORIGINS env var.
173+
.layer(
174+
CorsLayer::new()
175+
.allow_origin(Any)
176+
.allow_methods(Any)
177+
.allow_headers(Any),
178+
)
179+
// Propagate request-id header through response so clients can correlate.
180+
.layer(PropagateRequestIdLayer::new(x_request_id.clone()))
181+
.layer(SetRequestIdLayer::new(
182+
x_request_id,
183+
MakeRequestUuid,
184+
))
185+
}
186+
187+
/// Listens for SIGTERM (Docker/k8s) and Ctrl-C and resolves when either fires.
188+
async fn shutdown_signal() {
189+
use tokio::signal;
190+
191+
let ctrl_c = async {
192+
signal::ctrl_c()
193+
.await
194+
.expect("failed to install Ctrl+C handler");
195+
};
196+
197+
#[cfg(unix)]
198+
let terminate = async {
199+
signal::unix::signal(signal::unix::SignalKind::terminate())
200+
.expect("failed to install signal handler")
201+
.recv()
202+
.await;
203+
};
204+
205+
#[cfg(not(unix))]
206+
let terminate = std::future::pending::<()>();
207+
208+
tokio::select! {
209+
_ = ctrl_c => { info!("Received Ctrl-C, shutting down") },
210+
_ = terminate => { info!("Received SIGTERM, shutting down") },
211+
}
212+
}

backend/src/State.rs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/// state.rs
2+
///
3+
/// Shared application state injected into every Axum handler via
4+
/// `axum::extract::State<AppState>`.
5+
///
6+
/// This module initialises:
7+
/// • The SQLx Postgres connection pool (with tuned pool limits).
8+
/// • The `deadpool_redis` connection pool.
9+
/// • The `RedisCache` wrapper.
10+
11+
use std::env;
12+
13+
use deadpool_redis::{Config as RedisConfig, Runtime};
14+
use sqlx::{postgres::PgPoolOptions, PgPool};
15+
use tracing::{info, instrument};
16+
17+
use crate::cache::RedisCache;
18+
19+
/// Cloneable application state shared across all request handlers.
20+
#[derive(Clone, Debug)]
21+
pub struct AppState {
22+
/// SQLx Postgres pool.
23+
pub db: PgPool,
24+
/// Redis cache wrapper.
25+
pub cache: RedisCache,
26+
}
27+
28+
impl AppState {
29+
/// Build the `AppState` from environment variables.
30+
///
31+
/// Required env vars:
32+
/// - `DATABASE_URL` — Postgres connection string.
33+
/// - `REDIS_URL` — Redis connection string (e.g. `redis://localhost:6379`).
34+
///
35+
/// Optional env vars with defaults:
36+
/// - `DB_MAX_CONNECTIONS` (default: 20)
37+
/// - `DB_MIN_CONNECTIONS` (default: 5)
38+
/// - `REDIS_POOL_SIZE` (default: 16)
39+
#[instrument(name = "AppState::init")]
40+
pub async fn init() -> anyhow::Result<Self> {
41+
let database_url = env::var("DATABASE_URL")
42+
.expect("DATABASE_URL must be set");
43+
let redis_url = env::var("REDIS_URL")
44+
.expect("REDIS_URL must be set");
45+
46+
let db_max: u32 = env::var("DB_MAX_CONNECTIONS")
47+
.ok()
48+
.and_then(|v| v.parse().ok())
49+
.unwrap_or(20);
50+
51+
let db_min: u32 = env::var("DB_MIN_CONNECTIONS")
52+
.ok()
53+
.and_then(|v| v.parse().ok())
54+
.unwrap_or(5);
55+
56+
let redis_pool_size: usize = env::var("REDIS_POOL_SIZE")
57+
.ok()
58+
.and_then(|v| v.parse().ok())
59+
.unwrap_or(16);
60+
61+
// ----------------------------------------------------------------
62+
// Postgres pool
63+
// ----------------------------------------------------------------
64+
info!(db_max, db_min, "Initialising Postgres connection pool");
65+
66+
let db = PgPoolOptions::new()
67+
// Hard ceiling on open connections.
68+
.max_connections(db_max)
69+
// Keep a warm pool — prevents cold-start latency spikes.
70+
.min_connections(db_min)
71+
// Fail fast rather than queue requests indefinitely.
72+
.acquire_timeout(std::time::Duration::from_secs(5))
73+
// Recycle long-lived idle connections to avoid stale socket errors.
74+
.idle_timeout(std::time::Duration::from_secs(300))
75+
// Validate the connection health before handing it to a handler.
76+
.test_before_acquire(true)
77+
.connect(&database_url)
78+
.await?;
79+
80+
info!("Postgres pool ready");
81+
82+
// ----------------------------------------------------------------
83+
// Redis pool
84+
// ----------------------------------------------------------------
85+
info!(redis_pool_size, "Initialising Redis connection pool");
86+
87+
let redis_cfg = RedisConfig::from_url(&redis_url);
88+
let redis_pool = redis_cfg
89+
.create_pool(Some(Runtime::Tokio1))
90+
.map_err(|e| anyhow::anyhow!("Redis pool creation failed: {e}"))?;
91+
92+
// Override pool size from config.
93+
// deadpool_redis uses a builder; we re-create with explicit size.
94+
let redis_pool = deadpool_redis::Config {
95+
url: Some(redis_url),
96+
pool: Some(deadpool_redis::PoolConfig {
97+
max_size: redis_pool_size,
98+
..Default::default()
99+
}),
100+
..Default::default()
101+
}
102+
.create_pool(Some(Runtime::Tokio1))
103+
.map_err(|e| anyhow::anyhow!("Redis pool creation failed: {e}"))?;
104+
105+
// Smoke-test the Redis connection at startup.
106+
{
107+
let mut conn = redis_pool.get().await
108+
.map_err(|e| anyhow::anyhow!("Redis connection test failed: {e}"))?;
109+
let pong: String = deadpool_redis::redis::cmd("PING")
110+
.query_async(&mut conn)
111+
.await
112+
.map_err(|e| anyhow::anyhow!("Redis PING failed: {e}"))?;
113+
info!(pong, "Redis connection verified");
114+
}
115+
116+
let cache = RedisCache::new(redis_pool);
117+
118+
Ok(Self { db, cache })
119+
}
120+
}

0 commit comments

Comments
 (0)