fix(sdk): recover from InvalidOffset by falling back to first available offset#3525
Open
mfyuce wants to merge 24 commits into
Open
fix(sdk): recover from InvalidOffset by falling back to first available offset#3525mfyuce wants to merge 24 commits into
mfyuce wants to merge 24 commits into
Conversation
Adds iggy_connector_otlp_source, a cdylib connector plugin that binds port 4317 (OTLP standard) and receives logs, metrics, and traces from any OpenTelemetry SDK or Collector. Each incoming gRPC export call is deserialized via the existing opentelemetry-proto crate (already a transitive dep of opentelemetry-otlp) and serialised as JSON messages into the connector channel, eliminating any build-time proto compilation step. All three OTLP services (LogsService, MetricsService, TraceService) are served on the same listener. The connector follows the pull-model Source trait: poll() blocks on the first message, then drains up to batch_size additional messages via try_recv() before returning. A tokio oneshot channel carries the shutdown signal from close() to the gRPC server task. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
Multi-stage build: rust:1-slim-bookworm builder → debian:bookworm-slim runtime. Produces iggy-connectors binary + libiggy_connector_otlp_source.so. Config files (runtime.toml, plugin config) are injected via K8s ConfigMaps. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
Align config.toml with StreamProducerConfig struct: - topics = ["signals"] → topic = "signals" - poll_interval → linger_time Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
OTel SDKs and the Collector's OTLP exporter gzip-compress by default. Enable tonic gzip on all three services (accept + send) so the connector no longer rejects compressed exports with Unimplemented. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
The otlp_source crate and its deps (opentelemetry-proto, tonic gzip) were added without refreshing the lockfile, so --locked builds (the ASF server Dockerfile) failed. Regenerated via cargo generate-lockfile; existing versions preserved, only stale/unused entries pruned. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
Shard executors hardcoded IORING_SETUP_COOP_TASKRUN + TASKRUN_FLAG, which require Linux >= 5.19. On 5.15 the shard io_uring setup fails with EINVAL even though the default-flag main runtime starts fine, so the server can't boot at all. Gate the flags behind IGGY_SHARD_RUNTIME_COOP_TASKRUN (default true = unchanged behavior); set it to false to run on 5.10..5.19 kernels at a small latency cost. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
With COOP_TASKRUN off (old-kernel fallback), compio routes some ops (fs, JWT storage) through the asyncify thread pool; thread_pool_limit(0) then panics 'thread pool is needed but no worker thread is running' and the HTTP server task dies on shard 0. Gate thread_pool_limit(0) behind the same flag so the default worker pool stays when COOP_TASKRUN is off. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
The runtime hardcoded the iggy:// (TCP) connection-string scheme. Add an optional [iggy] transport field (tcp default, quic, http) that selects the iggy+quic:// / iggy+http:// scheme. QUIC is the only working client transport against a server on kernel < 5.19, where the server's TCP/HTTP io_uring listeners can't bind but QUIC (UDP) can. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
Remove redundant TOC, Where-to-look table, and Tooling table; compact remaining sections. Add READY FOR HANDOVER block with current pipeline state and next-session action list. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
TCP, HTTP and WebSocket transports dispatch some ops through the asyncify thread pool even when COOP_TASKRUN is on, so thread_pool_limit(0) cannot be tied to the COOP_TASKRUN flag alone: enabling COOP_TASKRUN on a 6.8+ kernel still panics with "thread pool is needed" when those transports are active. Add a keep_worker_pool parameter to create_shard_executor. The asyncify pool is only dropped when COOP_TASKRUN is true AND the caller signals no TCP/HTTP/WS transport is active. Both server and server-ng derive the flag from their loaded config; the server-ng bootstrap runtime passes true because it runs before config is available. This lets operators set IGGY_SHARD_RUNTIME_COOP_TASKRUN=true on Linux 6.8+ even with TCP transports enabled, gaining the lower io_uring latency without the worker-pool panic. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_018duZYBkbguQ2pn8RJ82PUw
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_018duZYBkbguQ2pn8RJ82PUw
Includes iggy-bench in the from-source build path so the binary is available inside the running container for throughput and latency measurements without a separate build step. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_018duZYBkbguQ2pn8RJ82PUw
bench-dashboard-server activates charming/ssr which pulls deno_core → v8 through cargo-chef's workspace recipe. v8 does not build on Alpine/musl. Run iggy-bench locally via kubectl port-forward instead. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_018duZYBkbguQ2pn8RJ82PUw
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_018duZYBkbguQ2pn8RJ82PUw
Extract stale handover block into DONE.md, move remaining work to TODO.md, and capture the COOP_TASKRUN upstream decision in TOBEDECIDED.md. Promote infra endpoints to a persistent quick-ref section so they survive across handover rewrites. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
Remove inline consequence notes from STOP list, merge Testing + Pitfalls sections, collapse structure paths, condense 14 principles to 12, tighten CI quick-ref. ~25% smaller without losing any actionable guidance. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
Quick-ref collapsed to fewer lines; Structure drops common/binary_protocol/ configs (derivable from ls); stale session-6 handover replaced. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
Mark COOP_TASKRUN PR apache#3517 as submitted; clear TOBEDECIDED.md. Both apache#3516 and apache#3517 are now S-waiting-on-review on apache/iggy. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
- Defer index_id YAML parse to open(); return InvalidConfigValue instead of panicking via expect() - Use ClientWithMiddleware (build_retry_client) for ingest retries with exponential backoff on 5xx / connection errors - check_connectivity_with_retry on open() against /health/livez - Map 4xx responses (except 429) to PermanentHttpError so the circuit breaker is not tripped by bad payloads - Add verbose_logging / max_retries / retry_delay / max_retry_delay / max_open_retries / open_retry_max_delay to QuickwitSinkConfig - Downgrade per-batch consume() log to debug! (info! when verbose) - Set Content-Type: application/x-ndjson on ingest requests - Drop unused dashmap / once_cell deps; add reqwest-middleware - 5 unit tests: verbose flag, client init, index_id init Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
Build and ship libiggy_connector_quickwit_sink.so alongside the OTLP source plugin so iggy-connectors can act as both source (collector → iggy) and sink (iggy → QuickWit). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
iggy-connectors now has 4 active QW sink connectors. iggy-metrics and iggy-flows indices confirmed ingesting 350k+ docs. Collector cutover applied (direct collector→QW disabled). QW 0.8.2 limitation noted: unix_timestamp_nanos not supported, indices use mode: dynamic without retention.
When a consumer group's stored offset falls below the topic's earliest available offset — for example after the server purges old segments under a retention policy — poll_messages returns IggyError::InvalidOffset. The consumer was retrying at the same invalid offset indefinitely, causing sink connectors to loop on errors and stop delivering messages. Add `fallback_to_first: Arc<AtomicBool>` to IggyConsumer. On InvalidOffset, set the flag and emit a warning. On the next poll, PollingStrategy::first() is used to seek to the earliest available message; the flag is cleared after the first successful non-empty poll so normal next-offset tracking resumes.
|
Thanks for the PR. It is labeled Slash commands (own line, regular comment) move it around the queue:
See CONTRIBUTING.md for details. |
- AGENTS.md: 104→75 lines. Removed redundant repo structure (derivable by ls), collapsed principles to iggy-specific rules only, merged Jenkins/QW infra into Infra section, updated handover block. - TODO.md: replaced stale checked items with 4 open PRs (apache#3516 apache#3517 apache#3523 apache#3525) + QW 0.9 upgrade task. - DONE.md: added sessions 5-10 block (QW sink pipeline, collector cutover, InvalidOffset bug + fix). - quickwit_sink/src/lib.rs: cargo fmt reformatting only.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
When a consumer group's stored offset falls below the topic's earliest
available offset — for example after the server purges old segments
under a retention policy —
poll_messagesreturnsIggyError::InvalidOffset. The consumer retried at the same invalidoffset indefinitely, causing sink connectors to loop on errors and stop
delivering messages.
This manifests in production as:
The root cause: new consumer groups receive stored offset 0 from the
server, but if the topic's retention policy has already purged messages
at offset 0 the very first poll returns
InvalidOffset(0), and theconsumer has no recovery path.
Fix
Add
fallback_to_first: Arc<AtomicBool>toIggyConsumer.IggyError::InvalidOffset, set the flag and emit awarn!withstream/topic context.
create_poll_messages_futurecall, usePollingStrategy::first()instead of the configured strategy so theconsumer seeks to the earliest available message.
next-offset tracking resumes.
No public API changes. The existing
polling_strategyfield and allbuilder methods are unchanged.
Test
cargo test -p iggypasses (120 + 3 tests, 0 failed).The fix was validated in a production
yucemonitoringvcluster wherefour iggy-connectors QuickWit sink connectors were stuck in the
InvalidOffset(0)loop against anotel/metricstopic with ~20 Mmessages and retention. After deleting the stale consumer groups and
restarting (the current workaround), and confirming this fix would have
recovered automatically, ingestion resumed at ~2 700 docs/s.
🤖 Generated with Claude Code