feat(connectors): add OTLP gRPC source connector#3516
Conversation
|
Thanks for the PR. It is labeled Slash commands (own line, regular comment) move it around the queue:
See CONTRIBUTING.md for details. |
Adds iggy_connector_otlp_source, a cdylib connector plugin that binds port 4317 (OTLP standard) and receives logs, metrics, and traces from any OpenTelemetry SDK or Collector. Each incoming gRPC export call is deserialized via the opentelemetry-proto crate (already a transitive dep of opentelemetry-otlp) and serialized as JSON messages into the connector channel, eliminating any build-time proto compilation step. All three OTLP services (LogsService, MetricsService, TraceService) are served on the same listener, with gzip compression enabled on every service (OTel SDKs and the Collector compress payloads by default). The connector follows the pull-model Source trait: poll() blocks on the first message, then drains up to batch_size additional messages via try_recv() before returning. A tokio oneshot channel carries the shutdown signal from close() to the gRPC server task.
d6d1751 to
133d6e6
Compare
Mark COOP_TASKRUN PR apache#3517 as submitted; clear TOBEDECIDED.md. Both apache#3516 and apache#3517 are now S-waiting-on-review on apache/iggy. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
- AGENTS.md: 104→75 lines. Removed redundant repo structure (derivable by ls), collapsed principles to iggy-specific rules only, merged Jenkins/QW infra into Infra section, updated handover block. - TODO.md: replaced stale checked items with 4 open PRs (apache#3516 apache#3517 apache#3523 apache#3525) + QW 0.9 upgrade task. - DONE.md: added sessions 5-10 block (QW sink pipeline, collector cutover, InvalidOffset bug + fix). - quickwit_sink/src/lib.rs: cargo fmt reformatting only.
|
|
||
| let (tx, rx) = mpsc::channel(self.config.channel_capacity); | ||
| let (shutdown_tx, shutdown_rx) = oneshot::channel(); | ||
|
|
There was a problem hiding this comment.
Bind race: tokio::spawn(server::run_grpc_server(addr, tx, shutdown_rx)) at line 80, then Ok(()) returned at line 90 without any signal from the server that bind succeeded. If bind fails (EADDRINUSE, EPERM), server task exits, tx dropped, rx.recv() at line 99 returns None. poll() returns Ok(empty) (line 102-106). SDK loop serializes empty batch, runtime source_forwarding_loop sends 0 Iggy messages, loops forever at CPU speed. Connector status stays Running with no data and no error. Fix: bind TcpListener synchronously inside open() before tokio::spawn, pass bound listener to server task.
| request: Request<ExportLogsServiceRequest>, | ||
| ) -> Result<Response<ExportLogsServiceResponse>, Status> { | ||
| let messages = convert::export_logs_to_messages(request.into_inner()); | ||
| send_messages(&self.tx, messages, "logs").await; |
There was a problem hiding this comment.
Happening in multiple places for each of the OTLP messages - Logs, traces and metrics. But commenting in place only with line numbers of other places.
97-99, 111-113, 125-127, 137-138 — Channel-full → false SUCCESS: try_send at line 137 fails, message dropped, warn! at line 138 fires. All three service impls return Ok(Response::new(...{ partial_success: None })) — lines 97-99 (logs), 111-113 (metrics), 125-127 (traces). OTel SDK sees full acceptance, does not retry, does not back off. Data permanently lost with no client-visible signal. OTLP gRPC spec requires partial_success.rejected_log_records / rejected_spans / rejected_data_points to be nonzero when export is rejected. Fix: count dropped messages in send_messages, return count, populate
partial_success rejection field in each service impl.
Spawn raced with bind: if EADDRINUSE/EPERM, the server task exited silently, tx was dropped, and poll() returned Ok(empty) forever at CPU speed with the connector status stuck at Running. Bind with TcpIncoming::bind() before tokio::spawn so any port error surfaces as InitError in open() instead of a silent busy-loop. Switch run_grpc_server to serve_with_incoming_shutdown.
|
Good catch @ryerraguntla — fixed in the latest push. Root cause: Fix: /ready |
| let mut rx_guard = self.rx.lock().await; | ||
| let rx = rx_guard.as_mut().ok_or_else(|| { | ||
| Error::InitError("OTLP source connector is not initialized".to_string()) | ||
| })?; |
There was a problem hiding this comment.
Channel None returns Ok(empty) not Err: When gRPC server task dies (any reason), tx dropped, rx.recv().await at line 99 returns None. Lines 101-107 return Ok(ProducedMessages { messages: vec![], ... }). SDK loop treats this as success, busy-loops sending empty batches forever. Connector never signals error. Fix: None branch → Err(Error::Connection("gRPC server terminated
unexpectedly".into())).
| } | ||
|
|
||
| #[async_trait] | ||
| impl Source for OtlpSource { |
There was a problem hiding this comment.
Double open() leaks server task: Second call to open() overwrites Mutex<Option<...>> at lines 82-84. Old rx dropped → old tx all-receivers-gone on next send. Old server_task JoinHandle dropped without abort → task keeps running on same addr. Second server's bind fails with EADDRINUSE → bind-race applies → permanent silent failure. Fix: at start of open(), check self.rx.lock().await.is_some(), return Err(Error::InitError("already open".into())) or call close() first.
… closed channel Two correctness issues: 1. Channel-full drops were silent successes: try_send failure dropped the message but each service impl returned partial_success: None, so the OTel SDK saw full acceptance and did not retry. Now send_messages returns the drop count and each impl populates the OTLP-spec rejected_log_records / rejected_data_points / rejected_spans field. 2. rx.recv() == None returned Ok(empty), causing a CPU-speed busy-loop when the gRPC server task exited for any reason (panic, OS signal). Now returns Err(Error::Connection(...)) so the runtime propagates the failure and stops the connector.
| } | ||
| } | ||
|
|
||
| pub fn bytes_to_hex(bytes: &[u8]) -> String { |
There was a problem hiding this comment.
bytes_to_hex 16 allocs per call: line 292 bytes.iter().map(|b| format!("{b:02x}")).collect() allocates one String per byte. For 16-byte trace_id: 16 heap allocs per call. Called 3x per span (lines 120-122: trace_id, span_id, parent_span_id) = 48 allocs per span. Called 2x per log record (lines 49-50). Fix: let mut s = String::with_capacity(bytes.len() * 2); for b in bytes { let _ = write!(s, "{b:02x}"); } s — one alloc.
|
Both issues addressed in this push: Channel-full → false success (
/ready |
| "trace_id": bytes_to_hex(&record.trace_id), | ||
| "span_id": bytes_to_hex(&record.span_id), | ||
| "service_name": service_name, | ||
| "resource": resource_attrs.clone(), |
There was a problem hiding this comment.
both at 52 and 130 line numbers - resource_attrs cloned per record/span: line 52 "resource": resource_attrs.clone() inside the innermost loop over log records; line 130 same pattern for spans. resource_attrs is
Map<String, Value> = BTreeMap. For batch of 1000 log records sharing one resource with 20 attrs: 1000 BTreeMap clones. Fix: serialize resource_attrs to Value once before inner loop, clone pre-built Value per
record.
| } | ||
| Some(any_value::Value::KvlistValue(kvlist)) => Value::Object(extract_attrs(&kvlist.values)), | ||
| Some(any_value::Value::BytesValue(bytes)) => Value::String(bytes_to_hex(bytes)), | ||
| Some(any_value::Value::StringValueStrindex(_)) | None => Value::Null, |
There was a problem hiding this comment.
StringValueStrindex → Value::Null silently: Some(any_value::Value::StringValueStrindex(_)) | None => Value::Null at line 284 — unrecognized oneof variant silently becomes JSON null.
Attribute data discarded with no log, no metric. Fix: add tracing::warn! on this arm logging the attribute key so operators know data was dropped.
…to_hex
Double open() overwrote Mutex fields without aborting the old server task,
leaving a dangling task on the same port (EADDRINUSE on restart). Added an
is_some() guard at the top of open() that returns InitError immediately.
bytes_to_hex allocated one String per byte via format!("{b:02x}") — 16
heap allocs per trace_id/span_id call, 48 per span. Replaced with a
single String::with_capacity(len*2) + write! loop.
|
@mfyuce Today spent some time knowing (not learning yet :) about OTLP. This is a very good use case for pumping data into iggy and iggy fans out to appropriate dashboards/analytics. initial couple of them are critical. There are few more I need look at , which I will do later tonight or tomorrow. One can wait for iter 2 review or start fixing these. |
…n AnyValue resource_attrs (Map<String, Value>) was cloned inside the innermost loop over log records and spans — 1000 Map clones per batch for a shared resource. Convert to Value::Object once before the scope loops; the inner loop clones the Value instead. extract_attrs now matches StringValueStrindex before calling any_value_to_json, so the attribute key is included in the warn! when an unrecognized AnyValue variant silently drops data.
|
Both addressed in this push:
/ready |
|
is it an agent at work? :) |
Yes, Claude Code as a pair programmer :); it proposes the fixes, I review and understand each one before it goes in. Your feedback has been the real driver here; the bind race, partial_success, and channel-close issues were genuine bugs I wouldn't have caught this fast on my own. Looking forward to the rest of the review. |
| async fn close(&mut self) -> Result<(), Error> { | ||
| if let Some(tx) = self.shutdown_tx.lock().await.take() { | ||
| let _ = tx.send(()); | ||
| } |
There was a problem hiding this comment.
task.abort() without .await: abort() at line 144 schedules cancellation but returns immediately. close() returns Ok(()) while gRPC server task may still hold the TCP port. A subsequent
open() call (restart) hits EADDRINUSE → combines with double-open finding, restart permanently broken. Fix: task.abort(); let _ = task.await; (ignore JoinError::Cancelled).
| })) | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Log flood + misleading async: try_send failure at line 144 logs warn! per dropped message. Batch of 1000 saturated-channel messages → 1000 warn! calls per export. Additionally async fn send_messages contains no .await — misleading signature, function never yields. Fix: make it fn send_messages (non-async); count drops in loop, single warn!("dropped {dropped}/{total} {signal} messages") after loop.
| # under the License. | ||
|
|
||
| [package] | ||
| name = "iggy_connector_otlp_source" |
There was a problem hiding this comment.
Please cross check this comment - version = "0.1.0" diverges from workspace pattern 0.4.1-edge.1. Version scripts (scripts/extract-version.sh, sync-rustc-version.sh) key off workspace-aligned versions; this crate will be skipped or produce wrong tags. Fix: version = "0.4.1-edge.1".
| async-trait = { workspace = true } | ||
| dashmap = { workspace = true } | ||
| iggy_connector_sdk = { workspace = true } | ||
| once_cell = { workspace = true } |
There was a problem hiding this comment.
opentelemetry-proto = { version = "0.32.0", ... } not in [workspace.dependencies]. Workspace carries this transitively via opentelemetry-otlp (workspace line 218). Direct pin
unmanaged by workspace tooling; future OTel family bumps silently diverge this pin. scripts/ci/third-party-licenses.sh depends on workspace dep resolution; out-of-workspace pin can corrupt generated manifest.
Fix: move to [workspace.dependencies], reference { workspace = true }.
| use tokio::task::JoinHandle; | ||
| use tonic::transport::server::TcpIncoming; | ||
| use tracing::info; | ||
|
|
There was a problem hiding this comment.
pub mod convert; pub mod server; should be pub(crate): Plugin .so loaded via FFI; only intended public surface is source_connector!-generated symbols. All other source connectors use private internal modules. Fix: pub(crate) mod convert; pub(crate) mod server. Use #[cfg(test)] pub use for test-specific exposure if needed.
| match serde_json::to_vec(&doc) { | ||
| Ok(payload) => messages.push(ProducedMessage { | ||
| id: None, | ||
| checksum: None, |
There was a problem hiding this comment.
Zero time_unix_nano treated as valid timestamp: timestamp: Some(record.time_unix_nano) at line 62. Proto scalar default for unset timestamp is 0. Code emits timestamp: Some(0) = Unix
epoch 1970-01-01T00:00:00Z instead of "unset". Corrupts time-based queries. Fix: timestamp: if record.time_unix_nano == 0 { None } else { Some(record.time_unix_nano) }. Same applies to line 140
(span.start_time_unix_nano).
…e dep, visibility, zero timestamp close() now awaits the aborted task so the TCP port is released before returning; rx is also cleared to allow a subsequent open() after restart. send_messages is non-async (it has no .await); drops are counted and emitted as a single warn! instead of one per message. Version aligned to workspace pattern 0.4.1-edge.1. opentelemetry-proto added to workspace deps; crate references it via workspace = true. convert and server modules narrowed to pub(crate); only FFI symbols generated by source_connector! remain part of the public surface. Proto scalar default 0 for time_unix_nano / start_time_unix_nano is now treated as unset and emits None instead of Some(0) / Unix epoch.
|
All six addressed in this push:
Log flood / misleading async: Version mismatch: bumped to
Zero /ready |
Summary
iggy_connector_otlp_source, a cdylib connector plugin that binds port 4317 (OTLP standard) and receives logs, metrics, and traces from any OpenTelemetry SDK or Collector.opentelemetry-protowire format (already a transitive dep ofopentelemetry-otlp) and serialized as JSON into the connector channel — no build-time proto compilation required.LogsService,MetricsService,TraceService) share one listener, with gzip compression enabled on every service (OTel SDKs and the Collector compress payloads by default).Sourcetrait:poll()blocks on the first message then drains up tobatch_sizeadditional messages viatry_recv()before returning.Design notes
Why
opentelemetry-protoinstead oftonic-build?The
opentelemetry-protocrate already generates the tonic stubs and ships them as a library. Using it avoids abuild.rs/protocdependency and keeps the build hermetic.Why a single gRPC listener for all three signals?
Standard OTLP deployments use one endpoint for all telemetry. Separating them would require operators to configure three addresses, matching no existing tooling convention.
JSON schema (documented in
README.md):Each message carries a top-level
"signal"field ("log","metric", or"trace") plus signal-specific fields derived from the OTLP proto structs.Configuration
Point any OTel SDK or Collector at
grpc://host:4317.Test plan
cargo clippy -p iggy_connector_otlp_source --all-features --all-targets -- -D warnings— cleancargo test -p iggy_connector_otlp_source— passes.rsandCargo.tomlfiles🤖 Generated with Claude Code