Skip to content

refactor(shuffle): Introduce PartitionWriter interface to decouple shuffle partitioning from storage#4779

Open
wForget wants to merge 14 commits into
apache:mainfrom
wForget:shuffle_partition_writer
Open

refactor(shuffle): Introduce PartitionWriter interface to decouple shuffle partitioning from storage#4779
wForget wants to merge 14 commits into
apache:mainfrom
wForget:shuffle_partition_writer

Conversation

@wForget

@wForget wForget commented Jul 1, 2026

Copy link
Copy Markdown
Member

Which issue does this PR close?

Closes #4780.

Rationale for this change

Comet's shuffle partitioners currently own too much of the local shuffle write
logic directly, including writing shuffle data files, writing index files,
handling spill files, and finalizing partition offsets. This tightly couples the
partitioning logic to the local file-based storage implementation and makes it
hard to introduce alternative shuffle storage backends (e.g. a remote shuffle
writer), since each partitioner would need backend-specific write behavior.

This PR separates shuffle partitioning from shuffle storage so partitioners only
produce partitioned RecordBatch streams, while a storage backend owns how those
batches are persisted and finalized. This creates a clear extension point for
future remote shuffle support without changing the partitioners.

What changes are included in this PR?

  • Add a PartitionWriter trait (writers/partition_writer.rs) that abstracts the
    storage backend for shuffle output. Partitioners drive it as: any number of
    write calls to stage batches, one finish_partition per partition in
    ascending id order, then a single finish_all.
  • Add LocalPartitionWriter (writers/local/local_partition_writer.rs) that
    implements the existing local file-based behavior behind the new interface:
    • Single-partition mode streams all batches through one long-lived
      BufBatchWriter (preserving cross-batch coalescing).
    • Multi-partition mode stages batches per partition via SpillWriters and
      merges them into the data file one partition at a time.
    • Owns data file writes, index file / partition offset generation, and spill
      handling.
  • Move spill.rs under writers/local/ since spilling is a local-storage detail.
  • Simplify the three partitioners (SinglePartitionShufflePartitioner,
    MultiPartitionShuffleRepartitioner, EmptySchemaShufflePartitioner) to depend
    only on a PartitionWriter instead of owning file/index/spill logic.
  • Construct the LocalPartitionWriter in external_shuffle and inject it into the
    selected partitioner.

This is a refactor only; it preserves the existing local shuffle behavior for
single-partition, multi-partition, and empty-schema shuffles.

How are these changes tested?

Covered by existing tests.

@wForget wForget self-assigned this Jul 1, 2026

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors Comet’s native shuffle write path to introduce a PartitionWriter abstraction, moving local shuffle file/index writing and spill handling out of the shuffle partitioners. This aligns with issue #4780’s goal of decoupling partitioning logic from the storage backend so alternative backends (like remote shuffle) can be added later.

Changes:

  • Introduces a new PartitionWriter trait and a LocalPartitionWriter implementation, plus a renamed SpillWriter for local spill files.
  • Updates shuffle partitioners (single_partition, multi_partition, empty_schema) and shuffle_writer wiring to write via the new writer interface.
  • Refactors PartitionedBatchIterator to implement Iterator (but currently drops interleave_time metric tracking).

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
native/shuffle/src/writers/rss/mod.rs Adds placeholder module for future RSS writer implementation.
native/shuffle/src/writers/partition_writer.rs Adds the new PartitionWriter trait interface.
native/shuffle/src/writers/mod.rs Reorganizes writer modules and exports to include local and partition_writer.
native/shuffle/src/writers/local/mod.rs Introduces the local writer module structure.
native/shuffle/src/writers/local/local_partition_writer.rs Implements LocalPartitionWriter that writes shuffle data + index and manages per-partition spill writers.
native/shuffle/src/writers/local/spill.rs Refactors old per-partition writer into SpillWriter for spill file handling.
native/shuffle/src/writers/buf_batch_writer.rs Removes the writer_stream_position helper method.
native/shuffle/src/shuffle_writer.rs Wires shuffle execution to construct a LocalPartitionWriter and pass it into partitioners; updates related tests.
native/shuffle/src/partitioners/single_partition.rs Refactors single-partition shuffle to write via PartitionWriter.
native/shuffle/src/partitioners/multi_partition.rs Refactors multi-partition shuffle and spill to write via PartitionWriter.
native/shuffle/src/partitioners/empty_schema.rs Refactors empty-schema shuffle to write via PartitionWriter.
native/shuffle/src/partitioners/partitioned_batch_iterator.rs Makes PartitionedBatchIterator implement Iterator, removing interleave timing from next().
native/shuffle/src/partitioners/mod.rs Removes re-export of PartitionedBatchIterator.
native/shuffle/src/metrics.rs Temporarily comments out interleave_time metric field and registration.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread native/shuffle/src/metrics.rs
Comment thread native/shuffle/src/partitioners/empty_schema.rs Outdated
Comment thread native/shuffle/src/writers/local/local_partition_writer.rs Outdated
Comment thread native/shuffle/src/writers/partition_writer.rs Outdated
Comment thread native/shuffle/src/writers/local/local_partition_writer.rs Outdated
Comment thread native/shuffle/src/partitioners/single_partition.rs
@wForget wForget force-pushed the shuffle_partition_writer branch from 37c64ac to 2c6bc03 Compare July 2, 2026 09:19

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

Comment thread native/shuffle/src/partitioners/partitioned_batch_iterator.rs
Comment thread native/shuffle/src/writers/local/local_partition_writer.rs Outdated
@wForget wForget changed the title WIP refactor: implement partition writer interface for shuffle partitioners refactor(shuffle): Introduce PartitionWriter interface to decouple shuffle partitioning from storage Jul 2, 2026
@wForget wForget marked this pull request as ready for review July 2, 2026 12:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: Refactor shuffle partition writing behind a writer interface

2 participants