diff --git a/native/shuffle/src/partitioners/empty_schema.rs b/native/shuffle/src/partitioners/empty_schema.rs index 45decfec05..2901b82e65 100644 --- a/native/shuffle/src/partitioners/empty_schema.rs +++ b/native/shuffle/src/partitioners/empty_schema.rs @@ -17,47 +17,38 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::ShufflePartitioner; -use crate::ShuffleBlockWriter; +use crate::writers::PartitionWriter; use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; -use datafusion::common::DataFusionError; -use std::fs::OpenOptions; -use std::io::{BufWriter, Seek, Write}; +use std::iter; use tokio::time::Instant; /// A partitioner for zero-column schemas (e.g. queries where ColumnPruning removes all columns). /// This handles shuffles for operations like COUNT(*) that produce empty-schema record batches /// but contain a valid row count. Accumulates the total row count and writes a single /// zero-column IPC batch to partition 0. All other partitions get empty entries in the index file. -pub(crate) struct EmptySchemaShufflePartitioner { - output_data_file: String, - output_index_file: String, +pub(crate) struct EmptySchemaShufflePartitioner { + partition_writer: T, schema: SchemaRef, - shuffle_block_writer: ShuffleBlockWriter, num_output_partitions: usize, total_rows: usize, metrics: ShufflePartitionerMetrics, } -impl EmptySchemaShufflePartitioner { +impl EmptySchemaShufflePartitioner { pub(crate) fn try_new( - output_data_file: String, - output_index_file: String, + partition_writer: T, schema: SchemaRef, num_output_partitions: usize, metrics: ShufflePartitionerMetrics, - codec: crate::CompressionCodec, ) -> datafusion::common::Result { debug_assert!( schema.fields().is_empty(), "EmptySchemaShufflePartitioner requires a zero-column schema" ); - let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec)?; Ok(Self { - output_data_file, - output_index_file, + partition_writer, schema, - shuffle_block_writer, num_output_partitions, total_rows: 0, metrics, @@ -66,7 +57,7 @@ impl EmptySchemaShufflePartitioner { } #[async_trait::async_trait] -impl ShufflePartitioner for EmptySchemaShufflePartitioner { +impl ShufflePartitioner for EmptySchemaShufflePartitioner { async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> { let start_time = Instant::now(); let num_rows = batch.num_rows(); @@ -85,46 +76,24 @@ impl ShufflePartitioner for EmptySchemaShufflePartitioner { fn shuffle_write(&mut self) -> datafusion::common::Result<()> { let start_time = Instant::now(); - let output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.output_data_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut output_data = BufWriter::new(output_data); - // Write a single zero-column batch with the accumulated row count to partition 0 - if self.total_rows > 0 { - let batch = RecordBatch::try_new_with_options( + let batch_opt = if self.total_rows > 0 { + Some(Ok(RecordBatch::try_new_with_options( self.schema.clone(), vec![], &arrow::array::RecordBatchOptions::new().with_row_count(Some(self.total_rows)), - )?; - self.shuffle_block_writer.write_batch( - &batch, - &mut output_data, - &self.metrics.encode_time, - )?; - } - - let mut write_timer = self.metrics.write_time.timer(); - output_data.flush()?; - let data_file_length = output_data.stream_position()?; + )?)) + } else { + None + }; - // Write index file: partition 0 spans [0, data_file_length), all others are empty - let index_file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.output_index_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut index_writer = BufWriter::new(index_file); - index_writer.write_all(&0i64.to_le_bytes())?; - for _ in 0..self.num_output_partitions { - index_writer.write_all(&(data_file_length as i64).to_le_bytes())?; + self.partition_writer + .finish_partition(0, &mut batch_opt.into_iter(), &self.metrics)?; + for pid in 1..self.num_output_partitions { + self.partition_writer + .finish_partition(pid, &mut iter::empty(), &self.metrics)?; } - index_writer.flush()?; - write_timer.stop(); + self.partition_writer.finish_all(&self.metrics)?; self.metrics .baseline diff --git a/native/shuffle/src/partitioners/mod.rs b/native/shuffle/src/partitioners/mod.rs index a0bc652b4b..8f4239a820 100644 --- a/native/shuffle/src/partitioners/mod.rs +++ b/native/shuffle/src/partitioners/mod.rs @@ -23,6 +23,5 @@ mod traits; pub(crate) use empty_schema::EmptySchemaShufflePartitioner; pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner; -pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator; pub(crate) use single_partition::SinglePartitionShufflePartitioner; pub(crate) use traits::ShufflePartitioner; diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index efdbb970aa..be2d977e83 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -16,26 +16,20 @@ // under the License. use crate::metrics::ShufflePartitionerMetrics; -use crate::partitioners::partitioned_batch_iterator::{ - PartitionedBatchIterator, PartitionedBatchesProducer, -}; +use crate::partitioners::partitioned_batch_iterator::PartitionedBatchesProducer; use crate::partitioners::ShufflePartitioner; -use crate::writers::{BufBatchWriter, PartitionWriter}; -use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; +use crate::writers::PartitionWriter; +use crate::{comet_partitioning, CometPartitioning}; use arrow::array::{Array, ArrayData, ArrayRef, RecordBatch}; -use arrow::datatypes::SchemaRef; use datafusion::common::utils::proxy::VecAllocExt; use datafusion::common::{DataFusionError, HashSet}; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::physical_plan::metrics::Time; use datafusion_comet_common::tracing::{with_trace, with_trace_async}; use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; use itertools::Itertools; use std::fmt; use std::fmt::{Debug, Formatter}; -use std::fs::{File, OpenOptions}; -use std::io::{BufWriter, Seek, Write}; use std::sync::Arc; use tokio::time::Instant; @@ -105,16 +99,12 @@ impl ScratchSpace { } /// A partitioner that uses a hash function to partition data into multiple partitions -pub(crate) struct MultiPartitionShuffleRepartitioner { - output_data_file: String, - output_index_file: String, +pub(crate) struct MultiPartitionShuffleRepartitioner { buffered_batches: Vec, partition_indices: Vec>, - partition_writers: Vec, - shuffle_block_writer: ShuffleBlockWriter, + partition_writer: T, /// Partitioning scheme to use partitioning: CometPartitioning, - runtime: Arc, metrics: ShufflePartitionerMetrics, /// Reused scratch space for computing partition indices scratch: ScratchSpace, @@ -123,8 +113,6 @@ pub(crate) struct MultiPartitionShuffleRepartitioner { /// Reservation for repartitioning reservation: MemoryReservation, tracing_enabled: bool, - /// Size of the write buffer in bytes - write_buffer_size: usize, /// Start addresses (as `usize`, since raw pointers are not `Send`) of the backing buffers /// currently pinned by `buffered_batches`, so the spill reservation charges each distinct /// allocation once rather than once per slice that references it. Cleared whenever the @@ -176,20 +164,16 @@ fn count_new_buffers(batch: &RecordBatch, seen: &mut HashSet) -> usize { total } -impl MultiPartitionShuffleRepartitioner { +impl MultiPartitionShuffleRepartitioner { #[allow(clippy::too_many_arguments)] pub(crate) fn try_new( partition: usize, - output_data_file: String, - output_index_file: String, - schema: SchemaRef, + partition_writer: T, partitioning: CometPartitioning, metrics: ShufflePartitionerMetrics, runtime: Arc, batch_size: usize, - codec: CompressionCodec, tracing_enabled: bool, - write_buffer_size: usize, ) -> datafusion::common::Result { let num_output_partitions = partitioning.partition_count(); assert_ne!( @@ -214,31 +198,20 @@ impl MultiPartitionShuffleRepartitioner { partition_starts: vec![0; num_output_partitions + 1], }; - let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; - - let partition_writers = (0..num_output_partitions) - .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) - .collect::>>()?; - let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]")) .with_can_spill(true) .register(&runtime.memory_pool); Ok(Self { - output_data_file, - output_index_file, buffered_batches: vec![], partition_indices: vec![vec![]; num_output_partitions], - partition_writers, - shuffle_block_writer, + partition_writer, partitioning, - runtime, metrics, scratch, batch_size, reservation, tracing_enabled, - write_buffer_size, pinned_buffers: HashSet::new(), }) } @@ -485,31 +458,6 @@ impl MultiPartitionShuffleRepartitioner { Ok(()) } - #[allow(clippy::too_many_arguments)] - fn shuffle_write_partition( - partition_iter: &mut PartitionedBatchIterator, - shuffle_block_writer: &mut ShuffleBlockWriter, - output_data: &mut BufWriter, - interleave_time: &Time, - encode_time: &Time, - write_time: &Time, - write_buffer_size: usize, - batch_size: usize, - ) -> datafusion::common::Result<()> { - let mut buf_batch_writer = BufBatchWriter::new( - shuffle_block_writer, - output_data, - write_buffer_size, - batch_size, - ); - while let Some(batch) = partition_iter.next(interleave_time) { - let batch = batch?; - buf_batch_writer.write(&batch, encode_time, write_time)?; - } - buf_batch_writer.flush(encode_time, write_time)?; - Ok(()) - } - fn used(&self) -> usize { self.reservation.size() } @@ -553,38 +501,32 @@ impl MultiPartitionShuffleRepartitioner { } with_trace("shuffle_spill", self.tracing_enabled, || { - let num_output_partitions = self.partition_writers.len(); + let num_output_partitions = self.partition_indices.len(); let mut partitioned_batches = self.partitioned_batches(); - let mut spilled_bytes = 0; for partition_id in 0..num_output_partitions { - let partition_writer = &mut self.partition_writers[partition_id]; - let mut iter = partitioned_batches.produce(partition_id); - spilled_bytes += partition_writer.spill( - &mut iter, - &self.runtime, + self.partition_writer.write( + partition_id, + &mut partitioned_batches.produce(partition_id, &self.metrics.interleave_time), &self.metrics, - self.write_buffer_size, - self.batch_size, )?; } self.reservation.free(); self.pinned_buffers.clear(); self.metrics.spill_count.add(1); - self.metrics.spilled_bytes.add(spilled_bytes); Ok(()) }) } #[cfg(test)] - pub(crate) fn partition_writers(&self) -> &[PartitionWriter] { - &self.partition_writers + pub(crate) fn partition_writer(&self) -> &T { + &self.partition_writer } } #[async_trait::async_trait] -impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { +impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { /// Shuffles rows in input batch into corresponding partition buffer. /// This function will slice input batch according to configured batch size and then /// shuffle rows into corresponding partition buffer. @@ -616,78 +558,28 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { let mut partitioned_batches = self.partitioned_batches(); self.pinned_buffers.clear(); let num_output_partitions = self.partition_indices.len(); - let mut offsets = vec![0; num_output_partitions + 1]; - - let data_file = self.output_data_file.clone(); - let index_file = self.output_index_file.clone(); - - let output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(data_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - - let mut output_data = BufWriter::with_capacity(self.write_buffer_size, output_data); #[allow(clippy::needless_range_loop)] for i in 0..num_output_partitions { - offsets[i] = output_data.stream_position()?; - - // if we wrote a spill file for this partition then copy the - // contents into the shuffle file - if let Some(spill_path) = self.partition_writers[i].path() { - // Use raw File handle (not BufReader) so that std::io::copy - // can use copy_file_range/sendfile for zero-copy on Linux. - let mut spill_file = File::open(spill_path)?; - let mut write_timer = self.metrics.write_time.timer(); - std::io::copy(&mut spill_file, &mut output_data)?; - write_timer.stop(); - } - - // Write in memory batches to output data file - let mut partition_iter = partitioned_batches.produce(i); - Self::shuffle_write_partition( - &mut partition_iter, - &mut self.shuffle_block_writer, - &mut output_data, - &self.metrics.interleave_time, - &self.metrics.encode_time, - &self.metrics.write_time, - self.write_buffer_size, - self.batch_size, + self.partition_writer.finish_partition( + i, + &mut partitioned_batches.produce(i, &self.metrics.interleave_time), + &self.metrics, )?; } - let mut write_timer = self.metrics.write_time.timer(); - output_data.flush()?; - write_timer.stop(); - - // add one extra offset at last to ease partition length computation - offsets[num_output_partitions] = output_data.stream_position()?; - - let mut write_timer = self.metrics.write_time.timer(); - let mut output_index = - BufWriter::new(File::create(index_file).map_err(|e| { - DataFusionError::Execution(format!("shuffle write error: {e:?}")) - })?); - for offset in offsets { - output_index.write_all(&(offset as i64).to_le_bytes()[..])?; - } - output_index.flush()?; - write_timer.stop(); + self.partition_writer.finish_all(&self.metrics)?; self.metrics .baseline .elapsed_compute() .add_duration(start_time.elapsed()); - Ok(()) }) } } -impl Debug for MultiPartitionShuffleRepartitioner { +impl Debug for MultiPartitionShuffleRepartitioner { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("ShuffleRepartitioner") .field("memory_used", &self.used()) diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index c7f1781866..f124d98ff2 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ -42,11 +42,16 @@ impl PartitionedBatchesProducer { } } - pub(super) fn produce(&mut self, partition_id: usize) -> PartitionedBatchIterator<'_> { + pub(super) fn produce<'a>( + &'a mut self, + partition_id: usize, + interleave_time: &'a Time, + ) -> PartitionedBatchIterator<'a> { PartitionedBatchIterator::new( &self.partition_indices[partition_id], &self.buffered_batches, self.batch_size, + interleave_time, ) } } @@ -57,6 +62,7 @@ pub(crate) struct PartitionedBatchIterator<'a> { batch_size: usize, indices: Vec<(usize, usize)>, pos: usize, + interleave_time: &'a Time, } impl<'a> PartitionedBatchIterator<'a> { @@ -64,6 +70,7 @@ impl<'a> PartitionedBatchIterator<'a> { indices: &'a [(u32, u32)], buffered_batches: &'a [RecordBatch], batch_size: usize, + interleave_time: &'a Time, ) -> Self { if indices.is_empty() { // Avoid unnecessary allocations when the partition is empty @@ -72,6 +79,7 @@ impl<'a> PartitionedBatchIterator<'a> { batch_size, indices: vec![], pos: 0, + interleave_time, }; } let record_batches = buffered_batches.iter().collect::>(); @@ -84,21 +92,22 @@ impl<'a> PartitionedBatchIterator<'a> { batch_size, indices: current_indices, pos: 0, + interleave_time, } } +} + +impl Iterator for PartitionedBatchIterator<'_> { + type Item = datafusion::common::Result; - /// Returns the next shuffled batch, recording the gather cost into `interleave_time`. - pub(crate) fn next( - &mut self, - interleave_time: &Time, - ) -> Option> { + fn next(&mut self) -> Option { if self.pos >= self.indices.len() { return None; } let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); let indices = &self.indices[self.pos..indices_end]; - let mut timer = interleave_time.timer(); + let mut timer = self.interleave_time.timer(); let result = interleave_record_batch(&self.record_batches, indices); timer.stop(); match result { diff --git a/native/shuffle/src/partitioners/single_partition.rs b/native/shuffle/src/partitioners/single_partition.rs index 5801ef613b..4a2f6f64a9 100644 --- a/native/shuffle/src/partitioners/single_partition.rs +++ b/native/shuffle/src/partitioners/single_partition.rs @@ -17,20 +17,15 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::ShufflePartitioner; -use crate::writers::BufBatchWriter; -use crate::{CompressionCodec, ShuffleBlockWriter}; +use crate::writers::PartitionWriter; use arrow::array::RecordBatch; -use arrow::datatypes::SchemaRef; use datafusion::common::DataFusionError; -use std::fs::{File, OpenOptions}; -use std::io::{BufWriter, Write}; +use std::iter; use tokio::time::Instant; /// A partitioner that writes all shuffle data to a single file and a single index file -pub(crate) struct SinglePartitionShufflePartitioner { - // output_data_file: File, - output_data_writer: BufBatchWriter, - output_index_path: String, +pub(crate) struct SinglePartitionShufflePartitioner { + partition_writer: T, /// Batches that are smaller than the batch size and to be concatenated buffered_batches: Vec, /// Number of rows in the concatenating batches @@ -41,34 +36,14 @@ pub(crate) struct SinglePartitionShufflePartitioner { batch_size: usize, } -impl SinglePartitionShufflePartitioner { +impl SinglePartitionShufflePartitioner { pub(crate) fn try_new( - output_data_path: String, - output_index_path: String, - schema: SchemaRef, + partition_writer: T, metrics: ShufflePartitionerMetrics, batch_size: usize, - codec: CompressionCodec, - write_buffer_size: usize, ) -> datafusion::common::Result { - let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; - - let output_data_file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(output_data_path)?; - - let output_data_writer = BufBatchWriter::new( - shuffle_block_writer, - output_data_file, - write_buffer_size, - batch_size, - ); - Ok(Self { - output_data_writer, - output_index_path, + partition_writer, buffered_batches: vec![], num_buffered_rows: 0, metrics, @@ -109,7 +84,7 @@ impl SinglePartitionShufflePartitioner { } #[async_trait::async_trait] -impl ShufflePartitioner for SinglePartitionShufflePartitioner { +impl ShufflePartitioner for SinglePartitionShufflePartitioner { async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> { let start_time = Instant::now(); let num_rows = batch.num_rows(); @@ -123,20 +98,14 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { // Write the concatenated buffered batch if let Some(batch) = concatenated_batch { - self.output_data_writer.write( - &batch, - &self.metrics.encode_time, - &self.metrics.write_time, - )?; + self.partition_writer + .write(0, &mut iter::once(Ok(batch)), &self.metrics)?; } if num_rows >= self.batch_size { // Write the new batch - self.output_data_writer.write( - &batch, - &self.metrics.encode_time, - &self.metrics.write_time, - )?; + self.partition_writer + .write(0, &mut iter::once(Ok(batch)), &self.metrics)?; } else { // Add the new batch to the buffer self.add_buffered_batch(batch); @@ -160,28 +129,14 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { // Write the concatenated buffered batch if let Some(batch) = concatenated_batch { - self.output_data_writer.write( - &batch, - &self.metrics.encode_time, - &self.metrics.write_time, - )?; - } - self.output_data_writer - .flush(&self.metrics.encode_time, &self.metrics.write_time)?; - - // Write index file. It should only contain 2 entries: 0 and the total number of bytes written - let index_file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(self.output_index_path.clone()) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut index_buf_writer = BufWriter::new(index_file); - let data_file_length = self.output_data_writer.writer_stream_position()?; - for offset in [0, data_file_length] { - index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?; + self.partition_writer + .write(0, &mut iter::once(Ok(batch)), &self.metrics)?; } - index_buf_writer.flush()?; + + self.partition_writer + .finish_partition(0, &mut iter::empty(), &self.metrics)?; + + self.partition_writer.finish_all(&self.metrics)?; self.metrics .baseline diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index 756e753b3c..de6989e52a 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -22,7 +22,8 @@ use crate::partitioners::{ EmptySchemaShufflePartitioner, MultiPartitionShuffleRepartitioner, ShufflePartitioner, SinglePartitionShufflePartitioner, }; -use crate::{CometPartitioning, CompressionCodec}; +use crate::writers::LocalPartitionWriter; +use crate::{CometPartitioning, CompressionCodec, ShuffleBlockWriter}; use async_trait::async_trait; use datafusion::common::exec_datafusion_err; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; @@ -202,39 +203,40 @@ async fn external_shuffle( ) -> Result { let schema = input.schema(); + let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; + let local_partition_writer = LocalPartitionWriter::try_new( + output_data_file, + output_index_file, + shuffle_block_writer, + partitioning.partition_count(), + context.session_config().batch_size(), + write_buffer_size, + context.runtime_env(), + )?; + let mut repartitioner: Box = match &partitioning { _ if schema.fields().is_empty() => { log::debug!("found empty schema, overriding {partitioning:?} partitioning with EmptySchemaShufflePartitioner"); Box::new(EmptySchemaShufflePartitioner::try_new( - output_data_file, - output_index_file, + local_partition_writer, Arc::clone(&schema), partitioning.partition_count(), metrics, - codec, )?) } any if any.partition_count() == 1 => Box::new(SinglePartitionShufflePartitioner::try_new( - output_data_file, - output_index_file, - Arc::clone(&schema), + local_partition_writer, metrics, context.session_config().batch_size(), - codec, - write_buffer_size, )?), _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( partition, - output_data_file, - output_index_file, - Arc::clone(&schema), + local_partition_writer, partitioning, metrics, context.runtime_env(), context.session_config().batch_size(), - codec, tracing_enabled, - write_buffer_size, )?), }; @@ -346,38 +348,47 @@ mod test { let num_partitions = 2; let runtime_env = create_runtime(memory_limit); let metrics_set = ExecutionPlanMetricsSet::new(); - let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new( - 0, + let shuffle_block_writer = + ShuffleBlockWriter::try_new(batch.schema().as_ref(), CompressionCodec::Lz4Frame) + .unwrap(); + let local_partition_writer = LocalPartitionWriter::try_new( "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), - batch.schema(), + shuffle_block_writer, + num_partitions, + 1024, + 1024 * 1024, // write_buffer_size: 1MB default + Arc::clone(&runtime_env), + ) + .unwrap(); + let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new( + 0, + local_partition_writer, CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions), ShufflePartitionerMetrics::new(&metrics_set, 0), runtime_env, 1024, - CompressionCodec::Lz4Frame, false, - 1024 * 1024, // write_buffer_size: 1MB default ) .unwrap(); repartitioner.insert_batch(batch.clone()).await.unwrap(); { - let partition_writers = repartitioner.partition_writers(); - assert_eq!(partition_writers.len(), 2); + let spill_writers = repartitioner.partition_writer().get_spill_writers(); + assert_eq!(spill_writers.len(), 2); - assert!(!partition_writers[0].has_spill_file()); - assert!(!partition_writers[1].has_spill_file()); + assert!(!spill_writers[0].has_spill_file()); + assert!(!spill_writers[1].has_spill_file()); } repartitioner.spill().unwrap(); // after spill, there should be spill files { - let partition_writers = repartitioner.partition_writers(); - assert!(partition_writers[0].has_spill_file()); - assert!(partition_writers[1].has_spill_file()); + let spill_writers = repartitioner.partition_writer().get_spill_writers(); + assert!(spill_writers[0].has_spill_file()); + assert!(spill_writers[1].has_spill_file()); } // insert another batch after spilling @@ -409,18 +420,26 @@ mod test { let data_size = metrics.data_size.clone(); let spill_count = metrics.spill_count.clone(); let dir = tempfile::tempdir().unwrap(); - let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new( - 0, + let shuffle_block_writer = + ShuffleBlockWriter::try_new(schema.as_ref(), CompressionCodec::Lz4Frame).unwrap(); + let local_partition_writer = LocalPartitionWriter::try_new( dir.path().join("data.out").to_str().unwrap().to_string(), dir.path().join("index.out").to_str().unwrap().to_string(), - backing.schema(), + shuffle_block_writer, + num_partitions, + 1024, + 1024 * 1024, // write_buffer_size: 1MB default + Arc::clone(&runtime_env), + ) + .unwrap(); + let mut repartitioner = MultiPartitionShuffleRepartitioner::try_new( + 0, + local_partition_writer, CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions), metrics, runtime_env, batch_size, - CompressionCodec::Lz4Frame, false, - 1024 * 1024, ) .unwrap(); diff --git a/native/shuffle/src/writers/local/local_partition_writer.rs b/native/shuffle/src/writers/local/local_partition_writer.rs new file mode 100644 index 0000000000..3a9a6484db --- /dev/null +++ b/native/shuffle/src/writers/local/local_partition_writer.rs @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metrics::ShufflePartitionerMetrics; +use crate::writers::local::spill::SpillWriter; +use crate::writers::partition_writer::PartitionWriter; +use crate::writers::BufBatchWriter; +use crate::ShuffleBlockWriter; +use arrow::array::RecordBatch; +use datafusion::common::DataFusionError; +use datafusion::execution::runtime_env::RuntimeEnv; +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Seek, Write}; +use std::sync::Arc; + +/// Output target for the shuffle data file. +/// +/// The two shuffle modes drive the writer differently: +/// +/// * Single-partition shuffles stream every batch through a single long-lived +/// [`BufBatchWriter`]. Keeping it alive across `write` calls preserves +/// cross-batch coalescing in the internal `BatchCoalescer` and limits +/// flushing (which also finalizes partially coalesced batches) to +/// [`PartitionWriter::finish_all`]. +/// * Multi-partition shuffles finalize one partition at a time in +/// [`PartitionWriter::finish_partition`], each with its own short-lived +/// `BufBatchWriter`, so coalescing intentionally does not cross partition +/// boundaries. They hold the raw output writer and block writer directly. +enum DataOutput { + /// Single-partition output: one long-lived writer streams all batches. + Single(BufBatchWriter), + /// Multi-partition output: batches are staged per partition and merged into + /// `output_writer` one partition at a time during `finish_partition`. + Multi { + output_writer: BufWriter, + shuffle_block_writer: ShuffleBlockWriter, + /// One spill file per output partition, buffered until `finish_partition` + /// merges them into the shuffle output. + spill_writers: Vec, + /// Runtime used to allocate the temporary spill files. + runtime: Arc, + }, +} + +/// Local file-based [`PartitionWriter`] implementation. +/// +/// Writes shuffle output to a single data file plus an index file recording the +/// byte offset where each partition begins. See [`DataOutput`] for how the +/// single- and multi-partition modes differ. +pub(crate) struct LocalPartitionWriter { + output_index_file: String, + data_output: DataOutput, + /// Start offset of each partition in the data file, plus a trailing entry + /// with the total length so partition sizes are simple offset differences. + /// Has `num_output_partitions + 1` elements. + offsets: Vec, + batch_size: usize, + write_buffer_size: usize, + num_output_partitions: usize, + /// Id of the last partition passed to `finish_partition`, used to assert + /// partitions are finalized in ascending order. `-1` before any call. + last_finish_pid: i32, +} + +impl LocalPartitionWriter { + pub(crate) fn try_new( + output_data_file: String, + output_index_file: String, + shuffle_block_writer: ShuffleBlockWriter, + num_output_partitions: usize, + batch_size: usize, + write_buffer_size: usize, + runtime: Arc, + ) -> datafusion::common::Result { + let output_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(output_data_file.clone()) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + + let data_output = if num_output_partitions == 1 { + DataOutput::Single(BufBatchWriter::new( + shuffle_block_writer, + output_file, + write_buffer_size, + batch_size, + )) + } else { + let output_writer = BufWriter::with_capacity(write_buffer_size, output_file); + let spill_writers = (0..num_output_partitions) + .map(|_| { + SpillWriter::try_new( + shuffle_block_writer.clone(), + write_buffer_size, + batch_size, + ) + }) + .collect::>>()?; + DataOutput::Multi { + output_writer, + shuffle_block_writer, + spill_writers, + runtime, + } + }; + Ok(Self { + output_index_file, + data_output, + offsets: vec![0u64; num_output_partitions + 1], + batch_size, + write_buffer_size, + num_output_partitions, + last_finish_pid: -1, + }) + } + + #[cfg(test)] + pub(crate) fn get_spill_writers(&self) -> &Vec { + match &self.data_output { + DataOutput::Multi { spill_writers, .. } => spill_writers, + DataOutput::Single(_) => panic!("single-partition output has no spill writers"), + } + } +} + +impl PartitionWriter for LocalPartitionWriter { + fn write( + &mut self, + pid: usize, + iter: &mut I, + metrics: &ShufflePartitionerMetrics, + ) -> datafusion::common::Result<()> + where + I: Iterator>, + { + match &mut self.data_output { + DataOutput::Single(writer) => { + if pid != 0 { + return Err(DataFusionError::Execution( + "LocalPartitionWriter single-partition output only supports partition 0." + .to_string(), + )); + } + + // Stream batches through the long-lived writer so small batches keep + // coalescing across calls. Do not flush here: flushing also finalizes any + // partially coalesced batch, which would defeat cross-call coalescing and + // increase flush frequency. The single-partition writer is flushed once, in + // `finish_all`. + for batch in iter.by_ref() { + let batch = batch?; + writer.write(&batch, &metrics.encode_time, &metrics.write_time)?; + } + } + DataOutput::Multi { + spill_writers, + runtime, + .. + } => { + // Multi-partition output buffers each partition's batches into its own + // spill file. `finish_partition` later merges the spill files (and any + // remaining in-memory batches) into the shuffle output in partition order. + spill_writers[pid].write(iter, runtime, metrics)?; + } + } + + Ok(()) + } + + fn finish_partition( + &mut self, + pid: usize, + iter: &mut I, + metrics: &ShufflePartitionerMetrics, + ) -> datafusion::common::Result<()> + where + I: Iterator>, + { + if pid as i32 - self.last_finish_pid != 1 { + return Err(DataFusionError::Execution( + "LocalPartitionWriter::finish_partition must be called in order.".to_string(), + )); + } + self.last_finish_pid = pid as i32; + + let write_buffer_size = self.write_buffer_size; + let batch_size = self.batch_size; + + match &mut self.data_output { + DataOutput::Single(writer) => { + // Single-partition data was already streamed via `write`, starting at + // offset 0 (already recorded in `self.offsets[0]`). Stream any trailing + // batches (normally none) without flushing; the long-lived writer is + // flushed once in `finish_all`. + for batch in iter.by_ref() { + let batch = batch?; + writer.write(&batch, &metrics.encode_time, &metrics.write_time)?; + } + } + DataOutput::Multi { + output_writer, + shuffle_block_writer, + spill_writers, + .. + } => { + self.offsets[pid] = output_writer.stream_position()?; + + // if we wrote a spill file for this partition then copy the + // contents into the shuffle file + if let Some(writer) = spill_writers.get(pid) { + if let Some(spill_path) = writer.path() { + // Use raw File handle (not BufReader) so that std::io::copy + // can use copy_file_range/sendfile for zero-copy on Linux. + let mut spill_file = File::open(spill_path)?; + let mut write_timer = metrics.write_time.timer(); + std::io::copy(&mut spill_file, output_writer)?; + write_timer.stop(); + } + } + + // Write in memory batches to output data file. Each partition uses its + // own writer so coalescing does not cross partition boundaries. + let mut buf_batch_writer = BufBatchWriter::new( + shuffle_block_writer, + output_writer, + write_buffer_size, + batch_size, + ); + for batch in iter.by_ref() { + let batch = batch?; + buf_batch_writer.write(&batch, &metrics.encode_time, &metrics.write_time)?; + } + buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; + } + } + Ok(()) + } + + fn finish_all( + &mut self, + metrics: &ShufflePartitionerMetrics, + ) -> datafusion::common::Result<()> { + // Flush the data output and capture the final position. For the + // single-partition writer this also finalizes the last coalesced batch. + let final_offset = match &mut self.data_output { + DataOutput::Single(writer) => { + writer.flush(&metrics.encode_time, &metrics.write_time)?; + writer.writer_stream_position()? + } + DataOutput::Multi { output_writer, .. } => { + let mut write_timer = metrics.write_time.timer(); + output_writer.flush()?; + let pos = output_writer.stream_position()?; + write_timer.stop(); + pos + } + }; + + // add one extra offset at last to ease partition length computation + self.offsets[self.num_output_partitions] = final_offset; + + let mut write_timer = metrics.write_time.timer(); + let mut output_index = BufWriter::new( + File::create(self.output_index_file.clone()) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?, + ); + + for offset in &self.offsets { + let offset_i64 = i64::try_from(*offset).map_err(|_| { + DataFusionError::Execution(format!( + "shuffle write error: offset overflow ({offset})" + )) + })?; + output_index.write_all(&offset_i64.to_le_bytes())?; + } + output_index.flush()?; + write_timer.stop(); + + Ok(()) + } +} diff --git a/native/shuffle/src/writers/local/mod.rs b/native/shuffle/src/writers/local/mod.rs new file mode 100644 index 0000000000..e76bfba21a --- /dev/null +++ b/native/shuffle/src/writers/local/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub(crate) mod local_partition_writer; +mod spill; diff --git a/native/shuffle/src/writers/spill.rs b/native/shuffle/src/writers/local/spill.rs similarity index 77% rename from native/shuffle/src/writers/spill.rs rename to native/shuffle/src/writers/local/spill.rs index 624a45befe..450b617421 100644 --- a/native/shuffle/src/writers/spill.rs +++ b/native/shuffle/src/writers/local/spill.rs @@ -15,87 +15,60 @@ // specific language governing permissions and limitations // under the License. -use super::ShuffleBlockWriter; use crate::metrics::ShufflePartitionerMetrics; -use crate::partitioners::PartitionedBatchIterator; -use crate::writers::buf_batch_writer::BufBatchWriter; +use crate::writers::BufBatchWriter; +use crate::ShuffleBlockWriter; +use arrow::record_batch::RecordBatch; use datafusion::common::DataFusionError; use datafusion::execution::disk_manager::RefCountedTempFile; use datafusion::execution::runtime_env::RuntimeEnv; use std::fs::{File, OpenOptions}; -/// A temporary disk file for spilling a partition's intermediate shuffle data. struct SpillFile { temp_file: RefCountedTempFile, file: File, } -/// Manages encoding and optional disk spilling for a single shuffle partition. -pub(crate) struct PartitionWriter { - /// Spill file for intermediate shuffle output for this partition. Each spill event - /// will append to this file and the contents will be copied to the shuffle file at - /// the end of processing. - spill_file: Option, - /// Writer that performs encoding and compression +pub(crate) struct SpillWriter { shuffle_block_writer: ShuffleBlockWriter, + write_buffer_size: usize, + batch_size: usize, + spill_file: Option, } -impl PartitionWriter { +impl SpillWriter { pub(crate) fn try_new( shuffle_block_writer: ShuffleBlockWriter, + write_buffer_size: usize, + batch_size: usize, ) -> datafusion::common::Result { Ok(Self { - spill_file: None, shuffle_block_writer, + write_buffer_size, + batch_size, + spill_file: None, }) } - fn ensure_spill_file_created( + pub(crate) fn write>>( &mut self, - runtime: &RuntimeEnv, - ) -> datafusion::common::Result<()> { - if self.spill_file.is_none() { - // Spill file is not yet created, create it - let spill_file = runtime - .disk_manager - .create_tmp_file("shuffle writer spill")?; - let spill_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(spill_file.path()) - .map_err(|e| { - DataFusionError::Execution(format!("Error occurred while spilling {e}")) - })?; - self.spill_file = Some(SpillFile { - temp_file: spill_file, - file: spill_data, - }); - } - Ok(()) - } - - pub(crate) fn spill( - &mut self, - iter: &mut PartitionedBatchIterator, + iter: &mut I, runtime: &RuntimeEnv, metrics: &ShufflePartitionerMetrics, - write_buffer_size: usize, - batch_size: usize, - ) -> datafusion::common::Result { - if let Some(batch) = iter.next(&metrics.interleave_time) { + ) -> datafusion::common::Result<()> { + if let Some(batch) = iter.next() { self.ensure_spill_file_created(runtime)?; let total_bytes_written = { let mut buf_batch_writer = BufBatchWriter::new( &mut self.shuffle_block_writer, &mut self.spill_file.as_mut().unwrap().file, - write_buffer_size, - batch_size, + self.write_buffer_size, + self.batch_size, ); let mut bytes_written = buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; - while let Some(batch) = iter.next(&metrics.interleave_time) { + for batch in iter.by_ref() { let batch = batch?; bytes_written += buf_batch_writer.write( &batch, @@ -106,11 +79,34 @@ impl PartitionWriter { buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; bytes_written }; + metrics.spilled_bytes.add(total_bytes_written); + } + Ok(()) + } - Ok(total_bytes_written) - } else { - Ok(0) + fn ensure_spill_file_created( + &mut self, + runtime: &RuntimeEnv, + ) -> datafusion::common::Result<()> { + if self.spill_file.is_none() { + // Spill file is not yet created, create it + let spill_file = runtime + .disk_manager + .create_tmp_file("shuffle writer spill")?; + let spill_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(spill_file.path()) + .map_err(|e| { + DataFusionError::Execution(format!("Error occurred while spilling {e}")) + })?; + self.spill_file = Some(SpillFile { + temp_file: spill_file, + file: spill_data, + }); } + Ok(()) } pub(crate) fn path(&self) -> Option<&std::path::Path> { diff --git a/native/shuffle/src/writers/mod.rs b/native/shuffle/src/writers/mod.rs index 75caf9f3a3..6d330fd12a 100644 --- a/native/shuffle/src/writers/mod.rs +++ b/native/shuffle/src/writers/mod.rs @@ -17,10 +17,12 @@ mod buf_batch_writer; mod checksum; +mod local; +mod partition_writer; mod shuffle_block_writer; -mod spill; pub(crate) use buf_batch_writer::BufBatchWriter; pub(crate) use checksum::Checksum; +pub(crate) use local::local_partition_writer::LocalPartitionWriter; +pub(crate) use partition_writer::PartitionWriter; pub use shuffle_block_writer::{CompressionCodec, ShuffleBlockWriter}; -pub(crate) use spill::PartitionWriter; diff --git a/native/shuffle/src/writers/partition_writer.rs b/native/shuffle/src/writers/partition_writer.rs new file mode 100644 index 0000000000..25b0e598df --- /dev/null +++ b/native/shuffle/src/writers/partition_writer.rs @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metrics::ShufflePartitionerMetrics; +use arrow::record_batch::RecordBatch; + +/// Storage backend abstraction for shuffle partition output. +/// +/// Decouples partitioning from storage: partitioners only produce partitioned +/// `RecordBatch` streams, while implementations of this trait own how those +/// batches are stored and finalized. [`LocalPartitionWriter`] implements the +/// local file behavior; other backends (e.g. a remote shuffle writer) can be +/// added without changing the partitioners. +/// +/// A partitioner drives a writer as: any number of +/// [`write`](PartitionWriter::write) calls to stage batches, then one +/// [`finish_partition`](PartitionWriter::finish_partition) per partition in +/// ascending id order, then a single [`finish_all`](PartitionWriter::finish_all). +/// +/// [`LocalPartitionWriter`]: crate::writers::local::local_partition_writer::LocalPartitionWriter +pub(crate) trait PartitionWriter: Send + Sync { + /// Stages the batches from `iter` for partition `pid` without finalizing it. + /// + /// Used to stream single-partition output and to stage multi-partition + /// spilled batches. A partition may be written multiple times and in any + /// order; staged data is only guaranteed visible after + /// [`finish_partition`](PartitionWriter::finish_partition). + fn write( + &mut self, + pid: usize, + iter: &mut I, + metrics: &ShufflePartitionerMetrics, + ) -> datafusion::common::Result<()> + where + I: Iterator>; + + /// Finalizes partition `pid`, writing any remaining batches from `iter` and + /// combining them with data previously staged via + /// [`write`](PartitionWriter::write). + /// + /// Must be called exactly once per partition, in ascending id order, so the + /// writer can lay partitions out contiguously and record their offsets. + fn finish_partition( + &mut self, + pid: usize, + iter: &mut I, + metrics: &ShufflePartitionerMetrics, + ) -> datafusion::common::Result<()> + where + I: Iterator>; + + /// Completes the shuffle write, flushing output and emitting the partition + /// index. Called exactly once, after the last + /// [`finish_partition`](PartitionWriter::finish_partition). + fn finish_all(&mut self, metrics: &ShufflePartitionerMetrics) + -> datafusion::common::Result<()>; +}