29 commits
90d0a34
changed logic of IntervalDecodeIterator
cherep58 Dec 3, 2025
fcd61e5
add longterm
u-veles-a Dec 3, 2025
31fc7a6
added downsamplingMs parameter into Go-bindings
cherep58 Dec 3, 2025
4ee8815
remove lookbackDelta
u-veles-a Dec 3, 2025
664394d
added GO-test for downsampling
cherep58 Dec 4, 2025
421bab3
Merge branch 'downsampling' of https://github.com/deckhouse/prompp in…
cherep58 Dec 4, 2025
70c874e
optimized IntervalDecodeIterator
cherep58 Dec 4, 2025
1c4dbcd
renamed IntervalDecodeIterator to DownsamplingDecodeIterator
cherep58 Dec 5, 2025
bba84b7
optimized DownsamplingDecodeIterator
cherep58 Dec 5, 2025
88eae42
review fixes
cherep58 Dec 5, 2025
f24c038
review fixes
cherep58 Dec 8, 2025
5d5d628
Merge branch 'pp' of https://github.com/deckhouse/prompp into downsam…
cherep58 Dec 8, 2025
e2f38d7
removed std::assignable_from from AssignableFromUniversaleDecodeItera…
cherep58 Dec 8, 2025
f8aa910
Merge branch 'pp' of https://github.com/deckhouse/prompp into downsam…
cherep58 Dec 25, 2025
5087d19
fix after merge
u-veles-a Dec 26, 2025
b6509c2
Merge branch 'pp' into downsampling
u-veles-a Dec 26, 2025
97e937e
Merge branch 'pp' of https://github.com/deckhouse/prompp into downsam…
cherep58 Dec 29, 2025
03cc0fb
Merge branch 'pp' into downsampling
cherep58 Dec 30, 2025
28aab09
longterm blockwriter
u-veles-a Jan 12, 2026
b52a21b
fix
u-veles-a Jan 13, 2026
ab463e2
Merge branch 'pp' into downsampling
u-veles-a Jan 13, 2026
5006155
added downsampling feature in ChunkRecoder
cherep58 Jan 13, 2026
77c5ab1
Merge branch 'pp' of https://github.com/deckhouse/prompp into downsam…
cherep58 Jan 13, 2026
f1c7b28
fix compactor
u-veles-a Jan 13, 2026
858d5cf
fixed compilation error
cherep58 Jan 13, 2026
2227a87
fixed compilation error
cherep58 Jan 13, 2026
3c13156
Merge branch 'pp' into downsampling
u-veles-a Jan 26, 2026
fcf69b9
fix after merge
u-veles-a Jan 27, 2026
b9a0b12
Merge branch 'pp' into downsampling
cherep58 Feb 18, 2026
561 changes: 542 additions & 19 deletions cmd/prometheus/main.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion cmd/prompptool/walpp.go
@@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/prometheus/pp/go/cppbridge"
"os"
"path/filepath"
"time"

"github.com/prometheus/prometheus/pp/go/cppbridge"

"github.com/alecthomas/kingpin/v2"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -84,7 +85,9 @@ func (cmd *cmdWALPPToBlock) Do(

bw := block.NewWriter[*shard.Shard](
workingDir,
"",
block.DefaultChunkSegmentSize,
cppbridge.NoDownsampling,
time.Duration(cmd.blockDuration),
registerer,
)
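
Note on the walpp.go change above: block.NewWriter now takes two extra arguments, a second path (passed as "" here, presumably the longterm block directory) and a downsampling interval (cppbridge.NoDownsampling disables it). Below is a hedged sketch of what the same call might look like when downsampling is actually wanted; the meaning of the empty-string argument and the int64 interval type are assumptions inferred from this call site only.

    // Sketch only: same constructor, but with a one-minute downsampling interval
    // instead of cppbridge.NoDownsampling. longtermDir is a hypothetical variable.
    bw := block.NewWriter[*shard.Shard](
        workingDir,
        longtermDir,                         // assumed: directory for longterm blocks ("" above means none)
        block.DefaultChunkSegmentSize,
        int64(time.Minute/time.Millisecond), // downsampling interval in milliseconds
        time.Duration(cmd.blockDuration),
        registerer,
    )
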
2 changes: 1 addition & 1 deletion pp-pkg/rules/manager_test.go
@@ -2419,7 +2419,7 @@ func makeManager(
},
clock,
headCatalog,
pp_storage.NewTriggerNotifier(),
pp_storage.NewMultiTriggerNotifier(pp_storage.NewTriggerNotifier()),
pp_storage.NewTriggerNotifier(),
&mock.ReadyNotifierMock{NotifyReadyFunc: func() {}},
prometheus.DefaultRegisterer,
90 changes: 83 additions & 7 deletions pp-pkg/storage/adapter.go
@@ -34,6 +34,7 @@ type Adapter struct {
hashdexLimits cppbridge.WALHashdexLimits
transparentState *cppbridge.StateV2
mergeOutOfOrderChunks func()
longtermIntervalMs int64

// stat
activeQuerierMetrics *querier.Metrics
@@ -42,13 +43,56 @@ type Adapter struct {
samplesAppended prometheus.Counter
}

// NewAdapter init new [Adapter].
// NewAdapter init new main [Adapter].
func NewAdapter(
clock clockwork.Clock,
proxy *pp_storage.Proxy,
builder *pp_storage.Builder,
mergeOutOfOrderChunks func(),
registerer prometheus.Registerer,
) *Adapter {
return newAdapter(
clock,
proxy,
builder,
mergeOutOfOrderChunks,
0,
querier.QueryableAppenderSource,
querier.QueryableStorageSource,
registerer,
)
}

// NewLongtermAdapter init new longterm [Adapter].
func NewLongtermAdapter(
clock clockwork.Clock,
proxy *pp_storage.Proxy,
builder *pp_storage.Builder,
mergeOutOfOrderChunks func(),
longtermIntervalMs int64,
registerer prometheus.Registerer,
) *Adapter {
return newAdapter(
clock,
proxy,
builder,
mergeOutOfOrderChunks,
longtermIntervalMs,
querier.QueryableLongtermAppenderSource,
querier.QueryableLongtermStorageSource,
registerer,
)
}

// newAdapter init new [Adapter].
func newAdapter(
clock clockwork.Clock,
proxy *pp_storage.Proxy,
builder *pp_storage.Builder,
mergeOutOfOrderChunks func(),
longtermIntervalMs int64,
activeSource, storageSource string,
registerer prometheus.Registerer,
) *Adapter {
factory := util.NewUnconflictRegisterer(registerer)
return &Adapter{
@@ -59,8 +103,9 @@ func NewAdapter(
hashdexLimits: cppbridge.DefaultWALHashdexLimits(),
transparentState: cppbridge.NewTransitionStateV2(),
mergeOutOfOrderChunks: mergeOutOfOrderChunks,
activeQuerierMetrics: querier.NewMetrics(registerer, querier.QueryableAppenderSource),
storageQuerierMetrics: querier.NewMetrics(registerer, querier.QueryableStorageSource),
longtermIntervalMs: longtermIntervalMs,
activeQuerierMetrics: querier.NewMetrics(registerer, activeSource),
storageQuerierMetrics: querier.NewMetrics(registerer, storageSource),
appendDuration: factory.NewHistogram(
prometheus.HistogramOpts{
Name: "prompp_adapter_append_duration",
@@ -229,7 +274,14 @@ func (ar *Adapter) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)
ahead := ar.proxy.Get()
queriers = append(
queriers,
querier.NewChunkQuerier(ahead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil),
querier.NewChunkQuerier(
ahead,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
),
)

for _, head := range ar.proxy.Heads() {
@@ -239,7 +291,14 @@

queriers = append(
queriers,
querier.NewChunkQuerier(head, querier.NewNoOpShardedDeduplicator, mint, maxt, nil),
querier.NewChunkQuerier(
head,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
),
)
}

@@ -264,6 +323,7 @@ func (ar *Adapter) HeadQuerier(mint, maxt int64) (storage.Querier, error) {
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
ar.activeQuerierMetrics,
), nil
@@ -291,7 +351,15 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) {
ahead := ar.proxy.Get()
queriers = append(
queriers,
querier.NewQuerier(ahead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.activeQuerierMetrics),
querier.NewQuerier(
ahead,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
ar.activeQuerierMetrics,
),
)

for _, head := range ar.proxy.Heads() {
@@ -306,7 +374,15 @@

queriers = append(
queriers,
querier.NewQuerier(head, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.storageQuerierMetrics),
querier.NewQuerier(
head,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
ar.storageQuerierMetrics,
),
)
}

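
Usage note for the adapter.go split above: NewAdapter keeps the old behaviour (interval 0, no downsampling), while NewLongtermAdapter threads a longterm interval in milliseconds into every querier it creates. A minimal sketch of constructing both, written as if from inside the pp-pkg/storage package; the proxy, builder and callback values are assumed to already exist, and the longterm ones are hypothetical names.

    // Main head adapter: queries return raw samples.
    mainAdapter := NewAdapter(clock, proxy, builder, mergeOutOfOrderChunks, prometheus.DefaultRegisterer)

    // Longterm adapter: queries are thinned by the C++ side to a 5-minute interval.
    longtermAdapter := NewLongtermAdapter(
        clock,
        longtermProxy,   // assumed: a separate proxy/builder pair for longterm heads
        longtermBuilder,
        mergeOutOfOrderChunks,
        int64(5*time.Minute/time.Millisecond), // longtermIntervalMs
        prometheus.DefaultRegisterer,
    )
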
10 changes: 9 additions & 1 deletion pp-pkg/storage/batch_storage.go
@@ -90,5 +90,13 @@ func (bs *BatchStorage) CommitWithState(ctx context.Context, state *cppbridge.St

// Querier calls f() with the given parameters. Returns a [querier.Querier].
func (bs *BatchStorage) Querier(mint, maxt int64) (storage.Querier, error) {
return querier.NewQuerier(bs.transactionHead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, nil), nil
return querier.NewQuerier(
bs.transactionHead,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
bs.adapter.longtermIntervalMs,
nil,
nil,
), nil
}
1 change: 1 addition & 0 deletions pp-pkg/tsdb/compactor.go
@@ -2,6 +2,7 @@ package tsdb

import (
"context"

"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
2 changes: 1 addition & 1 deletion pp/entrypoint/go_constants.h
@@ -5,7 +5,7 @@
#define Sizeof_InnerSeries (Sizeof_SizeT + Sizeof_BareBonesVector + Sizeof_RoaringBitset)
#define Sizeof_GoLabels 16

#define Sizeof_SerializedDataIterator 192
#define Sizeof_SerializedDataIterator 200

#define Sizeof_MetricsIterator 24

16 changes: 12 additions & 4 deletions pp/entrypoint/head/serialization.h
@@ -1,27 +1,35 @@
#pragma once

#include "series_data/decoder/decorator/downsampling_decode_iterator.h"
#include "series_data/serialization/serialized_data.h"

namespace entrypoint::head {

using DecodeIterator = series_data::decoder::decorator::DownsamplingDecodeIterator<series_data::decoder::UniversalDecodeIterator>;
using SerializedDataIterator = series_data::serialization::SerializedDataView::SeriesIterator<DecodeIterator>;

class SerializedDataGo {
public:
explicit SerializedDataGo(const series_data::DataStorage& storage, const series_data::querier::QueriedChunkList& queried_chunks)
: data_{series_data::serialization::DataSerializer{storage}.serialize(queried_chunks)} {}
explicit SerializedDataGo(const series_data::DataStorage& storage,
const series_data::querier::QueriedChunkList& queried_chunks,
PromPP::Primitives::Timestamp downsampling_ms)
: data_{series_data::serialization::DataSerializer{storage}.serialize(queried_chunks)}, downsampling_ms_(downsampling_ms) {}

[[nodiscard]] PROMPP_ALWAYS_INLINE auto get_buffer_view() const noexcept { return data_view_.get_buffer_view(); }
[[nodiscard]] PROMPP_ALWAYS_INLINE auto get_chunks_view() const noexcept { return data_view_.get_chunks_view(); }

[[nodiscard]] PROMPP_ALWAYS_INLINE auto next() noexcept { return data_view_.next_series(); }
[[nodiscard]] PROMPP_ALWAYS_INLINE auto iterator(uint32_t chunk_id) const noexcept { return data_view_.create_series_iterator(chunk_id); }
[[nodiscard]] PROMPP_ALWAYS_INLINE SerializedDataIterator iterator(uint32_t chunk_id) const noexcept {
return data_view_.create_series_iterator<DecodeIterator>(chunk_id, DecodeIterator(downsampling_ms_));
}

private:
series_data::serialization::SerializedData data_;
series_data::serialization::SerializedDataView data_view_{data_};
PromPP::Primitives::Timestamp downsampling_ms_{};
};

using SerializedDataPtr = std::unique_ptr<SerializedDataGo>;
using SerializedDataIterator = series_data::serialization::SerializedDataView::SeriesIterator;

static_assert(sizeof(SerializedDataPtr) == sizeof(void*));

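
The serialization.h change above is the core of the read path: the series iterator handed to Go is now a DownsamplingDecodeIterator decorating the UniversalDecodeIterator and carrying the requested interval (the 192 → 200 bump of Sizeof_SerializedDataIterator in go_constants.h presumably accounts for that extra state). The actual sample-selection rule lives in downsampling_decode_iterator.h and is not shown in this diff; the Go sketch below only illustrates the general idea of emitting at most one sample per interval bucket, with 0 meaning no downsampling. The real decorator may pick a different representative per bucket.

    // Conceptual illustration only, not the C++ implementation.
    type sample struct {
        ts int64 // timestamp, milliseconds
        v  float64
    }

    func downsample(in []sample, intervalMs int64) []sample {
        if intervalMs <= 0 {
            return in // 0 == no downsampling, pass everything through
        }
        out := make([]sample, 0, len(in))
        lastBucket := int64(-1)
        for _, s := range in {
            if bucket := s.ts / intervalMs; bucket != lastBucket {
                out = append(out, s) // keep the first sample seen in each bucket
                lastBucket = bucket
            }
        }
        return out
    }
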
10 changes: 7 additions & 3 deletions pp/entrypoint/series_data/querier.h
@@ -64,8 +64,11 @@ class RangeQuerierWithArgumentsWrapperV2 {
using BytesStream = PromPP::Primitives::Go::BytesStream;

public:
RangeQuerierWithArgumentsWrapperV2(DataStorage& storage, const Query& query, head::SerializedDataPtr* serialized_data)
: querier_(storage), query_(&query), serialized_data_(serialized_data) {}
RangeQuerierWithArgumentsWrapperV2(DataStorage& storage,
const Query& query,
head::SerializedDataPtr* serialized_data,
PromPP::Primitives::Timestamp downsampling_ms)
: querier_(storage), query_(&query), serialized_data_(serialized_data), downsampling_ms_(downsampling_ms) {}

void query() noexcept {
querier_.query(*query_);
@@ -84,9 +87,10 @@
::series_data::querier::Querier querier_;
const Query* query_;
head::SerializedDataPtr* serialized_data_;
PromPP::Primitives::Timestamp downsampling_ms_;

PROMPP_ALWAYS_INLINE void serialize_chunks() const noexcept {
std::construct_at(serialized_data_, std::make_unique<head::SerializedDataGo>(querier_.get_storage(), querier_.chunks()));
std::construct_at(serialized_data_, std::make_unique<head::SerializedDataGo>(querier_.get_storage(), querier_.chunks(), downsampling_ms_));
}
};

9 changes: 6 additions & 3 deletions pp/entrypoint/series_data_data_storage.cpp
@@ -119,6 +119,7 @@ extern "C" void prompp_series_data_data_storage_query_v2(void* args, void* res)
struct Arguments {
DataStoragePtr data_storage;
Query query;
PromPP::Primitives::Timestamp downsampling_ms;
};

struct Result {
Expand All @@ -130,7 +131,7 @@ extern "C" void prompp_series_data_data_storage_query_v2(void* args, void* res)
const auto in = static_cast<Arguments*>(args);
const auto out = static_cast<Result*>(res);

RangeQuerierWithArgumentsWrapperV2 querier(*in->data_storage, in->query, out->serialized_data);
RangeQuerierWithArgumentsWrapperV2 querier(*in->data_storage, in->query, out->serialized_data, in->downsampling_ms);
querier.query();

if (querier.need_loading()) {
@@ -217,6 +218,7 @@ extern "C" void prompp_series_data_chunk_recoder_ctor(void* args, void* res) {
uint32_t ls_id_batch_size;
DataStoragePtr data_storage;
PromPP::Primitives::TimeInterval time_interval;
PromPP::Primitives::Timestamp downsampling_ms;
};
struct Result {
ChunkRecoderVariantPtr chunk_recoder;
@@ -228,7 +230,8 @@
new (res) Result{
.chunk_recoder = std::make_unique<ChunkRecoderVariant>(
std::in_place_type<ChunkRecoder>,
ChunkRecoderIterator{ls_id_set.begin(), ls_id_set.end(), in->ls_id_batch_size, in->data_storage.get(), in->time_interval}, in->time_interval),
ChunkRecoderIterator{ls_id_set.begin(), ls_id_set.end(), in->ls_id_batch_size, in->data_storage.get(), in->time_interval}, in->time_interval,
in->downsampling_ms),
};
}

@@ -246,7 +249,7 @@
.chunk_recoder = std::make_unique<ChunkRecoderVariant>(
std::in_place_type<SerializedChunkRecoder>,
series_data::chunk::SerializedChunkIterator{in->serialized_data->get()->get_buffer_view(), in->serialized_data->get()->get_chunks_view()},
in->time_interval),
in->time_interval, series_data::decoder::decorator::kNoDownsampling),
};
}

8 changes: 5 additions & 3 deletions pp/entrypoint/series_data_data_storage.h
@@ -98,12 +98,13 @@ void prompp_series_data_data_storage_allocated_memory(void* args, void* res);
* @param args {
* dataStorage uintptr // pointer to constructed data storage
* query DataStorageQuery // query
* downsamplingMs int64 // downsampling interval in milliseconds (0 - downsampling is disabled)
* }
*
* @param res {
* Querier uintptr // pointer to constructed Querier if data loading is needed.
* querier uintptr // pointer to constructed Querier if data loading is needed.
* // If constructed (!= 0) it must be destroyed by calling prompp_series_data_data_storage_query_final.
* Status uint8 // status of a query (0 - Success, 1 - Data loading is needed)
* status uint8 // status of a query (0 - Success, 1 - Data loading is needed)
* serializedData uintptr // pointer to serialized data
* }
*/
@@ -150,10 +151,11 @@
* lss uintptr // pointer to constructed label sets
* lsIdBatchSize uint32 // size of ls batch for recoding
* dataStorage uintptr // pointer to constructed data storage
* time_interval struct { closed interval [min, max]
* timeInterval struct { // closed interval [min, max]
* min int64
* max int64
* }
* downsamplingMs int64 // downsampling interval in milliseconds (0 - downsampling is disabled)
* }
* @param res {
* chunk_recoder uintptr // pointer to chunk recoder
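
Both documented entry points above use the same convention for the new downsamplingMs argument: 0 disables downsampling, any positive value is an interval in milliseconds. A small hedged Go helper showing how a caller might derive that argument from a time.Duration; the helper name is hypothetical, and cppbridge.NoDownsampling is assumed to equal 0.

    // Convert a configured duration into the downsamplingMs argument expected by
    // the C bridge; a non-positive duration means "keep every sample".
    func downsamplingArg(interval time.Duration) int64 {
        if interval <= 0 {
            return 0 // cppbridge.NoDownsampling
        }
        return interval.Milliseconds()
    }
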
4 changes: 2 additions & 2 deletions pp/go/cppbridge/data_storage.go
@@ -102,9 +102,9 @@ type DataStorageQuery struct {
LabelSetIDs []uint32
}

func (ds *DataStorage) Query(query DataStorageQuery) DataStorageQueryResult {
func (ds *DataStorage) Query(query DataStorageQuery, downsamplingMs int64) DataStorageQueryResult {
sd := NewDataStorageSerializedData()
querier, status := seriesDataDataStorageQueryV2(ds.dataStorage, query, sd)
querier, status := seriesDataDataStorageQueryV2(ds.dataStorage, query, sd, downsamplingMs)
return DataStorageQueryResult{
Querier: querier,
Status: status,
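
Usage note for the DataStorage.Query signature change above: the downsampling interval is now an explicit second argument. A hedged sketch of the two read paths, assuming a constructed *cppbridge.DataStorage named ds and ignoring any DataStorageQuery fields not shown in this diff.

    query := cppbridge.DataStorageQuery{LabelSetIDs: []uint32{1, 2, 3}}

    // Raw read: keep every sample.
    raw := ds.Query(query, cppbridge.NoDownsampling)

    // Downsampled read: ask the C++ side to thin samples to a 5-minute interval.
    res := ds.Query(query, int64(5*time.Minute/time.Millisecond))
    if res.Status != 0 {
        // Per the C header comment: status 1 means chunk data still has to be
        // loaded via the returned querier before the serialized data is usable.
    }
    _ = raw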