[flink] Add union read support to datastream#3432
Conversation
There was a problem hiding this comment.
Pull request overview
This PR fixes a functional gap in the Flink DataStream connector: FlussSourceBuilder now constructs and wires a LakeSource into the underlying FlinkSource when reading a datalake-enabled table in full startup mode, enabling true Fluss+Lake historical/real-time union reads for DataStream jobs (previously only available via the Flink SQL/Table connector).
Changes:
- Create a
LakeSourceinFlussSourceBuilder.build()when the table has datalake enabled and the source starts fromOffsetsInitializer.full()(snapshot mode), including projection pushdown to the lake source. - Extend
FlussSourceconstructors to accept and forward an optionalLakeSourcetoFlinkSource. - Add Iceberg integration tests that validate DataStream union read semantics for log tables, PK tables, and projection pushdown.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadDataStreamITCase.java |
Adds DataStream-focused union read IT coverage (Iceberg tiered data + Fluss-only data, incl. projection). |
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java |
Creates/configures a LakeSource for full startup on datalake-enabled tables and passes it into the built source. |
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java |
Plumbs the optional LakeSource through to FlinkSource to activate existing hybrid lake+Fluss split logic. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
fresh-borzoni
left a comment
There was a problem hiding this comment.
@polyzos Ty for the PR, left some minor comments, PTAL
| if (tableInfo.getTableConfig().isDataLakeEnabled() | ||
| && offsetsInitializer instanceof SnapshotOffsetsInitializer) { | ||
| lakeSource = | ||
| LakeSourceUtils.createLakeSource(tablePath, tableInfo.getProperties().toMap()); |
| } | ||
| lakeSource.withProject(nestedProjectedFields); | ||
| } | ||
| } |
There was a problem hiding this comment.
setBounded() only works on a lake table started from full(). Other options seems that the job starts and then crashes later with a message like "lakeSource' is null in batch mode". Both bad cases just mean lakeSource came out null.
Should we check it explicitly and early?
| * | ||
| * <p>These tests mirror the Flink SQL union-read coverage ({@code FlinkUnionReadLogTableITCase} and | ||
| * {@code FlinkUnionReadPrimaryKeyTableITCase}) but exercise the programmatic DataStream source. | ||
| * Each test asserts the three properties that make a union read meaningful: |
|
@fresh-borzoni Thank you for your comments, good catch.. I updated the PR to address them |
fresh-borzoni
left a comment
There was a problem hiding this comment.
@polyzos Ty, LGTM 👍
Can you please file a followup about partition pruning?
The DataStream
FlussSource/FlussSourceBuildernever created aLakeSource, it always passednullto FlinkSource. So DataStream jobs read only Fluss data, even for datalake-enabled tables. Union read worked only via the Flink SQL/Table connector.This pr addresses this missing piece