Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions src/alerts/alert_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use std::{str::FromStr, time::Duration};

use actix_web::http::header::{HeaderMap, HeaderName, HeaderValue};
use actix_web::http::header::{HeaderName, HeaderValue};
use chrono::{DateTime, Utc};
use serde_json::Value;
use tonic::async_trait;
Expand All @@ -36,9 +36,12 @@ use crate::{
get_number_of_agg_exprs,
target::{self, NotificationConfig},
},
handlers::http::query::create_streams_for_distributed,
handlers::http::{
middleware::{CLUSTER_SECRET, CLUSTER_SECRET_HEADER},
query::create_streams_for_distributed,
},
metastore::metastore_traits::MetastoreObject,
parseable::PARSEABLE,
parseable::{DEFAULT_TENANT, PARSEABLE},
query::resolve_stream_names,
rbac::map::SessionKey,
storage::object_storage::alert_json_path,
Expand Down Expand Up @@ -87,18 +90,29 @@ impl MetastoreObject for ThresholdAlert {
impl AlertTrait for ThresholdAlert {
async fn eval_alert(&self) -> Result<Option<String>, AlertError> {
let time_range = extract_time_range(&self.eval_config)?;
let auth = if let Some(tenant) = self.tenant_id.as_ref()
&& let Some(header) = TENANT_METADATA.get_global_query_auth(tenant)
{
let mut map = HeaderMap::new();

let tenant = self.tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
let auth = if let Some((_, hash)) = CLUSTER_SECRET.get() {
let mut map = actix_web::http::header::HeaderMap::new();
if let Some(header) = TENANT_METADATA.get_global_query_auth(tenant) {
map.insert(
HeaderName::from_static("authorization"),
HeaderValue::from_str(&header).unwrap(),
);
}
map.insert(
HeaderName::from_static(CLUSTER_SECRET_HEADER),
HeaderValue::from_str(hash).unwrap(),
);
map.insert(
HeaderName::from_static("authorization"),
HeaderValue::from_str(&header).unwrap(),
HeaderName::from_static("intra-cluster-tenant"),
HeaderValue::from_str(tenant).unwrap(),
);
Some(map)
} else {
None
};

let query_result =
execute_alert_query(auth, self.get_query(), &time_range, &self.tenant_id).await?;

Expand Down
9 changes: 9 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,14 @@ pub struct Options {
)]
pub querier_endpoint: String,

#[arg(
long,
env = "P_PRISM_ENDPOINT",
default_value = "",
help = "URL to connect to the prism node. Default is the address of the server"
)]
pub prism_endpoint: String,

#[command(flatten)]
pub oidc: Option<OidcConfig>,

Expand Down Expand Up @@ -593,6 +601,7 @@ impl Options {
Mode::Ingest => self.get_endpoint(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT"),
Mode::Index => self.get_endpoint(&self.indexer_endpoint, "P_INDEXER_ENDPOINT"),
Mode::Query => self.get_endpoint(&self.querier_endpoint, "P_QUERIER_ENDPOINT"),
Mode::Prism => self.get_endpoint(&self.prism_endpoint, "P_PRISM_ENDPOINT"),
_ => return self.build_url(&self.address),
};

Expand Down
92 changes: 81 additions & 11 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
pub mod utils;
use actix_web::http::StatusCode;
use actix_web::http::header::HeaderMap;
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use futures::{StreamExt, future, stream};
use http::header;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -50,7 +52,9 @@ use crate::rbac::role::model::Role;
use crate::rbac::user::User;
use crate::stats::Stats;
use crate::storage::{ObjectStorageError, ObjectStoreFormat};
use crate::utils::{create_intracluster_auth_headermap, get_tenant_id_from_request};
use crate::utils::{
create_intracluster_auth_headermap, get_tenant_id_from_request, get_user_from_request,
};

use super::base_path_without_preceding_slash;
use super::ingest::PostError;
Expand Down Expand Up @@ -519,6 +523,7 @@ pub async fn sync_users_with_roles_with_ingestors(
let userid = userid.to_owned();
let headers = req.headers().clone();
let op = operation.to_string();
let caller_userid = get_user_from_request(req).unwrap();
for_each_live_node(tenant_id, move |ingestor| {
let url = format!(
"{}{}/user/{}/role/sync/{}",
Expand All @@ -529,7 +534,8 @@ pub async fn sync_users_with_roles_with_ingestors(
);

let role_data = role_data.clone();
let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token);
let headermap =
create_intracluster_auth_headermap(&headers, &ingestor.token, &caller_userid);
async move {
let res = INTRA_CLUSTER_CLIENT
.patch(url)
Expand Down Expand Up @@ -568,6 +574,7 @@ pub async fn sync_user_deletion_with_ingestors(
tenant_id: &Option<String>,
) -> Result<(), RBACError> {
let userid = userid.to_owned();
let caller_userid = get_user_from_request(req).unwrap();
let headers = req.headers().clone();
for_each_live_node(tenant_id, move |ingestor| {
let url = format!(
Expand All @@ -576,7 +583,8 @@ pub async fn sync_user_deletion_with_ingestors(
base_path_without_preceding_slash(),
userid
);
let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token);
let headermap =
create_intracluster_auth_headermap(&headers, &ingestor.token, &caller_userid);
async move {
let res = INTRA_CLUSTER_CLIENT
.delete(url)
Expand Down Expand Up @@ -625,6 +633,7 @@ pub async fn sync_user_creation(
RBACError::SerdeError(err)
})?;

let caller_userid = get_user_from_request(req)?;
let userid = userid.to_string();
let headers = req.headers().clone();
for_each_live_node(tenant_id, move |node| {
Expand All @@ -634,7 +643,7 @@ pub async fn sync_user_creation(
base_path_without_preceding_slash(),
userid
);
let headermap = create_intracluster_auth_headermap(&headers, &node.token);
let headermap = create_intracluster_auth_headermap(&headers, &node.token, &caller_userid);
let user_data = user_data.clone();

async move {
Expand Down Expand Up @@ -672,20 +681,23 @@ pub async fn sync_password_reset_with_ingestors(
req: HttpRequest,
username: &str,
) -> Result<(), RBACError> {
let username = username.to_owned();
let userid = username.to_owned();
let tenant_id = get_tenant_id_from_request(&req);
let caller_userid = get_user_from_request(&req).unwrap();
let headers = req.headers().clone();
for_each_live_node(&tenant_id, move |ingestor| {
let url = format!(
"{}{}/user/{}/generate-new-password/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
userid
);

let headermap =
create_intracluster_auth_headermap(&headers, &ingestor.token, &caller_userid);
async move {
let res = INTRA_CLUSTER_CLIENT
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.headers(headermap)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await
Expand Down Expand Up @@ -713,11 +725,14 @@ pub async fn sync_password_reset_with_ingestors(

// forward the put role request to all ingestors and queriers to keep them in sync
pub async fn sync_role_update(
req: &HttpRequest,
name: String,
role: Role,
tenant_id: &Option<String>,
) -> Result<(), RoleError> {
let tenant = tenant_id.to_owned();
let userid = get_user_from_request(req).unwrap();
let headers = req.headers().clone();
for_each_live_node(tenant_id, move |node| {
let url = format!(
"{}{}/role/{}/sync",
Expand All @@ -727,12 +742,12 @@ pub async fn sync_role_update(
);

let role = role.clone();

let headermap = create_intracluster_auth_headermap(&headers, &node.token, &userid);
let tenant_id = tenant.clone();
async move {
let res = INTRA_CLUSTER_CLIENT
.put(url)
.header(header::AUTHORIZATION, &node.token)
.headers(headermap)
.header(header::CONTENT_TYPE, "application/json")
.json(&SyncRole::new(role, tenant_id.clone()))
.send()
Expand All @@ -759,6 +774,52 @@ pub async fn sync_role_update(
.await
}

// forward the put role request to all ingestors and queriers to keep them in sync
pub async fn sync_role_delete(
req: &HttpRequest,
name: String,
tenant_id: &Option<String>,
) -> Result<(), RoleError> {
let userid = get_user_from_request(req).unwrap();
let headers = req.headers().clone();
for_each_live_node(tenant_id, move |node| {
let url = format!(
"{}{}/role/{}/sync",
node.domain_name,
base_path_without_preceding_slash(),
name
);

let headermap = create_intracluster_auth_headermap(&headers, &node.token, &userid);
async move {
let res = INTRA_CLUSTER_CLIENT
.delete(url)
.headers(headermap)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await
.map_err(|err| {
error!(
"Fatal: failed to forward request to node: {}\n Error: {:?}",
node.domain_name, err
);
RoleError::Network(err)
})?;

if !res.status().is_success() {
error!(
"failed to forward request to node: {}\nResponse Returned: {:?}",
node.domain_name,
res.text().await
);
}

Ok(())
}
})
.await
}

pub fn fetch_daily_stats(
date: &str,
stream_meta_list: &[ObjectStoreFormat],
Expand Down Expand Up @@ -1904,7 +1965,6 @@ pub async fn send_query_request(
let mut map = reqwest::header::HeaderMap::new();

if let Some(auth) = auth_token {
// always basic auth
for (key, value) in auth.iter() {
if let Ok(name) = reqwest::header::HeaderName::from_bytes(key.as_str().as_bytes())
&& let Ok(val) = reqwest::header::HeaderValue::from_bytes(value.as_bytes())
Expand All @@ -1918,6 +1978,16 @@ pub async fn send_query_request(
reqwest::header::HeaderValue::from_str(&querier.token).unwrap(),
);
};
if map.get("intra-cluster-userid").is_none() {
let token: Vec<&str> = querier.token().split(' ').collect();
let decode = BASE64_STANDARD.decode(token[1]).unwrap();
let user = String::from_utf8(decode).unwrap();
let user = user.split_once(':').unwrap();
map.insert(
reqwest::header::HeaderName::from_static("intra-cluster-userid"),
reqwest::header::HeaderValue::from_str(user.0).unwrap(),
);
}
let res = match INTRA_CLUSTER_CLIENT
.post(uri)
.timeout(Duration::from_secs(300))
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,10 @@ pub async fn handle_otel_traces_ingestion(
// fails if the logstream does not exist
pub async fn post_event(
req: HttpRequest,
stream_name: Path<String>,
logstream: Path<String>,
Json(json): Json<StrictValue>,
) -> Result<HttpResponse, PostError> {
let stream_name = stream_name.into_inner();
let stream_name = logstream.into_inner();
let tenant_id = get_tenant_id_from_request(&req);
let internal_stream_names = PARSEABLE.streams.list_internal_streams(&tenant_id);
if internal_stream_names.contains(&stream_name) {
Expand Down
Loading
Loading