Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 47 additions & 68 deletions lib/logflare/alerting.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ defmodule Logflare.Alerting do
alias Logflare.Cluster
alias Logflare.Endpoints
alias Logflare.Google.BigQuery.GCPConfig
alias Logflare.Google.BigQuery.GenUtils
alias Logflare.Repo
alias Logflare.Teams
alias Logflare.TeamUsers.TeamUser
alias Logflare.User
alias Logflare.Utils.LoggerMetadata

require Logger
require OpenTelemetry.Tracer
Expand Down Expand Up @@ -431,72 +431,51 @@ defmodule Logflare.Alerting do
@spec execute_alert_query(AlertQuery.t(), use_query_cache: boolean) ::
{:ok, QueryResult.t()} | {:error, any()}
def execute_alert_query(%AlertQuery{user: %User{}} = alert_query, opts \\ []) do
Logger.debug("Executing AlertQuery | #{alert_query.name} | #{alert_query.id}")

endpoints = Endpoints.list_endpoints_by(user_id: alert_query.user_id)
use_query_cache = Keyword.get(opts, :use_query_cache, true)

alerts =
list_alert_queries_by_user_id(alert_query.user_id)
|> Enum.filter(&(&1.id != alert_query.id))

with {:ok, expanded_query} <-
Logflare.Sql.expand_subqueries(
alert_query.language,
alert_query.query,
endpoints ++ alerts
),
{:ok, transformed_query} <-
Logflare.Sql.transform(alert_query.language, expanded_query, alert_query.user_id),
{:ok, result} <-
BigQueryAdaptor.execute_query(
{
alert_query.user.bigquery_project_id || GCPConfig.default_project_id(),
alert_query.user.bigquery_dataset_id,
alert_query.user.id
},
{transformed_query, []},
parameterMode: "NAMED",
maxResults: 1000,
location: alert_query.user.bigquery_dataset_location,
use_query_cache: use_query_cache,
labels: %{
"alert_id" => alert_query.id
},
query_type: :alerts
) do
{:ok, result}
else
{:error, %Tesla.Env{body: body}} ->
decoded = Jason.decode!(body)["error"]

error =
decoded
|> GenUtils.process_bq_errors(alert_query.user_id)
|> case do
%{"message" => msg} -> msg
other -> other
end

Logger.error("Alert query execution failed with bad response",
user_id: alert_query.user_id,
alert_query_id: alert_query.id,
alert_name: alert_query.name,
possible_reservation_error: BigQueryAdaptor.reservation_error?(decoded),
error_string: inspect(error)
)

{:error, error}

{:error, error} ->
Logger.error("Alert query execution failed with an unknown error",
user_id: alert_query.user_id,
alert_query_id: alert_query.id,
alert_name: alert_query.name,
error_string: inspect(error)
)

{:error, error}
end
LoggerMetadata.with_metadata(alert_query_logger_metadata(alert_query), fn ->

@msmithstubbs msmithstubbs Jun 17, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging now handled by backend adaptor, but note message ("Alert query execution failed with bad response") will have changed.

Logger.debug("Executing AlertQuery | #{alert_query.name} | #{alert_query.id}")

endpoints = Endpoints.list_endpoints_by(user_id: alert_query.user_id)
use_query_cache = Keyword.get(opts, :use_query_cache, true)

alerts =
list_alert_queries_by_user_id(alert_query.user_id)
|> Enum.filter(&(&1.id != alert_query.id))

with {:ok, expanded_query} <-
Logflare.Sql.expand_subqueries(
alert_query.language,
alert_query.query,
endpoints ++ alerts
),
{:ok, transformed_query} <-
Logflare.Sql.transform(alert_query.language, expanded_query, alert_query.user_id),
{:ok, result} <-
BigQueryAdaptor.execute_query(
{
alert_query.user.bigquery_project_id || GCPConfig.default_project_id(),
alert_query.user.bigquery_dataset_id,
alert_query.user.id
},
{transformed_query, []},
parameterMode: "NAMED",
maxResults: 1000,
location: alert_query.user.bigquery_dataset_location,
use_query_cache: use_query_cache,
labels: %{
"alert_id" => alert_query.id
},
query_type: :alerts
) do
{:ok, result}
end
end)
end

