Skip to content

Rw refactoring datasource#251

Merged
vporoshok merged 94 commits into
ppfrom
rw_refactoring_datasource
May 12, 2026
Merged

Rw refactoring datasource#251
vporoshok merged 94 commits into
ppfrom
rw_refactoring_datasource

Conversation

@u-veles-a
Copy link
Copy Markdown
Collaborator

@u-veles-a u-veles-a commented Mar 5, 2026

Summary

Splits the remote-write dataSource into two specialised implementations to optimise WAL v2 reads:

  • dataSourceActive — drives the currently-writing head. Uses SegmentReadyChecker to discover which shard owns each segment and reads only ready ones, so we no longer try to read sync-only data ahead of what the writer has committed.
  • dataSourceRotated — drives a rotated/persisting head. The WAL is fully written, so Init fast-forwards each shard to the cached targetSegmentID in parallel (one goroutine per shard), and subsequent Next calls pick the shard with the lowest pending segment ID via selectNextSegment.

writeloop.makeDataSource selects the right implementation based on headRecord.Status() (StatusNew/StatusActive → active, otherwise rotated). Each variant only keeps the state it actually needs, and the hot read path no longer branches on head status.

Notable changes

  • New DataSourceV2 interface (Close/Init/LSSSnapshots/Next/NumberOfLSSes/WriteCaches) replaces the previous DataSource. Iterator is rewired through it; targetSegmentID is now advanced per actually-decoded segment (max(targetSegmentID, segment.ID+1)) instead of a blind ++.
  • shard is split into a "live" shard and a shardRotated that reads segment ID and body separately (ReadSegmentID + ReadSegmentBody), so the rotated traversal can choose the next shard without prefetching full segments. For V1 WALs the segment ID is synthesised from walReader.nextSegmentID; for V2 it is read from the file. Documented inline in walreader.go / shard.go.
  • walReader tracks remaining bytes (fsize) and exposes IsEOF, used only by the rotated reader.
  • The async decoder-state cache write-back loop is extracted into a generic caches[T io.WriterTo] helper shared by both data sources.
  • DestinationMetrics construction extracted into newDestinationMetrics. Histogram bucket configuration for generate_batch_duration_seconds, read_segment_duration_seconds, encode_batch_duration_seconds is preserved.
  • ByteReader now exposes a non-tracking Read(p) for bulk reads via io.ReadFull, and renames the ReadByte-only counter to readBytes to make the contract explicit.
  • markCorrupted on both data sources is annotated with //go:norace (safe under the race because catalog.SetCorrupted is idempotent and serialised, and the only write to corruptMarker is nil), logs the underlying read errors before marking, and propagates errors from the marker instead of swallowing them.

Test plan

  • New unit tests: shard_test.go, iterator_test.go, datasourceV2_test.go, walreader_test.go::TestReadIDAndBodyV1/V2, plus remotewritertest fixtures (MakeCatalog, WriteToShardWalFile{V1,V2}{Single,Multi}) and mock/target_segment_id_set_closer.go.
  • CI green (incl. -race).

cherep58 and others added 30 commits December 29, 2025 16:43
…llel_encoding

# Conflicts:
#	pp/go/storage/remotewriter/writeloop.go
…llel_encoding

# Conflicts:
#	pp/wal/tests/wal_tests.cpp
u-veles-a and others added 12 commits March 26, 2026 11:32
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Base automatically changed from rw_refactoring to pp April 6, 2026 08:42
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Copy link
Copy Markdown
Collaborator

@vporoshok vporoshok left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review summary

Splitting dataSource into dataSourceActive / dataSourceRotated is a sensible direction — the rotated head really has different invariants and the parallel Init fast-forward is a nice optimisation. Test coverage is materially improved (shard_test.go, iterator_test.go, datasourceV2_test.go, fixtures, mocks).

However there are a few issues that should be fixed before merge:

Blockers

  1. Histogram bucket regression in destination.gogenerate_batch_duration_seconds, read_segment_duration_seconds, encode_batch_duration_seconds lost their classic buckets (replaced with []float64{0}).
  2. Data race on dataSourceRotated.corruptMarker during the parallel Init. Logically safe (catalog.SetCorrupted is idempotent + serialised, both writes are nil), but go test -race will flag it. A //go:norace with explanation is enough.
  3. handleReadErrors lost the head %s is corrupted by read errors: %v log, and silently drops errors from MarkCorrupted itself — a head that should be marked corrupted will be retried as healthy on the next cycle.

