Skip to content

Commit f800483

Browse files
committed
DPL: introduce exponential backoff when idle
This should reduce the amount of CPU used by data processors when they are idle, while retaining the ability to ramp the rate back up should it be needed.
1 parent 174e54b commit f800483

File tree

3 files changed

+30
-2
lines changed

3 files changed

+30
-2
lines changed

Framework/Core/include/Framework/DataProcessingDevice.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ class DataProcessingDevice : public FairMQDevice
8383
uint64_t mLastMetricFlushedTimestamp = 0; /// The timestamp of the last time we actually flushed metrics
8484
uint64_t mBeginIterationTimestamp = 0; /// The timestamp of when the current ConditionalRun was started
8585
DataProcessingStats mStats; /// Stats about the actual data processing.
86+
int mCurrentBackoff = 0; /// The current exponential backoff value.
8687
};
8788

8889
} // namespace o2::framework

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include <TMessage.h>
3939
#include <TClonesArray.h>
4040

41+
#include <algorithm>
4142
#include <vector>
4243
#include <memory>
4344

@@ -51,6 +52,13 @@ using DataHeader = o2::header::DataHeader;
5152
constexpr unsigned int MONITORING_QUEUE_SIZE = 100;
5253
constexpr unsigned int MIN_RATE_LOGGING = 60;
5354

55+
// This should result in a minimum of 10Hz which should guarantee we do not use
56+
// much time when idle. We do not sleep at all when we are at less than 100us,
57+
// because that's what the default rate enforces in any case.
58+
constexpr int MAX_BACKOFF = 10;
59+
constexpr int MIN_BACKOFF_DELAY = 100;
60+
constexpr int BACKOFF_DELAY_STEP = 100;
61+
5462
namespace o2::framework
5563
{
5664

@@ -172,6 +180,7 @@ bool DataProcessingDevice::ConditionalRun()
172180
&stats = mStats,
173181
&lastSent = mLastSlowMetricSentTimestamp,
174182
&currentTime = mBeginIterationTimestamp,
183+
&currentBackoff = mCurrentBackoff,
175184
&monitoring = mServiceRegistry.get<Monitoring>()]()
176185
-> void {
177186
if (currentTime - lastSent < 5000) {
@@ -200,6 +209,7 @@ bool DataProcessingDevice::ConditionalRun()
200209
.addTag(Key::Subsystem, Value::DPL));
201210
monitoring.send(Metric{(stats.lastTotalProcessedSize / (stats.lastLatency.maxLatency ? stats.lastLatency.maxLatency : 1) / 1000), "input_rate_mb_s"}
202211
.addTag(Key::Subsystem, Value::DPL));
212+
monitoring.send(Metric{(int)currentBackoff, "current_backoff"}.addTag(Key::Subsystem, Value::DPL));
203213

204214
lastSent = currentTime;
205215
O2_SIGNPOST_END(MonitoringStatus::ID, MonitoringStatus::SEND, 0, 0, O2_SIGNPOST_BLUE);
@@ -267,7 +277,7 @@ bool DataProcessingDevice::ConditionalRun()
267277
if (active == false) {
268278
mServiceRegistry.get<CallbackService>()(CallbackService::Id::Idle);
269279
}
270-
mRelayer.processDanglingInputs(mExpirationHandlers, mServiceRegistry);
280+
active |= mRelayer.processDanglingInputs(mExpirationHandlers, mServiceRegistry);
271281
this->tryDispatchComputation();
272282

273283
sendRelayerMetrics();
@@ -308,6 +318,23 @@ bool DataProcessingDevice::ConditionalRun()
308318
switchState(StreamingState::Idle);
309319
return true;
310320
}
321+
// Update the backoff factor
322+
//
323+
// In principle we should use 1/rate for MIN_BACKOFF_DELAY and (1/minRate -
324+
// 1/maxRate) / 2^MAX_BACKOFF for BACKOFF_DELAY_STEP. We hardcode the values
325+
// for the moment to some sensible default.
326+
if (active) {
327+
mCurrentBackoff = std::max(0, mCurrentBackoff - 1);
328+
} else {
329+
mCurrentBackoff = std::min(MAX_BACKOFF, mCurrentBackoff + 1);
330+
}
331+
332+
if (mCurrentBackoff != 0) {
333+
auto delay = (rand() % ((1 << mCurrentBackoff) - 1)) * BACKOFF_DELAY_STEP;
334+
if (delay > MIN_BACKOFF_DELAY) {
335+
WaitFor(std::chrono::microseconds(delay - MIN_BACKOFF_DELAY));
336+
}
337+
}
311338
return true;
312339
}
313340

Framework/Core/test/test_CallbackService.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
4848
};
4949
ic.services().get<CallbackService>().set(CallbackService::Id::ClockTick, callback);
5050
return [count](ProcessingContext& ctx) {
51-
if (*count > 1000) {
51+
if (*count > 10) {
5252
ctx.services().get<ControlService>().readyToQuit(QuitRequest::All);
5353
}
5454
};

0 commit comments

Comments
 (0)