53 changes: 40 additions & 13 deletions docs/en/engines/database-engines/datalake.md
@@ -44,23 +44,50 @@ catalog_type,

The following settings are supported:

| Setting | Description |
|-------------------------|---------------------------------------------------------------------------|
| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive` |
| `warehouse` | The warehouse/database name to use in the catalog. |
| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) |
| `auth_header` | Custom HTTP header for authentication with the catalog service |
| `auth_scope` | OAuth2 scope for authentication (if using OAuth) |
| `storage_endpoint` | Endpoint URL for the underlying storage |
| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication |
| `vended_credentials` | Boolean indicating whether to use vended credentials (AWS-specific) |
| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) |
| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) |
| `region` | AWS region for the service (e.g., `us-east-1`) |
| Setting | Description |
|-------------------------|---------------------------------------------------------------------------------|
| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive` |
| `warehouse` | The warehouse/database name to use in the catalog. |
| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) |
| `auth_header` | Custom HTTP header for authentication with the catalog service |
| `auth_scope` | OAuth2 scope for authentication (if using OAuth) |
| `storage_endpoint` | Endpoint URL for the underlying storage |
| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication |
| `vended_credentials` | Boolean indicating whether to use vended credentials (AWS-specific) |
| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) |
| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) |
| `region` | AWS region for the service (e.g., `us-east-1`) |
| `namespaces` | Comma-separated list of namespaces, supported types: `rest`, `glue` and `unity` |
Collaborator

At first I read this as saying that `rest`, `glue` and `unity` are the supported namespaces. The target audience of this feature probably won't have that problem, but maybe reword it as '... implemented for catalog types: ..'


## Examples {#examples}

See below pages for examples of using the `DataLakeCatalog` engine:

* [Unity Catalog](/use-cases/data-lake/unity-catalog)
* [Glue Catalog](/use-cases/data-lake/glue-catalog)

## Namespace filter {#namespace}

By default, ClickHouse reads tables from all namespaces available in the catalog. You can limit this behavior using the `namespaces` database setting. The value should be a comma‑separated list of namespaces that are allowed to be read.

Supported catalog types are `rest`, `glue` and `unity`.

For example, if the catalog contains three namespaces - `dev`, `stage`, and `prod` - and you want to read data only from dev and stage, set:
```
namespaces='dev,stage'
```

### Nested namespaces {#namespace-nested}

The Iceberg (`rest`) catalog supports nested namespaces. The `namespaces` filter accepts the following patterns:

- `namespace` - includes tables from the specified namespace, but not from its nested namespaces.
- `namespace.nested` - includes tables from the nested namespace, but not from the parent.
- `namespace.*` - includes tables from all nested namespaces, but not from the parent.

If you need to include both a namespace and its nested namespaces, specify both explicitly. For example:
```
namespaces='namespace,namespace.*'
```

The default value is '*', which means all namespaces are included.
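
The pattern rules described above can be sketched in a few lines of C++. This is only an illustration of the documented semantics, not the code from this PR: the function names `matchesPattern` and `isNamespaceAllowedByPatterns` are hypothetical, and the sketch assumes that `namespace.*` matches nested namespaces at any depth, which the text does not state explicitly.

```cpp
#include <string>
#include <string_view>
#include <vector>

// Hypothetical helper (not part of this PR): returns true if the namespace
// `ns` is selected by a single filter pattern.
bool matchesPattern(std::string_view pattern, std::string_view ns)
{
    // "*" is the documented default and selects every namespace.
    if (pattern == "*")
        return true;

    // "parent.*" selects namespaces nested under "parent", but not "parent" itself.
    // Assumption: nesting of any depth matches ("parent.a" and "parent.a.b").
    if (pattern.size() > 2 && pattern.compare(pattern.size() - 2, 2, ".*") == 0)
    {
        std::string_view prefix = pattern.substr(0, pattern.size() - 1); // keeps the trailing '.'
        return ns.size() > prefix.size() && ns.substr(0, prefix.size()) == prefix;
    }

    // Any other pattern is an exact name: it selects that namespace only.
    return pattern == ns;
}

// Hypothetical helper: a namespace is allowed if any pattern in the list selects it.
bool isNamespaceAllowedByPatterns(const std::vector<std::string> & patterns, std::string_view ns)
{
    for (const auto & pattern : patterns)
        if (matchesPattern(pattern, ns))
            return true;
    return false;
}
```

With the patterns `namespace` and `namespace.*` together, both the parent and its nested namespaces pass the check, which corresponds to the `namespaces='namespace,namespace.*'` example.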
1 change: 1 addition & 0 deletions src/Common/ErrorCodes.cpp
@@ -636,6 +636,7 @@
M(754, UDF_EXECUTION_FAILED) \
M(755, TOO_LARGE_LIGHTWEIGHT_UPDATES) \
M(756, CANNOT_PARSE_PROMQL_QUERY) \
M(757, OUT_OF_SCOPE) \
Collaborator

I'm not sure, but it may be better either to use an existing error code (e.g. `DATALAKE_DATABASE_ERROR`) or to pick a more specific name, e.g. `CATALOG_NAMESPACE_DISABLED`.

\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
24 changes: 14 additions & 10 deletions src/Databases/DataLake/DatabaseDataLake.cpp
@@ -52,6 +52,7 @@ namespace DatabaseDataLakeSetting
extern const DatabaseDataLakeSettingsString aws_access_key_id;
extern const DatabaseDataLakeSettingsString aws_secret_access_key;
extern const DatabaseDataLakeSettingsString region;
extern const DatabaseDataLakeSettingsString namespaces;
}

namespace Setting
@@ -121,6 +122,7 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
.aws_access_key_id = settings[DatabaseDataLakeSetting::aws_access_key_id].value,
.aws_secret_access_key = settings[DatabaseDataLakeSetting::aws_secret_access_key].value,
.region = settings[DatabaseDataLakeSetting::region].value,
.namespaces = settings[DatabaseDataLakeSetting::namespaces].value
};

switch (settings[DatabaseDataLakeSetting::catalog_type].value)
@@ -135,6 +137,7 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
settings[DatabaseDataLakeSetting::auth_header],
settings[DatabaseDataLakeSetting::oauth_server_uri].value,
settings[DatabaseDataLakeSetting::oauth_server_use_request_body].value,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
break;
}
@@ -144,6 +147,7 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
settings[DatabaseDataLakeSetting::warehouse].value,
url,
settings[DatabaseDataLakeSetting::catalog_credential].value,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
break;
}
@@ -195,24 +199,24 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
#if USE_AWS_S3
case DB::DatabaseDataLakeStorageType::S3:
{
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings);
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
#if USE_AZURE_BLOB_STORAGE
case DB::DatabaseDataLakeStorageType::Azure:
{
return std::make_shared<StorageAzureIcebergConfiguration>(storage_settings);
return std::make_shared<StorageAzureIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
#if USE_HDFS
case DB::DatabaseDataLakeStorageType::HDFS:
{
return std::make_shared<StorageHDFSIcebergConfiguration>(storage_settings);
return std::make_shared<StorageHDFSIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
case DB::DatabaseDataLakeStorageType::Local:
{
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
/// Fake storage in case when catalog store not only
/// primary-type tables (DeltaLake or Iceberg), but for
@@ -224,7 +228,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
/// dependencies and the most lightweight
case DB::DatabaseDataLakeStorageType::Other:
{
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS
default:
@@ -241,12 +245,12 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
#if USE_AWS_S3
case DB::DatabaseDataLakeStorageType::S3:
{
return std::make_shared<StorageS3DeltaLakeConfiguration>(storage_settings);
return std::make_shared<StorageS3DeltaLakeConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
case DB::DatabaseDataLakeStorageType::Local:
{
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings);
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
/// Fake storage in case when catalog store not only
/// primary-type tables (DeltaLake or Iceberg), but for
@@ -258,7 +262,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
/// dependencies and the most lightweight
case DB::DatabaseDataLakeStorageType::Other:
{
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings);
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@@ -273,12 +277,12 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
#if USE_AWS_S3
case DB::DatabaseDataLakeStorageType::S3:
{
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings);
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
case DB::DatabaseDataLakeStorageType::Other:
{
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS,
1 change: 1 addition & 0 deletions src/Databases/DataLake/DatabaseDataLakeSettings.cpp
@@ -29,6 +29,7 @@ namespace ErrorCodes
DECLARE(String, aws_secret_access_key, "", "Key for AWS connection for Glue Catalog'", 0) \
DECLARE(String, region, "", "Region for Glue catalog", 0) \
DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \
DECLARE(String, namespaces, "*", "Comma-separated list of allowed namespaces", 0) \

#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) \
36 changes: 26 additions & 10 deletions src/Databases/DataLake/GlueCatalog.cpp
@@ -53,6 +53,7 @@ namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int DATALAKE_DATABASE_ERROR;
extern const int OUT_OF_SCOPE;
}

namespace DB::Setting
@@ -71,14 +72,6 @@ namespace DB::StorageObjectStorageSetting
extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
}

namespace DB::DatabaseDataLakeSetting
{
extern const DatabaseDataLakeSettingsString storage_endpoint;
extern const DatabaseDataLakeSettingsString aws_access_key_id;
extern const DatabaseDataLakeSettingsString aws_secret_access_key;
extern const DatabaseDataLakeSettingsString region;
}

namespace CurrentMetrics
{
extern const Metric MarkCacheBytes;
@@ -158,6 +151,7 @@ GlueCatalog::GlueCatalog(
glue_client = std::make_unique<Aws::Glue::GlueClient>(chain, endpoint_provider, client_configuration);
}

boost::split(allowed_namespaces, settings.namespaces, [](char c){ return c == ','; });
Collaborator

Consider `boost::is_any_of` instead of the lambda.

Collaborator

By the way, am I right that 'aaa, bbb' would not work because of the space after the comma?
Is that OK?
One way to fix it:
...split ... is_any_of(", "), boost::token_compress_on)
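
To make the whitespace concern concrete, here is a minimal standard-library sketch of a tolerant parser. It is not the code in this PR and not the Boost call suggested above: `parseNamespaces` is a hypothetical name, and unlike `is_any_of(", ")` this version trims spaces and tabs around each entry rather than treating a space as a separator.

```cpp
#include <cstddef>
#include <string>
#include <unordered_set>

// Hypothetical helper (not part of this PR): split a comma-separated list,
// trim spaces and tabs around every entry, and skip empty entries.
std::unordered_set<std::string> parseNamespaces(const std::string & value)
{
    std::unordered_set<std::string> result;
    std::size_t start = 0;
    while (start <= value.size())
    {
        // Find the end of the current entry: the next comma or the end of the string.
        std::size_t end = value.find(',', start);
        if (end == std::string::npos)
            end = value.size();

        // Locate the first and last non-blank characters inside [start, end).
        std::size_t first = value.find_first_not_of(" \t", start);
        if (first != std::string::npos && first < end)
        {
            std::size_t last = value.find_last_not_of(" \t", end - 1);
            result.insert(value.substr(first, last - first + 1));
        }

        start = end + 1;
    }
    return result;
}
```

With this sketch, `'aaa, bbb'` and `'aaa,bbb'` produce the same set, and an input such as `'a,,b'` does not add an empty entry.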

}

GlueCatalog::~GlueCatalog() = default;
@@ -183,8 +177,9 @@ DataLake::ICatalog::Namespaces GlueCatalog::getDatabases(const std::string & pre
for (const auto & db : dbs)
{
const auto & db_name = db.GetName();
if (!db_name.starts_with(prefix))
if (!isNamespaceAllowed(db_name) || !db_name.starts_with(prefix))
continue;

result.push_back(db_name);
if (limit != 0 && result.size() >= limit)
break;
@@ -264,6 +259,9 @@ DB::Names GlueCatalog::getTables() const

bool GlueCatalog::existsTable(const std::string & database_name, const std::string & table_name) const
{
if (!isNamespaceAllowed(database_name))
throw DB::Exception(DB::ErrorCodes::OUT_OF_SCOPE, "Namespace {} is filtered by `namespaces` database parameter", database_name);

Aws::Glue::Model::GetTableRequest request;
request.SetDatabaseName(database_name);
request.SetName(table_name);
@@ -278,6 +276,9 @@ bool GlueCatalog::tryGetTableMetadata(
DB::ContextPtr /* context_ */,
TableMetadata & result) const
{
if (!isNamespaceAllowed(database_name))
throw DB::Exception(DB::ErrorCodes::OUT_OF_SCOPE, "Namespace {} is filtered by `namespaces` database parameter", database_name);

Aws::Glue::Model::GetTableRequest request;
request.SetDatabaseName(database_name);
request.SetName(table_name);
@@ -538,7 +539,7 @@ String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_lo

auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
storage_settings->loadFromSettingsChanges(settings.allChanged());
auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings, settings.namespaces);
configuration->initialize(args, getContext(), false);

auto object_storage = configuration->createObjectStorage(getContext(), true);
@@ -643,6 +644,11 @@ void GlueCatalog::createNamespaceIfNotExists(const String & namespace_name) cons

void GlueCatalog::createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr /*metadata_content*/) const
{
if (!isNamespaceAllowed(namespace_name))
throw DB::Exception(DB::ErrorCodes::OUT_OF_SCOPE,
"Failed to create table {}, namespace {} is filtered by `namespaces` database parameter",
table_name, namespace_name);

createNamespaceIfNotExists(namespace_name);

Aws::Glue::Model::CreateTableRequest request;
@@ -715,6 +721,11 @@ bool GlueCatalog::updateMetadata(const String & namespace_name, const String & t

void GlueCatalog::dropTable(const String & namespace_name, const String & table_name) const
{
if (!isNamespaceAllowed(namespace_name))
throw DB::Exception(DB::ErrorCodes::OUT_OF_SCOPE,
"Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter",
table_name, namespace_name);

Aws::Glue::Model::DeleteTableRequest request;
request.SetDatabaseName(namespace_name);
request.SetName(table_name);
@@ -728,6 +739,11 @@ void GlueCatalog::dropTable(const String & namespace_name, const String & table_
response.GetError().GetMessage());
}

bool GlueCatalog::isNamespaceAllowed(const std::string & namespace_) const
{
return allowed_namespaces.contains("*") || allowed_namespaces.contains(namespace_);
}

Collaborator

Not a big deal, but maybe make this method virtual and move this implementation to `ICatalog` to avoid duplication?
Feel free to ignore this if you've already considered the option and decided it is better to keep things simple.
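
As a rough illustration of that suggestion, the check could live in a shared base class with a virtual method that individual catalogs override only when they need extra rules. Everything in this sketch is hypothetical: `CatalogBase`, `FlatCatalog` and `NestedCatalog` are simplified stand-ins, not the real `ICatalog` hierarchy, and the nested-namespace rule assumes that `parent.*` matches any depth.

```cpp
#include <cstddef>
#include <string>
#include <unordered_set>
#include <utility>

// Hypothetical stand-in for the catalog interface: holds the parsed filter
// and provides the default exact-match check once for all catalogs.
class CatalogBase
{
public:
    explicit CatalogBase(std::unordered_set<std::string> allowed)
        : allowed_namespaces(std::move(allowed))
    {
    }

    virtual ~CatalogBase() = default;

    // Default rule: "*" allows everything, otherwise the name must be listed.
    virtual bool isNamespaceAllowed(const std::string & ns) const
    {
        return allowed_namespaces.count("*") > 0 || allowed_namespaces.count(ns) > 0;
    }

protected:
    std::unordered_set<std::string> allowed_namespaces;
};

// A catalog with flat namespaces simply inherits the default check.
class FlatCatalog final : public CatalogBase
{
public:
    using CatalogBase::CatalogBase;
};

// A catalog with nested namespaces extends the check with "parent.*" patterns.
class NestedCatalog final : public CatalogBase
{
public:
    using CatalogBase::CatalogBase;

    bool isNamespaceAllowed(const std::string & ns) const override
    {
        if (CatalogBase::isNamespaceAllowed(ns))
            return true;

        // For "a.b.c", look for the patterns "a.*" and "a.b.*".
        for (std::size_t dot = ns.find('.'); dot != std::string::npos; dot = ns.find('.', dot + 1))
            if (allowed_namespaces.count(ns.substr(0, dot) + ".*") > 0)
                return true;

        return false;
    }
};
```

The trade-off is the one named in the comment: a shared default removes duplication, while separate per-catalog implementations keep each class self-contained.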

}

#endif
3 changes: 3 additions & 0 deletions src/Databases/DataLake/GlueCatalog.h
@@ -74,6 +74,9 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
std::string region;
CatalogSettings settings;
DB::ASTPtr table_engine_definition;
std::unordered_set<std::string> allowed_namespaces;

bool isNamespaceAllowed(const std::string & namespace_) const;

DataLake::ICatalog::Namespaces getDatabases(const std::string & prefix, size_t limit = 0) const;
DB::Names getTablesForDatabase(const std::string & db_name, size_t limit = 0) const;
1 change: 1 addition & 0 deletions src/Databases/DataLake/ICatalog.h
@@ -125,6 +125,7 @@ struct CatalogSettings
String aws_access_key_id;
String aws_secret_access_key;
String region;
String namespaces;

DB::SettingsChanges allChanged() const;
};