Skip to content

feat(connectors): add OTLP gRPC source connector#3516

Open
mfyuce wants to merge 6 commits into
apache:masterfrom
mfyuce:feat/otlp-source-connector
Open

feat(connectors): add OTLP gRPC source connector#3516
mfyuce wants to merge 6 commits into
apache:masterfrom
mfyuce:feat/otlp-source-connector

Conversation

@mfyuce

@mfyuce mfyuce commented Jun 20, 2026

Copy link
Copy Markdown

Summary

  • Adds iggy_connector_otlp_source, a cdylib connector plugin that binds port 4317 (OTLP standard) and receives logs, metrics, and traces from any OpenTelemetry SDK or Collector.
  • Each incoming gRPC export request is deserialized from the opentelemetry-proto wire format (already a transitive dep of opentelemetry-otlp) and serialized as JSON into the connector channel — no build-time proto compilation required.
  • All three OTLP services (LogsService, MetricsService, TraceService) share one listener, with gzip compression enabled on every service (OTel SDKs and the Collector compress payloads by default).
  • Follows the pull-model Source trait: poll() blocks on the first message then drains up to batch_size additional messages via try_recv() before returning.

Design notes

Why opentelemetry-proto instead of tonic-build?
The opentelemetry-proto crate already generates the tonic stubs and ships them as a library. Using it avoids a build.rs / protoc dependency and keeps the build hermetic.

Why a single gRPC listener for all three signals?
Standard OTLP deployments use one endpoint for all telemetry. Separating them would require operators to configure three addresses, matching no existing tooling convention.

JSON schema (documented in README.md):
Each message carries a top-level "signal" field ("log", "metric", or "trace") plus signal-specific fields derived from the OTLP proto structs.

Configuration

[plugin_config]
listen_addr = "0.0.0.0:4317"
channel_capacity = 50000
batch_size = 1000

Point any OTel SDK or Collector at grpc://host:4317.

Test plan

  • cargo clippy -p iggy_connector_otlp_source --all-features --all-targets -- -D warnings — clean
  • cargo test -p iggy_connector_otlp_source — passes
  • License headers present on all new .rs and Cargo.toml files
  • Taplo and markdownlint pass on new TOML/Markdown files
  • Deployed and validated against a live OTel Collector in a Kubernetes environment: logs, metrics, and traces all fan-out correctly into Iggy topics

🤖 Generated with Claude Code

@github-actions

Copy link
Copy Markdown

Thanks for the PR. It is labeled S-waiting-on-review and queued for review.

Slash commands (own line, regular comment) move it around the queue:

  • /ready - back to S-waiting-on-review after addressing feedback
  • /author - flip to S-waiting-on-author while you finish changes
  • /request-review @user-or-team - request a reviewer

See CONTRIBUTING.md for details.

@github-actions github-actions Bot added the S-waiting-on-review PR is waiting on a reviewer label Jun 20, 2026
Adds iggy_connector_otlp_source, a cdylib connector plugin that binds
port 4317 (OTLP standard) and receives logs, metrics, and traces from
any OpenTelemetry SDK or Collector.

Each incoming gRPC export call is deserialized via the opentelemetry-proto
crate (already a transitive dep of opentelemetry-otlp) and serialized as
JSON messages into the connector channel, eliminating any build-time proto
compilation step. All three OTLP services (LogsService, MetricsService,
TraceService) are served on the same listener, with gzip compression enabled
on every service (OTel SDKs and the Collector compress payloads by default).

The connector follows the pull-model Source trait: poll() blocks on the
first message, then drains up to batch_size additional messages via
try_recv() before returning. A tokio oneshot channel carries the shutdown
signal from close() to the gRPC server task.
@mfyuce mfyuce force-pushed the feat/otlp-source-connector branch from d6d1751 to 133d6e6 Compare June 20, 2026 20:00
mfyuce added a commit to mfyuce/iggy that referenced this pull request Jun 20, 2026
mfyuce added a commit to mfyuce/iggy that referenced this pull request Jun 21, 2026
Mark COOP_TASKRUN PR apache#3517 as submitted; clear TOBEDECIDED.md.
Both apache#3516 and apache#3517 are now S-waiting-on-review on apache/iggy.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
mfyuce added a commit to mfyuce/iggy that referenced this pull request Jun 21, 2026
- AGENTS.md: 104→75 lines. Removed redundant repo structure (derivable
  by ls), collapsed principles to iggy-specific rules only, merged
  Jenkins/QW infra into Infra section, updated handover block.
