Skip to content

feat(connectors): opensearch source connector#3515

Open
ryerraguntla wants to merge 20 commits into
apache:masterfrom
ryerraguntla:feat(connectors)/opensearch_source_connector
Open

feat(connectors): opensearch source connector#3515
ryerraguntla wants to merge 20 commits into
apache:masterfrom
ryerraguntla:feat(connectors)/opensearch_source_connector

Conversation

@ryerraguntla

@ryerraguntla ryerraguntla commented Jun 20, 2026

Copy link
Copy Markdown
Contributor

Which issue does this PR close?
Closes #3503

Rationale
Iggy had no native way to ingest documents from OpenSearch clusters into Iggy streams. This connector fills that gap for observability pipelines, audit log forwarding, and data lake ingestion scenarios where OpenSearch is the source of truth.

What changed?
Before: no OpenSearch source connector existed. Users had to write custom consumers using the OpenSearch client to bridge documents into Iggy.

After: a new iggy_connector_opensearch_source cdylib plugin implements the Source trait and is loaded by the connectors runtime via FFI. Each poll() issues a search_after-paginated query on (timestamp_field, _id), publishes each hit's _source as a JSON ProducedMessage, and persists the cursor in ConnectorState.

Module layout

core/connectors/sources/opensearch_source/
├── Cargo.toml
├── README.md
├── dependencies.md
└── src/
    ├── lib.rs            # Source trait, poll loop, search_after cursor, config, 16 unit tests
    ├── state_manager.rs  # File-backed SourceState mirror, atomic rename protocol, 4 unit tests
    ├── retry.rs  # Retry logic and circuit breaker logic. 2 unit tests 
    └── http_tests.rs     # Axum mock server tests (20 tests, no Docker required)
core/connectors/runtime/example_config/connectors/
└── opensearch_source.toml
core/integration/tests/connectors/opensearch/
├── mod.rs
├── opensearch_source.rs      # 7 Docker-backed integration tests
├── opensearch_source_types.rs
├── opensearch_source_resilience.rs
└── fixtures/opensearch/      # Container start/stop, document indexing helpers

Key implementation decisionsCursor mechanism — 
search_after on (timestamp_field, _id) sorted ascending.
Two sort keys give stable pagination when timestamps collide; _id is the tiebreaker. No Scroll API, PIT, or offset paging.

State authority — runtime ConnectorState (MessagePack) is authoritative. An optional file-backed JSON mirror (SourceState) is written atomically on close() using write-tmp → fdatasync → rename → dir-fsync. On restart, if a valid runtime state exists it takes priority and file state is not loaded. If state deserialization fails, open() returns InitError to prevent a silent cursor reset.

Validation at open() — timestamp_field is required and non-empty (validated before any network call). batch_size = 0 is rejected. File state config is validated if state.enabled = true.

Error handling — transient OpenSearch errors fail the poll with the error counter incremented. No circuit breaker or HTTP retry in this version (documented limitation). poll() still sleeps polling_interval on error so the runtime does not spin.

At-least-once delivery — cursor advances in memory before ConnectorState is persisted by the runtime. A crash after poll() returns but before the runtime writes state can re-emit the last batch. Documented in README and Limitations.

Validation

cargo fmt --all
cargo sort --no-format --workspace
cargo clippy -p iggy_connector_opensearch_source --all-targets -- -D warnings
cargo test -p iggy_connector_opensearch_source
./scripts/ci/taplo.sh
./scripts/ci/license-headers.sh
# Docker-backed integration (requires running iggy-server and iggy-connectors binaries):
env CARGO_BIN_EXE_iggy-server=... CARGO_BIN_EXE_iggy-connectors=... \
  cargo test -p integration --test mod -- connectors::opensearch::opensearch_source

All 40 tests pass (16 unit + 4 state_manager unit + 20 axum HTTP mock + 7 Docker integration).

Local Execution
Passed
Pre-commit hooks ran

AI Usage

  1. Tools used: Claude
  2. Scope: scaffolding initial file structure and boilerplate; all logic reviewed and iterated manually
  3. Verification: full local compilation, all tests executed, end-to-end tested with Docker OpenSearch + Iggy server + connector runtime
  4. Can explain every line: yes

Configuration reference

