refactor(shuffle): Introduce PartitionWriter interface to decouple shuffle partitioning from storage#4779
Open
wForget wants to merge 14 commits into
Open
refactor(shuffle): Introduce PartitionWriter interface to decouple shuffle partitioning from storage#4779wForget wants to merge 14 commits into
wForget wants to merge 14 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR refactors Comet’s native shuffle write path to introduce a PartitionWriter abstraction, moving local shuffle file/index writing and spill handling out of the shuffle partitioners. This aligns with issue #4780’s goal of decoupling partitioning logic from the storage backend so alternative backends (like remote shuffle) can be added later.
Changes:
- Introduces a new
PartitionWritertrait and aLocalPartitionWriterimplementation, plus a renamedSpillWriterfor local spill files. - Updates shuffle partitioners (
single_partition,multi_partition,empty_schema) andshuffle_writerwiring to write via the new writer interface. - Refactors
PartitionedBatchIteratorto implementIterator(but currently dropsinterleave_timemetric tracking).
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| native/shuffle/src/writers/rss/mod.rs | Adds placeholder module for future RSS writer implementation. |
| native/shuffle/src/writers/partition_writer.rs | Adds the new PartitionWriter trait interface. |
| native/shuffle/src/writers/mod.rs | Reorganizes writer modules and exports to include local and partition_writer. |
| native/shuffle/src/writers/local/mod.rs | Introduces the local writer module structure. |
| native/shuffle/src/writers/local/local_partition_writer.rs | Implements LocalPartitionWriter that writes shuffle data + index and manages per-partition spill writers. |
| native/shuffle/src/writers/local/spill.rs | Refactors old per-partition writer into SpillWriter for spill file handling. |
| native/shuffle/src/writers/buf_batch_writer.rs | Removes the writer_stream_position helper method. |
| native/shuffle/src/shuffle_writer.rs | Wires shuffle execution to construct a LocalPartitionWriter and pass it into partitioners; updates related tests. |
| native/shuffle/src/partitioners/single_partition.rs | Refactors single-partition shuffle to write via PartitionWriter. |
| native/shuffle/src/partitioners/multi_partition.rs | Refactors multi-partition shuffle and spill to write via PartitionWriter. |
| native/shuffle/src/partitioners/empty_schema.rs | Refactors empty-schema shuffle to write via PartitionWriter. |
| native/shuffle/src/partitioners/partitioned_batch_iterator.rs | Makes PartitionedBatchIterator implement Iterator, removing interleave timing from next(). |
| native/shuffle/src/partitioners/mod.rs | Removes re-export of PartitionedBatchIterator. |
| native/shuffle/src/metrics.rs | Temporarily comments out interleave_time metric field and registration. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
37c64ac to
2c6bc03
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #4780.
Rationale for this change
Comet's shuffle partitioners currently own too much of the local shuffle write
logic directly, including writing shuffle data files, writing index files,
handling spill files, and finalizing partition offsets. This tightly couples the
partitioning logic to the local file-based storage implementation and makes it
hard to introduce alternative shuffle storage backends (e.g. a remote shuffle
writer), since each partitioner would need backend-specific write behavior.
This PR separates shuffle partitioning from shuffle storage so partitioners only
produce partitioned
RecordBatchstreams, while a storage backend owns how thosebatches are persisted and finalized. This creates a clear extension point for
future remote shuffle support without changing the partitioners.
What changes are included in this PR?
PartitionWritertrait (writers/partition_writer.rs) that abstracts thestorage backend for shuffle output. Partitioners drive it as: any number of
writecalls to stage batches, onefinish_partitionper partition inascending id order, then a single
finish_all.LocalPartitionWriter(writers/local/local_partition_writer.rs) thatimplements the existing local file-based behavior behind the new interface:
BufBatchWriter(preserving cross-batch coalescing).SpillWriters andmerges them into the data file one partition at a time.
handling.
spill.rsunderwriters/local/since spilling is a local-storage detail.SinglePartitionShufflePartitioner,MultiPartitionShuffleRepartitioner,EmptySchemaShufflePartitioner) to dependonly on a
PartitionWriterinstead of owning file/index/spill logic.LocalPartitionWriterinexternal_shuffleand inject it into theselected partitioner.
This is a refactor only; it preserves the existing local shuffle behavior for
single-partition, multi-partition, and empty-schema shuffles.
How are these changes tested?
Covered by existing tests.