Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Framework/Core/include/Framework/DataRelayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ class DataRelayer

using OnDropCallback = std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo info)>;

// Callback for when some messages are about to be owned by the the DataRelayer
using OnInsertionCallback = std::function<void(ServiceRegistryRef&, std::span<fair::mq::MessagePtr>&)>;

/// Prune all the pending entries in the cache.
void prunePending(OnDropCallback);
/// Prune the cache for a given slot
Expand All @@ -135,6 +138,7 @@ class DataRelayer
InputInfo const& info,
size_t nMessages,
size_t nPayloads = 1,
OnInsertionCallback onInsertion = nullptr,
OnDropCallback onDrop = nullptr);

/// This is to set the oldest possible @a timeslice this relayer can
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1859,6 +1859,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
input,
nMessages,
nPayloadsPerHeader,
nullptr,
onDrop);
switch (relayed.type) {
case DataRelayer::RelayChoice::Type::Backpressured:
Expand Down
4 changes: 1 addition & 3 deletions Framework/Core/src/DataProcessingHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,7 @@ auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy,
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
{
// we collect all messages per forward in a map and send them together
std::vector<fair::mq::Parts> forwardedParts;
forwardedParts.resize(proxy.getNumForwards());
std::vector<ChannelIndex> forwardingChoices{};
std::vector<fair::mq::Parts> forwardedParts(proxy.getNumForwardChannels());

for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
Expand Down
10 changes: 8 additions & 2 deletions Framework/Core/src/DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,8 @@ DataRelayer::RelayChoice
InputInfo const& info,
size_t nMessages,
size_t nPayloads,
std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo)> onDrop)
OnInsertionCallback onInsertion,
OnDropCallback onDrop)
{
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
Expand Down Expand Up @@ -482,6 +483,7 @@ DataRelayer::RelayChoice
&messages,
&nMessages,
&nPayloads,
&onInsertion,
&cache = mCache,
&services = mContext,
numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
Expand Down Expand Up @@ -512,7 +514,11 @@ DataRelayer::RelayChoice
mi += nPayloads;
continue;
}
target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1);
auto span = std::span<fair::mq::MessagePtr>(messages + mi, messages + mi + nPayloads + 1);
if (onInsertion) {
onInsertion(services, span);
}
target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1);
mi += nPayloads;
saved += nPayloads;
}
Expand Down
74 changes: 74 additions & 0 deletions Framework/Core/test/test_ForwardInputs.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,80 @@ TEST_CASE("ForwardInputsSplitPayload")
CHECK(result[1].Size() == 3);
}

TEST_CASE("ForwardInputsSplitPayloadNoMessageSet")
{
o2::header::DataHeader dh;
dh.dataOrigin = "TST";
dh.dataDescription = "A";
dh.subSpecification = 0;
dh.splitPayloadIndex = 2;
dh.splitPayloadParts = 2;

o2::header::DataHeader dh2;
dh2.dataOrigin = "TST";
dh2.dataDescription = "B";
dh2.subSpecification = 0;
dh2.splitPayloadIndex = 0;
dh2.splitPayloadParts = 1;

o2::framework::DataProcessingHeader dph{0, 1};

std::vector<fair::mq::Channel> channels{
fair::mq::Channel("from_A_to_B"),
fair::mq::Channel("from_A_to_C"),
};

bool consume = true;
bool copyByDefault = true;
FairMQDeviceProxy proxy;
std::vector<ForwardRoute> routes{
ForwardRoute{
.timeslice = 0,
.maxTimeslices = 1,
.matcher = {"binding", ConcreteDataMatcher{"TST", "B", 0}},
.channel = "from_A_to_B",
.policy = nullptr,
},
ForwardRoute{
.timeslice = 0,
.maxTimeslices = 1,
.matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
.channel = "from_A_to_C",
.policy = nullptr,
}};

auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
for (auto& channel : channels) {
if (channel.GetName() == channelName) {
return channel;
}
}
throw std::runtime_error("Channel not found");
};

proxy.bind({}, {}, routes, findChannelByName, nullptr);

auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
fair::mq::MessagePtr payload1(transport->CreateMessage());
fair::mq::MessagePtr payload2(transport->CreateMessage());
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
std::vector<std::unique_ptr<fair::mq::Message>> messages;
messages.push_back(std::move(header));
messages.push_back(std::move(payload1));
messages.push_back(std::move(payload2));
auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
messages.push_back(std::move(header2));
messages.push_back(transport->CreateMessage());

std::vector<fair::mq::Parts> result(2);
auto span = std::span(messages);
o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, span, result, copyByDefault, consume);
REQUIRE(result.size() == 2); // Two routes
CHECK(result[0].Size() == 2); // No messages on this route
CHECK(result[1].Size() == 3);
}

TEST_CASE("ForwardInputEOSSingleRoute")
{
o2::framework::SourceInfoHeader sih{};
Expand Down