diff --git a/Cargo.toml b/Cargo.toml index 344021102..50940b610 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "parseable" version = "2.8.0" authors = ["Parseable Team "] edition = "2024" -rust-version = "1.88.0" +rust-version = "1.91.0" categories = ["logs", "observability", "metrics", "traces"] build = "build.rs" diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 41a7d6eb4..6f5543783 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -24,16 +24,19 @@ use actix_web::{HttpRequest, HttpResponse, http::header::ContentType}; use arrow_array::RecordBatch; use bytes::Bytes; use chrono::Utc; +use tokio::sync::oneshot; use tracing::error; use crate::event::error::EventError; use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY}; -use crate::handlers::http::modal::utils::ingest_utils::validate_stream_for_ingestion; +use crate::handlers::http::modal::utils::ingest_utils::{ + ingest_helper, process_otel_content, validate_stream_for_ingestion, +}; use crate::handlers::{ - CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, - STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType, + DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, + TelemetryType, }; use crate::metadata::SchemaVersion; use crate::metastore::MetastoreError; @@ -81,10 +84,11 @@ pub async fn ingest( .and_then(|h| h.to_str().ok()) .map_or(TelemetryType::default(), TelemetryType::from); - let extract_log = req - .headers() - .get(EXTRACT_LOG_KEY) - .and_then(|h| h.to_str().ok()); + let extract_log = req.headers().get(EXTRACT_LOG_KEY).and_then(|h| { + h.to_str() + .ok() + .map_or_else(|| None, |h| Some(h.to_string())) + }); if matches!( log_source, @@ -97,7 +101,6 @@ pub async fn ingest( } let mut p_custom_fields = get_custom_fields_from_header(&req); - let mut json = json.into_inner(); let fields = match &log_source { @@ -105,7 +108,7 @@ pub async fn ingest( &mut json, &mut p_custom_fields, src, - extract_log, + extract_log.as_deref(), )?, _ => HashSet::new(), }; @@ -129,12 +132,13 @@ pub async fn ingest( e })?; - //if stream exists, fetch the stream log source - //return error if the stream log source is otel traces or otel metrics or otel logs + // if stream exists, fetch the stream log source + // return error if the stream log source is otel traces or otel metrics validate_stream_for_ingestion(&stream_name, &log_source, &tenant_id).map_err(|e| { error!("Ingestion failed for stream {stream_name}: {e}"); e })?; + let stream = PARSEABLE.get_stream(&stream_name, &tenant_id)?; PARSEABLE .add_update_log_source(&stream_name, log_source_entry, &tenant_id) @@ -144,22 +148,27 @@ pub async fn ingest( e })?; - if let Err(e) = flatten_and_push_logs( - json, - &stream_name, - &log_source, - &p_custom_fields, - None, - telemetry_type, - &tenant_id, - ) - .await - { - error!("Ingestion failed for stream {stream_name}: {e}"); - return Err(e); - } + let time_partition = stream.get_time_partition(); - Ok(HttpResponse::Ok().finish()) + let (s, r) = oneshot::channel(); + rayon::spawn(move || { + let res = ingest_helper( + stream_name, + tenant_id, + log_source, + telemetry_type, + p_custom_fields, + json, + time_partition, + ); + let _ = s.send(res); + }); + + if let Err(e) = r.await.map_err(|e| PostError::CustomError(e.to_string()))? { + Err(e) + } else { + Ok(HttpResponse::Ok().finish()) + } } pub async fn ingest_internal_stream( @@ -285,76 +294,6 @@ pub async fn setup_otel_stream( Ok((stream_name, log_source, log_source_entry, time_partition)) } -// Common content processing for OTEL ingestion -async fn process_otel_content( - req: &HttpRequest, - body: web::Bytes, - stream_name: &str, - log_source: &LogSource, - telemetry_type: TelemetryType, -) -> Result<(), PostError> { - let p_custom_fields = get_custom_fields_from_header(req); - - match req - .headers() - .get("Content-Type") - .and_then(|h| h.to_str().ok()) - { - Some(content_type) => { - let tenant_id = get_tenant_id_from_request(req); - if content_type == CONTENT_TYPE_JSON { - let json: serde_json::Value = match serde_json::from_slice(&body) { - Ok(v) => v, - Err(e) => { - error!( - "Ingestion failed for stream {stream_name}: malformed JSON in request body" - ); - return Err(PostError::SerdeError(e)); - } - }; - if let Err(e) = flatten_and_push_logs( - json, - stream_name, - log_source, - &p_custom_fields, - None, - telemetry_type, - &tenant_id, - ) - .await - { - error!("Ingestion failed for stream {stream_name}: {e}"); - return Err(e); - } - } else if content_type == CONTENT_TYPE_PROTOBUF { - error!( - "Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS" - ); - return Err(PostError::Invalid(anyhow::anyhow!( - "Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS" - ))); - } else { - error!( - "Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf" - ); - return Err(PostError::Invalid(anyhow::anyhow!( - "Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf" - ))); - } - } - None => { - error!( - "Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf" - ); - return Err(PostError::Invalid(anyhow::anyhow!( - "Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf" - ))); - } - } - - Ok(()) -} - // Handler for POST /v1/logs to ingest OTEL logs // ingests events by extracting stream name from header // creates if stream does not exist @@ -519,9 +458,7 @@ pub async fn post_event( None, TelemetryType::Logs, &tenant_id, - ) - .await - { + ) { error!("Ingestion failed for stream {stream_name}: {e}"); return Err(e); } diff --git a/src/handlers/http/kinesis.rs b/src/handlers/http/kinesis.rs index b8968fa14..6fc713773 100644 --- a/src/handlers/http/kinesis.rs +++ b/src/handlers/http/kinesis.rs @@ -59,7 +59,7 @@ struct Data { // "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a", // "timestamp": "1704964113659" // } -pub async fn flatten_kinesis_logs(message: Message) -> Result, anyhow::Error> { +pub fn flatten_kinesis_logs(message: Message) -> Result, anyhow::Error> { let mut vec_kinesis_json = Vec::new(); for record in message.records.iter() { diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 0ecdb0486..e8e1ca74a 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -16,8 +16,8 @@ * */ -use actix_web::HttpRequest; use actix_web::http::header::USER_AGENT; +use actix_web::{HttpRequest, web}; use chrono::Utc; use opentelemetry_proto::tonic::{ logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData, @@ -25,7 +25,8 @@ use opentelemetry_proto::tonic::{ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use serde_json::Value; use std::collections::HashMap; -use tracing::{instrument, warn}; +use tokio::sync::oneshot; +use tracing::{error, instrument, warn}; use crate::{ event::{ @@ -33,7 +34,8 @@ use crate::{ format::{EventFormat, LogSource, json}, }, handlers::{ - EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TelemetryType, + CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, + STREAM_NAME_HEADER_KEY, TelemetryType, http::{ ingest::PostError, kinesis::{Message, flatten_kinesis_logs}, @@ -42,13 +44,142 @@ use crate::{ otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, parseable::{DEFAULT_TENANT, PARSEABLE}, storage::StreamType, - utils::json::{convert_array_to_object, flatten::convert_to_array}, + utils::{ + get_tenant_id_from_request, + json::{convert_array_to_object, flatten::convert_to_array}, + }, }; const IGNORE_HEADERS: [&str; 3] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY, EXTRACT_LOG_KEY]; const MAX_CUSTOM_FIELDS: usize = 10; const MAX_FIELD_VALUE_LENGTH: usize = 100; +pub fn ingest_helper( + stream_name: String, + tenant_id: Option, + log_source: LogSource, + telemetry_type: TelemetryType, + p_custom_fields: HashMap, + json: Value, + time_partition: Option, +) -> Result<(), PostError> { + if let Err(e) = flatten_and_push_logs( + json, + &stream_name, + &log_source, + &p_custom_fields, + time_partition, + telemetry_type, + &tenant_id, + ) { + error!("Ingestion failed for stream {stream_name}: {e}"); + return Err(e); + } + + Ok(()) +} + +// Common content processing for OTEL ingestion +pub async fn process_otel_content( + req: &HttpRequest, + body: web::Bytes, + stream_name: &str, + log_source: &LogSource, + telemetry_type: TelemetryType, +) -> Result<(), PostError> { + let p_custom_fields = get_custom_fields_from_header(req); + let content_type = req + .headers() + .get("Content-Type") + .and_then(|h| h.to_str().ok()) + .map(str::to_owned); + let tenant_id = get_tenant_id_from_request(req); + let (s, r) = oneshot::channel(); + let stream_name = stream_name.to_string(); + let log_source = log_source.clone(); + + rayon::spawn(move || { + let r = handle_otel_ingestion( + body, + p_custom_fields, + stream_name, + log_source, + telemetry_type, + content_type, + tenant_id, + ); + let _ = s.send(r); + }); + + if let Err(e) = r.await.map_err(|e| PostError::CustomError(e.to_string()))? { + Err(e) + } else { + Ok(()) + } +} + +#[allow(clippy::too_many_arguments)] +fn handle_otel_ingestion( + body: web::Bytes, + p_custom_fields: HashMap, + stream_name: String, + log_source: LogSource, + telemetry_type: TelemetryType, + content_type: Option, + tenant_id: Option, +) -> Result<(), PostError> { + match content_type { + Some(content_type) => { + if content_type == CONTENT_TYPE_JSON { + let json: serde_json::Value = match serde_json::from_slice(&body) { + Ok(v) => v, + Err(e) => { + error!( + "Ingestion failed for stream {stream_name}: malformed JSON in request body" + ); + return Err(PostError::SerdeError(e)); + } + }; + if let Err(e) = flatten_and_push_logs( + json, + &stream_name, + &log_source, + &p_custom_fields, + None, + telemetry_type, + &tenant_id, + ) { + error!("Ingestion failed for stream {stream_name}: {e}"); + return Err(e); + } + } else if content_type == CONTENT_TYPE_PROTOBUF { + error!( + "Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS" + ); + return Err(PostError::Invalid(anyhow::anyhow!( + "Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS" + ))); + } else { + error!( + "Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf" + ); + return Err(PostError::Invalid(anyhow::anyhow!( + "Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf" + ))); + } + } + None => { + error!( + "Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf" + ); + return Err(PostError::Invalid(anyhow::anyhow!( + "Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf" + ))); + } + } + Ok(()) +} + #[instrument( name = "flatten_and_push_logs", level = "info", @@ -62,7 +193,7 @@ const MAX_FIELD_VALUE_LENGTH: usize = 100; ), fields(stream_name) )] -pub async fn flatten_and_push_logs( +pub fn flatten_and_push_logs( json: Value, stream_name: &str, log_source: &LogSource, @@ -79,7 +210,7 @@ pub async fn flatten_and_push_logs( LogSource::Kinesis => { //custom flattening required for Amazon Kinesis let message: Message = serde_json::from_value(json)?; - let flattened_kinesis_data = flatten_kinesis_logs(message).await?; + let flattened_kinesis_data = flatten_kinesis_logs(message)?; let record = convert_to_array(flattened_kinesis_data)?; push_logs( stream_name, diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 6caec1f21..0662d7b77 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -140,7 +140,7 @@ pub fn collect_json_from_anyvalue(key: &str, value: AnyValue) -> Map, key: &str) -> Map { let mut value_json: Map = Map::with_capacity(1); diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index c37e5204d..7295a2a82 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -80,6 +80,17 @@ static ONE_PARQUET_PER_ARROW: Lazy = Lazy::new(|| { } }); +const ENABLE_MEMORY_STAGING_VAR: &str = "ENABLE_MEMORY_STAGING"; +pub static ENABLE_MEMORY_STAGING: Lazy = Lazy::new(|| { + if let Ok(var) = std::env::var(ENABLE_MEMORY_STAGING_VAR) + && let Ok(var) = var.parse::() + { + var + } else { + true + } +}); + #[derive(Default)] pub struct Writer { pub mem: MemWriter<4096>, @@ -245,7 +256,13 @@ impl DiskWriter { range: TimeRange, ) -> Result { let mut path = path.into(); - path.set_extension(PART_FILE_EXTENSION); + // a rudimentary way to ensure one parquet per arrow file + if *ONE_PARQUET_PER_ARROW { + path.set_extension(Ulid::new().to_string()); + path.add_extension(PART_FILE_EXTENSION); + } else { + path.set_extension(PART_FILE_EXTENSION); + } let file = OpenOptions::new() .write(true) .truncate(true) @@ -292,15 +309,7 @@ impl Drop for DiskWriter { let mut arrow_path = self.path.to_owned(); - // a rudimentary way to ensure one parquet per arrow file - if *ONE_PARQUET_PER_ARROW { - arrow_path.set_extension(Ulid::new().to_string()); - #[allow(clippy::incompatible_msrv)] - arrow_path.add_extension(ARROW_FILE_EXTENSION); - } else { - arrow_path.set_extension(ARROW_FILE_EXTENSION); - } - + arrow_path.set_extension(ARROW_FILE_EXTENSION); // If file exists, append a random string before .date to avoid overwriting if arrow_path.exists() { let file_name = arrow_path.file_name().unwrap().to_string_lossy(); diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index eb6d1a247..d17b51a51 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -61,7 +61,7 @@ use crate::{ metadata::{LogStreamMetadata, SchemaVersion}, metrics, option::Mode, - parseable::{DEFAULT_TENANT, PARSEABLE}, + parseable::{DEFAULT_TENANT, PARSEABLE, staging::writer::ENABLE_MEMORY_STAGING}, storage::{StreamType, object_storage::to_bytes, retention::Retention}, sync::FLUSH_AND_CONVERT_RUNTIME, utils::time::{Minute, TimeRange}, @@ -202,7 +202,9 @@ impl Stream { guard.push_disk(filename, record, file_path, range, *DISK_WRITE_BATCH_ROWS)?; } - guard.mem.push(schema_key, record)?; + if *ENABLE_MEMORY_STAGING { + guard.mem.push(schema_key, record)?; + } Ok(()) } @@ -565,7 +567,11 @@ impl Stream { ))) })?; - writer.mem.recordbatch_cloned(schema) + if *ENABLE_MEMORY_STAGING { + writer.mem.recordbatch_cloned(schema) + } else { + Ok(Vec::new()) + } } pub fn clear(&self) -> Result<(), StagingError> { @@ -596,7 +602,9 @@ impl Stream { ))) })?; // why clean Writer.MemWriter? - writer.mem.clear(); + if *ENABLE_MEMORY_STAGING { + writer.mem.clear(); + } writer.take_flushable_disk(forced) }; pending_writes.flush_into(&mut stale_writers, &self.data_path)?;