From ba7a17efca9bbd4568badbf4ba6c9f55adeb8123 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 9 Jun 2026 13:43:28 +0530 Subject: [PATCH 1/5] Remove async - make MemWriter optional - remove unnecessary async --- src/handlers/http/ingest.rs | 12 ++------ src/handlers/http/kinesis.rs | 2 +- src/handlers/http/modal/utils/ingest_utils.rs | 4 +-- src/parseable/staging/writer.rs | 30 ++++++++++++------- src/parseable/streams.rs | 16 +++++++--- 5 files changed, 38 insertions(+), 26 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 41a7d6eb4..41a41532d 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -152,9 +152,7 @@ pub async fn ingest( None, telemetry_type, &tenant_id, - ) - .await - { + ) { error!("Ingestion failed for stream {stream_name}: {e}"); return Err(e); } @@ -320,9 +318,7 @@ async fn process_otel_content( None, telemetry_type, &tenant_id, - ) - .await - { + ) { error!("Ingestion failed for stream {stream_name}: {e}"); return Err(e); } @@ -519,9 +515,7 @@ pub async fn post_event( None, TelemetryType::Logs, &tenant_id, - ) - .await - { + ) { error!("Ingestion failed for stream {stream_name}: {e}"); return Err(e); } diff --git a/src/handlers/http/kinesis.rs b/src/handlers/http/kinesis.rs index b8968fa14..6fc713773 100644 --- a/src/handlers/http/kinesis.rs +++ b/src/handlers/http/kinesis.rs @@ -59,7 +59,7 @@ struct Data { // "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a", // "timestamp": "1704964113659" // } -pub async fn flatten_kinesis_logs(message: Message) -> Result, anyhow::Error> { +pub fn flatten_kinesis_logs(message: Message) -> Result, anyhow::Error> { let mut vec_kinesis_json = Vec::new(); for record in message.records.iter() { diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 0ecdb0486..e41576218 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -62,7 +62,7 @@ const MAX_FIELD_VALUE_LENGTH: usize = 100; ), fields(stream_name) )] -pub async fn flatten_and_push_logs( +pub fn flatten_and_push_logs( json: Value, stream_name: &str, log_source: &LogSource, @@ -79,7 +79,7 @@ pub async fn flatten_and_push_logs( LogSource::Kinesis => { //custom flattening required for Amazon Kinesis let message: Message = serde_json::from_value(json)?; - let flattened_kinesis_data = flatten_kinesis_logs(message).await?; + let flattened_kinesis_data = flatten_kinesis_logs(message)?; let record = convert_to_array(flattened_kinesis_data)?; push_logs( stream_name, diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index c37e5204d..b90cd7ef0 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -80,6 +80,17 @@ static ONE_PARQUET_PER_ARROW: Lazy = Lazy::new(|| { } }); +const ENABLE_MEMORY_STAGING_VAR: &str = "ENABLE_MEMORY_STAGING"; +pub static ENABLE_MEMORY_STAGING: Lazy = Lazy::new(|| { + if let Ok(var) = std::env::var(ENABLE_MEMORY_STAGING_VAR) + && let Ok(var) = var.parse::() + { + var + } else { + true + } +}); + #[derive(Default)] pub struct Writer { pub mem: MemWriter<4096>, @@ -245,7 +256,14 @@ impl DiskWriter { range: TimeRange, ) -> Result { let mut path = path.into(); - path.set_extension(PART_FILE_EXTENSION); + // a rudimentary way to ensure one parquet per arrow file + if *ONE_PARQUET_PER_ARROW { + path.set_extension(Ulid::new().to_string()); + #[allow(clippy::incompatible_msrv)] + path.add_extension(PART_FILE_EXTENSION); + } else { + path.set_extension(PART_FILE_EXTENSION); + } let file = OpenOptions::new() .write(true) .truncate(true) @@ -292,15 +310,7 @@ impl Drop for DiskWriter { let mut arrow_path = self.path.to_owned(); - // a rudimentary way to ensure one parquet per arrow file - if *ONE_PARQUET_PER_ARROW { - arrow_path.set_extension(Ulid::new().to_string()); - #[allow(clippy::incompatible_msrv)] - arrow_path.add_extension(ARROW_FILE_EXTENSION); - } else { - arrow_path.set_extension(ARROW_FILE_EXTENSION); - } - + arrow_path.set_extension(ARROW_FILE_EXTENSION); // If file exists, append a random string before .date to avoid overwriting if arrow_path.exists() { let file_name = arrow_path.file_name().unwrap().to_string_lossy(); diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index eb6d1a247..d17b51a51 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -61,7 +61,7 @@ use crate::{ metadata::{LogStreamMetadata, SchemaVersion}, metrics, option::Mode, - parseable::{DEFAULT_TENANT, PARSEABLE}, + parseable::{DEFAULT_TENANT, PARSEABLE, staging::writer::ENABLE_MEMORY_STAGING}, storage::{StreamType, object_storage::to_bytes, retention::Retention}, sync::FLUSH_AND_CONVERT_RUNTIME, utils::time::{Minute, TimeRange}, @@ -202,7 +202,9 @@ impl Stream { guard.push_disk(filename, record, file_path, range, *DISK_WRITE_BATCH_ROWS)?; } - guard.mem.push(schema_key, record)?; + if *ENABLE_MEMORY_STAGING { + guard.mem.push(schema_key, record)?; + } Ok(()) } @@ -565,7 +567,11 @@ impl Stream { ))) })?; - writer.mem.recordbatch_cloned(schema) + if *ENABLE_MEMORY_STAGING { + writer.mem.recordbatch_cloned(schema) + } else { + Ok(Vec::new()) + } } pub fn clear(&self) -> Result<(), StagingError> { @@ -596,7 +602,9 @@ impl Stream { ))) })?; // why clean Writer.MemWriter? - writer.mem.clear(); + if *ENABLE_MEMORY_STAGING { + writer.mem.clear(); + } writer.take_flushable_disk(forced) }; pending_writes.flush_into(&mut stale_writers, &self.data_path)?; From 58aa5d2dfe40d7d96ebcdd0a0b947d9795c6cf98 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 9 Jun 2026 16:27:44 +0530 Subject: [PATCH 2/5] separate ingestion runtime --- src/handlers/http/ingest.rs | 127 +++++----------- src/handlers/http/modal/utils/ingest_utils.rs | 139 +++++++++++++++++- src/otel/otel_utils.rs | 2 +- 3 files changed, 171 insertions(+), 97 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 41a41532d..6f5543783 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -24,16 +24,19 @@ use actix_web::{HttpRequest, HttpResponse, http::header::ContentType}; use arrow_array::RecordBatch; use bytes::Bytes; use chrono::Utc; +use tokio::sync::oneshot; use tracing::error; use crate::event::error::EventError; use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY}; -use crate::handlers::http::modal::utils::ingest_utils::validate_stream_for_ingestion; +use crate::handlers::http::modal::utils::ingest_utils::{ + ingest_helper, process_otel_content, validate_stream_for_ingestion, +}; use crate::handlers::{ - CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, - STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType, + DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, + TelemetryType, }; use crate::metadata::SchemaVersion; use crate::metastore::MetastoreError; @@ -81,10 +84,11 @@ pub async fn ingest( .and_then(|h| h.to_str().ok()) .map_or(TelemetryType::default(), TelemetryType::from); - let extract_log = req - .headers() - .get(EXTRACT_LOG_KEY) - .and_then(|h| h.to_str().ok()); + let extract_log = req.headers().get(EXTRACT_LOG_KEY).and_then(|h| { + h.to_str() + .ok() + .map_or_else(|| None, |h| Some(h.to_string())) + }); if matches!( log_source, @@ -97,7 +101,6 @@ pub async fn ingest( } let mut p_custom_fields = get_custom_fields_from_header(&req); - let mut json = json.into_inner(); let fields = match &log_source { @@ -105,7 +108,7 @@ pub async fn ingest( &mut json, &mut p_custom_fields, src, - extract_log, + extract_log.as_deref(), )?, _ => HashSet::new(), }; @@ -129,12 +132,13 @@ pub async fn ingest( e })?; - //if stream exists, fetch the stream log source - //return error if the stream log source is otel traces or otel metrics or otel logs + // if stream exists, fetch the stream log source + // return error if the stream log source is otel traces or otel metrics validate_stream_for_ingestion(&stream_name, &log_source, &tenant_id).map_err(|e| { error!("Ingestion failed for stream {stream_name}: {e}"); e })?; + let stream = PARSEABLE.get_stream(&stream_name, &tenant_id)?; PARSEABLE .add_update_log_source(&stream_name, log_source_entry, &tenant_id) @@ -144,20 +148,27 @@ pub async fn ingest( e })?; - if let Err(e) = flatten_and_push_logs( - json, - &stream_name, - &log_source, - &p_custom_fields, - None, - telemetry_type, - &tenant_id, - ) { - error!("Ingestion failed for stream {stream_name}: {e}"); - return Err(e); - } + let time_partition = stream.get_time_partition(); - Ok(HttpResponse::Ok().finish()) + let (s, r) = oneshot::channel(); + rayon::spawn(move || { + let res = ingest_helper( + stream_name, + tenant_id, + log_source, + telemetry_type, + p_custom_fields, + json, + time_partition, + ); + let _ = s.send(res); + }); + + if let Err(e) = r.await.map_err(|e| PostError::CustomError(e.to_string()))? { + Err(e) + } else { + Ok(HttpResponse::Ok().finish()) + } } pub async fn ingest_internal_stream( @@ -283,74 +294,6 @@ pub async fn setup_otel_stream( Ok((stream_name, log_source, log_source_entry, time_partition)) } -// Common content processing for OTEL ingestion -async fn process_otel_content( - req: &HttpRequest, - body: web::Bytes, - stream_name: &str, - log_source: &LogSource, - telemetry_type: TelemetryType, -) -> Result<(), PostError> { - let p_custom_fields = get_custom_fields_from_header(req); - - match req - .headers() - .get("Content-Type") - .and_then(|h| h.to_str().ok()) - { - Some(content_type) => { - let tenant_id = get_tenant_id_from_request(req); - if content_type == CONTENT_TYPE_JSON { - let json: serde_json::Value = match serde_json::from_slice(&body) { - Ok(v) => v, - Err(e) => { - error!( - "Ingestion failed for stream {stream_name}: malformed JSON in request body" - ); - return Err(PostError::SerdeError(e)); - } - }; - if let Err(e) = flatten_and_push_logs( - json, - stream_name, - log_source, - &p_custom_fields, - None, - telemetry_type, - &tenant_id, - ) { - error!("Ingestion failed for stream {stream_name}: {e}"); - return Err(e); - } - } else if content_type == CONTENT_TYPE_PROTOBUF { - error!( - "Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS" - ); - return Err(PostError::Invalid(anyhow::anyhow!( - "Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS" - ))); - } else { - error!( - "Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf" - ); - return Err(PostError::Invalid(anyhow::anyhow!( - "Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf" - ))); - } - } - None => { - error!( - "Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf" - ); - return Err(PostError::Invalid(anyhow::anyhow!( - "Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf" - ))); - } - } - - Ok(()) -} - // Handler for POST /v1/logs to ingest OTEL logs // ingests events by extracting stream name from header // creates if stream does not exist diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index e41576218..e8e1ca74a 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -16,8 +16,8 @@ * */ -use actix_web::HttpRequest; use actix_web::http::header::USER_AGENT; +use actix_web::{HttpRequest, web}; use chrono::Utc; use opentelemetry_proto::tonic::{ logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData, @@ -25,7 +25,8 @@ use opentelemetry_proto::tonic::{ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use serde_json::Value; use std::collections::HashMap; -use tracing::{instrument, warn}; +use tokio::sync::oneshot; +use tracing::{error, instrument, warn}; use crate::{ event::{ @@ -33,7 +34,8 @@ use crate::{ format::{EventFormat, LogSource, json}, }, handlers::{ - EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TelemetryType, + CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, + STREAM_NAME_HEADER_KEY, TelemetryType, http::{ ingest::PostError, kinesis::{Message, flatten_kinesis_logs}, @@ -42,13 +44,142 @@ use crate::{ otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, parseable::{DEFAULT_TENANT, PARSEABLE}, storage::StreamType, - utils::json::{convert_array_to_object, flatten::convert_to_array}, + utils::{ + get_tenant_id_from_request, + json::{convert_array_to_object, flatten::convert_to_array}, + }, }; const IGNORE_HEADERS: [&str; 3] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY, EXTRACT_LOG_KEY]; const MAX_CUSTOM_FIELDS: usize = 10; const MAX_FIELD_VALUE_LENGTH: usize = 100; +pub fn ingest_helper( + stream_name: String, + tenant_id: Option, + log_source: LogSource, + telemetry_type: TelemetryType, + p_custom_fields: HashMap, + json: Value, + time_partition: Option, +) -> Result<(), PostError> { + if let Err(e) = flatten_and_push_logs( + json, + &stream_name, + &log_source, + &p_custom_fields, + time_partition, + telemetry_type, + &tenant_id, + ) { + error!("Ingestion failed for stream {stream_name}: {e}"); + return Err(e); + } + + Ok(()) +} + +// Common content processing for OTEL ingestion +pub async fn process_otel_content( + req: &HttpRequest, + body: web::Bytes, + stream_name: &str, + log_source: &LogSource, + telemetry_type: TelemetryType, +) -> Result<(), PostError> { + let p_custom_fields = get_custom_fields_from_header(req); + let content_type = req + .headers() + .get("Content-Type") + .and_then(|h| h.to_str().ok()) + .map(str::to_owned); + let tenant_id = get_tenant_id_from_request(req); + let (s, r) = oneshot::channel(); + let stream_name = stream_name.to_string(); + let log_source = log_source.clone(); + + rayon::spawn(move || { + let r = handle_otel_ingestion( + body, + p_custom_fields, + stream_name, + log_source, + telemetry_type, + content_type, + tenant_id, + ); + let _ = s.send(r); + }); + + if let Err(e) = r.await.map_err(|e| PostError::CustomError(e.to_string()))? { + Err(e) + } else { + Ok(()) + } +} + +#[allow(clippy::too_many_arguments)] +fn handle_otel_ingestion( + body: web::Bytes, + p_custom_fields: HashMap, + stream_name: String, + log_source: LogSource, + telemetry_type: TelemetryType, + content_type: Option, + tenant_id: Option, +) -> Result<(), PostError> { + match content_type { + Some(content_type) => { + if content_type == CONTENT_TYPE_JSON { + let json: serde_json::Value = match serde_json::from_slice(&body) { + Ok(v) => v, + Err(e) => { + error!( + "Ingestion failed for stream {stream_name}: malformed JSON in request body" + ); + return Err(PostError::SerdeError(e)); + } + }; + if let Err(e) = flatten_and_push_logs( + json, + &stream_name, + &log_source, + &p_custom_fields, + None, + telemetry_type, + &tenant_id, + ) { + error!("Ingestion failed for stream {stream_name}: {e}"); + return Err(e); + } + } else if content_type == CONTENT_TYPE_PROTOBUF { + error!( + "Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS" + ); + return Err(PostError::Invalid(anyhow::anyhow!( + "Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS" + ))); + } else { + error!( + "Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf" + ); + return Err(PostError::Invalid(anyhow::anyhow!( + "Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf" + ))); + } + } + None => { + error!( + "Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf" + ); + return Err(PostError::Invalid(anyhow::anyhow!( + "Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf" + ))); + } + } + Ok(()) +} + #[instrument( name = "flatten_and_push_logs", level = "info", diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 6caec1f21..0662d7b77 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -140,7 +140,7 @@ pub fn collect_json_from_anyvalue(key: &str, value: AnyValue) -> Map, key: &str) -> Map { let mut value_json: Map = Map::with_capacity(1); From a92e2eee2af6af4c46aea39b38cec42c557ff315 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 9 Jun 2026 16:31:36 +0530 Subject: [PATCH 3/5] bump MSRV --- Cargo.toml | 2 +- src/parseable/staging/writer.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 344021102..50940b610 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "parseable" version = "2.8.0" authors = ["Parseable Team "] edition = "2024" -rust-version = "1.88.0" +rust-version = "1.91.0" categories = ["logs", "observability", "metrics", "traces"] build = "build.rs" diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index b90cd7ef0..7295a2a82 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -259,7 +259,6 @@ impl DiskWriter { // a rudimentary way to ensure one parquet per arrow file if *ONE_PARQUET_PER_ARROW { path.set_extension(Ulid::new().to_string()); - #[allow(clippy::incompatible_msrv)] path.add_extension(PART_FILE_EXTENSION); } else { path.set_extension(PART_FILE_EXTENSION); From f756d6d8200dcdee2009f438bf117a44629f79bd Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Wed, 10 Jun 2026 10:53:13 +0530 Subject: [PATCH 4/5] make MemWriter optional --- src/parseable/staging/writer.rs | 37 ++++++++++++++++++++++++--------- src/parseable/streams.rs | 26 +++++++++++++---------- 2 files changed, 42 insertions(+), 21 deletions(-) diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 7295a2a82..bb1afa0f5 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -69,35 +69,50 @@ static ARROW_FLUSH_SIZE_LIMIT: Lazy = Lazy::new(|| { } }); -const ONE_PARQUET_PER_ARROW_VAR: &str = "ONE_PARQUET_PER_ARROW"; -static ONE_PARQUET_PER_ARROW: Lazy = Lazy::new(|| { - if let Ok(var) = std::env::var(ONE_PARQUET_PER_ARROW_VAR) +const ENABLE_MEMORY_STAGING_VAR: &str = "ENABLE_MEMORY_STAGING"; +pub static ENABLE_MEMORY_STAGING: Lazy = Lazy::new(|| { + if let Ok(var) = std::env::var(ENABLE_MEMORY_STAGING_VAR) && let Ok(var) = var.parse::() { var } else { - false + true } }); -const ENABLE_MEMORY_STAGING_VAR: &str = "ENABLE_MEMORY_STAGING"; -pub static ENABLE_MEMORY_STAGING: Lazy = Lazy::new(|| { - if let Ok(var) = std::env::var(ENABLE_MEMORY_STAGING_VAR) +const ONE_PARQUET_PER_ARROW_VAR: &str = "ONE_PARQUET_PER_ARROW"; +static ONE_PARQUET_PER_ARROW: Lazy = Lazy::new(|| { + if let Ok(var) = std::env::var(ONE_PARQUET_PER_ARROW_VAR) && let Ok(var) = var.parse::() { var } else { - true + false } }); -#[derive(Default)] +// #[derive(Default)] pub struct Writer { - pub mem: MemWriter<4096>, + pub mem: Option>, pub disk: HashMap, disk_pending: HashMap, } +impl Default for Writer { + fn default() -> Self { + let mem = if *ENABLE_MEMORY_STAGING { + Some(MemWriter::default()) + } else { + None + }; + Self { + mem, + disk: HashMap::default(), + disk_pending: HashMap::default(), + } + } +} + impl Writer { #[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn push_disk( @@ -256,6 +271,7 @@ impl DiskWriter { range: TimeRange, ) -> Result { let mut path = path.into(); + // a rudimentary way to ensure one parquet per arrow file if *ONE_PARQUET_PER_ARROW { path.set_extension(Ulid::new().to_string()); @@ -263,6 +279,7 @@ impl DiskWriter { } else { path.set_extension(PART_FILE_EXTENSION); } + let file = OpenOptions::new() .write(true) .truncate(true) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index d17b51a51..d9a9ae1b8 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -61,7 +61,7 @@ use crate::{ metadata::{LogStreamMetadata, SchemaVersion}, metrics, option::Mode, - parseable::{DEFAULT_TENANT, PARSEABLE, staging::writer::ENABLE_MEMORY_STAGING}, + parseable::{DEFAULT_TENANT, PARSEABLE}, storage::{StreamType, object_storage::to_bytes, retention::Retention}, sync::FLUSH_AND_CONVERT_RUNTIME, utils::time::{Minute, TimeRange}, @@ -202,8 +202,8 @@ impl Stream { guard.push_disk(filename, record, file_path, range, *DISK_WRITE_BATCH_ROWS)?; } - if *ENABLE_MEMORY_STAGING { - guard.mem.push(schema_key, record)?; + if let Some(mem) = guard.mem.as_mut() { + mem.push(schema_key, record)?; } Ok(()) @@ -560,22 +560,23 @@ impl Stream { &self, schema: &Arc, ) -> Result, StagingError> { - let writer = self.writer.lock().map_err(|poisoned| { + let mut writer = self.writer.lock().map_err(|poisoned| { StagingError::PoisonError(PoisonError::new(format!( "Writer lock poisoned while cloning record batches for stream {} - {}", self.stream_name, poisoned ))) })?; - if *ENABLE_MEMORY_STAGING { - writer.mem.recordbatch_cloned(schema) + if let Some(mem) = writer.mem.as_mut() { + mem.recordbatch_cloned(schema) } else { Ok(Vec::new()) } } pub fn clear(&self) -> Result<(), StagingError> { - self.writer + if let Some(m) = self + .writer .lock() .map_err(|poisoned| { StagingError::PoisonError(PoisonError::new(format!( @@ -584,7 +585,10 @@ impl Stream { ))) })? .mem - .clear(); + .as_mut() + { + m.clear() + } Ok(()) } @@ -601,9 +605,9 @@ impl Stream { self.stream_name, poisoned ))) })?; - // why clean Writer.MemWriter? - if *ENABLE_MEMORY_STAGING { - writer.mem.clear(); + + if let Some(mem) = writer.mem.as_mut() { + mem.clear(); } writer.take_flushable_disk(forced) }; From 5a910dfb6f412a9a60434aa0bb2f755b4f93eeb9 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Wed, 10 Jun 2026 12:35:25 +0530 Subject: [PATCH 5/5] docker image versions bump --- Dockerfile | 2 +- Dockerfile.debug | 2 +- Dockerfile.dev | 2 +- Dockerfile.kafka | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 920390bc5..1be4b2fb7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ # along with this program. If not, see . # build stage -FROM rust:1.93.0-bookworm AS builder +FROM rust:1.96.0-bookworm AS builder LABEL org.opencontainers.image.title="Parseable" LABEL maintainer="Parseable Team " diff --git a/Dockerfile.debug b/Dockerfile.debug index 4a7802400..b6b4443e8 100644 --- a/Dockerfile.debug +++ b/Dockerfile.debug @@ -14,7 +14,7 @@ # along with this program. If not, see . # build stage -FROM docker.io/rust:1.93.0-bookworm AS builder +FROM docker.io/rust:1.96.0-bookworm AS builder LABEL org.opencontainers.image.title="Parseable" LABEL maintainer="Parseable Team " diff --git a/Dockerfile.dev b/Dockerfile.dev index d077819f1..d4f5abe7e 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -14,7 +14,7 @@ # along with this program. If not, see . # build stage -FROM rust:1.93.0-bookworm AS builder +FROM rust:1.96.0-bookworm AS builder LABEL org.opencontainers.image.title="Parseable" LABEL maintainer="Parseable Team " diff --git a/Dockerfile.kafka b/Dockerfile.kafka index e8bb88553..f5ced645a 100644 --- a/Dockerfile.kafka +++ b/Dockerfile.kafka @@ -14,7 +14,7 @@ # along with this program. If not, see . # build stage -FROM rust:1.93.0-bookworm AS builder +FROM rust:1.96.0-bookworm AS builder LABEL org.opencontainers.image.title="Parseable" LABEL maintainer="Parseable Team "