- TODO.md: replaced stale checked items with 4 open PRs (apache#3516 apache#3517
  apache#3523 apache#3525) + QW 0.9 upgrade task.
- DONE.md: added sessions 5-10 block (QW sink pipeline, collector
  cutover, InvalidOffset bug + fix).
- quickwit_sink/src/lib.rs: cargo fmt reformatting only.

let (tx, rx) = mpsc::channel(self.config.channel_capacity);
let (shutdown_tx, shutdown_rx) = oneshot::channel();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bind race: tokio::spawn(server::run_grpc_server(addr, tx, shutdown_rx)) at line 80, then Ok(()) returned at line 90 without any signal from the server that bind succeeded. If bind fails (EADDRINUSE, EPERM), server task exits, tx dropped, rx.recv() at line 99 returns None. poll() returns Ok(empty) (line 102-106). SDK loop serializes empty batch, runtime source_forwarding_loop sends 0 Iggy messages, loops forever at CPU speed. Connector status stays Running with no data and no error. Fix: bind TcpListener synchronously inside open() before tokio::spawn, pass bound listener to server task.

request: Request<ExportLogsServiceRequest>,
) -> Result<Response<ExportLogsServiceResponse>, Status> {
let messages = convert::export_logs_to_messages(request.into_inner());
send_messages(&self.tx, messages, "logs").await;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happening in multiple places for each of the OTLP messages - Logs, traces and metrics. But commenting in place only with line numbers of other places.
97-99, 111-113, 125-127, 137-138 — Channel-full → false SUCCESS: try_send at line 137 fails, message dropped, warn! at line 138 fires. All three service impls return Ok(Response::new(...{ partial_success: None })) — lines 97-99 (logs), 111-113 (metrics), 125-127 (traces). OTel SDK sees full acceptance, does not retry, does not back off. Data permanently lost with no client-visible signal. OTLP gRPC spec requires partial_success.rejected_log_records / rejected_spans / rejected_data_points to be nonzero when export is rejected. Fix: count dropped messages in send_messages, return count, populate
partial_success rejection field in each service impl.

Spawn raced with bind: if EADDRINUSE/EPERM, the server task exited
silently, tx was dropped, and poll() returned Ok(empty) forever at CPU
speed with the connector status stuck at Running.

Bind with TcpIncoming::bind() before tokio::spawn so any port error
surfaces as InitError in open() instead of a silent busy-loop.
Switch run_grpc_server to serve_with_incoming_shutdown.
@mfyuce

mfyuce commented Jun 21, 2026

Copy link
Copy Markdown
Author

Good catch @ryerraguntla — fixed in the latest push.

Root cause: tokio::spawn(run_grpc_server(addr, ...)) returned immediately; if bind failed, the server task exited silently, tx was dropped, and poll() returned Ok(empty) forever at CPU speed with connector status stuck at Running.

Fix: TcpIncoming::bind(addr) is called synchronously in open() before tokio::spawn. Any bind failure (EADDRINUSE, EPERM, …) now propagates as Error::InitError from open(), which surfaces in the connector lifecycle and stops the source cleanly. run_grpc_server now takes the pre-bound TcpIncoming and uses serve_with_incoming_shutdown.

/ready

let mut rx_guard = self.rx.lock().await;
let rx = rx_guard.as_mut().ok_or_else(|| {
Error::InitError("OTLP source connector is not initialized".to_string())
})?;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Channel None returns Ok(empty) not Err: When gRPC server task dies (any reason), tx dropped, rx.recv().await at line 99 returns None. Lines 101-107 return Ok(ProducedMessages { messages: vec![], ... }). SDK loop treats this as success, busy-loops sending empty batches forever. Connector never signals error. Fix: None branch → Err(Error::Connection("gRPC server terminated
unexpectedly".into())).

}

#[async_trait]
impl Source for OtlpSource {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double open() leaks server task: Second call to open() overwrites Mutex<Option<...>> at lines 82-84. Old rx dropped → old tx all-receivers-gone on next send. Old server_task JoinHandle dropped without abort → task keeps running on same addr. Second server's bind fails with EADDRINUSE → bind-race applies → permanent silent failure. Fix: at start of open(), check self.rx.lock().await.is_some(), return Err(Error::InitError("already open".into())) or call close() first.

… closed channel

Two correctness issues:

1. Channel-full drops were silent successes: try_send failure dropped the
   message but each service impl returned partial_success: None, so the
   OTel SDK saw full acceptance and did not retry. Now send_messages
   returns the drop count and each impl populates the OTLP-spec
   rejected_log_records / rejected_data_points / rejected_spans field.

2. rx.recv() == None returned Ok(empty), causing a CPU-speed busy-loop
   when the gRPC server task exited for any reason (panic, OS signal).
   Now returns Err(Error::Connection(...)) so the runtime propagates the
   failure and stops the connector.
}
}

pub fn bytes_to_hex(bytes: &[u8]) -> String {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bytes_to_hex 16 allocs per call: line 292 bytes.iter().map(|b| format!("{b:02x}")).collect() allocates one String per byte. For 16-byte trace_id: 16 heap allocs per call. Called 3x per span (lines 120-122: trace_id, span_id, parent_span_id) = 48 allocs per span. Called 2x per log record (lines 49-50). Fix: let mut s = String::with_capacity(bytes.len() * 2); for b in bytes { let _ = write!(s, "{b:02x}"); } s — one alloc.

@mfyuce

mfyuce commented Jun 21, 2026

Copy link
Copy Markdown
Author

Both issues addressed in this push:

Channel-full → false success (server.rs): send_messages now returns the drop count. Each service impl populates the OTLP-spec rejection field when nonzero — rejected_log_records, rejected_data_points, rejected_spans — so the OTel SDK/Collector sees a partial success and can retry or alert accordingly.

rx.recv() == None → busy-loop (lib.rs): The None branch now returns Err(Error::Connection("OTLP gRPC server terminated unexpectedly")) instead of Ok(empty), so the runtime propagates the failure and stops the connector cleanly instead of spinning.

/ready

"trace_id": bytes_to_hex(&record.trace_id),
"span_id": bytes_to_hex(&record.span_id),
"service_name": service_name,
"resource": resource_attrs.clone(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both at 52 and 130 line numbers - resource_attrs cloned per record/span: line 52 "resource": resource_attrs.clone() inside the innermost loop over log records; line 130 same pattern for spans. resource_attrs is
Map<String, Value> = BTreeMap. For batch of 1000 log records sharing one resource with 20 attrs: 1000 BTreeMap clones. Fix: serialize resource_attrs to Value once before inner loop, clone pre-built Value per
record.

}
Some(any_value::Value::KvlistValue(kvlist)) => Value::Object(extract_attrs(&kvlist.values)),
Some(any_value::Value::BytesValue(bytes)) => Value::String(bytes_to_hex(bytes)),
Some(any_value::Value::StringValueStrindex(_)) | None => Value::Null,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StringValueStrindex → Value::Null silently: Some(any_value::Value::StringValueStrindex(_)) | None => Value::Null at line 284 — unrecognized oneof variant silently becomes JSON null.
Attribute data discarded with no log, no metric. Fix: add tracing::warn! on this arm logging the attribute key so operators know data was dropped.

…to_hex

Double open() overwrote Mutex fields without aborting the old server task,
leaving a dangling task on the same port (EADDRINUSE on restart). Added an
is_some() guard at the top of open() that returns InitError immediately.

bytes_to_hex allocated one String per byte via format!("{b:02x}") — 16
heap allocs per trace_id/span_id call, 48 per span. Replaced with a
single String::with_capacity(len*2) + write! loop.
@ryerraguntla

ryerraguntla commented Jun 21, 2026

Copy link
Copy Markdown
Contributor

@mfyuce Today spent some time knowing (not learning yet :) about OTLP. This is a very good use case for pumping data into iggy and iggy fans out to appropriate dashboards/analytics. initial couple of them are critical. There are few more I need look at , which I will do later tonight or tomorrow. One can wait for iter 2 review or start fixing these.

…n AnyValue

resource_attrs (Map<String, Value>) was cloned inside the innermost loop
over log records and spans — 1000 Map clones per batch for a shared
resource. Convert to Value::Object once before the scope loops; the inner
loop clones the Value instead.

extract_attrs now matches StringValueStrindex before calling
any_value_to_json, so the attribute key is included in the warn! when an
unrecognized AnyValue variant silently drops data.
@mfyuce

mfyuce commented Jun 21, 2026

Copy link
Copy Markdown
Author

Both addressed in this push:

resource_attrs cloned per record/spanresource_attrs: Map<String, Value> is now moved into Value::Object(resource_attrs) once before the scope loops; the inner loop clones the Value instead of the raw Map. The metrics path already passes resource_attrs by reference to metric_to_data_points, so it was not affected.

StringValueStrindex silent dropextract_attrs now matches StringValueStrindex explicitly before calling any_value_to_json, which gives access to kv.key for the warn!. The any_value_to_json return path for StringValueStrindex (and recursive KvlistValue / ArrayValue nesting) still returns Value::Null; only the top-level attribute key context is logged here.

/ready

@ryerraguntla

Copy link
Copy Markdown
Contributor

is it an agent at work? :)

@mfyuce

mfyuce commented Jun 21, 2026

Copy link
Copy Markdown
Author

is it an agent at work? :)

Yes, Claude Code as a pair programmer :); it proposes the fixes, I review and understand each one before it goes in. Your feedback has been the real driver here; the bind race, partial_success, and channel-close issues were genuine bugs I wouldn't have caught this fast on my own. Looking forward to the rest of the review.

async fn close(&mut self) -> Result<(), Error> {
if let Some(tx) = self.shutdown_tx.lock().await.take() {
let _ = tx.send(());
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

task.abort() without .await: abort() at line 144 schedules cancellation but returns immediately. close() returns Ok(()) while gRPC server task may still hold the TCP port. A subsequent
open() call (restart) hits EADDRINUSE → combines with double-open finding, restart permanently broken. Fix: task.abort(); let _ = task.await; (ignore JoinError::Cancelled).

}))
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log flood + misleading async: try_send failure at line 144 logs warn! per dropped message. Batch of 1000 saturated-channel messages → 1000 warn! calls per export. Additionally async fn send_messages contains no .await — misleading signature, function never yields. Fix: make it fn send_messages (non-async); count drops in loop, single warn!("dropped {dropped}/{total} {signal} messages") after loop.

# under the License.

[package]
name = "iggy_connector_otlp_source"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please cross check this comment - version = "0.1.0" diverges from workspace pattern 0.4.1-edge.1. Version scripts (scripts/extract-version.sh, sync-rustc-version.sh) key off workspace-aligned versions; this crate will be skipped or produce wrong tags. Fix: version = "0.4.1-edge.1".

async-trait = { workspace = true }
dashmap = { workspace = true }
iggy_connector_sdk = { workspace = true }
once_cell = { workspace = true }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opentelemetry-proto = { version = "0.32.0", ... } not in [workspace.dependencies]. Workspace carries this transitively via opentelemetry-otlp (workspace line 218). Direct pin
unmanaged by workspace tooling; future OTel family bumps silently diverge this pin. scripts/ci/third-party-licenses.sh depends on workspace dep resolution; out-of-workspace pin can corrupt generated manifest.
Fix: move to [workspace.dependencies], reference { workspace = true }.

use tokio::task::JoinHandle;
use tonic::transport::server::TcpIncoming;
use tracing::info;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pub mod convert; pub mod server; should be pub(crate): Plugin .so loaded via FFI; only intended public surface is source_connector!-generated symbols. All other source connectors use private internal modules. Fix: pub(crate) mod convert; pub(crate) mod server. Use #[cfg(test)] pub use for test-specific exposure if needed.

match serde_json::to_vec(&doc) {
Ok(payload) => messages.push(ProducedMessage {
id: None,
checksum: None,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zero time_unix_nano treated as valid timestamp: timestamp: Some(record.time_unix_nano) at line 62. Proto scalar default for unset timestamp is 0. Code emits timestamp: Some(0) = Unix
epoch 1970-01-01T00:00:00Z instead of "unset". Corrupts time-based queries. Fix: timestamp: if record.time_unix_nano == 0 { None } else { Some(record.time_unix_nano) }. Same applies to line 140
(span.start_time_unix_nano).

…e dep, visibility, zero timestamp

close() now awaits the aborted task so the TCP port is released before
returning; rx is also cleared to allow a subsequent open() after restart.

send_messages is non-async (it has no .await); drops are counted and
emitted as a single warn! instead of one per message.

Version aligned to workspace pattern 0.4.1-edge.1.

opentelemetry-proto added to workspace deps; crate references it via
workspace = true.

convert and server modules narrowed to pub(crate); only FFI symbols
generated by source_connector! remain part of the public surface.

Proto scalar default 0 for time_unix_nano / start_time_unix_nano is now
treated as unset and emits None instead of Some(0) / Unix epoch.
@mfyuce

mfyuce commented Jun 22, 2026

Copy link
Copy Markdown
Author

All six addressed in this push:

task.abort() without await: close() now takes the handle out of the mutex first (to release the lock), calls abort(), then let _ = task.await to drain the cancelled task before returning. rx is also cleared so a subsequent open() after restart passes the already-open guard.

Log flood / misleading async: send_messages is now a plain fn (no .await inside). Drops are counted silently in the loop; a single warn!("dropped {dropped}/{total} {signal} messages") fires once after the loop.

Version mismatch: bumped to 0.4.1-edge.1 to match workspace connector pattern.

opentelemetry-proto not in workspace: added to [workspace.dependencies] with default-features = false; crate now uses { workspace = true, features = [...] }.

pub mod visibility: convert and server narrowed to pub(crate); only the FFI symbols from source_connector! remain public.

Zero time_unix_nano as epoch: both log records and spans now use .then_some() so proto default 0 maps to None instead of Some(0).

/ready

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

S-waiting-on-review PR is waiting on a reviewer

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants