Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "parseable"
version = "2.8.0"
authors = ["Parseable Team <hi@parseable.com>"]
edition = "2024"
rust-version = "1.88.0"
rust-version = "1.91.0"
categories = ["logs", "observability", "metrics", "traces"]
build = "build.rs"

Expand Down
135 changes: 36 additions & 99 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,19 @@ use actix_web::{HttpRequest, HttpResponse, http::header::ContentType};
use arrow_array::RecordBatch;
use bytes::Bytes;
use chrono::Utc;
use tokio::sync::oneshot;
use tracing::error;

use crate::event::error::EventError;
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
use crate::handlers::http::modal::utils::ingest_utils::validate_stream_for_ingestion;
use crate::handlers::http::modal::utils::ingest_utils::{
ingest_helper, process_otel_content, validate_stream_for_ingestion,
};
use crate::handlers::{
CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY,
TelemetryType,
};
use crate::metadata::SchemaVersion;
use crate::metastore::MetastoreError;
Expand Down Expand Up @@ -81,10 +84,11 @@ pub async fn ingest(
.and_then(|h| h.to_str().ok())
.map_or(TelemetryType::default(), TelemetryType::from);

let extract_log = req
.headers()
.get(EXTRACT_LOG_KEY)
.and_then(|h| h.to_str().ok());
let extract_log = req.headers().get(EXTRACT_LOG_KEY).and_then(|h| {
h.to_str()
.ok()
.map_or_else(|| None, |h| Some(h.to_string()))
});

if matches!(
log_source,
Expand All @@ -97,15 +101,14 @@ pub async fn ingest(
}

let mut p_custom_fields = get_custom_fields_from_header(&req);

let mut json = json.into_inner();

let fields = match &log_source {
LogSource::Custom(src) => KNOWN_SCHEMA_LIST.extract_from_inline_log(
&mut json,
&mut p_custom_fields,
src,
extract_log,
extract_log.as_deref(),
)?,
_ => HashSet::new(),
};
Expand All @@ -129,12 +132,13 @@ pub async fn ingest(
e
})?;

//if stream exists, fetch the stream log source
//return error if the stream log source is otel traces or otel metrics or otel logs
// if stream exists, fetch the stream log source
// return error if the stream log source is otel traces or otel metrics
validate_stream_for_ingestion(&stream_name, &log_source, &tenant_id).map_err(|e| {
error!("Ingestion failed for stream {stream_name}: {e}");
e
})?;
let stream = PARSEABLE.get_stream(&stream_name, &tenant_id)?;

PARSEABLE
.add_update_log_source(&stream_name, log_source_entry, &tenant_id)
Expand All @@ -144,22 +148,27 @@ pub async fn ingest(
e
})?;

if let Err(e) = flatten_and_push_logs(
json,
&stream_name,
&log_source,
&p_custom_fields,
None,
telemetry_type,
&tenant_id,
)
.await
{
error!("Ingestion failed for stream {stream_name}: {e}");
return Err(e);
}
let time_partition = stream.get_time_partition();

Ok(HttpResponse::Ok().finish())
let (s, r) = oneshot::channel();
rayon::spawn(move || {
let res = ingest_helper(
stream_name,
tenant_id,
log_source,
telemetry_type,
p_custom_fields,
json,
time_partition,
);
let _ = s.send(res);
});

if let Err(e) = r.await.map_err(|e| PostError::CustomError(e.to_string()))? {
Err(e)
} else {
Ok(HttpResponse::Ok().finish())
}
}

pub async fn ingest_internal_stream(
Expand Down Expand Up @@ -285,76 +294,6 @@ pub async fn setup_otel_stream(
Ok((stream_name, log_source, log_source_entry, time_partition))
}

// Common content processing for OTEL ingestion
async fn process_otel_content(
req: &HttpRequest,
body: web::Bytes,
stream_name: &str,
log_source: &LogSource,
telemetry_type: TelemetryType,
) -> Result<(), PostError> {
let p_custom_fields = get_custom_fields_from_header(req);

match req
.headers()
.get("Content-Type")
.and_then(|h| h.to_str().ok())
{
Some(content_type) => {
let tenant_id = get_tenant_id_from_request(req);
if content_type == CONTENT_TYPE_JSON {
let json: serde_json::Value = match serde_json::from_slice(&body) {
Ok(v) => v,
Err(e) => {
error!(
"Ingestion failed for stream {stream_name}: malformed JSON in request body"
);
return Err(PostError::SerdeError(e));
}
};
if let Err(e) = flatten_and_push_logs(
json,
stream_name,
log_source,
&p_custom_fields,
None,
telemetry_type,
&tenant_id,
)
.await
{
error!("Ingestion failed for stream {stream_name}: {e}");
return Err(e);
}
} else if content_type == CONTENT_TYPE_PROTOBUF {
error!(
"Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS"
);
return Err(PostError::Invalid(anyhow::anyhow!(
"Ingestion failed for stream {stream_name}: Protobuf ingestion is not supported in Parseable OSS"
)));
} else {
error!(
"Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf"
);
return Err(PostError::Invalid(anyhow::anyhow!(
"Ingestion failed for stream {stream_name}: Unsupported Content-Type: {content_type}. Expected application/json or application/x-protobuf"
)));
}
}
None => {
error!(
"Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf"
);
return Err(PostError::Invalid(anyhow::anyhow!(
"Ingestion failed for stream {stream_name}: Missing Content-Type header. Expected application/json or application/x-protobuf"
)));
}
}

Ok(())
}

// Handler for POST /v1/logs to ingest OTEL logs
// ingests events by extracting stream name from header
// creates if stream does not exist
Expand Down Expand Up @@ -519,9 +458,7 @@ pub async fn post_event(
None,
TelemetryType::Logs,
&tenant_id,
)
.await
{
) {
error!("Ingestion failed for stream {stream_name}: {e}");
return Err(e);
}
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ struct Data {
// "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a",
// "timestamp": "1704964113659"
// }
pub async fn flatten_kinesis_logs(message: Message) -> Result<Vec<Value>, anyhow::Error> {
pub fn flatten_kinesis_logs(message: Message) -> Result<Vec<Value>, anyhow::Error> {
let mut vec_kinesis_json = Vec::new();

for record in message.records.iter() {
Expand Down
Loading
Loading