From 1bd106ee7299b5bafcbb6a2ca609acb3a5a33284 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Wed, 1 Jul 2026 14:30:50 +0800 Subject: [PATCH 01/14] refactor: implement partition writer interface for shuffle partitioners --- .../shuffle/src/partitioners/empty_schema.rs | 67 ++---- native/shuffle/src/partitioners/mod.rs | 1 - .../src/partitioners/multi_partition.rs | 147 ++----------- .../partitioned_batch_iterator.rs | 13 +- .../src/partitioners/single_partition.rs | 79 ++----- native/shuffle/src/shuffle_writer.rs | 78 ++++--- .../shuffle/src/writers/buf_batch_writer.rs | 6 - .../writers/local/local_partition_writer.rs | 205 ++++++++++++++++++ native/shuffle/src/writers/local/mod.rs | 2 + .../shuffle/src/writers/{ => local}/spill.rs | 91 ++++---- native/shuffle/src/writers/mod.rs | 5 +- .../shuffle/src/writers/partition_writer.rs | 54 +++++ native/shuffle/src/writers/rss/mod.rs | 1 + 13 files changed, 419 insertions(+), 330 deletions(-) create mode 100644 native/shuffle/src/writers/local/local_partition_writer.rs create mode 100644 native/shuffle/src/writers/local/mod.rs rename native/shuffle/src/writers/{ => local}/spill.rs (80%) create mode 100644 native/shuffle/src/writers/partition_writer.rs create mode 100644 native/shuffle/src/writers/rss/mod.rs diff --git a/native/shuffle/src/partitioners/empty_schema.rs b/native/shuffle/src/partitioners/empty_schema.rs index 45decfec05..b7b5dce60a 100644 --- a/native/shuffle/src/partitioners/empty_schema.rs +++ b/native/shuffle/src/partitioners/empty_schema.rs @@ -17,47 +17,42 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::ShufflePartitioner; +use crate::writers::partition_writer::PartitionWriter; use crate::ShuffleBlockWriter; use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; use datafusion::common::DataFusionError; use std::fs::OpenOptions; use std::io::{BufWriter, Seek, Write}; +use std::iter; use tokio::time::Instant; /// A partitioner for zero-column schemas 
(e.g. queries where ColumnPruning removes all columns). /// This handles shuffles for operations like COUNT(*) that produce empty-schema record batches /// but contain a valid row count. Accumulates the total row count and writes a single /// zero-column IPC batch to partition 0. All other partitions get empty entries in the index file. -pub(crate) struct EmptySchemaShufflePartitioner { - output_data_file: String, - output_index_file: String, +pub(crate) struct EmptySchemaShufflePartitioner { + partition_writer: T, schema: SchemaRef, - shuffle_block_writer: ShuffleBlockWriter, num_output_partitions: usize, total_rows: usize, metrics: ShufflePartitionerMetrics, } -impl EmptySchemaShufflePartitioner { +impl EmptySchemaShufflePartitioner { pub(crate) fn try_new( - output_data_file: String, - output_index_file: String, + partition_writer: T, schema: SchemaRef, num_output_partitions: usize, metrics: ShufflePartitionerMetrics, - codec: crate::CompressionCodec, ) -> datafusion::common::Result { debug_assert!( schema.fields().is_empty(), "EmptySchemaShufflePartitioner requires a zero-column schema" ); - let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec)?; Ok(Self { - output_data_file, - output_index_file, + partition_writer, schema, - shuffle_block_writer, num_output_partitions, total_rows: 0, metrics, @@ -66,7 +61,7 @@ impl EmptySchemaShufflePartitioner { } #[async_trait::async_trait] -impl ShufflePartitioner for EmptySchemaShufflePartitioner { +impl ShufflePartitioner for EmptySchemaShufflePartitioner { async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> { let start_time = Instant::now(); let num_rows = batch.num_rows(); @@ -85,45 +80,25 @@ impl ShufflePartitioner for EmptySchemaShufflePartitioner { fn shuffle_write(&mut self) -> datafusion::common::Result<()> { let start_time = Instant::now(); - let output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - 
.open(&self.output_data_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut output_data = BufWriter::new(output_data); + let mut write_timer = self.metrics.write_time.timer(); // Write a single zero-column batch with the accumulated row count to partition 0 - if self.total_rows > 0 { - let batch = RecordBatch::try_new_with_options( + let batch_opt = if self.total_rows > 0 { + Some(Ok(RecordBatch::try_new_with_options( self.schema.clone(), vec![], &arrow::array::RecordBatchOptions::new().with_row_count(Some(self.total_rows)), - )?; - self.shuffle_block_writer.write_batch( - &batch, - &mut output_data, - &self.metrics.encode_time, - )?; - } - - let mut write_timer = self.metrics.write_time.timer(); - output_data.flush()?; - let data_file_length = output_data.stream_position()?; - - // Write index file: partition 0 spans [0, data_file_length), all others are empty - let index_file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.output_index_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut index_writer = BufWriter::new(index_file); - index_writer.write_all(&0i64.to_le_bytes())?; - for _ in 0..self.num_output_partitions { - index_writer.write_all(&(data_file_length as i64).to_le_bytes())?; + )?)) + } else { + None + }; + self.partition_writer + .finish_partition(0, &mut batch_opt.into_iter(), &self.metrics)?; + for pid in 1..self.num_output_partitions { + self.partition_writer + .finish_partition(pid, &mut iter::empty(), &self.metrics)?; } - index_writer.flush()?; + self.partition_writer.finish_all(&self.metrics)?; write_timer.stop(); self.metrics diff --git a/native/shuffle/src/partitioners/mod.rs b/native/shuffle/src/partitioners/mod.rs index a0bc652b4b..8f4239a820 100644 --- a/native/shuffle/src/partitioners/mod.rs +++ b/native/shuffle/src/partitioners/mod.rs @@ -23,6 +23,5 @@ mod traits; pub(crate) use 
empty_schema::EmptySchemaShufflePartitioner; pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner; -pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator; pub(crate) use single_partition::SinglePartitionShufflePartitioner; pub(crate) use traits::ShufflePartitioner; diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index efdbb970aa..923bf177e4 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -16,26 +16,20 @@ // under the License. use crate::metrics::ShufflePartitionerMetrics; -use crate::partitioners::partitioned_batch_iterator::{ - PartitionedBatchIterator, PartitionedBatchesProducer, -}; +use crate::partitioners::partitioned_batch_iterator::PartitionedBatchesProducer; use crate::partitioners::ShufflePartitioner; -use crate::writers::{BufBatchWriter, PartitionWriter}; -use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; +use crate::writers::partition_writer::PartitionWriter; +use crate::{comet_partitioning, CometPartitioning}; use arrow::array::{Array, ArrayData, ArrayRef, RecordBatch}; -use arrow::datatypes::SchemaRef; use datafusion::common::utils::proxy::VecAllocExt; use datafusion::common::{DataFusionError, HashSet}; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::physical_plan::metrics::Time; use datafusion_comet_common::tracing::{with_trace, with_trace_async}; use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; use itertools::Itertools; use std::fmt; use std::fmt::{Debug, Formatter}; -use std::fs::{File, OpenOptions}; -use std::io::{BufWriter, Seek, Write}; use std::sync::Arc; use tokio::time::Instant; @@ -105,13 +99,10 @@ impl ScratchSpace { } /// A partitioner that uses a hash function to partition data into multiple partitions -pub(crate) 
struct MultiPartitionShuffleRepartitioner { - output_data_file: String, - output_index_file: String, +pub(crate) struct MultiPartitionShuffleRepartitioner { buffered_batches: Vec, partition_indices: Vec>, - partition_writers: Vec, - shuffle_block_writer: ShuffleBlockWriter, + partition_writer: T, /// Partitioning scheme to use partitioning: CometPartitioning, runtime: Arc, @@ -123,8 +114,6 @@ pub(crate) struct MultiPartitionShuffleRepartitioner { /// Reservation for repartitioning reservation: MemoryReservation, tracing_enabled: bool, - /// Size of the write buffer in bytes - write_buffer_size: usize, /// Start addresses (as `usize`, since raw pointers are not `Send`) of the backing buffers /// currently pinned by `buffered_batches`, so the spill reservation charges each distinct /// allocation once rather than once per slice that references it. Cleared whenever the @@ -176,20 +165,16 @@ fn count_new_buffers(batch: &RecordBatch, seen: &mut HashSet) -> usize { total } -impl MultiPartitionShuffleRepartitioner { +impl MultiPartitionShuffleRepartitioner { #[allow(clippy::too_many_arguments)] pub(crate) fn try_new( partition: usize, - output_data_file: String, - output_index_file: String, - schema: SchemaRef, + partition_writer: T, partitioning: CometPartitioning, metrics: ShufflePartitionerMetrics, runtime: Arc, batch_size: usize, - codec: CompressionCodec, tracing_enabled: bool, - write_buffer_size: usize, ) -> datafusion::common::Result { let num_output_partitions = partitioning.partition_count(); assert_ne!( @@ -214,23 +199,14 @@ impl MultiPartitionShuffleRepartitioner { partition_starts: vec![0; num_output_partitions + 1], }; - let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; - - let partition_writers = (0..num_output_partitions) - .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) - .collect::>>()?; - let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]")) .with_can_spill(true) 
.register(&runtime.memory_pool); Ok(Self { - output_data_file, - output_index_file, buffered_batches: vec![], partition_indices: vec![vec![]; num_output_partitions], - partition_writers, - shuffle_block_writer, + partition_writer, partitioning, runtime, metrics, @@ -238,7 +214,6 @@ impl MultiPartitionShuffleRepartitioner { batch_size, reservation, tracing_enabled, - write_buffer_size, pinned_buffers: HashSet::new(), }) } @@ -485,31 +460,6 @@ impl MultiPartitionShuffleRepartitioner { Ok(()) } - #[allow(clippy::too_many_arguments)] - fn shuffle_write_partition( - partition_iter: &mut PartitionedBatchIterator, - shuffle_block_writer: &mut ShuffleBlockWriter, - output_data: &mut BufWriter, - interleave_time: &Time, - encode_time: &Time, - write_time: &Time, - write_buffer_size: usize, - batch_size: usize, - ) -> datafusion::common::Result<()> { - let mut buf_batch_writer = BufBatchWriter::new( - shuffle_block_writer, - output_data, - write_buffer_size, - batch_size, - ); - while let Some(batch) = partition_iter.next(interleave_time) { - let batch = batch?; - buf_batch_writer.write(&batch, encode_time, write_time)?; - } - buf_batch_writer.flush(encode_time, write_time)?; - Ok(()) - } - fn used(&self) -> usize { self.reservation.size() } @@ -553,38 +503,33 @@ impl MultiPartitionShuffleRepartitioner { } with_trace("shuffle_spill", self.tracing_enabled, || { - let num_output_partitions = self.partition_writers.len(); + let num_output_partitions = self.partition_indices.len(); let mut partitioned_batches = self.partitioned_batches(); - let mut spilled_bytes = 0; for partition_id in 0..num_output_partitions { - let partition_writer = &mut self.partition_writers[partition_id]; - let mut iter = partitioned_batches.produce(partition_id); - spilled_bytes += partition_writer.spill( - &mut iter, + self.partition_writer.spill( + partition_id, + &mut partitioned_batches.produce(partition_id), &self.runtime, &self.metrics, - self.write_buffer_size, - self.batch_size, )?; } 
self.reservation.free(); self.pinned_buffers.clear(); self.metrics.spill_count.add(1); - self.metrics.spilled_bytes.add(spilled_bytes); Ok(()) }) } #[cfg(test)] - pub(crate) fn partition_writers(&self) -> &[PartitionWriter] { - &self.partition_writers + pub(crate) fn partition_writer(&self) -> &T { + &self.partition_writer } } #[async_trait::async_trait] -impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { +impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { /// Shuffles rows in input batch into corresponding partition buffer. /// This function will slice input batch according to configured batch size and then /// shuffle rows into corresponding partition buffer. @@ -616,78 +561,28 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { let mut partitioned_batches = self.partitioned_batches(); self.pinned_buffers.clear(); let num_output_partitions = self.partition_indices.len(); - let mut offsets = vec![0; num_output_partitions + 1]; - - let data_file = self.output_data_file.clone(); - let index_file = self.output_index_file.clone(); - - let output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(data_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - - let mut output_data = BufWriter::with_capacity(self.write_buffer_size, output_data); #[allow(clippy::needless_range_loop)] for i in 0..num_output_partitions { - offsets[i] = output_data.stream_position()?; - - // if we wrote a spill file for this partition then copy the - // contents into the shuffle file - if let Some(spill_path) = self.partition_writers[i].path() { - // Use raw File handle (not BufReader) so that std::io::copy - // can use copy_file_range/sendfile for zero-copy on Linux. 
- let mut spill_file = File::open(spill_path)?; - let mut write_timer = self.metrics.write_time.timer(); - std::io::copy(&mut spill_file, &mut output_data)?; - write_timer.stop(); - } - - // Write in memory batches to output data file - let mut partition_iter = partitioned_batches.produce(i); - Self::shuffle_write_partition( - &mut partition_iter, - &mut self.shuffle_block_writer, - &mut output_data, - &self.metrics.interleave_time, - &self.metrics.encode_time, - &self.metrics.write_time, - self.write_buffer_size, - self.batch_size, + self.partition_writer.finish_partition( + i, + &mut partitioned_batches.produce(i), + &self.metrics, )?; } - let mut write_timer = self.metrics.write_time.timer(); - output_data.flush()?; - write_timer.stop(); - - // add one extra offset at last to ease partition length computation - offsets[num_output_partitions] = output_data.stream_position()?; - - let mut write_timer = self.metrics.write_time.timer(); - let mut output_index = - BufWriter::new(File::create(index_file).map_err(|e| { - DataFusionError::Execution(format!("shuffle write error: {e:?}")) - })?); - for offset in offsets { - output_index.write_all(&(offset as i64).to_le_bytes()[..])?; - } - output_index.flush()?; - write_timer.stop(); + self.partition_writer.finish_all(&self.metrics)?; self.metrics .baseline .elapsed_compute() .add_duration(start_time.elapsed()); - Ok(()) }) } } -impl Debug for MultiPartitionShuffleRepartitioner { +impl Debug for MultiPartitionShuffleRepartitioner { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("ShuffleRepartitioner") .field("memory_used", &self.used()) diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index c7f1781866..2863616ddd 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ -18,7 +18,6 @@ use arrow::array::RecordBatch; use 
arrow::compute::interleave_record_batch; use datafusion::common::DataFusionError; -use datafusion::physical_plan::metrics::Time; /// A helper struct to produce shuffled batches. /// This struct takes ownership of the buffered batches and partition indices from the @@ -86,21 +85,19 @@ impl<'a> PartitionedBatchIterator<'a> { pos: 0, } } +} + +impl Iterator for PartitionedBatchIterator<'_> { + type Item = datafusion::common::Result; - /// Returns the next shuffled batch, recording the gather cost into `interleave_time`. - pub(crate) fn next( - &mut self, - interleave_time: &Time, - ) -> Option> { + fn next(&mut self) -> Option { if self.pos >= self.indices.len() { return None; } let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); let indices = &self.indices[self.pos..indices_end]; - let mut timer = interleave_time.timer(); let result = interleave_record_batch(&self.record_batches, indices); - timer.stop(); match result { Ok(batch) => { self.pos = indices_end; diff --git a/native/shuffle/src/partitioners/single_partition.rs b/native/shuffle/src/partitioners/single_partition.rs index 5801ef613b..7215857cfd 100644 --- a/native/shuffle/src/partitioners/single_partition.rs +++ b/native/shuffle/src/partitioners/single_partition.rs @@ -17,20 +17,15 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::ShufflePartitioner; -use crate::writers::BufBatchWriter; -use crate::{CompressionCodec, ShuffleBlockWriter}; +use crate::writers::partition_writer::PartitionWriter; use arrow::array::RecordBatch; -use arrow::datatypes::SchemaRef; use datafusion::common::DataFusionError; -use std::fs::{File, OpenOptions}; -use std::io::{BufWriter, Write}; +use std::iter; use tokio::time::Instant; /// A partitioner that writes all shuffle data to a single file and a single index file -pub(crate) struct SinglePartitionShufflePartitioner { - // output_data_file: File, - output_data_writer: BufBatchWriter, - output_index_path: String, +pub(crate) 
struct SinglePartitionShufflePartitioner { + partition_writer: T, /// Batches that are smaller than the batch size and to be concatenated buffered_batches: Vec, /// Number of rows in the concatenating batches @@ -41,34 +36,14 @@ pub(crate) struct SinglePartitionShufflePartitioner { batch_size: usize, } -impl SinglePartitionShufflePartitioner { +impl SinglePartitionShufflePartitioner { pub(crate) fn try_new( - output_data_path: String, - output_index_path: String, - schema: SchemaRef, + partition_writer: T, metrics: ShufflePartitionerMetrics, batch_size: usize, - codec: CompressionCodec, - write_buffer_size: usize, ) -> datafusion::common::Result { - let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; - - let output_data_file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(output_data_path)?; - - let output_data_writer = BufBatchWriter::new( - shuffle_block_writer, - output_data_file, - write_buffer_size, - batch_size, - ); - Ok(Self { - output_data_writer, - output_index_path, + partition_writer, buffered_batches: vec![], num_buffered_rows: 0, metrics, @@ -109,7 +84,7 @@ impl SinglePartitionShufflePartitioner { } #[async_trait::async_trait] -impl ShufflePartitioner for SinglePartitionShufflePartitioner { +impl ShufflePartitioner for SinglePartitionShufflePartitioner { async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> { let start_time = Instant::now(); let num_rows = batch.num_rows(); @@ -123,20 +98,14 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { // Write the concatenated buffered batch if let Some(batch) = concatenated_batch { - self.output_data_writer.write( - &batch, - &self.metrics.encode_time, - &self.metrics.write_time, - )?; + self.partition_writer + .write(0, &mut iter::once(Ok(batch)), &self.metrics)?; } if num_rows >= self.batch_size { // Write the new batch - self.output_data_writer.write( - &batch, - &self.metrics.encode_time, - 
&self.metrics.write_time, - )?; + self.partition_writer + .write(0, &mut iter::once(Ok(batch)), &self.metrics)?; } else { // Add the new batch to the buffer self.add_buffered_batch(batch); @@ -160,28 +129,10 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { // Write the concatenated buffered batch if let Some(batch) = concatenated_batch { - self.output_data_writer.write( - &batch, - &self.metrics.encode_time, - &self.metrics.write_time, - )?; - } - self.output_data_writer - .flush(&self.metrics.encode_time, &self.metrics.write_time)?; - - // Write index file. It should only contain 2 entries: 0 and the total number of bytes written - let index_file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(self.output_index_path.clone()) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut index_buf_writer = BufWriter::new(index_file); - let data_file_length = self.output_data_writer.writer_stream_position()?; - for offset in [0, data_file_length] { - index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?; + self.partition_writer + .write(0, &mut iter::once(Ok(batch)), &self.metrics)?; } - index_buf_writer.flush()?; + self.partition_writer.finish_all(&self.metrics)?; self.metrics .baseline diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index 756e753b3c..c3fe17491e 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -22,7 +22,8 @@ use crate::partitioners::{ EmptySchemaShufflePartitioner, MultiPartitionShuffleRepartitioner, ShufflePartitioner, SinglePartitionShufflePartitioner, }; -use crate::{CometPartitioning, CompressionCodec}; +use crate::writers::local::local_partition_writer::LocalPartitionWriter; +use crate::{CometPartitioning, CompressionCodec, ShuffleBlockWriter}; use async_trait::async_trait; use datafusion::common::exec_datafusion_err; use 
datafusion::physical_expr::{EquivalenceProperties, Partitioning}; @@ -202,39 +203,39 @@ async fn external_shuffle( ) -> Result { let schema = input.schema(); + let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; + let local_partition_writer = LocalPartitionWriter::try_new( + output_data_file, + output_index_file, + shuffle_block_writer, + partitioning.partition_count(), + context.session_config().batch_size(), + write_buffer_size, + )?; + let mut repartitioner: Box = match &partitioning { _ if schema.fields().is_empty() => { log::debug!("found empty schema, overriding {partitioning:?} partitioning with EmptySchemaShufflePartitioner"); Box::new(EmptySchemaShufflePartitioner::try_new( - output_data_file, - output_index_file, + local_partition_writer, Arc::clone(&schema), partitioning.partition_count(), metrics, - codec, )?) } any if any.partition_count() == 1 => Box::new(SinglePartitionShufflePartitioner::try_new( - output_data_file, - output_index_file, - Arc::clone(&schema), + local_partition_writer, metrics, context.session_config().batch_size(), - codec, - write_buffer_size, )?), _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( partition, - output_data_file, - output_index_file, - Arc::clone(&schema), + local_partition_writer, partitioning, metrics, context.runtime_env(), context.session_config().batch_size(), - codec, tracing_enabled, - write_buffer_size, )?), }; @@ -346,38 +347,46 @@ mod test { let num_partitions = 2; let runtime_env = create_runtime(memory_limit); let metrics_set = ExecutionPlanMetricsSet::new(); - let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new( - 0, + let shuffle_block_writer = + ShuffleBlockWriter::try_new(batch.schema().as_ref(), CompressionCodec::Lz4Frame) + .unwrap(); + let local_partition_writer = LocalPartitionWriter::try_new( "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), - batch.schema(), + shuffle_block_writer, + num_partitions, + 1024, + 1024 * 1024, // 
write_buffer_size: 1MB default + ) + .unwrap(); + let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new( + 0, + local_partition_writer, CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions), ShufflePartitionerMetrics::new(&metrics_set, 0), runtime_env, 1024, - CompressionCodec::Lz4Frame, false, - 1024 * 1024, // write_buffer_size: 1MB default ) .unwrap(); repartitioner.insert_batch(batch.clone()).await.unwrap(); { - let partition_writers = repartitioner.partition_writers(); - assert_eq!(partition_writers.len(), 2); + let spill_writers = repartitioner.partition_writer().get_spill_writers(); + assert_eq!(spill_writers.len(), 2); - assert!(!partition_writers[0].has_spill_file()); - assert!(!partition_writers[1].has_spill_file()); + assert!(!spill_writers[0].has_spill_file()); + assert!(!spill_writers[1].has_spill_file()); } repartitioner.spill().unwrap(); // after spill, there should be spill files { - let partition_writers = repartitioner.partition_writers(); - assert!(partition_writers[0].has_spill_file()); - assert!(partition_writers[1].has_spill_file()); + let spill_writers = repartitioner.partition_writer().get_spill_writers(); + assert!(spill_writers[0].has_spill_file()); + assert!(spill_writers[1].has_spill_file()); } // insert another batch after spilling @@ -409,18 +418,25 @@ mod test { let data_size = metrics.data_size.clone(); let spill_count = metrics.spill_count.clone(); let dir = tempfile::tempdir().unwrap(); - let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new( - 0, + let shuffle_block_writer = + ShuffleBlockWriter::try_new(schema.as_ref(), CompressionCodec::Lz4Frame).unwrap(); + let local_partition_writer = LocalPartitionWriter::try_new( dir.path().join("data.out").to_str().unwrap().to_string(), dir.path().join("index.out").to_str().unwrap().to_string(), - backing.schema(), + shuffle_block_writer, + num_partitions, + 1024, + 1024 * 1024, // write_buffer_size: 1MB default + ) + .unwrap(); + let mut 
repartitioner = MultiPartitionShuffleRepartitioner::try_new( + 0, + local_partition_writer, CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions), metrics, runtime_env, batch_size, - CompressionCodec::Lz4Frame, false, - 1024 * 1024, ) .unwrap(); diff --git a/native/shuffle/src/writers/buf_batch_writer.rs b/native/shuffle/src/writers/buf_batch_writer.rs index cfddb46539..9b8d17c315 100644 --- a/native/shuffle/src/writers/buf_batch_writer.rs +++ b/native/shuffle/src/writers/buf_batch_writer.rs @@ -134,9 +134,3 @@ impl, W: Write> BufBatchWriter { Ok(()) } } - -impl, W: Write + Seek> BufBatchWriter { - pub(crate) fn writer_stream_position(&mut self) -> datafusion::common::Result { - self.writer.stream_position().map_err(Into::into) - } -} diff --git a/native/shuffle/src/writers/local/local_partition_writer.rs b/native/shuffle/src/writers/local/local_partition_writer.rs new file mode 100644 index 0000000000..5a0b1850fd --- /dev/null +++ b/native/shuffle/src/writers/local/local_partition_writer.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::metrics::ShufflePartitionerMetrics; +use crate::writers::local::spill::SpillWriter; +use crate::writers::partition_writer::PartitionWriter; +use crate::writers::BufBatchWriter; +use crate::ShuffleBlockWriter; +use arrow::array::RecordBatch; +use datafusion::common::DataFusionError; +use datafusion::execution::runtime_env::RuntimeEnv; +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Seek, Write}; + +pub(crate) struct LocalPartitionWriter { + output_index_file: String, + spill_writers: Vec<SpillWriter>, + shuffle_block_writer: ShuffleBlockWriter, + output_writer: BufWriter<File>, + offsets: Vec<u64>, + batch_size: usize, + write_buffer_size: usize, + num_output_partitions: usize, + last_finish_pid: usize, +} + +impl LocalPartitionWriter { + pub(crate) fn try_new( + output_data_file: String, + output_index_file: String, + shuffle_block_writer: ShuffleBlockWriter, + num_output_partitions: usize, + batch_size: usize, + write_buffer_size: usize, + ) -> datafusion::common::Result<Self> { + let spill_writers = if num_output_partitions == 1 { + vec![] + } else { + (0..num_output_partitions) + .map(|_| { + SpillWriter::try_new( + shuffle_block_writer.clone(), + write_buffer_size, + batch_size, + ) + }) + .collect::<datafusion::common::Result<Vec<_>>>()?
+ }; + let output_writer = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(output_data_file.clone()) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + + let output_writer = BufWriter::with_capacity(write_buffer_size, output_writer); + Ok(Self { + output_index_file, + spill_writers, + shuffle_block_writer, + output_writer, + offsets: vec![0u64; num_output_partitions + 1], + batch_size, + write_buffer_size, + num_output_partitions, + last_finish_pid: num_output_partitions - 1, + }) + } + + #[cfg(test)] + pub(crate) fn get_spill_writers(&self) -> &Vec<SpillWriter> { + &self.spill_writers + } +} + +impl PartitionWriter for LocalPartitionWriter { + fn spill<I>( + &mut self, + pid: usize, + iter: &mut I, + runtime: &RuntimeEnv, + metrics: &ShufflePartitionerMetrics, + ) -> datafusion::common::Result<()> + where + I: Iterator<Item = datafusion::common::Result<RecordBatch>>, + { + self.spill_writers[pid] + .write(iter, runtime, metrics) + .map(|_| ()) + } + + fn write<I>( + &mut self, + pid: usize, + iter: &mut I, + metrics: &ShufflePartitionerMetrics, + ) -> datafusion::common::Result<()> + where + I: Iterator<Item = datafusion::common::Result<RecordBatch>>, + { + assert!( + pid == 0 && self.spill_writers.is_empty(), + "LocalPartitionWriter::write only for single shuffle partition." + ); + + let mut buf_batch_writer = BufBatchWriter::new( + &mut self.shuffle_block_writer, + &mut self.output_writer, + self.write_buffer_size, + self.batch_size, + ); + + while let Some(batch) = iter.next() { + let batch = batch?; + buf_batch_writer.write(&batch, &metrics.encode_time, &metrics.write_time)?; + } + buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; + + Ok(()) + } + + fn finish_partition<I>( + &mut self, + pid: usize, + iter: &mut I, + metrics: &ShufflePartitionerMetrics, + ) -> datafusion::common::Result<()> + where + I: Iterator<Item = datafusion::common::Result<RecordBatch>>, + { + assert_eq!( + (pid + self.num_output_partitions - self.last_finish_pid) % self.num_output_partitions, + 1 % self.num_output_partitions, // step is 0, not 1, when there is a single partition + "LocalPartitionWriter::finish_partition must be called in order."
+ ); + self.last_finish_pid = pid; + + self.offsets[pid] = self.output_writer.stream_position()?; + + // If a spill file exists for this partition, copy it into the shuffle file first + // (`spill_writers` is empty when there is only a single output partition). + if let Some(spill_path) = self.spill_writers.get(pid).and_then(|w| w.path()) { + // Use raw File handle (not BufReader) so that std::io::copy + // can use copy_file_range/sendfile for zero-copy on Linux. + let mut spill_file = File::open(spill_path)?; + let mut write_timer = metrics.write_time.timer(); + std::io::copy(&mut spill_file, &mut self.output_writer)?; + write_timer.stop(); + } + + // Append the batches still held in memory after any spilled data + let mut buf_batch_writer = BufBatchWriter::new( + &mut self.shuffle_block_writer, + &mut self.output_writer, + self.write_buffer_size, + self.batch_size, + ); + while let Some(batch) = iter.next() { + let batch = batch?; + buf_batch_writer.write(&batch, &metrics.encode_time, &metrics.write_time)?; + } + buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; + Ok(()) + } + + fn finish_all( + &mut self, + metrics: &ShufflePartitionerMetrics, + ) -> datafusion::common::Result<()> { + let mut write_timer = metrics.write_time.timer(); + self.output_writer.flush()?; + write_timer.stop(); + + // Record the end-of-data offset as the final entry so partition lengths can be derived + self.offsets[self.num_output_partitions] = self.output_writer.stream_position()?; + + let mut write_timer = metrics.write_time.timer(); + let mut output_index = BufWriter::new( + File::create(self.output_index_file.clone()) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?, + ); + + for offset in &self.offsets { + output_index.write_all(&offset.to_le_bytes())?; + } + output_index.flush()?; + write_timer.stop(); + + Ok(()) + } +} diff --git a/native/shuffle/src/writers/local/mod.rs b/native/shuffle/src/writers/local/mod.rs new file mode 100644 index 0000000000..96788ffd8d --- /dev/null +++ b/native/shuffle/src/writers/local/mod.rs
@@ -0,0 +1,2 @@ +pub mod local_partition_writer; +mod spill; diff --git a/native/shuffle/src/writers/spill.rs b/native/shuffle/src/writers/local/spill.rs similarity index 80% rename from native/shuffle/src/writers/spill.rs rename to native/shuffle/src/writers/local/spill.rs index 624a45befe..a494bef322 100644 --- a/native/shuffle/src/writers/spill.rs +++ b/native/shuffle/src/writers/local/spill.rs @@ -15,87 +15,60 @@ // specific language governing permissions and limitations // under the License. -use super::ShuffleBlockWriter; use crate::metrics::ShufflePartitionerMetrics; -use crate::partitioners::PartitionedBatchIterator; -use crate::writers::buf_batch_writer::BufBatchWriter; +use crate::writers::BufBatchWriter; +use crate::ShuffleBlockWriter; +use arrow::record_batch::RecordBatch; use datafusion::common::DataFusionError; use datafusion::execution::disk_manager::RefCountedTempFile; use datafusion::execution::runtime_env::RuntimeEnv; use std::fs::{File, OpenOptions}; -/// A temporary disk file for spilling a partition's intermediate shuffle data. struct SpillFile { temp_file: RefCountedTempFile, file: File, } -/// Manages encoding and optional disk spilling for a single shuffle partition. -pub(crate) struct PartitionWriter { - /// Spill file for intermediate shuffle output for this partition. Each spill event - /// will append to this file and the contents will be copied to the shuffle file at - /// the end of processing. 
- spill_file: Option, - /// Writer that performs encoding and compression +pub(crate) struct SpillWriter { shuffle_block_writer: ShuffleBlockWriter, + write_buffer_size: usize, + batch_size: usize, + spill_file: Option, } -impl PartitionWriter { +impl SpillWriter { pub(crate) fn try_new( shuffle_block_writer: ShuffleBlockWriter, + write_buffer_size: usize, + batch_size: usize, ) -> datafusion::common::Result { Ok(Self { - spill_file: None, shuffle_block_writer, + write_buffer_size, + batch_size, + spill_file: None, }) } - fn ensure_spill_file_created( - &mut self, - runtime: &RuntimeEnv, - ) -> datafusion::common::Result<()> { - if self.spill_file.is_none() { - // Spill file is not yet created, create it - let spill_file = runtime - .disk_manager - .create_tmp_file("shuffle writer spill")?; - let spill_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(spill_file.path()) - .map_err(|e| { - DataFusionError::Execution(format!("Error occurred while spilling {e}")) - })?; - self.spill_file = Some(SpillFile { - temp_file: spill_file, - file: spill_data, - }); - } - Ok(()) - } - - pub(crate) fn spill( + pub(crate) fn write>>( &mut self, - iter: &mut PartitionedBatchIterator, + iter: &mut I, runtime: &RuntimeEnv, metrics: &ShufflePartitionerMetrics, - write_buffer_size: usize, - batch_size: usize, ) -> datafusion::common::Result { - if let Some(batch) = iter.next(&metrics.interleave_time) { + if let Some(batch) = iter.next() { self.ensure_spill_file_created(runtime)?; let total_bytes_written = { let mut buf_batch_writer = BufBatchWriter::new( &mut self.shuffle_block_writer, &mut self.spill_file.as_mut().unwrap().file, - write_buffer_size, - batch_size, + self.write_buffer_size, + self.batch_size, ); let mut bytes_written = buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; - while let Some(batch) = iter.next(&metrics.interleave_time) { + while let Some(batch) = iter.next() { let batch = batch?; bytes_written += 
buf_batch_writer.write( &batch, @@ -106,6 +79,7 @@ impl PartitionWriter { buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; bytes_written }; + metrics.spilled_bytes.add(total_bytes_written); Ok(total_bytes_written) } else { @@ -113,6 +87,31 @@ impl PartitionWriter { } } + fn ensure_spill_file_created( + &mut self, + runtime: &RuntimeEnv, + ) -> datafusion::common::Result<()> { + if self.spill_file.is_none() { + // Spill file is not yet created, create it + let spill_file = runtime + .disk_manager + .create_tmp_file("shuffle writer spill")?; + let spill_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(spill_file.path()) + .map_err(|e| { + DataFusionError::Execution(format!("Error occurred while spilling {e}")) + })?; + self.spill_file = Some(SpillFile { + temp_file: spill_file, + file: spill_data, + }); + } + Ok(()) + } + pub(crate) fn path(&self) -> Option<&std::path::Path> { self.spill_file .as_ref() diff --git a/native/shuffle/src/writers/mod.rs b/native/shuffle/src/writers/mod.rs index 75caf9f3a3..83ad57d3f0 100644 --- a/native/shuffle/src/writers/mod.rs +++ b/native/shuffle/src/writers/mod.rs @@ -17,10 +17,11 @@ mod buf_batch_writer; mod checksum; +pub mod local; +pub mod partition_writer; +pub mod rss; mod shuffle_block_writer; -mod spill; pub(crate) use buf_batch_writer::BufBatchWriter; pub(crate) use checksum::Checksum; pub use shuffle_block_writer::{CompressionCodec, ShuffleBlockWriter}; -pub(crate) use spill::PartitionWriter; diff --git a/native/shuffle/src/writers/partition_writer.rs b/native/shuffle/src/writers/partition_writer.rs new file mode 100644 index 0000000000..37fc565965 --- /dev/null +++ b/native/shuffle/src/writers/partition_writer.rs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metrics::ShufflePartitionerMetrics; +use arrow::record_batch::RecordBatch; +use datafusion::execution::runtime_env::RuntimeEnv; + +#[async_trait::async_trait] +pub(crate) trait PartitionWriter: Send + Sync { + fn spill( + &mut self, + pid: usize, + iter: &mut I, + runtime: &RuntimeEnv, + metrics: &ShufflePartitionerMetrics, + ) -> datafusion::common::Result<()> + where + I: Iterator>; + + fn write( + &mut self, + pid: usize, + iter: &mut I, + metrics: &ShufflePartitionerMetrics, + ) -> datafusion::common::Result<()> + where + I: Iterator>; + + fn finish_partition( + &mut self, + pid: usize, + iter: &mut I, + metrics: &ShufflePartitionerMetrics, + ) -> datafusion::common::Result<()> + where + I: Iterator>; + + fn finish_all(&mut self, metrics: &ShufflePartitionerMetrics) + -> datafusion::common::Result<()>; +} diff --git a/native/shuffle/src/writers/rss/mod.rs b/native/shuffle/src/writers/rss/mod.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/native/shuffle/src/writers/rss/mod.rs @@ -0,0 +1 @@ + From d99e4b047d2dd61e6e456558144d548075eec5c1 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Wed, 1 Jul 2026 15:27:15 +0800 Subject: [PATCH 02/14] license --- native/shuffle/src/writers/local/mod.rs | 17 +++++++++++++++++ native/shuffle/src/writers/rss/mod.rs | 17 ++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff 
--git a/native/shuffle/src/writers/local/mod.rs b/native/shuffle/src/writers/local/mod.rs index 96788ffd8d..7939c26da8 100644 --- a/native/shuffle/src/writers/local/mod.rs +++ b/native/shuffle/src/writers/local/mod.rs @@ -1,2 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + pub mod local_partition_writer; mod spill; diff --git a/native/shuffle/src/writers/rss/mod.rs b/native/shuffle/src/writers/rss/mod.rs index 8b13789179..b248758bc1 100644 --- a/native/shuffle/src/writers/rss/mod.rs +++ b/native/shuffle/src/writers/rss/mod.rs @@ -1 +1,16 @@ - +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. From dc2d37936b7dd11d7a9828ef83a655e081ab3744 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Wed, 1 Jul 2026 15:38:11 +0800 Subject: [PATCH 03/14] fix clippy --- native/shuffle/src/metrics.rs | 4 ++-- native/shuffle/src/partitioners/empty_schema.rs | 4 ---- native/shuffle/src/writers/local/local_partition_writer.rs | 4 ++-- native/shuffle/src/writers/local/spill.rs | 2 +- 4 files changed, 5 insertions(+), 9 deletions(-) diff --git a/native/shuffle/src/metrics.rs b/native/shuffle/src/metrics.rs index bda245fd93..a8c890b031 100644 --- a/native/shuffle/src/metrics.rs +++ b/native/shuffle/src/metrics.rs @@ -28,7 +28,7 @@ pub(crate) struct ShufflePartitionerMetrics { pub(crate) repart_time: Time, /// Time spent in `interleave_record_batch` gathering shuffled batches - pub(crate) interleave_time: Time, + // TODO: pub(crate) interleave_time: Time, /// Time encoding batches to IPC format pub(crate) encode_time: Time, @@ -54,7 +54,7 @@ impl ShufflePartitionerMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), - interleave_time: MetricBuilder::new(metrics).subset_time("interleave_time", partition), + // interleave_time: MetricBuilder::new(metrics).subset_time("interleave_time", partition), encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), diff --git 
a/native/shuffle/src/partitioners/empty_schema.rs b/native/shuffle/src/partitioners/empty_schema.rs index b7b5dce60a..303c86f374 100644 --- a/native/shuffle/src/partitioners/empty_schema.rs +++ b/native/shuffle/src/partitioners/empty_schema.rs @@ -18,12 +18,8 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::ShufflePartitioner; use crate::writers::partition_writer::PartitionWriter; -use crate::ShuffleBlockWriter; use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; -use datafusion::common::DataFusionError; -use std::fs::OpenOptions; -use std::io::{BufWriter, Seek, Write}; use std::iter; use tokio::time::Instant; diff --git a/native/shuffle/src/writers/local/local_partition_writer.rs b/native/shuffle/src/writers/local/local_partition_writer.rs index 5a0b1850fd..b66743d8e4 100644 --- a/native/shuffle/src/writers/local/local_partition_writer.rs +++ b/native/shuffle/src/writers/local/local_partition_writer.rs @@ -124,7 +124,7 @@ impl PartitionWriter for LocalPartitionWriter { self.batch_size, ); - while let Some(batch) = iter.next() { + for batch in iter.by_ref() { let batch = batch?; buf_batch_writer.write(&batch, &metrics.encode_time, &metrics.write_time)?; } @@ -169,7 +169,7 @@ impl PartitionWriter for LocalPartitionWriter { self.write_buffer_size, self.batch_size, ); - while let Some(batch) = iter.next() { + for batch in iter.by_ref() { let batch = batch?; buf_batch_writer.write(&batch, &metrics.encode_time, &metrics.write_time)?; } diff --git a/native/shuffle/src/writers/local/spill.rs b/native/shuffle/src/writers/local/spill.rs index a494bef322..85db2db2d2 100644 --- a/native/shuffle/src/writers/local/spill.rs +++ b/native/shuffle/src/writers/local/spill.rs @@ -68,7 +68,7 @@ impl SpillWriter { ); let mut bytes_written = buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; - while let Some(batch) = iter.next() { + for batch in iter.by_ref() { let batch = batch?; bytes_written += buf_batch_writer.write( 
&batch, From a276e86d3375be651feafefccfd46ae5c2d49b96 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Wed, 1 Jul 2026 16:44:53 +0800 Subject: [PATCH 04/14] fix --- .../shuffle/src/writers/local/local_partition_writer.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/native/shuffle/src/writers/local/local_partition_writer.rs b/native/shuffle/src/writers/local/local_partition_writer.rs index b66743d8e4..8e3ee2fd82 100644 --- a/native/shuffle/src/writers/local/local_partition_writer.rs +++ b/native/shuffle/src/writers/local/local_partition_writer.rs @@ -35,7 +35,7 @@ pub(crate) struct LocalPartitionWriter { batch_size: usize, write_buffer_size: usize, num_output_partitions: usize, - last_finish_pid: usize, + last_finish_pid: i32, } impl LocalPartitionWriter { @@ -77,7 +77,7 @@ impl LocalPartitionWriter { batch_size, write_buffer_size, num_output_partitions, - last_finish_pid: num_output_partitions - 1, + last_finish_pid: -1, }) } @@ -143,11 +143,11 @@ impl PartitionWriter for LocalPartitionWriter { I: Iterator>, { assert_eq!( - (pid + self.num_output_partitions - self.last_finish_pid) % self.num_output_partitions, + pid as i32 - self.last_finish_pid, 1, "LocalPartitionWriter::finish_partition must be called in order." 
); - self.last_finish_pid = pid; + self.last_finish_pid = pid as i32; self.offsets[pid] = self.output_writer.stream_position()?; From 5857c2a383b577fb4060522b6a49e9a15cd73379 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Wed, 1 Jul 2026 17:52:17 +0800 Subject: [PATCH 05/14] fix --- .../src/writers/local/local_partition_writer.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/native/shuffle/src/writers/local/local_partition_writer.rs b/native/shuffle/src/writers/local/local_partition_writer.rs index 8e3ee2fd82..1ee5d3ffe1 100644 --- a/native/shuffle/src/writers/local/local_partition_writer.rs +++ b/native/shuffle/src/writers/local/local_partition_writer.rs @@ -153,13 +153,15 @@ impl PartitionWriter for LocalPartitionWriter { // if we wrote a spill file for this partition then copy the // contents into the shuffle file - if let Some(spill_path) = self.spill_writers[pid].path() { - // Use raw File handle (not BufReader) so that std::io::copy - // can use copy_file_range/sendfile for zero-copy on Linux. - let mut spill_file = File::open(spill_path)?; - let mut write_timer = metrics.write_time.timer(); - std::io::copy(&mut spill_file, &mut self.output_writer)?; - write_timer.stop(); + if let Some(writer) = self.spill_writers.get(pid) { + if let Some(spill_path) = writer.path() { + // Use raw File handle (not BufReader) so that std::io::copy + // can use copy_file_range/sendfile for zero-copy on Linux. 
+ let mut spill_file = File::open(spill_path)?; + let mut write_timer = metrics.write_time.timer(); + std::io::copy(&mut spill_file, &mut self.output_writer)?; + write_timer.stop(); + } } // Write in memory batches to output data file From 22d441aa61c2d5cb6a08562c16d4e8060072baf1 Mon Sep 17 00:00:00 2001 From: Zhen Wang <643348094@qq.com> Date: Thu, 2 Jul 2026 11:25:56 +0800 Subject: [PATCH 06/14] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- native/shuffle/src/writers/partition_writer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/native/shuffle/src/writers/partition_writer.rs b/native/shuffle/src/writers/partition_writer.rs index 37fc565965..0d91a5bc55 100644 --- a/native/shuffle/src/writers/partition_writer.rs +++ b/native/shuffle/src/writers/partition_writer.rs @@ -19,7 +19,6 @@ use crate::metrics::ShufflePartitionerMetrics; use arrow::record_batch::RecordBatch; use datafusion::execution::runtime_env::RuntimeEnv; -#[async_trait::async_trait] pub(crate) trait PartitionWriter: Send + Sync { fn spill( &mut self, From 14a8096326fca8bd039a28e80f1c0c23ab2bc0f5 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 2 Jul 2026 11:29:10 +0800 Subject: [PATCH 07/14] address copilot comments --- native/shuffle/src/partitioners/single_partition.rs | 3 +++ .../shuffle/src/writers/local/local_partition_writer.rs | 9 ++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/native/shuffle/src/partitioners/single_partition.rs b/native/shuffle/src/partitioners/single_partition.rs index 7215857cfd..c5d0cf7ac6 100644 --- a/native/shuffle/src/partitioners/single_partition.rs +++ b/native/shuffle/src/partitioners/single_partition.rs @@ -132,6 +132,9 @@ impl ShufflePartitioner for SinglePartitionShufflePartitione self.partition_writer .write(0, &mut iter::once(Ok(batch)), &self.metrics)?; } + + self.partition_writer.finish_partition(0, &mut iter::empty(), 
&self.metrics)?; + self.partition_writer.finish_all(&self.metrics)?; self.metrics diff --git a/native/shuffle/src/writers/local/local_partition_writer.rs b/native/shuffle/src/writers/local/local_partition_writer.rs index 1ee5d3ffe1..a155bd4c0b 100644 --- a/native/shuffle/src/writers/local/local_partition_writer.rs +++ b/native/shuffle/src/writers/local/local_partition_writer.rs @@ -196,9 +196,12 @@ impl PartitionWriter for LocalPartitionWriter { .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?, ); - self.offsets.iter().for_each(|offset| { - output_index.write_all(&(offset.to_le_bytes()[..])).unwrap(); - }); + for offset in &self.offsets { + let offset_i64 = i64::try_from(*offset).map_err(|_| { + DataFusionError::Execution(format!("shuffle write error: offset overflow ({offset})")) + })?; + output_index.write_all(&offset_i64.to_le_bytes())?; + } output_index.flush()?; write_timer.stop(); From 12640cb6b67c7aa19a24410d26e0e35516939f9f Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 2 Jul 2026 11:37:21 +0800 Subject: [PATCH 08/14] address copilot comments --- native/shuffle/src/partitioners/empty_schema.rs | 4 +--- native/shuffle/src/writers/local/local_partition_writer.rs | 2 -- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/native/shuffle/src/partitioners/empty_schema.rs b/native/shuffle/src/partitioners/empty_schema.rs index 303c86f374..6b820ae053 100644 --- a/native/shuffle/src/partitioners/empty_schema.rs +++ b/native/shuffle/src/partitioners/empty_schema.rs @@ -76,8 +76,6 @@ impl ShufflePartitioner for EmptySchemaShufflePartitioner fn shuffle_write(&mut self) -> datafusion::common::Result<()> { let start_time = Instant::now(); - let mut write_timer = self.metrics.write_time.timer(); - // Write a single zero-column batch with the accumulated row count to partition 0 let batch_opt = if self.total_rows > 0 { Some(Ok(RecordBatch::try_new_with_options( @@ -88,6 +86,7 @@ impl ShufflePartitioner for 
EmptySchemaShufflePartitioner } else { None }; + self.partition_writer .finish_partition(0, &mut batch_opt.into_iter(), &self.metrics)?; for pid in 1..self.num_output_partitions { @@ -95,7 +94,6 @@ impl ShufflePartitioner for EmptySchemaShufflePartitioner .finish_partition(pid, &mut iter::empty(), &self.metrics)?; } self.partition_writer.finish_all(&self.metrics)?; - write_timer.stop(); self.metrics .baseline diff --git a/native/shuffle/src/writers/local/local_partition_writer.rs b/native/shuffle/src/writers/local/local_partition_writer.rs index a155bd4c0b..ec36668fcd 100644 --- a/native/shuffle/src/writers/local/local_partition_writer.rs +++ b/native/shuffle/src/writers/local/local_partition_writer.rs @@ -185,12 +185,10 @@ impl PartitionWriter for LocalPartitionWriter { ) -> datafusion::common::Result<()> { let mut write_timer = metrics.write_time.timer(); self.output_writer.flush()?; - write_timer.stop(); // add one extra offset at last to ease partition length computation self.offsets[self.num_output_partitions] = self.output_writer.stream_position()?; - let mut write_timer = metrics.write_time.timer(); let mut output_index = BufWriter::new( File::create(self.output_index_file.clone()) .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?, From 51618cfd181d4f6767e8b45811aa82971718da66 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 2 Jul 2026 11:46:49 +0800 Subject: [PATCH 09/14] interleave timer and style fix --- native/shuffle/src/metrics.rs | 4 ++-- native/shuffle/src/partitioners/multi_partition.rs | 4 ++-- .../src/partitioners/partitioned_batch_iterator.rs | 14 +++++++++++++- .../shuffle/src/partitioners/single_partition.rs | 3 ++- .../src/writers/local/local_partition_writer.rs | 4 +++- 5 files changed, 22 insertions(+), 7 deletions(-) diff --git a/native/shuffle/src/metrics.rs b/native/shuffle/src/metrics.rs index a8c890b031..bda245fd93 100644 --- a/native/shuffle/src/metrics.rs +++ 
b/native/shuffle/src/metrics.rs @@ -28,7 +28,7 @@ pub(crate) struct ShufflePartitionerMetrics { pub(crate) repart_time: Time, /// Time spent in `interleave_record_batch` gathering shuffled batches - // TODO: pub(crate) interleave_time: Time, + pub(crate) interleave_time: Time, /// Time encoding batches to IPC format pub(crate) encode_time: Time, @@ -54,7 +54,7 @@ impl ShufflePartitionerMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), - // interleave_time: MetricBuilder::new(metrics).subset_time("interleave_time", partition), + interleave_time: MetricBuilder::new(metrics).subset_time("interleave_time", partition), encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 923bf177e4..62867aaa32 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -509,7 +509,7 @@ impl MultiPartitionShuffleRepartitioner { for partition_id in 0..num_output_partitions { self.partition_writer.spill( partition_id, - &mut partitioned_batches.produce(partition_id), + &mut partitioned_batches.produce(partition_id, &self.metrics.interleave_time), &self.runtime, &self.metrics, )?; @@ -566,7 +566,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartition for i in 0..num_output_partitions { self.partition_writer.finish_partition( i, - &mut partitioned_batches.produce(i), + &mut partitioned_batches.produce(i, &self.metrics.interleave_time), &self.metrics, )?; } diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index 
2863616ddd..58ec5bc9b5 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ -18,6 +18,7 @@ use arrow::array::RecordBatch; use arrow::compute::interleave_record_batch; use datafusion::common::DataFusionError; +use datafusion::physical_expr_common::metrics::Time; /// A helper struct to produce shuffled batches. /// This struct takes ownership of the buffered batches and partition indices from the @@ -41,11 +42,16 @@ impl PartitionedBatchesProducer { } } - pub(super) fn produce(&mut self, partition_id: usize) -> PartitionedBatchIterator<'_> { + pub(super) fn produce<'a>( + &'a mut self, + partition_id: usize, + interleave_time: &'a Time, + ) -> PartitionedBatchIterator<'a> { PartitionedBatchIterator::new( &self.partition_indices[partition_id], &self.buffered_batches, self.batch_size, + interleave_time, ) } } @@ -56,6 +62,7 @@ pub(crate) struct PartitionedBatchIterator<'a> { batch_size: usize, indices: Vec<(usize, usize)>, pos: usize, + interleave_time: &'a Time, } impl<'a> PartitionedBatchIterator<'a> { @@ -63,6 +70,7 @@ impl<'a> PartitionedBatchIterator<'a> { indices: &'a [(u32, u32)], buffered_batches: &'a [RecordBatch], batch_size: usize, + interleave_time: &'a Time, ) -> Self { if indices.is_empty() { // Avoid unnecessary allocations when the partition is empty @@ -71,6 +79,7 @@ impl<'a> PartitionedBatchIterator<'a> { batch_size, indices: vec![], pos: 0, + interleave_time, }; } let record_batches = buffered_batches.iter().collect::>(); @@ -83,6 +92,7 @@ impl<'a> PartitionedBatchIterator<'a> { batch_size, indices: current_indices, pos: 0, + interleave_time, } } } @@ -97,7 +107,9 @@ impl Iterator for PartitionedBatchIterator<'_> { let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); let indices = &self.indices[self.pos..indices_end]; + let mut timer = self.interleave_time.timer(); let result = interleave_record_batch(&self.record_batches, 
indices); + timer.stop(); match result { Ok(batch) => { self.pos = indices_end; diff --git a/native/shuffle/src/partitioners/single_partition.rs b/native/shuffle/src/partitioners/single_partition.rs index c5d0cf7ac6..5c14b1df85 100644 --- a/native/shuffle/src/partitioners/single_partition.rs +++ b/native/shuffle/src/partitioners/single_partition.rs @@ -133,7 +133,8 @@ impl ShufflePartitioner for SinglePartitionShufflePartitione .write(0, &mut iter::once(Ok(batch)), &self.metrics)?; } - self.partition_writer.finish_partition(0, &mut iter::empty(), &self.metrics)?; + self.partition_writer + .finish_partition(0, &mut iter::empty(), &self.metrics)?; self.partition_writer.finish_all(&self.metrics)?; diff --git a/native/shuffle/src/writers/local/local_partition_writer.rs b/native/shuffle/src/writers/local/local_partition_writer.rs index ec36668fcd..af47391237 100644 --- a/native/shuffle/src/writers/local/local_partition_writer.rs +++ b/native/shuffle/src/writers/local/local_partition_writer.rs @@ -196,7 +196,9 @@ impl PartitionWriter for LocalPartitionWriter { for offset in &self.offsets { let offset_i64 = i64::try_from(*offset).map_err(|_| { - DataFusionError::Execution(format!("shuffle write error: offset overflow ({offset})")) + DataFusionError::Execution(format!( + "shuffle write error: offset overflow ({offset})" + )) })?; output_index.write_all(&offset_i64.to_le_bytes())?; } From 2c6bc038aedc9be0ab2e256384ad5db0f2620ccd Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 2 Jul 2026 17:18:52 +0800 Subject: [PATCH 10/14] remove spill method and optimize single partition write --- .../src/partitioners/multi_partition.rs | 5 +- native/shuffle/src/shuffle_writer.rs | 3 + .../shuffle/src/writers/buf_batch_writer.rs | 6 + .../writers/local/local_partition_writer.rs | 223 ++++++++++++------ .../shuffle/src/writers/partition_writer.rs | 11 - 5 files changed, 158 insertions(+), 90 deletions(-) diff --git a/native/shuffle/src/partitioners/multi_partition.rs 
b/native/shuffle/src/partitioners/multi_partition.rs index 62867aaa32..7f9f236b2b 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -105,7 +105,6 @@ pub(crate) struct MultiPartitionShuffleRepartitioner { partition_writer: T, /// Partitioning scheme to use partitioning: CometPartitioning, - runtime: Arc, metrics: ShufflePartitionerMetrics, /// Reused scratch space for computing partition indices scratch: ScratchSpace, @@ -208,7 +207,6 @@ impl MultiPartitionShuffleRepartitioner { partition_indices: vec![vec![]; num_output_partitions], partition_writer, partitioning, - runtime, metrics, scratch, batch_size, @@ -507,10 +505,9 @@ impl MultiPartitionShuffleRepartitioner { let mut partitioned_batches = self.partitioned_batches(); for partition_id in 0..num_output_partitions { - self.partition_writer.spill( + self.partition_writer.write( partition_id, &mut partitioned_batches.produce(partition_id, &self.metrics.interleave_time), - &self.runtime, &self.metrics, )?; } diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index c3fe17491e..e631e2073c 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -211,6 +211,7 @@ async fn external_shuffle( partitioning.partition_count(), context.session_config().batch_size(), write_buffer_size, + context.runtime_env(), )?; let mut repartitioner: Box = match &partitioning { @@ -357,6 +358,7 @@ mod test { num_partitions, 1024, 1024 * 1024, // write_buffer_size: 1MB default + Arc::clone(&runtime_env), ) .unwrap(); let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new( @@ -427,6 +429,7 @@ mod test { num_partitions, 1024, 1024 * 1024, // write_buffer_size: 1MB default + Arc::clone(&runtime_env), ) .unwrap(); let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new( diff --git a/native/shuffle/src/writers/buf_batch_writer.rs 
b/native/shuffle/src/writers/buf_batch_writer.rs index 9b8d17c315..cfddb46539 100644 --- a/native/shuffle/src/writers/buf_batch_writer.rs +++ b/native/shuffle/src/writers/buf_batch_writer.rs @@ -134,3 +134,9 @@ impl, W: Write> BufBatchWriter { Ok(()) } } + +impl, W: Write + Seek> BufBatchWriter { + pub(crate) fn writer_stream_position(&mut self) -> datafusion::common::Result { + self.writer.stream_position().map_err(Into::into) + } +} diff --git a/native/shuffle/src/writers/local/local_partition_writer.rs b/native/shuffle/src/writers/local/local_partition_writer.rs index af47391237..f6621ef059 100644 --- a/native/shuffle/src/writers/local/local_partition_writer.rs +++ b/native/shuffle/src/writers/local/local_partition_writer.rs @@ -25,12 +25,37 @@ use datafusion::common::DataFusionError; use datafusion::execution::runtime_env::RuntimeEnv; use std::fs::{File, OpenOptions}; use std::io::{BufWriter, Seek, Write}; +use std::sync::Arc; + +/// Output target for the shuffle data file. +/// +/// The two shuffle modes drive the writer differently: +/// +/// * Single-partition shuffles stream every batch through a single long-lived +/// [`BufBatchWriter`]. Keeping it alive across `write` calls preserves +/// cross-batch coalescing in the internal `BatchCoalescer` and limits +/// flushing (which also finalizes partially coalesced batches) to +/// [`PartitionWriter::finish_all`]. +/// * Multi-partition shuffles finalize one partition at a time in +/// [`PartitionWriter::finish_partition`], each with its own short-lived +/// `BufBatchWriter`, so coalescing intentionally does not cross partition +/// boundaries. They hold the raw output writer and block writer directly. +enum DataOutput { + Single(BufBatchWriter>), + Multi { + output_writer: BufWriter, + shuffle_block_writer: ShuffleBlockWriter, + /// One spill file per output partition, buffered until `finish_partition` + /// merges them into the shuffle output. 
+ spill_writers: Vec, + /// Runtime used to allocate the temporary spill files. + runtime: Arc, + }, +} pub(crate) struct LocalPartitionWriter { output_index_file: String, - spill_writers: Vec, - shuffle_block_writer: ShuffleBlockWriter, - output_writer: BufWriter, + data_output: DataOutput, offsets: Vec, batch_size: usize, write_buffer_size: usize, @@ -46,11 +71,25 @@ impl LocalPartitionWriter { num_output_partitions: usize, batch_size: usize, write_buffer_size: usize, + runtime: Arc, ) -> datafusion::common::Result { - let spill_writers = if num_output_partitions == 1 { - vec![] + let output_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(output_data_file.clone()) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + + let output_writer = BufWriter::with_capacity(write_buffer_size, output_file); + let data_output = if num_output_partitions == 1 { + DataOutput::Single(BufBatchWriter::new( + shuffle_block_writer, + output_writer, + write_buffer_size, + batch_size, + )) } else { - (0..num_output_partitions) + let spill_writers = (0..num_output_partitions) .map(|_| { SpillWriter::try_new( shuffle_block_writer.clone(), @@ -58,21 +97,17 @@ impl LocalPartitionWriter { batch_size, ) }) - .collect::>>()? 
+ .collect::>>()?; + DataOutput::Multi { + output_writer, + shuffle_block_writer, + spill_writers, + runtime, + } }; - let output_writer = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(output_data_file.clone()) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - - let output_writer = BufWriter::with_capacity(write_buffer_size, output_writer); Ok(Self { output_index_file, - spill_writers, - shuffle_block_writer, - output_writer, + data_output, offsets: vec![0u64; num_output_partitions + 1], batch_size, write_buffer_size, @@ -83,26 +118,14 @@ impl LocalPartitionWriter { #[cfg(test)] pub(crate) fn get_spill_writers(&self) -> &Vec { - &self.spill_writers + match &self.data_output { + DataOutput::Multi { spill_writers, .. } => spill_writers, + DataOutput::Single(_) => panic!("single-partition output has no spill writers"), + } } } impl PartitionWriter for LocalPartitionWriter { - fn spill( - &mut self, - pid: usize, - iter: &mut I, - runtime: &RuntimeEnv, - metrics: &ShufflePartitionerMetrics, - ) -> datafusion::common::Result<()> - where - I: Iterator>, - { - self.spill_writers[pid] - .write(iter, runtime, metrics) - .map(|_| ()) - } - fn write( &mut self, pid: usize, @@ -112,23 +135,36 @@ impl PartitionWriter for LocalPartitionWriter { where I: Iterator>, { - assert!( - pid == 0 && self.spill_writers.is_empty(), - "LocalPartitionWriter::write only for single shuffle partition." - ); + match &mut self.data_output { + DataOutput::Single(writer) => { + if pid != 0 { + return Err(DataFusionError::Execution( + "LocalPartitionWriter single-partition output only supports partition 0." 
+ .to_string(), + )); + } - let mut buf_batch_writer = BufBatchWriter::new( - &mut self.shuffle_block_writer, - &mut self.output_writer, - self.write_buffer_size, - self.batch_size, - ); - - for batch in iter.by_ref() { - let batch = batch?; - buf_batch_writer.write(&batch, &metrics.encode_time, &metrics.write_time)?; + // Stream batches through the long-lived writer so small batches keep + // coalescing across calls. Do not flush here: flushing also finalizes any + // partially coalesced batch, which would defeat cross-call coalescing and + // increase flush frequency. The single-partition writer is flushed once, in + // `finish_all`. + for batch in iter.by_ref() { + let batch = batch?; + writer.write(&batch, &metrics.encode_time, &metrics.write_time)?; + } + } + DataOutput::Multi { + spill_writers, + runtime, + .. + } => { + // Multi-partition output buffers each partition's batches into its own + // spill file. `finish_partition` later merges the spill files (and any + // remaining in-memory batches) into the shuffle output in partition order. + spill_writers[pid].write(iter, runtime, metrics)?; + } } - buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; Ok(()) } @@ -149,33 +185,56 @@ impl PartitionWriter for LocalPartitionWriter { ); self.last_finish_pid = pid as i32; - self.offsets[pid] = self.output_writer.stream_position()?; + let write_buffer_size = self.write_buffer_size; + let batch_size = self.batch_size; - // if we wrote a spill file for this partition then copy the - // contents into the shuffle file - if let Some(writer) = self.spill_writers.get(pid) { - if let Some(spill_path) = writer.path() { - // Use raw File handle (not BufReader) so that std::io::copy - // can use copy_file_range/sendfile for zero-copy on Linux. 
- let mut spill_file = File::open(spill_path)?; - let mut write_timer = metrics.write_time.timer(); - std::io::copy(&mut spill_file, &mut self.output_writer)?; - write_timer.stop(); + match &mut self.data_output { + DataOutput::Single(writer) => { + // Single-partition data was already streamed via `write`, starting at + // offset 0 (already recorded in `self.offsets[0]`). Stream any trailing + // batches (normally none) without flushing; the long-lived writer is + // flushed once in `finish_all`. + for batch in iter.by_ref() { + let batch = batch?; + writer.write(&batch, &metrics.encode_time, &metrics.write_time)?; + } } - } + DataOutput::Multi { + output_writer, + shuffle_block_writer, + spill_writers, + .. + } => { + self.offsets[pid] = output_writer.stream_position()?; - // Write in memory batches to output data file - let mut buf_batch_writer = BufBatchWriter::new( - &mut self.shuffle_block_writer, - &mut self.output_writer, - self.write_buffer_size, - self.batch_size, - ); - for batch in iter.by_ref() { - let batch = batch?; - buf_batch_writer.write(&batch, &metrics.encode_time, &metrics.write_time)?; + // if we wrote a spill file for this partition then copy the + // contents into the shuffle file + if let Some(writer) = spill_writers.get(pid) { + if let Some(spill_path) = writer.path() { + // Use raw File handle (not BufReader) so that std::io::copy + // can use copy_file_range/sendfile for zero-copy on Linux. + let mut spill_file = File::open(spill_path)?; + let mut write_timer = metrics.write_time.timer(); + std::io::copy(&mut spill_file, output_writer)?; + write_timer.stop(); + } + } + + // Write in memory batches to output data file. Each partition uses its + // own writer so coalescing does not cross partition boundaries. 
+ let mut buf_batch_writer = BufBatchWriter::new( + shuffle_block_writer, + output_writer, + write_buffer_size, + batch_size, + ); + for batch in iter.by_ref() { + let batch = batch?; + buf_batch_writer.write(&batch, &metrics.encode_time, &metrics.write_time)?; + } + buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; + } } - buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; Ok(()) } @@ -183,12 +242,26 @@ impl PartitionWriter for LocalPartitionWriter { &mut self, metrics: &ShufflePartitionerMetrics, ) -> datafusion::common::Result<()> { - let mut write_timer = metrics.write_time.timer(); - self.output_writer.flush()?; + // Flush the data output and capture the final position. For the + // single-partition writer this also finalizes the last coalesced batch. + let final_offset = match &mut self.data_output { + DataOutput::Single(writer) => { + writer.flush(&metrics.encode_time, &metrics.write_time)?; + writer.writer_stream_position()? + } + DataOutput::Multi { output_writer, .. 
} => { + let mut write_timer = metrics.write_time.timer(); + output_writer.flush()?; + let pos = output_writer.stream_position()?; + write_timer.stop(); + pos + } + }; // add one extra offset at last to ease partition length computation - self.offsets[self.num_output_partitions] = self.output_writer.stream_position()?; + self.offsets[self.num_output_partitions] = final_offset; + let mut write_timer = metrics.write_time.timer(); let mut output_index = BufWriter::new( File::create(self.output_index_file.clone()) .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?, diff --git a/native/shuffle/src/writers/partition_writer.rs b/native/shuffle/src/writers/partition_writer.rs index 0d91a5bc55..2528d1ab97 100644 --- a/native/shuffle/src/writers/partition_writer.rs +++ b/native/shuffle/src/writers/partition_writer.rs @@ -17,19 +17,8 @@ use crate::metrics::ShufflePartitionerMetrics; use arrow::record_batch::RecordBatch; -use datafusion::execution::runtime_env::RuntimeEnv; pub(crate) trait PartitionWriter: Send + Sync { - fn spill( - &mut self, - pid: usize, - iter: &mut I, - runtime: &RuntimeEnv, - metrics: &ShufflePartitionerMetrics, - ) -> datafusion::common::Result<()> - where - I: Iterator>; - fn write( &mut self, pid: usize, From 759901aa32566ef6ce685b60b168a50e68409d77 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 2 Jul 2026 20:10:12 +0800 Subject: [PATCH 11/14] refactor --- .../shuffle/src/partitioners/partitioned_batch_iterator.rs | 2 +- native/shuffle/src/writers/local/local_partition_writer.rs | 6 +++--- native/shuffle/src/writers/local/spill.rs | 7 ++----- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index 58ec5bc9b5..f124d98ff2 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ 
-18,7 +18,7 @@ use arrow::array::RecordBatch; use arrow::compute::interleave_record_batch; use datafusion::common::DataFusionError; -use datafusion::physical_expr_common::metrics::Time; +use datafusion::physical_plan::metrics::Time; /// A helper struct to produce shuffled batches. /// This struct takes ownership of the buffered batches and partition indices from the diff --git a/native/shuffle/src/writers/local/local_partition_writer.rs b/native/shuffle/src/writers/local/local_partition_writer.rs index f6621ef059..3bde83d781 100644 --- a/native/shuffle/src/writers/local/local_partition_writer.rs +++ b/native/shuffle/src/writers/local/local_partition_writer.rs @@ -41,7 +41,7 @@ use std::sync::Arc; /// `BufBatchWriter`, so coalescing intentionally does not cross partition /// boundaries. They hold the raw output writer and block writer directly. enum DataOutput { - Single(BufBatchWriter>), + Single(BufBatchWriter), Multi { output_writer: BufWriter, shuffle_block_writer: ShuffleBlockWriter, @@ -80,15 +80,15 @@ impl LocalPartitionWriter { .open(output_data_file.clone()) .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let output_writer = BufWriter::with_capacity(write_buffer_size, output_file); let data_output = if num_output_partitions == 1 { DataOutput::Single(BufBatchWriter::new( shuffle_block_writer, - output_writer, + output_file, write_buffer_size, batch_size, )) } else { + let output_writer = BufWriter::with_capacity(write_buffer_size, output_file); let spill_writers = (0..num_output_partitions) .map(|_| { SpillWriter::try_new( diff --git a/native/shuffle/src/writers/local/spill.rs b/native/shuffle/src/writers/local/spill.rs index 85db2db2d2..450b617421 100644 --- a/native/shuffle/src/writers/local/spill.rs +++ b/native/shuffle/src/writers/local/spill.rs @@ -55,7 +55,7 @@ impl SpillWriter { iter: &mut I, runtime: &RuntimeEnv, metrics: &ShufflePartitionerMetrics, - ) -> datafusion::common::Result { + ) -> 
datafusion::common::Result<()> { if let Some(batch) = iter.next() { self.ensure_spill_file_created(runtime)?; @@ -80,11 +80,8 @@ impl SpillWriter { bytes_written }; metrics.spilled_bytes.add(total_bytes_written); - - Ok(total_bytes_written) - } else { - Ok(0) } + Ok(()) } fn ensure_spill_file_created( From 2abf4f99816e8eeb00e63d4806bfde71865d8083 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 2 Jul 2026 20:17:33 +0800 Subject: [PATCH 12/14] chore --- native/shuffle/src/partitioners/empty_schema.rs | 2 +- .../shuffle/src/partitioners/multi_partition.rs | 2 +- .../shuffle/src/partitioners/single_partition.rs | 2 +- native/shuffle/src/shuffle_writer.rs | 2 +- native/shuffle/src/writers/mod.rs | 7 ++++--- native/shuffle/src/writers/rss/mod.rs | 16 ---------------- 6 files changed, 8 insertions(+), 23 deletions(-) delete mode 100644 native/shuffle/src/writers/rss/mod.rs diff --git a/native/shuffle/src/partitioners/empty_schema.rs b/native/shuffle/src/partitioners/empty_schema.rs index 6b820ae053..2901b82e65 100644 --- a/native/shuffle/src/partitioners/empty_schema.rs +++ b/native/shuffle/src/partitioners/empty_schema.rs @@ -17,7 +17,7 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::ShufflePartitioner; -use crate::writers::partition_writer::PartitionWriter; +use crate::writers::PartitionWriter; use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; use std::iter; diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 7f9f236b2b..be2d977e83 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -18,7 +18,7 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::partitioned_batch_iterator::PartitionedBatchesProducer; use crate::partitioners::ShufflePartitioner; -use crate::writers::partition_writer::PartitionWriter; +use crate::writers::PartitionWriter; use 
crate::{comet_partitioning, CometPartitioning}; use arrow::array::{Array, ArrayData, ArrayRef, RecordBatch}; use datafusion::common::utils::proxy::VecAllocExt; diff --git a/native/shuffle/src/partitioners/single_partition.rs b/native/shuffle/src/partitioners/single_partition.rs index 5c14b1df85..4a2f6f64a9 100644 --- a/native/shuffle/src/partitioners/single_partition.rs +++ b/native/shuffle/src/partitioners/single_partition.rs @@ -17,7 +17,7 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::ShufflePartitioner; -use crate::writers::partition_writer::PartitionWriter; +use crate::writers::PartitionWriter; use arrow::array::RecordBatch; use datafusion::common::DataFusionError; use std::iter; diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index e631e2073c..de6989e52a 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -22,7 +22,7 @@ use crate::partitioners::{ EmptySchemaShufflePartitioner, MultiPartitionShuffleRepartitioner, ShufflePartitioner, SinglePartitionShufflePartitioner, }; -use crate::writers::local::local_partition_writer::LocalPartitionWriter; +use crate::writers::LocalPartitionWriter; use crate::{CometPartitioning, CompressionCodec, ShuffleBlockWriter}; use async_trait::async_trait; use datafusion::common::exec_datafusion_err; diff --git a/native/shuffle/src/writers/mod.rs b/native/shuffle/src/writers/mod.rs index 83ad57d3f0..6d330fd12a 100644 --- a/native/shuffle/src/writers/mod.rs +++ b/native/shuffle/src/writers/mod.rs @@ -17,11 +17,12 @@ mod buf_batch_writer; mod checksum; -pub mod local; -pub mod partition_writer; -pub mod rss; +mod local; +mod partition_writer; mod shuffle_block_writer; pub(crate) use buf_batch_writer::BufBatchWriter; pub(crate) use checksum::Checksum; +pub(crate) use local::local_partition_writer::LocalPartitionWriter; +pub(crate) use partition_writer::PartitionWriter; pub use shuffle_block_writer::{CompressionCodec, 
ShuffleBlockWriter}; diff --git a/native/shuffle/src/writers/rss/mod.rs b/native/shuffle/src/writers/rss/mod.rs deleted file mode 100644 index b248758bc1..0000000000 --- a/native/shuffle/src/writers/rss/mod.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. From 02fae5665bc7942599bece641d7effdd9f2312dd Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 2 Jul 2026 20:41:04 +0800 Subject: [PATCH 13/14] comments --- .../writers/local/local_partition_writer.rs | 13 +++++++++ native/shuffle/src/writers/local/mod.rs | 2 +- .../shuffle/src/writers/partition_writer.rs | 29 +++++++++++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/native/shuffle/src/writers/local/local_partition_writer.rs b/native/shuffle/src/writers/local/local_partition_writer.rs index 3bde83d781..83196b3308 100644 --- a/native/shuffle/src/writers/local/local_partition_writer.rs +++ b/native/shuffle/src/writers/local/local_partition_writer.rs @@ -41,7 +41,10 @@ use std::sync::Arc; /// `BufBatchWriter`, so coalescing intentionally does not cross partition /// boundaries. They hold the raw output writer and block writer directly. 
enum DataOutput { + /// Single-partition output: one long-lived writer streams all batches. Single(BufBatchWriter), + /// Multi-partition output: batches are staged per partition and merged into + /// `output_writer` one partition at a time during `finish_partition`. Multi { output_writer: BufWriter, shuffle_block_writer: ShuffleBlockWriter, @@ -53,13 +56,23 @@ enum DataOutput { }, } +/// Local file-based [`PartitionWriter`] implementation. +/// +/// Writes shuffle output to a single data file plus an index file recording the +/// byte offset where each partition begins. See [`DataOutput`] for how the +/// single- and multi-partition modes differ. pub(crate) struct LocalPartitionWriter { output_index_file: String, data_output: DataOutput, + /// Start offset of each partition in the data file, plus a trailing entry + /// with the total length so partition sizes are simple offset differences. + /// Has `num_output_partitions + 1` elements. offsets: Vec, batch_size: usize, write_buffer_size: usize, num_output_partitions: usize, + /// Id of the last partition passed to `finish_partition`, used to assert + /// partitions are finalized in ascending order. `-1` before any call. last_finish_pid: i32, } diff --git a/native/shuffle/src/writers/local/mod.rs b/native/shuffle/src/writers/local/mod.rs index 7939c26da8..e76bfba21a 100644 --- a/native/shuffle/src/writers/local/mod.rs +++ b/native/shuffle/src/writers/local/mod.rs @@ -15,5 +15,5 @@ // specific language governing permissions and limitations // under the License. 
-pub mod local_partition_writer; +pub(crate) mod local_partition_writer; mod spill; diff --git a/native/shuffle/src/writers/partition_writer.rs b/native/shuffle/src/writers/partition_writer.rs index 2528d1ab97..25b0e598df 100644 --- a/native/shuffle/src/writers/partition_writer.rs +++ b/native/shuffle/src/writers/partition_writer.rs @@ -18,7 +18,27 @@ use crate::metrics::ShufflePartitionerMetrics; use arrow::record_batch::RecordBatch; +/// Storage backend abstraction for shuffle partition output. +/// +/// Decouples partitioning from storage: partitioners only produce partitioned +/// `RecordBatch` streams, while implementations of this trait own how those +/// batches are stored and finalized. [`LocalPartitionWriter`] implements the +/// local file behavior; other backends (e.g. a remote shuffle writer) can be +/// added without changing the partitioners. +/// +/// A partitioner drives a writer as: any number of +/// [`write`](PartitionWriter::write) calls to stage batches, then one +/// [`finish_partition`](PartitionWriter::finish_partition) per partition in +/// ascending id order, then a single [`finish_all`](PartitionWriter::finish_all). +/// +/// [`LocalPartitionWriter`]: crate::writers::local::local_partition_writer::LocalPartitionWriter pub(crate) trait PartitionWriter: Send + Sync { + /// Stages the batches from `iter` for partition `pid` without finalizing it. + /// + /// Used to stream single-partition output and to stage multi-partition + /// spilled batches. A partition may be written multiple times and in any + /// order; staged data is only guaranteed visible after + /// [`finish_partition`](PartitionWriter::finish_partition). fn write( &mut self, pid: usize, @@ -28,6 +48,12 @@ pub(crate) trait PartitionWriter: Send + Sync { where I: Iterator>; + /// Finalizes partition `pid`, writing any remaining batches from `iter` and + /// combining them with data previously staged via + /// [`write`](PartitionWriter::write). 
+ /// + /// Must be called exactly once per partition, in ascending id order, so the + /// writer can lay partitions out contiguously and record their offsets. fn finish_partition( &mut self, pid: usize, @@ -37,6 +63,9 @@ pub(crate) trait PartitionWriter: Send + Sync { where I: Iterator>; + /// Completes the shuffle write, flushing output and emitting the partition + /// index. Called exactly once, after the last + /// [`finish_partition`](PartitionWriter::finish_partition). fn finish_all(&mut self, metrics: &ShufflePartitionerMetrics) -> datafusion::common::Result<()>; } From 05ccd367b956cf41f09c401db8e367e46d3bb065 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 2 Jul 2026 20:44:33 +0800 Subject: [PATCH 14/14] address copilot comment --- .../src/writers/local/local_partition_writer.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/native/shuffle/src/writers/local/local_partition_writer.rs b/native/shuffle/src/writers/local/local_partition_writer.rs index 83196b3308..3a9a6484db 100644 --- a/native/shuffle/src/writers/local/local_partition_writer.rs +++ b/native/shuffle/src/writers/local/local_partition_writer.rs @@ -191,11 +191,11 @@ impl PartitionWriter for LocalPartitionWriter { where I: Iterator>, { - assert_eq!( - pid as i32 - self.last_finish_pid, - 1, - "LocalPartitionWriter::finish_partition must be called in order." - ); + if pid as i32 - self.last_finish_pid != 1 { + return Err(DataFusionError::Execution( + "LocalPartitionWriter::finish_partition must be called in order.".to_string(), + )); + } self.last_finish_pid = pid as i32; let write_buffer_size = self.write_buffer_size;