type = "source"
key = "opensearch"
enabled = true
version = 0
name = "OpenSearch source"
path = "<BASE_DIR>/target/release/libiggy_connector_opensearch_source"
plugin_config_format = "json"
[[streams]]
stream = "opensearch_stream"
topic  = "documents"
schema = "json"
batch_length = 100
linger_time  = "5ms"
[plugin_config]
url              = "http://localhost:9200"
index            = "logs-*"
timestamp_field  = "@timestamp"       # required; must exist on every document
polling_interval = "10s"              # default
batch_size       = 100                # default
# username       = "admin"
# password       = "replace_with_secret"
# verbose_logging = false
# Optional: custom query DSL (default: match_all)
# [plugin_config.query]
# match = { "log.level" = "error" }
# Optional: file-backed state mirror (runtime msgpack is authoritative)
# [plugin_config.state]
# enabled      = true
# storage_type = "file"
# state_id     = "opensearch_logs_connector"
# [plugin_config.state.storage_config]
# base_path = "./connector_states"

Required Fields

Field Type Description
url String OpenSearch HTTP base URL
index String Index name or pattern (logs-* supported)
timestamp_field String Document field used as sort key and cursor; must exist on every document

Optional Fields

Field Type Default Description
polling_interval humantime "10s" Sleep after each completed poll cycle
batch_size usize 100 Documents per search request (min 1)
query JSON {"match_all":{}} OpenSearch query DSL applied on every poll
username / password String HTTP Basic auth
verbose_logging bool false Log per-poll counts at info! instead of debug!

Poll cycle

Each poll() call:Issues POST /{index}/_search sorted by (timestamp_field asc, _id asc) with optional search_after.Maps each hit's _source to a JSON ProducedMessage.Advances the search_after cursor to the sort tuple of the last successfully published hit.Returns ProducedMessages with serialized ConnectorState.Sleeps polling_interval.Hits missing a sort tuple or _source are skipped with warn!; they do not block the cursor.

Known limitations

  1. At-least-once delivery — cursor advances before the runtime persists ConnectorState; last batch may be re-emitted on crash.
  2. Single sequential reader — one search_after cursor, one batch per poll; no parallel shard workers.
  3. search_after only — no Scroll API, PIT, or sliced export.
  4. Backfill gap — documents with timestamp_field older than the current cursor are not read without a state reset.
  5. _source-disabled documents skipped permanently.Single-node transport — no cluster sniffing.

Mirror elasticsearch_source architecture using the opensearch crate
v2.4.0 (rustls-tls) for OpenSearch wire-protocol compatibility.
Register as a new workspace member.
Mirror elasticsearch_source documentation, adapted for the
OpenSearch wire protocol and state storage type naming.
Add the four canonical source-state unit tests, plus integration
fixtures (opensearchproject/opensearch:2.19.1 container) and
end-to-end tests covering happy paths (poll, empty index, bulk,
restart state persistence) and a negative path (missing index
surfaces ConnectorStatus::Error via the runtime API).
…nd code cleanup

Refactors the OpenSearch source connector: replaces legacy state handling with a clearer restore_state flow that rejects corrupt runtime state, introduces validation for open() config, and centralises file-backed state storage creation (create_state_storage). Implements search_after cursoring with combined (timestamp_field, _id) sort, robust document timestamp parsing (RFC3339 or epoch), defaults for polling interval and batch size, and optional verbose logging. Cleans up and simplifies state structs (removes scroll/offset fields), improves error messages formatting, and uses parse_duration for interval parsing.

