diff --git a/docs/en/engines/database-engines/datalake.md b/docs/en/engines/database-engines/datalake.md index 213690607f68..8b2011390d25 100644 --- a/docs/en/engines/database-engines/datalake.md +++ b/docs/en/engines/database-engines/datalake.md @@ -44,19 +44,20 @@ catalog_type, The following settings are supported: -| Setting | Description | -|-------------------------|---------------------------------------------------------------------------| -| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive` | -| `warehouse` | The warehouse/database name to use in the catalog. | -| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) | -| `auth_header` | Custom HTTP header for authentication with the catalog service | -| `auth_scope` | OAuth2 scope for authentication (if using OAuth) | -| `storage_endpoint` | Endpoint URL for the underlying storage | -| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication | -| `vended_credentials` | Boolean indicating whether to use vended credentials (AWS-specific) | -| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) | -| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) | -| `region` | AWS region for the service (e.g., `us-east-1`) | +| Setting | Description | +|-------------------------|-----------------------------------------------------------------------------------------------| +| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive` | +| `warehouse` | The warehouse/database name to use in the catalog. 
| +| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) | +| `auth_header` | Custom HTTP header for authentication with the catalog service | +| `auth_scope` | OAuth2 scope for authentication (if using OAuth) | +| `storage_endpoint` | Endpoint URL for the underlying storage | +| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication | +| `vended_credentials` | Boolean indicating whether to use vended credentials (AWS-specific) | +| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) | +| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) | +| `region` | AWS region for the service (e.g., `us-east-1`) | +| `namespaces` | Comma-separated list of namespaces, implemented for catalog types: `rest`, `glue` and `unity` | ## Examples {#examples} @@ -64,3 +65,29 @@ See below pages for examples of using the `DataLakeCatalog` engine: * [Unity Catalog](/use-cases/data-lake/unity-catalog) * [Glue Catalog](/use-cases/data-lake/glue-catalog) + +## Namespace filter {#namespace} + +By default, ClickHouse reads tables from all namespaces available in the catalog. You can limit this behavior using the `namespaces` database setting. The value should be a comma‑separated list of namespaces that are allowed to be read. + +Supported catalog types are `rest`, `glue` and `unity`. + +For example, if the catalog contains three namespaces - `dev`, `stage`, and `prod` - and you want to read data only from dev and stage, set: +``` +namespaces='dev,stage' +``` + +### Nested namespaces {#namespace-nested} + +The Iceberg (`rest`) catalog supports nested namespaces. The `namespaces` filter accepts the following patterns: + +- `namespace` - includes tables from the specified namespace, but not from its nested namespaces. +- `namespace.nested` - includes tables from the nested namespace, but not from the parent. 
+- `namespace.*` - includes tables from all nested namespaces, but not from the parent. + +If you need to include both a namespace and its nested namespaces, specify both explicitly. For example: +``` +namespaces='namespace,namespace.*' +``` + +The default value is '*', which means all namespaces are included. \ No newline at end of file diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index eb5b64f2f328..9041a44e29f9 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -636,6 +636,7 @@ M(754, UDF_EXECUTION_FAILED) \ M(755, TOO_LARGE_LIGHTWEIGHT_UPDATES) \ M(756, CANNOT_PARSE_PROMQL_QUERY) \ + M(757, CATALOG_NAMESPACE_DISABLED) \ \ M(900, DISTRIBUTED_CACHE_ERROR) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 5548ba6d916f..2a823d4e1940 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -52,6 +52,7 @@ namespace DatabaseDataLakeSetting extern const DatabaseDataLakeSettingsString aws_access_key_id; extern const DatabaseDataLakeSettingsString aws_secret_access_key; extern const DatabaseDataLakeSettingsString region; + extern const DatabaseDataLakeSettingsString namespaces; } namespace Setting @@ -121,6 +122,7 @@ std::shared_ptr DatabaseDataLake::getCatalog() const .aws_access_key_id = settings[DatabaseDataLakeSetting::aws_access_key_id].value, .aws_secret_access_key = settings[DatabaseDataLakeSetting::aws_secret_access_key].value, .region = settings[DatabaseDataLakeSetting::region].value, + .namespaces = settings[DatabaseDataLakeSetting::namespaces].value }; switch (settings[DatabaseDataLakeSetting::catalog_type].value) @@ -135,6 +137,7 @@ std::shared_ptr DatabaseDataLake::getCatalog() const settings[DatabaseDataLakeSetting::auth_header], settings[DatabaseDataLakeSetting::oauth_server_uri].value, settings[DatabaseDataLakeSetting::oauth_server_use_request_body].value, + 
settings[DatabaseDataLakeSetting::namespaces].value, Context::getGlobalContextInstance()); break; } @@ -144,6 +147,7 @@ std::shared_ptr DatabaseDataLake::getCatalog() const settings[DatabaseDataLakeSetting::warehouse].value, url, settings[DatabaseDataLakeSetting::catalog_credential].value, + settings[DatabaseDataLakeSetting::namespaces].value, Context::getGlobalContextInstance()); break; } @@ -195,24 +199,24 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration( #if USE_AWS_S3 case DB::DatabaseDataLakeStorageType::S3: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } #endif #if USE_AZURE_BLOB_STORAGE case DB::DatabaseDataLakeStorageType::Azure: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } #endif #if USE_HDFS case DB::DatabaseDataLakeStorageType::HDFS: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } #endif case DB::DatabaseDataLakeStorageType::Local: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } /// Fake storage in case when catalog store not only /// primary-type tables (DeltaLake or Iceberg), but for @@ -224,7 +228,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration( /// dependencies and the most lightweight case DB::DatabaseDataLakeStorageType::Other: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } #if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS default: @@ -241,12 +245,12 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration( #if USE_AWS_S3 case DB::DatabaseDataLakeStorageType::S3: { - return 
std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } #endif case DB::DatabaseDataLakeStorageType::Local: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } /// Fake storage in case when catalog store not only /// primary-type tables (DeltaLake or Iceberg), but for @@ -258,7 +262,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration( /// dependencies and the most lightweight case DB::DatabaseDataLakeStorageType::Other: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } default: throw Exception(ErrorCodes::BAD_ARGUMENTS, @@ -273,12 +277,12 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration( #if USE_AWS_S3 case DB::DatabaseDataLakeStorageType::S3: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } #endif case DB::DatabaseDataLakeStorageType::Other: { - return std::make_shared(storage_settings); + return std::make_shared(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value); } default: throw Exception(ErrorCodes::BAD_ARGUMENTS, diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp index 7a6b4c07b599..3d38d6c543fa 100644 --- a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp +++ b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp @@ -29,6 +29,7 @@ namespace ErrorCodes DECLARE(String, aws_secret_access_key, "", "Key for AWS connection for Glue Catalog'", 0) \ DECLARE(String, region, "", "Region for Glue catalog", 0) \ DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \ + DECLARE(String, namespaces, "*", "Comma-separated list of allowed namespaces", 0) \ 
#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \ DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) \ diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp index 59a256d581a4..aeb9cfa0bdce 100644 --- a/src/Databases/DataLake/GlueCatalog.cpp +++ b/src/Databases/DataLake/GlueCatalog.cpp @@ -53,6 +53,7 @@ namespace DB::ErrorCodes { extern const int BAD_ARGUMENTS; extern const int DATALAKE_DATABASE_ERROR; + extern const int CATALOG_NAMESPACE_DISABLED; } namespace DB::Setting @@ -71,14 +72,6 @@ namespace DB::StorageObjectStorageSetting extern const StorageObjectStorageSettingsString iceberg_metadata_file_path; } -namespace DB::DatabaseDataLakeSetting -{ - extern const DatabaseDataLakeSettingsString storage_endpoint; - extern const DatabaseDataLakeSettingsString aws_access_key_id; - extern const DatabaseDataLakeSettingsString aws_secret_access_key; - extern const DatabaseDataLakeSettingsString region; -} - namespace CurrentMetrics { extern const Metric MarkCacheBytes; @@ -158,6 +151,7 @@ GlueCatalog::GlueCatalog( glue_client = std::make_unique(chain, endpoint_provider, client_configuration); } + boost::split(allowed_namespaces, settings.namespaces, boost::is_any_of(", "), boost::token_compress_on); } GlueCatalog::~GlueCatalog() = default; @@ -183,8 +177,9 @@ DataLake::ICatalog::Namespaces GlueCatalog::getDatabases(const std::string & pre for (const auto & db : dbs) { const auto & db_name = db.GetName(); - if (!db_name.starts_with(prefix)) + if (!isNamespaceAllowed(db_name) || !db_name.starts_with(prefix)) continue; + result.push_back(db_name); if (limit != 0 && result.size() >= limit) break; @@ -264,6 +259,9 @@ DB::Names GlueCatalog::getTables() const bool GlueCatalog::existsTable(const std::string & database_name, const std::string & table_name) const { + if (!isNamespaceAllowed(database_name)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Namespace {} is filtered by `namespaces` database parameter", 
database_name); + Aws::Glue::Model::GetTableRequest request; request.SetDatabaseName(database_name); request.SetName(table_name); @@ -278,6 +276,9 @@ bool GlueCatalog::tryGetTableMetadata( DB::ContextPtr /* context_ */, TableMetadata & result) const { + if (!isNamespaceAllowed(database_name)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Namespace {} is filtered by `namespaces` database parameter", database_name); + Aws::Glue::Model::GetTableRequest request; request.SetDatabaseName(database_name); request.SetName(table_name); @@ -538,7 +539,7 @@ String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_lo auto storage_settings = std::make_shared(); storage_settings->loadFromSettingsChanges(settings.allChanged()); - auto configuration = std::make_shared(storage_settings); + auto configuration = std::make_shared(storage_settings, settings.namespaces); configuration->initialize(args, getContext(), false); auto object_storage = configuration->createObjectStorage(getContext(), true); @@ -643,6 +644,11 @@ void GlueCatalog::createNamespaceIfNotExists(const String & namespace_name) cons void GlueCatalog::createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr /*metadata_content*/) const { + if (!isNamespaceAllowed(namespace_name)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Failed to create table {}, namespace {} is filtered by `namespaces` database parameter", + table_name, namespace_name); + createNamespaceIfNotExists(namespace_name); Aws::Glue::Model::CreateTableRequest request; @@ -715,6 +721,11 @@ bool GlueCatalog::updateMetadata(const String & namespace_name, const String & t void GlueCatalog::dropTable(const String & namespace_name, const String & table_name) const { + if (!isNamespaceAllowed(namespace_name)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Failed to drop table {}, namespace {} is filtered by 
`namespaces` database parameter", + table_name, namespace_name); + Aws::Glue::Model::DeleteTableRequest request; request.SetDatabaseName(namespace_name); request.SetName(table_name); @@ -728,6 +739,11 @@ void GlueCatalog::dropTable(const String & namespace_name, const String & table_ response.GetError().GetMessage()); } +bool GlueCatalog::isNamespaceAllowed(const std::string & namespace_) const +{ + return allowed_namespaces.contains("*") || allowed_namespaces.contains(namespace_); +} + } #endif diff --git a/src/Databases/DataLake/GlueCatalog.h b/src/Databases/DataLake/GlueCatalog.h index bcecfd2368ca..c70e89c0f054 100644 --- a/src/Databases/DataLake/GlueCatalog.h +++ b/src/Databases/DataLake/GlueCatalog.h @@ -74,6 +74,9 @@ class GlueCatalog final : public ICatalog, private DB::WithContext std::string region; CatalogSettings settings; DB::ASTPtr table_engine_definition; + std::unordered_set allowed_namespaces; + + bool isNamespaceAllowed(const std::string & namespace_) const; DataLake::ICatalog::Namespaces getDatabases(const std::string & prefix, size_t limit = 0) const; DB::Names getTablesForDatabase(const std::string & db_name, size_t limit = 0) const; diff --git a/src/Databases/DataLake/ICatalog.h b/src/Databases/DataLake/ICatalog.h index 26964aa36433..d38773c0c4f3 100644 --- a/src/Databases/DataLake/ICatalog.h +++ b/src/Databases/DataLake/ICatalog.h @@ -125,6 +125,7 @@ struct CatalogSettings String aws_access_key_id; String aws_secret_access_key; String region; + String namespaces; DB::SettingsChanges allChanged() const; }; diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp index 50492902de0a..6c85224cb095 100644 --- a/src/Databases/DataLake/RestCatalog.cpp +++ b/src/Databases/DataLake/RestCatalog.cpp @@ -40,6 +40,7 @@ namespace DB::ErrorCodes extern const int DATALAKE_DATABASE_ERROR; extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; + extern const int CATALOG_NAMESPACE_DISABLED; } namespace DataLake @@ 
-128,6 +129,7 @@ RestCatalog::RestCatalog( const std::string & auth_header_, const std::string & oauth_server_uri_, bool oauth_server_use_request_body_, + const std::string & namespaces_, DB::ContextPtr context_) : ICatalog(warehouse_) , DB::WithContext(context_) @@ -136,6 +138,7 @@ RestCatalog::RestCatalog( , auth_scope(auth_scope_) , oauth_server_uri(oauth_server_uri_) , oauth_server_use_request_body(oauth_server_use_request_body_) + , allowed_namespaces(namespaces_) { if (!catalog_credential_.empty()) { @@ -347,6 +350,8 @@ bool RestCatalog::empty() const bool found_table = false; auto stop_condition = [&](const std::string & namespace_name) -> bool { + if (!allowed_namespaces.isNamespaceAllowed(namespace_name, /*nested*/ false)) + return false; const auto tables = getTables(namespace_name, /* limit */1); found_table = !tables.empty(); return found_table; @@ -371,6 +376,8 @@ DB::Names RestCatalog::getTables() const runner( [=, &tables, &mutex, this] { + if (!allowed_namespaces.isNamespaceAllowed(current_namespace, /*nested*/ false)) + return; auto tables_in_namespace = getTables(current_namespace); std::lock_guard lock(mutex); std::move(tables_in_namespace.begin(), tables_in_namespace.end(), std::back_inserter(tables)); @@ -408,9 +415,21 @@ void RestCatalog::getNamespacesRecursive( break; if (func) - func(current_namespace); + { + if (allowed_namespaces.isNamespaceAllowed(current_namespace, /*nested*/ false)) + func(current_namespace); + else + { + LOG_DEBUG(log, "Tables in namespace {} are filtered", current_namespace); + } + } - getNamespacesRecursive(current_namespace, result, stop_condition, func); + if (allowed_namespaces.isNamespaceAllowed(current_namespace, /*nested*/ true)) + getNamespacesRecursive(current_namespace, result, stop_condition, func); + else + { + LOG_DEBUG(log, "Nested namespaces in namespace {} are filtered", current_namespace); + } } } @@ -513,6 +532,10 @@ RestCatalog::Namespaces RestCatalog::parseNamespaces(DB::ReadBuffer & buf, const 
DB::Names RestCatalog::getTables(const std::string & base_namespace, size_t limit) const { + if (!allowed_namespaces.isNamespaceAllowed(base_namespace, /*nested*/ false)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Namespace {} is filtered by `namespaces` database parameter", base_namespace); + auto encoded_namespace = encodeNamespaceForURI(base_namespace); const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / encoded_namespace / "tables"; @@ -581,6 +604,8 @@ bool RestCatalog::tryGetTableMetadata( } catch (const DB::Exception & ex) { + if (ex.code() == DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED) + throw; LOG_DEBUG(log, "tryGetTableMetadata response: {}", ex.what()); return false; } @@ -604,6 +629,10 @@ bool RestCatalog::getTableMetadataImpl( { LOG_DEBUG(log, "Checking table {} in namespace {}", table_name, namespace_name); + if (!allowed_namespaces.isNamespaceAllowed(namespace_name, /*nested*/ false)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Namespace {} is filtered by `namespaces` database parameter", namespace_name); + DB::HTTPHeaderEntries headers; if (result.requiresCredentials()) { @@ -787,6 +816,10 @@ void RestCatalog::createNamespaceIfNotExists(const String & namespace_name, cons void RestCatalog::createTable(const String & namespace_name, const String & table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr metadata_content) const { + if (!allowed_namespaces.isNamespaceAllowed(namespace_name, /*nested*/ false)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Failed to create table {}, namespace {} is filtered by `namespaces` database parameter", table_name, namespace_name); + createNamespaceIfNotExists(namespace_name, metadata_content->getValue("location")); const std::string endpoint = fmt::format("{}/namespaces/{}/tables", base_url, namespace_name); @@ -891,6 +924,11 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const 
String & t void RestCatalog::dropTable(const String & namespace_name, const String & table_name) const { + if (!allowed_namespaces.isNamespaceAllowed(namespace_name, /*nested*/ false)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, + "Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter", + table_name, namespace_name); + const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}?purgeRequested=False", base_url, namespace_name, table_name); Poco::JSON::Object::Ptr request_body = nullptr; @@ -904,6 +942,69 @@ void RestCatalog::dropTable(const String & namespace_name, const String & table_ } } +/// "alpha,alpha.a1,bravo,bravo.*,charlie,delta.d1,echo.*" +/// allows tables from +/// - "alpha" namespace +/// - "alpha.a1" namespace +/// - "bravo" namespace +/// - any nested namespaces of "bravo" +/// - "charlie" namespace, but not from nested of "charlie" +/// - "delta.d1" namespace, but not from "delta" +/// - any nested namespaces of "echo", but not "echo" itself +/// "bravo.*.b2" makes no sense for now, asterisk allows all nested +RestCatalog::AllowedNamespaces::AllowedNamespaces(const std::string & namespaces_) +{ + std::vector list_of_namespaces; + boost::split(list_of_namespaces, namespaces_, boost::is_any_of(", "), boost::token_compress_on); + for (const auto & ns : list_of_namespaces) + { + std::vector list_of_nested_namespaces; + boost::split(list_of_nested_namespaces, ns, boost::is_any_of(".")); + + size_t len = list_of_nested_namespaces.size(); + if (!len) + continue; + + AllowedNamespaces * current = &(nested_namespaces[list_of_nested_namespaces[0]]); + for (size_t i = 1; i <= len; ++i) + { + if (i == len) + current->allow_tables = true; + else + { + current = &(current->nested_namespaces[list_of_nested_namespaces[i]]); + if (list_of_nested_namespaces[i] == "*") + { + current->allow_tables = true; + break; + } + } + } + } +} + +bool RestCatalog::AllowedNamespaces::isNamespaceAllowed(const std::string 
& namespace_, bool nested) const +{ + // Trivial case, check here to avoid split namespace on nested + if (nested_namespaces.contains("*")) + return true; + + std::vector list_of_nested_namespaces; + boost::split(list_of_nested_namespaces, namespace_, boost::is_any_of(".")); + + const AllowedNamespaces * current = this; + for (const auto & nns : list_of_nested_namespaces) + { + if (current->nested_namespaces.contains("*")) + return true; + auto it = current->nested_namespaces.find(nns); + if (it == current->nested_namespaces.end()) + return false; + current = &(it->second); + } + + return nested ? !current->nested_namespaces.empty() : current->allow_tables; +} } diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h index a98e719ff09d..730927cc3748 100644 --- a/src/Databases/DataLake/RestCatalog.h +++ b/src/Databases/DataLake/RestCatalog.h @@ -29,6 +29,7 @@ class RestCatalog final : public ICatalog, private DB::WithContext const std::string & auth_header_, const std::string & oauth_server_uri_, bool oauth_server_use_request_body_, + const std::string & namespaces_, DB::ContextPtr context_); ~RestCatalog() override = default; @@ -99,6 +100,26 @@ class RestCatalog final : public ICatalog, private DB::WithContext bool oauth_server_use_request_body; mutable std::optional access_token; +public: + class AllowedNamespaces + { + public: + AllowedNamespaces() {} + explicit AllowedNamespaces(const std::string & namespaces_); + + /// Check if nested namespaces (nested=true) or tables (nested=false) are allowed in namespace + bool isNamespaceAllowed(const std::string & namespace_, bool nested) const; + + private: + /// List of allowed nested namespaces + std::unordered_map nested_namespaces; + /// Tables from current level are allowed + bool allow_tables = false; + }; + +private: + AllowedNamespaces allowed_namespaces; + Poco::Net::HTTPBasicCredentials credentials{}; DB::ReadWriteBufferFromHTTPPtr createReadBuffer( diff --git 
a/src/Databases/DataLake/UnityCatalog.cpp b/src/Databases/DataLake/UnityCatalog.cpp index 8054d971c9e0..a0e1d8f30aa8 100644 --- a/src/Databases/DataLake/UnityCatalog.cpp +++ b/src/Databases/DataLake/UnityCatalog.cpp @@ -17,6 +17,7 @@ namespace DB::ErrorCodes { extern const int DATALAKE_DATABASE_ERROR; extern const int LOGICAL_ERROR; + extern const int CATALOG_NAMESPACE_DISABLED; } namespace @@ -140,6 +141,9 @@ bool UnityCatalog::tryGetTableMetadata( DB::ContextPtr /* context_ */, TableMetadata & result) const { + if (!isNamespaceAllowed(schema_name)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Namespace {} is filtered by `namespaces` database parameter", schema_name); + auto full_table_name = warehouse + "." + schema_name + "." + table_name; Poco::Dynamic::Var json; std::string json_str; @@ -258,6 +262,9 @@ bool UnityCatalog::tryGetTableMetadata( bool UnityCatalog::existsTable(const std::string & schema_name, const std::string & table_name) const { + if (!isNamespaceAllowed(schema_name)) + throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Namespace {} is filtered by `namespaces` database parameter", schema_name); + String json_str; Poco::Dynamic::Var json; try @@ -367,7 +374,7 @@ DataLake::ICatalog::Namespaces UnityCatalog::getSchemas(const std::string & base chassert(schema_info->get("catalog_name").extract() == warehouse); UnityCatalogFullSchemaName schema_name = parseFullSchemaName(schema_info->get("full_name").extract()); - if (schema_name.schema_name.starts_with(base_prefix)) + if (isNamespaceAllowed(schema_name.schema_name) && schema_name.schema_name.starts_with(base_prefix)) schemas.push_back(schema_name.schema_name); if (limit && schemas.size() > limit) @@ -409,6 +416,7 @@ UnityCatalog::UnityCatalog( const std::string & catalog_, const std::string & base_url_, const std::string & catalog_credential_, + const std::string & namespaces_, DB::ContextPtr context_) : ICatalog(catalog_) , DB::WithContext(context_) @@ -416,6 
+424,12 @@ UnityCatalog::UnityCatalog( , log(getLogger("UnityCatalog(" + catalog_ + ")")) , auth_header("Authorization", "Bearer " + catalog_credential_) { + boost::split(allowed_namespaces, namespaces_, boost::is_any_of(", "), boost::token_compress_on); +} + +bool UnityCatalog::isNamespaceAllowed(const std::string & namespace_) const +{ + return allowed_namespaces.contains("*") || allowed_namespaces.contains(namespace_); } } diff --git a/src/Databases/DataLake/UnityCatalog.h b/src/Databases/DataLake/UnityCatalog.h index 9d4dc0a74877..55ab8630db82 100644 --- a/src/Databases/DataLake/UnityCatalog.h +++ b/src/Databases/DataLake/UnityCatalog.h @@ -21,6 +21,7 @@ class UnityCatalog final : public ICatalog, private DB::WithContext const std::string & catalog_, const std::string & base_url_, const std::string & catalog_credential_, + const std::string & namespaces_, DB::ContextPtr context_); ~UnityCatalog() override = default; @@ -61,6 +62,10 @@ class UnityCatalog final : public ICatalog, private DB::WithContext Poco::Net::HTTPBasicCredentials credentials{}; + std::unordered_set allowed_namespaces; + + bool isNamespaceAllowed(const std::string & namespace_) const; + DataLake::ICatalog::Namespaces getSchemas(const std::string & base_prefix, size_t limit = 0) const; DB::Names getTablesForSchema(const std::string & schema, size_t limit = 0) const; diff --git a/src/Databases/DataLake/tests/gtest_rest_catalog_allowed_namespaces.cpp b/src/Databases/DataLake/tests/gtest_rest_catalog_allowed_namespaces.cpp new file mode 100644 index 000000000000..7a1981511c3f --- /dev/null +++ b/src/Databases/DataLake/tests/gtest_rest_catalog_allowed_namespaces.cpp @@ -0,0 +1,69 @@ +#include +#include + + +TEST(TestRestCatalogAllowedNamespaces, TestAllAllowed) +{ + DataLake::RestCatalog::AllowedNamespaces namespaces("*"); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ false)); + 
EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ false)); +} + +TEST(TestRestCatalogAllowedNamespaces, TestAllBlocked) +{ + DataLake::RestCatalog::AllowedNamespaces namespaces(""); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ false)); +} + +TEST(TestRestCatalogAllowedNamespaces, TestTableInNamespaceAllowed) +{ + DataLake::RestCatalog::AllowedNamespaces namespaces("foo"); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ false)); +} + +TEST(TestRestCatalogAllowedNamespaces, TestSpecificNestedNamespaceAllowed) +{ + DataLake::RestCatalog::AllowedNamespaces namespaces("foo.bar"); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("bar", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("bar", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.biz", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo.biz", 
/*nested*/ false)); +} + +TEST(TestRestCatalogAllowedNamespaces, TestNestedNamespacesAllowed) +{ + DataLake::RestCatalog::AllowedNamespaces namespaces("foo.*"); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("foo", /*nested*/ false)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ false)); +} + +TEST(TestRestCatalogAllowedNamespaces, TestTablesAndNestedNamespacesAllowed) +{ + DataLake::RestCatalog::AllowedNamespaces namespaces("foo,foo.*"); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo", /*nested*/ false)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ true)); + EXPECT_TRUE(namespaces.isNamespaceAllowed("foo.bar", /*nested*/ false)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ true)); + EXPECT_FALSE(namespaces.isNamespaceAllowed("biz", /*nested*/ false)); +} diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 4042793f7301..f624961b0528 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -67,7 +67,11 @@ template { public: - explicit DataLakeConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {} + explicit DataLakeConfiguration( + DataLakeStorageSettingsPtr settings_, + std::optional catalog_namespaces_ = std::nullopt) + : settings(settings_) + , catalog_namespaces(catalog_namespaces_.value_or("*")) {} bool isDataLakeConfiguration() const override { return true; } @@ -287,6 +291,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public 
std::enabl .aws_access_key_id = (*settings)[DataLakeStorageSetting::storage_aws_access_key_id].value, .aws_secret_access_key = (*settings)[DataLakeStorageSetting::storage_aws_secret_access_key].value, .region = (*settings)[DataLakeStorageSetting::storage_region].value, + .namespaces = catalog_namespaces, }; return std::make_shared( @@ -308,6 +313,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl (*settings)[DataLakeStorageSetting::storage_auth_header], (*settings)[DataLakeStorageSetting::storage_oauth_server_uri].value, (*settings)[DataLakeStorageSetting::storage_oauth_server_use_request_body].value, + catalog_namespaces, context); } @@ -366,6 +372,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl DataLakeMetadataPtr current_metadata; LoggerPtr log = getLogger("DataLakeConfiguration"); const DataLakeStorageSettingsPtr settings; + std::string catalog_namespaces; void assertInitializedDL() const { diff --git a/tests/integration/test_database_delta/test.py b/tests/integration/test_database_delta/test.py index fe4dc4d55a95..bf5aa92512e0 100644 --- a/tests/integration/test_database_delta/test.py +++ b/tests/integration/test_database_delta/test.py @@ -18,6 +18,9 @@ from helpers.test_tools import TSV +CATALOG_NAME = "unity_catalog_test_db" + + def start_unity_catalog(node): node.exec_in_container( [ @@ -417,3 +420,42 @@ def get_schemas(): return execute_spark_query(node1, f"SHOW SCHEMAS") assert schema_name in get_schemas() + + +def test_namespace_filter(started_cluster): + node = started_cluster.instances["node1"] + + # Use the same table name in all namespaces + table_name = f"table_{uuid.uuid4()}".replace("-", "_") + namespace_prefix = f"namespace_{uuid.uuid4()}_".replace("-", "_") + + + def create_namespace(suffix): + namespace = f"{namespace_prefix}{suffix}" + execute_spark_query( + node, f"CREATE SCHEMA {namespace}" + ) + execute_spark_query( + node, f"CREATE TABLE {namespace}.{table_name} (col1 
int, col2 double) using Delta location '/var/lib/clickhouse/user_files/tmp/{namespace}/{table_name}'" + ) + + create_namespace("alpha"); + create_namespace("bravo"); + + node.query( + f""" + drop database if exists {CATALOG_NAME}; + create database {CATALOG_NAME} + engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') + settings warehouse = 'unity', catalog_type='unity', vended_credentials=false, namespaces = '{namespace_prefix}alpha' + """, + settings={"allow_database_unity_catalog": "1"}, + ) + + assert node.query(f"SELECT name FROM system.tables WHERE database='{CATALOG_NAME}' ORDER BY name", settings={"show_data_lake_catalogs_in_system_tables": 1}) == TSV( + [ + [f"{namespace_prefix}alpha.{table_name}"], + ]) + + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}alpha.{table_name}`") == "0\n" + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}bravo.{table_name}`") diff --git a/tests/integration/test_database_glue/test.py b/tests/integration/test_database_glue/test.py index e5ffe67cc851..87472ec816b7 100644 --- a/tests/integration/test_database_glue/test.py +++ b/tests/integration/test_database_glue/test.py @@ -16,6 +16,7 @@ from pyiceberg.table.sorting import SortField, SortOrder from pyiceberg.transforms import DayTransform, IdentityTransform from helpers.config_cluster import minio_access_key, minio_secret_key +from helpers.test_tools import TSV import decimal from pyiceberg.types import ( DoubleType, @@ -642,3 +643,41 @@ def test_system_tables(started_cluster): assert int(node.query(f"SELECT count() FROM system.completions WHERE startsWith(word, '{test_ref}') SETTINGS show_data_lake_catalogs_in_system_tables = true").strip()) != 0 assert int(node.query(f"SELECT count() FROM system.completions WHERE startsWith(word, '{test_ref}')").strip()) != 0 assert int(node.query(f"SELECT count() FROM system.completions WHERE startsWith(word, 
'{test_ref}') SETTINGS show_data_lake_catalogs_in_system_tables = false").strip()) == 0 + + +def test_namespace_filter(started_cluster): + node = started_cluster.instances["node1"] + + # Use the same table name in all namespaces + table_name = f"table_{uuid.uuid4()}" + table2_name = f"table2_{uuid.uuid4()}" + namespace_prefix = f"namespace_{uuid.uuid4()}_" + + catalog = load_catalog_impl(started_cluster) + + def create_namespace(suffix): + namespace = f"{namespace_prefix}{suffix}" + catalog.create_namespace(namespace) + create_table(catalog, namespace, table_name, DEFAULT_SCHEMA, PartitionSpec(), DEFAULT_SORT_ORDER) + + create_namespace("alpha"); + create_namespace("bravo"); + + create_clickhouse_glue_database(started_cluster, node, CATALOG_NAME, + additional_settings={ + "namespaces": f"{namespace_prefix}alpha" + }) + + assert node.query(f"SELECT name FROM system.tables WHERE database='{CATALOG_NAME}' ORDER BY name", settings={"show_data_lake_catalogs_in_system_tables": 1}) == TSV( + [ + [f"{namespace_prefix}alpha.{table_name}"], + ]) + + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}alpha.{table_name}`") == "0\n" + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}bravo.{table_name}`") + + node.query(f"CREATE TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.{table2_name}` (x String) ENGINE = IcebergS3('http://minio:9000/warehouse-glue/{namespace_prefix}alpha/a1/{table2_name}/', '{minio_access_key}', '{minio_secret_key}')") + assert "is filtered by `namespaces` database parameter." 
in node.query_and_get_error(f"CREATE TABLE {CATALOG_NAME}.`{namespace_prefix}bravo.{table2_name}` (x String) ENGINE = IcebergS3('http://minio:9000/warehouse-glue/{namespace_prefix}bravo/{table2_name}/', '{minio_access_key}', '{minio_secret_key}')") + + node.query(f"DROP TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.{table_name}`") + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"DROP TABLE {CATALOG_NAME}.`{namespace_prefix}bravo.{table_name}`") diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 2de6586e3787..0f2656d7c87c 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -799,6 +799,7 @@ def test_cluster_joins(started_cluster): assert res == "Jack\tBlack\nJack\tSilver\nJohn\tBlack\nJohn\tSilver\n" + def test_gcs(started_cluster): node = started_cluster.instances["node1"] @@ -821,3 +822,69 @@ def test_gcs(started_cluster): """ ) assert "Google cloud storage converts to S3" in str(err.value) + + +def test_namespace_filter(started_cluster): + node = started_cluster.instances["node1"] + + # Use the same table name in all namespaces + table_name = f"table_{uuid.uuid4()}" + table2_name = f"table2_{uuid.uuid4()}" + namespace_prefix = f"namespace_{uuid.uuid4()}_" + + catalog = load_catalog_impl(started_cluster) + + def create_namespace(suffix): + namespace = f"{namespace_prefix}{suffix}" + catalog.create_namespace(namespace) + create_table(catalog, namespace, table_name, DEFAULT_SCHEMA, PartitionSpec(), DEFAULT_SORT_ORDER) + + create_namespace("alpha"); + create_namespace("alpha.a1"); + create_namespace("alpha.a2"); + create_namespace("bravo"); + create_namespace("bravo.b1"); + create_namespace("charlie"); + create_namespace("charlie.c1"); + create_namespace("delta"); + create_namespace("delta.d1"); + create_namespace("delta.d2"); + create_namespace("echo"); + create_namespace("echo.e1"); + + 
create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, + additional_settings={ + "namespaces": f"{namespace_prefix}alpha,{namespace_prefix}alpha.a1,{namespace_prefix}bravo,{namespace_prefix}bravo.*,{namespace_prefix}charlie,{namespace_prefix}delta.d1,{namespace_prefix}echo.*" + }) + + assert node.query(f"SELECT name FROM system.tables WHERE database='{CATALOG_NAME}' ORDER BY name", settings={"show_data_lake_catalogs_in_system_tables": 1}) == TSV( + [ + [f"{namespace_prefix}alpha.a1.{table_name}"], + [f"{namespace_prefix}alpha.{table_name}"], + [f"{namespace_prefix}bravo.b1.{table_name}"], + [f"{namespace_prefix}bravo.{table_name}"], + [f"{namespace_prefix}charlie.{table_name}"], + [f"{namespace_prefix}delta.d1.{table_name}"], + [f"{namespace_prefix}echo.e1.{table_name}"], + ]) + + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}alpha.{table_name}`") == "0\n" + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}alpha.a1.{table_name}`") == "0\n" + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}alpha.a2.{table_name}`") + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}bravo.{table_name}`") == "0\n" + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}bravo.b1.{table_name}`") == "0\n" + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}charlie.{table_name}`") == "0\n" + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}charlie.c1.{table_name}`") + assert "is filtered by `namespaces` database parameter." 
in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}delta.{table_name}`") + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}delta.d1.{table_name}`") == "0\n" + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}delta.d2.{table_name}`") + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}echo.{table_name}`") + assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}echo.e1.{table_name}`") == "0\n" + + node.query(f"CREATE TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.{table2_name}` (x String) ENGINE = IcebergS3('http://minio:9000/warehouse-rest/{namespace_prefix}alpha/{table2_name}/', '{minio_access_key}', '{minio_secret_key}')") + node.query(f"CREATE TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.a1.{table2_name}` (x String) ENGINE = IcebergS3('http://minio:9000/warehouse-rest/{namespace_prefix}alpha/a1/{table2_name}/', '{minio_access_key}', '{minio_secret_key}')") + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"CREATE TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.a2.{table2_name}` (x String) ENGINE = IcebergS3('http://minio:9000/warehouse-rest/{namespace_prefix}alpha/a2/{table2_name}/', '{minio_access_key}', '{minio_secret_key}')") + + node.query(f"DROP TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.{table_name}`") + node.query(f"DROP TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.a1.{table_name}`") + assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"DROP TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.a2.{table_name}`")