From 746ca04c4a95f1d452ff7b199e12a7d7b7c18a64 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Tue, 17 Jun 2025 12:45:34 +0800 Subject: [PATCH 01/13] feat: adding in iam management functions --- config/runtime.exs | 7 +++ .../backends/adaptor/bigquery_adaptor.ex | 63 +++++++++++++++++++ lib/logflare/google/resource_manager.ex | 2 +- mix.exs | 1 + mix.lock | 1 + 5 files changed, 73 insertions(+), 1 deletion(-) diff --git a/config/runtime.exs b/config/runtime.exs index 0417c5be02..6980809e65 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -44,6 +44,13 @@ config :logflare, ] |> filter_nil_kv_pairs.() +config :logflare, :bigquery_backend_adaptor, +[ managed_service_account_pool_size: + System.get_env("LOGFLARE_BIGQUERY_MANAGED_SERVICE_ACCOUNT_POOL_SIZE", "3") + |> String.to_integer()] +|> filter_nil_kv_pairs.() + + config :logflare, Logflare.Alerting, [ diff --git a/lib/logflare/backends/adaptor/bigquery_adaptor.ex b/lib/logflare/backends/adaptor/bigquery_adaptor.ex index ad7fc67666..627d5ede51 100644 --- a/lib/logflare/backends/adaptor/bigquery_adaptor.ex +++ b/lib/logflare/backends/adaptor/bigquery_adaptor.ex @@ -12,10 +12,12 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do alias Logflare.Sources alias Logflare.Billing alias Logflare.Backends + alias Logflare.Google.BigQuery.GenUtils use Supervisor require Logger @behaviour Logflare.Backends.Adaptor + @service_account_prefix "logflare_managed" @impl Logflare.Backends.Adaptor def start_link({source, backend} = source_backend) do @@ -135,4 +137,65 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do @impl Logflare.Backends.Adaptor def validate_config(changeset), do: changeset + + @spec managed_service_account_name(String.t(), non_neg_integer()) :: String.t() + def managed_service_account_name(project_id, service_account_index \\ 0) do + "#{@service_account_prefix}_#{service_account_index}@#{project_id}.iam.gserviceaccount.com" + end + + + @doc """ + Lists all managed service accounts + """ + @spec list_managed_service_accounts(String.t()) :: [GoogleApi.IAM.V1.Model.ServiceAccount.t()] + def list_managed_service_accounts(project_id) do + get_next_page(project_id, nil) + |> Enum.filter(&(&1.name =~ @service_account_prefix)) + end + + defp handle_response({:ok, response}) do + case response do + {:ok, %{accounts: accounts, next_page_token: nil}} -> + accounts + + {:ok, %{accounts: accounts, next_page_token: next_page_token}} -> + get_next_page(project_id, next_page_token) ++ accounts + end + |> List.flatten() + end + + defp handle_response({:error, error}) do + Logger.error("Error listing managed service accounts: #{inspect(error)}") + [] + end + + defp get_next_page(project_id, page_token) do + GenUtils.get_conn(:default) + |> GoogleApi.IAM.V1.Api.Projects.iam_projects_service_accounts_list(project_id, + page_size: 100, + page_token: page_token + ) + |> handle_response() + end + + def create_managed_service_accounts(project_id) do + # determine the ids of of service accounts to create, based on what service accounts already exist + size = Application.get_env(:logflare, :bigquery_backend_adaptor)[:managed_service_account_pool_size] + existing = list_managed_service_accounts(project_id) |> Enum.map(& &1.name) + indexes = for i <- 0..(size - 1), managed_service_account_name(project_id, i) not in existing, do: i + + for i <- indexes do + create_managed_service_account(project_id, i) + end + end + + defp create_managed_service_account(project_id, service_account_index) do + GenUtils.get_conn(:default) + |> GoogleApi.IAM.V1.Api.Projects.iam_projects_service_accounts_create(project_id, %{ + account_id: managed_service_account_name(project_id, service_account_index), + service_account_object: %{ + project_id: project_id + } + }) + end end diff --git a/lib/logflare/google/resource_manager.ex b/lib/logflare/google/resource_manager.ex index 710077f824..0747490088 100644 --- a/lib/logflare/google/resource_manager.ex +++ b/lib/logflare/google/resource_manager.ex @@ -87,7 +87,7 @@ defmodule Logflare.Google.CloudResourceManager do defp get_service_accounts() do for {member, roles} <- [ {env_service_account(), - ["roles/bigquery.admin", "roles/resourcemanager.projectIamAdmin"]}, + ["roles/bigquery.admin", "roles/resourcemanager.projectIamAdmin", "roles/iam.serviceAccountCreator", "roles/resourcemanager.projectIamAdmin", "roles/iam.serviceAccountTokenCreator"]}, {env_compute_engine_sa(), [ "roles/compute.instanceAdmin", diff --git a/mix.exs b/mix.exs index aa43655821..7dda1dd830 100644 --- a/mix.exs +++ b/mix.exs @@ -149,6 +149,7 @@ defmodule Logflare.Mixfile do # GCP {:google_api_cloud_resource_manager, "~> 0.34.0"}, {:google_api_big_query, "~> 0.79.0"}, + {:google_api_iam, "~> 0.45.0"}, {:goth, "~> 1.4.0"}, {:google_gax, github: "Logflare/elixir-google-gax", ref: "6772193", override: true}, diff --git a/mix.lock b/mix.lock index dbe84833f9..57148fc595 100644 --- a/mix.lock +++ b/mix.lock @@ -58,6 +58,7 @@ "global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm", "85d944cecd0f8f96b20ce70b5b16ebccedfcd25e744376b131e89ce61ba93176"}, "google_api_big_query": {:hex, :google_api_big_query, "0.79.0", "a4800c699932879a39a7098f04bc7a659c2a1745399867f5b688233d38f3fd86", [:mix], [{:google_gax, "~> 0.4", [hex: :google_gax, repo: "hexpm", optional: false]}], "hexpm", "6b4102c5aa74f31310a022066151d2f974b1ed26ee250571beec22b0be9a9cb2"}, "google_api_cloud_resource_manager": {:hex, :google_api_cloud_resource_manager, "0.34.3", "9e0ffa89b895a77a9fac1fa51f92d06f690b32002224281098be500322988680", [:mix], [{:google_gax, "~> 0.4", [hex: :google_gax, repo: "hexpm", optional: false]}], "hexpm", "de1aee529a9ce424ef93b97bdffc50a1b589fb206ee3866bbd23329eb4df43f2"}, + "google_api_iam": {:hex, :google_api_iam, "0.45.0", "08eeddf383bb02f74f02169a4d509479240622491c71c580229a89412f73e4d9", [:mix], [{:google_gax, "~> 0.4", [hex: :google_gax, repo: "hexpm", optional: false]}], "hexpm", "acbce36bab44412c7be4ce1c8fb0283876d5c06ce2b2e938fb927be3e35346e6"}, "google_gax": {:git, "https://github.com/Logflare/elixir-google-gax.git", "6772193c438dd72ac253ea34ab13e9c02842cba8", [ref: "6772193"]}, "goth": {:hex, :goth, "1.4.2", "a598dfbce6fe65db3f5f43b1ab2ce8fbe3b2fe20a7569ad62d71c11c0ddc3f41", [:mix], [{:finch, "~> 0.9", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:jose, "~> 1.11", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "d51bb6544dc551fe5754ab72e6cf194120b3c06d924282aaa3321a516ed3b98a"}, "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, From a3df6e16f04a9e8ab16f9c696730ebb7635b906c Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Tue, 17 Jun 2025 23:03:09 +0800 Subject: [PATCH 02/13] feat: iam service account provisioning --- config/config.exs | 2 + config/runtime.exs | 14 +- lib/logflare/alerting/alert_query.ex | 2 +- lib/logflare/application.ex | 72 ++------ .../backends/adaptor/bigquery_adaptor.ex | 159 +++++++++++++++--- .../google/bigquery/gen_utils/gen_utils.ex | 38 ++++- lib/logflare/google/resource_manager.ex | 7 +- mix.exs | 2 +- mix.lock | 4 +- 9 files changed, 205 insertions(+), 95 deletions(-) diff --git a/config/config.exs b/config/config.exs index e87a950da5..1199889b59 100644 --- a/config/config.exs +++ b/config/config.exs @@ -25,6 +25,8 @@ config :logflare, Logflare.Google, dataset_id_append: "_default" config :logflare, :postgres_backend_adapter, pool_size: 3 config :logflare, :clickhouse_backend_adapter, pool_size: 3 +config :logflare, :bigquery_backend_adaptor, managed_service_account_pool_size: 0 + config :logflare, Logflare.Source.BigQuery.Schema, updates_per_minute: 6 # Configures the endpoint diff --git a/config/runtime.exs b/config/runtime.exs index 6980809e65..67d33c4cd9 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -44,12 +44,14 @@ config :logflare, ] |> filter_nil_kv_pairs.() -config :logflare, :bigquery_backend_adaptor, -[ managed_service_account_pool_size: - System.get_env("LOGFLARE_BIGQUERY_MANAGED_SERVICE_ACCOUNT_POOL_SIZE", "3") - |> String.to_integer()] -|> filter_nil_kv_pairs.() - +config :logflare, + :bigquery_backend_adaptor, + [ + managed_service_account_pool_size: + System.get_env("LOGFLARE_BIGQUERY_MANAGED_SERVICE_ACCOUNT_POOL_SIZE", "0") + |> String.to_integer() + ] + |> filter_nil_kv_pairs.() config :logflare, Logflare.Alerting, diff --git a/lib/logflare/alerting/alert_query.ex b/lib/logflare/alerting/alert_query.ex index 08aa9650df..06b917975d 100644 --- a/lib/logflare/alerting/alert_query.ex +++ b/lib/logflare/alerting/alert_query.ex @@ -30,7 +30,7 @@ defmodule Logflare.Alerting.AlertQuery do many_to_many :backends, Logflare.Backends.Backend, join_through: "alert_queries_backends", - on_replace: :delete + _ on_replace: :delete timestamps() end diff --git a/lib/logflare/application.ex b/lib/logflare/application.ex index 86a5ca8506..621970b492 100644 --- a/lib/logflare/application.ex +++ b/lib/logflare/application.ex @@ -3,6 +3,7 @@ defmodule Logflare.Application do use Application require Logger + alias Logflare.Backends.Adaptor.BigQueryAdaptor alias Logflare.ContextCache alias Logflare.Logs alias Logflare.SingleTenant @@ -123,58 +124,15 @@ defmodule Logflare.Application do ] end + def goth_partition_count, do: 5 + def conditional_children do goth = - case Application.get_env(:goth, :json) do - nil -> - [] - - json -> - # Setup Goth for GCP connections - credentials = Jason.decode!(json) - scopes = ["https://www.googleapis.com/auth/cloud-platform"] - source = {:service_account, credentials, scopes: scopes} - - spec = - { - Goth, - # https://hexdocs.pm/goth/Goth.html#fetch/2 - # refresh 15 min before - # don't start server until fetch is made - # cap retries at 10s, warn when >5 - name: Logflare.Goth, - source: source, - refresh_before: 60 * 15, - prefetch: :sync, - http_client: &goth_finch_http_client/1, - retry_delay: fn - n when n < 3 -> - 1000 - - n when n < 5 -> - Logger.warning("Goth refresh retry count is #{n}") - 1000 * 3 - - n when n < 10 -> - Logger.warning("Goth refresh retry count is #{n}") - 1000 * 5 + [ + BigQueryAdaptor.partitioned_goth_child_spec() + ] ++ BigQueryAdaptor.impersonated_goth_child_specs() - n -> - Logger.warning("Goth refresh retry count is #{n}") - 1000 * 10 - end - } - - # Partition Goth - [ - {PartitionSupervisor, - child_spec: spec, - name: Logflare.GothPartitionSup, - with_arguments: fn [opts], partition -> - [Keyword.put(opts, :name, {Logflare.Goth, partition})] - end} - ] - end + dbg(goth) # only add in config cat to multi-tenant prod config_cat = @@ -191,18 +149,6 @@ defmodule Logflare.Application do :ok end - # tell goth to use our finch pool - # https://github.com/peburrows/goth/blob/master/lib/goth/token.ex#L144 - defp goth_finch_http_client(options) do - {method, options} = Keyword.pop!(options, :method) - {url, options} = Keyword.pop!(options, :url) - {headers, options} = Keyword.pop!(options, :headers) - {body, options} = Keyword.pop!(options, :body) - - Finch.build(method, url, headers, body) - |> Finch.request(Logflare.FinchGoth, options) - end - defp finch_pools do base = System.schedulers_online() min_count = max(5, ceil(base / 10)) @@ -278,6 +224,10 @@ defmodule Logflare.Application do # if single tenant, insert enterprise user Logger.info("Executing startup tasks") + if !SingleTenant.postgres_backend?() do + BigQueryAdaptor.create_managed_service_accounts() + end + if SingleTenant.single_tenant?() do Logger.info("Ensuring single tenant user is seeded...") SingleTenant.create_default_plan() diff --git a/lib/logflare/backends/adaptor/bigquery_adaptor.ex b/lib/logflare/backends/adaptor/bigquery_adaptor.ex index 627d5ede51..4377c8b97b 100644 --- a/lib/logflare/backends/adaptor/bigquery_adaptor.ex +++ b/lib/logflare/backends/adaptor/bigquery_adaptor.ex @@ -17,8 +17,8 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do require Logger @behaviour Logflare.Backends.Adaptor - @service_account_prefix "logflare_managed" - + @service_account_prefix "logflare-managed" + @managed_service_account_partition_count 5 @impl Logflare.Backends.Adaptor def start_link({source, backend} = source_backend) do Supervisor.start_link(__MODULE__, source_backend, @@ -140,9 +140,13 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do @spec managed_service_account_name(String.t(), non_neg_integer()) :: String.t() def managed_service_account_name(project_id, service_account_index \\ 0) do - "#{@service_account_prefix}_#{service_account_index}@#{project_id}.iam.gserviceaccount.com" + "#{@service_account_prefix}-#{service_account_index}@#{project_id}.iam.gserviceaccount.com" end + @spec managed_service_account_id(String.t(), non_neg_integer()) :: String.t() + def managed_service_account_id(project_id, service_account_index \\ 0) do + "#{@service_account_prefix}-#{service_account_index}" + end @doc """ Lists all managed service accounts @@ -153,36 +157,47 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do |> Enum.filter(&(&1.name =~ @service_account_prefix)) end - defp handle_response({:ok, response}) do + defp handle_response({:ok, response}, project_id) do + dbg(response) + case response do - {:ok, %{accounts: accounts, next_page_token: nil}} -> + %{accounts: accounts, nextPageToken: nil} -> accounts - {:ok, %{accounts: accounts, next_page_token: next_page_token}} -> + %{accounts: accounts, nextPageToken: next_page_token} -> get_next_page(project_id, next_page_token) ++ accounts end |> List.flatten() end - defp handle_response({:error, error}) do + defp handle_response({:error, error}, project_id) do Logger.error("Error listing managed service accounts: #{inspect(error)}") [] end defp get_next_page(project_id, page_token) do GenUtils.get_conn(:default) - |> GoogleApi.IAM.V1.Api.Projects.iam_projects_service_accounts_list(project_id, - page_size: 100, - page_token: page_token + |> GoogleApi.IAM.V1.Api.Projects.iam_projects_service_accounts_list("projects/#{project_id}", + pageSize: 100, + pageToken: page_token ) - |> handle_response() + |> dbg() + |> handle_response(project_id) end - def create_managed_service_accounts(project_id) do + def create_managed_service_accounts(project_id \\ nil) do + project_id = project_id || Application.get_env(:logflare, Logflare.Google)[:project_id] + # determine the ids of of service accounts to create, based on what service accounts already exist - size = Application.get_env(:logflare, :bigquery_backend_adaptor)[:managed_service_account_pool_size] - existing = list_managed_service_accounts(project_id) |> Enum.map(& &1.name) - indexes = for i <- 0..(size - 1), managed_service_account_name(project_id, i) not in existing, do: i + size = + Application.get_env(:logflare, :bigquery_backend_adaptor)[ + :managed_service_account_pool_size + ] + + existing = list_managed_service_accounts(project_id) |> Enum.map(& &1.email) + + indexes = + for i <- 0..(size - 1), managed_service_account_name(project_id, i) not in existing, do: i for i <- indexes do create_managed_service_account(project_id, i) @@ -191,11 +206,115 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do defp create_managed_service_account(project_id, service_account_index) do GenUtils.get_conn(:default) - |> GoogleApi.IAM.V1.Api.Projects.iam_projects_service_accounts_create(project_id, %{ - account_id: managed_service_account_name(project_id, service_account_index), - service_account_object: %{ - project_id: project_id + |> GoogleApi.IAM.V1.Api.Projects.iam_projects_service_accounts_create( + "projects/#{project_id}", + body: %GoogleApi.IAM.V1.Model.CreateServiceAccountRequest{ + accountId: managed_service_account_id(project_id, service_account_index) } - }) + ) + |> dbg() + end + + def managed_service_account_pool_size do + Application.get_env(:logflare, :bigquery_backend_adaptor)[:managed_service_account_pool_size] + end + + def managed_service_account_partition_count, do: @managed_service_account_partition_count + + def ingest_service_account_partition_count, + do: max(@managed_service_account_partition_count, System.schedulers_online()) + + # Goth provisioning + def partitioned_goth_child_spec() do + if json = Application.get_env(:goth, :json) do + {PartitionSupervisor, + child_spec: goth_child_spec(json), + name: Logflare.GothPartitionSup, + with_arguments: fn [opts], partition -> + [Keyword.put(opts, :name, {Logflare.Goth, partition})] + end} + end + end + + def goth_child_spec(nil), do: nil + + def goth_child_spec(json, sub \\ nil) do + credentials = Jason.decode!(json) + scopes = ["https://www.googleapis.com/auth/cloud-platform"] + + source = + if sub, + do: + {:service_account, credentials, + [ + url: + "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/#{sub}:signJwt", + claims: %{"sub" => sub, "scope" => "https://www.googleapis.com/auth/cloud-platform"} + ]}, + else: {:service_account, credentials, scopes: scopes} + + # source = {:service_account, credentials, scopes: scopes} + { + Goth, + # https://hexdocs.pm/goth/Goth.html#fetch/2 + # refresh 15 min before + # don't start server until fetch is made + # cap retries at 10s, warn when >5 + name: Logflare.Goth, + source: source, + refresh_before: 60 * 15, + prefetch: :sync, + http_client: &goth_finch_http_client/1, + retry_delay: fn + n when n < 3 -> + 1000 + + n when n < 5 -> + Logger.warning("Goth refresh retry count is #{n}") + 1000 * 3 + + n when n < 10 -> + Logger.warning("Goth refresh retry count is #{n}") + 1000 * 5 + + n -> + Logger.warning("Goth refresh retry count is #{n}") + 1000 * 10 + end + } + end + + def impersonated_goth_child_specs() do + project_id = Application.get_env(:logflare, Logflare.Google)[:project_id] + pool_size = managed_service_account_pool_size() + json = Application.get_env(:goth, :json) + + if json != nil and pool_size > 0 do + for i <- 0..(pool_size - 1) do + spec = goth_child_spec(json, managed_service_account_name(project_id, i)) + + {PartitionSupervisor, + child_spec: spec, + partitions: @managed_service_account_partition_count, + name: String.to_atom("Logflare.GothPartitionSup_#{i}"), + with_arguments: fn [opts], partition -> + [Keyword.put(opts, :name, {Logflare.GothQuery, i, partition})] + end} + end + else + [] + end + end + + # tell goth to use our finch pool + # https://github.com/peburrows/goth/blob/master/lib/goth/token.ex#L144 + defp goth_finch_http_client(options) do + {method, options} = Keyword.pop!(options, :method) + {url, options} = Keyword.pop!(options, :url) + {headers, options} = Keyword.pop!(options, :headers) + {body, options} = Keyword.pop!(options, :body) + + Finch.build(method, url, headers, body) + |> Finch.request(Logflare.FinchGoth, options) end end diff --git a/lib/logflare/google/bigquery/gen_utils/gen_utils.ex b/lib/logflare/google/bigquery/gen_utils/gen_utils.ex index 1668bb3a79..7508ba8b78 100644 --- a/lib/logflare/google/bigquery/gen_utils/gen_utils.ex +++ b/lib/logflare/google/bigquery/gen_utils/gen_utils.ex @@ -8,6 +8,7 @@ defmodule Logflare.Google.BigQuery.GenUtils do alias Logflare.{Sources, Users} alias Logflare.{Source, User} alias GoogleApi.BigQuery.V2.Connection + alias Logflare.Backends.Adaptor.BigQueryAdaptor @table_ttl 604_800_000 @default_dataset_location "US" @@ -81,13 +82,44 @@ defmodule Logflare.Google.BigQuery.GenUtils do @spec get_conn(conn_type()) :: Tesla.Env.client() def get_conn(conn_type \\ :default) do # use pid as the partition hash - partition_count = System.schedulers_online() + partition_count = + if conn_type == :query do + BigQueryAdaptor.managed_service_account_partition_count() + else + BigQueryAdaptor.ingest_service_account_partition_count() + end + partition = :erlang.phash2(self(), partition_count) - metadata = %{partition: partition} + {name, metadata} = + if conn_type == :query do + service_account_count = BigQueryAdaptor.managed_service_account_pool_size() + sa_index = :erlang.phash2(self(), service_account_count) + + {{ + Logflare.GothQuery, + sa_index, + partition + }, + %{ + service_account_count: service_account_count, + sa_index: sa_index, + partition: partition + }} + else + {{ + Logflare.Goth, + partition + }, + %{ + partition: partition + }} + end + + dbg(name) :telemetry.span([:logflare, :goth, :fetch], metadata, fn -> - result = Goth.fetch({Logflare.Goth, partition}) + result = Goth.fetch(name) {result, metadata} end) |> case do diff --git a/lib/logflare/google/resource_manager.ex b/lib/logflare/google/resource_manager.ex index 0747490088..79bae42f73 100644 --- a/lib/logflare/google/resource_manager.ex +++ b/lib/logflare/google/resource_manager.ex @@ -87,7 +87,12 @@ defmodule Logflare.Google.CloudResourceManager do defp get_service_accounts() do for {member, roles} <- [ {env_service_account(), - ["roles/bigquery.admin", "roles/resourcemanager.projectIamAdmin", "roles/iam.serviceAccountCreator", "roles/resourcemanager.projectIamAdmin", "roles/iam.serviceAccountTokenCreator"]}, + [ + "roles/bigquery.admin", + "roles/resourcemanager.projectIamAdmin", + "roles/iam.serviceAccountCreator", + "roles/iam.serviceAccountTokenCreator" + ]}, {env_compute_engine_sa(), [ "roles/compute.instanceAdmin", diff --git a/mix.exs b/mix.exs index 7dda1dd830..bc32fd6259 100644 --- a/mix.exs +++ b/mix.exs @@ -150,7 +150,7 @@ defmodule Logflare.Mixfile do {:google_api_cloud_resource_manager, "~> 0.34.0"}, {:google_api_big_query, "~> 0.79.0"}, {:google_api_iam, "~> 0.45.0"}, - {:goth, "~> 1.4.0"}, + {:goth, "~> 1.4.5"}, {:google_gax, github: "Logflare/elixir-google-gax", ref: "6772193", override: true}, # Ecto diff --git a/mix.lock b/mix.lock index 57148fc595..09c7b12476 100644 --- a/mix.lock +++ b/mix.lock @@ -60,7 +60,7 @@ "google_api_cloud_resource_manager": {:hex, :google_api_cloud_resource_manager, "0.34.3", "9e0ffa89b895a77a9fac1fa51f92d06f690b32002224281098be500322988680", [:mix], [{:google_gax, "~> 0.4", [hex: :google_gax, repo: "hexpm", optional: false]}], "hexpm", "de1aee529a9ce424ef93b97bdffc50a1b589fb206ee3866bbd23329eb4df43f2"}, "google_api_iam": {:hex, :google_api_iam, "0.45.0", "08eeddf383bb02f74f02169a4d509479240622491c71c580229a89412f73e4d9", [:mix], [{:google_gax, "~> 0.4", [hex: :google_gax, repo: "hexpm", optional: false]}], "hexpm", "acbce36bab44412c7be4ce1c8fb0283876d5c06ce2b2e938fb927be3e35346e6"}, "google_gax": {:git, "https://github.com/Logflare/elixir-google-gax.git", "6772193c438dd72ac253ea34ab13e9c02842cba8", [ref: "6772193"]}, - "goth": {:hex, :goth, "1.4.2", "a598dfbce6fe65db3f5f43b1ab2ce8fbe3b2fe20a7569ad62d71c11c0ddc3f41", [:mix], [{:finch, "~> 0.9", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:jose, "~> 1.11", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "d51bb6544dc551fe5754ab72e6cf194120b3c06d924282aaa3321a516ed3b98a"}, + "goth": {:hex, :goth, "1.4.5", "ee37f96e3519bdecd603f20e7f10c758287088b6d77c0147cd5ee68cf224aade", [:mix], [{:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:jose, "~> 1.11", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "0fc2dce5bd710651ed179053d0300ce3a5d36afbdde11e500d57f05f398d5ed5"}, "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, "grpc": {:hex, :grpc, "0.9.0", "1b930a57272d4356ea65969b984c2eb04f3dab81420e1e28f0e6ec04b8f88515", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:cowboy, "~> 2.10", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowlib, "~> 2.12", [hex: :cowlib, repo: "hexpm", optional: false]}, {:gun, "~> 2.0", [hex: :gun, repo: "hexpm", optional: false]}, {:jason, ">= 0.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mint, "~> 1.5", [hex: :mint, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.11", [hex: :protobuf, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7c059698248738fcf7ad551f1d78f4a3d2e0642a72a5834f2a0b0db4b9f3d2b5"}, "grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"}, @@ -76,7 +76,7 @@ "iteraptor": {:hex, :iteraptor, "1.14.0", "a6a23ec9ac1c25f3065138fd87f7f739f9b5a7e08fe915cfefcd155105445167", [:mix], [], "hexpm", "88d7a8bb7829a0faa8f99e15b12a8082403f07236d0911e55ad9245b306fd558"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "joken": {:hex, :joken, "2.6.0", "b9dd9b6d52e3e6fcb6c65e151ad38bf4bc286382b5b6f97079c47ade6b1bcc6a", [:mix], [{:jose, "~> 1.11.5", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "5a95b05a71cd0b54abd35378aeb1d487a23a52c324fa7efdffc512b655b5aaa7"}, - "jose": {:hex, :jose, "1.11.6", "613fda82552128aa6fb804682e3a616f4bc15565a048dabd05b1ebd5827ed965", [:mix, :rebar3], [], "hexpm", "6275cb75504f9c1e60eeacb771adfeee4905a9e182103aa59b53fed651ff9738"}, + "jose": {:hex, :jose, "1.11.10", "a903f5227417bd2a08c8a00a0cbcc458118be84480955e8d251297a425723f83", [:mix, :rebar3], [], "hexpm", "0d6cd36ff8ba174db29148fc112b5842186b68a90ce9fc2b3ec3afe76593e614"}, "jumper": {:hex, :jumper, "1.0.2", "68cdcd84472a00ac596b4e6459a41b3062d4427cbd4f1e8c8793c5b54f1406a7", [:mix], [], "hexpm", "9b7782409021e01ab3c08270e26f36eb62976a38c1aa64b2eaf6348422f165e1"}, "key_tools": {:hex, :key_tools, "0.4.1", "4bdf5a39190dc465e58f0c44784b7bb5300bafbbfff2b4ada4d7ec3bfde8d470", [:mix], [], "hexpm", "1a5afce636176481acec2db91066e68af5bf3c512327292a14078ca1aad1a57e"}, "libcluster": {:hex, :libcluster, "3.3.3", "a4f17721a19004cfc4467268e17cff8b1f951befe428975dd4f6f7b84d927fe0", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7c0a2275a0bb83c07acd17dab3c3bfb4897b145106750eeccc62d302e3bdfee5"}, From 843b9e1918955ad19f2880d1ec80d91fb61e8fe1 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 18 Jun 2025 11:38:58 +0800 Subject: [PATCH 03/13] chore: comment out signJwt url --- lib/logflare/backends/adaptor/bigquery_adaptor.ex | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/logflare/backends/adaptor/bigquery_adaptor.ex b/lib/logflare/backends/adaptor/bigquery_adaptor.ex index 4377c8b97b..79561e7b7b 100644 --- a/lib/logflare/backends/adaptor/bigquery_adaptor.ex +++ b/lib/logflare/backends/adaptor/bigquery_adaptor.ex @@ -247,13 +247,12 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do do: {:service_account, credentials, [ - url: - "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/#{sub}:signJwt", + # url: + # "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/#{sub}:signJwt", claims: %{"sub" => sub, "scope" => "https://www.googleapis.com/auth/cloud-platform"} ]}, else: {:service_account, credentials, scopes: scopes} - # source = {:service_account, credentials, scopes: scopes} { Goth, # https://hexdocs.pm/goth/Goth.html#fetch/2 From 19b6e53396c36c95175904764cefe8ec0040cc08 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Fri, 20 Jun 2025 17:08:43 +0800 Subject: [PATCH 04/13] feat: working service account impersonation --- lib/logflare/alerting/alert_query.ex | 2 +- lib/logflare/application.ex | 2 - .../backends/adaptor/bigquery_adaptor.ex | 12 +-- .../google/bigquery/gen_utils/gen_utils.ex | 1 - lib/logflare/google/resource_manager.ex | 79 ++++++++++--------- mix.exs | 2 +- 6 files changed, 49 insertions(+), 49 deletions(-) diff --git a/lib/logflare/alerting/alert_query.ex b/lib/logflare/alerting/alert_query.ex index 06b917975d..08aa9650df 100644 --- a/lib/logflare/alerting/alert_query.ex +++ b/lib/logflare/alerting/alert_query.ex @@ -30,7 +30,7 @@ defmodule Logflare.Alerting.AlertQuery do many_to_many :backends, Logflare.Backends.Backend, join_through: "alert_queries_backends", - _ on_replace: :delete + on_replace: :delete timestamps() end diff --git a/lib/logflare/application.ex b/lib/logflare/application.ex index 621970b492..dca944a200 100644 --- a/lib/logflare/application.ex +++ b/lib/logflare/application.ex @@ -132,8 +132,6 @@ defmodule Logflare.Application do BigQueryAdaptor.partitioned_goth_child_spec() ] ++ BigQueryAdaptor.impersonated_goth_child_specs() - dbg(goth) - # only add in config cat to multi-tenant prod config_cat = case Application.get_env(:logflare, :config_cat_sdk_key) do diff --git a/lib/logflare/backends/adaptor/bigquery_adaptor.ex b/lib/logflare/backends/adaptor/bigquery_adaptor.ex index 79561e7b7b..30f580b228 100644 --- a/lib/logflare/backends/adaptor/bigquery_adaptor.ex +++ b/lib/logflare/backends/adaptor/bigquery_adaptor.ex @@ -152,14 +152,14 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do Lists all managed service accounts """ @spec list_managed_service_accounts(String.t()) :: [GoogleApi.IAM.V1.Model.ServiceAccount.t()] - def list_managed_service_accounts(project_id) do + def list_managed_service_accounts(project_id \\ nil) do + project_id = project_id || Application.get_env(:logflare, Logflare.Google)[:project_id] + get_next_page(project_id, nil) |> Enum.filter(&(&1.name =~ @service_account_prefix)) end defp handle_response({:ok, response}, project_id) do - dbg(response) - case response do %{accounts: accounts, nextPageToken: nil} -> accounts @@ -181,7 +181,6 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do pageSize: 100, pageToken: page_token ) - |> dbg() |> handle_response(project_id) end @@ -212,7 +211,6 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do accountId: managed_service_account_id(project_id, service_account_index) } ) - |> dbg() end def managed_service_account_pool_size do @@ -247,9 +245,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do do: {:service_account, credentials, [ - # url: - # "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/#{sub}:signJwt", - claims: %{"sub" => sub, "scope" => "https://www.googleapis.com/auth/cloud-platform"} + sub: sub ]}, else: {:service_account, credentials, scopes: scopes} diff --git a/lib/logflare/google/bigquery/gen_utils/gen_utils.ex b/lib/logflare/google/bigquery/gen_utils/gen_utils.ex index 7508ba8b78..6c1a968331 100644 --- a/lib/logflare/google/bigquery/gen_utils/gen_utils.ex +++ b/lib/logflare/google/bigquery/gen_utils/gen_utils.ex @@ -116,7 +116,6 @@ defmodule Logflare.Google.BigQuery.GenUtils do }} end - dbg(name) :telemetry.span([:logflare, :goth, :fetch], metadata, fn -> result = Goth.fetch(name) diff --git a/lib/logflare/google/resource_manager.ex b/lib/logflare/google/resource_manager.ex index 79bae42f73..3a5644eafd 100644 --- a/lib/logflare/google/resource_manager.ex +++ b/lib/logflare/google/resource_manager.ex @@ -12,6 +12,7 @@ defmodule Logflare.Google.CloudResourceManager do alias Logflare.TeamUsers alias Logflare.Billing alias Logflare.Utils.Tasks + alias Logflare.Backends.Adaptor.BigQueryAdaptor def list_projects() do conn = GenUtils.get_conn() @@ -85,42 +86,48 @@ defmodule Logflare.Google.CloudResourceManager do end defp get_service_accounts() do - for {member, roles} <- [ - {env_service_account(), - [ - "roles/bigquery.admin", - "roles/resourcemanager.projectIamAdmin", - "roles/iam.serviceAccountCreator", - "roles/iam.serviceAccountTokenCreator" - ]}, - {env_compute_engine_sa(), - [ - "roles/compute.instanceAdmin", - "roles/artifactregistry.reader", - "roles/artifactregistry.writer", - "roles/logging.logWriter", - "roles/monitoring.metricWriter" - ]}, - {env_cloud_build_sa(), - [ - "roles/cloudbuild.builds.builder", - "roles/compute.admin", - "roles/container.admin", - "roles/cloudkms.cryptoKeyDecrypter", - "roles/iam.serviceAccountUser", - "roles/editor", - "roles/cloudbuild.builds.editor", - "roles/cloudbuild.serviceAgent" - ]}, - {env_cloud_build_trigger_sa(), - [ - "roles/cloudbuild.builds.editor", - "roles/iam.serviceAccountUser", - "roles/cloudbuild.serviceAgent" - ]}, - {env_api_sa(), ["roles/editor", "roles/cloudbuild.builds.editor"]}, - {env_grafana_sa(), ["roles/bigquery.dataViewer", "roles/bigquery.jobUser"]} - ], + managed_service_accounts = + for %{email: name} <- BigQueryAdaptor.list_managed_service_accounts() do + {name, ["roles/bigquery.admin"]} + end + + for {member, roles} <- + [ + {env_service_account(), + [ + "roles/bigquery.admin", + "roles/resourcemanager.projectIamAdmin", + "roles/iam.serviceAccountCreator", + "roles/iam.serviceAccountTokenCreator" + ]}, + {env_compute_engine_sa(), + [ + "roles/compute.instanceAdmin", + "roles/artifactregistry.reader", + "roles/artifactregistry.writer", + "roles/logging.logWriter", + "roles/monitoring.metricWriter" + ]}, + {env_cloud_build_sa(), + [ + "roles/cloudbuild.builds.builder", + "roles/compute.admin", + "roles/container.admin", + "roles/cloudkms.cryptoKeyDecrypter", + "roles/iam.serviceAccountUser", + "roles/editor", + "roles/cloudbuild.builds.editor", + "roles/cloudbuild.serviceAgent" + ]}, + {env_cloud_build_trigger_sa(), + [ + "roles/cloudbuild.builds.editor", + "roles/iam.serviceAccountUser", + "roles/cloudbuild.serviceAgent" + ]}, + {env_api_sa(), ["roles/editor", "roles/cloudbuild.builds.editor"]}, + {env_grafana_sa(), ["roles/bigquery.dataViewer", "roles/bigquery.jobUser"]} + ] ++ managed_service_accounts, member, role <- roles do %Model.Binding{ diff --git a/mix.exs b/mix.exs index bc32fd6259..2c516794d5 100644 --- a/mix.exs +++ b/mix.exs @@ -150,7 +150,7 @@ defmodule Logflare.Mixfile do {:google_api_cloud_resource_manager, "~> 0.34.0"}, {:google_api_big_query, "~> 0.79.0"}, {:google_api_iam, "~> 0.45.0"}, - {:goth, "~> 1.4.5"}, + {:goth, path: "../goth"}, {:google_gax, github: "Logflare/elixir-google-gax", ref: "6772193", override: true}, # Ecto From cf65fa7db258967e7470a54a230dbbaf19342bca Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Fri, 20 Jun 2025 17:13:32 +0800 Subject: [PATCH 05/13] feat: adjust goth to use fork --- mix.exs | 2 +- mix.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mix.exs b/mix.exs index 2c516794d5..4aa128878b 100644 --- a/mix.exs +++ b/mix.exs @@ -150,7 +150,7 @@ defmodule Logflare.Mixfile do {:google_api_cloud_resource_manager, "~> 0.34.0"}, {:google_api_big_query, "~> 0.79.0"}, {:google_api_iam, "~> 0.45.0"}, - {:goth, path: "../goth"}, + {:goth, github: "Logflare/goth", branch: "feat/service-account-impersonation"}, {:google_gax, github: "Logflare/elixir-google-gax", ref: "6772193", override: true}, # Ecto diff --git a/mix.lock b/mix.lock index 09c7b12476..b0407fa8de 100644 --- a/mix.lock +++ b/mix.lock @@ -60,7 +60,7 @@ "google_api_cloud_resource_manager": {:hex, :google_api_cloud_resource_manager, "0.34.3", "9e0ffa89b895a77a9fac1fa51f92d06f690b32002224281098be500322988680", [:mix], [{:google_gax, "~> 0.4", [hex: :google_gax, repo: "hexpm", optional: false]}], "hexpm", "de1aee529a9ce424ef93b97bdffc50a1b589fb206ee3866bbd23329eb4df43f2"}, "google_api_iam": {:hex, :google_api_iam, "0.45.0", "08eeddf383bb02f74f02169a4d509479240622491c71c580229a89412f73e4d9", [:mix], [{:google_gax, "~> 0.4", [hex: :google_gax, repo: "hexpm", optional: false]}], "hexpm", "acbce36bab44412c7be4ce1c8fb0283876d5c06ce2b2e938fb927be3e35346e6"}, "google_gax": {:git, "https://github.com/Logflare/elixir-google-gax.git", "6772193c438dd72ac253ea34ab13e9c02842cba8", [ref: "6772193"]}, - "goth": {:hex, :goth, "1.4.5", "ee37f96e3519bdecd603f20e7f10c758287088b6d77c0147cd5ee68cf224aade", [:mix], [{:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:jose, "~> 1.11", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "0fc2dce5bd710651ed179053d0300ce3a5d36afbdde11e500d57f05f398d5ed5"}, + "goth": {:git, "https://github.com/Logflare/goth.git", "23e77d38ce3acb35d949cf65fcb4331e0d14ab25", [branch: "feat/service-account-impersonation"]}, "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, "grpc": {:hex, :grpc, "0.9.0", "1b930a57272d4356ea65969b984c2eb04f3dab81420e1e28f0e6ec04b8f88515", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:cowboy, "~> 2.10", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowlib, "~> 2.12", [hex: :cowlib, repo: "hexpm", optional: false]}, {:gun, "~> 2.0", [hex: :gun, repo: "hexpm", optional: false]}, {:jason, ">= 0.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mint, "~> 1.5", [hex: :mint, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.11", [hex: :protobuf, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7c059698248738fcf7ad551f1d78f4a3d2e0642a72a5834f2a0b0db4b9f3d2b5"}, "grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"}, From d727543e10cf0a1cd9907947965f27f32bc658f7 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Mon, 23 Jun 2025 15:43:25 +0800 Subject: [PATCH 06/13] chore: formatting --- lib/logflare/google/bigquery/gen_utils/gen_utils.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/logflare/google/bigquery/gen_utils/gen_utils.ex b/lib/logflare/google/bigquery/gen_utils/gen_utils.ex index 6c1a968331..0a6b24cbaa 100644 --- a/lib/logflare/google/bigquery/gen_utils/gen_utils.ex +++ b/lib/logflare/google/bigquery/gen_utils/gen_utils.ex @@ -116,7 +116,6 @@ defmodule Logflare.Google.BigQuery.GenUtils do }} end - :telemetry.span([:logflare, :goth, :fetch], metadata, fn -> result = Goth.fetch(name) {result, metadata} From 5c4bd314073856627213c3fe1c104f28d9c0532f Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Tue, 24 Jun 2025 13:21:57 +0800 Subject: [PATCH 07/13] chore: add docs and fix compilation warnings --- .../backends/adaptor/bigquery_adaptor.ex | 81 +++++++++++++++++-- 1 file changed, 74 insertions(+), 7 deletions(-) diff --git a/lib/logflare/backends/adaptor/bigquery_adaptor.ex b/lib/logflare/backends/adaptor/bigquery_adaptor.ex index 30f580b228..e288f33257 100644 --- a/lib/logflare/backends/adaptor/bigquery_adaptor.ex +++ b/lib/logflare/backends/adaptor/bigquery_adaptor.ex @@ -138,18 +138,36 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do def validate_config(changeset), do: changeset + @doc """ + Returns the email of a managed service account + + iex> managed_service_account_name("my-project", 0) + "logflare-managed-0@my-project.iam.gserviceaccount.com" + """ @spec managed_service_account_name(String.t(), non_neg_integer()) :: String.t() def managed_service_account_name(project_id, service_account_index \\ 0) do - "#{@service_account_prefix}-#{service_account_index}@#{project_id}.iam.gserviceaccount.com" + "#{managed_service_account_id(service_account_index)}@#{project_id}.iam.gserviceaccount.com" end - @spec managed_service_account_id(String.t(), non_neg_integer()) :: String.t() - def managed_service_account_id(project_id, service_account_index \\ 0) do + @doc """ + Returns the id of a managed service account + + iex> managed_service_account_id("my-project", 0) + "logflare-managed-0" + """ + @spec managed_service_account_id(non_neg_integer()) :: String.t() + def managed_service_account_id(service_account_index \\ 0) do "#{@service_account_prefix}-#{service_account_index}" end @doc """ - Lists all managed service accounts + Lists all managed service accounts. + + iex> list_managed_service_accounts() + [%GoogleApi.IAM.V1.Model.ServiceAccount{...}, ...] + + + https://hexdocs.pm/google_api_iam/0.45.0/GoogleApi.IAM.V1.Model.ServiceAccount.html """ @spec list_managed_service_accounts(String.t()) :: [GoogleApi.IAM.V1.Model.ServiceAccount.t()] def list_managed_service_accounts(project_id \\ nil) do @@ -170,11 +188,12 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do |> List.flatten() end - defp handle_response({:error, error}, project_id) do + defp handle_response({:error, error}, _project_id) do Logger.error("Error listing managed service accounts: #{inspect(error)}") [] end + # handles pagination for the IAM api defp get_next_page(project_id, page_token) do GenUtils.get_conn(:default) |> GoogleApi.IAM.V1.Api.Projects.iam_projects_service_accounts_list("projects/#{project_id}", @@ -184,6 +203,15 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do |> handle_response(project_id) end + @doc """ + Creates managed service accounts for the project. Multiple service accounts are created, each with partitioning. + + iex> create_managed_service_accounts() + :ok + """ + @spec create_managed_service_accounts(optional(String.t())) :: [ + GoogleApi.IAM.V1.Model.ServiceAccount.t() + ] def create_managed_service_accounts(project_id \\ nil) do project_id = project_id || Application.get_env(:logflare, Logflare.Google)[:project_id] @@ -208,21 +236,49 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do |> GoogleApi.IAM.V1.Api.Projects.iam_projects_service_accounts_create( "projects/#{project_id}", body: %GoogleApi.IAM.V1.Model.CreateServiceAccountRequest{ - accountId: managed_service_account_id(project_id, service_account_index) + accountId: managed_service_account_id(service_account_index) } ) end + @doc """ + Returns the size of the managed service account pool from configuration + + iex> managed_service_account_pool_size() + 5 + """ def managed_service_account_pool_size do Application.get_env(:logflare, :bigquery_backend_adaptor)[:managed_service_account_pool_size] end + @doc """ + Returns the number of partitions for each managed service account + + iex> managed_service_account_partition_count() + #{@managed_service_account_partition_count} + """ def managed_service_account_partition_count, do: @managed_service_account_partition_count + @doc """ + Returns the number of partitions for the ingest service account, which accounts for number of schedulers. + + iex> ingest_service_account_partition_count() + 5 + """ def ingest_service_account_partition_count, do: max(@managed_service_account_partition_count, System.schedulers_online()) # Goth provisioning + + @doc """ + Returns a child spec for the Goth PartitionSupervisor, which is partitioned for each service account. + + iex> partitioned_goth_child_spec() + {PartitionSupervisor, ...} + + if no base service account is set, no child spec is returned. + """ + @spec partitioned_goth_child_spec() :: Supervisor.child_spec() | nil def partitioned_goth_child_spec() do if json = Application.get_env(:goth, :json) do {PartitionSupervisor, @@ -234,8 +290,13 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do end end - def goth_child_spec(nil), do: nil + @doc """ + Returns a Goth child spec for a given service account key. If `sub` is provided, the tokens generated will be impersonated by the `sub` service account. + iex> goth_child_spec(json) + {Goth, ...} + """ + @spec goth_child_spec(String.t(), optional(String.t())) :: Supervisor.child_spec() def goth_child_spec(json, sub \\ nil) do credentials = Jason.decode!(json) scopes = ["https://www.googleapis.com/auth/cloud-platform"] @@ -279,6 +340,12 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do } end + @doc """ + Returns a list of partitioned Goth child specs with impersonation for the set service account. + + iex> impersonated_goth_child_specs() + [{PartitionSupervisor, ...}, ...] + """ def impersonated_goth_child_specs() do project_id = Application.get_env(:logflare, Logflare.Google)[:project_id] pool_size = managed_service_account_pool_size() From a7f200bd49f0c831799c2d67061088afc317faac Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Tue, 24 Jun 2025 13:36:12 +0800 Subject: [PATCH 08/13] docs: add in docs around LOGFLARE_BIGQUERY_MANAGED_SA_POOL --- config/runtime.exs | 2 +- .../docs/self-hosting/index.md | 45 ++++++++++--------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/config/runtime.exs b/config/runtime.exs index 67d33c4cd9..16c914627a 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -48,7 +48,7 @@ config :logflare, :bigquery_backend_adaptor, [ managed_service_account_pool_size: - System.get_env("LOGFLARE_BIGQUERY_MANAGED_SERVICE_ACCOUNT_POOL_SIZE", "0") + System.get_env("LOGFLARE_BIGQUERY_MANAGED_SA_POOL", "0") |> String.to_integer() ] |> filter_nil_kv_pairs.() diff --git a/docs/docs.logflare.com/docs/self-hosting/index.md b/docs/docs.logflare.com/docs/self-hosting/index.md index 1713f9304e..84268cc5d5 100644 --- a/docs/docs.logflare.com/docs/self-hosting/index.md +++ b/docs/docs.logflare.com/docs/self-hosting/index.md @@ -85,11 +85,26 @@ Setting `LOGFLARE_METADATA_CLUSTER=production` will result the following payload ### BigQuery Backend Configuration -| Env Var | Type | Description | -| -------------------------- | --------------------------- | ------------------------------------------------------------- | -| `GOOGLE_PROJECT_ID` | string, required | Specifies the GCP project to use. | -| `GOOGLE_PROJECT_NUMBER` | string, required | Specifies the GCP project to use. | -| `GOOGLE_DATASET_ID_APPEND` | string, defaults to `_prod` | This allows customization of the dataset created in BigQuery. | +| Env Var | Type | Description | +| ----------------------------------- | --------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `GOOGLE_PROJECT_ID` | string, required | Specifies the GCP project to use. | +| `GOOGLE_PROJECT_NUMBER` | string, required | Specifies the GCP project to use. | +| `GOOGLE_DATASET_ID_APPEND` | string, defaults to `_prod` | This allows customization of the dataset created in BigQuery. | +| `LOGFLARE_BIGQUERY_MANAGED_SA_POOL` | Integer, defaults to `0` | Sets the number of managed service accounts to create for BigQuery API operations. When set to 0, managed service accounts are disabled, and all queries will run throguh the main service account. | + +#### Managed Service Accounts + +When `LOGFLARE_BIGQUERY_MANAGED_SA_POOL` is a non-zero value, managed service accounts will use impersonation when making requests against the BigQuery REST API. Increase this value when experiencing rate limiting. + +This is due to BigQuery having a fixed 100 requests per second per user limit on their core REST API. However, service account impersonation allows us to spread out requests across multiple service accounts, thereby avoiding this limitation. + +Managed service accounts will be provisioned automatically by the server, hence it will require additional permissions: + +- `roles/resourcemanager.projectIamAdmin` +- `roles/iam.serviceAccountCreator` +- `roles/iam.serviceAccountTokenCreator` + +Without these two additional permissions, the managed service accounts feature will not work. ### PostgreSQL Backend Configuration @@ -135,21 +150,11 @@ The requirements for server startup are as follows after creating the project: To ensure that you have sufficient permissions to insert into your Google Cloud BigQuery, ensure that you have created a service account with either: -- BigQuery Admin role; or -- The following permissions: - - bigquery.datasets.create - - bigquery.datasets.get - - bigquery.datasets.getIamPolicy - - bigquery.datasets.update - - bigquery.jobs.create - - bigquery.routines.create - - bigquery.routines.update - - bigquery.tables.create - - bigquery.tables.delete - - bigquery.tables.get - - bigquery.tables.getData - - bigquery.tables.update - - bigquery.tables.updateData +- `roles/bigquery.admin` +- for [managed service accounts](#managed-service-accounts) + - `roles/resourcemanager.projectIamAdmin` + - `role/iam.serviceAccountCreator` + - `role/iam.serviceAccountTokenCreator` We recommend setting the BigQuery Admin role, as it simplifies permissions setup. From c97f72f71d42e63c1a823a9e71b7fabbcae2e168 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Tue, 24 Jun 2025 14:16:26 +0800 Subject: [PATCH 09/13] feat: add in BigQueryAdaptor.set_iam_policy/0 wrapper --- lib/logflare/application.ex | 1 + .../backends/adaptor/bigquery_adaptor.ex | 36 +++++++-- .../backends/bigquery_adaptor_test.exs | 75 ++++++++++++++++++- .../logflare/google/resource_manager_test.exs | 5 ++ test/test_helper.exs | 14 ++++ 5 files changed, 119 insertions(+), 12 deletions(-) diff --git a/lib/logflare/application.ex b/lib/logflare/application.ex index dca944a200..26e171dc2c 100644 --- a/lib/logflare/application.ex +++ b/lib/logflare/application.ex @@ -224,6 +224,7 @@ defmodule Logflare.Application do if !SingleTenant.postgres_backend?() do BigQueryAdaptor.create_managed_service_accounts() + BigQueryAdaptor.update_iam_policy() end if SingleTenant.single_tenant?() do diff --git a/lib/logflare/backends/adaptor/bigquery_adaptor.ex b/lib/logflare/backends/adaptor/bigquery_adaptor.ex index e288f33257..e4f8d4ba68 100644 --- a/lib/logflare/backends/adaptor/bigquery_adaptor.ex +++ b/lib/logflare/backends/adaptor/bigquery_adaptor.ex @@ -13,6 +13,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do alias Logflare.Billing alias Logflare.Backends alias Logflare.Google.BigQuery.GenUtils + alias Logflare.Google.CloudResourceManager use Supervisor require Logger @@ -209,7 +210,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do iex> create_managed_service_accounts() :ok """ - @spec create_managed_service_accounts(optional(String.t())) :: [ + @spec create_managed_service_accounts(String.t()) :: [ GoogleApi.IAM.V1.Model.ServiceAccount.t() ] def create_managed_service_accounts(project_id \\ nil) do @@ -221,14 +222,23 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do :managed_service_account_pool_size ] - existing = list_managed_service_accounts(project_id) |> Enum.map(& &1.email) + accounts = + if size > 0 do + existing = list_managed_service_accounts(project_id) |> Enum.map(& &1.email) - indexes = - for i <- 0..(size - 1), managed_service_account_name(project_id, i) not in existing, do: i + indexes = + for i <- 0..(size - 1), + managed_service_account_name(project_id, i) not in existing, + do: i - for i <- indexes do - create_managed_service_account(project_id, i) - end + for i <- indexes, {:ok, sa} = create_managed_service_account(project_id, i) do + sa + end + else + [] + end + + {:ok, accounts} end defp create_managed_service_account(project_id, service_account_index) do @@ -296,7 +306,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do iex> goth_child_spec(json) {Goth, ...} """ - @spec goth_child_spec(String.t(), optional(String.t())) :: Supervisor.child_spec() + @spec goth_child_spec(String.t(), String.t()) :: Supervisor.child_spec() def goth_child_spec(json, sub \\ nil) do credentials = Jason.decode!(json) scopes = ["https://www.googleapis.com/auth/cloud-platform"] @@ -379,4 +389,14 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do Finch.build(method, url, headers, body) |> Finch.request(Logflare.FinchGoth, options) end + + @doc """ + Updates the IAM policy for the project. + + iex> update_iam_policy() + :ok + """ + def update_iam_policy() do + CloudResourceManager.set_iam_policy(async: false) + end end diff --git a/test/logflare/backends/bigquery_adaptor_test.exs b/test/logflare/backends/bigquery_adaptor_test.exs index 173a946827..e644a14562 100644 --- a/test/logflare/backends/bigquery_adaptor_test.exs +++ b/test/logflare/backends/bigquery_adaptor_test.exs @@ -7,10 +7,6 @@ defmodule Logflare.Backends.BigQueryAdaptorTest do alias Logflare.Backends.Adaptor.BigQueryAdaptor alias Logflare.SystemMetrics.AllLogsLogged - @subject Logflare.Backends.Adaptor.BigQueryAdaptor - - doctest @subject - setup do start_supervised!(AllLogsLogged) insert(:plan) @@ -319,4 +315,75 @@ defmodule Logflare.Backends.BigQueryAdaptorTest do end end end + + describe "managed service accounts" do + setup do + original_pool_size = + Application.get_env(:logflare, :bigquery_backend_adaptor)[ + :managed_service_account_pool_size + ] + + Application.put_env(:logflare, :bigquery_backend_adaptor, + managed_service_account_pool_size: 2 + ) + + on_exit(fn -> + Application.put_env(:logflare, :bigquery_backend_adaptor, + managed_service_account_pool_size: original_pool_size + ) + end) + + :ok + end + + test "create_managed_service_accounts/0" do + ref = self() + # Mock IAM API calls for listing existing service accounts + expect(GoogleApi.IAM.V1.Api.Projects, :iam_projects_service_accounts_list, fn + _conn, "projects/" <> _project_id, _opts -> + {:ok, %{accounts: [], nextPageToken: nil}} + end) + + # Mock IAM API calls for creating new service accounts + expect(GoogleApi.IAM.V1.Api.Projects, :iam_projects_service_accounts_create, 2, fn + _conn, "projects/" <> project_id, opts -> + # Assert the body has the correct account ID + assert %{accountId: account_id} = opts[:body] + assert account_id =~ ~r/logflare-managed-\d+/ + send(ref, {:created, account_id}) + + {:ok, + %GoogleApi.IAM.V1.Model.ServiceAccount{ + email: "#{account_id}@#{project_id}.iam.gserviceaccount.com" + }} + end) + + assert {:ok, [_, _]} = BigQueryAdaptor.create_managed_service_accounts() + + assert_receive {:created, "logflare-managed-0"}, 1000 + assert_receive {:created, "logflare-managed-1"}, 1000 + end + + test "update_iam_policy/0" do + pid = self() + # Mock IAM API calls for listing existing service accounts + expect(GoogleApi.IAM.V1.Api.Projects, :iam_projects_service_accounts_list, fn + _conn, "projects/" <> _project_id, _opts -> + {:ok, %{accounts: [], nextPageToken: nil}} + end) + + expect( + GoogleApi.CloudResourceManager.V1.Api.Projects, + :cloudresourcemanager_projects_set_iam_policy, + fn _conn, project_number, [body: body] -> + send(pid, body.policy.bindings) + {:ok, ""} + end + ) + + BigQueryAdaptor.update_iam_policy() + + assert_received [_ | _] + end + end end diff --git a/test/logflare/google/resource_manager_test.exs b/test/logflare/google/resource_manager_test.exs index 25bfa11ee4..b2af8dcad8 100644 --- a/test/logflare/google/resource_manager_test.exs +++ b/test/logflare/google/resource_manager_test.exs @@ -21,6 +21,11 @@ defmodule Logflare.Google.CloudResourceManagerTest do expected_members: expected_members } do pid = self() + # Mock IAM API calls for listing existing service accounts + stub(GoogleApi.IAM.V1.Api.Projects, :iam_projects_service_accounts_list, fn + _conn, "projects/" <> _project_id, _opts -> + {:ok, %{accounts: [], nextPageToken: nil}} + end) stub( GoogleApi.CloudResourceManager.V1.Api.Projects, diff --git a/test/test_helper.exs b/test/test_helper.exs index 426a61f175..34ff1227d5 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -40,6 +40,20 @@ Mimic.copy(Logflare.Sources.Cache) Mimic.copy(Logflare.SystemMetrics.AllLogsLogged) Mimic.copy(Logflare.Users) Mimic.copy(LogflareWeb.Plugs.RateLimiter) +Mimic.copy(Logflare.Alerting.AlertsScheduler) +Mimic.copy(Stripe.Customer) +Mimic.copy(Stripe.PaymentMethod) +Mimic.copy(Stripe.SubscriptionItem.Usage) +Mimic.copy(GoogleApi.BigQuery.V2.Api.Jobs) +Mimic.copy(GoogleApi.BigQuery.V2.Api.Tabledata) +Mimic.copy(GoogleApi.BigQuery.V2.Api.Tables) +Mimic.copy(GoogleApi.BigQuery.V2.Api.Datasets) +Mimic.copy(GoogleApi.CloudResourceManager.V1.Api.Projects) +Mimic.copy(Goth) +Mimic.copy(ConfigCat) +Mimic.copy(Finch) +Mimic.copy(ExTwilio.Message) +Mimic.copy(Broadway) {:ok, _} = Application.ensure_all_started(:ex_machina) {:ok, _} = Application.ensure_all_started(:mimic) From 7721aae570c378fed0b9d1b309adaf4cdf5ab559 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Tue, 24 Jun 2025 17:50:29 +0800 Subject: [PATCH 10/13] fix: phash on 0 value --- lib/logflare/google/bigquery/gen_utils/gen_utils.ex | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/logflare/google/bigquery/gen_utils/gen_utils.ex b/lib/logflare/google/bigquery/gen_utils/gen_utils.ex index 0a6b24cbaa..bbccaa583f 100644 --- a/lib/logflare/google/bigquery/gen_utils/gen_utils.ex +++ b/lib/logflare/google/bigquery/gen_utils/gen_utils.ex @@ -94,7 +94,7 @@ defmodule Logflare.Google.BigQuery.GenUtils do {name, metadata} = if conn_type == :query do service_account_count = BigQueryAdaptor.managed_service_account_pool_size() - sa_index = :erlang.phash2(self(), service_account_count) + sa_index = if service_account_count > 0, do: :erlang.phash2(self(), service_account_count), else: 0 {{ Logflare.GothQuery, @@ -115,7 +115,6 @@ defmodule Logflare.Google.BigQuery.GenUtils do partition: partition }} end - :telemetry.span([:logflare, :goth, :fetch], metadata, fn -> result = Goth.fetch(name) {result, metadata} From 6779225e0a48301fab49f94cc962d6310d2ddc19 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Tue, 24 Jun 2025 17:50:41 +0800 Subject: [PATCH 11/13] chore: formatting --- lib/logflare/google/bigquery/gen_utils/gen_utils.ex | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/logflare/google/bigquery/gen_utils/gen_utils.ex b/lib/logflare/google/bigquery/gen_utils/gen_utils.ex index bbccaa583f..443952e986 100644 --- a/lib/logflare/google/bigquery/gen_utils/gen_utils.ex +++ b/lib/logflare/google/bigquery/gen_utils/gen_utils.ex @@ -94,7 +94,9 @@ defmodule Logflare.Google.BigQuery.GenUtils do {name, metadata} = if conn_type == :query do service_account_count = BigQueryAdaptor.managed_service_account_pool_size() - sa_index = if service_account_count > 0, do: :erlang.phash2(self(), service_account_count), else: 0 + + sa_index = + if service_account_count > 0, do: :erlang.phash2(self(), service_account_count), else: 0 {{ Logflare.GothQuery, @@ -115,6 +117,7 @@ defmodule Logflare.Google.BigQuery.GenUtils do partition: partition }} end + :telemetry.span([:logflare, :goth, :fetch], metadata, fn -> result = Goth.fetch(name) {result, metadata} From 450646766ff2220c5da3cf7115a9d4087d92524e Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 25 Jun 2025 17:40:33 +0800 Subject: [PATCH 12/13] fix: add Projects copying --- test/test_helper.exs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/test_helper.exs b/test/test_helper.exs index 34ff1227d5..a295717c0a 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -49,6 +49,7 @@ Mimic.copy(GoogleApi.BigQuery.V2.Api.Tabledata) Mimic.copy(GoogleApi.BigQuery.V2.Api.Tables) Mimic.copy(GoogleApi.BigQuery.V2.Api.Datasets) Mimic.copy(GoogleApi.CloudResourceManager.V1.Api.Projects) +Mimic.copy(GoogleApi.IAM.V1.Api.Projects) Mimic.copy(Goth) Mimic.copy(ConfigCat) Mimic.copy(Finch) From b48c9507ffdb86651a6f38fbb8ee127f49cbac02 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 25 Jun 2025 19:40:21 +0800 Subject: [PATCH 13/13] chore: PR comments, refactoring --- lib/logflare/backends/adaptor/bigquery_adaptor.ex | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/lib/logflare/backends/adaptor/bigquery_adaptor.ex b/lib/logflare/backends/adaptor/bigquery_adaptor.ex index e4f8d4ba68..1a429e5dd4 100644 --- a/lib/logflare/backends/adaptor/bigquery_adaptor.ex +++ b/lib/logflare/backends/adaptor/bigquery_adaptor.ex @@ -175,7 +175,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do project_id = project_id || Application.get_env(:logflare, Logflare.Google)[:project_id] get_next_page(project_id, nil) - |> Enum.filter(&(&1.name =~ @service_account_prefix)) + |> Enum.filter(&(&1.name =~ @service_account_prefix <> "@")) end defp handle_response({:ok, response}, project_id) do @@ -309,16 +309,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do @spec goth_child_spec(String.t(), String.t()) :: Supervisor.child_spec() def goth_child_spec(json, sub \\ nil) do credentials = Jason.decode!(json) - scopes = ["https://www.googleapis.com/auth/cloud-platform"] - - source = - if sub, - do: - {:service_account, credentials, - [ - sub: sub - ]}, - else: {:service_account, credentials, scopes: scopes} + source = {:service_account, credentials, if(sub, do: [sub: sub], else: [])} { Goth,