diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index 6d6d56222c19..7c18b8a881c2 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -100,6 +100,7 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry struct ExportReplicatedMergeTreePartitionManifest { String transaction_id; + String query_id; String partition_id; String destination_database; String destination_table; @@ -120,6 +121,7 @@ struct ExportReplicatedMergeTreePartitionManifest { Poco::JSON::Object json; json.set("transaction_id", transaction_id); + json.set("query_id", query_id); json.set("partition_id", partition_id); json.set("destination_database", destination_database); json.set("destination_table", destination_table); @@ -153,6 +155,7 @@ struct ExportReplicatedMergeTreePartitionManifest ExportReplicatedMergeTreePartitionManifest manifest; manifest.transaction_id = json->getValue("transaction_id"); + manifest.query_id = json->getValue("query_id"); manifest.partition_id = json->getValue("partition_id"); manifest.destination_database = json->getValue("destination_database"); manifest.destination_table = json->getValue("destination_table"); diff --git a/src/Storages/MergeTree/ExportList.cpp b/src/Storages/MergeTree/ExportList.cpp index 3ee75fde831e..018c1f091ef9 100644 --- a/src/Storages/MergeTree/ExportList.cpp +++ b/src/Storages/MergeTree/ExportList.cpp @@ -13,6 +13,7 @@ ExportsListElement::ExportsListElement( UInt64 total_size_bytes_compressed_, UInt64 total_size_bytes_uncompressed_, time_t create_time_, + const String & query_id_, const ContextPtr & context) : source_table_id(source_table_id_) , destination_table_id(destination_table_id_) @@ -23,6 +24,7 @@ ExportsListElement::ExportsListElement( , total_size_bytes_compressed(total_size_bytes_compressed_) , total_size_bytes_uncompressed(total_size_bytes_uncompressed_) , create_time(create_time_) +, query_id(query_id_) { thread_group = ThreadGroup::createForMergeMutate(context); } @@ -55,6 +57,7 @@ ExportInfo ExportsListElement::getInfo() const res.peak_memory_usage = getPeakMemoryUsage(); res.create_time = create_time; res.elapsed = watch.elapsedSeconds(); + res.query_id = query_id; return res; } diff --git a/src/Storages/MergeTree/ExportList.h b/src/Storages/MergeTree/ExportList.h index d799c68cd21c..4a02826dfe44 100644 --- a/src/Storages/MergeTree/ExportList.h +++ b/src/Storages/MergeTree/ExportList.h @@ -34,6 +34,7 @@ struct ExportInfo UInt64 peak_memory_usage; time_t create_time = 0; Float64 elapsed; + String query_id; }; struct ExportsListElement : private boost::noncopyable @@ -51,6 +52,7 @@ struct ExportsListElement : private boost::noncopyable UInt64 total_size_bytes_uncompressed {0}; std::atomic bytes_read_uncompressed {0}; time_t create_time {0}; + String query_id; Stopwatch watch; ThreadGroupPtr thread_group; @@ -66,6 +68,7 @@ struct ExportsListElement : private boost::noncopyable UInt64 total_size_bytes_compressed_, UInt64 total_size_bytes_uncompressed_, time_t create_time_, + const String & query_id_, const ContextPtr & context); ~ExportsListElement(); diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp index ad737fedcb21..26a15fa0083f 100644 --- a/src/Storages/MergeTree/ExportPartTask.cpp +++ b/src/Storages/MergeTree/ExportPartTask.cpp @@ -52,7 +52,7 @@ bool ExportPartTask::executeStep() { auto local_context = Context::createCopy(storage.getContext()); local_context->makeQueryContextForExportPart(); - local_context->setCurrentQueryId(manifest.transaction_id); + local_context->setCurrentQueryId(manifest.query_id); local_context->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART); local_context->setSettings(manifest.settings); @@ -89,6 +89,7 @@ bool ExportPartTask::executeStep() manifest.data_part->getBytesOnDisk(), manifest.data_part->getBytesUncompressedOnDisk(), manifest.create_time, + manifest.query_id, local_context); SinkToStoragePtr sink; @@ -284,7 +285,7 @@ Priority ExportPartTask::getPriority() const String ExportPartTask::getQueryId() const { - return manifest.transaction_id; + return manifest.query_id; } } diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index a71cf6ae0e45..f562522d2efb 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -23,7 +23,7 @@ namespace { auto context_copy = Context::createCopy(context); context_copy->makeQueryContextForExportPart(); - context_copy->setCurrentQueryId(manifest.transaction_id); + context_copy->setCurrentQueryId(manifest.query_id); context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding); context_copy->setSetting("max_threads", manifest.max_threads); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index e90a6e3ffc0b..0d9316cabdd3 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6260,6 +6260,7 @@ void MergeTreeData::exportPartToTable( dest_storage->getStorageID(), part, transaction_id, + query_context->getCurrentQueryId(), query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value, query_context->getSettingsCopy(), source_metadata_ptr, @@ -9066,6 +9067,7 @@ try part_log_elem.rows_read = (*exports_entry)->rows_read; part_log_elem.bytes_read_uncompressed = (*exports_entry)->bytes_read_uncompressed; part_log_elem.peak_memory_usage = (*exports_entry)->getPeakMemoryUsage(); + part_log_elem.query_id = (*exports_entry)->query_id; /// no need to lock because at this point no one is writing to the destination file paths part_log_elem.remote_file_paths = (*exports_entry)->destination_file_paths; diff --git a/src/Storages/MergeTree/MergeTreePartExportManifest.h b/src/Storages/MergeTree/MergeTreePartExportManifest.h index db6626d22e0a..08a8daa20ac3 100644 --- a/src/Storages/MergeTree/MergeTreePartExportManifest.h +++ b/src/Storages/MergeTree/MergeTreePartExportManifest.h @@ -46,6 +46,7 @@ struct MergeTreePartExportManifest const StorageID & destination_storage_id_, const DataPartPtr & data_part_, const String & transaction_id_, + const String & query_id_, FileAlreadyExistsPolicy file_already_exists_policy_, const Settings & settings_, const StorageMetadataPtr & metadata_snapshot_, @@ -53,6 +54,7 @@ struct MergeTreePartExportManifest : destination_storage_id(destination_storage_id_), data_part(data_part_), transaction_id(transaction_id_), + query_id(query_id_), file_already_exists_policy(file_already_exists_policy_), settings(settings_), metadata_snapshot(metadata_snapshot_), @@ -63,6 +65,7 @@ struct MergeTreePartExportManifest DataPartPtr data_part; /// Used for killing the export. String transaction_id; + String query_id; FileAlreadyExistsPolicy file_already_exists_policy; Settings settings; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 0e6b54f06f86..80dd0cbd1277 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4546,6 +4546,7 @@ std::vector StorageReplicatedMergeTree::getPartit info.destination_table = metadata.destination_table; info.partition_id = metadata.partition_id; info.transaction_id = metadata.transaction_id; + info.query_id = metadata.query_id; info.create_time = metadata.create_time; info.source_replica = metadata.source_replica; info.parts_count = metadata.number_of_parts; @@ -8212,6 +8213,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & ExportReplicatedMergeTreePartitionManifest manifest; manifest.transaction_id = generateSnowflakeIDString(); + manifest.query_id = query_context->getCurrentQueryId(); manifest.partition_id = partition_id; manifest.destination_database = dest_database; manifest.destination_table = dest_table; diff --git a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp index 30ba713d914f..fad8b43d0d2f 100644 --- a/src/Storages/System/StorageSystemExports.cpp +++ b/src/Storages/System/StorageSystemExports.cpp @@ -22,6 +22,7 @@ ColumnsDescription StorageSystemExports::getColumnsDescription() {"destination_table", std::make_shared(), "Name of the destination table."}, {"create_time", std::make_shared(), "Date and time when the export command was received in the server."}, {"part_name", std::make_shared(), "Name of the part"}, + {"query_id", std::make_shared(), "Query ID of the export operation."}, {"destination_file_paths", std::make_shared(std::make_shared()), "File paths where the part is being exported."}, {"elapsed", std::make_shared(), "The time elapsed (in seconds) since the export started."}, {"rows_read", std::make_shared(), "The number of rows read from the exported part."}, @@ -51,6 +52,7 @@ void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr con res_columns[i++]->insert(export_info.destination_table); res_columns[i++]->insert(export_info.create_time); res_columns[i++]->insert(export_info.part_name); + res_columns[i++]->insert(export_info.query_id); Array destination_file_paths_array; destination_file_paths_array.reserve(export_info.destination_file_paths.size()); for (const auto & file_path : export_info.destination_file_paths) diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp index 49fcd0e1233c..00a8a78af55d 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp @@ -26,6 +26,7 @@ ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescriptio {"create_time", std::make_shared(), "Date and time when the export command was submitted"}, {"partition_id", std::make_shared(), "ID of the partition"}, {"transaction_id", std::make_shared(), "ID of the transaction."}, + {"query_id", std::make_shared(), "Query ID of the export operation."}, {"source_replica", std::make_shared(), "Name of the source replica."}, {"parts", std::make_shared(std::make_shared()), "List of part names to be exported."}, {"parts_count", std::make_shared(), "Number of parts in the export."}, @@ -123,11 +124,12 @@ void StorageSystemReplicatedPartitionExports::fillData(MutableColumns & res_colu res_columns[i++]->insert(info.create_time); res_columns[i++]->insert(info.partition_id); res_columns[i++]->insert(info.transaction_id); + res_columns[i++]->insert(info.query_id); res_columns[i++]->insert(info.source_replica); Array parts_array; parts_array.reserve(info.parts.size()); for (const auto & part : info.parts) - parts_array.push_back(part); + parts_array.push_back(part); res_columns[i++]->insert(parts_array); res_columns[i++]->insert(info.parts_count); res_columns[i++]->insert(info.parts_to_do); diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.h b/src/Storages/System/StorageSystemReplicatedPartitionExports.h index de2547437c21..d57844cbb34e 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.h +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.h @@ -13,6 +13,7 @@ struct ReplicatedPartitionExportInfo String destination_table; String partition_id; String transaction_id; + String query_id; time_t create_time; String source_replica; size_t parts_count;