diff --git a/extension-framework/cpp-extension-lib/include/api/core/Resource.h b/extension-framework/cpp-extension-lib/include/api/core/Resource.h index c9777330bc..0bfe98e1f9 100644 --- a/extension-framework/cpp-extension-lib/include/api/core/Resource.h +++ b/extension-framework/cpp-extension-lib/include/api/core/Resource.h @@ -189,4 +189,18 @@ void useControllerServiceClassDescription(Fn&& fn) { fn(description); } +template +void registerProcessors(MinifiExtension* extension) { + (core::useProcessorClassDescription([&](const MinifiProcessorClassDefinition& definition) { + MinifiRegisterProcessor(extension, &definition); + }), ...); +} + +template +void registerControllerServices(MinifiExtension* extension) { + (core::useControllerServiceClassDescription([&](const MinifiControllerServiceClassDefinition& definition) { + MinifiRegisterControllerService(extension, &definition); + }), ...); +} + } // namespace org::apache::nifi::minifi::api::core diff --git a/extensions/gcp/CMakeLists.txt b/extensions/gcp/CMakeLists.txt index 654209d080..2fc42260c2 100644 --- a/extensions/gcp/CMakeLists.txt +++ b/extensions/gcp/CMakeLists.txt @@ -30,8 +30,8 @@ add_minifi_library(minifi-gcp SHARED ${SOURCES}) if (NOT WIN32) target_compile_options(minifi-gcp PRIVATE -Wno-error=deprecated-declarations) # Suppress deprecation warnings for std::rel_ops usage endif() -target_link_libraries(minifi-gcp ${LIBMINIFI} google-cloud-cpp::storage) -target_include_directories(minifi-gcp SYSTEM PUBLIC ${google-cloud-cpp_INCLUDE_DIRS}) +target_link_libraries(minifi-gcp minifi-cpp-extension-lib google-cloud-cpp::storage) -register_extension(minifi-gcp "GCP EXTENSIONS" GCP-EXTENSIONS "This enables Google Cloud Platform support" "extensions/gcp/tests") +target_include_directories(minifi-gcp SYSTEM PUBLIC ${google-cloud-cpp_INCLUDE_DIRS}) +register_c_api_extension(minifi-gcp "GCP EXTENSIONS" GCP-EXTENSIONS "This enables Google Cloud Platform support" "extensions/gcp/tests") diff --git a/extensions/gcp/ExtensionInitializer.cpp b/extensions/gcp/ExtensionInitializer.cpp new file mode 100644 index 0000000000..efcf37c6bb --- /dev/null +++ b/extensions/gcp/ExtensionInitializer.cpp @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../../extension-framework/cpp-extension-lib/include/api/core/Resource.h" +#include "api/core/Resource.h" +#include "api/utils/minifi-c-utils.h" +#include "processors/DeleteGCSObject.h" +#include "processors/FetchGCSObject.h" +#include "processors/ListGCSBucket.h" +#include "processors/PutGCSObject.h" + +#define MKSOC(x) #x +#define MAKESTRING(x) MKSOC(x) // NOLINT(cppcoreguidelines-macro-usage) + +namespace minifi = org::apache::nifi::minifi; + +CEXTENSIONAPI const uint32_t MinifiApiVersion = MINIFI_API_VERSION; + +CEXTENSIONAPI void MinifiInitExtension(MinifiExtensionContext* extension_context) { + MinifiExtensionCreateInfo ext_create_info{.name = minifi::api::utils::toStringView(MAKESTRING(EXTENSION_NAME)), + .version = minifi::api::utils::toStringView(MAKESTRING(EXTENSION_VERSION)), + .deinit = nullptr, + .user_data = nullptr}; + auto* extension = MinifiCreateExtension(extension_context, &ext_create_info); + minifi::api::core::registerProcessors(extension); + minifi::api::core::registerControllerServices(extension); +} diff --git a/extensions/gcp/GCPAttributes.h b/extensions/gcp/GCPAttributes.h index 5657e72005..0ec973ea60 100644 --- a/extensions/gcp/GCPAttributes.h +++ b/extensions/gcp/GCPAttributes.h @@ -19,8 +19,9 @@ #include +#include "api/core/FlowFile.h" +#include "api/core/ProcessSession.h" #include "google/cloud/storage/object_metadata.h" -#include "minifi-cpp/core/FlowFile.h" namespace org::apache::nifi::minifi::extensions::gcp { @@ -50,32 +51,32 @@ constexpr std::string_view GCS_SELF_LINK_ATTR = "gcs.self.link"; constexpr std::string_view GCS_ENCRYPTION_ALGORITHM_ATTR = "gcs.encryption.algorithm"; constexpr std::string_view GCS_ENCRYPTION_SHA256_ATTR = "gcs.encryption.sha256"; -inline void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const ::google::cloud::storage::ObjectMetadata& object_metadata) { - flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket()); - flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name()); - flow_file.setAttribute(GCS_SIZE_ATTR, std::to_string(object_metadata.size())); - flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c()); - flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash()); - flow_file.setAttribute(GCS_CONTENT_ENCODING_ATTR, object_metadata.content_encoding()); - flow_file.setAttribute(GCS_CONTENT_LANGUAGE_ATTR, object_metadata.content_language()); - flow_file.setAttribute(GCS_CONTENT_DISPOSITION_ATTR, object_metadata.content_disposition()); - flow_file.setAttribute(GCS_CREATE_TIME_ATTR, std::to_string(std::chrono::duration_cast(object_metadata.time_created().time_since_epoch()).count())); - flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, std::to_string(std::chrono::duration_cast(object_metadata.updated().time_since_epoch()).count())); - flow_file.setAttribute(GCS_DELETE_TIME_ATTR, std::to_string(std::chrono::duration_cast(object_metadata.time_deleted().time_since_epoch()).count())); - flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link()); - flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link()); - flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag()); - flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id()); - flow_file.setAttribute(GCS_META_GENERATION, std::to_string(object_metadata.metageneration())); - flow_file.setAttribute(GCS_GENERATION, std::to_string(object_metadata.generation())); - flow_file.setAttribute(GCS_STORAGE_CLASS, object_metadata.storage_class()); +inline void setAttributesFromObjectMetadata(api::core::FlowFile& flow_file, const ::google::cloud::storage::ObjectMetadata& object_metadata, api::core::ProcessSession& session) { + session.setAttribute(flow_file, GCS_BUCKET_ATTR, object_metadata.bucket()); + session.setAttribute(flow_file, GCS_OBJECT_NAME_ATTR, object_metadata.name()); + session.setAttribute(flow_file, GCS_SIZE_ATTR, std::to_string(object_metadata.size())); + session.setAttribute(flow_file, GCS_CRC32C_ATTR, object_metadata.crc32c()); + session.setAttribute(flow_file, GCS_MD5_ATTR, object_metadata.md5_hash()); + session.setAttribute(flow_file, GCS_CONTENT_ENCODING_ATTR, object_metadata.content_encoding()); + session.setAttribute(flow_file, GCS_CONTENT_LANGUAGE_ATTR, object_metadata.content_language()); + session.setAttribute(flow_file, GCS_CONTENT_DISPOSITION_ATTR, object_metadata.content_disposition()); + session.setAttribute(flow_file, GCS_CREATE_TIME_ATTR, std::to_string(std::chrono::duration_cast(object_metadata.time_created().time_since_epoch()).count())); + session.setAttribute(flow_file, GCS_UPDATE_TIME_ATTR, std::to_string(std::chrono::duration_cast(object_metadata.updated().time_since_epoch()).count())); + session.setAttribute(flow_file, GCS_DELETE_TIME_ATTR, std::to_string(std::chrono::duration_cast(object_metadata.time_deleted().time_since_epoch()).count())); + session.setAttribute(flow_file, GCS_MEDIA_LINK_ATTR, object_metadata.media_link()); + session.setAttribute(flow_file, GCS_SELF_LINK_ATTR, object_metadata.self_link()); + session.setAttribute(flow_file, GCS_ETAG_ATTR, object_metadata.etag()); + session.setAttribute(flow_file, GCS_GENERATED_ID, object_metadata.id()); + session.setAttribute(flow_file, GCS_META_GENERATION, std::to_string(object_metadata.metageneration())); + session.setAttribute(flow_file, GCS_GENERATION, std::to_string(object_metadata.generation())); + session.setAttribute(flow_file, GCS_STORAGE_CLASS, object_metadata.storage_class()); if (object_metadata.has_customer_encryption()) { - flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm); - flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256); + session.setAttribute(flow_file, GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm); + session.setAttribute(flow_file, GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256); } if (object_metadata.has_owner()) { - flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity); - flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id); + session.setAttribute(flow_file, GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity); + session.setAttribute(flow_file, GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id); } } diff --git a/extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp b/extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp index 9e93c29422..d84bbeb4bd 100644 --- a/extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp +++ b/extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp @@ -18,34 +18,36 @@ #include "GCPCredentialsControllerService.h" -#include "core/Resource.h" #include "google/cloud/storage/client.h" -#include "utils/ProcessorConfigUtils.h" -#include "utils/file/FileUtils.h" namespace org::apache::nifi::minifi::extensions::gcp { -void GCPCredentialsControllerService::initialize() { - setSupportedProperties(Properties); +namespace { +// TODO(MINIFICPP-2763) use utils::file::get_content instead +std::string get_content(const std::filesystem::path& file_name) { + std::ifstream file(file_name, std::ifstream::binary); + std::string content((std::istreambuf_iterator(file)), std::istreambuf_iterator()); + return content; +} } -std::shared_ptr GCPCredentialsControllerService::createCredentialsFromJsonPath() const { - const auto json_path = getProperty(JsonFilePath.name); +std::shared_ptr GCPCredentialsControllerService::createCredentialsFromJsonPath(api::core::ControllerServiceContext& ctx) const { + const auto json_path = ctx.getProperty(JsonFilePath.name); if (!json_path) { logger_->log_error("Missing or invalid {}", JsonFilePath.name); return nullptr; } - if (!utils::file::exists(*json_path)) { + if (std::error_code ec; !std::filesystem::exists(*json_path, ec) || ec) { logger_->log_error("JSON file for GCP credentials '{}' does not exist", *json_path); return nullptr; } - return google::cloud::MakeServiceAccountCredentials(utils::file::get_content(*json_path)); + return google::cloud::MakeServiceAccountCredentials(get_content(*json_path)); } -std::shared_ptr GCPCredentialsControllerService::createCredentialsFromJsonContents() const { - auto json_contents = getProperty(JsonContents.name); +std::shared_ptr GCPCredentialsControllerService::createCredentialsFromJsonContents(api::core::ControllerServiceContext& ctx) const { + auto json_contents = ctx.getProperty(JsonContents.name); if (!json_contents) { logger_->log_error("Missing or invalid {}", JsonContents.name); return nullptr; @@ -54,9 +56,9 @@ std::shared_ptr GCPCredentialsControllerService::cre return google::cloud::MakeServiceAccountCredentials(*json_contents); } -void GCPCredentialsControllerService::onEnable() { +MinifiStatus GCPCredentialsControllerService::enableImpl(api::core::ControllerServiceContext& ctx) { std::optional credentials_location; - if (const auto value = getProperty(CredentialsLoc.name)) { + if (const auto value = ctx.getProperty(CredentialsLoc.name)) { credentials_location = magic_enum::enum_cast(*value); } if (!credentials_location) { @@ -68,15 +70,15 @@ void GCPCredentialsControllerService::onEnable() { } else if (*credentials_location == CredentialsLocation::USE_COMPUTE_ENGINE_CREDENTIALS) { credentials_ = google::cloud::MakeComputeEngineCredentials(); } else if (*credentials_location == CredentialsLocation::USE_JSON_FILE) { - credentials_ = createCredentialsFromJsonPath(); + credentials_ = createCredentialsFromJsonPath(ctx); } else if (*credentials_location == CredentialsLocation::USE_JSON_CONTENTS) { - credentials_ = createCredentialsFromJsonContents(); + credentials_ = createCredentialsFromJsonContents(ctx); } else if (*credentials_location == CredentialsLocation::USE_ANONYMOUS_CREDENTIALS) { credentials_ = google::cloud::MakeInsecureCredentials(); } if (!credentials_) logger_->log_error("Couldn't create valid credentials"); + return MINIFI_STATUS_SUCCESS; } -REGISTER_RESOURCE(GCPCredentialsControllerService, ControllerService); } // namespace org::apache::nifi::minifi::extensions::gcp diff --git a/extensions/gcp/controllerservices/GCPCredentialsControllerService.h b/extensions/gcp/controllerservices/GCPCredentialsControllerService.h index 0eb8fbf8a1..b53971ae20 100644 --- a/extensions/gcp/controllerservices/GCPCredentialsControllerService.h +++ b/extensions/gcp/controllerservices/GCPCredentialsControllerService.h @@ -18,17 +18,15 @@ #pragma once #include -#include #include +#include -#include "core/controller/ControllerServiceBase.h" -#include "minifi-cpp/core/logging/Logger.h" -#include "core/logging/LoggerFactory.h" -#include "minifi-cpp/core/PropertyDefinition.h" +#include "api/core/ControllerServiceImpl.h" +#include "api/utils/Export.h" #include "core/PropertyDefinitionBuilder.h" -#include "utils/Enum.h" - #include "google/cloud/credentials.h" +#include "minifi-cpp/core/PropertyDefinition.h" +#include "utils/Enum.h" namespace org::apache::nifi::minifi::extensions::gcp { enum class CredentialsLocation { @@ -63,7 +61,7 @@ constexpr customize_t enum_name(CredentialsLocation value) namespace org::apache::nifi::minifi::extensions::gcp { -class GCPCredentialsControllerService : public core::controller::ControllerServiceBase, public core::controller::ControllerServiceHandle { +class GCPCredentialsControllerService : public api::core::ControllerServiceImpl { public: EXTENSIONAPI static constexpr const char* Description = "Manages the credentials for Google Cloud Platform. This allows for multiple Google Cloud Platform related processors " "to reference this single controller service so that Google Cloud Platform credentials can be managed and controlled in a central location."; @@ -91,21 +89,16 @@ class GCPCredentialsControllerService : public core::controller::ControllerServi EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; - ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES - - using ControllerServiceBase::ControllerServiceBase; - - void initialize() override; - void onEnable() override; + using ControllerServiceImpl::ControllerServiceImpl; - [[nodiscard]] ControllerServiceHandle* getControllerServiceHandle() override {return this;} + MinifiStatus enableImpl(api::core::ControllerServiceContext& ctx) override; [[nodiscard]] const auto& getCredentials() const { return credentials_; } protected: - [[nodiscard]] std::shared_ptr createCredentialsFromJsonPath() const; - [[nodiscard]] std::shared_ptr createCredentialsFromJsonContents() const; + [[nodiscard]] std::shared_ptr createCredentialsFromJsonPath(api::core::ControllerServiceContext& ctx) const; + [[nodiscard]] std::shared_ptr createCredentialsFromJsonContents(api::core::ControllerServiceContext& ctx) const; std::shared_ptr credentials_; diff --git a/extensions/gcp/processors/DeleteGCSObject.cpp b/extensions/gcp/processors/DeleteGCSObject.cpp index 4ad4b43271..a7a6d923bd 100644 --- a/extensions/gcp/processors/DeleteGCSObject.cpp +++ b/extensions/gcp/processors/DeleteGCSObject.cpp @@ -17,69 +17,62 @@ #include "DeleteGCSObject.h" -#include "utils/ProcessorConfigUtils.h" #include "../GCPAttributes.h" -#include "minifi-cpp/core/FlowFile.h" -#include "minifi-cpp/core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "core/Resource.h" +#include "api/core/ProcessContext.h" +#include "api/core/ProcessSession.h" +#include "api/core/Resource.h" +#include "api/utils/ProcessorConfigUtils.h" namespace gcs = ::google::cloud::storage; namespace org::apache::nifi::minifi::extensions::gcp { -void DeleteGCSObject::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); -} -void DeleteGCSObject::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { +MinifiStatus DeleteGCSObject::onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) { gsl_Expects(gcp_credentials_); auto flow_file = session.get(); if (!flow_file) { - context.yield(); - return; + return MINIFI_STATUS_PROCESSOR_YIELD; } - auto bucket = context.getProperty(Bucket, flow_file.get()); + auto bucket = api::utils::parseOptionalProperty(context, Bucket, &flow_file); if (!bucket || bucket->empty()) { logger_->log_error("Missing bucket name"); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } - auto object_name = context.getProperty(Key, flow_file.get()); + auto object_name = api::utils::parseOptionalProperty(context, Key, &flow_file); if (!object_name || object_name->empty()) { logger_->log_error("Missing object name"); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } gcs::Generation generation; - if (const auto object_generation_str = context.getProperty(ObjectGeneration, flow_file.get()); object_generation_str && !object_generation_str->empty()) { + if (auto object_generation_str = api::utils::parseOptionalProperty(context, ObjectGeneration, &flow_file); object_generation_str && !object_generation_str->empty()) { if (const auto geni64 = parsing::parseIntegral(*object_generation_str)) { generation = gcs::Generation{*geni64}; } else { logger_->log_error("Invalid generation: {}", *object_generation_str); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } } auto status = getClient().DeleteObject(*bucket, *object_name, generation, gcs::IfGenerationNotMatch(0)); if (!status.ok()) { - flow_file->setAttribute(GCS_STATUS_MESSAGE, status.message()); - flow_file->setAttribute(GCS_ERROR_REASON, status.error_info().reason()); - flow_file->setAttribute(GCS_ERROR_DOMAIN, status.error_info().domain()); + session.setAttribute(flow_file, GCS_STATUS_MESSAGE, status.message()); + session.setAttribute(flow_file, GCS_ERROR_REASON, status.error_info().reason()); + session.setAttribute(flow_file, GCS_ERROR_DOMAIN, status.error_info().domain()); logger_->log_error("Failed to delete {} object from {} bucket on Google Cloud Storage {} {}", *object_name, *bucket, status.message(), status.error_info().reason()); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } - session.transfer(flow_file, Success); + session.transfer(std::move(flow_file), Success); + return MINIFI_STATUS_SUCCESS; } -REGISTER_RESOURCE(DeleteGCSObject, Processor); - } // namespace org::apache::nifi::minifi::extensions::gcp diff --git a/extensions/gcp/processors/DeleteGCSObject.h b/extensions/gcp/processors/DeleteGCSObject.h index a5d0228476..48364cd45e 100644 --- a/extensions/gcp/processors/DeleteGCSObject.h +++ b/extensions/gcp/processors/DeleteGCSObject.h @@ -17,24 +17,20 @@ #pragma once -#include -#include -#include - #include "../GCPAttributes.h" #include "GCSProcessor.h" -#include "core/logging/LoggerFactory.h" #include "minifi-cpp/core/OutputAttributeDefinition.h" #include "minifi-cpp/core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "utils/ArrayUtils.h" +#include "minifi-cpp/core/Annotation.h" + namespace org::apache::nifi::minifi::extensions::gcp { class DeleteGCSObject : public GCSProcessor { public: using GCSProcessor::GCSProcessor; - ~DeleteGCSObject() override = default; EXTENSIONAPI static constexpr const char* Description = "Deletes an object from a Google Cloud Bucket."; @@ -79,10 +75,8 @@ class DeleteGCSObject : public GCSProcessor { EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; EXTENSIONAPI static constexpr bool IsSingleThreaded = false; - ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - - void initialize() override; - void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + protected: + MinifiStatus onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) override; }; } // namespace org::apache::nifi::minifi::extensions::gcp diff --git a/extensions/gcp/processors/FetchGCSObject.cpp b/extensions/gcp/processors/FetchGCSObject.cpp index a42799e281..0289cb63a2 100644 --- a/extensions/gcp/processors/FetchGCSObject.cpp +++ b/extensions/gcp/processors/FetchGCSObject.cpp @@ -19,11 +19,9 @@ #include -#include "core/Resource.h" -#include "minifi-cpp/core/FlowFile.h" -#include "minifi-cpp/core/ProcessContext.h" -#include "core/ProcessSession.h" #include "../GCPAttributes.h" +#include "api/utils/ProcessorConfigUtils.h" +#include "minifi-cpp/io/OutputStream.h" namespace gcs = ::google::cloud::storage; @@ -82,80 +80,78 @@ class FetchFromGCSCallback { }; } // namespace - -void FetchGCSObject::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); -} - -void FetchGCSObject::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) { - GCSProcessor::onSchedule(context, session_factory); +MinifiStatus FetchGCSObject::onScheduleImpl(api::core::ProcessContext& context) { + const auto status = GCSProcessor::onScheduleImpl(context); + if (MINIFI_STATUS_SUCCESS != status) { + return status; + } if (auto encryption_key = context.getProperty(EncryptionKey)) { try { encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); } catch (const google::cloud::RuntimeStatusError&) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Could not decode the base64-encoded encryption key from property " + std::string(EncryptionKey.name)); } + logger_->log_error("Could not decode the base64-encoded encryption key from property {}", std::string(EncryptionKey.name)); + return MINIFI_STATUS_UNKNOWN_ERROR; + } } + return MINIFI_STATUS_SUCCESS; } -void FetchGCSObject::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { +MinifiStatus FetchGCSObject::onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) { gsl_Expects(gcp_credentials_); auto flow_file = session.get(); if (!flow_file) { - context.yield(); - return; + return MINIFI_STATUS_PROCESSOR_YIELD; } - auto bucket = context.getProperty(Bucket, flow_file.get()); + auto bucket = api::utils::parseOptionalProperty(context, Bucket, &flow_file); if (!bucket || bucket->empty()) { logger_->log_error("Missing bucket name"); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } - auto object_name = context.getProperty(Key, flow_file.get()); + auto object_name = api::utils::parseOptionalProperty(context, Key, &flow_file); if (!object_name || object_name->empty()) { logger_->log_error("Missing object name"); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } gcs::Client client = getClient(); FetchFromGCSCallback callback(client, *bucket, *object_name); callback.setEncryptionKey(encryption_key_); - if (const auto object_generation_str = context.getProperty(ObjectGeneration, flow_file.get()); object_generation_str && !object_generation_str->empty()) { + if (const auto object_generation_str = api::utils::parseOptionalProperty(context, ObjectGeneration, &flow_file); object_generation_str && !object_generation_str->empty()) { if (const auto geni64 = parsing::parseIntegral(*object_generation_str)) { gcs::Generation generation = gcs::Generation{*geni64}; callback.setGeneration(generation); } else { logger_->log_error("Invalid generation: {}", *object_generation_str); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } } session.write(flow_file, std::ref(callback)); if (!callback.getStatus().ok()) { - flow_file->setAttribute(GCS_STATUS_MESSAGE, callback.getStatus().message()); - flow_file->setAttribute(GCS_ERROR_REASON, callback.getStatus().error_info().reason()); - flow_file->setAttribute(GCS_ERROR_DOMAIN, callback.getStatus().error_info().domain()); + session.setAttribute(flow_file, GCS_STATUS_MESSAGE, callback.getStatus().message()); + session.setAttribute(flow_file, GCS_ERROR_REASON, callback.getStatus().error_info().reason()); + session.setAttribute(flow_file, GCS_ERROR_DOMAIN, callback.getStatus().error_info().domain()); logger_->log_error("Failed to fetch from Google Cloud Storage {} {}", callback.getStatus().message(), callback.getStatus().error_info().reason()); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } if (auto generation = callback.getGeneration()) - flow_file->setAttribute(GCS_GENERATION, std::to_string(*generation)); + session.setAttribute(flow_file, GCS_GENERATION, std::to_string(*generation)); if (auto meta_generation = callback.getMetaGeneration()) - flow_file->setAttribute(GCS_META_GENERATION, std::to_string(*meta_generation)); + session.setAttribute(flow_file, GCS_META_GENERATION, std::to_string(*meta_generation)); if (auto storage_class = callback.getStorageClass()) - flow_file->setAttribute(GCS_STORAGE_CLASS, *storage_class); - session.transfer(flow_file, Success); + session.setAttribute(flow_file, GCS_STORAGE_CLASS, *storage_class); + session.transfer(std::move(flow_file), Success); + return MINIFI_STATUS_SUCCESS; } -REGISTER_RESOURCE(FetchGCSObject, Processor); - } // namespace org::apache::nifi::minifi::extensions::gcp diff --git a/extensions/gcp/processors/FetchGCSObject.h b/extensions/gcp/processors/FetchGCSObject.h index 694ad1f5e0..51d72dab98 100644 --- a/extensions/gcp/processors/FetchGCSObject.h +++ b/extensions/gcp/processors/FetchGCSObject.h @@ -17,24 +17,20 @@ #pragma once -#include -#include -#include - #include "../GCPAttributes.h" #include "GCSProcessor.h" #include "minifi-cpp/core/PropertyDefinition.h" #include "minifi-cpp/core/RelationshipDefinition.h" #include "google/cloud/storage/well_known_headers.h" -#include "core/logging/LoggerFactory.h" #include "utils/ArrayUtils.h" +#include "minifi-cpp/core/Annotation.h" + namespace org::apache::nifi::minifi::extensions::gcp { class FetchGCSObject : public GCSProcessor { public: using GCSProcessor::GCSProcessor; - ~FetchGCSObject() override = default; EXTENSIONAPI static constexpr const char* Description = "Fetches a file from a Google Cloud Bucket. Designed to be used in tandem with ListGCSBucket."; @@ -80,11 +76,9 @@ class FetchGCSObject : public GCSProcessor { EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; EXTENSIONAPI static constexpr bool IsSingleThreaded = false; - ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - - void initialize() override; - void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; - void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + protected: + MinifiStatus onScheduleImpl(api::core::ProcessContext& context) override; + MinifiStatus onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) override; private: google::cloud::storage::EncryptionKey encryption_key_; diff --git a/extensions/gcp/processors/GCSProcessor.cpp b/extensions/gcp/processors/GCSProcessor.cpp index 91d48d64a6..f5361d8a84 100644 --- a/extensions/gcp/processors/GCSProcessor.cpp +++ b/extensions/gcp/processors/GCSProcessor.cpp @@ -17,37 +17,35 @@ #include "GCSProcessor.h" -#include "utils/ProcessorConfigUtils.h" - #include "../controllerservices/GCPCredentialsControllerService.h" -#include "minifi-cpp/core/ProcessContext.h" -#include "core/ProcessSession.h" +#include "api/utils/ProcessorConfigUtils.h" namespace gcs = ::google::cloud::storage; namespace org::apache::nifi::minifi::extensions::gcp { -std::shared_ptr GCSProcessor::getCredentials(core::ProcessContext& context) const { - auto gcp_credentials_controller_service = utils::parseOptionalControllerService(context, GCSProcessor::GCPCredentials, getUUID()); - if (gcp_credentials_controller_service) { +std::shared_ptr GCSProcessor::getCredentials(const api::core::ProcessContext& context) { + if (const auto gcp_credentials_controller_service = api::utils::parseOptionalControllerService(context, + GCPCredentials)) { return gcp_credentials_controller_service->getCredentials(); } return nullptr; } -void GCSProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - if (auto number_of_retries = utils::parseOptionalU64Property(context, NumberOfRetries)) { +MinifiStatus GCSProcessor::onScheduleImpl(api::core::ProcessContext& context) { + if (const auto number_of_retries = api::utils::parseOptionalU64Property(context, NumberOfRetries)) { retry_policy_ = std::make_shared(gsl::narrow(*number_of_retries)); } gcp_credentials_ = getCredentials(context); if (!gcp_credentials_) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing GCP Credentials"); + logger_->log_error("Couldnt find valid credentials"); + return MINIFI_STATUS_UNKNOWN_ERROR; } endpoint_url_ = context.getProperty(EndpointOverrideURL) | utils::toOptional(); - if (endpoint_url_) - logger_->log_debug("Endpoint overwritten: {}", *endpoint_url_); + if (endpoint_url_) { logger_->log_debug("Endpoint overwritten: {}", *endpoint_url_); } + return MINIFI_STATUS_SUCCESS; } gcs::Client GCSProcessor::getClient() const { @@ -55,9 +53,7 @@ gcs::Client GCSProcessor::getClient() const { .set(gcp_credentials_) .set(retry_policy_); - if (endpoint_url_) { - options.set(*endpoint_url_); - } + if (endpoint_url_) { options.set(*endpoint_url_); } return gcs::Client(options); } diff --git a/extensions/gcp/processors/GCSProcessor.h b/extensions/gcp/processors/GCSProcessor.h index 1ec2b6641a..697e94a724 100644 --- a/extensions/gcp/processors/GCSProcessor.h +++ b/extensions/gcp/processors/GCSProcessor.h @@ -16,23 +16,21 @@ */ #pragma once -#include #include -#include #include +#include #include "../controllerservices/GCPCredentialsControllerService.h" -#include "minifi-cpp/core/logging/Logger.h" -#include "core/ProcessorImpl.h" -#include "minifi-cpp/core/PropertyDefinition.h" +#include "api/core/ProcessorImpl.h" #include "core/PropertyDefinitionBuilder.h" -#include "minifi-cpp/core/PropertyValidator.h" #include "google/cloud/credentials.h" #include "google/cloud/storage/client.h" #include "google/cloud/storage/retry_policy.h" +#include "minifi-cpp/core/PropertyDefinition.h" +#include "minifi-cpp/core/PropertyValidator.h" namespace org::apache::nifi::minifi::extensions::gcp { -class GCSProcessor : public core::ProcessorImpl { +class GCSProcessor : public api::core::ProcessorImpl { public: using ProcessorImpl::ProcessorImpl; @@ -59,12 +57,11 @@ class GCSProcessor : public core::ProcessorImpl { EndpointOverrideURL }); - - void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; - protected: + MinifiStatus onScheduleImpl(api::core::ProcessContext& context) override; + virtual google::cloud::storage::Client getClient() const; - std::shared_ptr getCredentials(core::ProcessContext& context) const; + static std::shared_ptr getCredentials(const api::core::ProcessContext& context); std::optional endpoint_url_; std::shared_ptr gcp_credentials_; diff --git a/extensions/gcp/processors/ListGCSBucket.cpp b/extensions/gcp/processors/ListGCSBucket.cpp index 13426e4419..ac4cadd40e 100644 --- a/extensions/gcp/processors/ListGCSBucket.cpp +++ b/extensions/gcp/processors/ListGCSBucket.cpp @@ -17,47 +17,44 @@ #include "ListGCSBucket.h" -#include "utils/ProcessorConfigUtils.h" - #include "../GCPAttributes.h" -#include "minifi-cpp/core/FlowFile.h" -#include "minifi-cpp/core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "core/Resource.h" +#include "api/core/ProcessContext.h" +#include "api/core/ProcessSession.h" +#include "api/core/Resource.h" +#include "api/utils/ProcessorConfigUtils.h" +#include "minifi-cpp/core/SpecialFlowAttribute.h" namespace gcs = ::google::cloud::storage; namespace org::apache::nifi::minifi::extensions::gcp { -void ListGCSBucket::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); -} - -void ListGCSBucket::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) { - GCSProcessor::onSchedule(context, session_factory); - bucket_ = utils::parseProperty(context, Bucket); +MinifiStatus ListGCSBucket::onScheduleImpl(api::core::ProcessContext& context) { + const auto status = GCSProcessor::onScheduleImpl(context); + if (status != MinifiStatus::MINIFI_STATUS_SUCCESS) { + return status; + } + bucket_ = api::utils::parseProperty(context, Bucket); + return MinifiStatus::MINIFI_STATUS_SUCCESS; } -void ListGCSBucket::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { +MinifiStatus ListGCSBucket::onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) { gsl_Expects(gcp_credentials_); gcs::Client client = getClient(); - auto list_all_versions = utils::parseOptionalBoolProperty(context, ListAllVersions); + auto list_all_versions = api::utils::parseOptionalBoolProperty(context, ListAllVersions); gcs::Versions versions = (list_all_versions && *list_all_versions) ? gcs::Versions(true) : gcs::Versions(false); auto objects_in_bucket = client.ListObjects(bucket_, versions); for (const auto& object_in_bucket : objects_in_bucket) { if (object_in_bucket.ok()) { auto flow_file = session.create(); - flow_file->updateAttribute(core::SpecialFlowAttribute::FILENAME, object_in_bucket->name()); - setAttributesFromObjectMetadata(*flow_file, *object_in_bucket); - session.transfer(flow_file, Success); + session.setAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, object_in_bucket->name()); + setAttributesFromObjectMetadata(flow_file, *object_in_bucket, session); + session.transfer(std::move(flow_file), Success); } else { logger_->log_error("Invalid object in bucket {}", bucket_); } } + return MinifiStatus::MINIFI_STATUS_SUCCESS; } -REGISTER_RESOURCE(ListGCSBucket, Processor); - } // namespace org::apache::nifi::minifi::extensions::gcp diff --git a/extensions/gcp/processors/ListGCSBucket.h b/extensions/gcp/processors/ListGCSBucket.h index 36c4ac1e01..e654a314d7 100644 --- a/extensions/gcp/processors/ListGCSBucket.h +++ b/extensions/gcp/processors/ListGCSBucket.h @@ -17,19 +17,17 @@ #pragma once -#include #include -#include #include "../GCPAttributes.h" #include "GCSProcessor.h" -#include "core/logging/LoggerFactory.h" #include "minifi-cpp/core/OutputAttributeDefinition.h" #include "minifi-cpp/core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/PropertyValidator.h" #include "minifi-cpp/core/RelationshipDefinition.h" #include "utils/ArrayUtils.h" +#include "minifi-cpp/core/Annotation.h" namespace org::apache::nifi::minifi::extensions::gcp { @@ -44,7 +42,6 @@ inline constexpr auto FILENAME_OUTPUT_ATTRIBUTE_DESCRIPTION = utils::array_to_st class ListGCSBucket : public GCSProcessor { public: using GCSProcessor::GCSProcessor; - ~ListGCSBucket() override = default; EXTENSIONAPI static constexpr const char* Description = "Retrieves a listing of objects from an GCS bucket. " "For each object that is listed, creates a FlowFile that represents the object so that it can be fetched in conjunction with FetchGCSObject."; @@ -120,11 +117,9 @@ class ListGCSBucket : public GCSProcessor { EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN; EXTENSIONAPI static constexpr bool IsSingleThreaded = true; - ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - - void initialize() override; - void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; - void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + protected: + MinifiStatus onScheduleImpl(api::core::ProcessContext& context) override; + MinifiStatus onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) override; private: std::string bucket_; diff --git a/extensions/gcp/processors/PutGCSObject.cpp b/extensions/gcp/processors/PutGCSObject.cpp index ceea476588..5dd5171df2 100644 --- a/extensions/gcp/processors/PutGCSObject.cpp +++ b/extensions/gcp/processors/PutGCSObject.cpp @@ -19,12 +19,12 @@ #include -#include "core/Resource.h" -#include "minifi-cpp/core/FlowFile.h" -#include "minifi-cpp/core/ProcessContext.h" -#include "core/ProcessSession.h" #include "../GCPAttributes.h" -#include "utils/ProcessorConfigUtils.h" +#include "api/core/ProcessContext.h" +#include "api/core/ProcessSession.h" +#include "api/core/Resource.h" +#include "api/utils/ProcessorConfigUtils.h" +#include "minifi-cpp/io/InputStream.h" namespace gcs = ::google::cloud::storage; @@ -102,80 +102,79 @@ class UploadToGCSCallback { } // namespace -void PutGCSObject::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); -} - +MinifiStatus PutGCSObject::onScheduleImpl(api::core::ProcessContext& context) { + const auto status = GCSProcessor::onScheduleImpl(context); + if (status != MinifiStatus::MINIFI_STATUS_SUCCESS) { + return status; + } -void PutGCSObject::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) { - GCSProcessor::onSchedule(context, session_factory); if (auto encryption_key = context.getProperty(EncryptionKey)) { try { encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); } catch (const google::cloud::RuntimeStatusError&) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Could not decode the base64-encoded encryption key from property " + std::string(EncryptionKey.name)); + logger_->log_error("Could not decode the base64-encoded encryption key from property {}", std::string(EncryptionKey.name)); + return MINIFI_STATUS_UNKNOWN_ERROR; } } + return MINIFI_STATUS_SUCCESS; } -void PutGCSObject::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { +MinifiStatus PutGCSObject::onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) { gsl_Expects(gcp_credentials_); auto flow_file = session.get(); if (!flow_file) { - context.yield(); - return; + return MINIFI_STATUS_PROCESSOR_YIELD; } - auto bucket = context.getProperty(Bucket, flow_file.get()); + auto bucket = api::utils::parseOptionalProperty(context, Bucket, &flow_file); + if (!bucket || bucket->empty()) { logger_->log_error("Missing bucket name"); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } - auto object_name = context.getProperty(Key, flow_file.get()); + auto object_name = api::utils::parseOptionalProperty(context, Key, &flow_file); if (!object_name || object_name->empty()) { logger_->log_error("Missing object name"); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } gcs::Client client = getClient(); UploadToGCSCallback callback(client, *bucket, *object_name); - if (auto crc32_checksum = context.getProperty(Crc32cChecksum, flow_file.get())) { + if (auto crc32_checksum = api::utils::parseOptionalProperty(context, Crc32cChecksum, &flow_file)) { callback.setCrc32CChecksumValue(*crc32_checksum); } - if (auto md5_hash = context.getProperty(MD5Hash, flow_file.get())) { + if (auto md5_hash = api::utils::parseOptionalProperty(context, MD5Hash, &flow_file)) { callback.setHashValue(*md5_hash); } - auto content_type = context.getProperty(ContentType, flow_file.get()); + auto content_type = api::utils::parseOptionalProperty(context, ContentType, &flow_file); if (content_type && !content_type->empty()) callback.setContentType(*content_type); - if (auto predefined_acl = utils::parseOptionalEnumProperty(context, ObjectACL)) + if (auto predefined_acl = api::utils::parseOptionalEnumProperty(context, ObjectACL)) callback.setPredefinedAcl(*predefined_acl); - callback.setIfGenerationMatch(utils::parseOptionalBoolProperty(context, OverwriteObject)); + callback.setIfGenerationMatch(api::utils::parseOptionalBoolProperty(context, OverwriteObject)); callback.setEncryptionKey(encryption_key_); session.read(flow_file, std::ref(callback)); auto& result = callback.getResult(); if (!result.ok()) { - flow_file->setAttribute(GCS_STATUS_MESSAGE, result.status().message()); - flow_file->setAttribute(GCS_ERROR_REASON, result.status().error_info().reason()); - flow_file->setAttribute(GCS_ERROR_DOMAIN, result.status().error_info().domain()); + session.setAttribute(flow_file, GCS_STATUS_MESSAGE, result.status().message()); + session.setAttribute(flow_file, GCS_ERROR_REASON, result.status().error_info().reason()); + session.setAttribute(flow_file, GCS_ERROR_DOMAIN, result.status().error_info().domain()); logger_->log_error("Failed to upload to Google Cloud Storage {} {}", result.status().message(), result.status().error_info().reason()); - session.transfer(flow_file, Failure); + session.transfer(std::move(flow_file), Failure); } else { - setAttributesFromObjectMetadata(*flow_file, *result); - session.transfer(flow_file, Success); + setAttributesFromObjectMetadata(flow_file, *result, session); + session.transfer(std::move(flow_file), Success); } + return MINIFI_STATUS_SUCCESS; } -REGISTER_RESOURCE(PutGCSObject, Processor); - } // namespace org::apache::nifi::minifi::extensions::gcp diff --git a/extensions/gcp/processors/PutGCSObject.h b/extensions/gcp/processors/PutGCSObject.h index 58d7675fa5..73d8c82183 100644 --- a/extensions/gcp/processors/PutGCSObject.h +++ b/extensions/gcp/processors/PutGCSObject.h @@ -19,17 +19,16 @@ #include #include -#include #include "../GCPAttributes.h" #include "GCSProcessor.h" #include "minifi-cpp/core/PropertyDefinition.h" #include "minifi-cpp/core/PropertyValidator.h" #include "minifi-cpp/core/RelationshipDefinition.h" -#include "core/logging/LoggerFactory.h" #include "utils/ArrayUtils.h" #include "utils/Enum.h" #include "google/cloud/storage/well_known_headers.h" +#include "minifi-cpp/core/Annotation.h" namespace org::apache::nifi::minifi::extensions::gcp::put_gcs_object { enum class PredefinedAcl { @@ -73,7 +72,6 @@ namespace org::apache::nifi::minifi::extensions::gcp { class PutGCSObject : public GCSProcessor { public: using GCSProcessor::GCSProcessor; - ~PutGCSObject() override = default; EXTENSIONAPI static constexpr const char* Description = "Puts flow files to a Google Cloud Storage Bucket."; @@ -191,11 +189,9 @@ class PutGCSObject : public GCSProcessor { EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; EXTENSIONAPI static constexpr bool IsSingleThreaded = false; - ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - - void initialize() override; - void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; - void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + protected: + MinifiStatus onScheduleImpl(api::core::ProcessContext& context) override; + MinifiStatus onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) override; private: google::cloud::storage::EncryptionKey encryption_key_; diff --git a/extensions/gcp/tests/CMakeLists.txt b/extensions/gcp/tests/CMakeLists.txt index 0acc5241bf..89fe308e95 100644 --- a/extensions/gcp/tests/CMakeLists.txt +++ b/extensions/gcp/tests/CMakeLists.txt @@ -36,8 +36,8 @@ FOREACH(testfile ${GCS_TESTS}) createTests("${testfilename}") target_link_libraries(${testfilename} minifi-gcp) - target_link_libraries(${testfilename} minifi-standard-processors) target_link_libraries(${testfilename} gtest_main gmock) + target_link_libraries(${testfilename} libminifi-c-unittest) gtest_add_tests(TARGET "${testfilename}") ENDFOREACH() diff --git a/extensions/gcp/tests/DeleteGCSObjectTests.cpp b/extensions/gcp/tests/DeleteGCSObjectTests.cpp index bde7e010ff..ba8694e335 100644 --- a/extensions/gcp/tests/DeleteGCSObjectTests.cpp +++ b/extensions/gcp/tests/DeleteGCSObjectTests.cpp @@ -14,60 +14,65 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "../processors/DeleteGCSObject.h" #include "../controllerservices/GCPCredentialsControllerService.h" +#include "../processors/DeleteGCSObject.h" +#include "CProcessorTestUtils.h" #include "GCPAttributes.h" #include "core/Resource.h" -#include "unit/SingleProcessorTestController.h" -#include "google/cloud/storage/testing/mock_client.h" #include "google/cloud/storage/internal/object_metadata_parser.h" #include "google/cloud/storage/testing/canonical_errors.h" +#include "google/cloud/storage/testing/mock_client.h" #include "unit/ProcessorUtils.h" +#include "unit/SingleProcessorTestController.h" namespace gcs = ::google::cloud::storage; -namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp; +namespace minifi_gcp = minifi::extensions::gcp; -using DeleteGCSObject = org::apache::nifi::minifi::extensions::gcp::DeleteGCSObject; -using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService; +using DeleteGCSObject = minifi::extensions::gcp::DeleteGCSObject; +using GCPCredentialsControllerService = minifi::extensions::gcp::GCPCredentialsControllerService; using DeleteObjectRequest = gcs::internal::DeleteObjectRequest; -using ::google::cloud::storage::testing::canonical_errors::TransientError; using ::google::cloud::storage::testing::canonical_errors::PermanentError; +using ::google::cloud::storage::testing::canonical_errors::TransientError; namespace { class DeleteGCSObjectMocked : public DeleteGCSObject { - using org::apache::nifi::minifi::extensions::gcp::DeleteGCSObject::DeleteGCSObject; + using DeleteGCSObject::DeleteGCSObject; + public: - static constexpr const char* Description = "DeleteGCSObjectMocked"; + DeleteGCSObjectMocked(minifi::core::ProcessorMetadata metadata, std::shared_ptr mock_client) + : DeleteGCSObject(std::move(metadata)), + mock_client_(std::move(mock_client)) {} - gcs::Client getClient() const override { - return gcs::testing::UndecoratedClientFromMock(mock_client_); - } - std::shared_ptr mock_client_ = std::make_shared(); + protected: + gcs::Client getClient() const override { return gcs::testing::UndecoratedClientFromMock(mock_client_); } + std::shared_ptr mock_client_; }; -REGISTER_RESOURCE(DeleteGCSObjectMocked, Processor); } // namespace class DeleteGCSObjectTests : public ::testing::Test { - public: + protected: void SetUp() override { - delete_gcs_object_ = test_controller_.getProcessor(); - gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service"); - test_controller_.plan->setProperty(gcp_credentials_node_, - GCPCredentialsControllerService::CredentialsLoc, - magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS)); - test_controller_.plan->setProperty(delete_gcs_object_, - DeleteGCSObject::GCPCredentials, - "gcp_credentials_controller_service"); + const auto gcp_credential_controller_service = + minifi::test::utils::make_custom_c_controller_service(core::ControllerServiceMetadata{utils::Identifier{}, + "GCPCredentialsControllerService", + logging::LoggerFactory::getLogger()}); + gcp_credentials_node_ = test_controller_.plan->addController("gcp_credentials_controller_service", gcp_credential_controller_service); + test_controller_.getProcessor()->setProperty(GCPCredentialsControllerService::CredentialsLoc.name, + std::string(magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS))); + test_controller_.getProcessor()->setProperty(DeleteGCSObject::GCPCredentials.name, "gcp_credentials_controller_service"); } - org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{minifi::test::utils::make_processor("DeleteGCSObjectMocked")}; - std::shared_ptr gcp_credentials_node_; - TypedProcessorWrapper delete_gcs_object_ = nullptr; + public: + std::shared_ptr mock_client_ = std::make_shared(); + minifi::test::SingleProcessorTestController test_controller_{minifi::test::utils::make_custom_c_processor( + core::ProcessorMetadata{utils::Identifier{}, "DeleteGCSObjectMocked", logging::LoggerFactory::getLogger()}, + mock_client_)}; + std::shared_ptr gcp_credentials_node_; }; TEST_F(DeleteGCSObjectTests, MissingBucket) { - EXPECT_CALL(*delete_gcs_object_.get().mock_client_, CreateResumableUpload).Times(0); - EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::Bucket, "")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).Times(0); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(DeleteGCSObject::Bucket.name, "")); const auto& result = test_controller_.trigger("hello world"); EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size()); ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size()); @@ -77,9 +82,8 @@ TEST_F(DeleteGCSObjectTests, MissingBucket) { } TEST_F(DeleteGCSObjectTests, ServerGivesPermaError) { - EXPECT_CALL(*delete_gcs_object_.get().mock_client_, DeleteObject) - .WillOnce(testing::Return(PermanentError())); - EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::Bucket, "bucket-from-property")); + EXPECT_CALL(*mock_client_, DeleteObject).WillOnce(testing::Return(PermanentError())); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(DeleteGCSObject::Bucket.name, "bucket-from-property")); const auto& result = test_controller_.trigger("hello world"); EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size()); ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size()); @@ -89,9 +93,9 @@ TEST_F(DeleteGCSObjectTests, ServerGivesPermaError) { } TEST_F(DeleteGCSObjectTests, ServerGivesTransientErrors) { - EXPECT_CALL(*delete_gcs_object_.get().mock_client_, DeleteObject).WillOnce(testing::Return(TransientError())); - EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::NumberOfRetries, "1")); - EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::Bucket, "bucket-from-property")); + EXPECT_CALL(*mock_client_, DeleteObject).WillOnce(testing::Return(TransientError())); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(DeleteGCSObject::NumberOfRetries.name, "1")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(DeleteGCSObject::Bucket.name, "bucket-from-property")); const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}}); EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size()); ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size()); @@ -100,31 +104,29 @@ TEST_F(DeleteGCSObjectTests, ServerGivesTransientErrors) { EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0])); } - TEST_F(DeleteGCSObjectTests, HandlingSuccessfullDeletion) { - EXPECT_CALL(*delete_gcs_object_.get().mock_client_, DeleteObject) - .WillOnce([](DeleteObjectRequest const& request) { - EXPECT_EQ("bucket-from-attribute", request.bucket_name()); - EXPECT_TRUE(request.HasOption()); - EXPECT_TRUE(request.GetOption().has_value()); - EXPECT_EQ(23, request.GetOption().value()); - return google::cloud::make_status_or(gcs::internal::EmptyResponse{}); - }); - EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::ObjectGeneration, "${gcs.generation}")); - const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}, {std::string(minifi_gcp::GCS_GENERATION), "23"}}); + EXPECT_CALL(*mock_client_, DeleteObject).WillOnce([](DeleteObjectRequest const& request) { + EXPECT_EQ("bucket-from-attribute", request.bucket_name()); + EXPECT_TRUE(request.HasOption()); + EXPECT_TRUE(request.GetOption().has_value()); + EXPECT_EQ(23, request.GetOption().value()); + return google::cloud::make_status_or(gcs::internal::EmptyResponse{}); + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(DeleteGCSObject::ObjectGeneration.name, "${gcs.generation}")); + const auto& result = test_controller_.trigger("hello world", + {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}, {std::string(minifi_gcp::GCS_GENERATION), "23"}}); ASSERT_EQ(1, result.at(DeleteGCSObject::Success).size()); EXPECT_EQ(0, result.at(DeleteGCSObject::Failure).size()); EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Success)[0])); } TEST_F(DeleteGCSObjectTests, EmptyGeneration) { - EXPECT_CALL(*delete_gcs_object_.get().mock_client_, DeleteObject) - .WillOnce([](DeleteObjectRequest const& request) { - EXPECT_EQ("bucket-from-attribute", request.bucket_name()); - EXPECT_FALSE(request.HasOption()); - return google::cloud::make_status_or(gcs::internal::EmptyResponse{}); - }); - EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::ObjectGeneration, "${gcs.generation}")); + EXPECT_CALL(*mock_client_, DeleteObject).WillOnce([](DeleteObjectRequest const& request) { + EXPECT_EQ("bucket-from-attribute", request.bucket_name()); + EXPECT_FALSE(request.HasOption()); + return google::cloud::make_status_or(gcs::internal::EmptyResponse{}); + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(DeleteGCSObject::ObjectGeneration.name, "${gcs.generation}")); const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}}); ASSERT_EQ(1, result.at(DeleteGCSObject::Success).size()); EXPECT_EQ(0, result.at(DeleteGCSObject::Failure).size()); @@ -132,8 +134,9 @@ TEST_F(DeleteGCSObjectTests, EmptyGeneration) { } TEST_F(DeleteGCSObjectTests, InvalidGeneration) { - EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::ObjectGeneration, "${gcs.generation}")); - const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}, {std::string(minifi_gcp::GCS_GENERATION), "23 banana"}}); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(DeleteGCSObject::ObjectGeneration.name, "${gcs.generation}")); + const auto& result = test_controller_.trigger("hello world", + {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}, {std::string(minifi_gcp::GCS_GENERATION), "23 banana"}}); ASSERT_EQ(0, result.at(DeleteGCSObject::Success).size()); EXPECT_EQ(1, result.at(DeleteGCSObject::Failure).size()); EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0])); diff --git a/extensions/gcp/tests/FetchGCSObjectTests.cpp b/extensions/gcp/tests/FetchGCSObjectTests.cpp index ea87c7e63d..23ea105fbf 100644 --- a/extensions/gcp/tests/FetchGCSObjectTests.cpp +++ b/extensions/gcp/tests/FetchGCSObjectTests.cpp @@ -14,56 +14,63 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "../processors/FetchGCSObject.h" +#include + #include "../controllerservices/GCPCredentialsControllerService.h" +#include "../processors/FetchGCSObject.h" +#include "CProcessorTestUtils.h" #include "GCPAttributes.h" #include "core/Resource.h" -#include "unit/SingleProcessorTestController.h" -#include "google/cloud/storage/testing/mock_client.h" #include "google/cloud/storage/internal/object_metadata_parser.h" #include "google/cloud/storage/testing/canonical_errors.h" +#include "google/cloud/storage/testing/mock_client.h" #include "unit/ProcessorUtils.h" +#include "unit/SingleProcessorTestController.h" namespace gcs = ::google::cloud::storage; -namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp; +namespace minifi_gcp = minifi::extensions::gcp; -using FetchGCSObject = org::apache::nifi::minifi::extensions::gcp::FetchGCSObject; -using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService; +using FetchGCSObject = minifi::extensions::gcp::FetchGCSObject; +using GCPCredentialsControllerService = minifi::extensions::gcp::GCPCredentialsControllerService; namespace { class FetchGCSObjectMocked : public FetchGCSObject { - using org::apache::nifi::minifi::extensions::gcp::FetchGCSObject::FetchGCSObject; + using FetchGCSObject::FetchGCSObject; + public: - static constexpr const char* Description = "FetchGCSObjectMocked"; + FetchGCSObjectMocked(minifi::core::ProcessorMetadata metadata, std::shared_ptr mock_client) + : FetchGCSObject(std::move(metadata)), + mock_client_(std::move(mock_client)) {} - gcs::Client getClient() const override { - return gcs::testing::UndecoratedClientFromMock(mock_client_); - } - std::shared_ptr mock_client_ = std::make_shared(); + protected: + gcs::Client getClient() const override { return gcs::testing::UndecoratedClientFromMock(mock_client_); } + std::shared_ptr mock_client_; }; -REGISTER_RESOURCE(FetchGCSObjectMocked, Processor); } // namespace class FetchGCSObjectTests : public ::testing::Test { - public: + protected: void SetUp() override { - fetch_gcs_object_ = test_controller_.getProcessor(); - gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service"); - test_controller_.plan->setProperty(gcp_credentials_node_, - GCPCredentialsControllerService::CredentialsLoc, - magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS)); - test_controller_.plan->setProperty(fetch_gcs_object_, - FetchGCSObject::GCPCredentials, - "gcp_credentials_controller_service"); + const auto gcp_credential_controller_service = + minifi::test::utils::make_custom_c_controller_service(core::ControllerServiceMetadata{utils::Identifier{}, + "GCPCredentialsControllerService", + logging::LoggerFactory::getLogger()}); + gcp_credentials_node_ = test_controller_.plan->addController("gcp_credentials_controller_service", gcp_credential_controller_service); + test_controller_.getProcessor()->setProperty(GCPCredentialsControllerService::CredentialsLoc.name, + std::string(magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS))); + test_controller_.getProcessor()->setProperty(FetchGCSObjectMocked::GCPCredentials.name, "gcp_credentials_controller_service"); } - TypedProcessorWrapper fetch_gcs_object_; - org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{minifi::test::utils::make_processor("FetchGCSObjectMocked")}; - std::shared_ptr gcp_credentials_node_; + + public: + std::shared_ptr mock_client_ = std::make_shared(); + minifi::test::SingleProcessorTestController test_controller_{minifi::test::utils::make_custom_c_processor( + core::ProcessorMetadata{utils::Identifier{}, "FetchGCSObjectMocked", logging::LoggerFactory::getLogger()}, mock_client_)}; + std::shared_ptr gcp_credentials_node_; }; TEST_F(FetchGCSObjectTests, MissingBucket) { - EXPECT_CALL(*fetch_gcs_object_.get().mock_client_, CreateResumableUpload).Times(0); - EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::Bucket, "")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).Times(0); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(FetchGCSObject::Bucket.name, "")); const auto& result = test_controller_.trigger("hello world"); EXPECT_EQ(0, result.at(FetchGCSObject::Success).size()); ASSERT_EQ(1, result.at(FetchGCSObject::Failure).size()); @@ -73,22 +80,18 @@ TEST_F(FetchGCSObjectTests, MissingBucket) { } TEST_F(FetchGCSObjectTests, ServerError) { - EXPECT_CALL(*fetch_gcs_object_.get().mock_client_, ReadObject) - .WillOnce([](gcs::internal::ReadObjectRangeRequest const& request) { - EXPECT_EQ(request.bucket_name(), "bucket-from-property") << request; - auto mock_source = std::make_unique(); - ::testing::InSequence seq; - EXPECT_CALL(*mock_source, IsOpen).WillRepeatedly(testing::Return(true)); - EXPECT_CALL(*mock_source, Read) - .WillOnce(testing::Return(google::cloud::Status( - google::cloud::StatusCode::kInvalidArgument, - "Invalid Argument"))); - EXPECT_CALL(*mock_source, IsOpen).WillRepeatedly(testing::Return(false)); - - std::unique_ptr object_read_source = std::move(mock_source); - return google::cloud::make_status_or(std::move(object_read_source)); - }); - EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::Bucket, "bucket-from-property")); + EXPECT_CALL(*mock_client_, ReadObject).WillOnce([](gcs::internal::ReadObjectRangeRequest const& request) { + EXPECT_EQ(request.bucket_name(), "bucket-from-property") << request; + auto mock_source = std::make_unique(); + ::testing::InSequence seq; + EXPECT_CALL(*mock_source, IsOpen).WillRepeatedly(testing::Return(true)); + EXPECT_CALL(*mock_source, Read).WillOnce(testing::Return(google::cloud::Status(google::cloud::StatusCode::kInvalidArgument, "Invalid Argument"))); + EXPECT_CALL(*mock_source, IsOpen).WillRepeatedly(testing::Return(false)); + + std::unique_ptr object_read_source = std::move(mock_source); + return google::cloud::make_status_or(std::move(object_read_source)); + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(FetchGCSObject::Bucket.name, "bucket-from-property")); const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}}); EXPECT_EQ(0, result.at(FetchGCSObject::Success).size()); ASSERT_EQ(1, result.at(FetchGCSObject::Failure).size()); @@ -97,65 +100,58 @@ TEST_F(FetchGCSObjectTests, ServerError) { } TEST_F(FetchGCSObjectTests, HappyPath) { - std::string const text = "stored text"; + constexpr std::string_view text = "stored text"; std::size_t offset = 0; // Simulate a Read() call in the MockObjectReadSource object created below auto simulate_read = [&text, &offset](void* buf, std::size_t n) { auto const l = (std::min)(n, text.size() - offset); std::memcpy(buf, text.data() + offset, l); offset += l; - return gcs::internal::ReadSourceResult{ - l, gcs::internal::HttpResponse{200, {}, {}}}; + return gcs::internal::ReadSourceResult{l, gcs::internal::HttpResponse{200, {}, {}}}; }; - EXPECT_CALL(*fetch_gcs_object_.get().mock_client_, ReadObject) - .WillOnce([&](gcs::internal::ReadObjectRangeRequest const& request) { - EXPECT_EQ(request.bucket_name(), "bucket-from-attribute") << request; - EXPECT_TRUE(request.HasOption()); - EXPECT_TRUE(request.GetOption().has_value()); - EXPECT_EQ(23, request.GetOption().value()); - std::unique_ptr mock_source(new gcs::testing::MockObjectReadSource); - ::testing::InSequence seq; - EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(true)); - EXPECT_CALL(*mock_source, Read).WillOnce(simulate_read); - EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(false)); - - return google::cloud::make_status_or( - std::unique_ptr( - std::move(mock_source))); - }); - EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::ObjectGeneration, "${gcs.generation}")); - const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}, {std::string(minifi_gcp::GCS_GENERATION), "23"}}); + EXPECT_CALL(*mock_client_, ReadObject).WillOnce([&](gcs::internal::ReadObjectRangeRequest const& request) { + EXPECT_EQ(request.bucket_name(), "bucket-from-attribute") << request; + EXPECT_TRUE(request.HasOption()); + EXPECT_TRUE(request.GetOption().has_value()); + EXPECT_EQ(23, request.GetOption().value()); + auto mock_source = std::make_unique(); + ::testing::InSequence seq; + EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(true)); + EXPECT_CALL(*mock_source, Read).WillOnce(simulate_read); + EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(false)); + + return google::cloud::make_status_or(std::unique_ptr(std::move(mock_source))); + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(FetchGCSObject::ObjectGeneration.name, "${gcs.generation}")); + const auto& result = test_controller_.trigger("hello world", + {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}, {std::string(minifi_gcp::GCS_GENERATION), "23"}}); ASSERT_EQ(1, result.at(FetchGCSObject::Success).size()); EXPECT_EQ(0, result.at(FetchGCSObject::Failure).size()); EXPECT_EQ("stored text", test_controller_.plan->getContent(result.at(FetchGCSObject::Success)[0])); } TEST_F(FetchGCSObjectTests, EmptyGeneration) { - std::string const text = "stored text"; + constexpr std::string_view text = "stored text"; std::size_t offset = 0; // Simulate a Read() call in the MockObjectReadSource object created below auto simulate_read = [&text, &offset](void* buf, std::size_t n) { auto const l = (std::min)(n, text.size() - offset); std::memcpy(buf, text.data() + offset, l); offset += l; - return gcs::internal::ReadSourceResult{ - l, gcs::internal::HttpResponse{200, {}, {}}}; + return gcs::internal::ReadSourceResult{l, gcs::internal::HttpResponse{200, {}, {}}}; }; - EXPECT_CALL(*fetch_gcs_object_.get().mock_client_, ReadObject) - .WillOnce([&](gcs::internal::ReadObjectRangeRequest const& request) { - EXPECT_EQ(request.bucket_name(), "bucket-from-attribute") << request; - EXPECT_FALSE(request.HasOption()); - std::unique_ptr mock_source(new gcs::testing::MockObjectReadSource); - ::testing::InSequence seq; - EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(true)); - EXPECT_CALL(*mock_source, Read).WillOnce(simulate_read); - EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(false)); - - return google::cloud::make_status_or( - std::unique_ptr( - std::move(mock_source))); - }); - EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::ObjectGeneration, "${gcs.generation}")); + EXPECT_CALL(*mock_client_, ReadObject).WillOnce([&](gcs::internal::ReadObjectRangeRequest const& request) { + EXPECT_EQ(request.bucket_name(), "bucket-from-attribute") << request; + EXPECT_FALSE(request.HasOption()); + auto mock_source = std::make_unique(); + ::testing::InSequence seq; + EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(true)); + EXPECT_CALL(*mock_source, Read).WillOnce(simulate_read); + EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(false)); + + return google::cloud::make_status_or(std::unique_ptr(std::move(mock_source))); + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(FetchGCSObject::ObjectGeneration.name, "${gcs.generation}")); const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}}); ASSERT_EQ(1, result.at(FetchGCSObject::Success).size()); EXPECT_EQ(0, result.at(FetchGCSObject::Failure).size()); @@ -163,8 +159,9 @@ TEST_F(FetchGCSObjectTests, EmptyGeneration) { } TEST_F(FetchGCSObjectTests, InvalidGeneration) { - EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::ObjectGeneration, "${gcs.generation}")); - const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}, {std::string(minifi_gcp::GCS_GENERATION), "23 banana"}}); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(FetchGCSObject::ObjectGeneration.name, "${gcs.generation}")); + const auto& result = test_controller_.trigger("hello world", + {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}, {std::string(minifi_gcp::GCS_GENERATION), "23 banana"}}); ASSERT_EQ(0, result.at(FetchGCSObject::Success).size()); EXPECT_EQ(1, result.at(FetchGCSObject::Failure).size()); EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(FetchGCSObject::Failure)[0])); diff --git a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp index 0334cb4d95..b7ac6a147e 100644 --- a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp +++ b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp @@ -16,38 +16,41 @@ */ #define EXTENSION_LIST "minifi-gcp" // NOLINT(cppcoreguidelines-macro-usage) -#include "unit/TestBase.h" -#include "gtest/gtest.h" #include "../controllerservices/GCPCredentialsControllerService.h" -#include "core/Resource.h" +#include "CProcessorTestUtils.h" #include "core/Processor.h" +#include "core/Resource.h" #include "core/controller/ControllerServiceNode.h" +#include "gtest/gtest.h" #include "rapidjson/document.h" #include "rapidjson/stream.h" #include "rapidjson/writer.h" #include "unit/DummyProcessor.h" +#include "unit/TestBase.h" #include "utils/Environment.h" -namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp; +namespace minifi_gcp = minifi::extensions::gcp; using GCPCredentialsControllerService = minifi_gcp::GCPCredentialsControllerService; namespace { std::string create_mock_service_json() { - rapidjson::Document root = rapidjson::Document(rapidjson::kObjectType); + auto root = rapidjson::Document(rapidjson::kObjectType); root.AddMember("type", "service_account", root.GetAllocator()); root.AddMember("project_id", "mock_project_id", root.GetAllocator()); root.AddMember("private_key_id", "my_private_key_id", root.GetAllocator()); - root.AddMember("private_key", "-----BEGIN PRIVATE KEY-----\n" - "MIIBVAIBADANBgkqhkiG9w0BAQEFAASCAT4wggE6AgEAAkEAo2Eyw6KfcYOSD0D1\n" - "7cw3+M/Qkv5xXwaxxHlAZk+Bscjkm2S37iQwm87mLnhyr7nnAUXZTHsR6SDrBhj7\n" - "9xvM1QIDAQABAkB1RTJL7HGn5/myCz27J4fRh1E+AXbc75Av55yLE2yTb+qwfX3m\n" - "eAw0dZAIRQ8ZuXw7su71bW2YyB43RwXOnGWtAiEA0zo0bu6h8LPAK9y65zw8KNuF\n" - "A+Rif5+7K12uv1XgCWsCIQDGAqSH6JToI7yHOup47XM1CKMnjBDe67ExJPuDH3HS\n" - "vwIgAI+RABJmH6t6gSNO47pHNpyOl9oNYOVdq9nN0vg5Zg0CIQDEDjXOg9F8kHXJ\n" - "B+LFXYamyiiRrbO+pWvKly2ZRPc0jQIgfZyH0JGjJKZTLog14owyAA+JUkHTh7Em\n" - "8o9ev8MeLoM=\n" - "-----END PRIVATE KEY-----", root.GetAllocator()); + root.AddMember("private_key", + "-----BEGIN PRIVATE KEY-----\n" + "MIIBVAIBADANBgkqhkiG9w0BAQEFAASCAT4wggE6AgEAAkEAo2Eyw6KfcYOSD0D1\n" + "7cw3+M/Qkv5xXwaxxHlAZk+Bscjkm2S37iQwm87mLnhyr7nnAUXZTHsR6SDrBhj7\n" + "9xvM1QIDAQABAkB1RTJL7HGn5/myCz27J4fRh1E+AXbc75Av55yLE2yTb+qwfX3m\n" + "eAw0dZAIRQ8ZuXw7su71bW2YyB43RwXOnGWtAiEA0zo0bu6h8LPAK9y65zw8KNuF\n" + "A+Rif5+7K12uv1XgCWsCIQDGAqSH6JToI7yHOup47XM1CKMnjBDe67ExJPuDH3HS\n" + "vwIgAI+RABJmH6t6gSNO47pHNpyOl9oNYOVdq9nN0vg5Zg0CIQDEDjXOg9F8kHXJ\n" + "B+LFXYamyiiRrbO+pWvKly2ZRPc0jQIgfZyH0JGjJKZTLog14owyAA+JUkHTh7Em\n" + "8o9ev8MeLoM=\n" + "-----END PRIVATE KEY-----", + root.GetAllocator()); root.AddMember("client_email", "my_client_email", root.GetAllocator()); root.AddMember("client_id", "my_client_id", root.GetAllocator()); rapidjson::StringBuffer buffer; @@ -59,8 +62,7 @@ std::string create_mock_service_json() { std::optional create_mock_json_file(const std::filesystem::path& dir_path) { std::filesystem::path path = dir_path / "mock_credentials.json"; std::ofstream p{path}; - if (!p) - return std::nullopt; + if (!p) { return std::nullopt; } p << create_mock_service_json(); p.close(); return path; @@ -70,14 +72,20 @@ std::optional create_mock_json_file(const std::filesystem class GCPCredentialsTests : public ::testing::Test { protected: void SetUp() override { - ASSERT_TRUE(gcp_credentials_node_); - ASSERT_TRUE(gcp_credentials_); plan_->addProcessor("DummyProcessor", "dummy_processor"); + auto gcp_credential_controller_service = + minifi::test::utils::make_custom_c_controller_service(core::ControllerServiceMetadata{utils::Identifier{}, + "GCPCredentialsControllerService", + logging::LoggerFactory::getLogger()}); + gcp_credentials_node_ = plan_->addController("gcp_credentials_controller_service", gcp_credential_controller_service); + gcp_credentials_ = static_cast( + gcp_credentials_node_->getControllerServiceImplementation()->getImpl()); } TestController test_controller_; std::shared_ptr plan_ = test_controller_.createPlan(); - std::shared_ptr gcp_credentials_node_ = plan_->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service"); - std::shared_ptr gcp_credentials_ = gcp_credentials_node_->getControllerServiceImplementation(); + + std::shared_ptr gcp_credentials_node_; + GCPCredentialsControllerService* gcp_credentials_ = nullptr; }; TEST_F(GCPCredentialsTests, DefaultGCPCredentialsWithEnv) { @@ -85,13 +93,17 @@ TEST_F(GCPCredentialsTests, DefaultGCPCredentialsWithEnv) { auto path = create_mock_json_file(temp_directory); ASSERT_TRUE(path.has_value()); minifi::utils::Environment::setEnvironmentVariable("GOOGLE_APPLICATION_CREDENTIALS", path->string().c_str()); - plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_DEFAULT_CREDENTIALS)); + plan_->setProperty(gcp_credentials_node_, + GCPCredentialsControllerService::CredentialsLoc, + magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_DEFAULT_CREDENTIALS)); ASSERT_NO_THROW(test_controller_.runSession(plan_)); EXPECT_NE(nullptr, gcp_credentials_->getCredentials()); } TEST_F(GCPCredentialsTests, CredentialsFromJsonWithoutProperty) { - plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_JSON_FILE)); + plan_->setProperty(gcp_credentials_node_, + GCPCredentialsControllerService::CredentialsLoc, + magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_JSON_FILE)); ASSERT_NO_THROW(test_controller_.runSession(plan_)); EXPECT_EQ(nullptr, gcp_credentials_->getCredentials()); } @@ -100,8 +112,10 @@ TEST_F(GCPCredentialsTests, CredentialsFromJsonWithProperty) { auto temp_directory = test_controller_.createTempDirectory(); auto path = create_mock_json_file(temp_directory); ASSERT_TRUE(path.has_value()); - plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_JSON_FILE)); - plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::JsonFilePath, path->string()); + plan_->setProperty(gcp_credentials_node_, + GCPCredentialsControllerService::CredentialsLoc, + magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_JSON_FILE)); + ASSERT_TRUE(plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::JsonFilePath, path->string())); ASSERT_NO_THROW(test_controller_.runSession(plan_)); EXPECT_NE(nullptr, gcp_credentials_->getCredentials()); } @@ -110,32 +124,42 @@ TEST_F(GCPCredentialsTests, CredentialsFromJsonWithInvalidPath) { auto temp_directory = test_controller_.createTempDirectory(); auto path = create_mock_json_file(temp_directory); ASSERT_TRUE(path.has_value()); - plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_JSON_FILE)); - plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::JsonFilePath, "/invalid/path/to/credentials.json"); + plan_->setProperty(gcp_credentials_node_, + GCPCredentialsControllerService::CredentialsLoc, + magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_JSON_FILE)); + ASSERT_TRUE(plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::JsonFilePath, "/invalid/path/to/credentials.json")); ASSERT_NO_THROW(test_controller_.runSession(plan_)); EXPECT_EQ(nullptr, gcp_credentials_->getCredentials()); } TEST_F(GCPCredentialsTests, CredentialsFromComputeEngineVM) { - plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_COMPUTE_ENGINE_CREDENTIALS)); + plan_->setProperty(gcp_credentials_node_, + GCPCredentialsControllerService::CredentialsLoc, + magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_COMPUTE_ENGINE_CREDENTIALS)); ASSERT_NO_THROW(test_controller_.runSession(plan_)); EXPECT_NE(nullptr, gcp_credentials_->getCredentials()); } TEST_F(GCPCredentialsTests, AnonymousCredentials) { - plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS)); + plan_->setProperty(gcp_credentials_node_, + GCPCredentialsControllerService::CredentialsLoc, + magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS)); ASSERT_NO_THROW(test_controller_.runSession(plan_)); EXPECT_NE(nullptr, gcp_credentials_->getCredentials()); } TEST_F(GCPCredentialsTests, CredentialsFromJsonContentsWithoutProperty) { - plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_JSON_CONTENTS)); + plan_->setProperty(gcp_credentials_node_, + GCPCredentialsControllerService::CredentialsLoc, + magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_JSON_CONTENTS)); ASSERT_NO_THROW(test_controller_.runSession(plan_)); EXPECT_EQ(nullptr, gcp_credentials_->getCredentials()); } TEST_F(GCPCredentialsTests, CredentialsFromJsonContentsWithProperty) { - plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_JSON_CONTENTS)); + plan_->setProperty(gcp_credentials_node_, + GCPCredentialsControllerService::CredentialsLoc, + magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_JSON_CONTENTS)); plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::JsonContents, create_mock_service_json()); ASSERT_NO_THROW(test_controller_.runSession(plan_)); EXPECT_NE(nullptr, gcp_credentials_->getCredentials()); diff --git a/extensions/gcp/tests/ListGCSBucketTests.cpp b/extensions/gcp/tests/ListGCSBucketTests.cpp index 16b2c8dadb..df7e083079 100644 --- a/extensions/gcp/tests/ListGCSBucketTests.cpp +++ b/extensions/gcp/tests/ListGCSBucketTests.cpp @@ -14,43 +14,44 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "../processors/ListGCSBucket.h" #include "../controllerservices/GCPCredentialsControllerService.h" +#include "../processors/ListGCSBucket.h" +#include "CProcessorTestUtils.h" #include "core/Resource.h" -#include "unit/SingleProcessorTestController.h" -#include "google/cloud/storage/testing/mock_client.h" #include "google/cloud/storage/internal/object_metadata_parser.h" #include "google/cloud/storage/testing/canonical_errors.h" +#include "google/cloud/storage/testing/mock_client.h" #include "unit/ProcessorUtils.h" +#include "unit/SingleProcessorTestController.h" namespace gcs = ::google::cloud::storage; -namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp; +namespace minifi_gcp = minifi::extensions::gcp; -using ListGCSBucket = org::apache::nifi::minifi::extensions::gcp::ListGCSBucket; +using ListGCSBucket = minifi::extensions::gcp::ListGCSBucket; using ListObjectsRequest = gcs::internal::ListObjectsRequest; using ListObjectsResponse = gcs::internal::ListObjectsResponse; -using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService; -using ::google::cloud::storage::testing::canonical_errors::TransientError; +using GCPCredentialsControllerService = minifi::extensions::gcp::GCPCredentialsControllerService; using ::google::cloud::storage::testing::canonical_errors::PermanentError; +using ::google::cloud::storage::testing::canonical_errors::TransientError; namespace { class ListGCSBucketMocked : public ListGCSBucket { - using org::apache::nifi::minifi::extensions::gcp::ListGCSBucket::ListGCSBucket; + using ListGCSBucket::ListGCSBucket; + public: - static constexpr const char* Description = "ListGCSBucketMocked"; + ListGCSBucketMocked(minifi::core::ProcessorMetadata metadata, std::shared_ptr mock_client) + : ListGCSBucket(std::move(metadata)), + mock_client_(std::move(mock_client)) {} - gcs::Client getClient() const override { - return gcs::testing::UndecoratedClientFromMock(mock_client_); - } - std::shared_ptr mock_client_ = std::make_shared(); + protected: + gcs::Client getClient() const override { return gcs::testing::UndecoratedClientFromMock(mock_client_); } + std::shared_ptr mock_client_; }; -REGISTER_RESOURCE(ListGCSBucketMocked, Processor); auto CreateObject(int index, int generation = 1) { std::string id = "object-" + std::to_string(index); std::string name = id; - std::string link = - "https://storage.googleapis.com/storage/v1/b/test-bucket/" + id + "#1"; + std::string link = "https://storage.googleapis.com/storage/v1/b/test-bucket/" + id + "#1"; nlohmann::json metadata{ {"bucket", "test-bucket"}, {"id", id}, @@ -64,86 +65,78 @@ auto CreateObject(int index, int generation = 1) { } // namespace class ListGCSBucketTests : public ::testing::Test { - public: + protected: void SetUp() override { - list_gcs_bucket_ = test_controller_.getProcessor(); - gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service"); - test_controller_.plan->setProperty(gcp_credentials_node_, - GCPCredentialsControllerService::CredentialsLoc, - magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS)); - test_controller_.plan->setProperty(list_gcs_bucket_, - ListGCSBucket::GCPCredentials, - "gcp_credentials_controller_service"); + const auto gcp_credential_controller_service = + minifi::test::utils::make_custom_c_controller_service(core::ControllerServiceMetadata{utils::Identifier{}, + "GCPCredentialsControllerService", + logging::LoggerFactory::getLogger()}); + gcp_credentials_node_ = test_controller_.plan->addController("gcp_credentials_controller_service", gcp_credential_controller_service); + test_controller_.getProcessor()->setProperty(GCPCredentialsControllerService::CredentialsLoc.name, + std::string(magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS))); + test_controller_.getProcessor()->setProperty(ListGCSBucketMocked::GCPCredentials.name, "gcp_credentials_controller_service"); } - org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{minifi::test::utils::make_processor("ListGCSBucketMocked")}; - TypedProcessorWrapper list_gcs_bucket_; - std::shared_ptr gcp_credentials_node_; + + public: + std::shared_ptr mock_client_ = std::make_shared(); + minifi::test::SingleProcessorTestController test_controller_{minifi::test::utils::make_custom_c_processor( + core::ProcessorMetadata{utils::Identifier{}, "ListGCSBucketMocked", logging::LoggerFactory::getLogger()}, mock_client_)}; + std::shared_ptr gcp_credentials_node_; }; TEST_F(ListGCSBucketTests, MissingBucket) { - EXPECT_CALL(*list_gcs_bucket_.get().mock_client_, CreateResumableUpload).Times(0); + EXPECT_CALL(*mock_client_, CreateResumableUpload).Times(0); EXPECT_THROW(test_controller_.trigger(), std::runtime_error); } TEST_F(ListGCSBucketTests, ServerGivesPermaError) { - auto return_permanent_error = [](ListObjectsRequest const&) { - return google::cloud::StatusOr(PermanentError()); - }; - EXPECT_CALL(*list_gcs_bucket_.get().mock_client_, ListObjects) - .WillOnce(return_permanent_error); - EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::Bucket, "bucket-from-property")); + auto return_permanent_error = [](ListObjectsRequest const&) { return google::cloud::StatusOr(PermanentError()); }; + EXPECT_CALL(*mock_client_, ListObjects).WillOnce(return_permanent_error); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(ListGCSBucket::Bucket.name, "bucket-from-property")); const auto& result = test_controller_.trigger(); EXPECT_EQ(0, result.at(ListGCSBucket::Success).size()); } TEST_F(ListGCSBucketTests, ServerGivesTransientErrors) { - auto return_temp_error = [](ListObjectsRequest const&) { - return google::cloud::StatusOr(TransientError()); - }; - EXPECT_CALL(*list_gcs_bucket_.get().mock_client_, ListObjects).WillOnce(return_temp_error); - EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::NumberOfRetries, "1")); - EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::Bucket, "bucket-from-property")); + auto return_temp_error = [](ListObjectsRequest const&) { return google::cloud::StatusOr(TransientError()); }; + EXPECT_CALL(*mock_client_, ListObjects).WillOnce(return_temp_error); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(ListGCSBucket::NumberOfRetries.name, "1")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(ListGCSBucket::Bucket.name, "bucket-from-property")); const auto& result = test_controller_.trigger(); EXPECT_EQ(0, result.at(ListGCSBucket::Success).size()); } TEST_F(ListGCSBucketTests, WithoutVersions) { - EXPECT_CALL(*list_gcs_bucket_.get().mock_client_, ListObjects) - .WillOnce([](ListObjectsRequest const& req) - -> google::cloud::StatusOr { - EXPECT_EQ("bucket-from-property", req.bucket_name()); - EXPECT_TRUE(req.HasOption()); - EXPECT_FALSE(req.GetOption().value()); + EXPECT_CALL(*mock_client_, ListObjects).WillOnce([](ListObjectsRequest const& req) -> google::cloud::StatusOr { + EXPECT_EQ("bucket-from-property", req.bucket_name()); + EXPECT_TRUE(req.HasOption()); + EXPECT_FALSE(req.GetOption().value()); - ListObjectsResponse response; - response.items.emplace_back(CreateObject(1, 1)); - response.items.emplace_back(CreateObject(1, 2)); - response.items.emplace_back(CreateObject(1, 3)); - return response; - }); - EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::Bucket, "bucket-from-property")); + ListObjectsResponse response; + response.items.emplace_back(CreateObject(1, 1)); + response.items.emplace_back(CreateObject(1, 2)); + response.items.emplace_back(CreateObject(1, 3)); + return response; + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(ListGCSBucket::Bucket.name, "bucket-from-property")); const auto& result = test_controller_.trigger(); EXPECT_EQ(3, result.at(ListGCSBucket::Success).size()); } - TEST_F(ListGCSBucketTests, WithVersions) { - EXPECT_CALL(*list_gcs_bucket_.get().mock_client_, ListObjects) - .WillOnce([](ListObjectsRequest const& req) - -> google::cloud::StatusOr { - EXPECT_EQ("bucket-from-property", req.bucket_name()); - EXPECT_TRUE(req.HasOption()); - EXPECT_TRUE(req.GetOption().value()); + EXPECT_CALL(*mock_client_, ListObjects).WillOnce([](ListObjectsRequest const& req) -> google::cloud::StatusOr { + EXPECT_EQ("bucket-from-property", req.bucket_name()); + EXPECT_TRUE(req.HasOption()); + EXPECT_TRUE(req.GetOption().value()); - ListObjectsResponse response; - response.items.emplace_back(CreateObject(1)); - response.items.emplace_back(CreateObject(2)); - response.items.emplace_back(CreateObject(3)); - return response; - }); - EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::Bucket, "bucket-from-property")); - EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::ListAllVersions, "true")); + ListObjectsResponse response; + response.items.emplace_back(CreateObject(1)); + response.items.emplace_back(CreateObject(2)); + response.items.emplace_back(CreateObject(3)); + return response; + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(ListGCSBucket::Bucket.name, "bucket-from-property")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(ListGCSBucket::ListAllVersions.name, "true")); const auto& result = test_controller_.trigger(); EXPECT_EQ(3, result.at(ListGCSBucket::Success).size()); } - diff --git a/extensions/gcp/tests/PutGCSObjectTests.cpp b/extensions/gcp/tests/PutGCSObjectTests.cpp index a8355b50f7..af7a1a9582 100644 --- a/extensions/gcp/tests/PutGCSObjectTests.cpp +++ b/extensions/gcp/tests/PutGCSObjectTests.cpp @@ -14,56 +14,59 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "../processors/PutGCSObject.h" #include "../controllerservices/GCPCredentialsControllerService.h" +#include "../processors/PutGCSObject.h" +#include "CProcessorTestUtils.h" #include "GCPAttributes.h" #include "core/Resource.h" -#include "unit/SingleProcessorTestController.h" -#include "google/cloud/storage/testing/mock_client.h" #include "google/cloud/storage/internal/object_metadata_parser.h" -#include "google/cloud/storage/retry_policy.h" #include "google/cloud/storage/testing/canonical_errors.h" +#include "google/cloud/storage/testing/mock_client.h" #include "unit/ProcessorUtils.h" +#include "unit/SingleProcessorTestController.h" namespace gcs = ::google::cloud::storage; -namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp; +namespace minifi_gcp = minifi::extensions::gcp; -using PutGCSObject = org::apache::nifi::minifi::extensions::gcp::PutGCSObject; -using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService; +using PutGCSObject = minifi::extensions::gcp::PutGCSObject; +using GCPCredentialsControllerService = minifi::extensions::gcp::GCPCredentialsControllerService; using ResumableUploadRequest = gcs::internal::ResumableUploadRequest; using QueryResumableUploadResponse = gcs::internal::QueryResumableUploadResponse; -using ::google::cloud::storage::testing::canonical_errors::TransientError; using ::google::cloud::storage::testing::canonical_errors::PermanentError; +using ::google::cloud::storage::testing::canonical_errors::TransientError; namespace { class PutGCSObjectMocked : public PutGCSObject { - using org::apache::nifi::minifi::extensions::gcp::PutGCSObject::PutGCSObject; + using PutGCSObject::PutGCSObject; + public: - static constexpr const char* Description = "PutGCSObjectMocked"; + PutGCSObjectMocked(minifi::core::ProcessorMetadata metadata, std::shared_ptr mock_client) + : PutGCSObject(std::move(metadata)), + mock_client_(std::move(mock_client)) {} - gcs::Client getClient() const override { - return gcs::testing::UndecoratedClientFromMock(mock_client_); - } - std::shared_ptr mock_client_ = std::make_shared(); + protected: + gcs::Client getClient() const override { return gcs::testing::UndecoratedClientFromMock(mock_client_); } + std::shared_ptr mock_client_; }; -REGISTER_RESOURCE(PutGCSObjectMocked, Processor); } // namespace class PutGCSObjectTests : public ::testing::Test { - public: + protected: void SetUp() override { - put_gcs_object_ = test_controller_.getProcessor(); - gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service"); - test_controller_.plan->setProperty(gcp_credentials_node_, - GCPCredentialsControllerService::CredentialsLoc, - magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS)); - test_controller_.plan->setProperty(put_gcs_object_, - PutGCSObject::GCPCredentials, - "gcp_credentials_controller_service"); + const auto gcp_credential_controller_service = + minifi::test::utils::make_custom_c_controller_service(core::ControllerServiceMetadata{utils::Identifier{}, + "GCPCredentialsControllerService", + logging::LoggerFactory::getLogger()}); + gcp_credentials_node_ = test_controller_.plan->addController("gcp_credentials_controller_service", gcp_credential_controller_service); + test_controller_.getProcessor()->setProperty(GCPCredentialsControllerService::CredentialsLoc.name, + std::string(magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS))); + test_controller_.getProcessor()->setProperty(PutGCSObjectMocked::GCPCredentials.name, "gcp_credentials_controller_service"); } - TypedProcessorWrapper put_gcs_object_; - org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{minifi::test::utils::make_processor("PutGCSObjectMocked")}; - std::shared_ptr gcp_credentials_node_; + + std::shared_ptr mock_client_ = std::make_shared(); + minifi::test::SingleProcessorTestController test_controller_{minifi::test::utils::make_custom_c_processor( + core::ProcessorMetadata{utils::Identifier{}, "PutGCSObjectMocked", logging::LoggerFactory::getLogger()}, mock_client_)}; + std::shared_ptr gcp_credentials_node_; static auto return_upload_done(const ResumableUploadRequest& request) { using ObjectMetadataParser = gcs::internal::ObjectMetadataParser; @@ -75,13 +78,14 @@ class PutGCSObjectTests : public ::testing::Test { metadata_json["customerEncryption"]["encryptionAlgorithm"] = "AES256"; metadata_json["customerEncryption"]["keySha256"] = "zkeXIcAB56dkHp0z1023TQZ+mzm+fZ5JRVgmAQ3bEVE="; } - return testing::Return(google::cloud::make_status_or(QueryResumableUploadResponse{absl::nullopt, *ObjectMetadataParser::FromJson(metadata_json)})); + return testing::Return(google::cloud::make_status_or(QueryResumableUploadResponse{absl::nullopt, + *ObjectMetadataParser::FromJson(metadata_json)})); } }; TEST_F(PutGCSObjectTests, MissingBucket) { - EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload).Times(0); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).Times(0); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Bucket.name, "")); const auto& result = test_controller_.trigger("hello world"); EXPECT_EQ(0, result.at(PutGCSObject::Success).size()); ASSERT_EQ(1, result.at(PutGCSObject::Failure).size()); @@ -91,13 +95,12 @@ TEST_F(PutGCSObjectTests, MissingBucket) { } TEST_F(PutGCSObjectTests, BucketFromAttribute) { - EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload) - .WillOnce([this](const ResumableUploadRequest& request) { - EXPECT_EQ("bucket-from-attribute", request.bucket_name()); - EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request)); - return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; - }); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "${gcs.bucket}")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).WillOnce([this](const ResumableUploadRequest& request) { + EXPECT_EQ("bucket-from-attribute", request.bucket_name()); + EXPECT_CALL(*mock_client_, UploadChunk).WillOnce(return_upload_done(request)); + return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Bucket.name, "${gcs.bucket}")); const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}}); ASSERT_EQ(1, result.at(PutGCSObject::Success).size()); EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); @@ -105,10 +108,10 @@ TEST_F(PutGCSObjectTests, BucketFromAttribute) { } TEST_F(PutGCSObjectTests, ServerGivesTransientErrors) { - EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload).WillOnce(testing::Return(TransientError())); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::NumberOfRetries, "2")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).WillOnce(testing::Return(TransientError())); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::NumberOfRetries.name, "2")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Bucket.name, "bucket-from-property")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Key.name, "object-name-from-property")); const auto& result = test_controller_.trigger("hello world"); EXPECT_EQ(0, result.at(PutGCSObject::Success).size()); ASSERT_EQ(1, result.at(PutGCSObject::Failure).size()); @@ -118,9 +121,9 @@ TEST_F(PutGCSObjectTests, ServerGivesTransientErrors) { } TEST_F(PutGCSObjectTests, ServerGivesPermaError) { - EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload).WillOnce(testing::Return(PermanentError())); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).WillOnce(testing::Return(PermanentError())); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Bucket.name, "bucket-from-property")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Key.name, "object-name-from-property")); const auto& result = test_controller_.trigger("hello world"); EXPECT_EQ(0, result.at(PutGCSObject::Success).size()); ASSERT_EQ(1, result.at(PutGCSObject::Failure).size()); @@ -130,51 +133,48 @@ TEST_F(PutGCSObjectTests, ServerGivesPermaError) { } TEST_F(PutGCSObjectTests, NonRequiredPropertiesAreMissing) { - EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload) - .WillOnce([this](const ResumableUploadRequest& request) { - EXPECT_FALSE(request.HasOption()); - EXPECT_FALSE(request.HasOption()); - EXPECT_FALSE(request.HasOption()); - EXPECT_FALSE(request.HasOption()); - EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request)); - return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; - }); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).WillOnce([this](const ResumableUploadRequest& request) { + EXPECT_FALSE(request.HasOption()); + EXPECT_FALSE(request.HasOption()); + EXPECT_FALSE(request.HasOption()); + EXPECT_FALSE(request.HasOption()); + EXPECT_CALL(*mock_client_, UploadChunk).WillOnce(return_upload_done(request)); + return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Bucket.name, "bucket-from-property")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Key.name, "object-name-from-property")); const auto& result = test_controller_.trigger("hello world"); EXPECT_EQ(1, result.at(PutGCSObject::Success).size()); EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); } TEST_F(PutGCSObjectTests, Crc32cMD5LocationTest) { - EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload) - .WillOnce([this](const ResumableUploadRequest& request) { - EXPECT_TRUE(request.HasOption()); - EXPECT_EQ("yZRlqg==", request.GetOption().value()); - EXPECT_TRUE(request.HasOption()); - EXPECT_EQ("XrY7u+Ae7tCTyyK7j1rNww==", request.GetOption().value()); - EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request)); - return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; - }); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::MD5Hash, "${md5}")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Crc32cChecksum, "${crc32c}")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).WillOnce([this](const ResumableUploadRequest& request) { + EXPECT_TRUE(request.HasOption()); + EXPECT_EQ("yZRlqg==", request.GetOption().value()); + EXPECT_TRUE(request.HasOption()); + EXPECT_EQ("XrY7u+Ae7tCTyyK7j1rNww==", request.GetOption().value()); + EXPECT_CALL(*mock_client_, UploadChunk).WillOnce(return_upload_done(request)); + return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::MD5Hash.name, "${md5}")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Crc32cChecksum.name, "${crc32c}")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Bucket.name, "bucket-from-property")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Key.name, "object-name-from-property")); const auto& result = test_controller_.trigger("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}}); EXPECT_EQ(1, result.at(PutGCSObject::Success).size()); EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); } TEST_F(PutGCSObjectTests, DontOverwriteTest) { - EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload) - .WillOnce([this](const ResumableUploadRequest& request) { - EXPECT_TRUE(request.HasOption()); - EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request)); - return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; - }); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::OverwriteObject, "false")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).WillOnce([this](const ResumableUploadRequest& request) { + EXPECT_TRUE(request.HasOption()); + EXPECT_CALL(*mock_client_, UploadChunk).WillOnce(return_upload_done(request)); + return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::OverwriteObject.name, "false")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Bucket.name, "bucket-from-property")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Key.name, "object-name-from-property")); const auto& result = test_controller_.trigger("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}}); ASSERT_EQ(1, result.at(PutGCSObject::Success).size()); EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); @@ -182,15 +182,14 @@ TEST_F(PutGCSObjectTests, DontOverwriteTest) { } TEST_F(PutGCSObjectTests, ValidServerSideEncryptionTest) { - EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload) - .WillOnce([this](const ResumableUploadRequest& request) { - EXPECT_TRUE(request.HasOption()); - EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request)); - return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; - }); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::EncryptionKey, "ZW5jcnlwdGlvbl9rZXk=")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).WillOnce([this](const ResumableUploadRequest& request) { + EXPECT_TRUE(request.HasOption()); + EXPECT_CALL(*mock_client_, UploadChunk).WillOnce(return_upload_done(request)); + return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::EncryptionKey.name, "ZW5jcnlwdGlvbl9rZXk=")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Bucket.name, "bucket-from-property")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Key.name, "object-name-from-property")); const auto& result = test_controller_.trigger("hello world"); ASSERT_EQ(1, result.at(PutGCSObject::Success).size()); EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); @@ -200,22 +199,21 @@ TEST_F(PutGCSObjectTests, ValidServerSideEncryptionTest) { } TEST_F(PutGCSObjectTests, InvalidServerSideEncryptionTest) { - EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload).Times(0); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::EncryptionKey, "not_base64_key")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).Times(0); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::EncryptionKey.name, "not_base64_key")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Bucket.name, "bucket-from-property")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Key.name, "object-name-from-property")); EXPECT_THROW(test_controller_.trigger("hello world"), minifi::Exception); } TEST_F(PutGCSObjectTests, NoContentType) { - EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload) - .WillOnce([this](const ResumableUploadRequest& request) { - EXPECT_FALSE(request.HasOption()); - EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request)); - return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; - }); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).WillOnce([this](const ResumableUploadRequest& request) { + EXPECT_FALSE(request.HasOption()); + EXPECT_CALL(*mock_client_, UploadChunk).WillOnce(return_upload_done(request)); + return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Bucket.name, "bucket-from-property")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Key.name, "object-name-from-property")); const auto& result = test_controller_.trigger("hello world"); ASSERT_EQ(1, result.at(PutGCSObject::Success).size()); EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); @@ -223,15 +221,14 @@ TEST_F(PutGCSObjectTests, NoContentType) { } TEST_F(PutGCSObjectTests, ContentTypeFromAttribute) { - EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload) - .WillOnce([this](const ResumableUploadRequest& request) { - EXPECT_TRUE(request.HasOption()); - EXPECT_EQ("text/attribute", request.GetOption().value()); - EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request)); - return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; - }); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property")); + EXPECT_CALL(*mock_client_, CreateResumableUpload).WillOnce([this](const ResumableUploadRequest& request) { + EXPECT_TRUE(request.HasOption()); + EXPECT_EQ("text/attribute", request.GetOption().value()); + EXPECT_CALL(*mock_client_, UploadChunk).WillOnce(return_upload_done(request)); + return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Bucket.name, "bucket-from-property")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Key.name, "object-name-from-property")); const auto& result = test_controller_.trigger("hello world", {{"mime.type", "text/attribute"}}); ASSERT_EQ(1, result.at(PutGCSObject::Success).size()); EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); @@ -239,16 +236,16 @@ TEST_F(PutGCSObjectTests, ContentTypeFromAttribute) { } TEST_F(PutGCSObjectTests, ObjectACLTest) { - EXPECT_CALL(*put_gcs_object_.get().mock_client_, CreateResumableUpload) - .WillOnce([this](const ResumableUploadRequest& request) { - EXPECT_TRUE(request.HasOption()); - EXPECT_EQ(gcs::PredefinedAcl::AuthenticatedRead().value(), request.GetOption().value()); - EXPECT_CALL(*put_gcs_object_.get().mock_client_, UploadChunk).WillOnce(return_upload_done(request)); - return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; - }); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket, "bucket-from-property")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key, "object-name-from-property")); - EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::ObjectACL, magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::AUTHENTICATED_READ))); + EXPECT_CALL(*mock_client_, CreateResumableUpload).WillOnce([this](const ResumableUploadRequest& request) { + EXPECT_TRUE(request.HasOption()); + EXPECT_EQ(gcs::PredefinedAcl::AuthenticatedRead().value(), request.GetOption().value()); + EXPECT_CALL(*mock_client_, UploadChunk).WillOnce(return_upload_done(request)); + return gcs::internal::CreateResumableUploadResponse{"test-only-upload-id"}; + }); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Bucket.name, "bucket-from-property")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::Key.name, "object-name-from-property")); + EXPECT_TRUE(test_controller_.getProcessor()->setProperty(PutGCSObject::ObjectACL.name, + std::string(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::AUTHENTICATED_READ)))); const auto& result = test_controller_.trigger("hello world"); ASSERT_EQ(1, result.at(PutGCSObject::Success).size()); EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); @@ -257,7 +254,8 @@ TEST_F(PutGCSObjectTests, ObjectACLTest) { TEST_F(PutGCSObjectTests, PredefinedACLTests) { EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::AUTHENTICATED_READ), gcs::PredefinedAcl::AuthenticatedRead().value()); - EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::BUCKET_OWNER_FULL_CONTROL), gcs::PredefinedAcl::BucketOwnerFullControl().value()); + EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::BUCKET_OWNER_FULL_CONTROL), + gcs::PredefinedAcl::BucketOwnerFullControl().value()); EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::BUCKET_OWNER_READ_ONLY), gcs::PredefinedAcl::BucketOwnerRead().value()); EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::PRIVATE), gcs::PredefinedAcl::Private().value()); EXPECT_EQ(magic_enum::enum_name(minifi_gcp::put_gcs_object::PredefinedAcl::PROJECT_PRIVATE), gcs::PredefinedAcl::ProjectPrivate().value()); diff --git a/extensions/llamacpp/CMakeLists.txt b/extensions/llamacpp/CMakeLists.txt index 421143f692..9caae4033f 100644 --- a/extensions/llamacpp/CMakeLists.txt +++ b/extensions/llamacpp/CMakeLists.txt @@ -25,7 +25,7 @@ include(LlamaCpp) include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) -file(GLOB SOURCES "processors/*.cpp") +file(GLOB SOURCES "processors/*.cpp" "ExtensionInitializer.cpp") add_minifi_library(minifi-llamacpp SHARED ${SOURCES}) target_include_directories(minifi-llamacpp PUBLIC "${CMAKE_SOURCE_DIR}/extensions/llamacpp") diff --git a/extensions/llamacpp/processors/ExtensionInitializer.cpp b/extensions/llamacpp/ExtensionInitializer.cpp similarity index 84% rename from extensions/llamacpp/processors/ExtensionInitializer.cpp rename to extensions/llamacpp/ExtensionInitializer.cpp index ac5e0519d2..82b91c41d9 100644 --- a/extensions/llamacpp/processors/ExtensionInitializer.cpp +++ b/extensions/llamacpp/ExtensionInitializer.cpp @@ -15,9 +15,9 @@ * limitations under the License. */ -#include "RunLlamaCppInference.h" #include "api/core/Resource.h" #include "api/utils/minifi-c-utils.h" +#include "processors/RunLlamaCppInference.h" #define MKSOC(x) #x #define MAKESTRING(x) MKSOC(x) // NOLINT(cppcoreguidelines-macro-usage) @@ -34,7 +34,5 @@ CEXTENSIONAPI void MinifiInitExtension(MinifiExtensionContext* extension_context .user_data = nullptr }; auto* extension = MinifiCreateExtension(extension_context, &ext_create_info); - minifi::api::core::useProcessorClassDescription([&] (const MinifiProcessorClassDefinition& description) { - MinifiRegisterProcessor(extension, &description); - }); + minifi::api::core::registerProcessors(extension); }