
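BatchFlow Monitoring Guide

This document describes how application code should interpret and use BatchFlow metrics. The source of truth for metric semantics is the Metrics specification.

Recommended integration

Prefer the Prometheus example shipped in the repository: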

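import (
	"context"
	"database/sql"
	"time"

	"github.com/rushairer/batchflow"
	prommetrics "github.com/rushairer/batchflow/examples/metrics/prometheus"
)

// run wires Prometheus metrics into a MySQL BatchFlow; db is assumed to be an open *sql.DB.
func run(ctx context.Context, db *sql.DB) error {
	metrics := prommetrics.NewMetrics(prommetrics.Options{
		Namespace:             "batchflow", // metric name prefix, e.g. batchflow_batch_size
		IncludeInstanceID:     true,        // expected to add the instance_id label
		EnablePipelineMetrics: true,
	})

	// Serve the metrics endpoint on port 2112.
	if err := metrics.StartServer(2112); err != nil {
		return err
	}
	defer metrics.StopServer(context.Background())

	reporter := prommetrics.NewReporter(metrics, "mysql", "order_writer")

	flow := batchflow.NewMySQLBatchFlow(ctx, db, batchflow.PipelineConfig{
		BufferSize:       5000,
		FlushSize:        200,
		FlushInterval:    100 * time.Millisecond,
		ConcurrencyLimit: 8,
		MetricsReporter:  reporter,
	})
	// Deferred after StopServer, so it runs first: the final flush is recorded before the server stops.
	defer flow.Close()

	// Submit requests through flow here.
	return nil
}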

Metric layers

Submit / Queue

  • enqueue_latency_seconds
  • pipeline_queue_length
  • submit_rejected_total

Pipeline / Flush

  • pipeline_dequeue_latency_seconds
  • pipeline_process_duration_seconds
  • pipeline_flush_size
  • schema_groups_per_flush
  • pipeline_dropped_total

Executor

  • batch_assemble_duration_seconds
  • execute_duration_seconds
  • batch_size
  • executor_concurrency
  • inflight_batches
  • errors_total

Key semantics

batch_size

The batch size of a single per-schema execution, i.e. the number of records one ExecuteBatch(...) call receives.

pipeline_flush_size

The total number of requests received by one pipeline flush. Internally, a flush may be split into multiple schema groups.

schema_groups_per_flush

The number of schema groups a single flush is split into. A higher value means the data within one flush is more heterogeneous.
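The relationship between these three flush-level metrics can be sketched as follows, assuming a flush is simply grouped by schema name and each group becomes one ExecuteBatch(...) call. The request and flushStats types and the groupFlush function are hypothetical illustrations, not the library's API.

```go
package main

import (
	"fmt"
	"sort"
)

// request is a hypothetical submitted record; only its schema name matters here.
type request struct {
	schema string
	id     int
}

// flushStats mirrors the three metrics for one flush.
type flushStats struct {
	flushSize    int            // pipeline_flush_size: all requests in the flush
	schemaGroups int            // schema_groups_per_flush: distinct schemas
	batchSizes   map[string]int // batch_size: one observation per ExecuteBatch call
}

// groupFlush splits one flush into per-schema groups; each group would
// become one ExecuteBatch(...) call.
func groupFlush(flush []request) flushStats {
	groups := make(map[string][]request)
	for _, r := range flush {
		groups[r.schema] = append(groups[r.schema], r)
	}
	sizes := make(map[string]int, len(groups))
	for schema, rs := range groups {
		sizes[schema] = len(rs)
	}
	return flushStats{flushSize: len(flush), schemaGroups: len(groups), batchSizes: sizes}
}

func main() {
	flush := []request{
		{"orders", 1}, {"orders", 2}, {"orders", 3},
		{"order_items", 4}, {"order_items", 5},
		{"audit_log", 6},
	}
	st := groupFlush(flush)
	fmt.Println("pipeline_flush_size:", st.flushSize)
	fmt.Println("schema_groups_per_flush:", st.schemaGroups)

	names := make([]string, 0, len(st.batchSizes))
	for n := range st.batchSizes {
		names = append(names, n)
	}
	sort.Strings(names) // deterministic output order
	for _, n := range names {
		fmt.Printf("batch_size[%s]: %d\n", n, st.batchSizes[n])
	}
}
```

For this sample flush, pipeline_flush_size is 6, schema_groups_per_flush is 3, and batch_size receives three observations (1, 2 and 3) whose sum equals the flush size.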

submit_rejected_total

The number of times Submit(...) was rejected before entering the internal queue. Common reasons:

  • context_canceled
  • context_deadline_exceeded
  • batchflow_closed
  • empty_request
  • invalid_schema
  • missing_column
  • empty_schema_name

Recommended alerts

Queue and backpressure

sum(rate(batchflow_submit_rejected_total{reason="batchflow_closed"}[5m])) by (instance_id)
histogram_quantile(0.95, sum by (le, instance_id) (rate(batchflow_pipeline_dequeue_latency_seconds_bucket[5m])))

Execution quality

sum(rate(batchflow_errors_total{error_type=~"final:.*"}[5m])) by (instance_id, error_type)
histogram_quantile(0.99, sum by (le, instance_id, status) (rate(batchflow_execute_duration_seconds_bucket[5m])))
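The error_type=~"final:.*" matcher relies on PromQL regex matchers being fully anchored: the pattern must match the entire label value, not just a substring. This sketch reproduces that behaviour with Go's regexp package, which uses the same RE2 syntax as Prometheus; the label values are hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
)

// promRegexMatch mimics PromQL's =~ label matcher, which is fully anchored:
// the pattern must match the whole label value.
func promRegexMatch(pattern, value string) bool {
	re := regexp.MustCompile("^(?:" + pattern + ")$")
	return re.MatchString(value)
}

func main() {
	// Hypothetical error_type values; real values come from the reporter.
	for _, v := range []string{"final:timeout", "retry:timeout", "retry:final:timeout"} {
		fmt.Printf("%-20s %v\n", v, promRegexMatch("final:.*", v))
	}
}
```

Only error_type values that begin with final: are counted; a value that merely contains final: somewhere in the middle is not.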

Flush structural complexity

histogram_quantile(0.95, sum by (le, instance_id) (rate(batchflow_schema_groups_per_flush_bucket[5m])))

Dashboard recommendations

At minimum, build three groups of panels:

Ingress pressure

  • enqueue_latency_seconds
  • pipeline_queue_length
  • submit_rejected_total

Flush shape

  • pipeline_flush_size
  • schema_groups_per_flush
  • pipeline_dequeue_latency_seconds

Execution results

  • execute_duration_seconds
  • batch_size
  • errors_total
  • inflight_batches

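Common pitfalls

  • Do not treat batch_size as a throughput metric; it is the distribution of per-schema execution batch sizes.
  • Do not read pipeline_dequeue_latency_seconds as network latency; it reflects time spent waiting in the internal queue.
  • Do not skip Close(); otherwise the last batch of data, and its metrics, may never be fully flushed.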
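The Close() pitfall can be illustrated with a toy size-triggered batcher. It is hypothetical and much simpler than BatchFlow (which presumably also flushes on FlushInterval while running), but it shows the shutdown problem: anything below FlushSize stays pending until something forces a final flush.

```go
package main

import "fmt"

// toyBatcher is a hypothetical size-triggered batcher, used only to show
// why the final partial batch depends on Close. It is not BatchFlow's code.
type toyBatcher struct {
	flushSize int
	pending   []int
	flushed   [][]int
}

// Submit buffers v and flushes once flushSize items are pending.
func (b *toyBatcher) Submit(v int) {
	b.pending = append(b.pending, v)
	if len(b.pending) >= b.flushSize {
		b.flush()
	}
}

// flush moves the pending items into a completed batch.
func (b *toyBatcher) flush() {
	if len(b.pending) == 0 {
		return
	}
	b.flushed = append(b.flushed, b.pending)
	b.pending = nil
}

// Close drains whatever is still pending; skipping it loses the tail.
func (b *toyBatcher) Close() { b.flush() }

func main() {
	b := &toyBatcher{flushSize: 200}
	for i := 0; i < 450; i++ {
		b.Submit(i)
	}
	fmt.Println("before Close:", len(b.flushed), "batches,", len(b.pending), "pending")
	b.Close()
	fmt.Println("after Close:", len(b.flushed), "batches,", len(b.pending), "pending")
}
```

With a flush size of 200 and 450 submissions, two full batches flush on their own and 50 records stay pending until Close() drains them; in the real pipeline, the corresponding execution metrics would likewise only appear after that final flush.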

Further reading: