Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions core-framework/include/http/HTTPCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class HttpStreamingCallback final : public HTTPUploadByteArrayInputCallback {
seekInner(lock, pos);
}

int64_t operator()(const std::shared_ptr<io::InputStream>& stream) override {
io::IoResult operator()(const std::shared_ptr<io::InputStream>& stream) override {
std::vector<std::byte> vec;

if (stream->size() > 0) {
Expand All @@ -76,7 +76,7 @@ class HttpStreamingCallback final : public HTTPUploadByteArrayInputCallback {
return processInner(std::move(vec));
}

int64_t process(const uint8_t* data, size_t size) {
io::IoResult process(const uint8_t* data, size_t size) {
std::vector<std::byte> vec;
vec.resize(size);
memcpy(vec.data(), data, size);
Expand Down Expand Up @@ -156,20 +156,20 @@ class HttpStreamingCallback final : public HTTPUploadByteArrayInputCallback {
* @param vec the buffer to be inserted
* @return the number of bytes processed (the size of vec)
*/
int64_t processInner(std::vector<std::byte>&& vec) {
io::IoResult processInner(std::vector<std::byte>&& vec) {
size_t size = vec.size();

logger_->log_trace("processInner() called, vec.data(): {}, vec.size(): {}", static_cast<void*>(vec.data()), size);

if (size == 0U) {
return 0U;
return io::IoResult::zero();
}

std::unique_lock<std::mutex> lock(mutex_);
byte_arrays_.emplace_back(std::move(vec));
cv.notify_all();

return size;
return io::IoResult::fromSizeT(size);
}

/**
Expand Down
14 changes: 7 additions & 7 deletions core-framework/include/io/StreamPipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
namespace org::apache::nifi::minifi {
namespace internal {

inline int64_t pipe(io::InputStream& src, io::OutputStream& dst) {
inline io::IoResult pipe(io::InputStream& src, io::OutputStream& dst) {
std::array<std::byte, utils::configuration::DEFAULT_BUFFER_SIZE> buffer{};
size_t totalTransferred = 0;
uint64_t totalTransferred = 0;
while (true) {
const auto readRet = src.read(buffer);
if (io::isError(readRet)) return -1;
if (io::isError(readRet)) return io::IoResult::error();
if (readRet == 0) break;
auto remaining = readRet;
size_t transferred = 0;
Expand All @@ -48,14 +48,14 @@ inline int64_t pipe(io::InputStream& src, io::OutputStream& dst) {
// - the number of bytes read or
// - the number of bytes wrote
if (io::isError(writeRet)) {
return -1;
return io::IoResult::error();
}
transferred += writeRet;
remaining -= writeRet;
}
totalTransferred += transferred;
}
return gsl::narrow<int64_t>(totalTransferred);
return io::IoResult::fromSizeT(totalTransferred);
}

} // namespace internal
Expand All @@ -64,7 +64,7 @@ class InputStreamPipe {
public:
explicit InputStreamPipe(io::OutputStream& output) : output_(&output) {}

int64_t operator()(const std::shared_ptr<io::InputStream>& stream) const {
io::IoResult operator()(const std::shared_ptr<io::InputStream>& stream) const {
return internal::pipe(*stream, *output_);
}

Expand All @@ -76,7 +76,7 @@ class OutputStreamPipe {
public:
explicit OutputStreamPipe(io::InputStream& input) : input_(&input) {}

int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) const {
io::IoResult operator()(const std::shared_ptr<io::OutputStream>& stream) const {
return internal::pipe(*input_, *stream);
}

Expand Down
11 changes: 6 additions & 5 deletions core-framework/include/utils/ByteArrayCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
*/
#pragma once

#include <condition_variable>
#include <memory>
#include <string>
#include <vector>
#include <utility>
#include <condition_variable>
#include <vector>

#include "concurrentqueue.h"
#include "core/logging/LoggerFactory.h"
#include "minifi-cpp/utils/gsl.h"
#include "minifi-cpp/io/InputStream.h"
#include "minifi-cpp/io/StreamCallback.h"
#include "minifi-cpp/utils/gsl.h"

namespace org::apache::nifi::minifi::utils {

Expand All @@ -36,15 +37,15 @@ class ByteInputCallback {
public:
virtual ~ByteInputCallback() = default;

virtual int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
virtual io::IoResult operator()(const std::shared_ptr<io::InputStream>& stream) {
stream->seek(0);

if (stream->size() > 0) {
vec.resize(stream->size());
stream->read(vec);
}

return gsl::narrow<int64_t>(vec.size());
return io::IoResult::fromSizeT(vec.size());
}

virtual void close() { }
Expand Down
21 changes: 11 additions & 10 deletions core-framework/include/utils/JsonCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,19 @@ namespace org::apache::nifi::minifi::utils {
class JsonInputCallback {
public:
explicit JsonInputCallback(rapidjson::Document& document) : document_(document) {}
int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
io::IoResult operator()(const std::shared_ptr<io::InputStream>& stream) {
std::string content;
content.resize(stream->size());
const auto read_ret = stream->read(as_writable_bytes(std::span(content)));
const size_t read_ret = stream->read(as_writable_bytes(std::span(content)));
if (io::isError(read_ret)) {
return -1;
return io::IoResult::error();
}
rapidjson::ParseResult parse_result = document_.Parse<rapidjson::kParseStopWhenDoneFlag>(content.data());
if (parse_result.IsError())
return -1;
if (parse_result.IsError()) {
return io::IoResult::error();
}

return read_ret;
return io::IoResult::fromSizeT(read_ret);
}
private:
rapidjson::Document& document_;
Expand All @@ -54,14 +55,14 @@ class JsonOutputCallback {
explicit JsonOutputCallback(rapidjson::Document&& root, std::optional<uint8_t> decimal_places)
: root_(std::move(root)), decimal_places_(decimal_places) {}

int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) const {
io::IoResult operator()(const std::shared_ptr<io::OutputStream>& stream) const {
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
if (decimal_places_.has_value())
writer.SetMaxDecimalPlaces(decimal_places_.value());
root_.Accept(writer);
const auto write_return = stream->write(reinterpret_cast<const uint8_t*>(buffer.GetString()), buffer.GetSize());
return !io::isError(write_return) ? gsl::narrow<int64_t>(write_return) : -1;
return io::IoResult::fromSizeT(write_return);
}

protected:
Expand All @@ -74,14 +75,14 @@ class PrettyJsonOutputCallback {
explicit PrettyJsonOutputCallback(rapidjson::Document&& root, std::optional<uint8_t> decimal_places)
: root_(std::move(root)), decimal_places_(decimal_places) {}

int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) const {
io::IoResult operator()(const std::shared_ptr<io::OutputStream>& stream) const {
rapidjson::StringBuffer buffer;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
if (decimal_places_.has_value())
writer.SetMaxDecimalPlaces(decimal_places_.value());
root_.Accept(writer);
const auto write_return = stream->write(reinterpret_cast<const uint8_t*>(buffer.GetString()), buffer.GetSize());
return !io::isError(write_return) ? gsl::narrow<int64_t>(write_return) : -1;
return io::IoResult::fromSizeT(write_return);
}

protected:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class LineByLineInputOutputStreamCallback {
public:
using CallbackType = std::function<std::string(const std::string& input_line, bool is_first_line, bool is_last_line)>;
explicit LineByLineInputOutputStreamCallback(CallbackType callback);
std::optional<io::ReadWriteResult> operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output);
io::ReadWriteResult operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output);

private:
int64_t readInput(io::InputStream& stream);
Expand Down
19 changes: 9 additions & 10 deletions core-framework/src/utils/LineByLineInputOutputStreamCallback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,37 +26,36 @@ LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(Callbac
: callback_(std::move(callback)) {
}

std::optional<io::ReadWriteResult> LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output) {
io::ReadWriteResult LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output) {
gsl_Expects(input);
gsl_Expects(output);

io::ReadWriteResult result;

if (int64_t status = readInput(*input); status <= 0) {
if (const int64_t status = readInput(*input); status <= 0) {
if (status < 0) {
return std::nullopt;
return io::ReadWriteResult::error();
}
return result;
return io::ReadWriteResult::zero();
}

result.bytes_read = gsl::narrow<int64_t>(input_.size());
const uint64_t bytes_read = input_.size();

std::size_t total_bytes_written = 0;
uint64_t total_bytes_written = 0;
bool is_first_line = true;
readLine();
do {
readLine();
std::string output_line = callback_(*current_line_, is_first_line, isLastLine());
const auto bytes_written = output->write(reinterpret_cast<const uint8_t *>(output_line.data()), output_line.size());
if (io::isError(bytes_written)) {
return std::nullopt;
return io::ReadWriteResult::error();
}
total_bytes_written += bytes_written;
is_first_line = false;
} while (!isLastLine());

result.bytes_written = gsl::narrow<int64_t>(total_bytes_written);
return result;

return { bytes_read, total_bytes_written };
}

int64_t LineByLineInputOutputStreamCallback::readInput(io::InputStream& stream) {
Expand Down
44 changes: 26 additions & 18 deletions extension-framework/cpp-extension-lib/src/core/ProcessSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

#include "api/core/ProcessSession.h"

#include "api/core/FlowFile.h"
#include "api/utils/minifi-c-utils.h"
#include "io/InputStream.h"
#include "io/OutputStream.h"
#include "api/utils/minifi-c-utils.h"
#include "api/core/FlowFile.h"
#include "minifi-cpp/Exception.h"

namespace org::apache::nifi::minifi::api::core {
Expand Down Expand Up @@ -91,21 +91,29 @@ void ProcessSession::remove(FlowFile ff) {
}

void ProcessSession::write(FlowFile& flow_file, const io::OutputStreamCallback& callback) {
const auto status = MinifiProcessSessionWrite(impl_, flow_file.get(), [] (void* data, MinifiOutputStream* output) {
return (*static_cast<const io::OutputStreamCallback*>(data))(std::make_shared<MinifiOutputStreamWrapper>(output));
}, const_cast<io::OutputStreamCallback*>(&callback));
if (status != MINIFI_STATUS_SUCCESS) {
throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
}
const auto status = MinifiProcessSessionWrite(
impl_,
flow_file.get(),
[](void* data, MinifiOutputStream* output) -> int64_t {
const auto result =
(*static_cast<const io::OutputStreamCallback*>(data))(std::make_shared<MinifiOutputStreamWrapper>(output));
return result.toI64();
},
const_cast<io::OutputStreamCallback*>(&callback));
if (status != MINIFI_STATUS_SUCCESS) { throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); }
}

void ProcessSession::read(FlowFile& flow_file, const io::InputStreamCallback& callback) {
const auto status = MinifiProcessSessionRead(impl_, flow_file.get(), [] (void* data, MinifiInputStream* input) {
return (*static_cast<const io::InputStreamCallback*>(data))(std::make_shared<MinifiInputStreamWrapper>(input));
}, const_cast<io::InputStreamCallback*>(&callback));
if (status != MINIFI_STATUS_SUCCESS) {
throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
}
const auto status = MinifiProcessSessionRead(
impl_,
flow_file.get(),
[](void* data, MinifiInputStream* input) -> int64_t {
const auto result =
(*static_cast<const io::InputStreamCallback*>(data))(std::make_shared<MinifiInputStreamWrapper>(input));
return result.toI64();
},
const_cast<io::InputStreamCallback*>(&callback));
if (status != MINIFI_STATUS_SUCCESS) { throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); }
}

void ProcessSession::setAttribute(FlowFile& ff, const std::string_view key, std::string value) { // NOLINT(performance-unnecessary-value-param)
Expand Down Expand Up @@ -142,18 +150,18 @@ void ProcessSession::writeBuffer(FlowFile& flow_file, std::span<const char> buff
}

void ProcessSession::writeBuffer(FlowFile& flow_file, std::span<const std::byte> buffer) {
write(flow_file, [buffer](const std::shared_ptr<io::OutputStream>& output_stream) {
write(flow_file, [buffer](const std::shared_ptr<io::OutputStream>& output_stream) -> io::IoResult {
const auto write_status = output_stream->write(buffer);
return io::isError(write_status) ? -1 : gsl::narrow<int64_t>(write_status);
return io::IoResult::fromSizeT(write_status);
});
}

std::vector<std::byte> ProcessSession::readBuffer(FlowFile& flow_file) {
std::vector<std::byte> result;
read(flow_file, [&result](const std::shared_ptr<io::InputStream>& input_stream) {
read(flow_file, [&result](const std::shared_ptr<io::InputStream>& input_stream) -> io::IoResult {
result.resize(input_stream->size());
const auto read_status = input_stream->read(result);
return io::isError(read_status) ? -1 : gsl::narrow<int64_t>(read_status);
return io::IoResult::fromSizeT(read_status);
});
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ class FlowFile;

class FlowFileSerializer {
public:
using FlowFileReader = std::function<int64_t(const std::shared_ptr<core::FlowFile>&, const io::InputStreamCallback&)>;
using FlowFileReader = std::function<io::IoResult(const std::shared_ptr<core::FlowFile>&, const io::InputStreamCallback&)>;

explicit FlowFileSerializer(FlowFileReader reader) : reader_(std::move(reader)) {}

virtual int64_t serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) = 0;
virtual io::IoResult serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) = 0;

virtual ~FlowFileSerializer() = default;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class FlowFileV3Serializer : public FlowFileSerializer {
public:
using FlowFileSerializer::FlowFileSerializer;

int64_t serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) override;
io::IoResult serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) override;
};

} // namespace org::apache::nifi::minifi
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class PayloadSerializer : public FlowFileSerializer {
public:
using FlowFileSerializer::FlowFileSerializer;

int64_t serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) override;
io::IoResult serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) override;
};

} // namespace org::apache::nifi::minifi
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace org::apache::nifi::minifi::utils {
class FileReaderCallback {
public:
explicit FileReaderCallback(std::filesystem::path file_path, size_t buffer_size);
int64_t operator()(const std::shared_ptr<io::OutputStream>& output_stream) const;
io::IoResult operator()(const std::shared_ptr<io::OutputStream>& output_stream) const;

private:
std::filesystem::path file_path_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class FileWriterCallback {
public:
explicit FileWriterCallback(std::filesystem::path dest_path);
~FileWriterCallback();
int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
io::IoResult operator()(const std::shared_ptr<io::InputStream>& stream);
bool commit();


Expand Down
Loading
Loading