Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 71 additions & 11 deletions src/parseable/staging/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@ use std::{
};

use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use arrow_ipc::{
CompressionType,
writer::{IpcWriteOptions, StreamWriter},
};
use arrow_schema::Schema;
use arrow_select::concat::concat_batches;
use chrono::{TimeDelta, Utc};
use datafusion::physical_plan::buffer::SizedMessage;
use itertools::Itertools;
use once_cell::sync::Lazy;
use rand::distributions::{Alphanumeric, DistString};
use tracing::error;
use ulid::Ulid;

use crate::{
parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION},
Expand All @@ -53,9 +58,31 @@ static DISK_WRITE_BATCH_MAX_AGE_SECS: Lazy<i64> = Lazy::new(|| {
}
});

const ARROW_FLUSH_SIZE_LIMIT_VAR: &str = "ARROW_FLUSH_SIZE_LIMIT";
static ARROW_FLUSH_SIZE_LIMIT: Lazy<usize> = Lazy::new(|| {
if let Ok(var) = std::env::var(ARROW_FLUSH_SIZE_LIMIT_VAR)
&& let Ok(var) = var.parse::<usize>()
{
var
} else {
1024 * 1024 * 1024 * 10
}
});

const ONE_PARQUET_PER_ARROW_VAR: &str = "ONE_PARQUET_PER_ARROW";
static ONE_PARQUET_PER_ARROW: Lazy<bool> = Lazy::new(|| {
if let Ok(var) = std::env::var(ONE_PARQUET_PER_ARROW_VAR)
&& let Ok(var) = var.parse::<bool>()
{
var
} else {
false
}
});