defp alert_query_logger_metadata(%AlertQuery{} = alert_query) do
[
user_id: alert_query.user_id,
alert_query_id: alert_query.id,
alert_name: alert_query.name
]
end
end
92 changes: 78 additions & 14 deletions lib/logflare/backends/adaptor/bigquery_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do
alias Logflare.Backends.Ecto.SqlUtils
alias Logflare.Backends.IngestEventQueue
alias Logflare.Backends.Adaptor.QueryResult
alias Logflare.Backends.QueryError
alias Logflare.BigQuery.SchemaTypes
alias Logflare.Billing
alias Logflare.BqRepo
Expand Down Expand Up @@ -685,7 +686,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do
query_opts :: Keyword.t()
) ::
{:ok, QueryResult.t()}
| {:error, any()}
| {:error, QueryError.t()}
defp execute_user_query(%User{} = user, project_id, query_string, bq_params, query_opts)
when is_non_empty_binary(query_string) and is_list(bq_params) and is_list(query_opts) do
case BqRepo.query_with_sql_and_params(
Expand All @@ -704,20 +705,77 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do
bq_params: bq_params
})}

{:error, %{body: body}} ->
decoded = Jason.decode!(body)["error"]
error = GenUtils.process_bq_errors(decoded, user.id)
maybe_warn_reservation_error(decoded, user, project_id, query_opts)
{:error, error}
{:error, error} ->
query_error =
error
|> to_query_error(user.id)
|> QueryError.log(
user_id: user.id,
bigquery_project_id: project_id
)

maybe_warn_reservation_error(query_error, user, project_id, query_opts)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be logged already by QueryError.log - do we need the specific wording of maybe_warn_reservation_error ? If not, we can remove it.


{:error, query_error}
end
end

{:error, err} when is_atom(err) ->
{:error, GenUtils.process_bq_errors(err, user.id)}
@spec to_query_error(Tesla.Env.t() | GenUtils.transport_error(), pos_integer()) ::
QueryError.t()
defp to_query_error(error, _user_id) when error in [:timeout, :closed, :emfile] do
%QueryError{
kind: :connection_error,
raw_error: error,
backend: __MODULE__
}
end

{:error, err} ->
{:error, err}
defp to_query_error(%{body: body}, user_id) when is_binary(body) do
with {:ok, %{"error" => raw_error}} <- Jason.decode(body),
%{"message" => _message} = processed_error <-
GenUtils.process_bq_errors(raw_error, user_id) do
processed_error
|> query_error_kind()
|> query_error(processed_error)
else
_error -> query_error(:backend_error, body)
end
end

defp to_query_error(%{body: body}, _user_id) do
query_error(:backend_error, body)
end

@spec query_error_kind(map()) :: QueryError.kind()
defp query_error_kind(%{"reason" => "billingTierLimitExceeded"}), do: :backend_error
defp query_error_kind(%{"reason" => "invalidQuery"}), do: :invalid_query

defp query_error_kind(%{"errors" => errors}) when is_list(errors) do
if Enum.any?(errors, &match?(%{"reason" => "invalidQuery"}, &1)) do
:invalid_query
else
:backend_error
end
end

defp query_error_kind(processed_error) do
with %{"message" => message} when is_binary(message) <- processed_error,
true <- String.starts_with?(message, ["Unrecognized name:", "Field name"]) do
:invalid_query
else
_ -> :backend_error
end
end

@spec query_error(QueryError.kind(), term()) :: QueryError.t()
defp query_error(kind, raw_error) do
%QueryError{
kind: kind,
raw_error: raw_error,
backend: __MODULE__
}
end

@spec pg_sql_to_bq_sql(sql :: String.t()) :: String.t()
defp pg_sql_to_bq_sql(sql) when is_non_empty_binary(sql) do
sql
Expand All @@ -741,19 +799,25 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do
end

@spec maybe_warn_reservation_error(
decoded :: any(),
error :: QueryError.t(),
user :: User.t(),
project_id :: String.t(),
query_opts :: Keyword.t()
) :: :ok
defp maybe_warn_reservation_error(decoded, %User{} = user, project_id, query_opts) do
if reservation_error?(decoded) and not caller_logs_own_errors?(query_opts) do
defp maybe_warn_reservation_error(
%QueryError{raw_error: error},
%User{} = user,
project_id,
query_opts
) do
with true <- reservation_error?(error),
false <- caller_logs_own_errors?(query_opts) do
Logger.warning("Possible BigQuery reservation error",
user_id: user.id,
project_id: project_id,
reservation: Keyword.get(query_opts, :reservation),
query_type: Keyword.get(query_opts, :query_type),
bq_error_message: decoded["message"]
bq_error_message: error["message"]
)
end

