Skip to content

Commit 598f925

Browse files
committed
MINIFICPP-2765 Move GCP Extension to stable C API
1 parent 6809849 commit 598f925

21 files changed

+623
-603
lines changed

extensions/gcp/CMakeLists.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ add_minifi_library(minifi-gcp SHARED ${SOURCES})
3030
if (NOT WIN32)
3131
target_compile_options(minifi-gcp PRIVATE -Wno-error=deprecated-declarations) # Suppress deprecation warnings for std::rel_ops usage
3232
endif()
33-
target_link_libraries(minifi-gcp ${LIBMINIFI} google-cloud-cpp::storage)
34-
target_include_directories(minifi-gcp SYSTEM PUBLIC ${google-cloud-cpp_INCLUDE_DIRS})
33+
target_link_libraries(minifi-gcp minifi-cpp-extension-lib google-cloud-cpp::storage)
3534

36-
register_extension(minifi-gcp "GCP EXTENSIONS" GCP-EXTENSIONS "This enables Google Cloud Platform support" "extensions/gcp/tests")
35+
target_include_directories(minifi-gcp SYSTEM PUBLIC ${google-cloud-cpp_INCLUDE_DIRS})
3736

37+
register_c_api_extension(minifi-gcp "GCP EXTENSIONS" GCP-EXTENSIONS "This enables Google Cloud Platform support" "extensions/gcp/tests")
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "api/core/Resource.h"
19+
#include "api/utils/minifi-c-utils.h"
20+
#include "processors/DeleteGCSObject.h"
21+
#include "processors/FetchGCSObject.h"
22+
#include "processors/ListGCSBucket.h"
23+
#include "processors/PutGCSObject.h"
24+
25+
#define MKSOC(x) #x
26+
#define MAKESTRING(x) MKSOC(x) // NOLINT(cppcoreguidelines-macro-usage)
27+
28+
namespace minifi = org::apache::nifi::minifi;
29+
30+
CEXTENSIONAPI const uint32_t MinifiApiVersion = MINIFI_API_VERSION;
31+
32+
CEXTENSIONAPI void MinifiInitExtension(MinifiExtensionContext* extension_context) {
33+
MinifiExtensionCreateInfo ext_create_info{.name = minifi::api::utils::toStringView(MAKESTRING(EXTENSION_NAME)),
34+
.version = minifi::api::utils::toStringView(MAKESTRING(EXTENSION_VERSION)),
35+
.deinit = nullptr,
36+
.user_data = nullptr};
37+
auto* extension = MinifiCreateExtension(extension_context, &ext_create_info);
38+
minifi::api::core::useProcessorClassDescription<minifi::extensions::gcp::DeleteGCSObject>([&](const MinifiProcessorClassDefinition& description) {
39+
MinifiRegisterProcessor(extension, &description);
40+
});
41+
minifi::api::core::useProcessorClassDescription<minifi::extensions::gcp::FetchGCSObject>([&](const MinifiProcessorClassDefinition& description) {
42+
MinifiRegisterProcessor(extension, &description);
43+
});
44+
minifi::api::core::useProcessorClassDescription<minifi::extensions::gcp::ListGCSBucket>([&](const MinifiProcessorClassDefinition& description) {
45+
MinifiRegisterProcessor(extension, &description);
46+
});
47+
minifi::api::core::useProcessorClassDescription<minifi::extensions::gcp::PutGCSObject>([&](const MinifiProcessorClassDefinition& description) {
48+
MinifiRegisterProcessor(extension, &description);
49+
});
50+
minifi::api::core::useControllerServiceClassDescription<minifi::extensions::gcp::GCPCredentialsControllerService>(
51+
[&](const MinifiControllerServiceClassDefinition& description) { MinifiRegisterControllerService(extension, &description); });
52+
}

extensions/gcp/GCPAttributes.h

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919

2020
#include <string_view>
2121

22+
#include "api/core/FlowFile.h"
23+
#include "api/core/ProcessSession.h"
2224
#include "google/cloud/storage/object_metadata.h"
23-
#include "minifi-cpp/core/FlowFile.h"
2425

2526
namespace org::apache::nifi::minifi::extensions::gcp {
2627

@@ -50,32 +51,32 @@ constexpr std::string_view GCS_SELF_LINK_ATTR = "gcs.self.link";
5051
constexpr std::string_view GCS_ENCRYPTION_ALGORITHM_ATTR = "gcs.encryption.algorithm";
5152
constexpr std::string_view GCS_ENCRYPTION_SHA256_ATTR = "gcs.encryption.sha256";
5253

53-
inline void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const ::google::cloud::storage::ObjectMetadata& object_metadata) {
54-
flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket());
55-
flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name());
56-
flow_file.setAttribute(GCS_SIZE_ATTR, std::to_string(object_metadata.size()));
57-
flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c());
58-
flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash());
59-
flow_file.setAttribute(GCS_CONTENT_ENCODING_ATTR, object_metadata.content_encoding());
60-
flow_file.setAttribute(GCS_CONTENT_LANGUAGE_ATTR, object_metadata.content_language());
61-
flow_file.setAttribute(GCS_CONTENT_DISPOSITION_ATTR, object_metadata.content_disposition());
62-
flow_file.setAttribute(GCS_CREATE_TIME_ATTR, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(object_metadata.time_created().time_since_epoch()).count()));
63-
flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(object_metadata.updated().time_since_epoch()).count()));
64-
flow_file.setAttribute(GCS_DELETE_TIME_ATTR, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(object_metadata.time_deleted().time_since_epoch()).count()));
65-
flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link());
66-
flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link());
67-
flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag());
68-
flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id());
69-
flow_file.setAttribute(GCS_META_GENERATION, std::to_string(object_metadata.metageneration()));
70-
flow_file.setAttribute(GCS_GENERATION, std::to_string(object_metadata.generation()));
71-
flow_file.setAttribute(GCS_STORAGE_CLASS, object_metadata.storage_class());
54+
inline void setAttributesFromObjectMetadata(api::core::FlowFile& flow_file, const ::google::cloud::storage::ObjectMetadata& object_metadata, api::core::ProcessSession& session) {
55+
session.setAttribute(flow_file, GCS_BUCKET_ATTR, object_metadata.bucket());
56+
session.setAttribute(flow_file, GCS_OBJECT_NAME_ATTR, object_metadata.name());
57+
session.setAttribute(flow_file, GCS_SIZE_ATTR, std::to_string(object_metadata.size()));
58+
session.setAttribute(flow_file, GCS_CRC32C_ATTR, object_metadata.crc32c());
59+
session.setAttribute(flow_file, GCS_MD5_ATTR, object_metadata.md5_hash());
60+
session.setAttribute(flow_file, GCS_CONTENT_ENCODING_ATTR, object_metadata.content_encoding());
61+
session.setAttribute(flow_file, GCS_CONTENT_LANGUAGE_ATTR, object_metadata.content_language());
62+
session.setAttribute(flow_file, GCS_CONTENT_DISPOSITION_ATTR, object_metadata.content_disposition());
63+
session.setAttribute(flow_file, GCS_CREATE_TIME_ATTR, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(object_metadata.time_created().time_since_epoch()).count()));
64+
session.setAttribute(flow_file, GCS_UPDATE_TIME_ATTR, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(object_metadata.updated().time_since_epoch()).count()));
65+
session.setAttribute(flow_file, GCS_DELETE_TIME_ATTR, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(object_metadata.time_deleted().time_since_epoch()).count()));
66+
session.setAttribute(flow_file, GCS_MEDIA_LINK_ATTR, object_metadata.media_link());
67+
session.setAttribute(flow_file, GCS_SELF_LINK_ATTR, object_metadata.self_link());
68+
session.setAttribute(flow_file, GCS_ETAG_ATTR, object_metadata.etag());
69+
session.setAttribute(flow_file, GCS_GENERATED_ID, object_metadata.id());
70+
session.setAttribute(flow_file, GCS_META_GENERATION, std::to_string(object_metadata.metageneration()));
71+
session.setAttribute(flow_file, GCS_GENERATION, std::to_string(object_metadata.generation()));
72+
session.setAttribute(flow_file, GCS_STORAGE_CLASS, object_metadata.storage_class());
7273
if (object_metadata.has_customer_encryption()) {
73-
flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm);
74-
flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256);
74+
session.setAttribute(flow_file, GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm);
75+
session.setAttribute(flow_file, GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256);
7576
}
7677
if (object_metadata.has_owner()) {
77-
flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity);
78-
flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id);
78+
session.setAttribute(flow_file, GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity);
79+
session.setAttribute(flow_file, GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id);
7980
}
8081
}
8182

extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,36 @@
1818

1919
#include "GCPCredentialsControllerService.h"
2020

21-
#include "core/Resource.h"
2221
#include "google/cloud/storage/client.h"
23-
#include "utils/ProcessorConfigUtils.h"
24-
#include "utils/file/FileUtils.h"
2522

2623
namespace org::apache::nifi::minifi::extensions::gcp {
2724

28-
void GCPCredentialsControllerService::initialize() {
29-
setSupportedProperties(Properties);
25+
namespace {
26+
// TODO(MINIFICPP-2763) use utils::file::get_content instead
27+
std::string get_content(const std::filesystem::path& file_name) {
28+
std::ifstream file(file_name, std::ifstream::binary);
29+
std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
30+
return content;
31+
}
3032
}
3133

32-
std::shared_ptr<google::cloud::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonPath() const {
33-
const auto json_path = getProperty(JsonFilePath.name);
34+
std::shared_ptr<google::cloud::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonPath(api::core::ControllerServiceContext& ctx) const {
35+
const auto json_path = ctx.getProperty(JsonFilePath.name);
3436
if (!json_path) {
3537
logger_->log_error("Missing or invalid {}", JsonFilePath.name);
3638
return nullptr;
3739
}
3840

39-
if (!utils::file::exists(*json_path)) {
41+
if (std::error_code ec; !std::filesystem::exists(*json_path, ec) || ec) {
4042
logger_->log_error("JSON file for GCP credentials '{}' does not exist", *json_path);
4143
return nullptr;
4244
}
4345

44-
return google::cloud::MakeServiceAccountCredentials(utils::file::get_content(*json_path));
46+
return google::cloud::MakeServiceAccountCredentials(get_content(*json_path));
4547
}
4648

47-
std::shared_ptr<google::cloud::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonContents() const {
48-
auto json_contents = getProperty(JsonContents.name);
49+
std::shared_ptr<google::cloud::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonContents(api::core::ControllerServiceContext& ctx) const {
50+
auto json_contents = ctx.getProperty(JsonContents.name);
4951
if (!json_contents) {
5052
logger_->log_error("Missing or invalid {}", JsonContents.name);
5153
return nullptr;
@@ -54,9 +56,9 @@ std::shared_ptr<google::cloud::Credentials> GCPCredentialsControllerService::cre
5456
return google::cloud::MakeServiceAccountCredentials(*json_contents);
5557
}
5658

57-
void GCPCredentialsControllerService::onEnable() {
59+
MinifiStatus GCPCredentialsControllerService::enableImpl(api::core::ControllerServiceContext& ctx) {
5860
std::optional<CredentialsLocation> credentials_location;
59-
if (const auto value = getProperty(CredentialsLoc.name)) {
61+
if (const auto value = ctx.getProperty(CredentialsLoc.name)) {
6062
credentials_location = magic_enum::enum_cast<CredentialsLocation>(*value);
6163
}
6264
if (!credentials_location) {
@@ -68,15 +70,15 @@ void GCPCredentialsControllerService::onEnable() {
6870
} else if (*credentials_location == CredentialsLocation::USE_COMPUTE_ENGINE_CREDENTIALS) {
6971
credentials_ = google::cloud::MakeComputeEngineCredentials();
7072
} else if (*credentials_location == CredentialsLocation::USE_JSON_FILE) {
71-
credentials_ = createCredentialsFromJsonPath();
73+
credentials_ = createCredentialsFromJsonPath(ctx);
7274
} else if (*credentials_location == CredentialsLocation::USE_JSON_CONTENTS) {
73-
credentials_ = createCredentialsFromJsonContents();
75+
credentials_ = createCredentialsFromJsonContents(ctx);
7476
} else if (*credentials_location == CredentialsLocation::USE_ANONYMOUS_CREDENTIALS) {
7577
credentials_ = google::cloud::MakeInsecureCredentials();
7678
}
7779
if (!credentials_)
7880
logger_->log_error("Couldn't create valid credentials");
81+
return MINIFI_STATUS_SUCCESS;
7982
}
8083

81-
REGISTER_RESOURCE(GCPCredentialsControllerService, ControllerService);
8284
} // namespace org::apache::nifi::minifi::extensions::gcp

extensions/gcp/controllerservices/GCPCredentialsControllerService.h

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,15 @@
1818
#pragma once
1919

2020
#include <filesystem>
21-
#include <string>
2221
#include <memory>
22+
#include <string>
2323

24-
#include "core/controller/ControllerServiceBase.h"
25-
#include "minifi-cpp/core/logging/Logger.h"
26-
#include "core/logging/LoggerFactory.h"
27-
#include "minifi-cpp/core/PropertyDefinition.h"
24+
#include "api/core/ControllerServiceImpl.h"
25+
#include "api/utils/Export.h"
2826
#include "core/PropertyDefinitionBuilder.h"
29-
#include "utils/Enum.h"
30-
3127
#include "google/cloud/credentials.h"
28+
#include "minifi-cpp/core/PropertyDefinition.h"
29+
#include "utils/Enum.h"
3230

3331
namespace org::apache::nifi::minifi::extensions::gcp {
3432
enum class CredentialsLocation {
@@ -63,7 +61,7 @@ constexpr customize_t enum_name<CredentialsLocation>(CredentialsLocation value)
6361

6462
namespace org::apache::nifi::minifi::extensions::gcp {
6563

66-
class GCPCredentialsControllerService : public core::controller::ControllerServiceBase, public core::controller::ControllerServiceHandle {
64+
class GCPCredentialsControllerService : public api::core::ControllerServiceImpl {
6765
public:
6866
EXTENSIONAPI static constexpr const char* Description = "Manages the credentials for Google Cloud Platform. This allows for multiple Google Cloud Platform related processors "
6967
"to reference this single controller service so that Google Cloud Platform credentials can be managed and controlled in a central location.";
@@ -91,21 +89,16 @@ class GCPCredentialsControllerService : public core::controller::ControllerServi
9189

9290

9391
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
94-
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
95-
96-
using ControllerServiceBase::ControllerServiceBase;
97-
98-
void initialize() override;
9992

100-
void onEnable() override;
93+
using ControllerServiceImpl::ControllerServiceImpl;
10194

102-
[[nodiscard]] ControllerServiceHandle* getControllerServiceHandle() override {return this;}
95+
MinifiStatus enableImpl(api::core::ControllerServiceContext& ctx) override;
10396

10497
[[nodiscard]] const auto& getCredentials() const { return credentials_; }
10598

10699
protected:
107-
[[nodiscard]] std::shared_ptr<google::cloud::Credentials> createCredentialsFromJsonPath() const;
108-
[[nodiscard]] std::shared_ptr<google::cloud::Credentials> createCredentialsFromJsonContents() const;
100+
[[nodiscard]] std::shared_ptr<google::cloud::Credentials> createCredentialsFromJsonPath(api::core::ControllerServiceContext& ctx) const;
101+
[[nodiscard]] std::shared_ptr<google::cloud::Credentials> createCredentialsFromJsonContents(api::core::ControllerServiceContext& ctx) const;
109102

110103

111104
std::shared_ptr<google::cloud::Credentials> credentials_;

extensions/gcp/processors/DeleteGCSObject.cpp

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,69 +17,62 @@
1717

1818
#include "DeleteGCSObject.h"
1919

20-
#include "utils/ProcessorConfigUtils.h"
2120

2221
#include "../GCPAttributes.h"
23-
#include "minifi-cpp/core/FlowFile.h"
24-
#include "minifi-cpp/core/ProcessContext.h"
25-
#include "core/ProcessSession.h"
26-
#include "core/Resource.h"
22+
#include "api/core/ProcessContext.h"
23+
#include "api/core/ProcessSession.h"
24+
#include "api/core/Resource.h"
25+
#include "api/utils/ProcessorConfigUtils.h"
2726

2827
namespace gcs = ::google::cloud::storage;
2928

3029
namespace org::apache::nifi::minifi::extensions::gcp {
31-
void DeleteGCSObject::initialize() {
32-
setSupportedProperties(Properties);
33-
setSupportedRelationships(Relationships);
34-
}
3530

36-
void DeleteGCSObject::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
31+
MinifiStatus DeleteGCSObject::onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) {
3732
gsl_Expects(gcp_credentials_);
3833

3934
auto flow_file = session.get();
4035
if (!flow_file) {
41-
context.yield();
42-
return;
36+
return MINIFI_STATUS_PROCESSOR_YIELD;
4337
}
4438

45-
auto bucket = context.getProperty(Bucket, flow_file.get());
39+
auto bucket = api::utils::parseOptionalProperty(context, Bucket, &flow_file);
4640
if (!bucket || bucket->empty()) {
4741
logger_->log_error("Missing bucket name");
48-
session.transfer(flow_file, Failure);
49-
return;
42+
session.transfer(std::move(flow_file), Failure);
43+
return MINIFI_STATUS_SUCCESS;
5044
}
51-
auto object_name = context.getProperty(Key, flow_file.get());
45+
auto object_name = api::utils::parseOptionalProperty(context, Key, &flow_file);
5246
if (!object_name || object_name->empty()) {
5347
logger_->log_error("Missing object name");
54-
session.transfer(flow_file, Failure);
55-
return;
48+
session.transfer(std::move(flow_file), Failure);
49+
return MINIFI_STATUS_SUCCESS;
5650
}
5751

5852
gcs::Generation generation;
59-
if (const auto object_generation_str = context.getProperty(ObjectGeneration, flow_file.get()); object_generation_str && !object_generation_str->empty()) {
53+
if (auto object_generation_str = api::utils::parseOptionalProperty(context, ObjectGeneration, &flow_file); object_generation_str && !object_generation_str->empty()) {
6054
if (const auto geni64 = parsing::parseIntegral<int64_t>(*object_generation_str)) {
6155
generation = gcs::Generation{*geni64};
6256
} else {
6357
logger_->log_error("Invalid generation: {}", *object_generation_str);
64-
session.transfer(flow_file, Failure);
65-
return;
58+
session.transfer(std::move(flow_file), Failure);
59+
return MINIFI_STATUS_SUCCESS;
6660
}
6761
}
6862

6963
auto status = getClient().DeleteObject(*bucket, *object_name, generation, gcs::IfGenerationNotMatch(0));
7064

7165
if (!status.ok()) {
72-
flow_file->setAttribute(GCS_STATUS_MESSAGE, status.message());
73-
flow_file->setAttribute(GCS_ERROR_REASON, status.error_info().reason());
74-
flow_file->setAttribute(GCS_ERROR_DOMAIN, status.error_info().domain());
66+
session.setAttribute(flow_file, GCS_STATUS_MESSAGE, status.message());
67+
session.setAttribute(flow_file, GCS_ERROR_REASON, status.error_info().reason());
68+
session.setAttribute(flow_file, GCS_ERROR_DOMAIN, status.error_info().domain());
7569
logger_->log_error("Failed to delete {} object from {} bucket on Google Cloud Storage {} {}", *object_name, *bucket, status.message(), status.error_info().reason());
76-
session.transfer(flow_file, Failure);
77-
return;
70+
session.transfer(std::move(flow_file), Failure);
71+
return MINIFI_STATUS_SUCCESS;
7872
}
7973

80-
session.transfer(flow_file, Success);
74+
session.transfer(std::move(flow_file), Success);
75+
return MINIFI_STATUS_SUCCESS;
8176
}
8277

83-
REGISTER_RESOURCE(DeleteGCSObject, Processor);
84-
8578
} // namespace org::apache::nifi::minifi::extensions::gcp

0 commit comments

Comments
 (0)