From 16062ecf04c23c37b960bcf640d031be6110c52f Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Sun, 7 Jun 2026 19:13:50 +0530 Subject: [PATCH 1/3] Investigations for ingestion optimization Try to find more areas for optimization. - compress arrow files while writing (lz4_frame or zstd) - create new .part file based on size - one parquet per arrow file (converted in parallel) - separate runtimes to run ingestion tasks, sync and conversion tasks --- src/parseable/staging/writer.rs | 60 +++++++++++--- src/parseable/streams.rs | 142 +++++++++++++++++++------------- src/storage/object_storage.rs | 39 +++++---- src/sync.rs | 4 + 4 files changed, 163 insertions(+), 82 deletions(-) diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 8e8ee63ac..18bb72e4f 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -26,10 +26,14 @@ use std::{ }; use arrow_array::RecordBatch; -use arrow_ipc::writer::StreamWriter; +use arrow_ipc::{ + CompressionType, + writer::{IpcWriteOptions, StreamWriter}, +}; use arrow_schema::Schema; use arrow_select::concat::concat_batches; use chrono::{TimeDelta, Utc}; +use datafusion::physical_plan::buffer::SizedMessage; use itertools::Itertools; use once_cell::sync::Lazy; use rand::distributions::{Alphanumeric, DistString}; @@ -53,9 +57,20 @@ static DISK_WRITE_BATCH_MAX_AGE_SECS: Lazy = Lazy::new(|| { } }); +const ARROW_FLUSH_SIZE_LIMIT_VAR: &str = "ARROW_FLUSH_SIZE_LIMIT"; +static ARROW_FLUSH_SIZE_LIMIT: Lazy = Lazy::new(|| { + if let Ok(var) = std::env::var(ARROW_FLUSH_SIZE_LIMIT_VAR) + && let Ok(var) = var.parse::() + { + var + } else { + 1024 * 1024 * 1024 * 10 + } +}); + #[derive(Default)] pub struct Writer { - pub mem: MemWriter<16384>, + pub mem: MemWriter<4096>, pub disk: HashMap, disk_pending: HashMap, } @@ -162,7 +177,7 @@ fn write_pending_disk_batch( let schema = pending.batches[0].schema(); let batch = concat_batches(&schema, pending.batches.iter())?; - match disk.get_mut(&filename) { + let s = match disk.get_mut(&filename) { Some(writer) => writer.write(&batch)?, None => { let range = pending.range.expect("pending disk batch must have range"); @@ -170,9 +185,13 @@ fn write_pending_disk_batch( fs::create_dir_all(parent)?; } let mut writer = DiskWriter::try_new(file_path, &schema, range)?; - writer.write(&batch)?; - disk.insert(filename, writer); + let s = writer.write(&batch)?; + disk.insert(filename.clone(), writer); + s } + }; + if s >= *ARROW_FLUSH_SIZE_LIMIT { + disk.remove(&filename); } Ok(()) @@ -203,6 +222,7 @@ pub struct DiskWriter { inner: StreamWriter>, path: PathBuf, range: TimeRange, + size: usize, } impl DiskWriter { @@ -219,9 +239,22 @@ impl DiskWriter { .truncate(true) .create(true) .open(&path)?; - let inner = StreamWriter::try_new_buffered(file, schema)?; - - Ok(Self { inner, path, range }) + let inner = StreamWriter::try_new_with_options( + BufWriter::new(file), + schema, + IpcWriteOptions::default() + .try_with_compression(Some(CompressionType::LZ4_FRAME)) + .unwrap(), + )?; + + let size = 0; + + Ok(Self { + inner, + path, + range, + size, + }) } pub fn is_current(&self) -> bool { @@ -230,8 +263,10 @@ impl DiskWriter { /// Write a single recordbatch into file #[cfg_attr(feature = "hotpath", hotpath::measure)] - pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> { - self.inner.write(rb).map_err(StagingError::Arrow) + pub fn write(&mut self, rb: &RecordBatch) -> Result { + self.size += rb.size(); + self.inner.write(rb).map_err(StagingError::Arrow)?; + Ok(self.size) } } @@ -260,6 +295,11 @@ impl Drop for DiskWriter { if let Err(err) = std::fs::rename(&self.path, &arrow_path) { error!("Couldn't rename file {:?}, error = {err}", self.path); } + tracing::info!( + "flushing {:?} due to drop with size {}\n", + self.path, + self.size + ); } } diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 0c2b3eb3e..eb6d1a247 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -23,7 +23,7 @@ use arrow_schema::{Field, Fields, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; use derive_more::derive::{Deref, DerefMut}; use itertools::Itertools; -use once_cell::sync::Lazy; +use once_cell::sync::{Lazy, OnceCell}; use parquet::{ arrow::ArrowWriter, basic::Encoding, @@ -33,6 +33,7 @@ use parquet::{ }, schema::types::ColumnPath, }; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use relative_path::RelativePathBuf; use std::sync::PoisonError; use std::{ @@ -62,6 +63,7 @@ use crate::{ option::Mode, parseable::{DEFAULT_TENANT, PARSEABLE}, storage::{StreamType, object_storage::to_bytes, retention::Retention}, + sync::FLUSH_AND_CONVERT_RUNTIME, utils::time::{Minute, TimeRange}, }; @@ -70,6 +72,8 @@ use super::{ staging::{StagingError, reader::MergedReverseRecordReader, writer::Writer}, }; +static HOSTNAME: OnceCell = OnceCell::new(); + const DISK_WRITE_BATCH_ROWS_VAR: &str = "DISK_WRITE_BATCH_ROWS"; static DISK_WRITE_BATCH_ROWS: Lazy = Lazy::new(|| { if let Ok(var) = std::env::var(DISK_WRITE_BATCH_ROWS_VAR) @@ -209,12 +213,16 @@ impl Stream { parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, ) -> String { - let mut hostname = hostname::get() - .unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string())) - .into_string() - .unwrap_or_else(|_| Ulid::new().to_string()) - .matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_') - .collect::(); + let mut hostname = HOSTNAME + .get_or_init(|| { + hostname::get() + .unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string())) + .into_string() + .unwrap_or_else(|_| Ulid::new().to_string()) + .matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_') + .collect::() + }) + .clone(); if let Some(id) = &self.ingestor_id { hostname.push_str(id); @@ -257,7 +265,7 @@ impl Stream { return vec![]; }; - //iterate through all the inprocess_ directories and collect all arrow files + // iterate through all the inprocess_ directories and collect all arrow files dir.filter_map(|entry| { let path = entry.ok()?.path(); if path.is_dir() @@ -587,6 +595,7 @@ impl Stream { self.stream_name, poisoned ))) })?; + // why clean Writer.MemWriter? writer.mem.clear(); writer.take_flushable_disk(forced) }; @@ -825,32 +834,52 @@ impl Stream { } self.update_staging_metrics(&staging_files, tenant_id); - for (parquet_path, arrow_files) in staging_files { - let record_reader = MergedReverseRecordReader::try_new(&arrow_files); - if record_reader.readers.is_empty() { - continue; - } - let merged_schema = record_reader.merged_schema(); - let props = self.parquet_writer_props(&merged_schema, time_partition, custom_partition); - schemas.push(merged_schema.clone()); - let schema = Arc::new(merged_schema); - - let part_path = parquet_path.with_extension("part"); - - if !self.write_parquet_part_file( - &part_path, - record_reader, - &schema, - &props, - time_partition, - )? { - continue; - } - if let Err(e) = std::fs::rename(&part_path, &parquet_path) { - error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"); - } else { - self.cleanup_arrow_files_and_dir(&arrow_files, tenant_id); + let _schemas: Vec, StagingError>> = staging_files.into_par_iter().map( + |(parquet_path, arrow_files)| -> Result, StagingError> { + let record_reader = MergedReverseRecordReader::try_new(&arrow_files); + if record_reader.readers.is_empty() { + Ok(None) + } else { + let merged_schema = record_reader.merged_schema(); + let props = + self.parquet_writer_props(&merged_schema, time_partition, custom_partition); + // schemas.push(merged_schema.clone()); + let schema = Arc::new(merged_schema.clone()); + + let part_path = parquet_path.with_extension("part"); + + if !self.write_parquet_part_file( + &part_path, + record_reader, + &schema, + &props, + time_partition, + )? { + return Ok(None) + } + + if let Err(e) = std::fs::rename(&part_path, &parquet_path) { + error!( + "Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}" + ); + } else { + self.cleanup_arrow_files_and_dir(&arrow_files, tenant_id); + } + Ok(Some(merged_schema)) + } + }, + ) + .collect(); + + for res in _schemas { + match res { + Ok(s) => { + if let Some(s) = s { + schemas.push(s) + } + } + Err(e) => return Err(e), } } if schemas.is_empty() { @@ -880,6 +909,8 @@ impl Stream { .open(part_path) .map_err(|_| StagingError::Create)?; let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?; + + // does pruning help with query? let sort_for_metric_pruning = self.is_otel_metrics(); let time_partition_field = time_partition.map_or_else( || DEFAULT_TIMESTAMP_KEY.to_string(), @@ -1409,20 +1440,24 @@ impl Stream { // For regular cycles, use false to only flush non-current writers let forced = init_signal || shutdown_signal; self.flush(forced)?; - info!( - "Flushing stream ({}) took: {}s", - self.stream_name, - start_flush.elapsed().as_secs_f64() - ); + if self.get_stream_type().eq(&StreamType::UserDefined) { + info!( + "Flushing stream ({}) took: {}s", + self.stream_name, + start_flush.elapsed().as_secs_f64() + ); + } let start_convert = Instant::now(); self.prepare_parquet(init_signal, shutdown_signal, tenant_id)?; - info!( - "Converting arrows to parquet on stream ({}) took: {}s", - self.stream_name, - start_convert.elapsed().as_secs_f64() - ); + if self.get_stream_type().eq(&StreamType::UserDefined) { + info!( + "Converting arrows to parquet on stream ({}) took: {}s", + self.stream_name, + start_convert.elapsed().as_secs_f64() + ); + } Ok(()) } @@ -1512,15 +1547,6 @@ impl Streams { } else { vec![] } - - // self.read() - // .expect(LOCK_EXPECT) - // .get(&tenant_id) - // .and_then(|v|v.keys()) - // .map(f) - // .keys() - // .map(String::clone) - // .collect() } pub fn list_internal_streams(&self, tenant_id: &Option) -> Vec { @@ -1553,6 +1579,7 @@ impl Streams { vec![DEFAULT_TENANT.to_owned()] }; + let handle = FLUSH_AND_CONVERT_RUNTIME.handle(); for tenant_id in tenants { let guard = self.read().expect(LOCK_EXPECT); let streams: Vec> = if let Some(tenant_streams) = guard.get(&tenant_id) { @@ -1563,10 +1590,13 @@ impl Streams { for stream in streams { let tenant = tenant_id.clone(); let span = info_span!("stream_sync", stream_name = %stream.stream_name); - joinset.spawn_blocking(move || { - let _guard = span.enter(); - stream.flush_and_convert(init_signal, shutdown_signal, &Some(tenant)) - }); + joinset.spawn_blocking_on( + move || { + let _guard = span.enter(); + stream.flush_and_convert(init_signal, shutdown_signal, &Some(tenant)) + }, + handle, + ); } } } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index a1831b5d8..9c9089faa 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -64,6 +64,7 @@ use crate::storage::field_stats::DATASET_STATS_STREAM_NAME; use crate::storage::field_stats::calculate_field_stats; use crate::storage::field_stats::extract_datetime_from_parquet_path_regex; use crate::sync::ACTIVE_OBJECT_STORE_SYNC_FILES; +use crate::sync::FLUSH_AND_CONVERT_RUNTIME; use super::{ ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, ObjectStorageError, ObjectStoreFormat, @@ -1079,13 +1080,14 @@ async fn process_parquet_files( let parquet_paths: Vec = upload_context .stream .parquet_files() - .into_par_iter() + .into_iter() .filter(|p| !guard.contains(p)) .collect(); let mut ret = Vec::with_capacity(parquet_paths.len()); ret.clone_from(&parquet_paths); guard.extend(parquet_paths); + tracing::info!(ACTIVE_OBJECT_STORE_SYNC_FILES=?ACTIVE_OBJECT_STORE_SYNC_FILES); ret }; @@ -1153,20 +1155,23 @@ async fn spawn_parquet_upload_task( let stream_name = stream_name.to_string(); let schema = upload_context.schema.clone(); - - join_set.spawn(async move { - let _permit = semaphore.acquire().await.expect("semaphore is not closed"); - - upload_single_parquet_file( - store, - path, - stream_relative_path, - stream_name, - schema, - tenant_id, - ) - .await - }); + let handle = FLUSH_AND_CONVERT_RUNTIME.handle(); + join_set.spawn_on( + async move { + let _permit = semaphore.acquire().await.expect("semaphore is not closed"); + + upload_single_parquet_file( + store, + path, + stream_relative_path, + stream_name, + schema, + tenant_id, + ) + .await + }, + handle, + ); } /// Collects results from all upload tasks @@ -1293,6 +1298,7 @@ pub fn sync_all_streams(joinset: &mut JoinSet>) { } else { vec![None] }; + let handle = FLUSH_AND_CONVERT_RUNTIME.handle(); for tenant_id in tenants { for stream_name in PARSEABLE.streams.list(&tenant_id) { if let Ok(stream) = PARSEABLE.get_stream(&stream_name, &tenant_id) @@ -1304,7 +1310,7 @@ pub fn sync_all_streams(joinset: &mut JoinSet>) { let object_store = object_store.clone(); let id = tenant_id.clone(); let span = info_span!("stream_upload", stream_name = %stream_name); - joinset.spawn( + joinset.spawn_on( async move { let start = Instant::now(); let result = object_store @@ -1321,6 +1327,7 @@ pub fn sync_all_streams(joinset: &mut JoinSet>) { result } .instrument(span), + handle, ); } } diff --git a/src/sync.rs b/src/sync.rs index ebdc1d057..f6f20be48 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -26,12 +26,16 @@ use std::panic::AssertUnwindSafe; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::runtime::Runtime; use tokio::sync::{RwLock, mpsc, oneshot}; use tokio::task::JoinSet; use tokio::time::{Duration, Instant, interval_at, sleep}; use tokio::{select, task}; use tracing::{Instrument, error, info, info_span, trace, warn}; +pub static FLUSH_AND_CONVERT_RUNTIME: Lazy = + Lazy::new(|| Runtime::new().expect("Runtime should be constructible")); + static LOCAL_SYNC_RUNNING: AtomicBool = AtomicBool::new(false); static REMOTE_SYNC_RUNNING: AtomicBool = AtomicBool::new(false); From 5eb16779fd7e8ca578244ae733316397e3e5ce2d Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Mon, 8 Jun 2026 10:57:11 +0530 Subject: [PATCH 2/3] add env vars for sort pruning and parquet creation --- src/parseable/staging/writer.rs | 22 +++++++++++++++++++++- src/parseable/streams.rs | 13 ++++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 18bb72e4f..c37e5204d 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -38,6 +38,7 @@ use itertools::Itertools; use once_cell::sync::Lazy; use rand::distributions::{Alphanumeric, DistString}; use tracing::error; +use ulid::Ulid; use crate::{ parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION}, @@ -68,6 +69,17 @@ static ARROW_FLUSH_SIZE_LIMIT: Lazy = Lazy::new(|| { } }); +const ONE_PARQUET_PER_ARROW_VAR: &str = "ONE_PARQUET_PER_ARROW"; +static ONE_PARQUET_PER_ARROW: Lazy = Lazy::new(|| { + if let Ok(var) = std::env::var(ONE_PARQUET_PER_ARROW_VAR) + && let Ok(var) = var.parse::() + { + var + } else { + false + } +}); + #[derive(Default)] pub struct Writer { pub mem: MemWriter<4096>, @@ -279,7 +291,15 @@ impl Drop for DiskWriter { } let mut arrow_path = self.path.to_owned(); - arrow_path.set_extension(ARROW_FILE_EXTENSION); + + // a rudimentary way to ensure one parquet per arrow file + if *ONE_PARQUET_PER_ARROW { + arrow_path.set_extension(Ulid::new().to_string()); + #[allow(clippy::incompatible_msrv)] + arrow_path.add_extension(ARROW_FILE_EXTENSION); + } else { + arrow_path.set_extension(ARROW_FILE_EXTENSION); + } // If file exists, append a random string before .date to avoid overwriting if arrow_path.exists() { diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index eb6d1a247..43f8a716a 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -85,6 +85,17 @@ static DISK_WRITE_BATCH_ROWS: Lazy = Lazy::new(|| { } }); +const SORT_FOR_METRIC_PRUNING_VAR: &str = "SORT_FOR_METRIC_PRUNING"; +static SORT_FOR_METRIC_PRUNING: Lazy = Lazy::new(|| { + if let Ok(var) = std::env::var(SORT_FOR_METRIC_PRUNING_VAR) + && let Ok(var) = var.parse::() + { + var + } else { + true + } +}); + const INPROCESS_DIR_PREFIX: &str = "processing_"; const METRIC_ROW_GROUP_PREP_IN_FLIGHT: usize = 1; @@ -917,7 +928,7 @@ impl Stream { |s| s.as_str().to_string(), ); - if sort_for_metric_pruning { + if sort_for_metric_pruning && *SORT_FOR_METRIC_PRUNING { // Buffer batches up to the row-group target, then // concat + sort + write as a single contiguous batch. The // ArrowWriter splits the sorted batch into row groups at the From 33b5418ec31cef703b7649e3d1d91edd37bb7e28 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Mon, 8 Jun 2026 13:01:09 +0530 Subject: [PATCH 3/3] revert pruning env var --- src/parseable/streams.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 43f8a716a..eb6d1a247 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -85,17 +85,6 @@ static DISK_WRITE_BATCH_ROWS: Lazy = Lazy::new(|| { } }); -const SORT_FOR_METRIC_PRUNING_VAR: &str = "SORT_FOR_METRIC_PRUNING"; -static SORT_FOR_METRIC_PRUNING: Lazy = Lazy::new(|| { - if let Ok(var) = std::env::var(SORT_FOR_METRIC_PRUNING_VAR) - && let Ok(var) = var.parse::() - { - var - } else { - true - } -}); - const INPROCESS_DIR_PREFIX: &str = "processing_"; const METRIC_ROW_GROUP_PREP_IN_FLIGHT: usize = 1; @@ -928,7 +917,7 @@ impl Stream { |s| s.as_str().to_string(), ); - if sort_for_metric_pruning && *SORT_FOR_METRIC_PRUNING { + if sort_for_metric_pruning { // Buffer batches up to the row-group target, then // concat + sort + write as a single contiguous batch. The // ArrowWriter splits the sorted batch into row groups at the