[spark] support batch read from fluss cluster #2377
Conversation
wuchong left a comment:
@YannByron thanks, I left some comments.
        return new GenericMap(newMap);
    }

    public static Object copyRow(Object o, DataType type) {
This method name conflicts with copyRow(InternalRow row, RowType rowType) and may be confusing. Can we rename it to copyValue to distinguish it from copyRow?
Besides, since it is only called from InternalRowUtils, we can change the visibility of this method, copyArray, and copyMap to private.
protected val POLL_TIMEOUT: Duration = Duration.ofMillis(100)
protected lazy val conn: Connection = ConnectionFactory.createConnection(flussConfig)
protected lazy val table: Table = conn.getTable(tablePath)
protected lazy val tableInfo: TableInfo = conn.getAdmin.getTableInfo(tablePath).get()
The tableInfo can be obtained directly from table via table.getTableInfo.
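A one-line sketch of the suggested simplification, using the accessor named above (that it is exposed on the Table handle is taken from this comment, not verified against the API):

```scala
// Reuse the already-opened Table handle instead of a separate Admin round trip.
protected lazy val tableInfo: TableInfo = table.getTableInfo
```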
FlussUpsertInputPartition(
  tableBucket,
  snapshotIdOpt.getAsLong,
  logOffsetOpt.getAsLong
Since this is a batch InputPartition, we should add an end offset to make the log split bounded. The latest end offset can be obtained from the OffsetsInitializer.latest().getBucketOffsets(..) method.
We should (see the sketch after this list):
- Fetch the latest kvSnapshots; it is a map<bucket, snapshot_id & log_start_offset>.
- Fetch the latest offsets from OffsetsInitializer.latest; it is a map<bucket, log_end_offset>.
- Join the kvSnapshots with the OffsetsInitializer.latest offsets to generate an input partition for each bucket.
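A minimal sketch of this planning step, under simplified assumptions: the integer bucket ids, the two input maps, and the extra end-offset field on FlussUpsertInputPartition are illustrations of the idea, not the exact Fluss or connector API.

```scala
// Sketch only: bucket ids, the (snapshot_id, log_start_offset) pairs, and the
// logEndOffset field are simplified stand-ins for the real Fluss metadata classes.
case class FlussUpsertInputPartition(
    bucket: Int, snapshotId: Long, logStartOffset: Long, logEndOffset: Long)

val EARLIEST_OFFSET = -2L // stand-in for LogScanner#EARLIEST_OFFSET

def planPartitions(
    kvSnapshots: Map[Int, (Long, Long)], // bucket -> (snapshot_id, log_start_offset)
    latestOffsets: Map[Int, Long]        // bucket -> log_end_offset from OffsetsInitializer.latest()
): Seq[FlussUpsertInputPartition] =
  latestOffsets.toSeq.sortBy(_._1).map { case (bucket, endOffset) =>
    kvSnapshots.get(bucket) match {
      case Some((snapshotId, logStart)) =>
        // Snapshot exists: read the snapshot, then the log from logStart up to endOffset.
        FlussUpsertInputPartition(bucket, snapshotId, logStart, endOffset)
      case None =>
        // No snapshot yet: read the log from the earliest retained offset up to endOffset.
        FlussUpsertInputPartition(bucket, -1L, EARLIEST_OFFSET, endOffset)
    }
  }
```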
  )
} else {
  // No snapshot yet, only read log from beginning
  FlussUpsertInputPartition(tableBucket, -1L, 0L)
We should use org.apache.fluss.client.table.scanner.log.LogScanner#EARLIEST_OFFSET instead of 0L to indicate reading the log from the beginning, because offset 0L may already have been TTL'd, in which case a LogOffsetOutOfRangeException would be thrown.
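A minimal sketch of the suggested replacement, using the constant named above (that it is reachable as a static member of LogScanner is an assumption here):

```scala
import org.apache.fluss.client.table.scanner.log.LogScanner

// Start from the earliest retained offset instead of a literal 0L,
// which may already have been removed by log TTL.
FlussUpsertInputPartition(tableBucket, -1L, LogScanner.EARLIEST_OFFSET)
```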
}

// Poll for more log records
val scanRecords: ScanRecords = logScanner.poll(POLL_TIMEOUT)
logScanner.poll() is a best-effort API: it may return an empty result due to transient issues (e.g., network glitches) even when unread log records remain on the server. Therefore, we should poll in a loop until we reach the known end offset.
The end offset should be determined at job startup using OffsetsInitializer.latest().getBucketOffsets(...), which gives us the high-watermark for each bucket at the beginning of the batch job.
Since there’s no built-in API to read a bounded log split, we must manually (see the sketch below):
- Skip any records with offsets beyond the precomputed end offset, and
- Signal there is no next once all buckets have reached their respective end offsets.
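A minimal sketch of such a bounded read loop. The scanner and record shapes below are simplified placeholders rather than the real Fluss classes, and startOffsets/endOffsets are assumed to hold the per-bucket bounds captured at planning time (the end offsets via OffsetsInitializer.latest().getBucketOffsets(...)).

```scala
import java.time.Duration
import scala.collection.mutable

// Placeholder shapes for illustration only; not the real Fluss scanner API.
case class PolledRecord(bucket: Int, offset: Long, row: AnyRef)
trait BoundedScanner { def poll(timeout: Duration): Seq[PolledRecord] }

def boundedRead(scanner: BoundedScanner,
                startOffsets: Map[Int, Long],  // bucket -> first log offset to read
                endOffsets: Map[Int, Long]     // bucket -> high watermark at job start (exclusive)
               ): Iterator[AnyRef] = {
  // Buckets whose start offset already reaches the end offset have nothing to read.
  val done   = mutable.Set(endOffsets.collect { case (b, end) if startOffsets(b) >= end => b }.toSeq: _*)
  val buffer = mutable.Queue.empty[AnyRef]

  new Iterator[AnyRef] {
    override def hasNext: Boolean = {
      // An empty poll() does not mean the split is finished: keep polling until
      // something is buffered or every bucket has reached its end offset.
      while (buffer.isEmpty && done.size < endOffsets.size) {
        scanner.poll(Duration.ofMillis(100)).foreach { rec =>
          val end = endOffsets(rec.bucket)
          if (rec.offset < end) buffer.enqueue(rec.row)   // keep records below the bound
          if (rec.offset >= end - 1) done += rec.bucket   // bucket reached its end offset
        }
      }
      buffer.nonEmpty
    }
    override def next(): AnyRef = buffer.dequeue()
  }
}
```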
logRecords = bucketRecords.iterator()
if (logRecords.hasNext) {
  val scanRecord = logRecords.next()
  currentRow = convertToSparkRow(scanRecord)
The LogRecord is a changelog that contains -D (delete) and -U (update-before) records. To produce a consistent view, we need to merge these changes with the KV snapshot data in a union-read fashion—just like how we combine data lake snapshots with changelogs.
Fortunately, the KV snapshot scan is already sorted by primary key. We can leverage this by:
- Materializing the delta changes into a temporary delta table;
- Sorting the delta table by primary key using org.apache.fluss.row.encode.KeyEncoder#of(...);
- Performing a sort-merge between the sorted KV snapshot reader and the sorted delta table reader.

This enables an efficient and correct merge without requiring random lookups or hash-based joins (see the sketch below).
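A minimal sketch of that sort-merge, assuming both inputs arrive sorted by the same binary key encoding and the delta has already been reduced to the last change per key; the entry and change types are placeholders, not the real Fluss classes (in the real reader the key bytes would come from KeyEncoder#of(...)).

```scala
sealed trait Change
case object Upsert extends Change // +I / +U (update_after)
case object Delete extends Change // -D; -U (update_before) can be dropped when materializing the delta

case class Entry(key: Array[Byte], row: AnyRef)

// Unsigned byte-wise comparison, matching a binary primary-key encoding.
def compareKeys(a: Array[Byte], b: Array[Byte]): Int = {
  var i = 0
  val n = math.min(a.length, b.length)
  while (i < n) {
    val c = (a(i) & 0xff) - (b(i) & 0xff)
    if (c != 0) return c
    i += 1
  }
  a.length - b.length
}

def sortMerge(snapshot: Iterator[Entry], delta: Iterator[(Entry, Change)]): Iterator[AnyRef] = {
  val s = snapshot.buffered
  val d = delta.buffered
  new Iterator[AnyRef] {
    private var pending: AnyRef = _

    private def advance(): Unit = {
      pending = null
      while (pending == null && (s.hasNext || d.hasNext)) {
        if (!d.hasNext) pending = s.next().row               // remaining snapshot rows
        else if (!s.hasNext) {
          val (e, change) = d.next()
          if (change == Upsert) pending = e.row              // key only present in the delta
        } else compareKeys(s.head.key, d.head._1.key) match {
          case c if c < 0 =>
            pending = s.next().row                           // snapshot key untouched by the delta
          case c if c > 0 =>
            val (e, change) = d.next()
            if (change == Upsert) pending = e.row            // new key appended via the log
          case _ =>
            s.next()                                         // delta change wins over the snapshot row
            val (e, change) = d.next()
            if (change == Upsert) pending = e.row            // Delete removes the key entirely
        }
      }
    }

    advance()
    override def hasNext: Boolean = pending != null
    override def next(): AnyRef = { val r = pending; advance(); r }
  }
}
```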
checkAnswer(
  sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
  Row(600L, 21L, 601, "addr1", "2026-01-01") ::
  Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
  Row(800L, 23L, 603, "addr3", "2026-01-02") ::
  Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
  Row(1000L, 25L, 605, "addr5", "2026-01-03") ::
  Row(1100L, 260L, 606, "addr6", "2026-01-03") ::
Currently, the test passes even if changelog reading is not properly implemented. This is because the test base uses a very short KV snapshot interval (1 second), so the reader always falls back to the KV snapshot and never actually consumes the changelog.
I think it’s acceptable to keep this as-is for now, since we plan to refactor the changelog merge-read logic in upcoming PRs, as discussed offline. But please create an issue to track this.
Purpose
Linked issue: close #2376
Brief change log
Tests
API and Format
Documentation