Description
Search before asking
- I searched in the issues and found nothing similar.
Fluss version
0.8.0 (latest release)
Please describe the bug 🐞
Problem
When reading from a FirstRowMergeEngine table, the read operation occasionally gets stuck for several hours. During this period, the fetch delay suddenly increases from around 100ms to several hours. Upon investigating the logs, I observed the following behavior:
The fetch logs continuously print, but no data is actually read:
2026-01-12 20:13:57,856 INFO org.apache.fluss.client.table.scanner.log.LogFetcher [] - Preparing fetch request for bucket TableBucket{tableId=17, bucket=1} to server 1
2026-01-12 20:13:57,860 INFO org.apache.fluss.client.table.scanner.log.LogFetcher [] - Preparing fetch request for bucket TableBucket{tableId=17, bucket=1} to server 1
... (the same log line keeps repeating) ...
2026-01-12 21:51:21,371 INFO org.apache.fluss.client.table.scanner.log.LogFetcher [] - Preparing fetch request for bucket TableBucket{tableId=17, bucket=1} to server 1
2026-01-12 21:51:22,390 INFO org.apache.fluss.client.table.scanner.log.RemoteLogDownloader [] - Successfully downloaded remote log segment file 84cbd058-d9f7-487c-b401-e24c67da3c49_00000000000004368958.log to local cost 8718 ms.
2026-01-12 21:51:23,491 [DownloadRemoteLog-[ods.sales_flat_order_item_create]] INFO org.apache.fluss.client.table.scanner.log.RemoteLogDownloader [] - Consumed and deleted the fetched log segment file 84cbd058-d9f7-487c-b401-e24c67da3c49_00000000000004368958.log for bucket TableBucket{tableId=17, bucket=1}.

This indicates that fetch requests were repeatedly sent for over an hour without returning any data, which is unexpected because the logs do exist.
Cause
After reviewing the code, I identified the following issue:
In the FirstRowMergeEngine, an update to an already-existing key generates an empty log entry (but one that still occupies a valid offset). During projection pushdown queries, messages with empty content are filtered out (fluss-417).
If a read batch happens to contain only these empty messages, the returned result is also empty. This prevents the client from advancing its offset, so it keeps sending ineffective fetch requests. The issue persists until a remote log segment is generated; however, since the user's actual data volume is small, generating remote logs can be slow, leading to scenarios where certain partitions remain unreadable for extended periods.
It is important to note that no data is lost; however, data retrieval becomes significantly delayed.
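To make this concrete, below is a minimal, purely illustrative Java sketch; the class, record, and method names are hypothetical and do not correspond to Fluss internals. It only shows the offset-advancement behavior described above: because the next fetch offset is derived from the records that survive filtering, a batch that is filtered down to nothing never moves it.

// Hypothetical sketch, not Fluss code: illustrates how the consume offset stalls when
// every record in a fetched batch is dropped by the client-side empty-message filter.
import java.util.List;

class EmptyBatchStallSketch {

    // Minimal stand-in for a fetched record; only its log offset matters here.
    record FetchedRecord(long offset) {}

    private long nextFetchOffset;

    EmptyBatchStallSketch(long startOffset) {
        this.nextFetchOffset = startOffset;
    }

    // Called once per fetch response with the records that survived filtering.
    void onFetchResponse(List<FetchedRecord> survivingRecords) {
        if (survivingRecords.isEmpty()) {
            // All records in the batch were empty CDC entries and were filtered out,
            // so nothing here advances nextFetchOffset; the next fetch request asks
            // for the same offset again and the loop never makes progress.
            return;
        }
        // Only a non-empty result moves the offset forward.
        FetchedRecord last = survivingRecords.get(survivingRecords.size() - 1);
        nextFetchOffset = last.offset() + 1;
    }

    long nextFetchOffset() {
        return nextFetchOffset;
    }
}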
Example
This issue is easily triggered under the following conditions:
Assume we have two local log segments (one is active, the other is not), and each read is limited by client.scanner.log.fetch.max-bytes-for-bucket = 2500 bytes (a programmatic sketch of this setting follows the list):
- The first read starts at offset 100 and returns the data between offsets 100 and 200. The records at offsets 201 and 202 (the tail of the first segment) are empty.
- The second read starts at offset 201 and reads only offsets 201 and 202 (each fetch reads at most one segment, so the next, active segment is not included in this batch). Both records are empty logs and get filtered out, so this round returns an empty batch, and the offset in the next fetch request is not advanced.
- The client then keeps repeating the read starting at offset 201 until segment 1 is deleted locally, at which point it finally reads the remote file.
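For reference, here is a hedged sketch of how this per-bucket fetch cap maps onto the programmatic API used by the reproduce test below; the 2500-byte value is only illustrative, and a no-arg Configuration constructor is assumed.

// Illustrative only: sets client.scanner.log.fetch.max-bytes-for-bucket to 2500 bytes,
// matching the scenario above, using the same ConfigOptions/MemorySize API as the test below.
// (Assumes Configuration's no-arg constructor; the test uses an initConfig() helper instead.)
Configuration clientConf = new Configuration();
clientConf.set(
        ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
        new MemorySize(2500));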
Min Reproduce Step
I have modified org.apache.fluss.client.table.FlussTableITCase#testFirstRowMergeEngine:
- If doProjection = true, the test gets stuck forever.
- If doProjection = false, the test finishes quickly.
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testFirstRowMergeEngine(boolean doProjection) throws Exception {
    Configuration conf = initConfig();
    // To better reproduce the issue:
    // 1. Disable the remote log task so that the remote log is never read.
    // 2. Set LOG_SEGMENT_FILE_SIZE so that the segment before the last one ends with an
    //    empty batch.
    // In this way, if empty batches are skipped, the read will be stuck forever.
    conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ZERO);
    conf.set(
            ConfigOptions.LOG_SEGMENT_FILE_SIZE,
            new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE));
    conf.set(
            ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
            new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE));
    final FlussClusterExtension flussClusterExtension =
            FlussClusterExtension.builder()
                    .setNumOfTabletServers(3)
                    .setClusterConf(conf)
                    .build();
    flussClusterExtension.start();
    TableDescriptor tableDescriptor =
            TableDescriptor.builder()
                    .schema(DATA1_SCHEMA_PK)
                    .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngineType.FIRST_ROW)
                    .build();
    RowType rowType = DATA1_SCHEMA_PK.getRowType();
    String tableName =
            String.format(
                    "test_first_row_merge_engine_with_%s",
                    doProjection ? "projection" : "no_projection");
    TablePath tablePath = TablePath.of("test_db_1", tableName);
    int rows = 5;
    int duplicateNum = 10;
    int batchSize = 3;
    int count = 0;
    // Case 1: Test normal updates that generate non-empty cdc logs.
    Table table = null;
    LogScanner logScanner = null;
    try (Connection connection =
                    ConnectionFactory.createConnection(
                            flussClusterExtension.getClientConfig());
            Admin admin = connection.getAdmin()) {
        admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, false)
                .get();
        admin.createTable(tablePath, tableDescriptor, false).get();
        table = connection.getTable(tablePath);
        // first, put rows
        UpsertWriter upsertWriter = table.newUpsert().createWriter();
        List<InternalRow> expectedScanRows = new ArrayList<>(rows);
        List<InternalRow> expectedLookupRows = new ArrayList<>(rows);
        for (int id = 0; id < rows; id++) {
            for (int num = 0; num < duplicateNum; num++) {
                upsertWriter.upsert(row(id, "value_" + num));
                if (count++ > batchSize) {
                    upsertWriter.flush();
                    count = 0;
                }
            }
            expectedLookupRows.add(row(id, "value_0"));
            expectedScanRows.add(doProjection ? row(id) : row(id, "value_0"));
        }
        upsertWriter.flush();
        Lookuper lookuper = table.newLookup().createLookuper();
        // now, get rows by lookup
        for (int id = 0; id < rows; id++) {
            InternalRow gotRow = lookuper.lookup(row(id)).get().getSingletonRow();
            assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedLookupRows.get(id));
        }
        Scan scan = table.newScan();
        if (doProjection) {
            scan = scan.project(new int[] {0}); // do projection.
        }
        logScanner = scan.createLogScanner();
        logScanner.subscribeFromBeginning(0);
        List<ScanRecord> actualLogRecords = new ArrayList<>(0);
        while (actualLogRecords.size() < rows) {
            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
            scanRecords.forEach(actualLogRecords::add);
        }
        assertThat(actualLogRecords).hasSize(rows);
        for (int i = 0; i < actualLogRecords.size(); i++) {
            ScanRecord scanRecord = actualLogRecords.get(i);
            assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
            assertThatRow(scanRecord.getRow())
                    .withSchema(doProjection ? rowType.project(new int[] {0}) : rowType)
                    .isEqualTo(expectedScanRows.get(i));
        }
        // Case 2: Test when all the updates in the write batch are duplicates (thus
        // generating empty cdc logs).
        // Insert duplicate rows again to generate empty cdc logs.
        for (int num = 0; num < duplicateNum; num++) {
            upsertWriter.upsert(row(0, "value_" + num));
            upsertWriter.flush();
        }
        // Insert a new row.
        upsertWriter.upsert(row(rows + 1, "new_value"));
        actualLogRecords = new ArrayList<>(0);
        while (actualLogRecords.isEmpty()) {
            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
            scanRecords.forEach(actualLogRecords::add);
        }
        logScanner.close();
        assertThat(actualLogRecords).hasSize(1);
        ScanRecord scanRecord = actualLogRecords.get(0);
        assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
        assertThatRow(scanRecord.getRow())
                .withSchema(doProjection ? rowType.project(new int[] {0}) : rowType)
                .isEqualTo(doProjection ? row(rows + 1) : row(rows + 1, "new_value"));
    } finally {
        if (logScanner != null) {
            logScanner.close();
        }
        if (table != null) {
            table.close();
        }
        flussClusterExtension.close();
    }
}

Solution
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!