Rw refactoring datasource#251
Merged
Merged
Conversation
…llel_encoding # Conflicts: # pp/go/storage/remotewriter/writeloop.go
…llel_encoding # Conflicts: # pp/wal/tests/wal_tests.cpp
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
vporoshok
requested changes
Apr 29, 2026
Collaborator
vporoshok
left a comment
There was a problem hiding this comment.
Review summary
Splitting dataSource into dataSourceActive / dataSourceRotated is a sensible direction — the rotated head really has different invariants and the parallel Init fast-forward is a nice optimisation. Test coverage is materially improved (shard_test.go, iterator_test.go, datasourceV2_test.go, fixtures, mocks).
However there are a few issues that should be fixed before merge:
Blockers
- Histogram bucket regression in
destination.go—generate_batch_duration_seconds,read_segment_duration_seconds,encode_batch_duration_secondslost their classic buckets (replaced with[]float64{0}). - Data race on
dataSourceRotated.corruptMarkerduring the parallelInit. Logically safe (catalog.SetCorruptedis idempotent + serialised, both writes arenil), butgo test -racewill flag it. A//go:noracewith explanation is enough. handleReadErrorslost thehead %s is corrupted by read errors: %vlog, and silently drops errors fromMarkCorrupteditself — a head that should be marked corrupted will be retried as healthy on the next cycle.
Should fix
Record.SetStatusis added as a public production mutator with the comment "!!! Use only for testing purposes !!!", but has no callers in this PR. Please drop it.- Field name typo
numberOfsamples→numberOfSamples. - New metric names:
number_of_msg→number_of_messages(no other metric in the namespace abbreviates), and the help texts ("Number of messages.","Number of samples.") are too generic. byte_reader.Readdoesn't updater.n, which makes the byte-accounting contract inconsistent withReadByteand easy to misuse.shardRotatedtraversal of V1 segments works only by accident (V1 has no segment IDs, soselectNextSegmentfalls back toMaxUint32for everything). Either restrict this path to V2 or add a comment explaining why "all shards reportUnknownSegmentID" still produces a valid traversal.dataSourceRotated.finalizehas a confusing loop that effectively breaks after the first call.LSSSnapshot()for corrupted shards allocates a freshcppbridge.NewLssStorage()every call.
Other notes
Iterator.SendMessagecallsnumberOfsamples.Observeper message in a loop on the hot path — consider aggregating or guarding behind an aggregate metric for high-shard configs.- The semantics of
i.targetSegmentIDchanged (nowmax(..., segment.ID+1)instead of++). Worth a doc comment on the field/setter, and a mention in the PR description. dataSourceActive.readFromShardsanddataSourceRotated.readFromShardsare 90% identical — could share a helper parametrised by the read function.walReader.fsizeends up negative (-headerSize) afternewWalReaderand is only "fixed up" innewWalReaderRotated. Add a comment, otherwise it reads as a bug.- V1
Segment.ReadFromreads the body viar, V2readSegmentBodyreads it viabr. Same effect (ByteReader doesn't buffer), but the asymmetry is confusing.
Tests
- Please add a test that exercises
dataSourceRotated.Initunder-racewith a corruption-marker fake to catch (2). - The PR description mentions only the V2 split — please also call out the new
DataSourceV2interface (breaking change for implementers), the metric set changes (addednumber_of_msg,number_of_samples, removed buckets on three existing histograms), the parallelInit, and update the changelog per.cursor/rules/changelog.mdc.
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com>
vporoshok
approved these changes
May 12, 2026
gshigin
pushed a commit
that referenced
this pull request
May 13, 2026
* refactored unit tests * renamed ProtobufEncoder to ProtobufEncoderOld * extracted WAL::Encoder::Buffer into SegmentSamplesStorage * rewrited remote write message encoding logic * removed unused code * removed using BasicTimeseries in ProtobufEncoder * fixed compilation error * renamed methods * optimized prompp_wal_output_decoder_decode Go-binding * removed old data types * refactored RWMessageList * review fixes * created prompp_wal_segment_samples_storage_clear Go-binding * changed buckets for metrics * removed unused Iterator.targetSegmentIsPartiallyRead field * used targetSegmentID from message * fixed return ErrEndOfBlock from SendMessage * fixed compilation error * fixed use after free bug * WIP * add test * WIP: add wal reader v2 * WIP * WIP: SetLastSegmentID * WIP: switch rw v1v2 * WIP: update error messages * WIP: update segment handling, switching between versions of WAL files * WIP: add loader test * WIP: add walReader test * WIP: add shard_test * WIP: add dataSourceV2, test * WIP: add test * WIP: add dataSourceActive, dataSourceRotated, test * WIP: refactoring test * WIP: integrate ds * WIP: add DS test * WIP: clear * WIP: corrupted * WIP: add test file not exist * WIP: add test shard corrupted * WIP: test DS * WIP: add test DS * WIP: add test Iterator * WIP: add bench iterator * WIP: fix test * WIP: fix metrics * fix review * fix review * WIP: rebuild Record * WIP: small fix * fix after merge Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com> * fix test Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com> * fix test Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com> * WIP: fix test Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com> * WIP: description test Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com> * WIP: add to test keepalive Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com> * WIP: fix review Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com> * WIP: add description Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com> * WIP: fix test Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com> * WIP: tuning test Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com> * WIP: fix review Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com> --------- Signed-off-by: Alexandr Yudin <57181751+u-veles-a@users.noreply.github.com> Co-authored-by: Vladimir Pustovalov <cherep@sura.ru> Co-authored-by: Bastrykov Evgeniy <vporoshok@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Splits the remote-write
dataSourceinto two specialised implementations to optimise WAL v2 reads:dataSourceActive— drives the currently-writing head. UsesSegmentReadyCheckerto discover which shard owns each segment and reads only ready ones, so we no longer try to read sync-only data ahead of what the writer has committed.dataSourceRotated— drives a rotated/persisting head. The WAL is fully written, soInitfast-forwards each shard to the cachedtargetSegmentIDin parallel (one goroutine per shard), and subsequentNextcalls pick the shard with the lowest pending segment ID viaselectNextSegment.writeloop.makeDataSourceselects the right implementation based onheadRecord.Status()(StatusNew/StatusActive→ active, otherwise rotated). Each variant only keeps the state it actually needs, and the hot read path no longer branches on head status.Notable changes
DataSourceV2interface (Close/Init/LSSSnapshots/Next/NumberOfLSSes/WriteCaches) replaces the previousDataSource.Iteratoris rewired through it;targetSegmentIDis now advanced per actually-decoded segment (max(targetSegmentID, segment.ID+1)) instead of a blind++.shardis split into a "live"shardand ashardRotatedthat reads segment ID and body separately (ReadSegmentID+ReadSegmentBody), so the rotated traversal can choose the next shard without prefetching full segments. For V1 WALs the segment ID is synthesised fromwalReader.nextSegmentID; for V2 it is read from the file. Documented inline inwalreader.go/shard.go.walReadertracks remaining bytes (fsize) and exposesIsEOF, used only by the rotated reader.caches[T io.WriterTo]helper shared by both data sources.DestinationMetricsconstruction extracted intonewDestinationMetrics. Histogram bucket configuration forgenerate_batch_duration_seconds,read_segment_duration_seconds,encode_batch_duration_secondsis preserved.ByteReadernow exposes a non-trackingRead(p)for bulk reads viaio.ReadFull, and renames theReadByte-only counter toreadBytesto make the contract explicit.markCorruptedon both data sources is annotated with//go:norace(safe under the race becausecatalog.SetCorruptedis idempotent and serialised, and the only write tocorruptMarkerisnil), logs the underlying read errors before marking, and propagates errors from the marker instead of swallowing them.Test plan
shard_test.go,iterator_test.go,datasourceV2_test.go,walreader_test.go::TestReadIDAndBodyV1/V2, plusremotewritertestfixtures (MakeCatalog,WriteToShardWalFile{V1,V2}{Single,Multi}) andmock/target_segment_id_set_closer.go.-race).