[client] Fix tiering hang on first_row merge engine empty batches #3242
Pull request overview
This PR addresses a tiering hang for tables using the FIRST_ROW merge engine where the log offset can advance via “empty” WAL batches (recordCount=0) even when no ScanRecord is materialized, causing the tiering layer to repeatedly poll the same range (Issue #2371).
Changes:
- Extend fluss-client ScanRecords to carry per-bucket nextLogOffsets, and expose polledBuckets() + nextLogOffset(bucket) so callers can observe offset progress even when no records were produced.
- Update LogFetchCollector to always record nextFetchOffset for each polled bucket and construct ScanRecords with these offsets.
- Update Flink TieringSplitReader#forLogRecords to iterate polledBuckets() and determine end-of-range using nextLogOffset (with fallback to last-record checks), plus add a regression test reproducing Issue #2371.
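The accessor semantics listed above can be illustrated with a minimal sketch. This is not the code from the PR: ScanRecordsSketch is a hypothetical stand-in, buckets are plain Strings instead of TableBucket, and each record is reduced to its log offset. It only shows how a bucket that advanced its offset without producing records can still become visible to callers.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for ScanRecords (assumption, not the real class).
// Buckets are Strings and each record is represented only by its log offset.
final class ScanRecordsSketch {
    private final Map<String, List<Long>> records;
    private final Map<String, Long> nextLogOffsets;

    // Legacy-style constructor: no offset information is supplied.
    ScanRecordsSketch(Map<String, List<Long>> records) {
        this(records, Collections.<String, Long>emptyMap());
    }

    // Two-arg constructor carrying the per-bucket next fetch offsets.
    ScanRecordsSketch(Map<String, List<Long>> records, Map<String, Long> nextLogOffsets) {
        this.records = records;
        this.nextLogOffsets = nextLogOffsets;
    }

    // Buckets that produced at least one materialized record.
    Set<String> buckets() {
        return Collections.unmodifiableSet(records.keySet());
    }

    // Union of buckets with records and buckets that only advanced their offset.
    Set<String> polledBuckets() {
        Set<String> polled = new HashSet<>(records.keySet());
        polled.addAll(nextLogOffsets.keySet());
        return Collections.unmodifiableSet(polled);
    }

    // Exclusive upper bound of consumed offsets, or null if the bucket was not polled.
    Long nextLogOffset(String bucket) {
        return nextLogOffsets.get(bucket);
    }
}
```

With the legacy-style constructor the offset map is empty, so polledBuckets() in this sketch degrades to the same set as buckets().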
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java | Adds regression test to ensure tiering completes under FIRST_ROW with duplicate keys/empty batches. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java | Uses polledBuckets() and nextLogOffset to finish splits even when only empty batches occur. |
| fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java | Adds unit tests for legacy constructor behavior and new polledBuckets()/nextLogOffset semantics. |
| fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java | Verifies empty/filtered responses still expose offset advancement via polledBuckets()/nextLogOffset. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java | Introduces nextLogOffsets, new constructor, polledBuckets(), and nextLogOffset(bucket). |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java | Always records advanced nextFetchOffset per polled bucket and returns it in ScanRecords. |
@luoyuxia Hi, I have addressed the feedback. PTAL
@Kaixuan-Duan Thanks for the PR. Will take a look when I get some time.
luoyuxia left a comment
@Kaixuan-Duan Thanks for the PR. Left minor comments. PTAL
6c434d4 to 996de58
@luoyuxia Thank you for the review. Addressed, PTAL
Thanks. Will have a review later.
cc @beryllw
Will have a review today. Hope we can merge it next week.
luoyuxia left a comment
@Kaixuan-Duan Thanks for that, and sorry for the delayed review. Left some comments again.
ac5ac25 to 656b27c
@luoyuxia Thank you for the review. Addressed, PTAL.
luoyuxia left a comment
@Kaixuan-Duan Thanks. LGTM. Only one minor comment.
3518359 to 9e9080a
@luoyuxia Hi, I’ve resolved the merge conflicts and reverted the commit you pushed. Please take a look when you have a chance. Thanks!
Purpose
Linked issue: close #2371
Brief change log
fluss-client
- ScanRecords: add a Map<TableBucket, Long> nextLogOffsets field with a new two-arg constructor (legacy single-arg constructor preserved for backwards compatibility); expose two new accessors:
  - polledBuckets(): union of buckets that produced records and buckets that only advanced their next fetch offset.
  - nextLogOffset(bucket): exclusive upper bound of consumed offsets in this poll round, or null if the bucket was not polled.
- LogFetchCollector#collectFetch: always record the advanced nextFetchOffset per polled bucket, even when the materialized record list is empty, and pack it into the new ScanRecords constructor.
fluss-flink-common
- TieringSplitReader#forLogRecords: iterate scanRecords.polledBuckets() instead of scanRecords.buckets(). Determine end-of-range by comparing the scanner-reported nextLogOffset against the bucket's stoppingOffset (with the legacy lastRecord.logOffset() >= stoppingOffset - 1 check kept as a fallback for callers that don't supply nextLogOffset). Tolerate splits that finish with no real record observed by falling back to UNKNOWN_BUCKET_TIMESTAMP when computing the finish timestamp.
Tests
./mvnw -pl fluss-client,fluss-flink/fluss-flink-common \
-Dtest='ScanRecordsTest,LogFetchCollectorTest,TieringSplitReaderTest#testTieringFirstRowMergeEngineFinishes' \
-DfailIfNoTests=false test
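For readers following the TieringSplitReader#forLogRecords item in the brief change log, the end-of-range decision might be sketched as below. This is an illustration under assumptions, not the PR's implementation: EndOfRangeSketch, reachedEnd, finishTimestamp, and the sentinel value -1 are hypothetical, and the `>=` comparison against stoppingOffset is inferred from nextLogOffset being described as an exclusive upper bound.

```java
// Hypothetical helper (assumption); it only mirrors the decision described in the change log.
final class EndOfRangeSketch {
    // Plays the role of UNKNOWN_BUCKET_TIMESTAMP; the concrete value here is an assumption.
    static final long UNKNOWN_TIMESTAMP = -1L;

    // nextLogOffset: exclusive upper bound reported by the scanner, or null if not supplied.
    // lastRecordOffset: offset of the last materialized record, or null if none was seen.
    static boolean reachedEnd(Long nextLogOffset, Long lastRecordOffset, long stoppingOffset) {
        if (nextLogOffset != null) {
            // Preferred path: works even when the poll produced only empty batches.
            return nextLogOffset >= stoppingOffset;
        }
        // Legacy fallback: needs a real record to decide.
        return lastRecordOffset != null && lastRecordOffset >= stoppingOffset - 1;
    }

    // A split may finish without any real record; fall back to the sentinel in that case.
    static long finishTimestamp(Long lastRecordTimestamp) {
        return lastRecordTimestamp != null ? lastRecordTimestamp : UNKNOWN_TIMESTAMP;
    }
}
```

In this sketch a bucket whose offset advanced to the stopping offset through empty batches alone is treated as finished, which is the case the legacy last-record check could never detect.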
API and Format
Documentation