Also updates docs and metadata: rewrites the connector README to focus on usage and state semantics, adds a dependencies.md listing runtime deps, updates Cargo.toml to remove simd-json and adjust ignored list, and adds a docker-compose.yml for integration tests. Tests and fixtures were updated to reflect the new cursor/state behaviour and naming conventions.
Add comprehensive HTTP-based unit tests for the OpenSearch source and improve connector state handling and file-backed storage. Introduces a new http_tests module with many tokio/axum tests exercising open/poll/close, auth, queries, paging and file-state persistence. Refactor state serialization/deserialization to use a typed State struct and SOURCE_STATE_VERSION, ensure runtime ConnectorState is authoritative (runtime_state_restored), and validate state storage config separately. Improve file state storage with atomic temp-write+rename, clearer error kinds, and helper validation/creation functions; add unit tests for storage. Update integration fixtures to include typed-fields support and add a default connector state JSON and dev-dependencies (axum, tempfile) for tests.
Major refactor of the OpenSearch source: enforce and skip hits missing sort tuples (log a warning), and move search/poll bookkeeping into a new finalize_poll flow that returns produced messages and a serialized state. Track and persist additional processing stats (avg_batch_processing_time_ms, empty/successful poll counts, last_successful_poll, total_bytes_processed) and compute running average across restarts. Improve state storage: simplify enabled-checks, use atomic temp file write with OpenOptions + sync_data, better error messages, and use ConnectorState.deserialize helper for restores. Add a small-batch test fixture and an integration test for pagination (fetching across search_after pages). Misc: delete default connector state JSON, update dependencies.md note for humantime, add unit test for skipping hits without sort, and rename/clean up several test functions for clarity.
Rename State.total_documents_fetched to total_documents_published across source, state manager, and tests; add serde alias to preserve backwards compat. Change verbose_logging from Option<bool> to bool with a default and update tests accordingly. Improve search_after/cursor handling: preserve and restore cursor across empty polls, expose test helpers (test_search_after, test_last_poll_timestamp), and warn when hits lack _source. Harden file state persistence: use compact JSON, perform atomic write with cleanup on error, sync temp file and parent directory, and only save state when appropriate. Add an example opensearch_source.toml and extend integration tests to detect cursor-reset regressions.
Rename test function from `given_unparseable_timestamp_value_should_return_none` to `given_unparsable_timestamp_value_should_return_none` to correct spelling/consistency in the tests. This is a non-functional change and does not affect runtime behavior.
Rewrite and expand the OpenSearch source README: replace brief Features section with an Architecture overview, add detailed "How it works" (poll cycle, search request, per-hit processing, timestamp parsing), and document state & persistence (runtime msgpack + optional file mirror). Clarify config schema with types/defaults, change default polling_interval to "10s", and add requirements, error variants, limitations, throughput tuning, and troubleshooting guidance. Also standardize tables and behavior notes (search_after semantics, cursor invariants, file save atomicity, at-least-once delivery, missing _source handling).
Change OpenSearch source behavior to treat a batch where none of the hits include a sort tuple as an error and avoid advancing the cursor. In lib.rs last_sort is captured earlier and an explicit Storage error is returned when hits are present but no hit provided a sort tuple. Tests in http_tests.rs were updated: the previous "skip when sort missing" test was renamed and now expects poll() to return an error and increment the error metric; additional test verifies that a trailing hit missing _source still advances the cursor past that hit so it won't be re-fetched. These changes prevent corrupting the search_after cursor for all-no-sort batches while preserving correct advancement for batches that contain missing _source entries.
Avoid treating an empty 'sort' array as valid by filtering out empty arrays when extracting the sort value from hits (core/connectors/sources/opensearch_source/src/lib.rs). In the state storage rename path (core/connectors/sources/opensearch_source/src/state_manager.rs), attempt to remove the temporary file if fs::rename fails and return a Storage error, preventing leftover temp files on failure.
Implement HTTP retry and circuit-breaker resilience for the OpenSearch source. Adds configurable plugin options (max_retries, retry_delay, retry_max_delay, max_open_retries, open_retry_max_delay, circuit_breaker_threshold, circuit_breaker_cool_down) and example config updates. Introduces a new retry helper module (retry.rs) with backoff/jitter/retry-after handling and integrates retry logic into index-exists and search calls in lib.rs (send_search_with_retry, check_index_exists_with_retry). Adds circuit breaker usage (recording successes/failures and skipping polls while open) and refactors poll/error handling. Documents behavior in docs/RESILIENCE.md and expands README with semantics and usage notes (including cursor advancement rules for missing sort/_source). Adds unit tests (http_tests.rs) and integration fixtures/tests (wiremock-backed resilience fixtures and runtime tests), plus connector test configs.. Some of the files are transient. They will be cleaned up after all known issues are fixed.
Refactor OpenSearch source initialization and polling: add max_retries/max_open_retries config options and pre-build a search_body_base during open() instead of rebuilding the query each poll. Harden error handling by returning Connection errors for uninitialized client, converting some Init/Storage errors, and refusing to advance the cursor when an entire batch has valid sort tuples but every hit lacks _source (to avoid losing data).

Other changes: optimize send_search_with_retry to avoid unnecessary clones on the final attempt; treat numeric timestamps >1e12 as milliseconds (keep seconds otherwise); capture/clone a state snapshot before persisting and adjust avg batch time calculation to use saturating subtraction; increment error_count/last_error on poll failures; make file state writes race-safe by adding PID to temp file suffix and use sync_all; add serde default for state version. Update tests and README formatting, and remove the humantime dependency from the Opensearch source crate.
Bring CI actions, workflows, and hawkeye.version in line with upstream.
Embed the resilience documentation (retry policy, circuit breaker, at-least-once delivery, and backfill behavior) from core/connectors/sources/opensearch_source/docs/RESILIENCE.md into the connector README. Remove the standalone RESILIENCE.md file and update internal references to point to the new Resilience section in README. No behavioral/code changes—documentation relocation and link updates only.
@github-actions

