
fix(sdk): recover from InvalidOffset by falling back to first available offset #3525

Open
mfyuce wants to merge 24 commits into apache:master from mfyuce:fix/sink-consumer-invalid-offset

mfyuce commented Jun 21, 2026

Problem

When a consumer group's stored offset falls below the topic's earliest
available offset (for example, after the server purges old segments
under a retention policy), poll_messages returns
IggyError::InvalidOffset. Before this change, the consumer retried at
the same invalid offset indefinitely, so sink connectors looped on
errors and stopped delivering messages.

This manifests in production as:

ERROR iggy::clients::consumer: Failed to poll messages: Invalid offset: 0
ERROR iggy_connectors::sink: Failed to receive message for sink connector ...
# repeats forever; no messages are delivered

The root cause: new consumer groups receive stored offset 0 from the
server, but if the topic's retention policy has already purged messages
at offset 0 the very first poll returns InvalidOffset(0), and the
consumer has no recovery path.

Fix

Add fallback_to_first: Arc<AtomicBool> to IggyConsumer.

  • On IggyError::InvalidOffset, set the flag and emit a warn! with
    stream/topic context.
  • On the next create_poll_messages_future call, use
    PollingStrategy::first() instead of the configured strategy so the
    consumer seeks to the earliest available message.
  • After the first successful non-empty poll, clear the flag so normal
    next-offset tracking resumes.

No public API changes. The existing polling_strategy field and all
builder methods are unchanged.
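
The three steps above can be modelled in isolation. The following is a minimal sketch, not the code from this PR: `Strategy`, `PollError`, `ConsumerModel` and its two methods are hypothetical stand-ins for the SDK types, and the choice of `Ordering::SeqCst` is an assumption. Only the `fallback_to_first: Arc<AtomicBool>` field mirrors the description.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the SDK's polling strategy.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Strategy {
    Next,
    First,
}

/// Hypothetical stand-in for the SDK error; only the relevant variant is modelled.
#[derive(Debug, PartialEq)]
enum PollError {
    InvalidOffset(u64),
}

struct ConsumerModel {
    configured: Strategy,
    fallback_to_first: Arc<AtomicBool>,
}

impl ConsumerModel {
    fn new(configured: Strategy) -> Self {
        Self {
            configured,
            fallback_to_first: Arc::new(AtomicBool::new(false)),
        }
    }

    /// While the flag is set, the next poll uses `First` instead of the
    /// configured strategy.
    fn strategy_for_next_poll(&self) -> Strategy {
        if self.fallback_to_first.load(Ordering::SeqCst) {
            Strategy::First
        } else {
            self.configured
        }
    }

    /// Feed back the outcome of a poll: the message count on success,
    /// or the error returned by the server.
    fn on_poll_result(&self, result: &Result<usize, PollError>) {
        match result {
            // Stored offset is no longer available: fall back on the next poll.
            Err(PollError::InvalidOffset(_)) => {
                self.fallback_to_first.store(true, Ordering::SeqCst);
            }
            // First successful non-empty poll: resume normal tracking.
            Ok(count) if *count > 0 => {
                self.fallback_to_first.store(false, Ordering::SeqCst);
            }
            // Empty polls leave the flag as it is.
            Ok(_) => {}
        }
    }
}
```

In this model an empty poll does not clear the flag, so a consumer that falls back while the topic is momentarily empty still seeks to the first available message once data arrives.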

Test

cargo test -p iggy passes (120 + 3 tests, 0 failed).

The fix was validated against a production yucemonitoring vcluster in
which four iggy-connectors QuickWit sink connectors were stuck in the
InvalidOffset(0) loop against an otel/metrics topic with ~20 M
messages and a retention policy. After the stale consumer groups were
deleted and the connectors restarted (the current workaround),
ingestion resumed at ~2 700 docs/s; it was also confirmed that this fix
would have recovered automatically.

🤖 Generated with Claude Code

mfyuce and others added 23 commits June 19, 2026 09:24

Adds iggy_connector_otlp_source, a cdylib connector plugin that binds
port 4317 (OTLP standard) and receives logs, metrics, and traces from
any OpenTelemetry SDK or Collector.

Each incoming gRPC export call is deserialized via the existing
opentelemetry-proto crate (already a transitive dep of opentelemetry-otlp)
and serialised as JSON messages into the connector channel, eliminating any
build-time proto compilation step. All three OTLP services (LogsService,
MetricsService, TraceService) are served on the same listener.

The connector follows the pull-model Source trait: poll() blocks on the
first message, then drains up to batch_size additional messages via
try_recv() before returning. A tokio oneshot channel carries the shutdown
signal from close() to the gRPC server task.
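
The block-then-drain behaviour of poll() described above can be sketched as follows. This is an illustration only: it uses `std::sync::mpsc` as a stand-in for whatever channel the connector actually uses, and `poll_batch` is a hypothetical helper name.

```rust
use std::sync::mpsc::{Receiver, RecvError};

/// Block until one message arrives, then take up to `batch_size`
/// additional messages that are already queued, without blocking again.
fn poll_batch<T>(rx: &Receiver<T>, batch_size: usize) -> Result<Vec<T>, RecvError> {
    // Blocking receive: returns Err only when every sender has been dropped.
    let first = rx.recv()?;
    let limit = batch_size.saturating_add(1);
    let mut batch = vec![first];
    while batch.len() < limit {
        match rx.try_recv() {
            Ok(msg) => batch.push(msg),
            // Queue is empty or disconnected: return what we have.
            Err(_) => break,
        }
    }
    Ok(batch)
}
```

Blocking only on the first message keeps an idle source from spinning, while the non-blocking drain bounds the batch at `batch_size + 1` messages under load.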

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

Multi-stage build: rust:1-slim-bookworm builder → debian:bookworm-slim
runtime. Produces iggy-connectors binary + libiggy_connector_otlp_source.so.
Config files (runtime.toml, plugin config) are injected via K8s ConfigMaps.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

Align config.toml with StreamProducerConfig struct:
- topics = ["signals"] → topic = "signals"
- poll_interval → linger_time

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

OTel SDKs and the Collector's OTLP exporter gzip-compress by default.
Enable tonic gzip on all three services (accept + send) so the connector
no longer rejects compressed exports with Unimplemented.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

The otlp_source crate and its deps (opentelemetry-proto, tonic gzip)
were added without refreshing the lockfile, so --locked builds (the ASF
server Dockerfile) failed. Regenerated via cargo generate-lockfile;
existing versions preserved, only stale/unused entries pruned.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

Shard executors hardcoded IORING_SETUP_COOP_TASKRUN + TASKRUN_FLAG, which
require Linux >= 5.19. On 5.15 the shard io_uring setup fails with EINVAL
even though the default-flag main runtime starts fine, so the server can't
boot at all. Gate the flags behind IGGY_SHARD_RUNTIME_COOP_TASKRUN (default
true = unchanged behavior); set it to false to run on kernels from 5.10 up
to, but not including, 5.19, at a small latency cost.
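
A default-true environment gate of this kind can be sketched as below. The function names are hypothetical, and the parsing rules (case-insensitive "false" disables, anything else keeps the default) are assumptions rather than the server's actual behavior; only the variable name comes from this commit.

```rust
/// Interpret the raw value of the variable; `None` means it is unset.
/// Assumption: only an explicit "false" turns the flags off.
fn coop_taskrun_enabled(raw: Option<&str>) -> bool {
    match raw.map(|v| v.trim().to_ascii_lowercase()) {
        Some(v) if v == "false" => false,
        _ => true,
    }
}

/// Read the flag from the process environment.
fn coop_taskrun_from_env() -> bool {
    let raw = std::env::var("IGGY_SHARD_RUNTIME_COOP_TASKRUN").ok();
    coop_taskrun_enabled(raw.as_deref())
}
```

Keeping the parsing separate from the environment lookup makes the default easy to test without mutating process state.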

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

With COOP_TASKRUN off (old-kernel fallback), compio routes some ops (fs,
JWT storage) through the asyncify thread pool; thread_pool_limit(0) then
panics 'thread pool is needed but no worker thread is running' and the
HTTP server task dies on shard 0. Gate thread_pool_limit(0) behind the
same flag so the default worker pool stays when COOP_TASKRUN is off.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

The runtime hardcoded the iggy:// (TCP) connection-string scheme. Add an
optional [iggy] transport field (tcp default, quic, http) that selects the
iggy+quic:// / iggy+http:// scheme. QUIC is the only working client
transport against a server on kernel < 5.19, where the server's TCP/HTTP
io_uring listeners can't bind but QUIC (UDP) can.
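
The mapping from the transport field to a connection-string scheme can be sketched like this. The `Transport` enum, its methods, and `connection_string` are hypothetical names for illustration; only the three field values and the three schemes are taken from this commit message.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Transport {
    Tcp,
    Quic,
    Http,
}

impl Transport {
    /// `None` models an omitted `transport` field and selects the TCP default.
    fn parse(raw: Option<&str>) -> Result<Self, String> {
        match raw {
            None | Some("tcp") => Ok(Transport::Tcp),
            Some("quic") => Ok(Transport::Quic),
            Some("http") => Ok(Transport::Http),
            Some(other) => Err(format!("unsupported transport: {other}")),
        }
    }

    /// Scheme prefix for the connection string.
    fn scheme(self) -> &'static str {
        match self {
            Transport::Tcp => "iggy://",
            Transport::Quic => "iggy+quic://",
            Transport::Http => "iggy+http://",
        }
    }
}

/// `rest` is everything after the scheme, passed through unchanged.
fn connection_string(transport: Transport, rest: &str) -> String {
    format!("{}{}", transport.scheme(), rest)
}
```

Rejecting unknown values, rather than silently falling back to TCP, is a design choice of this sketch and may differ from the runtime.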

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

Remove redundant TOC, Where-to-look table, and Tooling table; compact
remaining sections. Add READY FOR HANDOVER block with current pipeline
state and next-session action list.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

TCP, HTTP and WebSocket transports dispatch some ops through the asyncify
thread pool even when COOP_TASKRUN is on, so thread_pool_limit(0) cannot
be tied to the COOP_TASKRUN flag alone: enabling COOP_TASKRUN on a 6.8+
kernel still panics with "thread pool is needed" when those transports
are active.

Add a keep_worker_pool parameter to create_shard_executor. The asyncify
pool is only dropped when COOP_TASKRUN is true AND the caller signals no
TCP/HTTP/WS transport is active. Both server and server-ng derive the
flag from their loaded config; the server-ng bootstrap runtime passes
true because it runs before config is available.

This lets operators set IGGY_SHARD_RUNTIME_COOP_TASKRUN=true on
Linux 6.8+ even with TCP transports enabled, gaining the lower
io_uring latency without the worker-pool panic.
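
The rule stated above (drop the asyncify pool only when COOP_TASKRUN is on and no TCP/HTTP/WS transport is active) reduces to a small predicate. This is a sketch of that rule, not the body of create_shard_executor; `TransportsEnabled` and both function names are hypothetical.

```rust
/// Hypothetical view of which transports the loaded config enables.
struct TransportsEnabled {
    tcp: bool,
    http: bool,
    websocket: bool,
}

/// The caller keeps the worker pool whenever a transport that dispatches
/// through the asyncify thread pool is active.
fn keep_worker_pool(transports: &TransportsEnabled) -> bool {
    transports.tcp || transports.http || transports.websocket
}

/// The pool is dropped only when COOP_TASKRUN is on AND the caller did
/// not ask to keep it.
fn should_drop_worker_pool(coop_taskrun: bool, keep_worker_pool: bool) -> bool {
    coop_taskrun && !keep_worker_pool
}
```

A bootstrap runtime that starts before the config is loaded would pass `true` for `keep_worker_pool`, which always preserves the pool.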

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018duZYBkbguQ2pn8RJ82PUw

Includes iggy-bench in the from-source build path so the binary is
available inside the running container for throughput and latency
measurements without a separate build step.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018duZYBkbguQ2pn8RJ82PUw

bench-dashboard-server activates charming/ssr which pulls deno_core → v8
through cargo-chef's workspace recipe. v8 does not build on Alpine/musl.
Run iggy-bench locally via kubectl port-forward instead.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018duZYBkbguQ2pn8RJ82PUw

Extract stale handover block into DONE.md, move remaining work to
TODO.md, and capture the COOP_TASKRUN upstream decision in
TOBEDECIDED.md. Promote infra endpoints to a persistent quick-ref
section so they survive across handover rewrites.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

Remove inline consequence notes from STOP list, merge Testing + Pitfalls
sections, collapse structure paths, condense 14 principles to 12, tighten
CI quick-ref. ~25% smaller without losing any actionable guidance.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

Quick-ref collapsed to fewer lines; Structure drops common/binary_protocol/
configs (derivable from ls); stale session-6 handover replaced.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

Mark COOP_TASKRUN PR apache#3517 as submitted; clear TOBEDECIDED.md.
Both apache#3516 and apache#3517 are now S-waiting-on-review on apache/iggy.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

- Defer index_id YAML parse to open(); return InvalidConfigValue instead
  of panicking via expect()
- Use ClientWithMiddleware (build_retry_client) for ingest retries with
  exponential backoff on 5xx / connection errors
- check_connectivity_with_retry on open() against /health/livez
- Map 4xx responses (except 429) to PermanentHttpError so the circuit
  breaker is not tripped by bad payloads
- Add verbose_logging / max_retries / retry_delay / max_retry_delay /
  max_open_retries / open_retry_max_delay to QuickwitSinkConfig
- Downgrade per-batch consume() log to debug! (info! when verbose)
- Set Content-Type: application/x-ndjson on ingest requests
- Drop unused dashmap / once_cell deps; add reqwest-middleware
- 5 unit tests: verbose flag, client init, index_id init
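
The status-code mapping and the capped exponential backoff named in the list above can be sketched as follows. This is an assumption-laden illustration, not the sink's implementation: `Outcome`, `classify_status` and `backoff_delay` are hypothetical names, treating 429 as retryable is inferred from its exclusion from the permanent class, and the doubling formula without jitter is an assumption (the retry middleware may compute delays differently).

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Outcome {
    Success,
    /// Worth retrying with backoff (5xx and, by assumption, 429).
    Retryable,
    /// Bad payloads and similar; should not trip the circuit breaker.
    Permanent,
}

fn classify_status(status: u16) -> Outcome {
    match status {
        200..=299 => Outcome::Success,
        429 => Outcome::Retryable,
        400..=499 => Outcome::Permanent,
        500..=599 => Outcome::Retryable,
        // Assumption: anything else (1xx, 3xx) is not retried in this sketch.
        _ => Outcome::Permanent,
    }
}

/// Delay before retry number `attempt` (0-based): `retry_delay * 2^attempt`,
/// capped at `max_retry_delay`.
fn backoff_delay(attempt: u32, retry_delay: Duration, max_retry_delay: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    retry_delay.saturating_mul(factor).min(max_retry_delay)
}
```

The saturating arithmetic keeps large attempt counts from overflowing before the cap is applied.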

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

Build and ship libiggy_connector_quickwit_sink.so alongside the
OTLP source plugin so iggy-connectors can act as both source
(collector → iggy) and sink (iggy → QuickWit).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX

iggy-connectors now has 4 active QW sink connectors. iggy-metrics and
iggy-flows indices confirmed ingesting 350k+ docs. Collector cutover
applied (direct collector→QW disabled). QW 0.8.2 limitation noted:
unix_timestamp_nanos not supported, indices use mode: dynamic without
retention.

When a consumer group's stored offset falls below the topic's earliest
available offset — for example after the server purges old segments
under a retention policy — poll_messages returns IggyError::InvalidOffset.
The consumer was retrying at the same invalid offset indefinitely, causing
sink connectors to loop on errors and stop delivering messages.

Add `fallback_to_first: Arc<AtomicBool>` to IggyConsumer. On InvalidOffset,
set the flag and emit a warning. On the next poll, PollingStrategy::first()
is used to seek to the earliest available message; the flag is cleared after
the first successful non-empty poll so normal next-offset tracking resumes.

@github-actions

Thanks for the PR. It is labeled S-waiting-on-review and queued for review.

Slash commands (own line, regular comment) move it around the queue:

  • /ready - back to S-waiting-on-review after addressing feedback
  • /author - flip to S-waiting-on-author while you finish changes
  • /request-review @user-or-team - request a reviewer

See CONTRIBUTING.md for details.

github-actions bot added the S-waiting-on-review label (PR is waiting on a reviewer) Jun 21, 2026

- AGENTS.md: 104→75 lines. Removed redundant repo structure (derivable
  by ls), collapsed principles to iggy-specific rules only, merged
  Jenkins/QW infra into Infra section, updated handover block.
- TODO.md: replaced stale checked items with 4 open PRs (apache#3516 apache#3517
  apache#3523 apache#3525) + QW 0.9 upgrade task.
- DONE.md: added sessions 5-10 block (QW sink pipeline, collector
  cutover, InvalidOffset bug + fix).
- quickwit_sink/src/lib.rs: cargo fmt reformatting only.
