forked from VictoriaMetrics/vmctl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprometheus.go
More file actions
134 lines (123 loc) · 3.01 KB
/
Copy pathprometheus.go
File metadata and controls
134 lines (123 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package main
import (
"fmt"
"log"
"sync"
"github.com/cheggaaa/pb/v3"
"github.com/prometheus/prometheus/tsdb"
"github.com/victoriametrics/vmctl/prometheus"
"github.com/victoriametrics/vmctl/vm"
)
// prometheusProcessor reads Prometheus snapshot blocks via cl
// and hands the resulting timeseries over to im.
type prometheusProcessor struct {
	// cl is the prometheus client used to
	// fetch and read snapshot blocks.
	cl *prometheus.Client
	// im is the importer that performs import
	// requests for the timeseries data
	// returned from snapshot blocks.
	im *vm.Importer
	// cc is the concurrency level, i.e. the
	// number of snapshot block readers that
	// run concurrently.
	cc int
}
// run explores the snapshot for blocks, asks the user for confirmation and
// then imports every block using a pool of concurrent block readers.
//
// It returns nil when the user declines the prompt or when all blocks were
// read and the importer was closed without reporting a failure. It returns
// an error when exploring fails, when no blocks are found, when any reader
// fails on a block, or when the importer reports a failed batch. Any error
// aborts the import.
func (pp *prometheusProcessor) run() error {
	blocks, err := pp.cl.Explore()
	if err != nil {
		return fmt.Errorf("explore failed: %s", err)
	}
	if len(blocks) < 1 {
		return fmt.Errorf("found no blocks to import")
	}
	question := fmt.Sprintf("Found %d blocks to import. Continue?", len(blocks))
	if !prompt(question) {
		return nil
	}

	// A non-positive concurrency would start no readers and block forever on
	// the unbuffered channel below (or panic in make/WaitGroup.Add when
	// negative), so fall back to a single reader.
	workers := pp.cc
	if workers < 1 {
		workers = 1
	}

	bar := pb.StartNew(len(blocks))
	blockReadersCh := make(chan tsdb.BlockReader)
	// Buffered with one slot per reader: a reader sends at most one error
	// before exiting, so the send never blocks even if nobody receives.
	errCh := make(chan error, workers)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for br := range blockReadersCh {
				if err := pp.do(br); err != nil {
					errCh <- fmt.Errorf("read failed for block %q: %s", br.Meta().ULID, err)
					return
				}
				bar.Increment()
			}
		}()
	}

	// any error breaks the import
	for _, br := range blocks {
		select {
		case promErr := <-errCh:
			close(blockReadersCh)
			return fmt.Errorf("prometheus error: %s", promErr)
		case vmErr := <-pp.im.Errors():
			close(blockReadersCh)
			var errTS string
			for _, ts := range vmErr.Batch {
				// A series may carry no samples; indexing its
				// timestamps would panic while building the report.
				if len(ts.Timestamps) == 0 {
					errTS += fmt.Sprintf("%s with no timestamps\n", ts.String())
					continue
				}
				errTS += fmt.Sprintf("%s for timestamps range %d - %d\n",
					ts.String(), ts.Timestamps[0], ts.Timestamps[len(ts.Timestamps)-1])
			}
			return fmt.Errorf("Import process failed for: \n%swith error: %s", errTS, vmErr.Err)
		case blockReadersCh <- br:
		}
	}

	close(blockReadersCh)
	wg.Wait()

	// Readers were still processing the blocks handed out last when the
	// dispatch loop ended; a failure on those blocks is only visible now.
	// All readers have exited, so a non-blocking receive sees every error.
	select {
	case promErr := <-errCh:
		return fmt.Errorf("prometheus error: %s", promErr)
	default:
	}

	// wait for all buffers to flush
	pp.im.Close()

	// The final flush may fail as well, so poll the importer for errors
	// once more. The poll is non-blocking and tolerates both an open and a
	// closed errors channel.
	// NOTE(review): entries with a nil Err are skipped on the assumption
	// that they do not describe a failure - confirm against vm.Importer.
drain:
	for {
		select {
		case vmErr, ok := <-pp.im.Errors():
			if !ok {
				break drain
			}
			if vmErr.Err == nil {
				continue
			}
			var errTS string
			for _, ts := range vmErr.Batch {
				if len(ts.Timestamps) == 0 {
					errTS += fmt.Sprintf("%s with no timestamps\n", ts.String())
					continue
				}
				errTS += fmt.Sprintf("%s for timestamps range %d - %d\n",
					ts.String(), ts.Timestamps[0], ts.Timestamps[len(ts.Timestamps)-1])
			}
			return fmt.Errorf("Import process failed for: \n%swith error: %s", errTS, vmErr.Err)
		default:
			break drain
		}
	}

	bar.Finish()
	log.Println("Import finished!")
	log.Print(pp.im.Stats())
	return nil
}
// do reads all series of block b and sends each of them to the importer
// input as a vm.TimeSeries.
//
// The value of the `__name__` label becomes the series name; all other
// labels are passed through as label pairs. Timestamps and values are
// collected from the series iterator in iteration order.
//
// It returns an error if the block cannot be read, if a series has no
// `__name__` label, or if iterating over the series set or over the samples
// of a series fails.
func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
	ss, err := pp.cl.Read(b)
	if err != nil {
		return fmt.Errorf("failed to read block: %s", err)
	}
	// err is nil at this point, so the series set's own error must be
	// the one that is reported.
	if err := ss.Err(); err != nil {
		return fmt.Errorf("unexpected series set error: %s", err)
	}
	for ss.Next() {
		var name string
		var labels []vm.LabelPair
		series := ss.At()
		for _, label := range series.Labels() {
			if label.Name == "__name__" {
				name = label.Value
				continue
			}
			labels = append(labels, vm.LabelPair{
				Name:  label.Name,
				Value: label.Value,
			})
		}
		if name == "" {
			return fmt.Errorf("failed to find `__name__` label in labelset for block %v", b.Meta().ULID)
		}

		var timestamps []int64
		var values []interface{}
		it := series.Iterator()
		for it.Next() {
			t, v := it.At()
			timestamps = append(timestamps, t)
			values = append(values, v)
		}
		// Report the iterator's own error; returning the series set error
		// here would usually return nil and hide the failure.
		if err := it.Err(); err != nil {
			return fmt.Errorf("failed to iterate samples of series %q: %s", name, err)
		}
		pp.im.Input() <- &vm.TimeSeries{
			Name:       name,
			LabelPairs: labels,
			Timestamps: timestamps,
			Values:     values,
		}
	}
	// Next returns false both at the end of the set and on failure, so the
	// error has to be checked once the loop is over.
	if err := ss.Err(); err != nil {
		return fmt.Errorf("failed to iterate series set: %s", err)
	}
	return nil
}