From 51e3f7e1591536e060df6c1535abbb642c68a1cb Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Thu, 26 Feb 2026 13:05:31 +0530 Subject: [PATCH 1/4] fix: user login/logout sync --- src/cli.rs | 9 ++++++++ src/handlers/http/oidc.rs | 8 ++++--- src/rbac/user.rs | 1 - src/utils/mod.rs | 44 +++++++++++++++++++++++++++++++++------ 4 files changed, 52 insertions(+), 10 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 9cfb011bb..cf94e5800 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -407,6 +407,14 @@ pub struct Options { )] pub querier_endpoint: String, + #[arg( + long, + env = "P_PRISM_ENDPOINT", + default_value = "", + help = "URL to connect to the prism node. Default is the address of the server" + )] + pub prism_endpoint: String, + #[command(flatten)] pub oidc: Option, @@ -593,6 +601,7 @@ impl Options { Mode::Ingest => self.get_endpoint(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT"), Mode::Index => self.get_endpoint(&self.indexer_endpoint, "P_INDEXER_ENDPOINT"), Mode::Query => self.get_endpoint(&self.querier_endpoint, "P_QUERIER_ENDPOINT"), + Mode::Prism => self.get_endpoint(&self.prism_endpoint, "P_PRISM_ENDPOINT"), _ => return self.build_url(&self.address), }; diff --git a/src/handlers/http/oidc.rs b/src/handlers/http/oidc.rs index a7d24e3c2..9efff03a3 100644 --- a/src/handlers/http/oidc.rs +++ b/src/handlers/http/oidc.rs @@ -33,7 +33,7 @@ use tokio::sync::RwLock; use ulid::Ulid; use url::Url; -use crate::utils::login_sync; +use crate::utils::{login_sync, logout_sync}; use crate::{ handlers::{ COOKIE_AGE_DAYS, SESSION_COOKIE_NAME, USER_COOKIE_NAME, USER_ID_COOKIE_NAME, @@ -176,14 +176,16 @@ pub async fn logout(req: HttpRequest, query: web::Query) -> None }; - match (user, logout_endpoint) { + let res = match (user, logout_endpoint) { (Some(username), Some(logout_endpoint)) if Users.is_oauth(&username, &tenant_id).unwrap_or_default() => { redirect_to_oidc_logout(logout_endpoint, &query.redirect) } _ => redirect_to_client(query.redirect.as_str(), None), - } + }; + let _ = logout_sync(session, &tenant_id).await; + res } /// Handler for code callback diff --git a/src/rbac/user.rs b/src/rbac/user.rs index cb87798c5..475fb96ab 100644 --- a/src/rbac/user.rs +++ b/src/rbac/user.rs @@ -376,7 +376,6 @@ impl UserGroup { // validate that the users exist and no protected user is included if let Some(tenant_users) = users().get(tenant) { for group_user in &self.users { - tracing::warn!(group_user=?group_user); if let Some(user) = tenant_users.get(group_user.userid()) && !user.protected && user.tenant.eq(tenant_id) diff --git a/src/utils/mod.rs b/src/utils/mod.rs index eb0044201..6b37aea88 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -173,7 +173,6 @@ pub async fn user_auth_for_datasets( Some(ParseableResourceType::Stream(stream)), ) => { if !PARSEABLE.check_or_load_stream(stream, tenant_id).await { - tracing::warn!("unable to find stream"); return Err(actix_web::error::ErrorUnauthorized(format!( "Stream not found: {table_name}" ))); @@ -254,11 +253,14 @@ pub fn create_intracluster_auth_headermap( reqwest::header::AUTHORIZATION, reqwest::header::HeaderValue::from_bytes(auth.as_bytes()).unwrap(), ); - } else if let Some(auth) = req.get(actix_web::http::header::COOKIE) { - map.insert( - reqwest::header::COOKIE, - reqwest::header::HeaderValue::from_bytes(auth.as_bytes()).unwrap(), - ); + } else if req.contains_key(actix_web::http::header::COOKIE) { + // multiple cookies + for cookie in req.get_all(actix_web::http::header::COOKIE) { + map.insert( + reqwest::header::COOKIE, + reqwest::header::HeaderValue::from_bytes(cookie.as_bytes()).unwrap(), + ); + } } else { map.insert( reqwest::header::AUTHORIZATION, @@ -303,3 +305,33 @@ pub async fn login_sync( }) .await } + +pub async fn logout_sync( + session: SessionKey, + tenant_id: &Option, +) -> Result<(), anyhow::Error> { + for_each_live_node(tenant_id, move |node| { + let url = format!( + "{}{}/o/logout/sync", + node.domain_name, + base_path_without_preceding_slash(), + ); + let _session = session.clone(); + + async move { + INTRA_CLUSTER_CLIENT + .post(url) + .header(header::AUTHORIZATION, node.token) + .header(header::CONTENT_TYPE, "application/json") + .json(&json!( + { + "sessionKey": _session + } + )) + .send() + .await?; + Ok::<(), anyhow::Error>(()) + } + }) + .await +} From 4a423fb5e309735bc3f263675cb92cd0b89213c9 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Fri, 27 Feb 2026 15:00:04 +0530 Subject: [PATCH 2/4] intra-cluster calls logic modified --- src/alerts/alert_types.rs | 32 +++-- src/handlers/http/cluster/mod.rs | 39 ++++-- src/handlers/http/ingest.rs | 4 +- src/handlers/http/logstream.rs | 32 ++--- src/handlers/http/middleware.rs | 111 +++++++++++++++++- src/handlers/http/modal/ingest_server.rs | 30 +++-- src/handlers/http/modal/query/querier_role.rs | 2 +- src/handlers/http/modal/server.rs | 33 ++++-- src/handlers/http/oidc.rs | 10 +- src/handlers/http/rbac.rs | 40 +++---- src/rbac/map.rs | 29 +++-- src/rbac/mod.rs | 4 + src/rbac/user.rs | 12 +- src/storage/field_stats.rs | 18 ++- src/utils/actix.rs | 6 +- src/utils/mod.rs | 96 ++++----------- src/validator.rs | 2 + 17 files changed, 319 insertions(+), 181 deletions(-) diff --git a/src/alerts/alert_types.rs b/src/alerts/alert_types.rs index b8ac44d7a..9c6c0e48d 100644 --- a/src/alerts/alert_types.rs +++ b/src/alerts/alert_types.rs @@ -18,7 +18,7 @@ use std::{str::FromStr, time::Duration}; -use actix_web::http::header::{HeaderMap, HeaderName, HeaderValue}; +use actix_web::http::header::{HeaderName, HeaderValue}; use chrono::{DateTime, Utc}; use serde_json::Value; use tonic::async_trait; @@ -36,9 +36,12 @@ use crate::{ get_number_of_agg_exprs, target::{self, NotificationConfig}, }, - handlers::http::query::create_streams_for_distributed, + handlers::http::{ + middleware::{CLUSTER_SECRET, CLUSTER_SECRET_HEADER}, + query::create_streams_for_distributed, + }, metastore::metastore_traits::MetastoreObject, - parseable::PARSEABLE, + parseable::{DEFAULT_TENANT, PARSEABLE}, query::resolve_stream_names, rbac::map::SessionKey, storage::object_storage::alert_json_path, @@ -87,18 +90,29 @@ impl MetastoreObject for ThresholdAlert { impl AlertTrait for ThresholdAlert { async fn eval_alert(&self) -> Result, AlertError> { let time_range = extract_time_range(&self.eval_config)?; - let auth = if let Some(tenant) = self.tenant_id.as_ref() - && let Some(header) = TENANT_METADATA.get_global_query_auth(tenant) - { - let mut map = HeaderMap::new(); + + let tenant = self.tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let auth = if let Some((_, hash)) = CLUSTER_SECRET.get() { + let mut map = actix_web::http::header::HeaderMap::new(); + if let Some(header) = TENANT_METADATA.get_global_query_auth(tenant) { + map.insert( + HeaderName::from_static("authorization"), + HeaderValue::from_str(&header).unwrap(), + ); + } + map.insert( + HeaderName::from_static(CLUSTER_SECRET_HEADER), + HeaderValue::from_str(hash).unwrap(), + ); map.insert( - HeaderName::from_static("authorization"), - HeaderValue::from_str(&header).unwrap(), + HeaderName::from_static("intra-cluster-tenant"), + HeaderValue::from_str(tenant).unwrap(), ); Some(map) } else { None }; + let query_result = execute_alert_query(auth, self.get_query(), &time_range, &self.tenant_id).await?; diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 83d62cb7e..cefe607b6 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -19,6 +19,8 @@ pub mod utils; use actix_web::http::StatusCode; use actix_web::http::header::HeaderMap; +use base64::Engine; +use base64::prelude::BASE64_STANDARD; use futures::{StreamExt, future, stream}; use http::header; use lazy_static::lazy_static; @@ -50,7 +52,9 @@ use crate::rbac::role::model::Role; use crate::rbac::user::User; use crate::stats::Stats; use crate::storage::{ObjectStorageError, ObjectStoreFormat}; -use crate::utils::{create_intracluster_auth_headermap, get_tenant_id_from_request}; +use crate::utils::{ + create_intracluster_auth_headermap, get_tenant_id_from_request, get_user_from_request, +}; use super::base_path_without_preceding_slash; use super::ingest::PostError; @@ -529,7 +533,7 @@ pub async fn sync_users_with_roles_with_ingestors( ); let role_data = role_data.clone(); - let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token); + let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token, &userid); async move { let res = INTRA_CLUSTER_CLIENT .patch(url) @@ -576,7 +580,7 @@ pub async fn sync_user_deletion_with_ingestors( base_path_without_preceding_slash(), userid ); - let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token); + let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token, &userid); async move { let res = INTRA_CLUSTER_CLIENT .delete(url) @@ -634,7 +638,7 @@ pub async fn sync_user_creation( base_path_without_preceding_slash(), userid ); - let headermap = create_intracluster_auth_headermap(&headers, &node.token); + let headermap = create_intracluster_auth_headermap(&headers, &node.token, &userid); let user_data = user_data.clone(); async move { @@ -672,20 +676,21 @@ pub async fn sync_password_reset_with_ingestors( req: HttpRequest, username: &str, ) -> Result<(), RBACError> { - let username = username.to_owned(); + let userid = username.to_owned(); let tenant_id = get_tenant_id_from_request(&req); + let headers = req.headers().clone(); for_each_live_node(&tenant_id, move |ingestor| { let url = format!( "{}{}/user/{}/generate-new-password/sync", ingestor.domain_name, base_path_without_preceding_slash(), - username + userid ); - + let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token, &userid); async move { let res = INTRA_CLUSTER_CLIENT .post(url) - .header(header::AUTHORIZATION, &ingestor.token) + .headers(headermap) .header(header::CONTENT_TYPE, "application/json") .send() .await @@ -713,11 +718,14 @@ pub async fn sync_password_reset_with_ingestors( // forward the put role request to all ingestors and queriers to keep them in sync pub async fn sync_role_update( + req: &HttpRequest, name: String, role: Role, tenant_id: &Option, ) -> Result<(), RoleError> { let tenant = tenant_id.to_owned(); + let userid = get_user_from_request(req).unwrap(); + let headers = req.headers().clone(); for_each_live_node(tenant_id, move |node| { let url = format!( "{}{}/role/{}/sync", @@ -727,12 +735,12 @@ pub async fn sync_role_update( ); let role = role.clone(); - + let headermap = create_intracluster_auth_headermap(&headers, &node.token, &userid); let tenant_id = tenant.clone(); async move { let res = INTRA_CLUSTER_CLIENT .put(url) - .header(header::AUTHORIZATION, &node.token) + .headers(headermap) .header(header::CONTENT_TYPE, "application/json") .json(&SyncRole::new(role, tenant_id.clone())) .send() @@ -1904,7 +1912,6 @@ pub async fn send_query_request( let mut map = reqwest::header::HeaderMap::new(); if let Some(auth) = auth_token { - // always basic auth for (key, value) in auth.iter() { if let Ok(name) = reqwest::header::HeaderName::from_bytes(key.as_str().as_bytes()) && let Ok(val) = reqwest::header::HeaderValue::from_bytes(value.as_bytes()) @@ -1918,6 +1925,16 @@ pub async fn send_query_request( reqwest::header::HeaderValue::from_str(&querier.token).unwrap(), ); }; + if map.get("intra-cluster-userid").is_none() { + let token: Vec<&str> = querier.token().split(' ').collect(); + let decode = BASE64_STANDARD.decode(token[1]).unwrap(); + let user = String::from_utf8(decode).unwrap(); + let user = user.split_once(':').unwrap(); + map.insert( + reqwest::header::HeaderName::from_static("intra-cluster-userid"), + reqwest::header::HeaderValue::from_str(user.0).unwrap(), + ); + } let res = match INTRA_CLUSTER_CLIENT .post(uri) .timeout(Duration::from_secs(300)) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 22cfb7413..0f3a87744 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -434,10 +434,10 @@ pub async fn handle_otel_traces_ingestion( // fails if the logstream does not exist pub async fn post_event( req: HttpRequest, - stream_name: Path, + logstream: Path, Json(json): Json, ) -> Result { - let stream_name = stream_name.into_inner(); + let stream_name = logstream.into_inner(); let tenant_id = get_tenant_id_from_request(&req); let internal_stream_names = PARSEABLE.streams.list_internal_streams(&tenant_id); if internal_stream_names.contains(&stream_name) { diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 1359e1a36..17f1be2fe 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -51,9 +51,9 @@ use tracing::warn; pub async fn delete( req: HttpRequest, - stream_name: Path, + logstream: Path, ) -> Result { - let stream_name = stream_name.into_inner(); + let stream_name = logstream.into_inner(); // Error out if stream doesn't exist in memory, or in the case of query node, in storage as well let tenant_id = get_tenant_id_from_request(&req); if !PARSEABLE @@ -173,9 +173,9 @@ pub async fn detect_schema(Json(json): Json) -> Result, + logstream: Path, ) -> Result { - let stream_name = stream_name.into_inner(); + let stream_name = logstream.into_inner(); let tenant_id = get_tenant_id_from_request(&req); // Ensure parseable is aware of stream in distributed mode if !PARSEABLE @@ -200,10 +200,10 @@ pub async fn get_schema( pub async fn put_stream( req: HttpRequest, - stream_name: Path, + logstream: Path, body: Bytes, ) -> Result { - let stream_name = stream_name.into_inner(); + let stream_name = logstream.into_inner(); let tenant_id = get_tenant_id_from_request(&req); PARSEABLE @@ -300,9 +300,9 @@ pub async fn get_stats_date( pub async fn get_stats( req: HttpRequest, - stream_name: Path, + logstream: Path, ) -> Result { - let stream_name = stream_name.into_inner(); + let stream_name = logstream.into_inner(); let tenant_id = get_tenant_id_from_request(&req); // For query mode, if the stream not found in memory map, //check if it exists in the storage @@ -365,9 +365,9 @@ pub async fn get_stats( pub async fn get_stream_info( req: HttpRequest, - stream_name: Path, + logstream: Path, ) -> Result { - let stream_name = stream_name.into_inner(); + let stream_name = logstream.into_inner(); let tenant_id = get_tenant_id_from_request(&req); // For query mode, if the stream not found in memory map, //check if it exists in the storage @@ -415,10 +415,10 @@ pub async fn get_stream_info( pub async fn put_stream_hot_tier( req: HttpRequest, - stream_name: Path, + logstream: Path, Json(mut hottier): Json, ) -> Result { - let stream_name = stream_name.into_inner(); + let stream_name = logstream.into_inner(); let tenant_id = get_tenant_id_from_request(&req); // For query mode, if the stream not found in memory map, //check if it exists in the storage @@ -478,9 +478,9 @@ pub async fn put_stream_hot_tier( pub async fn get_stream_hot_tier( req: HttpRequest, - stream_name: Path, + logstream: Path, ) -> Result { - let stream_name = stream_name.into_inner(); + let stream_name = logstream.into_inner(); let tenant_id = get_tenant_id_from_request(&req); // For query mode, if the stream not found in memory map, //check if it exists in the storage @@ -504,9 +504,9 @@ pub async fn get_stream_hot_tier( pub async fn delete_stream_hot_tier( req: HttpRequest, - stream_name: Path, + logstream: Path, ) -> Result { - let stream_name = stream_name.into_inner(); + let stream_name = logstream.into_inner(); let tenant_id = get_tenant_id_from_request(&req); // For query mode, if the stream not found in memory map, //check if it exists in the storage diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index cde202a51..0f8880981 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -25,8 +25,11 @@ use actix_web::{ error::{ErrorBadRequest, ErrorForbidden, ErrorUnauthorized}, http::header::{self, HeaderName, HeaderValue}, }; -use chrono::{Duration, Utc}; +use argon2::{Argon2, PasswordHash, PasswordVerifier}; +use chrono::{Duration, TimeDelta, Utc}; use futures_util::future::LocalBoxFuture; +use once_cell::sync::OnceCell; +use ulid::Ulid; use crate::{ handlers::{ @@ -52,6 +55,9 @@ use crate::{ use serde::{Deserialize, Serialize}; +pub const CLUSTER_SECRET_HEADER: &str = "x-p-cluster-secret"; +pub static CLUSTER_SECRET: OnceCell<(String, String)> = OnceCell::new(); + #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct Message { @@ -398,7 +404,12 @@ pub fn auth_no_context(req: &mut ServiceRequest, action: Action) -> Result() { + Ok(session.clone()) + } else { + extract_session_key(req) + }; + // let creds = extract_session_key(req); creds.map(|key| Users.authorize(key, action, None, None)) } @@ -481,7 +492,7 @@ where forward_ready!(service); fn call(&self, req: ServiceRequest) -> Self::Future { - let username = req.match_info().get("username").unwrap_or(""); + let username = req.match_info().get("userid").unwrap_or(""); let is_root = username == PARSEABLE.options.username; let fut = self.service.call(req); @@ -494,6 +505,100 @@ where } } +// The credentials set in the env vars (P_USERNAME & P_PASSWORD) are treated +// as root credentials. Any other user is not allowed to modify or delete +// the root user. Deny request if username is same as username +// from env variable P_USERNAME. +pub struct IntraClusterRequest; + +impl Transform for IntraClusterRequest +where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type InitError = (); + type Transform = IntraClusterRequestMiddleware; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(IntraClusterRequestMiddleware { service })) + } +} + +pub struct IntraClusterRequestMiddleware { + service: S, +} + +impl Service for IntraClusterRequestMiddleware +where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type Future = LocalBoxFuture<'static, Result>; + + forward_ready!(service); + + fn call(&self, mut req: ServiceRequest) -> Self::Future { + let err = if let Some((_, hash)) = CLUSTER_SECRET.get() { + if let Some(header) = req.headers().get(CLUSTER_SECRET_HEADER) + && let Some(tenant) = req.headers().get("intra-cluster-tenant") + && let Some(userid) = req.headers().get("intra-cluster-userid") + && let Ok(incoming_secret) = header.to_str() + && let Ok(tenant) = tenant.to_str() + && let Ok(userid) = userid.to_str() + { + // validate the incoming header value + let parsed_hash = PasswordHash::new(incoming_secret).unwrap(); + if Argon2::default() + .verify_password(hash.as_bytes(), &parsed_hash) + .is_ok() + { + // create a user session (how to remove that later?) + let tenant_id = if tenant.eq(DEFAULT_TENANT) { + None + } else { + Some(tenant.to_owned()) + }; + if let Some(user) = Users.get_user(userid, &tenant_id) { + let id = Ulid::new(); + req.headers_mut().insert( + header::COOKIE, + HeaderValue::from_str(&format!("session={}", id)).unwrap(), + ); + let session = SessionKey::SessionId(id); + req.extensions_mut().insert(session.clone()); + Users.new_session(&user, session, TimeDelta::seconds(20)); + } + None + } else { + Some("Incoming intra-cluster request validation failed") + } + } else { + Some( + "Incoming intra-cluster request doesn't contain the proper header or the server was started without P_CLUSTER_SECRET", + ) + } + } else { + None + }; + + let fut = self.service.call(req); + + Box::pin(async move { + if let Some(err) = err { + return Err(ErrorUnauthorized(err)); + } + fut.await + }) + } +} + /// ModeFilterMiddleware factory pub struct ModeFilter; diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index c389bbafd..59e5b0787 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -30,6 +30,7 @@ use serde_json::Value; use tokio::sync::OnceCell; use tokio::sync::oneshot; +use crate::handlers::http::middleware::IntraClusterRequest; use crate::handlers::http::modal::NodeType; use crate::sync::sync_start; use crate::{ @@ -172,8 +173,12 @@ impl IngestServer { .route(web::get().to(role::get).authorize(Action::GetRole)), ) .service( - web::resource("/{name}/sync") - .route(web::put().to(ingestor_role::put).authorize(Action::PutRole)), // .route(web::delete().to(ingestor_role::delete).authorize(Action::DeleteRole)), + web::resource("/{name}/sync").route( + web::put() + .to(ingestor_role::put) + .authorize(Action::PutRole) + .wrap(IntraClusterRequest), + ), // .route(web::delete().to(ingestor_role::delete).authorize(Action::DeleteRole)), ) } // get the user webscope @@ -185,13 +190,15 @@ impl IngestServer { .route( web::post() .to(ingestor_rbac::post_user) - .authorize(Action::PutUser), + .authorize(Action::PutUser) + .wrap(IntraClusterRequest), ) // DELETE /user/{userid} => Sync deletion of a user .route( web::delete() .to(ingestor_rbac::delete_user) - .authorize(Action::DeleteUser), + .authorize(Action::DeleteUser) + .wrap(IntraClusterRequest), ) .wrap(DisAllowRootUser), ) @@ -202,7 +209,8 @@ impl IngestServer { web::patch() .to(ingestor_rbac::add_roles_to_user) .authorize(Action::PutUserRoles) - .wrap(DisAllowRootUser), + .wrap(DisAllowRootUser) + .wrap(IntraClusterRequest), ), ) .service( @@ -212,7 +220,8 @@ impl IngestServer { web::patch() .to(ingestor_rbac::remove_roles_from_user) .authorize(Action::PutUserRoles) - .wrap(DisAllowRootUser), + .wrap(DisAllowRootUser) + .wrap(IntraClusterRequest), ), ) .service( @@ -222,7 +231,8 @@ impl IngestServer { web::post() .to(ingestor_rbac::post_gen_password) .authorize(Action::PutUser) - .wrap(DisAllowRootUser), + .wrap(DisAllowRootUser) + .wrap(IntraClusterRequest), ), ) } @@ -247,13 +257,15 @@ impl IngestServer { .route( web::delete() .to(ingestor_logstream::delete) - .authorize(Action::DeleteStream), + .authorize(Action::DeleteStream) + .wrap(IntraClusterRequest), ) // PUT "/logstream/{logstream}/sync" ==> Sync creation of a new log stream .route( web::put() .to(ingestor_logstream::put_stream) - .authorize_for_resource(Action::CreateStream), + .authorize_for_resource(Action::CreateStream) + .wrap(IntraClusterRequest), ), ) .service( diff --git a/src/handlers/http/modal/query/querier_role.rs b/src/handlers/http/modal/query/querier_role.rs index f53e703ab..083736c8a 100644 --- a/src/handlers/http/modal/query/querier_role.rs +++ b/src/handlers/http/modal/query/querier_role.rs @@ -104,7 +104,7 @@ pub async fn put( mut_sessions().remove_user(&userid, tenant); } - sync_role_update(name.clone(), role, &tenant_id).await?; + sync_role_update(&req, name.clone(), role, &tenant_id).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 72b254ad3..9b7aa4aea 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -26,6 +26,7 @@ use crate::handlers::http::base_path; use crate::handlers::http::demo_data::get_demo_data; use crate::handlers::http::health_check; use crate::handlers::http::max_event_payload_size; +use crate::handlers::http::middleware::IntraClusterRequest; use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup; use crate::handlers::http::prism_base_path; use crate::handlers::http::query; @@ -417,7 +418,12 @@ impl Server { // get the query factory // POST "/query" ==> Get results of the SQL query passed in request body pub fn get_query_factory() -> Resource { - web::resource("/query").route(web::post().to(query::query).authorize(Action::Query)) + web::resource("/query").route( + web::post() + .to(query::query) + .authorize(Action::Query) + .wrap(IntraClusterRequest), + ) } // get the logstream web scope @@ -516,7 +522,8 @@ impl Server { .route( web::get() .to(logstream::get_stream_hot_tier) - .authorize_for_resource(Action::GetHotTierEnabled), + .authorize_for_resource(Action::GetHotTierEnabled) + .wrap(IntraClusterRequest), ) .route( web::delete() @@ -613,7 +620,7 @@ impl Server { ), ) .service( - web::resource("/{username}").route( + web::resource("/{userid}").route( web::get() .to(http::rbac::get_prism_user) .authorize_for_user(Action::GetUserRoles), @@ -634,14 +641,14 @@ impl Server { ), ) .service( - web::resource("/{username}") - // POST /user/{username} => Create a new user + web::resource("/{userid}") + // POST /user/{userid} => Create a new user .route( web::post() .to(http::rbac::post_user) .authorize(Action::PutUser), ) - // DELETE /user/{username} => Delete a user + // DELETE /user/{userid} => Delete a user .route( web::delete() .to(http::rbac::delete_user) @@ -650,15 +657,15 @@ impl Server { .wrap(DisAllowRootUser), ) .service( - web::resource("/{username}/role").route( + web::resource("/{userid}/role").route( web::get() .to(http::rbac::get_role) .authorize_for_user(Action::GetUserRoles), ), ) .service( - web::resource("/{username}/role/add") - // PATCH /user/{username}/role/add => Add roles to a user + web::resource("/{userid}/role/add") + // PATCH /user/{userid}/role/add => Add roles to a user .route( web::patch() .to(http::rbac::add_roles_to_user) @@ -667,8 +674,8 @@ impl Server { ), ) .service( - web::resource("/{username}/role/remove") - // PATCH /user/{username}/role/remove => Remove roles from a user + web::resource("/{userid}/role/remove") + // PATCH /user/{userid}/role/remove => Remove roles from a user .route( web::patch() .to(http::rbac::remove_roles_from_user) @@ -677,8 +684,8 @@ impl Server { ), ) .service( - web::resource("/{username}/generate-new-password") - // POST /user/{username}/generate-new-password => reset password for this user + web::resource("/{userid}/generate-new-password") + // POST /user/{userid}/generate-new-password => reset password for this user .route( web::post() .to(http::rbac::post_gen_password) diff --git a/src/handlers/http/oidc.rs b/src/handlers/http/oidc.rs index 9efff03a3..49bbb2e8c 100644 --- a/src/handlers/http/oidc.rs +++ b/src/handlers/http/oidc.rs @@ -33,7 +33,6 @@ use tokio::sync::RwLock; use ulid::Ulid; use url::Url; -use crate::utils::{login_sync, logout_sync}; use crate::{ handlers::{ COOKIE_AGE_DAYS, SESSION_COOKIE_NAME, USER_COOKIE_NAME, USER_ID_COOKIE_NAME, @@ -124,9 +123,6 @@ pub async fn login( SessionKey::BasicAuth { username, password }, EXPIRY_DURATION, ); - let _session = session_cookie.value().to_owned(); - let _user = user.clone(); - let _ = login_sync(_session, _user, EXPIRY_DURATION, &tenant_id).await; Ok(redirect_to_client( query.redirect.as_str(), @@ -176,16 +172,14 @@ pub async fn logout(req: HttpRequest, query: web::Query) -> None }; - let res = match (user, logout_endpoint) { + match (user, logout_endpoint) { (Some(username), Some(logout_endpoint)) if Users.is_oauth(&username, &tenant_id).unwrap_or_default() => { redirect_to_oidc_logout(logout_endpoint, &query.redirect) } _ => redirect_to_client(query.redirect.as_str(), None), - }; - let _ = logout_sync(session, &tenant_id).await; - res + } } /// Handler for code callback diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index 9ec2ed3fe..0f8fd3840 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -96,17 +96,17 @@ pub async fn list_users_prism(req: HttpRequest) -> impl Responder { web::Json(prism_users) } -/// Function for GET /users/{username} +/// Function for GET /users/{userid} pub async fn get_prism_user( req: HttpRequest, - username: Path, + userid: Path, ) -> Result { - let username = username.into_inner(); + let userid = userid.into_inner(); let tenant_id = get_tenant_id_from_request(&req); // First check if the user exists let users = rbac::map::users(); if let Some(users) = users.get(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)) - && let Some(user) = users.get(&username) + && let Some(user) = users.get(&userid) && !user.protected { // Create UsersPrism for the found user only @@ -117,16 +117,16 @@ pub async fn get_prism_user( } } -// Handler for POST /api/v1/user/{username} -// Creates a new user by username if it does not exists +// Handler for POST /api/v1/user/{userid} +// Creates a new user by userid if it does not exists pub async fn post_user( req: HttpRequest, - username: web::Path, + userid: web::Path, body: Option>, ) -> Result { - let username = username.into_inner(); + let userid = userid.into_inner(); let tenant_id = get_tenant_id_from_request(&req); - validator::user_role_name(&username)?; + validator::user_role_name(&userid)?; let mut metadata = get_metadata(&tenant_id).await?; let user_roles: HashSet = if let Some(body) = body { @@ -147,16 +147,16 @@ pub async fn post_user( return Err(RBACError::RolesDoNotExist(non_existent_roles)); } let _guard = UPDATE_LOCK.lock().await; - if Users.contains(&username, &tenant_id) + if Users.contains(&userid, &tenant_id) || metadata.users.iter().any(|user| match &user.ty { - UserType::Native(basic) => basic.username == username, + UserType::Native(basic) => basic.username == userid, UserType::OAuth(_) => false, // OAuth users should be created differently }) { - return Err(RBACError::UserExists(username)); + return Err(RBACError::UserExists(userid)); } - let (user, password) = user::User::new_basic(username.clone(), tenant_id.clone(), false); + let (user, password) = user::User::new_basic(userid.clone(), tenant_id.clone(), false); metadata.users.push(user.clone()); @@ -166,7 +166,7 @@ pub async fn post_user( if !created_role.is_empty() { add_roles_to_user( req, - web::Path::::from(username.clone()), + web::Path::::from(userid.clone()), web::Json(created_role), ) .await?; @@ -174,16 +174,16 @@ pub async fn post_user( Ok(password) } -// Handler for POST /api/v1/user/{username}/generate-new-password +// Handler for POST /api/v1/user/{userid}/generate-new-password // Resets password for the user to a newly generated one and returns it pub async fn post_gen_password( req: HttpRequest, - username: web::Path, + userid: web::Path, ) -> Result { - let username = username.into_inner(); + let userid = userid.into_inner(); let tenant_id = get_tenant_id_from_request(&req); // fail this request if the user is protected - if let Some(p) = Users.is_protected(&username, &tenant_id) + if let Some(p) = Users.is_protected(&userid, &tenant_id) && p { return Err(RBACError::ProtectedUser); @@ -203,14 +203,14 @@ pub async fn post_gen_password( user::UserType::Native(ref mut user) => Some(user), _ => None, }) - .find(|user| user.username == username) + .find(|user| user.username == userid) { user.password_hash.clone_from(&hash); } else { return Err(RBACError::UserDoesNotExist); } put_metadata(&metadata, &tenant_id).await?; - Users.change_password_hash(&username, &new_hash, &tenant_id); + Users.change_password_hash(&userid, &new_hash, &tenant_id); Ok(new_password) } diff --git a/src/rbac/map.rs b/src/rbac/map.rs index 8ae2385d8..2bcc45803 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -115,9 +115,8 @@ pub fn init(metadata: &StorageMetadata) { let admin_permissions = RoleBuilder::from(&admin_privilege).build(); roles.insert( "super-admin".to_string(), - Role::create_user_role(vec![admin_privilege]), + Role::create_internal_role(vec![admin_privilege]), ); - // roles.insert("super-admin".to_string(), vec![admin_privilege]); let mut users = Users::from(users); let admin = user::get_super_admin_user(); @@ -129,15 +128,6 @@ pub fn init(metadata: &StorageMetadata) { password: PARSEABLE.options.password.clone(), }; let mut sessions = Sessions::default(); - // sessions.track_new( - // admin_username.clone(), - // SessionKey::BasicAuth { - // username: PARSEABLE.options.username.clone(), - // password: PARSEABLE.options.password.clone(), - // }, - // chrono::DateTime::::MAX_UTC, - // admin_permissions, - // ); PARSEABLE .streams @@ -314,6 +304,23 @@ impl Sessions { sessions.retain(|(_, expiry)| expiry < &now); } + #[inline(always)] + pub fn remove_all_expired_sessions(&mut self, tenant_id: &str) { + let now = Utc::now(); + if let Some(user_sessions) = self.user_sessions.get_mut(tenant_id) { + for (_, sessions) in user_sessions.iter_mut() { + sessions.retain(|(s, t)| { + if now > *t { + self.active_sessions.remove(s); + true + } else { + false + } + }); + } + } + } + // get permission related to this session pub fn get(&self, key: &SessionKey) -> Option<&Vec> { self.active_sessions.get(key).map(|(_, _, perms)| perms) diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index ed2765339..691a7b507 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -224,6 +224,10 @@ impl Users { pub fn new_session(&self, user: &User, session: SessionKey, expires_in: TimeDelta) { let tenant_id = &user.tenant; let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + + // remove stale sessions + mut_sessions().remove_all_expired_sessions(tenant); + mut_sessions().track_new( user.userid().to_owned(), session, diff --git a/src/rbac/user.rs b/src/rbac/user.rs index 475fb96ab..c438012e6 100644 --- a/src/rbac/user.rs +++ b/src/rbac/user.rs @@ -122,6 +122,10 @@ impl User { matches!(self.ty, UserType::OAuth(_)) } + pub fn is_super_admin(&self) -> bool { + self.roles.contains("super-admin") + } + pub fn roles(&self) -> Vec { self.roles.iter().cloned().collect() } @@ -161,7 +165,7 @@ pub fn verify(password_hash: &str, password: &str) -> bool { // generate a one way hash for password to be stored in metadata file // ref https://github.com/P-H-C/phc-string-format/blob/master/phc-sf-spec.md -fn gen_hash(password: &str) -> String { +pub fn gen_hash(password: &str) -> String { let mut bytes = [0u8; 32]; let r = &mut OsRng; r.fill_bytes(&mut bytes); @@ -361,9 +365,13 @@ impl UserGroup { let mut non_existent_roles = Vec::new(); if !self.roles.is_empty() { // validate that the roles exist + // can't add internal roles to a group if let Some(tenant_roles) = roles().get(tenant) { for role in &self.roles { - if !tenant_roles.contains_key(role) { + if let Some(role) = tenant_roles.get(role) + && role.role_type().eq(&RoleType::User) + { + } else { non_existent_roles.push(role.clone()); } } diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs index 431b34195..78e6106e2 100644 --- a/src/storage/field_stats.rs +++ b/src/storage/field_stats.rs @@ -34,7 +34,9 @@ use crate::query::QUERY_SESSION_STATE; use crate::storage::ObjectStorageError; use crate::storage::StreamType; use crate::tenants::TENANT_METADATA; +use crate::utils::create_intracluster_auth_headermap; use crate::utils::get_tenant_id_from_request; +use crate::utils::get_user_from_request; use crate::utils::json::apply_generic_flattening_for_partition; use actix_web::HttpRequest; use actix_web::HttpResponse; @@ -571,7 +573,21 @@ pub async fn get_dataset_stats( ); Some(map) } else { - None + let auth = create_intracluster_auth_headermap( + req.headers(), + "", + &get_user_from_request(&req).unwrap(), + ); + let mut map = HeaderMap::new(); + + for (key, value) in auth.iter() { + if let Ok(name) = HeaderName::from_bytes(key.as_str().as_bytes()) + && let Ok(val) = HeaderValue::from_bytes(value.as_bytes()) + { + map.insert(name, val); + } + } + Some(map) }; let response = match send_query_request(auth, &query_request, &tenant_id).await { Ok((query_response, _)) => query_response, diff --git a/src/utils/actix.rs b/src/utils/actix.rs index aae3638f3..ac76075a8 100644 --- a/src/utils/actix.rs +++ b/src/utils/actix.rs @@ -18,7 +18,7 @@ */ use actix_web::{ - Error, FromRequest, HttpRequest, + Error, FromRequest, HttpMessage, HttpRequest, dev::ServiceRequest, error::{ErrorUnauthorized, ErrorUnprocessableEntity}, }; @@ -39,6 +39,8 @@ pub fn extract_session_key(req: &mut ServiceRequest) -> Result() { + Ok(session.clone()) } else if let Some(cookie) = req.cookie("session") { let ulid = ulid::Ulid::from_string(cookie.value()) .map_err(|_| ErrorUnprocessableEntity("Cookie is tampered with or invalid"))?; @@ -61,6 +63,8 @@ pub fn extract_session_key_from_req(req: &HttpRequest) -> Result() { + Ok(session.clone()) } else if let Some(cookie) = req.cookie("session") { let ulid = ulid::Ulid::from_string(cookie.value()) .map_err(|_| ErrorUnprocessableEntity("Cookie is tampered with or invalid"))?; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 6b37aea88..237094444 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -26,26 +26,21 @@ pub mod time; pub mod uid; pub mod update; -use crate::INTRA_CLUSTER_CLIENT; use crate::handlers::TENANT_ID; -use crate::handlers::http::base_path_without_preceding_slash; -use crate::handlers::http::cluster::for_each_live_node; +use crate::handlers::http::middleware::{CLUSTER_SECRET, CLUSTER_SECRET_HEADER}; use crate::handlers::http::rbac::RBACError; use crate::parseable::{DEFAULT_TENANT, PARSEABLE}; use crate::query::resolve_stream_names; use crate::rbac::Users; use crate::rbac::map::{SessionKey, sessions}; use crate::rbac::role::{Action, ParseableResourceType, Permission}; -use crate::rbac::user::User; use actix::extract_session_key_from_req; use actix_web::dev::ServiceRequest; use actix_web::http::header::HeaderMap; use actix_web::{FromRequest, HttpRequest}; use actix_web_httpauth::extractors::basic::BasicAuth; -use chrono::{Duration, NaiveDate, NaiveDateTime, NaiveTime, Utc}; -use http::header; +use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc}; use regex::Regex; -use serde_json::json; use sha2::{Digest, Sha256}; pub fn get_node_id() -> String { @@ -246,9 +241,28 @@ pub fn is_admin(req: &HttpRequest) -> Result { pub fn create_intracluster_auth_headermap( req: &HeaderMap, node_token: &str, + userid: &str, ) -> reqwest::header::HeaderMap { let mut map = reqwest::header::HeaderMap::new(); - if let Some(auth) = req.get(actix_web::http::header::AUTHORIZATION) { + let tenant_id = req + .get(TENANT_ID) + .map(|tenant_value| tenant_value.to_str().unwrap().to_owned()); + if let Some((_, hash)) = CLUSTER_SECRET.get() { + // also insert server secret along with other required headers for query forwarding + map.insert( + reqwest::header::HeaderName::from_static(CLUSTER_SECRET_HEADER), + reqwest::header::HeaderValue::from_str(hash).unwrap(), + ); + map.insert( + reqwest::header::HeaderName::from_static("intra-cluster-tenant"), + reqwest::header::HeaderValue::from_str(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)) + .unwrap(), + ); + map.insert( + reqwest::header::HeaderName::from_static("intra-cluster-userid"), + reqwest::header::HeaderValue::from_str(userid).unwrap(), + ); + } else if let Some(auth) = req.get(actix_web::http::header::AUTHORIZATION) { map.insert( reqwest::header::AUTHORIZATION, reqwest::header::HeaderValue::from_bytes(auth.as_bytes()).unwrap(), @@ -269,69 +283,3 @@ pub fn create_intracluster_auth_headermap( } map } - -pub async fn login_sync( - session: String, - user: User, - expiry: Duration, - tenant_id: &Option, -) -> Result<(), anyhow::Error> { - for_each_live_node(tenant_id, move |node| { - let url = format!( - "{}{}/o/login/sync", - node.domain_name, - base_path_without_preceding_slash(), - ); - let _session = session.clone(); - let _user = user.clone(); - let _expiry = expiry; - - async move { - INTRA_CLUSTER_CLIENT - .post(url) - .header(header::AUTHORIZATION, node.token) - .header(header::CONTENT_TYPE, "application/json") - .json(&json!( - { - "sessionCookie": _session, - "user": _user, - "expiry": _expiry - } - )) - .send() - .await?; - Ok::<(), anyhow::Error>(()) - } - }) - .await -} - -pub async fn logout_sync( - session: SessionKey, - tenant_id: &Option, -) -> Result<(), anyhow::Error> { - for_each_live_node(tenant_id, move |node| { - let url = format!( - "{}{}/o/logout/sync", - node.domain_name, - base_path_without_preceding_slash(), - ); - let _session = session.clone(); - - async move { - INTRA_CLUSTER_CLIENT - .post(url) - .header(header::AUTHORIZATION, node.token) - .header(header::CONTENT_TYPE, "application/json") - .json(&json!( - { - "sessionKey": _session - } - )) - .send() - .await?; - Ok::<(), anyhow::Error>(()) - } - }) - .await -} diff --git a/src/validator.rs b/src/validator.rs index ea628c3d9..0410e396a 100644 --- a/src/validator.rs +++ b/src/validator.rs @@ -72,6 +72,8 @@ pub fn stream_name( static RESERVED_NAMES: Lazy> = Lazy::new(|| { [ + "super-admin", + "superadmin", "admin", "user", "role", From d194faeb65c72c3e73eec2675759a6fb9aad1102 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Fri, 27 Feb 2026 15:53:19 +0530 Subject: [PATCH 3/4] coderabbit suggestions --- src/handlers/http/middleware.rs | 35 ++++++++++++++++++++++----------- src/rbac/map.rs | 17 ---------------- src/rbac/mod.rs | 3 --- 3 files changed, 24 insertions(+), 31 deletions(-) diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 0f8880981..f0d777879 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -448,7 +448,7 @@ pub fn auth_user_context( return Ok(rbac::Response::Suspended(msg)); } let creds = extract_session_key(req); - let user = req.match_info().get("username"); + let user = req.match_info().get("userid"); creds.map(|key| Users.authorize(key, action, None, user)) } @@ -545,7 +545,7 @@ where forward_ready!(service); fn call(&self, mut req: ServiceRequest) -> Self::Future { - let err = if let Some((_, hash)) = CLUSTER_SECRET.get() { + let (err, id) = if let Some((secret, _)) = CLUSTER_SECRET.get() { if let Some(header) = req.headers().get(CLUSTER_SECRET_HEADER) && let Some(tenant) = req.headers().get("intra-cluster-tenant") && let Some(userid) = req.headers().get("intra-cluster-userid") @@ -556,7 +556,7 @@ where // validate the incoming header value let parsed_hash = PasswordHash::new(incoming_secret).unwrap(); if Argon2::default() - .verify_password(hash.as_bytes(), &parsed_hash) + .verify_password(secret.as_bytes(), &parsed_hash) .is_ok() { // create a user session (how to remove that later?) @@ -565,7 +565,7 @@ where } else { Some(tenant.to_owned()) }; - if let Some(user) = Users.get_user(userid, &tenant_id) { + let id = if let Some(user) = Users.get_user(userid, &tenant_id) { let id = Ulid::new(); req.headers_mut().insert( header::COOKIE, @@ -574,18 +574,27 @@ where let session = SessionKey::SessionId(id); req.extensions_mut().insert(session.clone()); Users.new_session(&user, session, TimeDelta::seconds(20)); - } - None + Some(id) + } else { + None + }; + (None, id) } else { - Some("Incoming intra-cluster request validation failed") + ( + Some("Incoming intra-cluster request validation failed"), + None, + ) } } else { - Some( - "Incoming intra-cluster request doesn't contain the proper header or the server was started without P_CLUSTER_SECRET", + ( + Some( + "Incoming intra-cluster request doesn't contain the proper header or the server was started without P_CLUSTER_SECRET", + ), + None, ) } } else { - None + (None, None) }; let fut = self.service.call(req); @@ -594,7 +603,11 @@ where if let Some(err) = err { return Err(ErrorUnauthorized(err)); } - fut.await + let res = fut.await; + if let Some(id) = id { + mut_sessions().remove_session(&SessionKey::SessionId(id)); + } + res }) } } diff --git a/src/rbac/map.rs b/src/rbac/map.rs index 2bcc45803..ab158f11a 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -304,23 +304,6 @@ impl Sessions { sessions.retain(|(_, expiry)| expiry < &now); } - #[inline(always)] - pub fn remove_all_expired_sessions(&mut self, tenant_id: &str) { - let now = Utc::now(); - if let Some(user_sessions) = self.user_sessions.get_mut(tenant_id) { - for (_, sessions) in user_sessions.iter_mut() { - sessions.retain(|(s, t)| { - if now > *t { - self.active_sessions.remove(s); - true - } else { - false - } - }); - } - } - } - // get permission related to this session pub fn get(&self, key: &SessionKey) -> Option<&Vec> { self.active_sessions.get(key).map(|(_, _, perms)| perms) diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index 691a7b507..25dea5a6a 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -225,9 +225,6 @@ impl Users { let tenant_id = &user.tenant; let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - // remove stale sessions - mut_sessions().remove_all_expired_sessions(tenant); - mut_sessions().track_new( user.userid().to_owned(), session, From ee37e68dad03d1f96d3e9feb4bf77d450ac99e05 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Fri, 27 Feb 2026 18:16:22 +0530 Subject: [PATCH 4/4] updates for sync --- src/handlers/http/cluster/mod.rs | 61 ++++++++++++++-- .../http/modal/ingest/ingestor_rbac.rs | 12 ++-- .../http/modal/ingest/ingestor_role.rs | 49 ++++++++++++- src/handlers/http/modal/ingest_server.rs | 8 +-- src/handlers/http/modal/query/querier_rbac.rs | 12 ++-- src/handlers/http/modal/query/querier_role.rs | 71 ++++++++++++++++++- src/handlers/http/modal/query_server.rs | 18 +++-- 7 files changed, 201 insertions(+), 30 deletions(-) diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index cefe607b6..45ab79097 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -523,6 +523,7 @@ pub async fn sync_users_with_roles_with_ingestors( let userid = userid.to_owned(); let headers = req.headers().clone(); let op = operation.to_string(); + let caller_userid = get_user_from_request(req).unwrap(); for_each_live_node(tenant_id, move |ingestor| { let url = format!( "{}{}/user/{}/role/sync/{}", @@ -533,7 +534,8 @@ pub async fn sync_users_with_roles_with_ingestors( ); let role_data = role_data.clone(); - let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token, &userid); + let headermap = + create_intracluster_auth_headermap(&headers, &ingestor.token, &caller_userid); async move { let res = INTRA_CLUSTER_CLIENT .patch(url) @@ -572,6 +574,7 @@ pub async fn sync_user_deletion_with_ingestors( tenant_id: &Option, ) -> Result<(), RBACError> { let userid = userid.to_owned(); + let caller_userid = get_user_from_request(req).unwrap(); let headers = req.headers().clone(); for_each_live_node(tenant_id, move |ingestor| { let url = format!( @@ -580,7 +583,8 @@ pub async fn sync_user_deletion_with_ingestors( base_path_without_preceding_slash(), userid ); - let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token, &userid); + let headermap = + create_intracluster_auth_headermap(&headers, &ingestor.token, &caller_userid); async move { let res = INTRA_CLUSTER_CLIENT .delete(url) @@ -629,6 +633,7 @@ pub async fn sync_user_creation( RBACError::SerdeError(err) })?; + let caller_userid = get_user_from_request(req)?; let userid = userid.to_string(); let headers = req.headers().clone(); for_each_live_node(tenant_id, move |node| { @@ -638,7 +643,7 @@ pub async fn sync_user_creation( base_path_without_preceding_slash(), userid ); - let headermap = create_intracluster_auth_headermap(&headers, &node.token, &userid); + let headermap = create_intracluster_auth_headermap(&headers, &node.token, &caller_userid); let user_data = user_data.clone(); async move { @@ -678,6 +683,7 @@ pub async fn sync_password_reset_with_ingestors( ) -> Result<(), RBACError> { let userid = username.to_owned(); let tenant_id = get_tenant_id_from_request(&req); + let caller_userid = get_user_from_request(&req).unwrap(); let headers = req.headers().clone(); for_each_live_node(&tenant_id, move |ingestor| { let url = format!( @@ -686,7 +692,8 @@ pub async fn sync_password_reset_with_ingestors( base_path_without_preceding_slash(), userid ); - let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token, &userid); + let headermap = + create_intracluster_auth_headermap(&headers, &ingestor.token, &caller_userid); async move { let res = INTRA_CLUSTER_CLIENT .post(url) @@ -767,6 +774,52 @@ pub async fn sync_role_update( .await } +// forward the put role request to all ingestors and queriers to keep them in sync +pub async fn sync_role_delete( + req: &HttpRequest, + name: String, + tenant_id: &Option, +) -> Result<(), RoleError> { + let userid = get_user_from_request(req).unwrap(); + let headers = req.headers().clone(); + for_each_live_node(tenant_id, move |node| { + let url = format!( + "{}{}/role/{}/sync", + node.domain_name, + base_path_without_preceding_slash(), + name + ); + + let headermap = create_intracluster_auth_headermap(&headers, &node.token, &userid); + async move { + let res = INTRA_CLUSTER_CLIENT + .delete(url) + .headers(headermap) + .header(header::CONTENT_TYPE, "application/json") + .send() + .await + .map_err(|err| { + error!( + "Fatal: failed to forward request to node: {}\n Error: {:?}", + node.domain_name, err + ); + RoleError::Network(err) + })?; + + if !res.status().is_success() { + error!( + "failed to forward request to node: {}\nResponse Returned: {:?}", + node.domain_name, + res.text().await + ); + } + + Ok(()) + } + }) + .await +} + pub fn fetch_daily_stats( date: &str, stream_meta_list: &[ObjectStoreFormat], diff --git a/src/handlers/http/modal/ingest/ingestor_rbac.rs b/src/handlers/http/modal/ingest/ingestor_rbac.rs index 034723129..5bf9bde59 100644 --- a/src/handlers/http/modal/ingest/ingestor_rbac.rs +++ b/src/handlers/http/modal/ingest/ingestor_rbac.rs @@ -35,14 +35,14 @@ use crate::{ utils::get_tenant_id_from_request, }; -// Handler for POST /api/v1/user/{username} +// Handler for POST /api/v1/user/{userid} // Creates a new user by username if it does not exists pub async fn post_user( req: HttpRequest, - username: web::Path, + userid: web::Path, body: Option>, ) -> Result { - let username = username.into_inner(); + let username = userid.into_inner(); if let Some(body) = body { let user: ParseableUser = serde_json::from_value(body.into_inner())?; @@ -198,13 +198,13 @@ pub async fn remove_roles_from_user( Ok(HttpResponse::Ok().status(StatusCode::OK).finish()) } -// Handler for POST /api/v1/user/{username}/generate-new-password +// Handler for POST /api/v1/user/{userid}/generate-new-password // Resets password for the user to a newly generated one and returns it pub async fn post_gen_password( req: HttpRequest, - username: web::Path, + userid: web::Path, ) -> Result { - let username = username.into_inner(); + let username = userid.into_inner(); let tenant_id = get_tenant_id_from_request(&req); let mut new_hash = String::default(); let mut metadata = get_metadata(&tenant_id).await?; diff --git a/src/handlers/http/modal/ingest/ingestor_role.rs b/src/handlers/http/modal/ingest/ingestor_role.rs index 50ad3f27f..dba9528e0 100644 --- a/src/handlers/http/modal/ingest/ingestor_role.rs +++ b/src/handlers/http/modal/ingest/ingestor_role.rs @@ -64,7 +64,6 @@ pub async fn put( .entry(tenant_id.clone()) .or_default() .insert(name.clone(), sync_req.privileges); - // mut_roles().insert(name.clone(), privileges); // refresh the sessions of all users using this role // for this, iterate over all user_groups and users and create a hashset of users @@ -93,3 +92,51 @@ pub async fn put( Ok(HttpResponse::Ok().finish()) } + +// Handler for DELETE /api/v1/role/{name} +// Deletes the role +pub async fn delete( + req: HttpRequest, + name: web::Path, +) -> Result { + let name = name.into_inner(); + let req_tenant_id = get_tenant_id_from_request(&req); + + let mut metadata = get_metadata(&req_tenant_id).await?; + metadata.roles.remove(&name); + + let _ = storage::put_staging_metadata(&metadata, &req_tenant_id); + let tenant_id = req_tenant_id + .as_deref() + .unwrap_or(DEFAULT_TENANT) + .to_owned(); + mut_roles() + .entry(tenant_id.clone()) + .or_default() + .remove(&name); + + let mut session_refresh_users: HashSet = HashSet::new(); + if let Some(groups) = read_user_groups().get(&tenant_id) { + for user_group in groups.values() { + if user_group.roles.contains(&name) { + session_refresh_users + .extend(user_group.users.iter().map(|u| u.userid().to_string())); + } + } + } + + // iterate over all users to see if they have this role + if let Some(users) = users().get(&tenant_id) { + for user in users.values() { + if user.roles.contains(&name) { + session_refresh_users.insert(user.userid().to_string()); + } + } + } + + for userid in session_refresh_users { + mut_sessions().remove_user(&userid, &tenant_id); + } + + Ok(HttpResponse::Ok().finish()) +} diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 59e5b0787..67c9737e6 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -185,8 +185,8 @@ impl IngestServer { pub fn get_user_webscope() -> Scope { web::scope("/user") .service( - web::resource("/{username}/sync") - // POST /user/{username}/sync => Sync creation of a new user + web::resource("/{userid}/sync") + // POST /user/{userid}/sync => Sync creation of a new user .route( web::post() .to(ingestor_rbac::post_user) @@ -225,8 +225,8 @@ impl IngestServer { ), ) .service( - web::resource("/{username}/generate-new-password/sync") - // POST /user/{username}/generate-new-password => reset password for this user + web::resource("/{userid}/generate-new-password/sync") + // POST /user/{userid}/generate-new-password => reset password for this user .route( web::post() .to(ingestor_rbac::post_gen_password) diff --git a/src/handlers/http/modal/query/querier_rbac.rs b/src/handlers/http/modal/query/querier_rbac.rs index 6013c7e08..fc396fa06 100644 --- a/src/handlers/http/modal/query/querier_rbac.rs +++ b/src/handlers/http/modal/query/querier_rbac.rs @@ -39,14 +39,14 @@ use crate::{ validator, }; -// Handler for POST /api/v1/user/{username} +// Handler for POST /api/v1/user/{userid} // Creates a new user by username if it does not exists pub async fn post_user( req: HttpRequest, - username: web::Path, + userid: web::Path, body: Option>, ) -> Result { - let username = username.into_inner(); + let username = userid.into_inner(); let tenant_id = get_tenant_id_from_request(&req); validator::user_role_name(&username)?; let mut metadata = get_metadata(&tenant_id).await?; @@ -300,13 +300,13 @@ pub async fn remove_roles_from_user( Ok(HttpResponse::Ok().json(format!("Roles updated successfully for {username}"))) } -// Handler for POST /api/v1/user/{username}/generate-new-password +// Handler for POST /api/v1/user/{userid}/generate-new-password // Resets password for the user to a newly generated one and returns it pub async fn post_gen_password( req: HttpRequest, - username: web::Path, + userid: web::Path, ) -> Result { - let username = username.into_inner(); + let username = userid.into_inner(); let mut new_password = String::default(); let mut new_hash = String::default(); let tenant_id = get_tenant_id_from_request(&req); diff --git a/src/handlers/http/modal/query/querier_role.rs b/src/handlers/http/modal/query/querier_role.rs index 083736c8a..5c1a9a770 100644 --- a/src/handlers/http/modal/query/querier_role.rs +++ b/src/handlers/http/modal/query/querier_role.rs @@ -25,13 +25,13 @@ use actix_web::{ use crate::{ handlers::http::{ - cluster::sync_role_update, + cluster::{sync_role_delete, sync_role_update}, modal::utils::rbac_utils::{get_metadata, put_metadata}, role::RoleError, }, parseable::DEFAULT_TENANT, rbac::{ - map::{mut_roles, mut_sessions, read_user_groups, users}, + map::{mut_roles, mut_sessions, read_user_groups, roles, users}, role::model::{Role, RoleType}, }, utils::get_tenant_id_from_request, @@ -108,3 +108,70 @@ pub async fn put( Ok(HttpResponse::Ok().finish()) } + +// Handler for DELETE /api/v1/role/{name} +// Delete existing role +pub async fn delete( + req: HttpRequest, + name: web::Path, +) -> Result { + let name = name.into_inner(); + let tenant_id = get_tenant_id_from_request(&req); + let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + if let Some(tenant_roles) = roles().get(tenant) + && let Some(role) = tenant_roles.get(&name) + && role.role_type().eq(&RoleType::Internal) + { + return Err(RoleError::ProtectedRole); + } + + // check if the role is being used by any user or group + let mut metadata = get_metadata(&tenant_id).await?; + if metadata.users.iter().any(|user| user.roles.contains(&name)) { + return Err(RoleError::RoleInUse); + } + if metadata + .user_groups + .iter() + .any(|user_group| user_group.roles.contains(&name)) + { + return Err(RoleError::RoleInUse); + } + metadata.roles.remove(&name); + put_metadata(&metadata, &tenant_id).await?; + + mut_roles() + .entry(tenant.to_owned()) + .or_default() + .remove(&name); + // mut_roles().remove(&name); + + // refresh the sessions of all users using this role + // for this, iterate over all user_groups and users and create a hashset of users + let mut session_refresh_users: HashSet = HashSet::new(); + if let Some(groups) = read_user_groups().get(tenant) { + for user_group in groups.values() { + if user_group.roles.contains(&name) { + session_refresh_users + .extend(user_group.users.iter().map(|u| u.userid().to_string())); + } + } + } + + // iterate over all users to see if they have this role + if let Some(users) = users().get(tenant) { + for user in users.values() { + if user.roles.contains(&name) { + session_refresh_users.insert(user.userid().to_string()); + } + } + } + + for userid in session_refresh_users { + mut_sessions().remove_user(&userid, tenant); + } + + sync_role_delete(&req, name.clone(), &tenant_id).await?; + + Ok(HttpResponse::Ok().finish()) +} diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 690eec76a..e9c006914 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -181,7 +181,11 @@ impl QueryServer { // PUT, GET, DELETE Roles resource("/{name}") .route(web::put().to(querier_role::put).authorize(Action::PutRole)) - .route(web::delete().to(role::delete).authorize(Action::DeleteRole)) + .route( + web::delete() + .to(querier_role::delete) + .authorize(Action::DeleteRole), + ) .route(web::get().to(role::get).authorize(Action::GetRole)), ) } @@ -195,8 +199,8 @@ impl QueryServer { .route(web::get().to(rbac::list_users).authorize(Action::ListUser)), ) .service( - web::resource("/{username}") - // POST /user/{username} => Create a new user + web::resource("/{userid}") + // POST /user/{userid} => Create a new user .route( web::post() .to(querier_rbac::post_user) @@ -222,7 +226,7 @@ impl QueryServer { // PATCH /user/{userid}/role/add => Add roles to a user .route( web::patch() - .to(rbac::add_roles_to_user) + .to(querier_rbac::add_roles_to_user) .authorize(Action::PutUserRoles) .wrap(DisAllowRootUser), ), @@ -232,14 +236,14 @@ impl QueryServer { // PATCH /user/{userid}/role/remove => Remove roles from a user .route( web::patch() - .to(rbac::remove_roles_from_user) + .to(querier_rbac::remove_roles_from_user) .authorize(Action::PutUserRoles) .wrap(DisAllowRootUser), ), ) .service( - web::resource("/{username}/generate-new-password") - // POST /user/{username}/generate-new-password => reset password for this user + web::resource("/{userid}/generate-new-password") + // POST /user/{userid}/generate-new-password => reset password for this user .route( web::post() .to(querier_rbac::post_gen_password)