
FirstRowMergeEngine: Read Stuck for Hours Due to Empty Log Entries in Projection Pushdown Queries #2369

@loserwang1024

Description


Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

0.8.0 (latest release)

Please describe the bug 🐞

Problem

When reading from a FirstRowMergeEngine table, the read operation occasionally gets stuck for several hours. During this period, the fetch delay suddenly increases from around 100ms to several hours. Upon investigating the logs, I observed the following behavior:

The fetch logs continuously print, but no data is actually read:

2026-01-12 20:13:57,856 INFO  org.apache.fluss.client.table.scanner.log.LogFetcher         [] - Preparing fetch request for bucket TableBucket{tableId=17, bucket=1} to server 1
2026-01-12 20:13:57,860 INFO  org.apache.fluss.client.table.scanner.log.LogFetcher         [] - Preparing fetch request for bucket TableBucket{tableId=17, bucket=1} to server 1

... (the same fetch log line repeats for over an hour) ...
2026-01-12 21:51:21,371  INFO  org.apache.fluss.client.table.scanner.log.LogFetcher         [] - Preparing fetch request for bucket TableBucket{tableId=17, bucket=1} to server 1

2026-01-12 21:51:22,390  INFO  org.apache.fluss.client.table.scanner.log.RemoteLogDownloader [] - Successfully downloaded remote log segment file  84cbd058-d9f7-487c-b401-e24c67da3c49_00000000000004368958.log to local cost 8718 ms.
2026-01-12 21:51:23,491 [DownloadRemoteLog-[ods.sales_flat_order_item_create]] INFO  org.apache.fluss.client.table.scanner.log.RemoteLogDownloader [] - Consumed and deleted the fetched log segment file 84cbd058-d9f7-487c-b401-e24c67da3c49_00000000000004368958.log for bucket TableBucket{tableId=17, bucket=1}.

This indicates that the fetch requests were repeatedly sent for over an hour without returning any data. This is unexpected because the logs do exist.

Cause

After reviewing the code, I identified the following issue:
In the FirstRowMergeEngine, when an update operation occurs, it generates an empty log entry (but with a valid offset). During projection pushdown queries, if the message content is empty, it gets filtered out (fluss-417).


If a read batch happens to contain only these empty messages, the result returned will also be empty. This prevents the client from advancing the offset, causing it to repeatedly send ineffective fetch requests. The issue persists until a remote log segment is generated. However, since the user's actual data volume is small, generating remote logs can be slow, leading to scenarios where certain partitions remain unreadable for extended periods.

It is important to note that no data is lost; however, data retrieval becomes significantly delayed.
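To make the failure mode concrete, below is a minimal, self-contained sketch of the behavior described above. It is not the actual Fluss LogFetcher code; the class, fields, and method names are hypothetical and only model the mechanics: when every batch in a fetch is filtered out as empty, nothing is returned to the caller and the next fetch offset never advances, so the client keeps re-requesting the same offset.

    // Illustrative sketch only: hypothetical names, NOT the real Fluss LogFetcher.
    // It shows why filtering out empty batches without advancing the consumed
    // offset causes the same offset to be fetched over and over again.
    import java.util.ArrayList;
    import java.util.List;

    public class StuckFetchSketch {

        /** A fetched record batch; empty CDC batches still occupy valid offsets. */
        static final class Batch {
            final long baseOffset;
            final long lastOffset;
            final boolean empty; // e.g. a FirstRow update that produced no change log

            Batch(long baseOffset, long lastOffset, boolean empty) {
                this.baseOffset = baseOffset;
                this.lastOffset = lastOffset;
                this.empty = empty;
            }
        }

        private long nextFetchOffset = 201L;

        /** Problematic behavior: the offset only advances when non-empty batches are returned. */
        List<Batch> pollOnce(List<Batch> fetched) {
            List<Batch> returned = new ArrayList<>();
            for (Batch batch : fetched) {
                if (batch.empty) {
                    continue; // filtered out under projection pushdown
                }
                returned.add(batch);
                nextFetchOffset = batch.lastOffset + 1;
            }
            // If every batch was empty, nextFetchOffset is unchanged, so the next
            // fetch request asks for exactly the same range again.
            return returned;
        }

        public static void main(String[] args) {
            StuckFetchSketch client = new StuckFetchSketch();
            // The server keeps serving the tail of the local segment: offsets 201-202, all empty.
            List<Batch> segmentTail = List.of(new Batch(201, 202, true));
            for (int round = 1; round <= 3; round++) {
                client.pollOnce(segmentTail);
                System.out.println("round " + round + ": next fetch offset = " + client.nextFetchOffset);
            }
            // Prints 201 every round: the reader is stuck until the segment is
            // deleted locally and the data is served from remote storage instead.
        }
    }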

Example

This issue is easily triggered under the following conditions:

Assume we have two local log segments (one active, the other inactive). Each read uses client.scanner.log.fetch.max-bytes-for-bucket = 2500 bytes; a minimal configuration sketch is shown after the steps below.

  1. The first read starts at offset 100 and returns the data between offsets 100 and 200. The data at offsets 201 and 202 is empty and thus gets filtered out.
  2. The second read starts at offset 201. Only offsets 201 and 202 are read (each fetch reads from a single segment, so the next active segment is not included in this batch). However, the records at 201 and 202 are both empty logs and are filtered out, so this round returns an empty batch and the offset in the next fetch request is not advanced.
  3. The client then keeps repeating the read starting at offset 201 until segment 1 is deleted locally and the data can be read from the remote file.
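For reference, here is a minimal sketch of how the fetch window from this scenario is configured. The option constant and the MemorySize(long) constructor are the ones used in the reproduce test below; the no-argument Configuration constructor is an assumption here.

    // Sketch: shrink the per-bucket fetch size so that a single fetch covers only a
    // few record batches, matching the 2500-byte window in the scenario above.
    Configuration conf = new Configuration(); // assumed no-arg constructor
    conf.set(
            ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
            new MemorySize(2500)); // 2500 bytes per bucket per fetch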

Minimal Reproduce Steps

I modified org.apache.fluss.client.table.FlussTableITCase#testFirstRowMergeEngine as follows.

  • If doProjection = true, the test is stuck forever.
  • If doProjection = false, the test finishes quickly.

    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testFirstRowMergeEngine(boolean doProjection) throws Exception {
        Configuration conf = initConfig();
        // To better reproduce the issue:
        // 1. Disable the remote log task so that the remote log is never read.
        // 2. Set LOG_SEGMENT_FILE_SIZE so that the segment before the last segment ends with
        //    empty batches.
        // In this way, if empty batches are skipped, the read gets stuck forever.
        conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ZERO);
        conf.set(
                ConfigOptions.LOG_SEGMENT_FILE_SIZE,
                new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE));
        conf.set(
                ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
                new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE));
        final FlussClusterExtension flussClusterExtension =
                FlussClusterExtension.builder()
                        .setNumOfTabletServers(3)
                        .setClusterConf(conf)
                        .build();
        flussClusterExtension.start();

        TableDescriptor tableDescriptor =
                TableDescriptor.builder()
                        .schema(DATA1_SCHEMA_PK)
                        .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngineType.FIRST_ROW)
                        .build();
        RowType rowType = DATA1_SCHEMA_PK.getRowType();
        String tableName =
                String.format(
                        "test_first_row_merge_engine_with_%s",
                        doProjection ? "projection" : "no_projection");
        TablePath tablePath = TablePath.of("test_db_1", tableName);

        int rows = 5;
        int duplicateNum = 10;
        int batchSize = 3;
        int count = 0;
        // Case 1: Test normal updates that generate non-empty cdc logs.
        Table table = null;
        LogScanner logScanner = null;
        try (Connection connection =
                        ConnectionFactory.createConnection(
                                flussClusterExtension.getClientConfig());
                Admin admin = connection.getAdmin()) {
            admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, false)
                    .get();
            admin.createTable(tablePath, tableDescriptor, false).get();
            table = connection.getTable(tablePath);
            // first, put rows
            UpsertWriter upsertWriter = table.newUpsert().createWriter();
            List<InternalRow> expectedScanRows = new ArrayList<>(rows);
            List<InternalRow> expectedLookupRows = new ArrayList<>(rows);
            for (int id = 0; id < rows; id++) {
                for (int num = 0; num < duplicateNum; num++) {
                    upsertWriter.upsert(row(id, "value_" + num));
                    if (count++ > batchSize) {
                        upsertWriter.flush();
                        count = 0;
                    }
                }

                expectedLookupRows.add(row(id, "value_0"));
                expectedScanRows.add(doProjection ? row(id) : row(id, "value_0"));
            }

            upsertWriter.flush();

            Lookuper lookuper = table.newLookup().createLookuper();
            // now, get rows by lookup
            for (int id = 0; id < rows; id++) {
                InternalRow gotRow = lookuper.lookup(row(id)).get().getSingletonRow();
                assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedLookupRows.get(id));
            }

            Scan scan = table.newScan();
            if (doProjection) {
                scan = scan.project(new int[] {0}); // do projection.
            }
            logScanner = scan.createLogScanner();

            logScanner.subscribeFromBeginning(0);
            List<ScanRecord> actualLogRecords = new ArrayList<>(0);
            while (actualLogRecords.size() < rows) {
                ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
                scanRecords.forEach(actualLogRecords::add);
            }
            assertThat(actualLogRecords).hasSize(rows);
            for (int i = 0; i < actualLogRecords.size(); i++) {
                ScanRecord scanRecord = actualLogRecords.get(i);
                assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
                assertThatRow(scanRecord.getRow())
                        .withSchema(doProjection ? rowType.project(new int[] {0}) : rowType)
                        .isEqualTo(expectedScanRows.get(i));
            }

            // Case 2: Test when all the updates in a write batch are duplicates (thus generating
            // empty cdc logs).
            // Insert duplicate rows again to generate empty cdc logs.
            for (int num = 0; num < duplicateNum; num++) {
                upsertWriter.upsert(row(0, "value_" + num));
                upsertWriter.flush();
            }

            // insert a new row.
            upsertWriter.upsert(row(rows + 1, "new_value"));

            actualLogRecords = new ArrayList<>(0);
            while (actualLogRecords.isEmpty()) {
                ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
                scanRecords.forEach(actualLogRecords::add);
            }
            logScanner.close();
            assertThat(actualLogRecords).hasSize(1);
            ScanRecord scanRecord = actualLogRecords.get(0);
            assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
            assertThatRow(scanRecord.getRow())
                    .withSchema(doProjection ? rowType.project(new int[] {0}) : rowType)
                    .isEqualTo(doProjection ? row(rows + 1) : row(rows + 1, "new_value"));
        } finally {
            if (logScanner != null) {
                logScanner.close();
            }
            if (table != null) {
                table.close();
            }
            flussClusterExtension.close();
        }
    }

Solution

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