#[derive(Default)]
pub struct Writer {
pub mem: MemWriter<16384>,
pub mem: MemWriter<4096>,
pub disk: HashMap<String, DiskWriter>,
disk_pending: HashMap<String, PendingDiskBatch>,
}
Expand Down Expand Up @@ -162,17 +189,21 @@ fn write_pending_disk_batch(

let schema = pending.batches[0].schema();
let batch = concat_batches(&schema, pending.batches.iter())?;
match disk.get_mut(&filename) {
let s = match disk.get_mut(&filename) {
Some(writer) => writer.write(&batch)?,
None => {
let range = pending.range.expect("pending disk batch must have range");
if let Some(parent) = file_path.parent() {
fs::create_dir_all(parent)?;
}
let mut writer = DiskWriter::try_new(file_path, &schema, range)?;
writer.write(&batch)?;
disk.insert(filename, writer);
let s = writer.write(&batch)?;
disk.insert(filename.clone(), writer);
s
}
};
if s >= *ARROW_FLUSH_SIZE_LIMIT {
disk.remove(&filename);
}

Ok(())
Expand Down Expand Up @@ -203,6 +234,7 @@ pub struct DiskWriter {
inner: StreamWriter<BufWriter<File>>,
path: PathBuf,
range: TimeRange,
size: usize,
}

impl DiskWriter {
Expand All @@ -219,9 +251,22 @@ impl DiskWriter {
.truncate(true)
.create(true)
.open(&path)?;
let inner = StreamWriter::try_new_buffered(file, schema)?;

Ok(Self { inner, path, range })
let inner = StreamWriter::try_new_with_options(
BufWriter::new(file),
schema,
IpcWriteOptions::default()
.try_with_compression(Some(CompressionType::LZ4_FRAME))
.unwrap(),
)?;

let size = 0;

Ok(Self {
inner,
path,
range,
size,
})
}

pub fn is_current(&self) -> bool {
Expand All @@ -230,8 +275,10 @@ impl DiskWriter {

/// Write a single recordbatch into file
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
self.inner.write(rb).map_err(StagingError::Arrow)
pub fn write(&mut self, rb: &RecordBatch) -> Result<usize, StagingError> {
self.size += rb.size();
self.inner.write(rb).map_err(StagingError::Arrow)?;
Ok(self.size)
}
}

Expand All @@ -244,7 +291,15 @@ impl Drop for DiskWriter {
}

let mut arrow_path = self.path.to_owned();
arrow_path.set_extension(ARROW_FILE_EXTENSION);

// a rudimentary way to ensure one parquet per arrow file
if *ONE_PARQUET_PER_ARROW {
arrow_path.set_extension(Ulid::new().to_string());
#[allow(clippy::incompatible_msrv)]
arrow_path.add_extension(ARROW_FILE_EXTENSION);
} else {
arrow_path.set_extension(ARROW_FILE_EXTENSION);
}

// If file exists, append a random string before .date to avoid overwriting
if arrow_path.exists() {
Expand All @@ -260,6 +315,11 @@ impl Drop for DiskWriter {
if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
error!("Couldn't rename file {:?}, error = {err}", self.path);
}
tracing::info!(
"flushing {:?} due to drop with size {}\n",
self.path,
self.size
);
}
}

Expand Down
142 changes: 86 additions & 56 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use arrow_schema::{Field, Fields, Schema};
use chrono::{NaiveDateTime, Timelike, Utc};
use derive_more::derive::{Deref, DerefMut};
use itertools::Itertools;
use once_cell::sync::Lazy;
use once_cell::sync::{Lazy, OnceCell};
use parquet::{
arrow::ArrowWriter,
basic::Encoding,
Expand All @@ -33,6 +33,7 @@ use parquet::{
},
schema::types::ColumnPath,
};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use relative_path::RelativePathBuf;
use std::sync::PoisonError;
use std::{
Expand Down Expand Up @@ -62,6 +63,7 @@ use crate::{
option::Mode,
parseable::{DEFAULT_TENANT, PARSEABLE},
storage::{StreamType, object_storage::to_bytes, retention::Retention},
sync::FLUSH_AND_CONVERT_RUNTIME,
utils::time::{Minute, TimeRange},
};

Expand All @@ -70,6 +72,8 @@ use super::{
staging::{StagingError, reader::MergedReverseRecordReader, writer::Writer},
};

static HOSTNAME: OnceCell<String> = OnceCell::new();

const DISK_WRITE_BATCH_ROWS_VAR: &str = "DISK_WRITE_BATCH_ROWS";
static DISK_WRITE_BATCH_ROWS: Lazy<usize> = Lazy::new(|| {
if let Ok(var) = std::env::var(DISK_WRITE_BATCH_ROWS_VAR)
Expand Down Expand Up @@ -209,12 +213,16 @@ impl Stream {
parsed_timestamp: NaiveDateTime,
custom_partition_values: &HashMap<String, String>,
) -> String {
let mut hostname = hostname::get()
.unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string()))
.into_string()
.unwrap_or_else(|_| Ulid::new().to_string())
.matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_')
.collect::<String>();
let mut hostname = HOSTNAME
.get_or_init(|| {
hostname::get()
.unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string()))
.into_string()
.unwrap_or_else(|_| Ulid::new().to_string())
.matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_')
.collect::<String>()
})
.clone();

if let Some(id) = &self.ingestor_id {
hostname.push_str(id);
Expand Down Expand Up @@ -257,7 +265,7 @@ impl Stream {
return vec![];
};

//iterate through all the inprocess_ directories and collect all arrow files
// iterate through all the inprocess_ directories and collect all arrow files
dir.filter_map(|entry| {
let path = entry.ok()?.path();
if path.is_dir()
Expand Down Expand Up @@ -587,6 +595,7 @@ impl Stream {
self.stream_name, poisoned
)))
})?;
// why clean Writer.MemWriter?
writer.mem.clear();
writer.take_flushable_disk(forced)
};
Expand Down Expand Up @@ -825,32 +834,52 @@ impl Stream {
}

self.update_staging_metrics(&staging_files, tenant_id);
for (parquet_path, arrow_files) in staging_files {
let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
if record_reader.readers.is_empty() {
continue;
}
let merged_schema = record_reader.merged_schema();
let props = self.parquet_writer_props(&merged_schema, time_partition, custom_partition);
schemas.push(merged_schema.clone());
let schema = Arc::new(merged_schema);

let part_path = parquet_path.with_extension("part");

if !self.write_parquet_part_file(
&part_path,
record_reader,
&schema,
&props,
time_partition,
)? {
continue;
}

if let Err(e) = std::fs::rename(&part_path, &parquet_path) {
error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}");
} else {
self.cleanup_arrow_files_and_dir(&arrow_files, tenant_id);
let _schemas: Vec<Result<Option<Schema>, StagingError>> = staging_files.into_par_iter().map(
|(parquet_path, arrow_files)| -> Result<Option<Schema>, StagingError> {
let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
if record_reader.readers.is_empty() {
Ok(None)
} else {
let merged_schema = record_reader.merged_schema();
let props =
self.parquet_writer_props(&merged_schema, time_partition, custom_partition);
// schemas.push(merged_schema.clone());
let schema = Arc::new(merged_schema.clone());

let part_path = parquet_path.with_extension("part");

if !self.write_parquet_part_file(
&part_path,
record_reader,
&schema,
&props,
time_partition,
)? {
return Ok(None)
}

if let Err(e) = std::fs::rename(&part_path, &parquet_path) {
error!(
"Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"
);
} else {
self.cleanup_arrow_files_and_dir(&arrow_files, tenant_id);
}
Ok(Some(merged_schema))
}
},
)
.collect();

for res in _schemas {
match res {
Ok(s) => {
if let Some(s) = s {
schemas.push(s)
}
}
Err(e) => return Err(e),
}
}
if schemas.is_empty() {
Expand Down Expand Up @@ -880,6 +909,8 @@ impl Stream {
.open(part_path)
.map_err(|_| StagingError::Create)?;
let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?;

// does pruning help with query?
let sort_for_metric_pruning = self.is_otel_metrics();
let time_partition_field = time_partition.map_or_else(
|| DEFAULT_TIMESTAMP_KEY.to_string(),
Expand Down Expand Up @@ -1409,20 +1440,24 @@ impl Stream {
// For regular cycles, use false to only flush non-current writers
let forced = init_signal || shutdown_signal;
self.flush(forced)?;
info!(
"Flushing stream ({}) took: {}s",
self.stream_name,
start_flush.elapsed().as_secs_f64()
);
if self.get_stream_type().eq(&StreamType::UserDefined) {
info!(
"Flushing stream ({}) took: {}s",
self.stream_name,
start_flush.elapsed().as_secs_f64()
);
}

let start_convert = Instant::now();

self.prepare_parquet(init_signal, shutdown_signal, tenant_id)?;
info!(
"Converting arrows to parquet on stream ({}) took: {}s",
self.stream_name,
start_convert.elapsed().as_secs_f64()
);
if self.get_stream_type().eq(&StreamType::UserDefined) {
info!(
"Converting arrows to parquet on stream ({}) took: {}s",
self.stream_name,
start_convert.elapsed().as_secs_f64()
);
}

Ok(())
}
Expand Down Expand Up @@ -1512,15 +1547,6 @@ impl Streams {
} else {
vec![]
}

// self.read()
// .expect(LOCK_EXPECT)
// .get(&tenant_id)
// .and_then(|v|v.keys())
// .map(f)
// .keys()
// .map(String::clone)
// .collect()
}

pub fn list_internal_streams(&self, tenant_id: &Option<String>) -> Vec<String> {
Expand Down Expand Up @@ -1553,6 +1579,7 @@ impl Streams {
vec![DEFAULT_TENANT.to_owned()]
};

let handle = FLUSH_AND_CONVERT_RUNTIME.handle();
for tenant_id in tenants {
let guard = self.read().expect(LOCK_EXPECT);
let streams: Vec<Arc<Stream>> = if let Some(tenant_streams) = guard.get(&tenant_id) {
Expand All @@ -1563,10 +1590,13 @@ impl Streams {
for stream in streams {
let tenant = tenant_id.clone();
let span = info_span!("stream_sync", stream_name = %stream.stream_name);
joinset.spawn_blocking(move || {
let _guard = span.enter();
stream.flush_and_convert(init_signal, shutdown_signal, &Some(tenant))
});
joinset.spawn_blocking_on(
move || {
let _guard = span.enter();
stream.flush_and_convert(init_signal, shutdown_signal, &Some(tenant))
},
handle,
);
}
}
}
Expand Down
Loading
Loading