This document describes how application teams should interpret and use BatchFlow metrics. The source of truth for metric semantics is the Metrics specification.
Prefer starting from the Prometheus example that ships with the repository:
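```go
import (
	"context"
	"time"

	"github.com/rushairer/batchflow"
	prommetrics "github.com/rushairer/batchflow/examples/metrics/prometheus"
)

// The statements below belong inside a function that returns error,
// with ctx and db (the MySQL database handle) already in scope.
metrics := prommetrics.NewMetrics(prommetrics.Options{
	Namespace:             "batchflow",
	IncludeInstanceID:     true,
	EnablePipelineMetrics: true,
})
if err := metrics.StartServer(2112); err != nil {
	return err
}
// Deferred calls run last-in, first-out: flow.Close() (deferred below) runs
// before StopServer, so the final flush is recorded before the server stops.
defer metrics.StopServer(context.Background())

reporter := prommetrics.NewReporter(metrics, "mysql", "order_writer")
flow := batchflow.NewMySQLBatchFlow(ctx, db, batchflow.PipelineConfig{
	BufferSize:       5000,
	FlushSize:        200,
	FlushInterval:    100 * time.Millisecond,
	ConcurrencyLimit: 8,
	MetricsReporter:  reporter,
})
defer flow.Close() // flushes the last partial batch and its metrics
```

With `Namespace: "batchflow"`, each metric below is exported with a `batchflow_` prefix (for example `batchflow_submit_rejected_total`, as used in the PromQL queries later in this document).

Submit side:
- `enqueue_latency_seconds`
- `pipeline_queue_length`
- `submit_rejected_total`

Pipeline:
- `pipeline_dequeue_latency_seconds`
- `pipeline_process_duration_seconds`
- `pipeline_flush_size`
- `schema_groups_per_flush`
- `pipeline_dropped_total`

Executor:
- `batch_assemble_duration_seconds`
- `execute_duration_seconds`
- `batch_size`
- `executor_concurrency`
- `inflight_batches`
- `errors_total`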
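`batch_size`: the batch size of a single per-schema execution, that is, the number of records one `ExecuteBatch(...)` call receives.

`pipeline_flush_size`: the total number of requests received by one pipeline flush. Internally, a flush may be split into several schema groups.

`schema_groups_per_flush`: how many schema groups one flush is split into. Higher values mean the data in a single flush is more heterogeneous.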
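The three size metrics are related, and the relationship is easiest to see in a small simulation. The Go sketch below groups one hypothetical flush by schema name. It assumes, as a simplification, exactly one `ExecuteBatch(...)` call per schema group. The `request` and `flushStats` types and the `splitFlush` helper are illustrative only and are not BatchFlow's API.

```go
package main

import "fmt"

// request is a hypothetical stand-in for one submitted row; only the target
// schema name matters for this illustration.
type request struct {
	schema string
	id     int
}

// flushStats mirrors the three size metrics for a single flush.
type flushStats struct {
	flushSize    int            // pipeline_flush_size: requests in the flush
	schemaGroups int            // schema_groups_per_flush: distinct schemas
	batchSizes   map[string]int // batch_size: one observation per schema group
}

// splitFlush groups one flush by schema. It assumes one ExecuteBatch(...)
// call per group, so each group's length is one batch_size observation.
func splitFlush(flush []request) flushStats {
	groups := make(map[string][]request)
	for _, r := range flush {
		groups[r.schema] = append(groups[r.schema], r)
	}
	stats := flushStats{
		flushSize:    len(flush),
		schemaGroups: len(groups),
		batchSizes:   make(map[string]int, len(groups)),
	}
	for schema, rows := range groups {
		stats.batchSizes[schema] = len(rows)
	}
	return stats
}

func main() {
	flush := []request{
		{"orders", 1}, {"orders", 2}, {"order_items", 3},
		{"orders", 4}, {"order_items", 5}, {"payments", 6},
	}
	s := splitFlush(flush)
	fmt.Println(s.flushSize, s.schemaGroups, s.batchSizes)
}
```

Here one flush of 6 requests yields `schema_groups_per_flush` = 3 and three `batch_size` observations (3, 2 and 1) that sum back to the flush size. Mixing many schemas in one flush therefore yields many small batches.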
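`submit_rejected_total`: the number of times `Submit(...)` was rejected before the request entered the internal queue. The `reason` label records why; common values are:
- `context_canceled`
- `context_deadline_exceeded`
- `batchflow_closed`
- `empty_request`
- `invalid_schema`
- `missing_column`
- `empty_schema_name`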
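Example PromQL queries:

```promql
# Rejections caused by submitting to a flow that is already closed, per instance
sum(rate(batchflow_submit_rejected_total{reason="batchflow_closed"}[5m])) by (instance_id)

# p95 time requests wait in the internal queue before the pipeline dequeues them
histogram_quantile(0.95, sum by (le, instance_id) (rate(batchflow_pipeline_dequeue_latency_seconds_bucket[5m])))

# Rate of errors whose error_type starts with "final:", per instance and type
sum(rate(batchflow_errors_total{error_type=~"final:.*"}[5m])) by (instance_id, error_type)

# p99 execution duration, broken down by status
histogram_quantile(0.99, sum by (le, instance_id, status) (rate(batchflow_execute_duration_seconds_bucket[5m])))

# p95 number of schema groups per flush
histogram_quantile(0.95, sum by (le, instance_id) (rate(batchflow_schema_groups_per_flush_bucket[5m])))
```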
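Build at least three groups of dashboard panels:
- Submit side: `enqueue_latency_seconds`, `pipeline_queue_length`, `submit_rejected_total`
- Flushing: `pipeline_flush_size`, `schema_groups_per_flush`, `pipeline_dequeue_latency_seconds`
- Execution: `execute_duration_seconds`, `batch_size`, `errors_total`, `inflight_batches`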
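Common pitfalls:
- Don't treat `batch_size` as a total-throughput metric; it is the distribution of per-schema execution batch sizes.
- Don't read `pipeline_dequeue_latency_seconds` as network latency; it reflects time spent waiting in the internal queue.
- Don't skip `Close()`; otherwise the last batch and its metrics may never be fully flushed.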
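Why skipping `Close()` loses data is easiest to see with a toy batcher. The Go sketch below is hypothetical and single-goroutine, not BatchFlow's implementation. It flushes only when its buffer reaches `flushSize` or when `Close` is called. The `FlushInterval` setting in the example config suggests that BatchFlow also flushes on a timer, which this toy omits. Even so, if the process exits before the next size- or time-triggered flush, whatever is still buffered never reaches the database.

```go
package main

import "fmt"

// toyBatcher is a hypothetical stand-in for a batch pipeline: it flushes
// when the buffer reaches flushSize, and otherwise only on Close.
type toyBatcher struct {
	flushSize int
	buffer    []int
	flushed   []int // everything that reached the "database"
}

func (b *toyBatcher) Submit(v int) {
	b.buffer = append(b.buffer, v)
	if len(b.buffer) >= b.flushSize {
		b.flush()
	}
}

// flush copies the buffered values out and resets the buffer for reuse.
func (b *toyBatcher) flush() {
	b.flushed = append(b.flushed, b.buffer...)
	b.buffer = b.buffer[:0]
}

// Close drains whatever is still buffered; this is the step that is lost
// when Close is never called.
func (b *toyBatcher) Close() {
	if len(b.buffer) > 0 {
		b.flush()
	}
}

func main() {
	b := &toyBatcher{flushSize: 2}
	for i := 1; i <= 5; i++ {
		b.Submit(i)
	}
	fmt.Println("flushed before Close:", len(b.flushed))
	b.Close()
	fmt.Println("flushed after Close:", len(b.flushed))
}
```

With `flushSize` 2 and five submissions, four values are flushed as the buffer fills. The fifth reaches `flushed` only once `Close` runs.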
Further reading: