Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry
struct ExportReplicatedMergeTreePartitionManifest
{
String transaction_id;
String query_id;
String partition_id;
String destination_database;
String destination_table;
Expand All @@ -120,6 +121,7 @@ struct ExportReplicatedMergeTreePartitionManifest
{
Poco::JSON::Object json;
json.set("transaction_id", transaction_id);
json.set("query_id", query_id);
json.set("partition_id", partition_id);
json.set("destination_database", destination_database);
json.set("destination_table", destination_table);
Expand Down Expand Up @@ -153,6 +155,7 @@ struct ExportReplicatedMergeTreePartitionManifest

ExportReplicatedMergeTreePartitionManifest manifest;
manifest.transaction_id = json->getValue<String>("transaction_id");
manifest.query_id = json->getValue<String>("query_id");
manifest.partition_id = json->getValue<String>("partition_id");
Comment on lines 156 to 159

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Handle missing query_id in persisted manifests

This now requires query_id to be present in every persisted manifest JSON. Manifests created by older versions (or any already-queued exports before this change) won’t have that key, so json->getValue<String>("query_id") throws and the export cannot be resumed/inspected after upgrade. That can strand in-flight replicated exports until the manifest is manually fixed. Consider defaulting query_id to transaction_id or an empty string when the key is missing to keep backward compatibility.

Useful? React with 👍 / 👎.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As always, I have to explain: this feature is still very experimental and not used at all, so it is completely fine to make breaking changes.

manifest.destination_database = json->getValue<String>("destination_database");
manifest.destination_table = json->getValue<String>("destination_table");
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/MergeTree/ExportList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ExportsListElement::ExportsListElement(
UInt64 total_size_bytes_compressed_,
UInt64 total_size_bytes_uncompressed_,
time_t create_time_,
const String & query_id_,
const ContextPtr & context)
: source_table_id(source_table_id_)
, destination_table_id(destination_table_id_)
Expand All @@ -23,6 +24,7 @@ ExportsListElement::ExportsListElement(
, total_size_bytes_compressed(total_size_bytes_compressed_)
, total_size_bytes_uncompressed(total_size_bytes_uncompressed_)
, create_time(create_time_)
, query_id(query_id_)
{
thread_group = ThreadGroup::createForMergeMutate(context);
}
Expand Down Expand Up @@ -55,6 +57,7 @@ ExportInfo ExportsListElement::getInfo() const
res.peak_memory_usage = getPeakMemoryUsage();
res.create_time = create_time;
res.elapsed = watch.elapsedSeconds();
res.query_id = query_id;
return res;
}

Expand Down
3 changes: 3 additions & 0 deletions src/Storages/MergeTree/ExportList.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct ExportInfo
UInt64 peak_memory_usage;
time_t create_time = 0;
Float64 elapsed;
String query_id;
};

struct ExportsListElement : private boost::noncopyable
Expand All @@ -51,6 +52,7 @@ struct ExportsListElement : private boost::noncopyable
UInt64 total_size_bytes_uncompressed {0};
std::atomic<UInt64> bytes_read_uncompressed {0};
time_t create_time {0};
String query_id;

Stopwatch watch;
ThreadGroupPtr thread_group;
Expand All @@ -66,6 +68,7 @@ struct ExportsListElement : private boost::noncopyable
UInt64 total_size_bytes_compressed_,
UInt64 total_size_bytes_uncompressed_,
time_t create_time_,
const String & query_id_,
const ContextPtr & context);

~ExportsListElement();
Expand Down
5 changes: 3 additions & 2 deletions src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ bool ExportPartTask::executeStep()
{
auto local_context = Context::createCopy(storage.getContext());
local_context->makeQueryContextForExportPart();
local_context->setCurrentQueryId(manifest.transaction_id);
local_context->setCurrentQueryId(manifest.query_id);
local_context->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART);
local_context->setSettings(manifest.settings);

Expand Down Expand Up @@ -89,6 +89,7 @@ bool ExportPartTask::executeStep()
manifest.data_part->getBytesOnDisk(),
manifest.data_part->getBytesUncompressedOnDisk(),
manifest.create_time,
manifest.query_id,
local_context);

SinkToStoragePtr sink;
Expand Down Expand Up @@ -284,7 +285,7 @@ Priority ExportPartTask::getPriority() const

