diff --git a/pp-pkg/rules/alerting_stuck_test.go b/pp-pkg/rules/alerting_stuck_test.go new file mode 100644 index 0000000000..2e4b9143e1 --- /dev/null +++ b/pp-pkg/rules/alerting_stuck_test.go @@ -0,0 +1,181 @@ +// Copyright 2026 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Tests in this file investigate the "stuck D8ControlPlaneManagerPodNotRunning" +// alert behavior, where an alert keeps firing for hours even after the source +// data clearly indicates it should be inactive. +// +// Each test pins down ONE behavioral assumption underlying our diagnosis. + +package rules + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/stretchr/testify/require" +) + +// scriptedQuery returns a QueryFunc that hands out canned promql.Vector values +// from a script. The i-th call returns script[i]; the timestamp/identity is +// tagged onto each Sample's T so the rule sees consistent evaluation time. +func scriptedQuery(script []promql.Vector) (QueryFunc, func() int) { + var ( + mu sync.Mutex + i int + ) + q := func(_ context.Context, _ string, t time.Time) (promql.Vector, error) { + mu.Lock() + defer mu.Unlock() + if i >= len(script) { + panic("scriptedQuery exhausted") + } + out := make(promql.Vector, len(script[i])) + for j, s := range script[i] { + s.T = t.UnixMilli() + out[j] = s + } + i++ + return out, nil + } + return q, func() int { + mu.Lock() + defer mu.Unlock() + return i + } +} + +func mustExpr(t *testing.T, src string) parser.Expr { + t.Helper() + e, err := parser.ParseExpr(src) + require.NoError(t, err) + return e +} + +// TestAlertingRule_OnePassEmpty_TransitionsToInactive verifies that ONE +// evaluation with an empty query result is enough to move a Firing alert into +// the Inactive state. This is the baseline guarantee for "an alert cannot be +// stuck if the query returns nothing". 
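+//
+// (Orientation — a sketch of the r.active state machine this relies on, as we
+// read upstream rules/alerting.go: a new fingerprint enters as StatePending;
+// once it has matched for holdDuration it is promoted to StateFiring; on the
+// first eval where the fingerprint is absent, a Pending entry is dropped
+// outright, while a Firing one is flipped to StateInactive with ResolvedAt
+// stamped and is only garbage-collected after resolvedRetention.)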
+func TestAlertingRule_OnePassEmpty_TransitionsToInactive(t *testing.T) { + rule := NewAlertingRule( + "X", mustExpr(t, `up == 0`), + time.Minute, // holdDuration + 0, // keepFiringFor + labels.EmptyLabels(), labels.EmptyLabels(), labels.EmptyLabels(), "", + true, nil, // restored=true → ALERTS series will be emitted + ) + + hot := promql.Vector{{ + Metric: labels.FromStrings("__name__", "up", "job", "x"), + F: 0, + }} + q, _ := scriptedQuery([]promql.Vector{ + hot, // t=0 : pending starts (new entry in r.active) + hot, // t=1m : transitions to firing (holdDuration met) + nil, // t=2m : empty → must transition to Inactive + }) + + t0 := time.Unix(0, 0).UTC() + for step := 0; step < 3; step++ { + ts := t0.Add(time.Duration(step) * time.Minute) + _, err := rule.Eval(context.TODO(), 0, ts, q, nil, 0) + require.NoError(t, err) + } + + require.Len(t, rule.active, 1, "alert entry must still be in r.active (Inactive but kept for resolvedRetention)") + for _, a := range rule.active { + require.Equal(t, StateInactive, a.State, + "after a single empty Eval, an active alert MUST be Inactive — got %s", a.State) + require.False(t, a.ResolvedAt.IsZero(), "ResolvedAt must be set on transition to Inactive") + } +} + +// TestAlertingRule_StaysFiringWhileQueryReturnsSameFingerprint pins down the +// other side of the contract: as long as the query returns a sample with the +// same fingerprint, the alert keeps firing indefinitely. This is exactly the +// behaviour we observe in production — so the only way a stuck alert can exist +// is if query() consistently returns a non-empty result. +func TestAlertingRule_StaysFiringWhileQueryReturnsSameFingerprint(t *testing.T) { + rule := NewAlertingRule( + "X", mustExpr(t, `up == 0`), + time.Minute, 0, + labels.EmptyLabels(), labels.EmptyLabels(), labels.EmptyLabels(), "", + true, nil, + ) + + hot := promql.Vector{{ + Metric: labels.FromStrings("__name__", "up", "job", "x"), + F: 0, + }} + // 100 consecutive non-empty evaluations: the alert MUST stay firing the entire time. + const evals = 100 + script := make([]promql.Vector, evals) + for i := range script { + script[i] = hot + } + q, _ := scriptedQuery(script) + + t0 := time.Unix(0, 0).UTC() + for step := 0; step < evals; step++ { + ts := t0.Add(time.Duration(step) * time.Minute) + _, err := rule.Eval(context.TODO(), 0, ts, q, nil, 0) + require.NoError(t, err) + } + + require.Len(t, rule.active, 1) + for _, a := range rule.active { + require.Equal(t, StateFiring, a.State, + "alert must stay Firing while query keeps returning the same fingerprint") + } +} + +// TestAlertingRule_DroppedFromActiveAfterResolvedRetention verifies the +// 15-minute resolvedRetention window: once the alert is Inactive AND +// resolvedRetention is exceeded, it must be deleted from r.active. +// +// (This eliminates the hypothesis that some glitch could leave a "stale entry" +// sitting in r.active forever — by design, the entry is GC'd after 15min idle.) +func TestAlertingRule_DroppedFromActiveAfterResolvedRetention(t *testing.T) { + rule := NewAlertingRule( + "X", mustExpr(t, `up == 0`), + time.Minute, 0, + labels.EmptyLabels(), labels.EmptyLabels(), labels.EmptyLabels(), "", + true, nil, + ) + + hot := promql.Vector{{ + Metric: labels.FromStrings("__name__", "up", "job", "x"), + F: 0, + }} + // pending(0m), firing(1m), then 20 empty evals one minute apart. 
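+	// (Timeline, assuming the 15m resolvedRetention described above: the first
+	// empty eval at t=2m sets Inactive with ResolvedAt=2m; deletion requires an
+	// eval at ts > 17m, i.e. from the t=18m eval onward. The script's last
+	// empty eval at t=21m leaves comfortable margin.)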
+ script := []promql.Vector{hot, hot} + for i := 0; i < 20; i++ { + script = append(script, nil) + } + q, _ := scriptedQuery(script) + + t0 := time.Unix(0, 0).UTC() + for step := 0; step < len(script); step++ { + ts := t0.Add(time.Duration(step) * time.Minute) + _, err := rule.Eval(context.TODO(), 0, ts, q, nil, 0) + require.NoError(t, err) + } + + require.Empty(t, rule.active, + "after >resolvedRetention(15min) of empty evaluations, the alert MUST be removed from r.active") +} diff --git a/pp-pkg/rules/control_plane_expr_test.go b/pp-pkg/rules/control_plane_expr_test.go new file mode 100644 index 0000000000..54d706f490 --- /dev/null +++ b/pp-pkg/rules/control_plane_expr_test.go @@ -0,0 +1,164 @@ +// Copyright 2026 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Tests in this file run the EXACT alert expression from +// debug/run/rules/control-plane-manager.yaml +// (D8ControlPlaneManagerPodNotRunning) +// against synthetic timeseries shaped like real kube-state-metrics output, +// to lock down what conditions cause the alert to fire vs. stay silent. +// +// We use the upstream PromQL test storage so this file does NOT depend on +// cppbridge or the prompp-specific querier — it isolates the expression +// semantics layer. + +package rules + +import ( + "context" + "testing" + "time" + + "github.com/prometheus/prometheus/promql/promqltest" + "github.com/stretchr/testify/require" +) + +const cpmExpr = `max by (node) (kube_node_role{role="master"} unless kube_node_role{role="master"}` + + ` * on (node) group_left () ((kube_pod_status_ready{condition="true"} == 1) *` + + ` on (pod, namespace) group_right () kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system"}))` + +// TestCPM_HappyPath_NoFiring: all 3 masters have a healthy DaemonSet pod, all +// labels match, the `unless` cancels everything → no alert series. 
+func TestCPM_HappyPath_NoFiring(t *testing.T) {
+	storage := promqltest.LoadedStorage(t, `
+load 1m
+  kube_node_role{role="master",node="m0",instance="ksm",job="kube-state-metrics"} 1+0x10
+  kube_node_role{role="master",node="m1",instance="ksm",job="kube-state-metrics"} 1+0x10
+  kube_node_role{role="master",node="m2",instance="ksm",job="kube-state-metrics"} 1+0x10
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-a",instance="ksm",job="kube-state-metrics"} 1+0x10
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-b",instance="ksm",job="kube-state-metrics"} 1+0x10
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-c",instance="ksm",job="kube-state-metrics"} 1+0x10
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m0",pod="cpm-a",job="kube-state-metrics"} 1+0x10
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m1",pod="cpm-b",job="kube-state-metrics"} 1+0x10
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m2",pod="cpm-c",job="kube-state-metrics"} 1+0x10
+`)
+	t.Cleanup(func() { _ = storage.Close() })
+
+	ng := testEngine(t)
+	q := EngineQueryFunc(ng, storage)
+	res, err := q(context.TODO(), cpmExpr, time.Unix(5*60, 0)) // t=5m
+	require.NoError(t, err)
+	require.Empty(t, res, "alert expr must be empty when all masters have healthy pods — got %v", res)
+}
+
+// TestCPM_PodGoneForOneMaster_AlertsForThatMaster: kube_pod_status_ready for
+// master-1's pod stops being scraped. Once its last sample ages out of the
+// lookback window, the right-hand side of `unless` loses master-1 → master-1
+// escapes → expr returns one series.
+//
+// This is the canonical "cause" of the alert firing: a missing pod_status_ready
+// readout for one of the masters.
+func TestCPM_PodGoneForOneMaster_AlertsForThatMaster(t *testing.T) {
+	// We deliberately stop kube_pod_status_ready for cpm-b (master-1) at minute 3
+	// (load 1m → 4 points 0,1,2,3). Default lookback is 5m, so for almost five
+	// more minutes the querier would still return the last point from t=3m as a
+	// stale value. We therefore evaluate at t=12m, well outside the 5m lookback.
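+	// (Window arithmetic, assuming the promqltest default 5m delta: an instant
+	// query at time t sees the newest sample in roughly (t-5m, t], so the last
+	// cpm-b point at t=3m stops being visible around t=8m; at t=12m it is 9m
+	// stale regardless of boundary semantics.)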
+	storage := promqltest.LoadedStorage(t, `
+load 1m
+  kube_node_role{role="master",node="m0",instance="ksm",job="kube-state-metrics"} 1+0x20
+  kube_node_role{role="master",node="m1",instance="ksm",job="kube-state-metrics"} 1+0x20
+  kube_node_role{role="master",node="m2",instance="ksm",job="kube-state-metrics"} 1+0x20
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-a",instance="ksm",job="kube-state-metrics"} 1+0x20
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-b",instance="ksm",job="kube-state-metrics"} 1 1 1 1
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-c",instance="ksm",job="kube-state-metrics"} 1+0x20
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m0",pod="cpm-a",job="kube-state-metrics"} 1+0x20
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m1",pod="cpm-b",job="kube-state-metrics"} 1+0x20
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m2",pod="cpm-c",job="kube-state-metrics"} 1+0x20
+`)
+	t.Cleanup(func() { _ = storage.Close() })
+
+	ng := testEngine(t)
+	q := EngineQueryFunc(ng, storage)
+	res, err := q(context.TODO(), cpmExpr, time.Unix(12*60, 0))
+	require.NoError(t, err)
+	require.Len(t, res, 1, "exactly one master must escape the unless")
+	require.Equal(t, "m1", res[0].Metric.Get("node"))
+}
+
+// TestCPM_RecoveryClearsAlert: same as the previous test, but the missing
+// kube_pod_status_ready readout reappears at minute 14. By minute 16 the
+// expression must again return empty (the alert would then transition to
+// Inactive in AlertingRule.Eval).
+//
+// This test pins down: "as soon as the data comes back, the expression must
+// stop returning the master". If this fails, we have a real engine/data bug.
+func TestCPM_RecoveryClearsAlert(t *testing.T) {
+	storage := promqltest.LoadedStorage(t, `
+load 1m
+  kube_node_role{role="master",node="m0",instance="ksm",job="kube-state-metrics"} 1+0x20
+  kube_node_role{role="master",node="m1",instance="ksm",job="kube-state-metrics"} 1+0x20
+  kube_node_role{role="master",node="m2",instance="ksm",job="kube-state-metrics"} 1+0x20
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-a",instance="ksm",job="kube-state-metrics"} 1+0x20
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-b",instance="ksm",job="kube-state-metrics"} 1 1 1 1 _ _ _ _ _ _ _ _ _ _ 1 1 1 1 1 1 1
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-c",instance="ksm",job="kube-state-metrics"} 1+0x20
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m0",pod="cpm-a",job="kube-state-metrics"} 1+0x20
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m1",pod="cpm-b",job="kube-state-metrics"} 1+0x20
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m2",pod="cpm-c",job="kube-state-metrics"} 1+0x20
+`)
+	t.Cleanup(func() { _ = storage.Close() })
+
+	ng := testEngine(t)
+	q := EngineQueryFunc(ng, storage)
+
+	// While the gap is open (t=10m), we expect 1 alerting series.
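+	// (At t=10m the lookback window is roughly (5m, 10m]: cpm-b's last pre-gap
+	// sample at t=3m is outside it, and recovery at t=14m has not happened yet,
+	// so the RHS of `unless` is missing m1.)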
+	res, err := q(context.TODO(), cpmExpr, time.Unix(10*60, 0))
+	require.NoError(t, err)
+	require.Len(t, res, 1, "expected 1 alerting series during the gap")
+
+	// After recovery (t=16m: fresh samples from t=14m onward sit well inside
+	// the default 5m lookback), the expression MUST be empty again.
+	res, err = q(context.TODO(), cpmExpr, time.Unix(16*60, 0))
+	require.NoError(t, err)
+	require.Empty(t, res, "after recovery the expression MUST be empty — got %v", res)
+}
+
+// TestCPM_PodReplaced_OldFingerprintMustNotLinger: simulates the production
+// scenario where a DaemonSet pod is recreated (UID/pod-name change). The OLD
+// pod stops being scraped and the NEW one starts being scraped with a
+// different `pod` label. The expression has no business reporting either pod
+// once the switchover has aged past the 5m lookback — but if the old `pod`
+// label lingers in storage and the engine still sees it via Select(), the
+// join becomes inconsistent and triggers the alert.
+//
+// This pins down: pod replacement WITHOUT a real outage must not fire the alert.
+func TestCPM_PodReplaced_OldFingerprintMustNotLinger(t *testing.T) {
+	storage := promqltest.LoadedStorage(t, `
+load 1m
+  kube_node_role{role="master",node="m0",instance="ksm",job="kube-state-metrics"} 1+0x30
+  kube_node_role{role="master",node="m1",instance="ksm",job="kube-state-metrics"} 1+0x30
+  kube_node_role{role="master",node="m2",instance="ksm",job="kube-state-metrics"} 1+0x30
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-a-old",instance="ksm",job="kube-state-metrics"} 1 1 1 1 1
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-b-old",instance="ksm",job="kube-state-metrics"} 1 1 1 1 1
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-c-old",instance="ksm",job="kube-state-metrics"} 1 1 1 1 1
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m0",pod="cpm-a-old",job="kube-state-metrics"} 1 1 1 1 1
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m1",pod="cpm-b-old",job="kube-state-metrics"} 1 1 1 1 1
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m2",pod="cpm-c-old",job="kube-state-metrics"} 1 1 1 1 1
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-a-new",instance="ksm",job="kube-state-metrics"} _ _ _ _ _ 1+0x25
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-b-new",instance="ksm",job="kube-state-metrics"} _ _ _ _ _ 1+0x25
+  kube_pod_status_ready{condition="true",namespace="kube-system",pod="cpm-c-new",instance="ksm",job="kube-state-metrics"} _ _ _ _ _ 1+0x25
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m0",pod="cpm-a-new",job="kube-state-metrics"} _ _ _ _ _ 1+0x25
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m1",pod="cpm-b-new",job="kube-state-metrics"} _ _ _ _ _ 1+0x25
+  kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system",node="m2",pod="cpm-c-new",job="kube-state-metrics"} _ _ _ _ _ 1+0x25
+`)
+	t.Cleanup(func() { _ = storage.Close() })
+
+	ng := testEngine(t)
+	q := EngineQueryFunc(ng, storage)
+
+	// Well after replacement and well outside lookback for the OLD generation:
+	res, err := q(context.TODO(),
cpmExpr, time.Unix(20*60, 0)) + require.NoError(t, err) + require.Empty(t, res, "after pod replacement settled, the alert MUST stay silent — got %v", res) +} diff --git a/pp-pkg/storage/adapter_promql_test.go b/pp-pkg/storage/adapter_promql_test.go new file mode 100644 index 0000000000..0ec401ae6f --- /dev/null +++ b/pp-pkg/storage/adapter_promql_test.go @@ -0,0 +1,884 @@ +// Copyright 2026 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// This file pins down PromQL semantics over a real prompp Adapter (cppbridge +// head + querier). It mirrors the four CPM scenarios from +// pp-pkg/rules/control_plane_expr_test.go that have been validated against +// the upstream test storage. +// +// If a scenario behaves differently here than against upstream storage we have +// pinpointed a bug in the prompp querier path. + +package storage + +import ( + "testing" + "time" + + "github.com/go-kit/log" + "github.com/jonboulle/clockwork" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/suite" + + pp_model "github.com/prometheus/prometheus/pp/go/model" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" +) + +const cpmExpr = `max by (node) (kube_node_role{role="master"} unless kube_node_role{role="master"}` + + ` * on (node) group_left () ((kube_pod_status_ready{condition="true"} == 1) *` + + ` on (pod, namespace) group_right () kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system"}))` + +// scrapePoint represents a single (labels, ts, value) tuple to ingest. +type scrapePoint struct { + labels []string // key1, val1, key2, val2, ... + tMs int64 + v float64 +} + +// AdapterPromQLSuite reuses the boilerplate from BatchStorageSuite to set up a +// real Adapter, then drives PromQL queries against it through the promql.Engine. +type AdapterPromQLSuite struct { + BatchStorageSuite +} + +func TestAdapterPromQLSuite(t *testing.T) { + suite.Run(t, new(AdapterPromQLSuite)) +} + +// engine returns a PromQL engine configured the same way it is in production +// (lookback-delta=1m as in the affected cluster's prompp). +func (s *AdapterPromQLSuite) engine() *promql.Engine { + return promql.NewEngine(promql.EngineOpts{ + Logger: log.NewNopLogger(), + Reg: nil, + MaxSamples: 100_000, + Timeout: 10 * time.Second, + LookbackDelta: 1 * time.Minute, + NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, + EnableAtModifier: true, + EnableNegativeOffset: true, + }) +} + +// ingest appends the supplied scrape points to the active head. +// +// Implementation note: prompp's Adapter is shard-aware, but writing through +// AppendTimeSeries with a freshly built batch lets the Adapter do the routing. +// We re-use the global StateV2 just like a real scrape would. 
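+//
+// A usage sketch (hypothetical series; timestamps are in milliseconds):
+//
+//	s.ingest([]scrapePoint{
+//		{labels: []string{"__name__", "up", "job", "x"}, tMs: 0, v: 1},
+//		{labels: []string{"__name__", "up", "job", "x"}, tMs: 60_000, v: 1},
+//	})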
+func (s *AdapterPromQLSuite) ingest(points []scrapePoint) { + batch := &testTimeSeriesBatch{ + timeSeries: make([]pp_model.TimeSeries, 0, len(points)), + } + for _, p := range points { + b := pp_model.NewLabelSetBuilder() + for i := 0; i < len(p.labels); i += 2 { + b.Set(p.labels[i], p.labels[i+1]) + } + batch.timeSeries = append(batch.timeSeries, pp_model.TimeSeries{ + LabelSet: b.Build(), + Timestamp: uint64(p.tMs), // #nosec G115 + Value: p.v, + }) + } + _, err := s.adapter.AppendTimeSeries(s.ctx, batch, s.state, false) + s.Require().NoError(err) +} + +// queryAt runs an instant PromQL query at the given wall-clock timestamp and +// returns the resulting Vector. We use the same plumbing the rule manager uses. +func (s *AdapterPromQLSuite) queryAt(expr string, ts time.Time) promql.Vector { + q, err := s.engine().NewInstantQuery(s.ctx, s.adapter, nil, expr, ts) + s.Require().NoError(err) + defer q.Close() + + res := q.Exec(s.ctx) + s.Require().NoError(res.Err) + + v, err := res.Vector() + s.Require().NoError(err) + return v +} + +// SetupTest extends the parent setup with a real (non-fake) clock. The fake +// clock from BatchStorageSuite breaks the engine's deadline plumbing. +func (s *AdapterPromQLSuite) SetupTest() { + s.BatchStorageSuite.SetupTest() + s.clock = clockwork.NewRealClock() +} + +// TestCPM_HappyPath_NoFiring_PromppStorage mirrors +// pp-pkg/rules/control_plane_expr_test.go::TestCPM_HappyPath_NoFiring but +// against the prompp Adapter. If this passes we know the prompp querier +// returns the same shape of data as the upstream test storage for the +// fully-healthy case. +func (s *AdapterPromQLSuite) TestCPM_HappyPath_NoFiring_PromppStorage() { + const stepMs = 60_000 // 1 minute + var points []scrapePoint + + for step := int64(0); step < 11; step++ { + ts := step * stepMs + // kube_node_role + for _, node := range []string{"m0", "m1", "m2"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_node_role", + "role", "master", + "node", node, + "instance", "ksm", + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + // kube_pod_status_ready + for _, pod := range []string{"cpm-a", "cpm-b", "cpm-c"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_pod_status_ready", + "condition", "true", + "namespace", "kube-system", + "pod", pod, + "instance", "ksm", + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + // kube_controller_pod + for _, pair := range [][2]string{{"m0", "cpm-a"}, {"m1", "cpm-b"}, {"m2", "cpm-c"}} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_controller_pod", + "controller_name", "d8-control-plane-manager", + "controller_type", "DaemonSet", + "namespace", "kube-system", + "node", pair[0], + "pod", pair[1], + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + } + s.ingest(points) + + // Sanity probe: kube_node_role at t=5m must return 3 series. + probe := s.queryAt(`kube_node_role{role="master"}`, model.Time(5*60_000).Time()) + s.Require().Len(probe, 3, "sanity: expected 3 master roles at t=5m, got %v", probe) + + // Real assertion: the alert expression must be empty. 
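+	// (If it is non-empty here while the upstream-storage twin of this test
+	// passes, the divergence is in the prompp querier path, starting with how
+	// Select() handles the matchers above, not in the expression itself.)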
+ res := s.queryAt(cpmExpr, model.Time(5*60_000).Time()) + s.Require().Empty(res, "alert expr must be empty when all masters have healthy pods — got %v", res) +} + +// TestCPM_PodGoneForOneMaster_PromppStorage mirrors the upstream test that +// stops scraping master-1's kube_pod_status_ready: outside the 1m lookback +// past the last point, master-1 must escape the unless and the alert must fire +// for exactly one master. +func (s *AdapterPromQLSuite) TestCPM_PodGoneForOneMaster_PromppStorage() { + const stepMs = 60_000 + var points []scrapePoint + + // 21 minutes of data for everything that doesn't disappear. + for step := int64(0); step < 21; step++ { + ts := step * stepMs + for _, node := range []string{"m0", "m1", "m2"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_node_role", + "role", "master", + "node", node, + "instance", "ksm", + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + // kube_pod_status_ready: cpm-a and cpm-c always; cpm-b only for first 4 minutes. + for _, pod := range []string{"cpm-a", "cpm-c"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_pod_status_ready", + "condition", "true", + "namespace", "kube-system", + "pod", pod, + "instance", "ksm", + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + if step < 4 { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_pod_status_ready", + "condition", "true", + "namespace", "kube-system", + "pod", "cpm-b", + "instance", "ksm", + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + for _, pair := range [][2]string{{"m0", "cpm-a"}, {"m1", "cpm-b"}, {"m2", "cpm-c"}} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_controller_pod", + "controller_name", "d8-control-plane-manager", + "controller_type", "DaemonSet", + "namespace", "kube-system", + "node", pair[0], + "pod", pair[1], + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + } + s.ingest(points) + + res := s.queryAt(cpmExpr, model.Time(12*60_000).Time()) + s.Require().Len(res, 1, "exactly one master must escape the unless") + s.Require().Equal("m1", res[0].Metric.Get("node")) +} + +// TestCPM_PodReplaced_PromppStorage exercises the production-like scenario +// where the OLD pod stops being scraped at minute 5 and the NEW pod starts at +// minute 5 with a different `pod` label. After the lookback past the +// transition, the alert MUST stay silent — if it fires, we have a bug in the +// prompp querier (e.g. the old `pod` lingers in the LSS). +func (s *AdapterPromQLSuite) TestCPM_PodReplaced_PromppStorage() { + const stepMs = 60_000 + var points []scrapePoint + + for step := int64(0); step < 30; step++ { + ts := step * stepMs + for _, node := range []string{"m0", "m1", "m2"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_node_role", + "role", "master", + "node", node, + "instance", "ksm", + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + + // OLD generation: present only for steps 0..4 (5 minutes). 
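+		// (Mirrors the upstream test's `1 1 1 1 1` series: the last OLD sample
+		// lands at t=4m, so by the t=20m probe it is 16m stale against a 1m
+		// lookback.)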
+ if step < 5 { + for _, oldPair := range [][2]string{{"m0", "cpm-a-old"}, {"m1", "cpm-b-old"}, {"m2", "cpm-c-old"}} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_pod_status_ready", + "condition", "true", + "namespace", "kube-system", + "pod", oldPair[1], + "instance", "ksm", + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_controller_pod", + "controller_name", "d8-control-plane-manager", + "controller_type", "DaemonSet", + "namespace", "kube-system", + "node", oldPair[0], + "pod", oldPair[1], + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + } + + // NEW generation: present from step 5 onwards. + if step >= 5 { + for _, newPair := range [][2]string{{"m0", "cpm-a-new"}, {"m1", "cpm-b-new"}, {"m2", "cpm-c-new"}} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_pod_status_ready", + "condition", "true", + "namespace", "kube-system", + "pod", newPair[1], + "instance", "ksm", + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_controller_pod", + "controller_name", "d8-control-plane-manager", + "controller_type", "DaemonSet", + "namespace", "kube-system", + "node", newPair[0], + "pod", newPair[1], + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + } + } + s.ingest(points) + + // Way past the transition (t=20m), well outside the 1m lookback: + res := s.queryAt(cpmExpr, model.Time(20*60_000).Time()) + s.Require().Empty(res, "after pod replacement settled, the alert MUST stay silent — got %v", res) +} + +// rotateHead manually replays what services/rotator.go does on a rotation tick: +// move the current active head into the keeper and install a fresh new active +// head built off the next generation. This lets us test multi-head behavior +// (active + keeper'ed old heads) without spinning the real Rotator. +func (s *AdapterPromQLSuite) rotateHead() { + oldHead := s.manager.Proxy().Get() + newHead, err := s.manager.Builder().Build(oldHead.Generation()+1, 2) + s.Require().NoError(err) + + // Mirror rotator.rotate: keep the old head accessible via Proxy.Heads(). + s.Require().NoError(s.manager.Proxy().AddWithReplace(oldHead, 0)) + s.Require().NoError(s.manager.Proxy().Replace(s.ctx, newHead)) + oldHead.SetReadOnly() +} + +// TestCPM_PodReplaced_AcrossRotation_PromppStorage is the most production-like +// scenario for the stuck D8ControlPlaneManagerPodNotRunning alert: +// +// 1. Initial pods (cpm-X-old) are scraped into HEAD-A. HEAD-A becomes the +// keeper'ed read-only head after rotation. +// 2. A rotation occurs (mimicking the periodic rotator tick). +// 3. The DaemonSet controller recreates pods (cpm-X-new) and KSM exposes the +// new pod labels. These new series are scraped into the fresh HEAD-B +// (active). +// +// At a query time well past the rotation and well past the 1m lookback, the +// alert expression MUST stay silent — the OLD pods' last sample is more than +// 1m in the past and therefore fall outside lookback. If the unless join +// becomes inconsistent and the alert fires, we have reproduced the prod bug. +func (s *AdapterPromQLSuite) TestCPM_PodReplaced_AcrossRotation_PromppStorage() { + const stepMs = 60_000 + + // Phase 1: write OLD generation into HEAD-A (steps 0..4). 
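+	// (Head lifecycle, per our reading of services/rotator.go and rotateHead
+	// above: writes always land in the active head; after rotation the old
+	// head turns read-only but stays queryable via Proxy.Heads(), so queries
+	// must merge BOTH heads. That is exactly the geometry the prod bug would
+	// need in order to hide in the multi-head merge path.)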
+	{
+		var points []scrapePoint
+		for step := int64(0); step < 5; step++ {
+			ts := step * stepMs
+			for _, node := range []string{"m0", "m1", "m2"} {
+				points = append(points, scrapePoint{
+					labels: []string{
+						"__name__", "kube_node_role",
+						"role", "master",
+						"node", node,
+						"instance", "ksm",
+						"job", "kube-state-metrics",
+					},
+					tMs: ts, v: 1,
+				})
+			}
+			for _, oldPair := range [][2]string{{"m0", "cpm-a-old"}, {"m1", "cpm-b-old"}, {"m2", "cpm-c-old"}} {
+				points = append(points, scrapePoint{
+					labels: []string{
+						"__name__", "kube_pod_status_ready",
+						"condition", "true",
+						"namespace", "kube-system",
+						"pod", oldPair[1],
+						"instance", "ksm",
+						"job", "kube-state-metrics",
+					},
+					tMs: ts, v: 1,
+				})
+				points = append(points, scrapePoint{
+					labels: []string{
+						"__name__", "kube_controller_pod",
+						"controller_name", "d8-control-plane-manager",
+						"controller_type", "DaemonSet",
+						"namespace", "kube-system",
+						"node", oldPair[0],
+						"pod", oldPair[1],
+						"job", "kube-state-metrics",
+					},
+					tMs: ts, v: 1,
+				})
+			}
+		}
+		s.ingest(points)
+	}
+
+	// Phase 2: rotate. Old head with OLD pods now lives in the keeper.
+	s.rotateHead()
+
+	// Phase 3: write NEW generation into HEAD-B (steps 5..29).
+	{
+		var points []scrapePoint
+		for step := int64(5); step < 30; step++ {
+			ts := step * stepMs
+			for _, node := range []string{"m0", "m1", "m2"} {
+				points = append(points, scrapePoint{
+					labels: []string{
+						"__name__", "kube_node_role",
+						"role", "master",
+						"node", node,
+						"instance", "ksm",
+						"job", "kube-state-metrics",
+					},
+					tMs: ts, v: 1,
+				})
+			}
+			for _, newPair := range [][2]string{{"m0", "cpm-a-new"}, {"m1", "cpm-b-new"}, {"m2", "cpm-c-new"}} {
+				points = append(points, scrapePoint{
+					labels: []string{
+						"__name__", "kube_pod_status_ready",
+						"condition", "true",
+						"namespace", "kube-system",
+						"pod", newPair[1],
+						"instance", "ksm",
+						"job", "kube-state-metrics",
+					},
+					tMs: ts, v: 1,
+				})
+				points = append(points, scrapePoint{
+					labels: []string{
+						"__name__", "kube_controller_pod",
+						"controller_name", "d8-control-plane-manager",
+						"controller_type", "DaemonSet",
+						"namespace", "kube-system",
+						"node", newPair[0],
+						"pod", newPair[1],
+						"job", "kube-state-metrics",
+					},
+					tMs: ts, v: 1,
+				})
+			}
+		}
+		s.ingest(points)
+	}
+
+	// Sanity: we have 2 heads (one in keeper, one active).
+	s.Require().NotEmpty(s.manager.Proxy().Heads(), "keeper should hold the rotated head")
+
+	// Probe at t=20m (16m past the last OLD point at t=4m, well outside 1m lookback).
+ t20m := model.Time(20 * 60_000).Time() + + // First sanity-check the building blocks of the expression independently: + leftV := s.queryAt(`kube_node_role{role="master"}`, t20m) + s.Require().Len(leftV, 3, "expected 3 master roles, got %v", leftV) + + innerV := s.queryAt( + `(kube_pod_status_ready{condition="true"} == 1) * on (pod, namespace) group_right () `+ + `kube_controller_pod{controller_name="d8-control-plane-manager",controller_type="DaemonSet",namespace="kube-system"}`, t20m) + if !s.Equal(3, len(innerV), "INNER must yield exactly 3 series at t=20m — got %d:\n%v", len(innerV), innerV) { + for i, ss := range innerV { + s.T().Logf(" inner[%d] = %s", i, ss.Metric.String()) + } + } + + // The actual assertion: + res := s.queryAt(cpmExpr, t20m) + if !s.Empty(res, "after rotation+pod replacement settled, alert MUST stay silent — got %v", res) { + for i, ss := range res { + s.T().Logf(" firing[%d] = %s", i, ss.Metric.String()) + } + } +} + +// TestCPM_RecordingRuleLagsBeyondLookback_AlertFires reproduces the production +// scenario hypothesised by the user: +// +// `kube_controller_pod` is a RECORDING RULE (no `instance` label, generated by +// kube-prometheus-stack from `kube_pod_owner`). It lives in a different group +// than our alerting group. If that recording group falls behind (e.g. heavy +// eval, missed iterations, scrape lag of `kube_pod_owner`) and stops writing +// fresh `kube_controller_pod` samples for longer than the lookback delta: +// - the INNER subexpression of the alert becomes empty +// - `unless` cancels nothing +// - the alert fires for ALL masters +// +// We model "recording rule lagged" by ingesting kube_controller_pod up to a +// cutoff that's older than `lookback delta` from the query time. +func (s *AdapterPromQLSuite) TestCPM_RecordingRuleLagsBeyondLookback_AlertFires() { + const stepMs = 60_000 + + var points []scrapePoint + for step := int64(0); step < 30; step++ { + ts := step * stepMs + // kube_node_role: scraped continuously + for _, node := range []string{"m0", "m1", "m2"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_node_role", + "role", "master", + "node", node, + "instance", "ksm", + "job", "kube-state-metrics", + "scrape_endpoint", "main", + }, + tMs: ts, v: 1, + }) + } + // kube_pod_status_ready: scraped continuously + for _, pod := range []string{"cpm-a", "cpm-b", "cpm-c"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_pod_status_ready", + "condition", "true", + "namespace", "kube-system", + "pod", pod, + "instance", "ksm", + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + // kube_controller_pod: recording rule that STOPS being recorded after step 10. + if step <= 10 { + for _, pair := range [][2]string{{"m0", "cpm-a"}, {"m1", "cpm-b"}, {"m2", "cpm-c"}} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_controller_pod", + "controller", "ds/d8-control-plane-manager", + "controller_name", "d8-control-plane-manager", + "controller_type", "DaemonSet", + "job", "kube-state-metrics", + "namespace", "kube-system", + "node", pair[0], + "pod", pair[1], + }, + tMs: ts, v: 1, + }) + } + } + } + s.ingest(points) + + // At t=20m the last kube_controller_pod sample (t=10m) is 10 minutes in the + // past — far beyond the 1m lookback. INNER becomes empty, the alert fires + // for all three masters. 
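+	// (PromQL refresher: `A unless B` keeps the series of A that have no
+	// match in B, so an empty B lets ALL of A through. The alert's failure
+	// mode is therefore "fire for every master", never a partial set.)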
+ res := s.queryAt(cpmExpr, model.Time(20*60_000).Time()) + s.Require().Len(res, 3, "alert MUST fire for all 3 masters when kube_controller_pod recording rule lags out — got %v", res) + nodes := map[string]bool{} + for _, r := range res { + nodes[r.Metric.Get("node")] = true + } + s.Require().True(nodes["m0"] && nodes["m1"] && nodes["m2"], + "all three masters expected, got: %v", nodes) +} + +// TestCPM_RecordingRuleCatchesUp_AlertClears extends the previous test: after +// the recording rule resumes producing samples, within 1m+ the alert MUST +// stop matching. This nails down "is the bug just lag, or is there something +// stickier in storage that prevents recovery?". +func (s *AdapterPromQLSuite) TestCPM_RecordingRuleCatchesUp_AlertClears() { + const stepMs = 60_000 + + var points []scrapePoint + for step := int64(0); step < 30; step++ { + ts := step * stepMs + for _, node := range []string{"m0", "m1", "m2"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_node_role", + "role", "master", + "node", node, + "instance", "ksm", + "job", "kube-state-metrics", + "scrape_endpoint", "main", + }, + tMs: ts, v: 1, + }) + } + for _, pod := range []string{"cpm-a", "cpm-b", "cpm-c"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_pod_status_ready", + "condition", "true", + "namespace", "kube-system", + "pod", pod, + "instance", "ksm", + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + // kube_controller_pod: lag from step 11..15, resumes at step 16. + if step <= 10 || step >= 16 { + for _, pair := range [][2]string{{"m0", "cpm-a"}, {"m1", "cpm-b"}, {"m2", "cpm-c"}} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_controller_pod", + "controller", "ds/d8-control-plane-manager", + "controller_name", "d8-control-plane-manager", + "controller_type", "DaemonSet", + "job", "kube-state-metrics", + "namespace", "kube-system", + "node", pair[0], + "pod", pair[1], + }, + tMs: ts, v: 1, + }) + } + } + } + s.ingest(points) + + // At t=14m INNER is empty (last sample was at t=10m, lookback is 1m). + resDuringLag := s.queryAt(cpmExpr, model.Time(14*60_000).Time()) + s.Require().Len(resDuringLag, 3, "during lag, alert must fire for all 3 masters") + + // At t=20m the recording rule is producing again (last sample t=20m). + // INNER must be non-empty for all 3 masters → unless cancels everything. + resAfter := s.queryAt(cpmExpr, model.Time(20*60_000).Time()) + s.Require().Empty(resAfter, "after recording rule recovers, alert MUST clear — got %v", resAfter) +} + +// TestCPM_RecordingRuleMissedIterations_AlertStaysFiring models the most +// realistic prod scenario: +// +// - LookbackDelta = 1m AND alert group interval = 1m AND recording rule +// interval = 1m (all three confirmed against the affected cluster's flags +// and rule_group.json) +// - The recording rule that produces `kube_controller_pod` runs in a +// DIFFERENT group, and that group has steady +// `prometheus_rule_group_iterations_missed_total` (≥ every 2nd iteration). +// The cluster snapshot shows ≥20 groups with this counter > 0. +// +// When iterations are missed, the recording rule produces samples no more +// often than once per `interval * 2 = 2m`. With LookbackDelta = 1m there is +// always a window of at least 1 minute when the latest visible sample is +// already older than the lookback. 
Querying inside that window: +// - kube_controller_pod selector returns empty +// - INNER is empty → `unless` cancels nothing +// - alert fires for ALL three masters +// +// Repeating the query on every alert-eval tick (also every 60s) of the +// affected group: half the time the alert sees firing, half the time it +// doesn't. State machine then keeps the alert in r.active continuously +// because at least one out of two evals returns the firing set, never letting +// it drop to Inactive long enough to expire (`resolvedRetention` = 15m). +// +// Concretely we write kube_controller_pod every 120s (sparse) and verify that +// queries at the midpoints (offset +90s after the last sample, well outside +// the 60s lookback window) all fire. +func (s *AdapterPromQLSuite) TestCPM_RecordingRuleMissedIterations_AlertStaysFiring() { + const oneMin = int64(60_000) + const twoMin = int64(120_000) + + var points []scrapePoint + // 30 minutes worth of healthy direct scrapes for kube_node_role + + // kube_pod_status_ready (every 60s). + for ts := int64(0); ts <= 30*oneMin; ts += oneMin { + for _, node := range []string{"m0", "m1", "m2"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_node_role", + "role", "master", + "node", node, + "instance", "ksm", + "job", "kube-state-metrics", + "scrape_endpoint", "main", + }, + tMs: ts, v: 1, + }) + } + for _, pod := range []string{"cpm-a", "cpm-b", "cpm-c"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_pod_status_ready", + "condition", "true", + "namespace", "kube-system", + "pod", pod, + "instance", "ksm", + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + } + // kube_controller_pod recording rule writes sparsely — every 2 minutes + // (every 2nd iteration is missed). + for ts := int64(0); ts <= 30*oneMin; ts += twoMin { + for _, pair := range [][2]string{{"m0", "cpm-a"}, {"m1", "cpm-b"}, {"m2", "cpm-c"}} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_controller_pod", + "controller", "ds/d8-control-plane-manager", + "controller_name", "d8-control-plane-manager", + "controller_type", "DaemonSet", + "job", "kube-state-metrics", + "namespace", "kube-system", + "node", pair[0], + "pod", pair[1], + }, + tMs: ts, v: 1, + }) + } + } + s.ingest(points) + + // Pick query times that fall in the "blind" half of every 2-minute cycle: + // 90 seconds after the last recording-rule sample. The last sample is at + // t-90s; lookback window is (t-60s, t]; so t-90s is OUTSIDE the window. + for _, atMs := range []int64{ + 3*oneMin + oneMin/2, // 3:30 → last RR sample @ 2:00, gap 90s + 5*oneMin + oneMin/2, // 5:30 → last RR sample @ 4:00, gap 90s + 7*oneMin + oneMin/2, + 9*oneMin + oneMin/2, + 15*oneMin + oneMin/2, + 25*oneMin + oneMin/2, + } { + res := s.queryAt(cpmExpr, model.Time(atMs).Time()) + s.Require().Lenf(res, 3, + "alert MUST fire for all 3 masters at t=%.1fmin (recording rule misses every 2nd iteration) — got %v", + float64(atMs)/60_000.0, res) + } + + // And on the "lucky" tick — exactly when the recording rule did write — + // the alert correctly clears. This proves the firing is not state-machine + // stickiness in storage: it's a pure expression-vs-data mismatch. 
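+	// (These "lucky" timestamps coincide with a recording-rule write: a sample
+	// stamped exactly at the query time is always inside the lookback window,
+	// so INNER is fully populated and `unless` cancels all three masters.)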
+ for _, atMs := range []int64{4 * oneMin, 6 * oneMin, 10 * oneMin, 20 * oneMin} { + res := s.queryAt(cpmExpr, model.Time(atMs).Time()) + s.Require().Emptyf(res, + "alert must NOT fire at t=%dmin (recording rule sample fresh) — got %v", + atMs/60_000, res) + } +} + +// TestCPM_OffsetCollisionRace_FixedByQueryOffset is the prod-confirmed scenario. +// +// In the affected v0.7.6 cluster the alert group +// `control-plane-manager-control-plane-manager-0` and the recording-rule group +// `monitoring-kubernetes-kubernetes-controller-0` (which writes +// `kube_controller_pod` from `kube_pod_owner`) ended up with practically +// identical hash-derived eval offsets inside the minute: +// +// last_evaluation_timestamp_seconds (alert group) = T + 6.329 s +// last_evaluation_timestamp_seconds (rec.rule group) = T + 6.310 s +// kube_controller_pod sample TS = T + 6.308 s +// +// i.e. alert eval starts only 19 ms after recording-rule eval, while the +// cppbridge head ingest path needs more than 19 ms to make the new sample +// visible to a querier. Result: +// +// - alert eval @ T+6.329 sees the LATEST committed sample at T-60+6.308 +// (previous minute) → distance = 60.021 s > LookbackDelta (60 s) +// - INNER subexpression is empty → `unless` cancels nothing +// - `max by (node)` returns all three masters → alert FIRING +// - API queries (run later) see the new sample already committed → empty +// +// The fix is to add `query_offset` (per-group or global). An offset of 30 s +// pushes alert eval to ask storage for time `T-23.671`; the previous-minute +// sample at `T-53.692` lies 30.021 s back, well within the 60 s lookback. +// INNER becomes non-empty, `unless` cancels everything, alert is silent. +// +// We model the race by ingesting kube_controller_pod up to step N-1 and +// querying at exactly step N's eval time — i.e. "the new sample doesn't +// exist in storage yet from the alert eval's point of view". This gives the +// same lookback geometry as the prod race (latest visible sample > 60 s +// behind the alert eval timestamp). +func (s *AdapterPromQLSuite) TestCPM_OffsetCollisionRace_FixedByQueryOffset() { + const ( + stepMs = int64(60_000) + offsetMs = int64(6_000) // shared offset of both groups inside the minute + alertJitterMs = int64(329) // alert eval starts 329 ms after minute+offset (matches prod 6.329) + nLastWritten = int64(29) // we write kube_controller_pod up to and including this step + alertStep = int64(30) // alert eval happens at this step's tick + ) + + var points []scrapePoint + for step := int64(0); step <= alertStep; step++ { + ts := step*stepMs + offsetMs + + for _, node := range []string{"m0", "m1", "m2"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_node_role", + "role", "master", + "node", node, + "instance", "ksm", + "job", "kube-state-metrics", + "scrape_endpoint", "main", + }, + tMs: ts, v: 1, + }) + } + for _, pod := range []string{"cpm-a", "cpm-b", "cpm-c"} { + points = append(points, scrapePoint{ + labels: []string{ + "__name__", "kube_pod_status_ready", + "condition", "true", + "namespace", "kube-system", + "pod", pod, + "instance", "ksm", + "job", "kube-state-metrics", + }, + tMs: ts, v: 1, + }) + } + // kube_controller_pod: recording rule output. STOP one step earlier than + // the alert tick — this models "the sample for alertStep has been + // produced by the recording rule but is NOT YET committed when alert + // eval starts 19 ms later". 
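+		// (Hence nLastWritten = alertStep-1: from the alert eval's point of
+		// view the freshest committed sample is one full minute plus jitter
+		// old, 60_329 ms, just over the 60_000 ms lookback.)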
+		if step <= nLastWritten {
+			for _, pair := range [][2]string{{"m0", "cpm-a"}, {"m1", "cpm-b"}, {"m2", "cpm-c"}} {
+				points = append(points, scrapePoint{
+					labels: []string{
+						"__name__", "kube_controller_pod",
+						"controller", "ds/d8-control-plane-manager",
+						"controller_name", "d8-control-plane-manager",
+						"controller_type", "DaemonSet",
+						"job", "kube-state-metrics",
+						"namespace", "kube-system",
+						"node", pair[0],
+						"pod", pair[1],
+					},
+					tMs: ts, v: 1,
+				})
+			}
+		}
+	}
+	s.ingest(points)
+
+	alertEvalMs := alertStep*stepMs + offsetMs + alertJitterMs
+
+	// (1) Without query_offset: alert eval queries storage at exactly
+	// alertEvalMs. Latest committed kube_controller_pod sample is from
+	// step nLastWritten = 29 → ts = 29*60_000 + 6_000 = 1_746_000 ms.
+	// Distance from alert eval = alertEvalMs - 1_746_000 = 60_329 ms,
+	// which is > 60_000 ms (LookbackDelta) → kube_controller_pod is OUT of
+	// the lookback window → INNER empty → unless does not cancel anything →
+	// alert fires for ALL three masters. This reproduces the prod race.
+	resNoOffset := s.queryAt(cpmExpr, model.Time(alertEvalMs).Time())
+	s.Require().Lenf(resNoOffset, 3,
+		"prod race must reproduce: without query_offset, alert fires for all 3 masters; got %v", resNoOffset)
+
+	// (2) With query_offset = 30s: alert eval queries storage at
+	// alertEvalMs - 30_000 ms. Distance from latest committed sample drops to
+	// 30_329 ms (< 60_000 ms) → kube_controller_pod is INSIDE the lookback →
+	// INNER non-empty → unless cancels all three masters → alert silent.
+	const queryOffsetMs = int64(30_000)
+	resWithOffset := s.queryAt(cpmExpr, model.Time(alertEvalMs-queryOffsetMs).Time())
+	s.Require().Emptyf(resWithOffset,
+		"with query_offset=30s race must NOT fire: got %v", resWithOffset)
+
+	// (3) Boundary safety: any offset strictly larger than
+	// (distance_to_latest_visible_sample - 60_000 ms) lets the previous sample
+	// cover the lookback. That threshold is ~21 ms (60_021 - 60_000) in the
+	// prod geometry and 329 ms in this test's geometry. We pick 1 s as the
+	// practical minimum that's bigger than any sane ingest jitter.
+	const minimalOffsetMs = int64(1_000)
+	resWithMinOffset := s.queryAt(cpmExpr, model.Time(alertEvalMs-minimalOffsetMs).Time())
+	s.Require().Emptyf(resWithMinOffset,
+		"with query_offset=1s race must already NOT fire: got %v", resWithMinOffset)
+}
+
+// Static check that we still satisfy storage.Queryable (compile-time only).
+var _ storage.Queryable = (*Adapter)(nil)
diff --git a/pp-pkg/storage/batch_storage_test.go b/pp-pkg/storage/batch_storage_test.go
index a345f30b35..7574c892bf 100644
--- a/pp-pkg/storage/batch_storage_test.go
+++ b/pp-pkg/storage/batch_storage_test.go
@@ -12,10 +12,12 @@ import (
 	"github.com/prometheus/prometheus/pp/go/storage/storagetest"
 	"github.com/stretchr/testify/suite"
 
+	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/pp/go/cppbridge"
 	pp_model "github.com/prometheus/prometheus/pp/go/model"
 	pp_storage "github.com/prometheus/prometheus/pp/go/storage"
 	"github.com/prometheus/prometheus/pp/go/storage/catalog"
+	"github.com/prometheus/prometheus/storage"
 )
 
 const (
@@ -208,3 +210,68 @@ func (s *BatchStorageSuite) TestCommit_WithSamplesAdded() {
 		},
 	}, storagetest.GetSamplesFromSerializedData(queryResult.SerializedData))
 }
+
+// TestEphemeralHead_IsSterileBetweenEvaluations validates the hypothesis that
+// each call to adapter.BatchStorage() produces a fresh, empty ephemeral head.
+// +// Background: rule evaluation creates a BatchStorage per Group.Eval() iteration +// and uses it as a primary in the FanoutQueryable used by the rule QueryFunc. +// If the ephemeral head leaked data between evaluations, a recording rule (or +// the ALERTS series written by alerting rules) from a previous iteration could +// influence subsequent rules' query results — which would be a serious data +// flow bug between rules in the same group. +// +// This test writes a series into one BatchStorage and confirms that a freshly +// obtained BatchStorage from the same adapter does NOT see that series. +func (s *BatchStorageSuite) TestEphemeralHead_IsSterileBetweenEvaluations() { + // Arrange: write a series into the first ephemeral head. + bs1 := s.adapter.BatchStorage().(*BatchStorage) + batch := &testTimeSeriesBatch{ + timeSeries: []pp_model.TimeSeries{ + { + LabelSet: pp_model.NewLabelSetBuilder().Set("__name__", "leak_check").Set("job", "ephemeral").Build(), + Timestamp: 1_000, + Value: 1.0, + }, + }, + } + _, err := bs1.AppendTimeSeries(s.ctx, batch, s.state, false) + s.Require().NoError(err) + + // Sanity: bs1's own querier MUST see the series we just wrote. + q1, err := bs1.Querier(0, 10_000) + s.Require().NoError(err) + matcher := labels.MustNewMatcher(labels.MatchEqual, "__name__", "leak_check") + got1 := collectLabelSets(q1.Select(s.ctx, false, nil, matcher)) + s.Require().Len(got1, 1, "bs1 must see its own series") + + // Act: obtain a brand-new BatchStorage from the SAME adapter. + bs2 := s.adapter.BatchStorage().(*BatchStorage) + + // Assert 1: bs2's transactionHead MUST be a different instance. + s.Require().NotSame(bs1.transactionHead, bs2.transactionHead, + "each BatchStorage() call must build a brand-new TransactionHead") + + // Assert 2: bs2 MUST NOT see the series written into bs1. + q2, err := bs2.Querier(0, 10_000) + s.Require().NoError(err) + got2 := collectLabelSets(q2.Select(s.ctx, false, nil, matcher)) + s.Require().Empty(got2, + "bs2 must be sterile and not leak series from bs1 — got %v", got2) + + // Assert 3: nothing reached the main (active) head either, since bs1.Commit() was not called. + mainQ, err := s.adapter.Querier(0, 10_000) + s.Require().NoError(err) + gotMain := collectLabelSets(mainQ.Select(s.ctx, false, nil, matcher)) + s.Require().Empty(gotMain, + "main head must NOT see uncommitted ephemeral data — got %v", gotMain) +} + +// collectLabelSets exhausts a SeriesSet and returns its series labels. +func collectLabelSets(ss storage.SeriesSet) []labels.Labels { + var out []labels.Labels + for ss.Next() { + out = append(out, ss.At().Labels()) + } + return out +}
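+
+// TestEphemeralHead_CommitPropagates is a sketch of the positive counterpart
+// to the sterility test above: once Commit IS called, the series must become
+// visible through the adapter's main querier — i.e. sterility is purely a
+// property of uncommitted transaction heads. (Sketch only: the zero-argument,
+// error-returning Commit signature is assumed from this file's existing
+// commit tests; adjust if the real API differs.)
+func (s *BatchStorageSuite) TestEphemeralHead_CommitPropagates() {
+	bs := s.adapter.BatchStorage().(*BatchStorage)
+	batch := &testTimeSeriesBatch{
+		timeSeries: []pp_model.TimeSeries{{
+			LabelSet:  pp_model.NewLabelSetBuilder().Set("__name__", "commit_check").Set("job", "ephemeral").Build(),
+			Timestamp: 1_000,
+			Value:     1.0,
+		}},
+	}
+	_, err := bs.AppendTimeSeries(s.ctx, batch, s.state, false)
+	s.Require().NoError(err)
+	// Assumed signature — see the note in the doc comment above.
+	s.Require().NoError(bs.Commit())
+
+	mainQ, err := s.adapter.Querier(0, 10_000)
+	s.Require().NoError(err)
+	matcher := labels.MustNewMatcher(labels.MatchEqual, "__name__", "commit_check")
+	s.Require().Len(collectLabelSets(mainQ.Select(s.ctx, false, nil, matcher)), 1,
+		"committed ephemeral data must be visible in the main head")
+}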