-
Notifications
You must be signed in to change notification settings - Fork 487
[flink] changelog read support for pk table without pushdown optimizations #2347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Great! Thank you @MehulBatra , I will review it. |
wuchong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @MehulBatra for the contribution. I left some comments.
...ss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
Show resolved
Hide resolved
...ss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
Show resolved
Hide resolved
...ss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
Show resolved
Hide resolved
...ss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
Show resolved
Hide resolved
...fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/RecordToFlinkRowConverter.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
Show resolved
Hide resolved
...nk-1.20/src/test/java/org/apache/fluss/flink/source/Flink120ChangelogVirtualTableITCase.java
Show resolved
Hide resolved
| // Parse the row to validate structure | ||
| String[] parts = result.substring(3, result.length() - 1).split(", ", 6); | ||
|
|
||
| // Validate change type column | ||
| assertThat(parts[0]).isEqualTo("+I"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can set ManualClock to the FlussClusterExtension to control the commit timestamp by setting the current time CLOCK.advanceTime(xxxxx, TimeUnit.MILLISECONDS), see the usage in FlinkTableSourceITCase.
In this way, we can assert the changelog result by comparing the whole row string. (set bucket to 1 to have consistent log_offset numbers)
| /** | ||
| * Creates a virtual $changelog table by modifying the base table's to include metadata columns. | ||
| */ | ||
| private CatalogBaseTable getVirtualChangelogTable(ObjectPath objectPath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add tests in FlinkCatalogITCase to assert the CatalogTable got from a $changelog table.
Purpose
Linked issue: close #2333
Brief change log
File: ChangelogFlinkTableSource.java
Purpose: Flink table source for $changelog virtual tables
────────────────────────────────────────
File: ChangelogDeserializationSchema.java
Purpose: Deserializes LogRecord → RowData with metadata
────────────────────────────────────────
File: ChangelogRowConverter.java
Purpose: Converts records, adds _change_type, _log_offset,
commit_timestamp
────────────────────────────────────────
File: RecordToFlinkRowConverter.java
Purpose: Interface for record converters
────────────────────────────────────────
File: FlinkTableFactory.java
Change: Routes to ChangelogFlinkTableSource for changelog
tables
────────────────────────────────────────
File: TableDescriptor.java
Change: Added CHANGELOG* column name constants
────────────────────────────────────────
File: TableDescriptorValidation.java
Change: Validates reserved column names
Test Coverage
Test: testChangelogVirtualTableSchema
Type: Schema
What it verifies: DESCRIBE shows correct columns
────────────────────────────────────────
File: FlinkCatalog.java
Change: Detects $changelog suffix, builds virtual table
schema
Tests
File: ChangelogVirtualTableITCase.java
Purpose: Integration tests (7 tests)
────────────────────────────────────────
File: ChangelogRowConverterTest.java
Purpose: Unit tests for converter
────────────────────────────────────────
API and Format
Documentation