feat(connectors): opensearch source connector#3515
Open
ryerraguntla wants to merge 20 commits into
Open
Conversation
Mirror elasticsearch_source architecture using the opensearch crate v2.4.0 (rustls-tls) for OpenSearch wire-protocol compatibility. Register as a new workspace member.
Mirror elasticsearch_source documentation, adapted for the OpenSearch wire protocol and state storage type naming.
Add the four canonical source-state unit tests, plus integration fixtures (opensearchproject/opensearch:2.19.1 container) and end-to-end tests covering happy paths (poll, empty index, bulk, restart state persistence) and a negative path (missing index surfaces ConnectorStatus::Error via the runtime API).
…nd code cleanup Refactors the OpenSearch source connector: replaces legacy state handling with a clearer restore_state flow that rejects corrupt runtime state, introduces validation for open() config, and centralises file-backed state storage creation (create_state_storage). Implements search_after cursoring with combined (timestamp_field, _id) sort, robust document timestamp parsing (RFC3339 or epoch), defaults for polling interval and batch size, and optional verbose logging. Cleans up and simplifies state structs (removes scroll/offset fields), improves error messages formatting, and uses parse_duration for interval parsing. Also updates docs and metadata: rewrites the connector README to focus on usage and state semantics, adds a dependencies.md listing runtime deps, updates Cargo.toml to remove simd-json and adjust ignored list, and adds a docker-compose.yml for integration tests. Tests and fixtures were updated to reflect the new cursor/state behaviour and naming conventions.
Add comprehensive HTTP-based unit tests for the OpenSearch source and improve connector state handling and file-backed storage. Introduces a new http_tests module with many tokio/axum tests exercising open/poll/close, auth, queries, paging and file-state persistence. Refactor state serialization/deserialization to use a typed State struct and SOURCE_STATE_VERSION, ensure runtime ConnectorState is authoritative (runtime_state_restored), and validate state storage config separately. Improve file state storage with atomic temp-write+rename, clearer error kinds, and helper validation/creation functions; add unit tests for storage. Update integration fixtures to include typed-fields support and add a default connector state JSON and dev-dependencies (axum, tempfile) for tests.
Major refactor of the OpenSearch source: enforce and skip hits missing sort tuples (log a warning), and move search/poll bookkeeping into a new finalize_poll flow that returns produced messages and a serialized state. Track and persist additional processing stats (avg_batch_processing_time_ms, empty/successful poll counts, last_successful_poll, total_bytes_processed) and compute running average across restarts. Improve state storage: simplify enabled-checks, use atomic temp file write with OpenOptions + sync_data, better error messages, and use ConnectorState.deserialize helper for restores. Add a small-batch test fixture and an integration test for pagination (fetching across search_after pages). Misc: delete default connector state JSON, update dependencies.md note for humantime, add unit test for skipping hits without sort, and rename/clean up several test functions for clarity.
Rename State.total_documents_fetched to total_documents_published across source, state manager, and tests; add serde alias to preserve backwards compat. Change verbose_logging from Option<bool> to bool with a default and update tests accordingly. Improve search_after/cursor handling: preserve and restore cursor across empty polls, expose test helpers (test_search_after, test_last_poll_timestamp), and warn when hits lack _source. Harden file state persistence: use compact JSON, perform atomic write with cleanup on error, sync temp file and parent directory, and only save state when appropriate. Add an example opensearch_source.toml and extend integration tests to detect cursor-reset regressions.
Rename test function from `given_unparseable_timestamp_value_should_return_none` to `given_unparsable_timestamp_value_should_return_none` to correct spelling/consistency in the tests. This is a non-functional change and does not affect runtime behavior.
Rewrite and expand the OpenSearch source README: replace brief Features section with an Architecture overview, add detailed "How it works" (poll cycle, search request, per-hit processing, timestamp parsing), and document state & persistence (runtime msgpack + optional file mirror). Clarify config schema with types/defaults, change default polling_interval to "10s", and add requirements, error variants, limitations, throughput tuning, and troubleshooting guidance. Also standardize tables and behavior notes (search_after semantics, cursor invariants, file save atomicity, at-least-once delivery, missing _source handling).
Change OpenSearch source behavior to treat a batch where none of the hits include a sort tuple as an error and avoid advancing the cursor. In lib.rs last_sort is captured earlier and an explicit Storage error is returned when hits are present but no hit provided a sort tuple. Tests in http_tests.rs were updated: the previous "skip when sort missing" test was renamed and now expects poll() to return an error and increment the error metric; additional test verifies that a trailing hit missing _source still advances the cursor past that hit so it won't be re-fetched. These changes prevent corrupting the search_after cursor for all-no-sort batches while preserving correct advancement for batches that contain missing _source entries.
Avoid treating an empty 'sort' array as valid by filtering out empty arrays when extracting the sort value from hits (core/connectors/sources/opensearch_source/src/lib.rs). In the state storage rename path (core/connectors/sources/opensearch_source/src/state_manager.rs), attempt to remove the temporary file if fs::rename fails and return a Storage error, preventing leftover temp files on failure.
Implement HTTP retry and circuit-breaker resilience for the OpenSearch source. Adds configurable plugin options (max_retries, retry_delay, retry_max_delay, max_open_retries, open_retry_max_delay, circuit_breaker_threshold, circuit_breaker_cool_down) and example config updates. Introduces a new retry helper module (retry.rs) with backoff/jitter/retry-after handling and integrates retry logic into index-exists and search calls in lib.rs (send_search_with_retry, check_index_exists_with_retry). Adds circuit breaker usage (recording successes/failures and skipping polls while open) and refactors poll/error handling. Documents behavior in docs/RESILIENCE.md and expands README with semantics and usage notes (including cursor advancement rules for missing sort/_source). Adds unit tests (http_tests.rs) and integration fixtures/tests (wiremock-backed resilience fixtures and runtime tests), plus connector test configs.. Some of the files are transient. They will be cleaned up after all known issues are fixed.
Refactor OpenSearch source initialization and polling: add max_retries/max_open_retries config options and pre-build a search_body_base during open() instead of rebuilding the query each poll. Harden error handling by returning Connection errors for uninitialized client, converting some Init/Storage errors, and refusing to advance the cursor when an entire batch has valid sort tuples but every hit lacks _source (to avoid losing data). Other changes: optimize send_search_with_retry to avoid unnecessary clones on the final attempt; treat numeric timestamps >1e12 as milliseconds (keep seconds otherwise); capture/clone a state snapshot before persisting and adjust avg batch time calculation to use saturating subtraction; increment error_count/last_error on poll failures; make file state writes race-safe by adding PID to temp file suffix and use sync_all; add serde default for state version. Update tests and README formatting, and remove the humantime dependency from the Opensearch source crate.
Bring CI actions, workflows, and hawkeye.version in line with upstream.
Embed the resilience documentation (retry policy, circuit breaker, at-least-once delivery, and backfill behavior) from core/connectors/sources/opensearch_source/docs/RESILIENCE.md into the connector README. Remove the standalone RESILIENCE.md file and update internal references to point to the new Resilience section in README. No behavioral/code changes—documentation relocation and link updates only.
|
Thanks for the PR. It is labeled Slash commands (own line, regular comment) move it around the queue:
See CONTRIBUTING.md for details. |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #3515 +/- ##
=============================================
- Coverage 74.27% 46.85% -27.42%
Complexity 937 937
=============================================
Files 1259 1259
Lines 125969 110774 -15195
Branches 101644 86493 -15151
=============================================
- Hits 93558 51899 -41659
- Misses 29396 56146 +26750
+ Partials 3015 2729 -286
🚀 New features to boost your workflow:
|
Add Apache License, Version 2.0 header comments to various OpenSearch source and test files for licensing compliance. Affected paths include core/connectors/sources/opensearch_source (lib, retry, state_manager, http_tests) and multiple integration test fixtures and opensearch test files under core/integration/tests/connectors/opensearch. No functional changes — only file header updates to ensure consistent license attribution.
Strip redundant Apache Software Foundation license block comments from OpenSearch source and integration test files to clean up file headers. Affected files include core/connectors/sources/opensearch_source/src/{http_tests.rs,lib.rs,retry.rs,state_manager.rs} and several integration test fixtures and opensearch tests under core/integration/tests/connectors/{fixtures/opensearch,opensearch}. No functional code changes.
Expand the ok_or_else closure into a block for clearer formatting in core/connectors/sources/opensearch_source/src/lib.rs. This is a non-functional change that improves readability around the connector initialization error message.
Use an inclusive range contains check when deciding if a numeric timestamp is already in milliseconds (improves readability and avoids manual comparisons). Update OpenSearch resilience test fixture: import new env constants, set ENV_SOURCE_MAX_OPEN_RETRIES to "2", and configure circuit breaker envs (ENV_SOURCE_CIRCUIT_BREAKER_THRESHOLD="3", ENV_SOURCE_CIRCUIT_BREAKER_COOL_DOWN="500ms") so tests exercise the circuit-breaker behavior.
Contributor
Author
|
/ready |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #3503
Rationale
Iggy had no native way to ingest documents from OpenSearch clusters into Iggy streams. This connector fills that gap for observability pipelines, audit log forwarding, and data lake ingestion scenarios where OpenSearch is the source of truth.
What changed?
Before: no OpenSearch source connector existed. Users had to write custom consumers using the OpenSearch client to bridge documents into Iggy.
After: a new iggy_connector_opensearch_source cdylib plugin implements the Source trait and is loaded by the connectors runtime via FFI. Each poll() issues a search_after-paginated query on (timestamp_field, _id), publishes each hit's _source as a JSON ProducedMessage, and persists the cursor in ConnectorState.
Module layout
Key implementation decisionsCursor mechanism —
search_after on (timestamp_field, _id) sorted ascending.
Two sort keys give stable pagination when timestamps collide; _id is the tiebreaker. No Scroll API, PIT, or offset paging.
State authority — runtime ConnectorState (MessagePack) is authoritative. An optional file-backed JSON mirror (SourceState) is written atomically on close() using write-tmp → fdatasync → rename → dir-fsync. On restart, if a valid runtime state exists it takes priority and file state is not loaded. If state deserialization fails, open() returns InitError to prevent a silent cursor reset.
Validation at open() — timestamp_field is required and non-empty (validated before any network call). batch_size = 0 is rejected. File state config is validated if state.enabled = true.
Error handling — transient OpenSearch errors fail the poll with the error counter incremented. No circuit breaker or HTTP retry in this version (documented limitation). poll() still sleeps polling_interval on error so the runtime does not spin.
At-least-once delivery — cursor advances in memory before ConnectorState is persisted by the runtime. A crash after poll() returns but before the runtime writes state can re-emit the last batch. Documented in README and Limitations.
Validation
All 40 tests pass (16 unit + 4 state_manager unit + 20 axum HTTP mock + 7 Docker integration).
Local Execution
Passed
Pre-commit hooks ran
AI Usage
Configuration reference
Required Fields
Optional Fields
Poll cycle
Each poll() call:Issues POST /{index}/_search sorted by (timestamp_field asc, _id asc) with optional search_after.Maps each hit's _source to a JSON ProducedMessage.Advances the search_after cursor to the sort tuple of the last successfully published hit.Returns ProducedMessages with serialized ConnectorState.Sleeps polling_interval.Hits missing a sort tuple or _source are skipped with warn!; they do not block the cursor.
Known limitations