Should fix

  1. Record.SetStatus is added as a public production mutator with the comment "!!! Use only for testing purposes !!!", but has no callers in this PR. Please drop it.
  2. Field name typo numberOfsamplesnumberOfSamples.
  3. New metric names: number_of_msgnumber_of_messages (no other metric in the namespace abbreviates), and the help texts ("Number of messages.", "Number of samples.") are too generic.
  4. byte_reader.Read doesn't update r.n, which makes the byte-accounting contract inconsistent with ReadByte and easy to misuse.
  5. shardRotated traversal of V1 segments works only by accident (V1 has no segment IDs, so selectNextSegment falls back to MaxUint32 for everything). Either restrict this path to V2 or add a comment explaining why "all shards report UnknownSegmentID" still produces a valid traversal.
  6. dataSourceRotated.finalize has a confusing loop that effectively breaks after the first call.
  7. LSSSnapshot() for corrupted shards allocates a fresh cppbridge.NewLssStorage() every call.

Other notes

  • Iterator.SendMessage calls numberOfsamples.Observe per message in a loop on the hot path — consider aggregating or guarding behind an aggregate metric for high-shard configs.
  • The semantics of i.targetSegmentID changed (now max(..., segment.ID+1) instead of ++). Worth a doc comment on the field/setter, and a mention in the PR description.
  • dataSourceActive.readFromShards and dataSourceRotated.readFromShards are 90% identical — could share a helper parametrised by the read function.
  • walReader.fsize ends up negative (-headerSize) after newWalReader and is only "fixed up" in newWalReaderRotated. Add a comment, otherwise it reads as a bug.
  • V1 Segment.ReadFrom reads the body via r, V2 readSegmentBody reads it via br. Same effect (ByteReader doesn't buffer), but the asymmetry is confusing.

Tests

  • Please add a test that exercises dataSourceRotated.Init under -race with a corruption-marker fake to catch (2).
  • The PR description mentions only the V2 split — please also call out the new DataSourceV2 interface (breaking change for implementers), the metric set changes (added number_of_msg, number_of_samples, removed buckets on three existing histograms), the parallel Init, and update the changelog per .cursor/rules/changelog.mdc.

Comment thread pp/go/storage/remotewriter/destination.go
Comment thread pp/go/storage/remotewriter/datasourceV2.go
Comment thread pp/go/storage/remotewriter/datasourceV2.go
Comment thread pp/go/storage/remotewriter/datasourceV2.go
Comment thread pp/go/storage/catalog/record.go Outdated
Comment thread pp/go/storage/remotewriter/destination.go Outdated
Comment thread pp/go/storage/remotewriter/destination.go Outdated
Comment thread pp/go/storage/head/shard/wal/reader/byte_reader.go
Comment thread pp/go/storage/remotewriter/shard.go
Comment thread pp/go/storage/remotewriter/shard.go
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
@vporoshok vporoshok merged commit 0613de8 into pp May 12, 2026
30 of 33 checks passed
@vporoshok vporoshok deleted the rw_refactoring_datasource branch May 12, 2026 09:53
gshigin pushed a commit that referenced this pull request May 13, 2026
* refactored unit tests

* renamed ProtobufEncoder to ProtobufEncoderOld

* extracted WAL::Encoder::Buffer into SegmentSamplesStorage

* rewrited remote write message encoding logic

* removed unused code

* removed using BasicTimeseries in ProtobufEncoder

* fixed compilation error

* renamed methods

* optimized prompp_wal_output_decoder_decode Go-binding

* removed old data types

* refactored RWMessageList

* review fixes

* created prompp_wal_segment_samples_storage_clear Go-binding

* changed buckets for metrics

* removed unused Iterator.targetSegmentIsPartiallyRead field

* used targetSegmentID from message

* fixed return ErrEndOfBlock from SendMessage

* fixed compilation error

* fixed use after free bug

* WIP

* add test

* WIP: add wal reader v2

* WIP

* WIP: SetLastSegmentID

* WIP: switch rw  v1v2

* WIP: update error messages

* WIP: update segment handling, switching between versions of WAL files

* WIP: add loader test

* WIP: add walReader test

* WIP: add shard_test

* WIP: add dataSourceV2, test

* WIP: add test

* WIP: add dataSourceActive,  dataSourceRotated, test

* WIP: refactoring test

* WIP: integrate ds

* WIP: add DS test

* WIP: clear

* WIP: corrupted

* WIP: add test file not exist

* WIP: add test shard corrupted

* WIP: test DS

* WIP: add test DS

* WIP: add test Iterator

* WIP: add bench iterator

* WIP: fix test

* WIP: fix metrics

* fix review

* fix review

* WIP: rebuild Record

* WIP: small fix

* fix after merge

Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>

* fix test

Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>

* fix test

Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>

* WIP: fix test

Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>

* WIP: description test

Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>

* WIP: add to test keepalive

Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>

* WIP: fix review

Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>

* WIP: add description

Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>

* WIP: fix test

Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>

* WIP: tuning test

Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>

* WIP: fix review

Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>

---------

Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Co-authored-by: Vladimir Pustovalov <cherep@sura.ru>
Co-authored-by: Bastrykov Evgeniy <vporoshok@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants