Skip to content

feat(gateways): iggy gateway for kafka listener#3519

Open
ryerraguntla wants to merge 15 commits into
apache:masterfrom
ryerraguntla:feat(gateways)/kafka_to_iggy_listener
Open

feat(gateways): iggy gateway for kafka listener#3519
ryerraguntla wants to merge 15 commits into
apache:masterfrom
ryerraguntla:feat(gateways)/kafka_to_iggy_listener

Conversation

@ryerraguntla

Copy link
Copy Markdown
Contributor

Which issue does this PR address?

Closes #3421

Rationale

Existing Kafka clients cannot talk to Iggy without a protocol translation layer. Issue #3421 calls for a foundation-layer TCP listener on the Kafka wire port (9093) so that any Kafka-compatible client can eventually route messages into Iggy streams without any client-side changes. This PR delivers that foundation: decode, version-firewall, and stub responses with no Iggy backend integration yet.

What changed?

Before this PR, Iggy had no surface that spoke the Kafka binary protocol. Kafka clients connecting on port 9093 would receive no response or a connection reset.

This PR adds a new gateways/kafka/ workspace crate (iggy-gateway-kafka) with a tokio-based TCP listener that accepts Kafka wire connections, auto-detects request header v1/v2, enforces a version-firewall (SUPPORTED_RANGES), decodes and stub-encodes six API keys (ApiVersions 18, Metadata 3, Produce 0, Fetch 1, ListOffsets 2, CreateTopics 19), and rejects unsupported keys or versions with UNSUPPORTED_VERSION (error 35) without dropping the connection. A kafka-message-gen helper tool generates golden wire fixtures used by the regression suite.

Key implementation notes:

  • Produce hot path keeps RecordBatch as opaque Bytes — no RecordBatch decode in this layer.
  • BrokerAdvertise is Arc-cloned per connection; no per-connection string allocation.
  • Frame read uses BytesMut + read_buf loop; no zero-initialised heap allocation.
  • SUPPORTED_RANGES is the single source of truth; ApiVersions advertises exactly what the firewall allows.

103 regression tests across 12 suites cover: primitive codec round-trips, adversarial wire inputs, header v1/v2 parsing, version boundary matrix, per-API golden byte fixtures, stub error codes, and full TCP round-trips via a real KafkaServer listener.

Local Execution

  • Passed
  • Pre-commit hooks ran
# Fixture generation (required for decode_validation_tests)
cargo run -p kafka-message-gen -- generate \
  --output gateways/kafka/tools/kafka-tool/kafka_messages \
  --api-key 0 --api-key 1 --api-key 2 --api-key 19

# Regression suite
cargo test -p iggy-gateway-kafka

# Lint
cargo clippy -p iggy-gateway-kafka --all-targets -- -D warnings
cargo fmt --all --check

All 103 tests pass. clippy -D warnings clean. Manual smoke-test procedure followed per gateways/kafka/docs/MANUAL_TESTING.md (ApiVersions, Metadata, Produce, Fetch round-trips; version firewall; oversized frame rejection).

AI Usage

  1. Tools: Claude Sonnet 4.6 (Cursor Cloud Agent)
  2. Scope: Protocol codec implementation (requests.rs, responses.rs, codec.rs, header.rs), server framing (server.rs), error type (error.rs), regression test suites, documentation (SCOPE.md, TEST_SUITE.md, MANUAL_TESTING.md, kafka_api_keys_reference.md), and the kafka-tool fixture generator.
  3. Verification: Every generated file was reviewed line by line. The 103-test regression suite was run locally against the implementation. Manual testing followed the procedure in MANUAL_TESTING.md using kcat and the kafka-message-gen tool against a live listener. Golden byte fixtures pin exact wire output against known-correct Kafka responses.
  4. Explainability: Yes — the author can explain every line. The codec follows the Kafka protocol specification field-by-field; non-obvious decisions (opaque RecordBatch, SUPPORTED_RANGES governance model, Arc<BrokerAdvertise> per-connection, varint=0 null compact array) are documented inline or in SCOPE.md.

ryerraguntla and others added 7 commits June 20, 2026 07:12
TCP listener on 9093 with scoped API decode/validation and stub
responses for apache#3421. Includes kafka-message-gen test tool.

Co-authored-by: Cursor <cursoragent@cursor.com>
Add tokio-util dependency and implement BrokerAdvertise to advertise host/port from server bind address. Replace dynamic supported_api_ranges with a static table and pass BrokerAdvertise into handler so metadata responses include the advertised broker. Harden codec primitives: add MAX_COLLECTION_LEN, checked read_i32_array_count and read_compact_array_count, tagged-field bounds checks, better varint validation, and string-length checks (write_nullable_string returns Result). Update response encoders to take references and use checked conversions for counts. Improve server: use tokio_util::TaskTracker for graceful shutdown, handle transient accept errors, return error-only response for unsupported request header versions, extract correlation id, and add safer frame read/write size checks. Update docs and tests to reflect compact/flexible encoding and new APIs.
Critical protocol correctness:
- Metadata non-flexible: controller_id now written before topics (v1+);
  add rack (v1+), cluster_id (v2+), is_internal (v1+) to match spec
- Metadata flexible (v9): is_internal now written before partitions array
- ListOffsets: replace hardcoded 1_700_000_000_000 stub timestamp with -1
  (Kafka "not available" sentinel)
- read_compact_array_count: treat varint=0 as Ok(0) (null compact array)
  instead of Err; fixes Fetch v7+ with null forgotten_topics from
  librdkafka/kafka-go

Performance:
- read_frame: replace vec![0u8; frame_len] with BytesMut + read_buf loop
  (no zero-initialization)
- handle_connection: single BytesMut alloc for length+header+body via
  new send_response() helper and ResponseHeader::encode_into()
- BrokerAdvertise: wrap in Arc, clone Arc per connection instead of
  String-copying the struct

Codec hardening:
- read_nullable_string / read_compact_nullable_string: single alloc via
  str::from_utf8 on borrowed slice + to_owned()
- read_tagged_fields: remove dead usize::try_from (always succeeds on
  64-bit); compare directly as u64
- write_nullable_bytes: add debug_assert for i32::MAX overflow

API cleanliness:
- Remove 11 out-of-scope API_KEY_* constants (dead code, not referenced)
- requests.rs: null topic name now returns Err(NullTopicName) instead of
  silently mapping to ""
- error.rs: add NullTopicName variant
- header.rs: add encode_into() and encoded_size() for zero-copy framing

Tests: update golden Metadata v0 fixture and api_handler test to match
corrected field order (no controller_id in v0).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Foundation for the Kafka gateway (apache#3421): add manual testing and test-suite docs, a full API key reference, and update README/SCOPE to reflect bind/config notes and test coverage. Implement various protocol, header, codec and server tweaks (src/protocol/*, server.rs, lib.rs, main.rs, error.rs) and add/extend many tests and test helpers (103 regression tests across new and updated suites). Add kafka-tool response helper and fixtures tooling updates to support decode/validation tests and manual end-to-end checks. These changes bundle documentation, test infra, and protocol-level fixes required to validate wire decoding, version firewalling, and stub responses for the gateway.
Add/relax several clippy allow attributes in the Kafka codec module; simplify i32->usize conversion for array length by using a direct cast with a safety comment (avoids unnecessary try_from and map_err), and add #[must_use] to Encoder::freeze to prevent accidental discards. Also add a TODO comment in the Produce response placeholder for populating the topic name.
Rename the crate/binary to iggy-gateway-kafka and update README and Cargo.toml accordingly. Introduce DEFAULT_KAFKA_PORT and refactor the server to accept a pre-bound TcpListener (eliminates bind TOCTOU) and use listener.local_addr() to compute advertised broker information, returning clearer config errors. Harden protocol handling: add Encoder helpers (unchecked nullable strings, write_null_bytes), handle metadata parsing errors safely, downgrade some decode errors to warnings, and adjust metadata encoding for flexible versions. Improve read_frame (single deadline for length+body, chunked reservation to avoid large upfront allocs, truncate extra bytes, and transient accept backoff). Update tests and CI: add rust-gateway component and include gateways in PR-title workflow.
Multiple fixes and improvements to the Kafka gateway:

- Docs: document KAFKA_* env vars in README and normalize package name references from `iggy_gateway_kafka` to `iggy-gateway-kafka` across MANUAL_TESTING and TEST_SUITE.
- Protocol: add MAX_SUPPORTED_METADATA_VERSION and clamp metadata responses to the highest implemented version; treat empty or malformed metadata request bodies as a 0-topic response with appropriate per-topic error instead of failing; remove unused KafkaProtocolError import.
- Server: validate KAFKA_ADVERTISED_HOST length against Kafka nullable string limit and return a config error if exceeded; log a warn when setting TCP_NODELAY fails.
- I/O: rewrite read_frame to use bounded read() slices (avoid allocator over-reads via read_buf) so pipelined frames are not consumed accidentally.
- Tests: add tests for advertised-host length, corrupt metadata partial-body behavior returning zero topics, and that read_frame does not consume pipelined bytes; adjust existing tests/docs to match crate name changes.

These changes fix metadata decoding semantics, prevent frame bleed between pipelined requests, harden config validation, and align documentation/test commands with the package name.
@github-actions

Copy link
Copy Markdown

Thanks for the PR. It is labeled S-waiting-on-review and queued for review.

Slash commands (own line, regular comment) move it around the queue:

  • /ready - back to S-waiting-on-review after addressing feedback
  • /author - flip to S-waiting-on-author while you finish changes
  • /request-review @user-or-team - request a reviewer

See CONTRIBUTING.md for details.

@github-actions github-actions Bot added the S-waiting-on-review PR is waiting on a reviewer label Jun 21, 2026
@codecov

codecov Bot commented Jun 21, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 16.49%. Comparing base (4a48008) to head (eaf9fcb).

Additional details and impacted files
@@              Coverage Diff              @@
##             master    #3519       +/-   ##
=============================================
- Coverage     74.27%   16.49%   -57.78%     
  Complexity      937      937               
=============================================
  Files          1259     1256        -3     
  Lines        125969   109914    -16055     
  Branches     101644    85633    -16011     
=============================================
- Hits          93558    18130    -75428     
- Misses        29396    91334    +61938     
+ Partials       3015      450     -2565     
Components Coverage Δ
Rust Core 0.27% <ø> (-74.89%) ⬇️
Java SDK 58.57% <ø> (ø)
C# SDK 71.40% <ø> (-0.71%) ⬇️
Python SDK 88.88% <ø> (ø)
PHP SDK 84.29% <ø> (ø)
Node SDK 91.35% <ø> (+0.12%) ⬆️
Go SDK 40.36% <ø> (ø)
see 739 files with indirect coverage changes
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Adjust trailing newline/whitespace in .github/config/publish.yml and .github/dependabot.yml. These are non-functional formatting fixes to normalize end-of-file newlines and do not change any configuration values.
@ryerraguntla ryerraguntla changed the title feat(gateways):iggy gateway for kafka - listener feat(gateways): iggy gateway for kafka listener Jun 21, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

S-waiting-on-review PR is waiting on a reviewer

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(gateways): Build Gateway to Iggy for existing Kafka users - Create a TCP Listener for Kafka users

1 participant