/// Returns a query ID taken from this task's manifest.
/// NOTE(review): this text is a flattened diff view (the file header above reports
/// "3 additions & 2 deletions"), so the two `return` lines below are presumably the
/// removed line followed by its replacement rather than two statements that coexist
/// in the real file — confirm against the actual source.
String ExportPartTask::getQueryId() const
{
return manifest.transaction_id;
return manifest.query_id;
}

}
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace
{
auto context_copy = Context::createCopy(context);
context_copy->makeQueryContextForExportPart();
context_copy->setCurrentQueryId(manifest.transaction_id);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might be wondering why I was setting the query ID to transaction_id. Guess what: I don't remember :).

Honestly, I think it is simply because the query id was not being saved in the manifest and it wasn't available, so transaction_id sounded like a good replacement.

context_copy->setCurrentQueryId(manifest.query_id);
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding);
context_copy->setSetting("max_threads", manifest.max_threads);
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6260,6 +6260,7 @@ void MergeTreeData::exportPartToTable(
dest_storage->getStorageID(),
part,
transaction_id,
query_context->getCurrentQueryId(),
query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
query_context->getSettingsCopy(),
source_metadata_ptr,
Expand Down Expand Up @@ -9066,6 +9067,7 @@ try
part_log_elem.rows_read = (*exports_entry)->rows_read;
part_log_elem.bytes_read_uncompressed = (*exports_entry)->bytes_read_uncompressed;
part_log_elem.peak_memory_usage = (*exports_entry)->getPeakMemoryUsage();
part_log_elem.query_id = (*exports_entry)->query_id;

/// no need to lock because at this point no one is writing to the destination file paths
part_log_elem.remote_file_paths = (*exports_entry)->destination_file_paths;
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/MergeTree/MergeTreePartExportManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ struct MergeTreePartExportManifest
const StorageID & destination_storage_id_,
const DataPartPtr & data_part_,
const String & transaction_id_,
const String & query_id_,
FileAlreadyExistsPolicy file_already_exists_policy_,
const Settings & settings_,
const StorageMetadataPtr & metadata_snapshot_,
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
: destination_storage_id(destination_storage_id_),
data_part(data_part_),
transaction_id(transaction_id_),
query_id(query_id_),
file_already_exists_policy(file_already_exists_policy_),
settings(settings_),
metadata_snapshot(metadata_snapshot_),
Expand All @@ -63,6 +65,7 @@ struct MergeTreePartExportManifest
DataPartPtr data_part;
/// Used for killing the export.
String transaction_id;
String query_id;
FileAlreadyExistsPolicy file_already_exists_policy;
Settings settings;

Expand Down
2 changes: 2 additions & 0 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4546,6 +4546,7 @@ std::vector<ReplicatedPartitionExportInfo> StorageReplicatedMergeTree::getPartit
info.destination_table = metadata.destination_table;
info.partition_id = metadata.partition_id;
info.transaction_id = metadata.transaction_id;
info.query_id = metadata.query_id;
info.create_time = metadata.create_time;
info.source_replica = metadata.source_replica;
info.parts_count = metadata.number_of_parts;
Expand Down Expand Up @@ -8212,6 +8213,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
ExportReplicatedMergeTreePartitionManifest manifest;

manifest.transaction_id = generateSnowflakeIDString();
manifest.query_id = query_context->getCurrentQueryId();
manifest.partition_id = partition_id;
manifest.destination_database = dest_database;
manifest.destination_table = dest_table;
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ColumnsDescription StorageSystemExports::getColumnsDescription()
{"destination_table", std::make_shared<DataTypeString>(), "Name of the destination table."},
{"create_time", std::make_shared<DataTypeDateTime>(), "Date and time when the export command was received in the server."},
{"part_name", std::make_shared<DataTypeString>(), "Name of the part"},
{"query_id", std::make_shared<DataTypeString>(), "Query ID of the export operation."},
{"destination_file_paths", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "File paths where the part is being exported."},
{"elapsed", std::make_shared<DataTypeFloat64>(), "The time elapsed (in seconds) since the export started."},
{"rows_read", std::make_shared<DataTypeUInt64>(), "The number of rows read from the exported part."},
Expand Down Expand Up @@ -51,6 +52,7 @@ void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr con
res_columns[i++]->insert(export_info.destination_table);
res_columns[i++]->insert(export_info.create_time);
res_columns[i++]->insert(export_info.part_name);
res_columns[i++]->insert(export_info.query_id);
Array destination_file_paths_array;
destination_file_paths_array.reserve(export_info.destination_file_paths.size());
for (const auto & file_path : export_info.destination_file_paths)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescriptio
{"create_time", std::make_shared<DataTypeDateTime>(), "Date and time when the export command was submitted"},
{"partition_id", std::make_shared<DataTypeString>(), "ID of the partition"},
{"transaction_id", std::make_shared<DataTypeString>(), "ID of the transaction."},
{"query_id", std::make_shared<DataTypeString>(), "Query ID of the export operation."},
{"source_replica", std::make_shared<DataTypeString>(), "Name of the source replica."},
{"parts", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "List of part names to be exported."},
{"parts_count", std::make_shared<DataTypeUInt64>(), "Number of parts in the export."},
Expand Down Expand Up @@ -123,11 +124,12 @@ void StorageSystemReplicatedPartitionExports::fillData(MutableColumns & res_colu
res_columns[i++]->insert(info.create_time);
res_columns[i++]->insert(info.partition_id);
res_columns[i++]->insert(info.transaction_id);
res_columns[i++]->insert(info.query_id);
res_columns[i++]->insert(info.source_replica);
Array parts_array;
parts_array.reserve(info.parts.size());
for (const auto & part : info.parts)
parts_array.push_back(part);
parts_array.push_back(part);
res_columns[i++]->insert(parts_array);
res_columns[i++]->insert(info.parts_count);
res_columns[i++]->insert(info.parts_to_do);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct ReplicatedPartitionExportInfo
String destination_table;
String partition_id;
String transaction_id;
String query_id;
time_t create_time;
String source_replica;
size_t parts_count;
Expand Down
Loading