Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 115 additions & 1 deletion pkg/util/metrics_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"strings"
"sync"
"time"

"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -460,7 +461,69 @@ func (s *SummaryData) Metric(desc *prometheus.Desc, labelValues ...string) prome
type HistogramData struct {
sampleCount uint64
sampleSum float64
buckets map[float64]uint64

buckets map[float64]uint64

// Native histogram fields
nativeHistogram bool
Schema int32
ZeroThreshold float64
ZeroCount uint64
PositiveBuckets map[int]int64 // bucket index -> count
NegativeBuckets map[int]int64
}

// A histogram is considered native if it has positive/negative spans or a non-zero zero bucket.
func isNative(histo *dto.Histogram) bool {
return len(histo.GetPositiveSpan()) > 0 ||
len(histo.GetNegativeSpan()) > 0 ||
histo.GetZeroCount() > 0 ||
histo.GetZeroThreshold() > 0
}

func (d *HistogramData) hasNative() bool {
return d.ZeroCount > 0 ||
len(d.PositiveBuckets) > 0 ||
len(d.NegativeBuckets) > 0
}

func spansCountsToBucketMap(spans []*dto.BucketSpan, counts []int64) map[int]int64 {
if len(spans) == 0 {
return nil
}
bucketMap := make(map[int]int64, len(counts))
var idx int32
bucketIdx := 0
for _, sp := range spans {
idx += sp.GetOffset()
for j := 0; j < int(sp.GetLength()) && bucketIdx < len(counts); j++ {
bucketMap[int(idx)] += counts[bucketIdx]
idx++
bucketIdx++
}
}
return bucketMap
}

func deltasToCountsInt(deltas []int64) []int64 {
counts := make([]int64, len(deltas))
var cur int64
for i, d := range deltas {
cur += int64(d)
counts[i] = cur
}
return counts
}

// mergeBucketMaps merges src bucket map into dst bucket map by summing counts for each bucket index.
func mergeBucketMaps(dst, src map[int]int64) map[int]int64 {
if dst == nil {
dst = make(map[int]int64)
}
for idx, count := range src {
dst[idx] += count
}
return dst
}

// AddHistogram adds histogram from gathered metrics to this histogram data.
Expand All @@ -471,6 +534,28 @@ func (d *HistogramData) AddHistogram(histo *dto.Histogram) {
d.sampleCount += histo.GetSampleCount()
d.sampleSum += histo.GetSampleSum()

if isNative(histo) {

// Initialize schema/threshold once
if !d.hasNative() {
d.Schema = histo.GetSchema()
d.ZeroThreshold = histo.GetZeroThreshold()
}
d.ZeroCount += histo.GetZeroCount()

// Decode spans+deltas -> index->population maps and merge.
posCounts := deltasToCountsInt(histo.GetPositiveDelta())
negCounts := deltasToCountsInt(histo.GetNegativeDelta())

posMap := spansCountsToBucketMap(histo.GetPositiveSpan(), posCounts)
negMap := spansCountsToBucketMap(histo.GetNegativeSpan(), negCounts)

d.PositiveBuckets = mergeBucketMaps(d.PositiveBuckets, posMap)
d.NegativeBuckets = mergeBucketMaps(d.NegativeBuckets, negMap)
return
}

// Handle classic histogram
histoBuckets := histo.GetBucket()
if len(histoBuckets) > 0 && d.buckets == nil {
d.buckets = map[float64]uint64{}
Expand All @@ -490,6 +575,18 @@ func (d *HistogramData) AddHistogramData(histo HistogramData) {
d.sampleCount += histo.sampleCount
d.sampleSum += histo.sampleSum

if histo.hasNative() {
if !d.hasNative() {
d.Schema = histo.Schema
d.ZeroThreshold = histo.ZeroThreshold
}
d.ZeroCount += histo.ZeroCount
d.PositiveBuckets = mergeBucketMaps(d.PositiveBuckets, histo.PositiveBuckets)
d.NegativeBuckets = mergeBucketMaps(d.NegativeBuckets, histo.NegativeBuckets)
return
}

// Handle classic histogram
if len(histo.buckets) > 0 && d.buckets == nil {
d.buckets = map[float64]uint64{}
}
Expand All @@ -504,7 +601,24 @@ func (d *HistogramData) AddHistogramData(histo HistogramData) {
//
// Note that returned metric shares bucket with this HistogramData, so avoid
// doing more modifications to this HistogramData after calling Metric.
//
// For native histograms, this returns a metric created via MustNewConstNativeHistogram.
// For classic histograms, this returns a metric created via MustNewConstHistogram.
func (d *HistogramData) Metric(desc *prometheus.Desc, labelValues ...string) prometheus.Metric {
if d.hasNative() {
return prometheus.MustNewConstNativeHistogram(
desc,
d.sampleCount,
d.sampleSum,
d.PositiveBuckets,
d.NegativeBuckets,
d.ZeroCount,
d.Schema,
d.ZeroThreshold,
time.Time{}, // No timestamp for aggregated histograms
labelValues...,
)
}
return prometheus.MustNewConstHistogram(desc, d.sampleCount, d.sampleSum, d.buckets, labelValues...)
}

Expand Down
Loading
Loading