
Conversation

@MehulBatra (Contributor) commented Jan 11, 2026

Purpose

Linked issue: close #2333

Brief change log

File: ChangelogFlinkTableSource.java
Purpose: Flink table source for $changelog virtual tables
────────────────────────────────────────
File: ChangelogDeserializationSchema.java
Purpose: Deserializes LogRecord → RowData with metadata
────────────────────────────────────────
File: ChangelogRowConverter.java
Purpose: Converts records, adds _change_type, _log_offset, _commit_timestamp
────────────────────────────────────────
File: RecordToFlinkRowConverter.java
Purpose: Interface for record converters
────────────────────────────────────────
File: FlinkTableFactory.java
Change: Routes to ChangelogFlinkTableSource for changelog tables
────────────────────────────────────────
File: TableDescriptor.java
Change: Added CHANGELOG column name constants
────────────────────────────────────────
File: TableDescriptorValidation.java
Change: Validates reserved column names
────────────────────────────────────────
File: FlinkCatalog.java
Change: Detects the $changelog suffix and builds the virtual table schema

  • Access via SELECT * FROM table$changelog
  • Metadata columns: _change_type (VARCHAR), _log_offset (BIGINT), _commit_timestamp (TIMESTAMP)
  • All records output as INSERT (change type in column)
  • Supports PK tables with change types: +I, -U, +U, -D
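As a usage illustration (not part of this PR's diff): a minimal sketch, assuming a Fluss catalog is already registered as the current catalog and a primary-key table named orders exists (the table name is hypothetical).

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChangelogReadExample {
    public static void main(String[] args) {
        // Assumes the Fluss catalog has already been created and set as the
        // current catalog; "orders" is a hypothetical primary-key table.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // The $changelog virtual table exposes _change_type, _log_offset and
        // _commit_timestamp as regular columns; every row arrives as an INSERT.
        tEnv.executeSql("SELECT * FROM orders$changelog").print();

        // Because the change kind is a plain VARCHAR column, it can be filtered
        // like any other column, e.g. to keep only the update images.
        tEnv.executeSql(
                "SELECT * FROM orders$changelog WHERE _change_type IN ('-U', '+U')")
            .print();
    }
}
```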

Tests

File: ChangelogVirtualTableITCase.java
Purpose: Integration tests (7 tests)
────────────────────────────────────────
File: ChangelogRowConverterTest.java
Purpose: Unit tests for converter
────────────────────────────────────────
Test: testChangelogVirtualTableSchema
Type: Schema
What it verifies: DESCRIBE shows the correct columns

API and Format

Documentation

@MehulBatra requested a review from wuchong on January 11, 2026 at 20:13

@wuchong (Member) commented Jan 12, 2026

Great! Thank you @MehulBatra , I will review it.

@wuchong (Member) left a comment


Thanks @MehulBatra for the contribution. I left some comments.

Comment on lines +404 to +408
// Parse the row to validate structure
String[] parts = result.substring(3, result.length() - 1).split(", ", 6);

// Validate change type column
assertThat(parts[0]).isEqualTo("+I");

We can set a ManualClock on the FlussClusterExtension to control the commit timestamp by advancing the current time with CLOCK.advanceTime(xxxxx, TimeUnit.MILLISECONDS); see the usage in FlinkTableSourceITCase.

In this way, we can assert the changelog result by comparing the whole row string (set the bucket count to 1 to get consistent log_offset numbers).
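For illustration only, a rough sketch of the suggested pattern; CLOCK, tEnv, pk_table, and the expected row string below are assumptions (see FlinkTableSourceITCase for how the ManualClock is actually wired into the extension):

```java
// Inside an ITCase method; CLOCK is the ManualClock registered with the
// FlussClusterExtension and tEnv is the test's table environment.
CLOCK.advanceTime(100, TimeUnit.MILLISECONDS); // pin the commit timestamp
tEnv.executeSql("INSERT INTO pk_table VALUES (1, 'v1')").await();

// With bucket = 1 the log offset is deterministic, so the whole changelog row
// (change type, offset, timestamp, data columns) can be compared as one string.
try (CloseableIterator<Row> rows =
        tEnv.executeSql("SELECT * FROM pk_table$changelog").collect()) {
    // Illustrative expected value only; the real timestamp depends on the
    // ManualClock's start time.
    assertThat(rows.next().toString())
            .isEqualTo("+I[+I, 0, 1970-01-01T00:00:00.100, 1, v1]");
}
```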

/**
 * Creates a virtual $changelog table by modifying the base table's schema to include metadata columns.
 */
private CatalogBaseTable getVirtualChangelogTable(ObjectPath objectPath)

Please add tests in FlinkCatalogITCase to assert the CatalogTable obtained for a $changelog table.
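A minimal sketch of what such a test could look like; the catalog fixture, DEFAULT_DB, and the newPkTable() helper are assumptions, and the asserted columns follow the metadata columns described in this PR:

```java
@Test
void testGetChangelogVirtualTable() throws Exception {
    // "catalog", DEFAULT_DB, and newPkTable() are hypothetical test fixtures.
    catalog.createTable(new ObjectPath(DEFAULT_DB, "pk_table"), newPkTable(), false);

    // The catalog should resolve the $changelog suffix to a virtual table whose
    // schema is the base schema plus the changelog metadata columns.
    CatalogBaseTable table =
            catalog.getTable(new ObjectPath(DEFAULT_DB, "pk_table$changelog"));

    List<String> columnNames =
            table.getUnresolvedSchema().getColumns().stream()
                    .map(Schema.UnresolvedColumn::getName)
                    .collect(Collectors.toList());

    assertThat(columnNames).contains("_change_type", "_log_offset", "_commit_timestamp");
}
```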



Development

Successfully merging this pull request may close these issues.

[flink] Basic $changelog read support without pushdown optimizations for primary key table
