[flink] Add union read support to datastream #3432

Open
polyzos wants to merge 3 commits into apache:main from polyzos:datastream-api-union-read-support

polyzos commented Jun 4, 2026

The DataStream FlussSource/FlussSourceBuilder never created a LakeSource; it always passed null to FlinkSource. As a result, DataStream jobs read only Fluss data, even for datalake-enabled tables, and union read worked only via the Flink SQL/Table connector.

This PR fills that gap.

Copilot AI left a comment

Pull request overview

This PR fixes a functional gap in the Flink DataStream connector: FlussSourceBuilder now constructs and wires a LakeSource into the underlying FlinkSource when reading a datalake-enabled table in full startup mode, enabling true Fluss+Lake historical/real-time union reads for DataStream jobs (previously only available via the Flink SQL/Table connector).

Changes:

  • Create a LakeSource in FlussSourceBuilder.build() when the table has datalake enabled and the source starts from OffsetsInitializer.full() (snapshot mode), including projection pushdown to the lake source.
  • Extend FlussSource constructors to accept and forward an optional LakeSource to FlinkSource.
  • Add Iceberg integration tests that validate DataStream union read semantics for log tables, PK tables, and projection pushdown.
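
The wiring decision in the first bullet can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `StartupMode`, `StubLakeSource`, and `maybeCreateLakeSource` are hypothetical stand-ins for the real Fluss types, and wrapping each projected column index in a one-element path is an assumption about how the nested projection is built.

```java
import java.util.Arrays;
import java.util.Optional;

public class UnionReadDecisionSketch {

    /** Hypothetical stand-in for the startup mode; only FULL enables union read. */
    public enum StartupMode { FULL, EARLIEST, LATEST }

    /** Hypothetical stand-in for a lake source; it only records the pushed-down projection. */
    public static final class StubLakeSource {
        private int[][] projection; // null means "read every column"

        public void withProject(int[][] nestedFields) {
            this.projection = nestedFields;
        }

        public int[][] projection() {
            return projection;
        }
    }

    /**
     * Returns a lake source only when the table is datalake-enabled and the read starts in
     * full mode; otherwise the caller falls back to a Fluss-only read.
     */
    public static Optional<StubLakeSource> maybeCreateLakeSource(
            boolean dataLakeEnabled, StartupMode mode, int[] projectedFields) {
        if (!dataLakeEnabled || mode != StartupMode.FULL) {
            return Optional.empty();
        }
        StubLakeSource lakeSource = new StubLakeSource();
        if (projectedFields != null) {
            // Assumption: each top-level column index becomes a one-element path.
            int[][] nested = new int[projectedFields.length][];
            for (int i = 0; i < projectedFields.length; i++) {
                nested[i] = new int[] {projectedFields[i]};
            }
            lakeSource.withProject(nested);
        }
        return Optional.of(lakeSource);
    }

    public static void main(String[] args) {
        Optional<StubLakeSource> union =
                maybeCreateLakeSource(true, StartupMode.FULL, new int[] {0, 2});
        System.out.println(union.isPresent()); // prints "true"
        System.out.println(Arrays.deepToString(union.get().projection())); // prints "[[0], [2]]"
        System.out.println(
                maybeCreateLakeSource(true, StartupMode.LATEST, null).isPresent()); // prints "false"
    }
}
```

Returning an empty `Optional` here plays the role of the null `LakeSource` that the real `FlinkSource` receives for a Fluss-only read.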

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadDataStreamITCase.java | Adds DataStream-focused union read IT coverage (Iceberg tiered data + Fluss-only data, incl. projection). |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java | Creates/configures a LakeSource for full startup on datalake-enabled tables and passes it into the built source. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java | Plumbs the optional LakeSource through to FlinkSource to activate existing hybrid lake+Fluss split logic. |

fresh-borzoni left a comment

@polyzos Ty for the PR, left some minor comments, PTAL

if (tableInfo.getTableConfig().isDataLakeEnabled()
        && offsetsInitializer instanceof SnapshotOffsetsInitializer) {
    lakeSource =
            LakeSourceUtils.createLakeSource(tablePath, tableInfo.getProperties().toMap());

What about filters?

        }
        lakeSource.withProject(nestedProjectedFields);
    }
}

setBounded() only works on a datalake-enabled table started from full(). With any other combination, the job seems to start and then crash later with a message like "'lakeSource' is null in batch mode". Both bad cases (a table without datalake enabled, or a startup mode other than full()) just mean lakeSource came out null.

Should we check it explicitly and early?
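
One way such an early check could look is sketched below. This is only an illustration of the fail-fast idea, not the change that was made in the PR: the class, the `validate` method, the exception type, and the message are all hypothetical, and `lakeSource` is typed as `Object` because the sketch does not depend on the real `LakeSource`.

```java
public class BoundedSourceValidationSketch {

    /**
     * Fails at build time when a bounded read was requested but no lake source was created,
     * instead of letting the job start and crash later.
     */
    public static void validate(boolean bounded, Object lakeSource) {
        if (bounded && lakeSource == null) {
            // Hypothetical message; the real connector may word this differently.
            throw new IllegalStateException(
                    "Bounded mode requires a datalake-enabled table read from full().");
        }
    }

    public static void main(String[] args) {
        validate(false, null);        // unbounded Fluss-only read: accepted
        validate(true, new Object()); // bounded read with a lake source: accepted
        try {
            validate(true, null);     // bounded read without a lake source: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Running the check inside the builder would surface the misconfiguration when the source is constructed rather than after the job has been submitted.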

*
* <p>These tests mirror the Flink SQL union-read coverage ({@code FlinkUnionReadLogTableITCase} and
* {@code FlinkUnionReadPrimaryKeyTableITCase}) but exercise the programmatic DataStream source.
* Each test asserts the three properties that make a union read meaningful:

: ... mm?

polyzos commented Jun 5, 2026

@fresh-borzoni Thank you for your comments, good catch. I updated the PR to address them.

polyzos added this to the v1.0 milestone Jun 5, 2026

fresh-borzoni left a comment

@polyzos Ty, LGTM 👍
Can you please file a followup about partition pruning?