Copy link
Copy Markdown

Thanks for the PR. It is labeled S-waiting-on-review and queued for review.

Slash commands (own line, regular comment) move it around the queue:

  • /ready - back to S-waiting-on-review after addressing feedback
  • /author - flip to S-waiting-on-author while you finish changes
  • /request-review @user-or-team - request a reviewer

See CONTRIBUTING.md for details.

@github-actions github-actions Bot added the S-waiting-on-review PR is waiting on a reviewer label Jun 20, 2026
@ryerraguntla ryerraguntla changed the title Feat(connectors)/opensearch source connector feat(connectors): opensearch source connector Jun 20, 2026
@codecov

codecov Bot commented Jun 20, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 86.16279% with 119 lines in your changes missing coverage. Please review.
✅ Project coverage is 46.85%. Comparing base (4a48008) to head (f8b7e7b).

Files with missing lines Patch % Lines
...re/connectors/sources/opensearch_source/src/lib.rs 85.16% 83 Missing and 17 partials ⚠️
...ors/sources/opensearch_source/src/state_manager.rs 88.07% 10 Missing and 8 partials ⚠️
.../connectors/sources/opensearch_source/src/retry.rs 97.14% 1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             master    #3515       +/-   ##
=============================================
- Coverage     74.27%   46.85%   -27.42%     
  Complexity      937      937               
=============================================
  Files          1259     1259               
  Lines        125969   110774    -15195     
  Branches     101644    86493    -15151     
=============================================
- Hits          93558    51899    -41659     
- Misses        29396    56146    +26750     
+ Partials       3015     2729      -286     
Components Coverage Δ
Rust Core 39.79% <86.16%> (-35.37%) ⬇️
Java SDK 58.57% <ø> (ø)
C# SDK 71.41% <ø> (-0.70%) ⬇️
Python SDK 88.88% <ø> (ø)
PHP SDK 84.29% <ø> (ø)
Node SDK 91.35% <ø> (+0.12%) ⬆️
Go SDK 40.36% <ø> (ø)
Files with missing lines Coverage Δ
.../connectors/sources/opensearch_source/src/retry.rs 97.14% <97.14%> (ø)
...ors/sources/opensearch_source/src/state_manager.rs 88.07% <88.07%> (ø)
...re/connectors/sources/opensearch_source/src/lib.rs 85.16% <85.16%> (ø)

... and 364 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

ryerraguntla and others added 5 commits June 19, 2026 23:11
Add Apache License, Version 2.0 header comments to various OpenSearch source and test files for licensing compliance. Affected paths include core/connectors/sources/opensearch_source (lib, retry, state_manager, http_tests) and multiple integration test fixtures and opensearch test files under core/integration/tests/connectors/opensearch. No functional changes — only file header updates to ensure consistent license attribution.
Strip redundant Apache Software Foundation license block comments from OpenSearch source and integration test files to clean up file headers. Affected files include core/connectors/sources/opensearch_source/src/{http_tests.rs,lib.rs,retry.rs,state_manager.rs} and several integration test fixtures and opensearch tests under core/integration/tests/connectors/{fixtures/opensearch,opensearch}. No functional code changes.
Expand the ok_or_else closure into a block for clearer formatting in core/connectors/sources/opensearch_source/src/lib.rs. This is a non-functional change that improves readability around the connector initialization error message.
Use an inclusive range contains check when deciding if a numeric timestamp is already in milliseconds (improves readability and avoids manual comparisons). Update OpenSearch resilience test fixture: import new env constants, set ENV_SOURCE_MAX_OPEN_RETRIES to "2", and configure circuit breaker envs (ENV_SOURCE_CIRCUIT_BREAKER_THRESHOLD="3", ENV_SOURCE_CIRCUIT_BREAKER_COOL_DOWN="500ms") so tests exercise the circuit-breaker behavior.
@ryerraguntla

Copy link
Copy Markdown
Contributor Author

/ready

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

S-waiting-on-review PR is waiting on a reviewer

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(connector): Add Opensearch source connector

1 participant