Skip to content
71 changes: 20 additions & 51 deletions native/shuffle/src/partitioners/empty_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,38 @@

use crate::metrics::ShufflePartitionerMetrics;
use crate::partitioners::ShufflePartitioner;
use crate::ShuffleBlockWriter;
use crate::writers::PartitionWriter;
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use datafusion::common::DataFusionError;
use std::fs::OpenOptions;
use std::io::{BufWriter, Seek, Write};
use std::iter;
use tokio::time::Instant;

/// A partitioner for zero-column schemas (e.g. queries where ColumnPruning removes all columns).
/// This handles shuffles for operations like COUNT(*) that produce empty-schema record batches
/// but contain a valid row count. Accumulates the total row count and writes a single
/// zero-column IPC batch to partition 0. All other partitions get empty entries in the index file.
pub(crate) struct EmptySchemaShufflePartitioner {
output_data_file: String,
output_index_file: String,
pub(crate) struct EmptySchemaShufflePartitioner<T: PartitionWriter> {
partition_writer: T,
schema: SchemaRef,
shuffle_block_writer: ShuffleBlockWriter,
num_output_partitions: usize,
total_rows: usize,
metrics: ShufflePartitionerMetrics,
}

impl EmptySchemaShufflePartitioner {
impl<T: PartitionWriter> EmptySchemaShufflePartitioner<T> {
pub(crate) fn try_new(
output_data_file: String,
output_index_file: String,
partition_writer: T,
schema: SchemaRef,
num_output_partitions: usize,
metrics: ShufflePartitionerMetrics,
codec: crate::CompressionCodec,
) -> datafusion::common::Result<Self> {
debug_assert!(
schema.fields().is_empty(),
"EmptySchemaShufflePartitioner requires a zero-column schema"
);
let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec)?;
Ok(Self {
output_data_file,
output_index_file,
partition_writer,
schema,
shuffle_block_writer,
num_output_partitions,
total_rows: 0,
metrics,
Expand All @@ -66,7 +57,7 @@ impl EmptySchemaShufflePartitioner {
}

#[async_trait::async_trait]
impl ShufflePartitioner for EmptySchemaShufflePartitioner {
impl<T: PartitionWriter> ShufflePartitioner for EmptySchemaShufflePartitioner<T> {
async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> {
let start_time = Instant::now();
let num_rows = batch.num_rows();
Expand All @@ -85,46 +76,24 @@ impl ShufflePartitioner for EmptySchemaShufflePartitioner {
fn shuffle_write(&mut self) -> datafusion::common::Result<()> {
let start_time = Instant::now();

let output_data = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&self.output_data_file)
.map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;
let mut output_data = BufWriter::new(output_data);

// Write a single zero-column batch with the accumulated row count to partition 0
if self.total_rows > 0 {
let batch = RecordBatch::try_new_with_options(
let batch_opt = if self.total_rows > 0 {
Some(Ok(RecordBatch::try_new_with_options(
self.schema.clone(),
vec![],
&arrow::array::RecordBatchOptions::new().with_row_count(Some(self.total_rows)),
)?;
self.shuffle_block_writer.write_batch(
&batch,
&mut output_data,
&self.metrics.encode_time,
)?;
}

let mut write_timer = self.metrics.write_time.timer();
output_data.flush()?;
let data_file_length = output_data.stream_position()?;
)?))
} else {
None
};

// Write index file: partition 0 spans [0, data_file_length), all others are empty
let index_file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&self.output_index_file)
.map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;
let mut index_writer = BufWriter::new(index_file);
index_writer.write_all(&0i64.to_le_bytes())?;
for _ in 0..self.num_output_partitions {
index_writer.write_all(&(data_file_length as i64).to_le_bytes())?;
self.partition_writer
.finish_partition(0, &mut batch_opt.into_iter(), &self.metrics)?;
for pid in 1..self.num_output_partitions {
self.partition_writer
.finish_partition(pid, &mut iter::empty(), &self.metrics)?;
}
index_writer.flush()?;
write_timer.stop();
self.partition_writer.finish_all(&self.metrics)?;

self.metrics
.baseline
Expand Down
1 change: 0 additions & 1 deletion native/shuffle/src/partitioners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@ mod traits;

pub(crate) use empty_schema::EmptySchemaShufflePartitioner;
pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner;
pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator;
pub(crate) use single_partition::SinglePartitionShufflePartitioner;
pub(crate) use traits::ShufflePartitioner;
Loading
Loading