Conversation

@YannByron
Contributor

Purpose

Linked issue: close #2376

Brief change log

Tests

API and Format

Documentation

Member

@wuchong wuchong left a comment

@YannByron thanks, I left some comments.

return new GenericMap(newMap);
}

public static Object copyRow(Object o, DataType type) {
Member

This method name conflicts with copyRow(InternalRow row, RowType rowType) and may be confusing. Can we rename it to copyValue to distinguish it from copyRow?

Besides, since it is only called within InternalRowUtils, we can change the visibility of this method, copyArray, and copyMap to private.

protected val POLL_TIMEOUT: Duration = Duration.ofMillis(100)
protected lazy val conn: Connection = ConnectionFactory.createConnection(flussConfig)
protected lazy val table: Table = conn.getTable(tablePath)
protected lazy val tableInfo: TableInfo = conn.getAdmin.getTableInfo(tablePath).get()
Member

The tableInfo can be obtained from the table via table.getTableInfo.
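
A minimal sketch of the suggested simplification, assuming Table#getTableInfo is available as described above:

```scala
// Sketch: reuse the already-opened table instead of issuing a separate admin call.
protected lazy val table: Table = conn.getTable(tablePath)
protected lazy val tableInfo: TableInfo = table.getTableInfo
```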

Comment on lines +132 to +135
FlussUpsertInputPartition(
tableBucket,
snapshotIdOpt.getAsLong,
logOffsetOpt.getAsLong
Member

Since this is a batch InputPartition, we should add an end offset to make the log split bounded. The latest end offset can be obtained from the OffsetsInitializer.latest().getBucketOffsets(..) method.

We should:

  1. Fetch the latest kvSnapshots, which is a map<bucket, snapshot_id & log_start_offset>.
  2. Fetch the latest offsets from OffsetsInitializer.latest, which is a map<bucket, log_end_offset>.
  3. Join the kvSnapshots with the latest offsets to generate an input partition list for each bucket (see the sketch below).
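
A rough sketch of these three steps. The map types are simplified, the Fluss client calls that produce the two maps are omitted, and the logEndOffset field on FlussUpsertInputPartition is the proposed (hypothetical) addition:

```scala
// Sketch only: bucket ids and offset maps are simplified stand-ins.
case class FlussUpsertInputPartition(
    bucket: Int,
    snapshotId: Long,     // -1 if the bucket has no KV snapshot yet
    logStartOffset: Long, // where changelog reading starts
    logEndOffset: Long)   // bound captured at planning time

def planInputPartitions(
    kvSnapshots: Map[Int, (Long, Long)], // bucket -> (snapshot_id, log_start_offset)
    endOffsets: Map[Int, Long],          // bucket -> log_end_offset from OffsetsInitializer.latest()
    earliestOffset: Long                 // sentinel for "read from the earliest retained offset"
): Seq[FlussUpsertInputPartition] =
  endOffsets.toSeq.map { case (bucket, endOffset) =>
    kvSnapshots.get(bucket) match {
      case Some((snapshotId, logStartOffset)) =>
        FlussUpsertInputPartition(bucket, snapshotId, logStartOffset, endOffset)
      case None =>
        // No snapshot yet: read the changelog only.
        FlussUpsertInputPartition(bucket, -1L, earliestOffset, endOffset)
    }
  }
```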

)
} else {
// No snapshot yet, only read log from beginning
FlussUpsertInputPartition(tableBucket, -1L, 0L)
Member

We should use org.apache.fluss.client.table.scanner.log.LogScanner#EARLIEST_OFFSET instead of 0 to indicate reading the log from the beginning, because offset 0L may already have been removed by TTL, which would cause a LogOffsetOutOfRangeException to be thrown.
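
For illustration only, the no-snapshot branch could look roughly like this. FlussUpsertInputPartition here is the simplified shape from the sketch above, and EARLIEST_OFFSET is the constant named in this comment:

```scala
import org.apache.fluss.client.table.scanner.log.LogScanner

// Sketch: use the EARLIEST_OFFSET sentinel rather than a literal 0L start offset.
def partitionWithoutSnapshot(bucket: Int, logEndOffset: Long): FlussUpsertInputPartition =
  FlussUpsertInputPartition(bucket, -1L, LogScanner.EARLIEST_OFFSET, logEndOffset)
```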

}

// Poll for more log records
val scanRecords: ScanRecords = logScanner.poll(POLL_TIMEOUT)
Member

logScanner.poll() is a best-effort API: it may return an empty result due to transient issues (e.g., network glitches) even when unread log records remain on the server. Therefore, we should poll in a loop until we reach the known end offset.

The end offset should be determined at job startup using OffsetsInitializer.latest().getBucketOffsets(...), which gives us the high-watermark for each bucket at the beginning of the batch job.

Since there’s no built-in API to read a bounded log split, we must manually:

  • Skip any records with offsets beyond the precomputed end offset, and
  • Signal that there is no next record once all buckets have reached their respective end offsets (see the sketch below).
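
A simplified sketch of such a bounded poll loop. PolledRecord, pollOnce, and the offset bookkeeping are hypothetical stand-ins for the real ScanRecords iteration; contiguous offsets and identical bucket keys in both maps are assumed:

```scala
import java.time.Duration
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Hypothetical flattened view of one polled record (bucket id, offset, payload).
case class PolledRecord(bucket: Int, offset: Long, payload: AnyRef)

def readBoundedLog(
    pollOnce: Duration => Seq[PolledRecord], // wraps logScanner.poll(POLL_TIMEOUT)
    startOffsets: Map[Int, Long],            // bucket -> first offset to read
    endOffsets: Map[Int, Long],              // bucket -> end offset (exclusive), fixed at job start
    pollTimeout: Duration = Duration.ofMillis(100)
): Seq[PolledRecord] = {
  val out = ArrayBuffer.empty[PolledRecord]
  // Next offset we still expect to see per bucket.
  val position = mutable.Map(startOffsets.toSeq: _*)
  def unfinished = position.exists { case (b, pos) => pos < endOffsets.getOrElse(b, 0L) }
  while (unfinished) {
    // poll may legitimately return nothing even though records remain on the server.
    for (rec <- pollOnce(pollTimeout)) {
      val end = endOffsets.getOrElse(rec.bucket, 0L)
      if (rec.offset < end) out += rec // keep records below the bound, skip the rest
      // Advance the bucket position, capping at `end`, so an out-of-range record
      // also marks the bucket as finished.
      position(rec.bucket) = math.min(rec.offset + 1, end)
    }
  }
  out.toSeq
}
```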

logRecords = bucketRecords.iterator()
if (logRecords.hasNext) {
val scanRecord = logRecords.next()
currentRow = convertToSparkRow(scanRecord)
Member

The LogRecord is a changelog that contains -D (delete) and -U (update-before) records. To produce a consistent view, we need to merge these changes with the KV snapshot data in a union-read fashion—just like how we combine data lake snapshots with changelogs.

Fortunately, the KV snapshot scan is already sorted by primary key. We can leverage this by:

  1. Materializing the delta changes into a temporary delta table;
  2. Sorting the delta table by primary key using org.apache.fluss.row.encode.KeyEncoder#of(...);
  3. Performing a sort-merge between the sorted KV snapshot reader and the sorted delta table reader.

This enables an efficient and correct merge without requiring random lookups or hash-based joins.
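
As an illustration of the sort-merge idea, here is a simplified sketch over two key-sorted iterators. The row kinds, key encoding, and reader types are stand-ins (the real code would use KeyEncoder#of(...) for keys and Fluss change kinds), and the delta is assumed to already be folded to at most one change per key:

```scala
import java.util

sealed trait ChangeKind
case object Upsert extends ChangeKind // +I / +U after folding away update-before
case object Delete extends ChangeKind // -D

final case class SnapshotRow(key: Array[Byte], row: AnyRef)
final case class DeltaRow(key: Array[Byte], kind: ChangeKind, row: AnyRef)

/** Sort-merge a key-sorted KV snapshot with a key-sorted, folded delta. */
def sortMerge(snapshot: Iterator[SnapshotRow], delta: Iterator[DeltaRow]): Iterator[AnyRef] =
  new Iterator[AnyRef] {
    private val snap = snapshot.buffered
    private val del = delta.buffered
    private var pending: Option[AnyRef] = None

    private def cmp(a: Array[Byte], b: Array[Byte]): Int = util.Arrays.compare(a, b)

    private def advance(): Unit = {
      while (pending.isEmpty && (snap.hasNext || del.hasNext)) {
        if (!del.hasNext) {
          pending = Some(snap.next().row)                      // only snapshot rows left
        } else if (!snap.hasNext) {
          val d = del.next()                                   // only delta rows left; deletes vanish
          if (d.kind == Upsert) pending = Some(d.row)
        } else {
          cmp(snap.head.key, del.head.key) match {
            case c if c < 0 => pending = Some(snap.next().row) // key untouched by the delta
            case c if c > 0 =>
              val d = del.next()                               // key only present in the delta
              if (d.kind == Upsert) pending = Some(d.row)
            case _ =>
              snap.next()                                      // delta overrides the snapshot row
              val d = del.next()
              if (d.kind == Upsert) pending = Some(d.row)      // a delete drops the key entirely
          }
        }
      }
    }

    override def hasNext: Boolean = { if (pending.isEmpty) advance(); pending.isDefined }
    override def next(): AnyRef = {
      if (!hasNext) throw new NoSuchElementException("merge exhausted")
      val r = pending.get; pending = None; r
    }
  }
```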

Comment on lines +192 to +199
checkAnswer(
sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
Row(600L, 21L, 601, "addr1", "2026-01-01") ::
Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
Row(800L, 23L, 603, "addr3", "2026-01-02") ::
Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
Row(1000L, 25L, 605, "addr5", "2026-01-03") ::
Row(1100L, 260L, 606, "addr6", "2026-01-03") ::
Member

Currently, the test passes even if changelog reading is not properly implemented. This is because the test base uses a very short KV snapshot interval (1 second), so the reader always falls back to the KV snapshot and never actually consumes the changelog.

I think it’s acceptable to keep this as-is for now, since we plan to refactor the changelog merge-read logic in upcoming PRs, as discussed offline. But please create an issue to track this.


Development

Successfully merging this pull request may close these issues.

[spark] Support Spark Batch Read