Expand Down
71 changes: 50 additions & 21 deletions lib/logflare/backends/adaptor/clickhouse_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
alias Logflare.Backends.Ecto.SqlUtils
alias Logflare.Backends.IngestEventQueue
alias Logflare.Backends.Adaptor.QueryResult
alias Logflare.Backends.QueryError
alias Logflare.LogEvent
alias Logflare.LogEvent.TypeDetection

Expand Down Expand Up @@ -323,7 +324,7 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
statement :: iodata(),
params :: map | [term] | [row :: [term]] | iodata | Enumerable.t(),
[Ch.query_option()]
) :: {:ok, Ch.Result.t()} | {:error, Exception.t()}
) :: {:ok, Ch.Result.t()} | {:error, QueryError.t()}
def execute_ch_query(backend, statement, params \\ [], opts \\ [])

def execute_ch_query(%Backend{} = backend, statement, params, opts)
Expand All @@ -338,28 +339,56 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
{:ok, %Ch.Result{} = result} ->
{:ok, decode_ch_result(result)}

{:error, %Ch.Error{message: error_msg}} when is_non_empty_binary(error_msg) ->
Logger.warning(
"ClickHouse query failed: #{inspect(error_msg)}",
backend_id: backend.id
)
{:error, error} ->
{:error,
error
|> to_query_error()
|> QueryError.log(
user_id: backend.user_id,
backend_id: backend.id,
backend_token: backend.token
)}
end
end
end

{:error, "Error executing ClickHouse query"}
@spec to_query_error(term()) :: QueryError.t()
defp to_query_error(%Ch.Error{} = error) do
error
|> ch_query_error_kind()
|> query_error(error)
end

{:error, %{message: message}} when is_non_empty_binary(message) ->
Logger.warning(
"ClickHouse query failed: #{inspect(message)}",
backend_id: backend.id
)
defp to_query_error(%DBConnection.ConnectionError{} = error) do
query_error(:connection_error, error)
end

{:error, "Error executing ClickHouse query"}
defp to_query_error(error) do
query_error(:backend_error, error)
end

{:error, _} ->
{:error, "Error executing ClickHouse query"}
end
@spec ch_query_error_kind(term()) :: QueryError.kind()
defp ch_query_error_kind(%Ch.Error{code: code}) when code in [47, 62], do: :invalid_query

defp ch_query_error_kind(%Ch.Error{message: message}) when is_binary(message) do
if message =~ "UNKNOWN_IDENTIFIER" or message =~ "SYNTAX_ERROR" do
:invalid_query
else
:backend_error
end
end

defp ch_query_error_kind(%Ch.Error{}), do: :backend_error

@spec query_error(QueryError.kind(), term()) :: QueryError.t()
defp query_error(kind, raw_error) do
%QueryError{
kind: kind,
raw_error: raw_error,
backend: __MODULE__
}
end

@spec execute_direct_query(url :: String.t(), config :: map(), statement :: String.t()) ::
{:ok, list()} | {:error, term()}
defp execute_direct_query(url, config, statement) do
Expand All @@ -385,15 +414,15 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
{:ok, %Ch.Result{} = result} ->
{:ok, decode_ch_result(result)}

{:error, _} ->
{:error, "Error executing ClickHouse query"}
{:error, error} ->
{:error, to_query_error(error)}
end
after
GenServer.stop(pid)
end

{:error, _} ->
{:error, "Error executing ClickHouse query"}
{:error, error} ->
{:error, to_query_error(error)}
end
end

Expand Down Expand Up @@ -472,7 +501,7 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do

Creates one table per log type: `_logs`, `_metrics`, and `_traces`.
"""
@spec provision_ingest_tables(Backend.t()) :: :ok | {:error, Exception.t()}
@spec provision_ingest_tables(Backend.t()) :: :ok | {:error, QueryError.t()}
def provision_ingest_tables(%Backend{config: config} = backend) do
cloud? = clickhouse_cloud?(backend)

Expand Down
Loading
Loading