feat(connectors): add SurrealDB sink connector #3453
SurrealDB is a document database target for Iggy connector users, so the sink writes batches with deterministic record ids and bulk INSERT IGNORE to keep runtime redelivery idempotent without per-message round trips.
Constraint: User explicitly requested the latest SurrealDB Rust SDK and chose to keep it despite BUSL-1.1 license-validation warnings for SurrealDB crates.
Constraint: Local Docker daemon was unavailable, so real-container integration execution could not run here.
Rejected: Per-message SDK writes | too many round trips and weaker batching throughput.
Rejected: Using the testcontainers SurrealDB module | module source hardcodes an older SurrealDB image.
Confidence: medium
Scope-risk: moderate
Directive: Keep record ids deterministic across releases; changing build_record_id breaks replay idempotency.
Tested: cargo fmt --all; cargo sort --no-format --workspace; cargo clippy --all-features --all-targets -- -D warnings; cargo check --all --all-features; cargo test -p iggy_connector_surrealdb_sink; cargo test -p integration --no-run connectors::surrealdb; cargo test --locked --doc; cargo doc --no-deps --all-features --quiet; taplo/license/shellcheck/version/diff/binary checks; prek install
Not-tested: Docker-backed SurrealDB integration execution, because Docker daemon was not running locally.
7b8305a to 48c3a9d
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@              Coverage Diff              @@
##            master    #3453       +/-   ##
=============================================
- Coverage    74.27%   50.05%   -24.22%
  Complexity     937      937
=============================================
  Files         1259     1258        -1
  Lines       125969   110912    -15057
  Branches    101644    86630    -15014
=============================================
- Hits         93558    55521    -38037
- Misses       29396    52579    +23183
+ Partials      3015     2812      -203
/author
Please check the pre-checks failure
Sure
HawkEye maps Rust files to the double-slash license style, so the block comments in the new SurrealDB connector files were treated as missing headers by CI. Constraint: CI runs the updated HawkEye-based license check with strict header matching. Confidence: high Scope-risk: narrow Tested: PATH=/opt/homebrew/bin:/Users/radudiaconu/.vite-plus/bin:/Users/radudiaconu/.codex/tmp/arg0/codex-arg0uTrL1r:/Users/radudiaconu/Library/pnpm/bin:/Users/radudiaconu/.opencode/bin:/opt/homebrew/opt/ruby/bin:/opt/homebrew/opt/ruby/bin:/opt/homebrew/opt/ruby/bin:/opt/homebrew/lib/ruby/gems/4.0.0/bin:/Users/radudiaconu/.local/bin:/Users/radudiaconu/Library/Application Support/Herd/bin/:/Users/radudiaconu/.bun/bin:/usr/local/bin:/System/Cryptexes/App/usr/bin:/usr/bin:/bin:/usr/sbin:/sbin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/local/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/appleinternal/bin:/pkg/env/global/bin:/Library/Apple/usr/bin:/usr/local/share/dotnet:~/.dotnet/tools:/opt/homebrew/bin:/opt/zerobrew/bin:/Users/radudiaconu/.zerobrew/bin:/Users/radudiaconu/.cargo/bin:/Users/radudiaconu/Library/Application Support/JetBrains/Toolbox/scripts:/Users/radudiaconu/Library/Android/sdk/platform-tools:/Applications/Codex.app/Contents/Resources ./scripts/ci/license-headers.sh --check; cargo fmt --all --check; git diff --check
The Rust pre-merge machete job reported that the SurrealDB sink crate declared toml without using it. Removing the dev-dependency is simpler than adding an ignore entry. Constraint: CI runs cargo machete --with-metadata and fails on unused dependencies. Rejected: Add cargo-machete metadata ignore | the dependency is genuinely unused. Confidence: high Scope-risk: narrow Tested: cargo sort --no-format --workspace; cargo test -p iggy_connector_surrealdb_sink; PATH=/opt/homebrew/bin:/Users/radudiaconu/.vite-plus/bin:/Users/radudiaconu/.codex/tmp/arg0/codex-arg0uTrL1r:/Users/radudiaconu/Library/pnpm/bin:/Users/radudiaconu/.opencode/bin:/opt/homebrew/opt/ruby/bin:/opt/homebrew/opt/ruby/bin:/opt/homebrew/opt/ruby/bin:/opt/homebrew/lib/ruby/gems/4.0.0/bin:/Users/radudiaconu/.local/bin:/Users/radudiaconu/Library/Application Support/Herd/bin/:/Users/radudiaconu/.bun/bin:/usr/local/bin:/System/Cryptexes/App/usr/bin:/usr/bin:/bin:/usr/sbin:/sbin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/local/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/appleinternal/bin:/pkg/env/global/bin:/Library/Apple/usr/bin:/usr/local/share/dotnet:~/.dotnet/tools:/opt/homebrew/bin:/opt/zerobrew/bin:/Users/radudiaconu/.zerobrew/bin:/Users/radudiaconu/.cargo/bin:/Users/radudiaconu/Library/Application Support/JetBrains/Toolbox/scripts:/Users/radudiaconu/Library/Android/sdk/platform-tools:/Applications/Codex.app/Contents/Resources ./scripts/ci/license-headers.sh --check; cargo fmt --all --check; git diff --check; cargo metadata confirms toml is absent from iggy_connector_surrealdb_sink
/ready
/author - could you please check the pre-checks failures
@countradooku - Please check the Cargo.toml comment on licensing and distribution under Apache redistribution rules. Please also compare with any other packages that are distributed under the Apache license. If left unresolved, this could be a showstopper. Otherwise it looks good, with mostly nits.
/author
/ready
"Failed to insert SurrealDB batch for connector ID: {}, table: {}, error: {error}",
self.id, self.table
);
last_error = Some(error);
last_error = Some(error) overwrites on every failing batch. Only final batch error returned to caller. Batches 1..N-1 errors silently dropped. Runtime commits offsets on Ok(()) return — earlier data loss not signaled back. Fix: return first error immediately, or collect compound error.
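The "return first error immediately" option could look roughly like the sketch below. It is not the connector's code: `process_in_batches` and the `insert` closure are hypothetical stand-ins for the batch loop and the batch insert.

```rust
/// Writes `messages` in chunks of `batch_size`, stopping at the first failure.
/// `insert` is a hypothetical stand-in for the connector's batch insert.
fn process_in_batches<T, E>(
    messages: &[T],
    batch_size: usize,
    mut insert: impl FnMut(&[T]) -> Result<(), E>,
) -> Result<usize, E> {
    let mut written = 0;
    // `chunks` panics on a size of zero, so clamp to at least one.
    for batch in messages.chunks(batch_size.max(1)) {
        // `?` returns the first error immediately, so a later successful
        // batch can never mask an earlier failure.
        insert(batch)?;
        written += batch.len();
    }
    Ok(written)
}
```

With this shape the caller only sees `Ok` when every batch was written, so offsets are never committed past a failed batch.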
out.push(HEX[(byte & 0x0f) as usize] as char);
}
}

validate_identifier: called for table in open() (always). Called for namespace/database only inside ensure_namespace_database(), which runs only when auto_define_table=true (default: false). In the default path, arbitrary namespace/database values flow into Surreal-NS/Surreal-DB HTTP headers and /signin JSON body unvalidated. Fix: call validate_identifier("namespace", ...) and validate_identifier("database", ...) unconditionally in open(), same pattern as Doris sink.
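A minimal sketch of unconditional validation follows. The character rules, the `InvalidIdentifier` type, and `validate_target` are assumptions for illustration; the connector's real `validate_identifier` may accept a different set of names.

```rust
#[derive(Debug, PartialEq)]
struct InvalidIdentifier {
    field: &'static str,
    value: String,
}

/// Assumed rule: ASCII letter or underscore first, then letters, digits, underscores.
fn validate_identifier(field: &'static str, value: &str) -> Result<(), InvalidIdentifier> {
    let mut chars = value.chars();
    let starts_ok = matches!(chars.next(), Some(c) if c.is_ascii_alphabetic() || c == '_');
    let rest_ok = chars.all(|c| c.is_ascii_alphanumeric() || c == '_');
    if starts_ok && rest_ok {
        Ok(())
    } else {
        Err(InvalidIdentifier { field, value: value.to_string() })
    }
}

/// Hypothetical helper for `open()`: checks every identifier up front,
/// whether or not `auto_define_table` is enabled.
fn validate_target(namespace: &str, database: &str, table: &str) -> Result<(), InvalidIdentifier> {
    validate_identifier("namespace", namespace)?;
    validate_identifier("database", database)?;
    validate_identifier("table", table)
}
```

Under the assumed rule, a namespace containing CR/LF or other header-breaking characters is rejected before it can reach an HTTP header or the sign-in body.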
pub fn new(id: u32, config: SurrealDbSinkConfig) -> Self {
let table = config.table.clone();
let base_url = build_base_url(&config.endpoint, config.use_tls.unwrap_or(false));
let auth_scope = AuthScope::from_config(config.auth_scope.as_deref());
AuthScope::from_config: unknown value silently defaults to AuthScope::Root (highest privilege) with only a warn!. Operator typo "roo" → root access granted. Fix: return Err(Error::InvalidConfigValue) on unknown scope, or default to AuthScope::None.
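One way the fail-closed variant might look is sketched here. The accepted strings, the `InvalidAuthScope` error type, and the behaviour for an absent setting are assumptions; only the `AuthScope` name and the root/namespace/database/no-auth modes come from the PR.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AuthScope {
    Root,
    Namespace,
    Database,
    None,
}

/// Hypothetical error type standing in for `Error::InvalidConfigValue`.
#[derive(Debug, PartialEq, Eq)]
struct InvalidAuthScope(String);

impl AuthScope {
    fn from_config(value: Option<&str>) -> Result<Self, InvalidAuthScope> {
        let Some(raw) = value else {
            // Assumption: an absent setting keeps the existing default.
            return Ok(AuthScope::Root);
        };
        match raw.trim().to_ascii_lowercase().as_str() {
            "root" => Ok(AuthScope::Root),
            "namespace" => Ok(AuthScope::Namespace),
            "database" => Ok(AuthScope::Database),
            "none" => Ok(AuthScope::None),
            // A typo such as "roo" is rejected instead of escalating to Root.
            _ => Err(InvalidAuthScope(raw.to_string())),
        }
    }
}
```

Because `from_config` now returns a `Result`, the constructor would have to propagate the error or move the parsing into `open()`.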
self.client.get_mut().take();

let messages_processed = self.messages_processed.load(Ordering::Relaxed);
let insertion_errors = self.insertion_errors.load(Ordering::Relaxed);
insertion_errors — on build failure in insert_batch, process_messages calls fetch_add(batch.len()). But insert_batch early-returns on first build_record failure — remaining messages never attempted. Counter inflated by batch.len() - 1 phantom errors. Fix: count 1 error for the single build failure; or count only unprocessed messages.
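The first suggested fix (count one error for a build failure) can be sketched by letting the batch error say what actually failed. `BatchError` and `record_failure` are hypothetical names, not part of the connector.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical error shape that distinguishes the two failure modes.
enum BatchError {
    /// One record could not be built; nothing was sent.
    Build,
    /// The request failed after all records were built.
    Insert { attempted: usize },
}

/// Adds only the messages that genuinely failed to the counter
/// and returns how many were added.
fn record_failure(insertion_errors: &AtomicU64, error: &BatchError) -> u64 {
    let failed = match error {
        BatchError::Build => 1,
        BatchError::Insert { attempted } => *attempted as u64,
    };
    insertion_errors.fetch_add(failed, Ordering::Relaxed);
    failed
}
```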
.clone()
.ok_or_else(|| Error::InitError("SurrealDB sink is not connected".to_string()))
}

N concurrent consume() tasks hitting connection error simultaneously each call connect_and_select() (network + signin + optional DDL) in parallel. Last writer wins; N-1 connections orphaned. Under sustained failure: thundering herd. Fix: atomic reconnecting flag + try-approach so losing threads skip reconnect and retry with existing client.
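A minimal sketch of the atomic-flag idea, using only the standard library. `ReconnectGate` and `ReconnectPermit` are hypothetical names; the real connector would wrap its `connect_and_select()` call with something like this.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Lets at most one task reconnect at a time.
struct ReconnectGate {
    reconnecting: AtomicBool,
}

/// Held by the task that won the race; releases the gate when dropped.
struct ReconnectPermit<'a> {
    gate: &'a ReconnectGate,
}

impl ReconnectGate {
    fn new() -> Self {
        Self { reconnecting: AtomicBool::new(false) }
    }

    /// Returns a permit to exactly one caller; concurrent callers get `None`
    /// and should retry with whatever client the winner installs.
    fn try_acquire(&self) -> Option<ReconnectPermit<'_>> {
        self.reconnecting
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .ok()
            .map(|_| ReconnectPermit { gate: self })
    }
}

impl Drop for ReconnectPermit<'_> {
    fn drop(&mut self) {
        // Clearing the flag on drop also covers early returns and errors.
        self.gate.reconnecting.store(false, Ordering::Release);
    }
}
```

Tasks that receive `None` skip the reconnect and retry after a short backoff, which avoids both the orphaned connections and the reconnect storm described above.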
@countradooku Except for the process_messages issue, where error reporting can go wrong, all the others are minor warnings and nits: good-to-haves and cleanups.
let message = error.to_string().to_ascii_lowercase();
message.contains("transaction conflict") || message.contains("transaction can be retried")
}

is_connection_error: message.contains("websocket") and message.contains("channel") are dead. Connector uses HTTP-only reqwest::Client; these strings never appear in reqwest errors. Fix: remove both arms.
}
}
}

serde_json::to_string(records) allocates full-batch JSON, then format!("INSERT IGNORE INTO {table} {records}...") allocates again. Fix: serde_json::to_writer into Vec<u8> with manual prefix/suffix.
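The single-buffer approach could be sketched as below, using only the standard library so the example stands alone. The `write_records` closure is a hypothetical placeholder for the serialization step (in the connector it would wrap `serde_json::to_writer(&mut buf, records)`), and `suffix` is left as a parameter because the rest of the statement is elided in the comment above.

```rust
use std::io::{self, Write};

/// Builds the whole statement in one buffer: prefix, records, suffix.
fn build_insert_query(
    table: &str,
    suffix: &str,
    write_records: impl FnOnce(&mut Vec<u8>) -> io::Result<()>,
) -> io::Result<String> {
    let mut buf: Vec<u8> = Vec::with_capacity(64 + table.len() + suffix.len());
    write!(buf, "INSERT IGNORE INTO {table} ")?;
    // The records are serialized straight into the same buffer,
    // so no intermediate JSON string is allocated.
    write_records(&mut buf)?;
    buf.extend_from_slice(suffix.as_bytes());
    // JSON output is UTF-8, so this conversion only validates; it does not copy.
    String::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}
```

The initial capacity of 64 bytes is arbitrary; a real implementation could estimate it from the batch size.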
) -> Result<Vec<SurrealSqlStatement>, SurrealDbRequestError> {
self.execute_sql_request(client, query, None).await
}

.body(query.to_string()) where query: &str converts &str -> String -> Body. But callers have owned String (from build_insert_query) that was borrowed down to &str. Round-trip wastes one allocation per batch attempt. Fix: propagate String or pass Body::from(owned_string) directly.
message_id: u128,
offset: u64,
) -> String {
let mut id =
String::with_capacity(stream.len() + topic.len() + 70). Actual tail: 's'(1) + '_t'(2) + '_p'(2) + u32::MAX(10) + '_o'(2) + u64::MAX(20) + '_m'(2) + u128-hex(32) = 71. One-char underestimate at max values; one realloc. Fix: +72.
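The capacity arithmetic can be checked with a small sketch. The field order and the zero-padded 32-digit hex encoding are assumptions inferred from the tail breakdown above; the connector's actual `build_record_id` may differ, and any change to the real format would break replay idempotency.

```rust
use std::fmt::Write;

/// 's' + "_t" + "_p" + u32::MAX digits + "_o" + u64::MAX digits + "_m" + u128 hex.
const ID_TAIL_CAPACITY: usize = 1 + 2 + 2 + 10 + 2 + 20 + 2 + 32;

/// Deterministic id: the same message coordinates always give the same string.
fn build_record_id(
    stream: &str,
    topic: &str,
    partition: u32,
    message_id: u128,
    offset: u64,
) -> String {
    let mut id = String::with_capacity(stream.len() + topic.len() + ID_TAIL_CAPACITY);
    // Writing into a String cannot fail, so the result is ignored.
    let _ = write!(id, "s{stream}_t{topic}_p{partition}_o{offset}_m{message_id:032x}");
    id
}
```

Deriving the constant from its parts keeps the capacity in step with the format if a separator is ever added.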
/author
Summary
Adds a SurrealDB sink connector for writing Iggy messages into SurrealDB using the latest SurrealDB Rust SDK version available during implementation, 3.1.4.

The connector supports deterministic record IDs, bulk INSERT IGNORE writes for idempotent replay, configurable batch sizing, root/namespace/database/no-auth modes, optional table and offset-index definition, payload modes (auto, json, text, base64), metadata/header/checksum/origin timestamp fields, retry/backoff handling, and runtime metrics logging.

This also wires the connector into workspace membership, connector docs, example runtime config, binary artifact builds, edge-release output, version bump scripts, and Docker-backed integration test scaffolding.

Tests
cargo fmt --all
cargo sort --no-format --workspace
cargo clippy --all-features --all-targets -- -D warnings
cargo check --all --all-features
cargo test -p iggy_connector_surrealdb_sink
cargo test -p integration --no-run connectors::surrealdb
cargo test --locked --doc
cargo doc --no-deps --all-features --quiet
./scripts/ci/taplo.sh
./scripts/ci/license-headers.sh
./scripts/ci/shellcheck.sh
./scripts/ci/binary-artifacts.sh --check
./scripts/extract-version.sh --check
git diff --check
prek install

Known Review Items
./scripts/ci/third-party-licenses.sh --validate --manifest core/connectors/sinks/surrealdb_sink/Cargo.toml reports BUSL-1.1 license failures from SurrealDB SDK crates. This PR intentionally keeps the SDK dependency because the connector targets the latest SurrealDB SDK.