feat(gateways): iggy gateway for kafka listener #3519
Open
ryerraguntla wants to merge 15 commits into
TCP listener on 9093 with scoped API decode/validation and stub responses for apache#3421. Includes kafka-message-gen test tool. Co-authored-by: Cursor <cursoragent@cursor.com>
Add tokio-util dependency and implement BrokerAdvertise to advertise host/port from server bind address. Replace dynamic supported_api_ranges with a static table and pass BrokerAdvertise into handler so metadata responses include the advertised broker. Harden codec primitives: add MAX_COLLECTION_LEN, checked read_i32_array_count and read_compact_array_count, tagged-field bounds checks, better varint validation, and string-length checks (write_nullable_string returns Result). Update response encoders to take references and use checked conversions for counts. Improve server: use tokio_util::TaskTracker for graceful shutdown, handle transient accept errors, return error-only response for unsupported request header versions, extract correlation id, and add safer frame read/write size checks. Update docs and tests to reflect compact/flexible encoding and new APIs.
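A static support table of the kind this commit describes might look roughly like the sketch below. The six API keys and the `UNSUPPORTED_VERSION` code (35) come from the PR description; the version bounds, the `ApiRange` struct, and the `check_version` and `advertised_ranges` helpers are hypothetical names and values chosen for illustration, not the crate's actual definitions.

```rust
/// Kafka error code returned when a request's API version is not supported.
const UNSUPPORTED_VERSION: i16 = 35;

/// One row of the static table: an API key and its inclusive version range.
/// (Hypothetical type; the crate's real representation is not shown in the PR.)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ApiRange {
    api_key: i16,
    min_version: i16,
    max_version: i16,
}

/// Static support table. The keys are the six named in the PR description;
/// the version bounds are placeholders for illustration only.
static SUPPORTED_RANGES: [ApiRange; 6] = [
    ApiRange { api_key: 0, min_version: 0, max_version: 9 },  // Produce
    ApiRange { api_key: 1, min_version: 0, max_version: 12 }, // Fetch
    ApiRange { api_key: 2, min_version: 0, max_version: 5 },  // ListOffsets
    ApiRange { api_key: 3, min_version: 0, max_version: 9 },  // Metadata
    ApiRange { api_key: 18, min_version: 0, max_version: 3 }, // ApiVersions
    ApiRange { api_key: 19, min_version: 0, max_version: 5 }, // CreateTopics
];

/// Returns Ok(()) when the (key, version) pair is inside the table, otherwise
/// the error code to place in an error-only response.
fn check_version(api_key: i16, api_version: i16) -> Result<(), i16> {
    match SUPPORTED_RANGES.iter().find(|r| r.api_key == api_key) {
        Some(r) if (r.min_version..=r.max_version).contains(&api_version) => Ok(()),
        _ => Err(UNSUPPORTED_VERSION),
    }
}

/// What an ApiVersions response would advertise: the same table the check reads.
fn advertised_ranges() -> &'static [ApiRange] {
    &SUPPORTED_RANGES
}
```

Because the check and the advertisement read the same table, a client that honors ApiVersions should never be sent a version the gateway then rejects.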
Critical protocol correctness:
- Metadata non-flexible: controller_id now written before topics (v1+); add rack (v1+), cluster_id (v2+), is_internal (v1+) to match spec
- Metadata flexible (v9): is_internal now written before partitions array
- ListOffsets: replace hardcoded 1_700_000_000_000 stub timestamp with -1 (Kafka "not available" sentinel)
- read_compact_array_count: treat varint=0 as Ok(0) (null compact array) instead of Err; fixes Fetch v7+ with null forgotten_topics from librdkafka/kafka-go

Performance:
- read_frame: replace vec![0u8; frame_len] with BytesMut + read_buf loop (no zero-initialization)
- handle_connection: single BytesMut alloc for length+header+body via new send_response() helper and ResponseHeader::encode_into()
- BrokerAdvertise: wrap in Arc, clone Arc per connection instead of String-copying the struct

Codec hardening:
- read_nullable_string / read_compact_nullable_string: single alloc via str::from_utf8 on borrowed slice + to_owned()
- read_tagged_fields: remove dead usize::try_from (always succeeds on 64-bit); compare directly as u64
- write_nullable_bytes: add debug_assert for i32::MAX overflow

API cleanliness:
- Remove 11 out-of-scope API_KEY_* constants (dead code, not referenced)
- requests.rs: null topic name now returns Err(NullTopicName) instead of silently mapping to ""
- error.rs: add NullTopicName variant
- header.rs: add encode_into() and encoded_size() for zero-copy framing

Tests: update golden Metadata v0 fixture and api_handler test to match corrected field order (no controller_id in v0).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
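The `read_compact_array_count` change can be illustrated with a small std-only sketch. In Kafka's flexible encoding a compact array length is an unsigned varint holding N + 1, with 0 reserved for a null array. The function name matches the commit; the `DecodeError` enum, the `read_unsigned_varint` helper, and the `MAX_COLLECTION_LEN` value are assumptions made for this example and may differ from the crate's code.

```rust
/// Illustrative cap on element counts; the PR names MAX_COLLECTION_LEN but
/// does not show its value.
const MAX_COLLECTION_LEN: usize = 100_000;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq, Eq)]
enum DecodeError {
    UnexpectedEof,
    VarintTooLong,
    CollectionTooLarge(u64),
}

/// Reads an unsigned varint (at most 5 bytes for a u32) and advances `buf`.
fn read_unsigned_varint(buf: &mut &[u8]) -> Result<u32, DecodeError> {
    let mut value: u32 = 0;
    for i in 0..5u32 {
        let data: &[u8] = *buf;
        let (&byte, rest) = data.split_first().ok_or(DecodeError::UnexpectedEof)?;
        *buf = rest;
        // The fifth byte may only carry the top 4 bits of a u32 and must end the varint.
        if i == 4 && byte > 0x0F {
            return Err(DecodeError::VarintTooLong);
        }
        value |= u32::from(byte & 0x7F) << (7 * i);
        if byte & 0x80 == 0 {
            return Ok(value);
        }
    }
    Err(DecodeError::VarintTooLong)
}

/// Returns the element count of a compact array. The wire value is N + 1;
/// 0 encodes a null array, which is treated here as zero elements.
fn read_compact_array_count(buf: &mut &[u8]) -> Result<usize, DecodeError> {
    let raw = read_unsigned_varint(buf)?;
    if raw == 0 {
        return Ok(0); // null compact array
    }
    let count = (raw - 1) as usize;
    if count > MAX_COLLECTION_LEN {
        return Err(DecodeError::CollectionTooLarge(count as u64));
    }
    Ok(count)
}
```

With this shape, a null array (varint 0) and an empty array (varint 1) both yield a count of zero, so a caller that only iterates elements does not need to distinguish them.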
Foundation for the Kafka gateway (apache#3421): add manual testing and test-suite docs, a full API key reference, and update README/SCOPE to reflect bind/config notes and test coverage. Implement various protocol, header, codec and server tweaks (src/protocol/*, server.rs, lib.rs, main.rs, error.rs) and add/extend many tests and test helpers (103 regression tests across new and updated suites). Add kafka-tool response helper and fixtures tooling updates to support decode/validation tests and manual end-to-end checks. These changes bundle documentation, test infra, and protocol-level fixes required to validate wire decoding, version firewalling, and stub responses for the gateway.
Add/relax several clippy allow attributes in the Kafka codec module; simplify i32->usize conversion for array length by using a direct cast with a safety comment (avoids unnecessary try_from and map_err), and add #[must_use] to Encoder::freeze to prevent accidental discards. Also add a TODO comment in the Produce response placeholder for populating the topic name.
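The "direct cast with a safety comment" idea can be sketched as follows: once the count has been range-checked, `as usize` cannot wrap, so a fallible conversion adds nothing. The function name follows the earlier commit; the `CountError` enum and the `MAX_COLLECTION_LEN` value are hypothetical. This sketch also rejects every negative count, whereas a real decoder may need to accept -1 as a null array for nullable fields.

```rust
/// Hypothetical cap; the PR names MAX_COLLECTION_LEN but does not show its value.
const MAX_COLLECTION_LEN: i32 = 100_000;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq, Eq)]
enum CountError {
    Truncated,
    OutOfRange(i32),
}

/// Reads a big-endian i32 element count and advances `buf` past it on success.
fn read_i32_array_count(buf: &mut &[u8]) -> Result<usize, CountError> {
    let data: &[u8] = *buf;
    if data.len() < 4 {
        return Err(CountError::Truncated);
    }
    let (head, rest) = data.split_at(4);
    let n = i32::from_be_bytes([head[0], head[1], head[2], head[3]]);
    if n < 0 || n > MAX_COLLECTION_LEN {
        return Err(CountError::OutOfRange(n));
    }
    *buf = rest;
    // Cast is lossless: n was just checked to lie in 0..=MAX_COLLECTION_LEN.
    Ok(n as usize)
}
```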
Rename the crate/binary to iggy-gateway-kafka and update README and Cargo.toml accordingly. Introduce DEFAULT_KAFKA_PORT and refactor the server to accept a pre-bound TcpListener (eliminates bind TOCTOU) and use listener.local_addr() to compute advertised broker information, returning clearer config errors. Harden protocol handling: add Encoder helpers (unchecked nullable strings, write_null_bytes), handle metadata parsing errors safely, downgrade some decode errors to warnings, and adjust metadata encoding for flexible versions. Improve read_frame (single deadline for length+body, chunked reservation to avoid large upfront allocs, truncate extra bytes, and transient accept backoff). Update tests and CI: add rust-gateway component and include gateways in PR-title workflow.
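Deriving the advertised endpoint from an already-bound listener could look roughly like this. The sketch uses the blocking `std::net::TcpListener` for brevity, while the gateway itself is tokio-based. The `BrokerAdvertise` name comes from the PR; its fields and the `advertise_from` helper are assumptions for illustration.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener};

/// Host and port that metadata responses would advertise to clients.
/// (Field layout is assumed; the PR only names the type.)
#[derive(Debug, Clone, PartialEq, Eq)]
struct BrokerAdvertise {
    host: String,
    port: i32,
}

/// Derives the advertised endpoint from an already-bound listener, so the port
/// reported to clients is the one the OS actually assigned. Binding once and
/// passing the listener in avoids a second bind that could race with another process.
fn advertise_from(
    listener: &TcpListener,
    host_override: Option<&str>,
) -> io::Result<BrokerAdvertise> {
    let addr: SocketAddr = listener.local_addr()?;
    let host = match host_override {
        Some(h) => h.to_owned(),
        None => addr.ip().to_string(),
    };
    Ok(BrokerAdvertise {
        host,
        port: i32::from(addr.port()),
    })
}
```

Binding to port 0 and then calling `local_addr()` is also a convenient way for tests to obtain a free port without guessing one.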
Multiple fixes and improvements to the Kafka gateway:
- Docs: document KAFKA_* env vars in README and normalize package name references from `iggy_gateway_kafka` to `iggy-gateway-kafka` across MANUAL_TESTING and TEST_SUITE.
- Protocol: add MAX_SUPPORTED_METADATA_VERSION and clamp metadata responses to the highest implemented version; treat empty or malformed metadata request bodies as a 0-topic response with appropriate per-topic error instead of failing; remove unused KafkaProtocolError import.
- Server: validate KAFKA_ADVERTISED_HOST length against Kafka nullable string limit and return a config error if exceeded; log a warn when setting TCP_NODELAY fails.
- I/O: rewrite read_frame to use bounded read() slices (avoid allocator over-reads via read_buf) so pipelined frames are not consumed accidentally.
- Tests: add tests for advertised-host length, corrupt metadata partial-body behavior returning zero topics, and that read_frame does not consume pipelined bytes; adjust existing tests/docs to match crate name changes.

These changes fix metadata decoding semantics, prevent frame bleed between pipelined requests, harden config validation, and align documentation/test commands with the package name.
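The bounded-read approach to `read_frame` can be sketched with the blocking `std::io::Read` trait; the gateway's version is async, so this shows only the framing logic. A Kafka frame is a 4-byte big-endian length followed by that many bytes. `MAX_FRAME_LEN` and `CHUNK` are illustrative values, not the crate's constants.

```rust
use std::io::{self, Read};

/// Illustrative limits; the gateway's real values are not shown in the PR.
const MAX_FRAME_LEN: usize = 1024 * 1024;
const CHUNK: usize = 8 * 1024;

/// Reads one length-prefixed frame and returns its body. Every read is bounded
/// by the bytes still owed to this frame, so a pipelined next frame stays unread.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    reader.read_exact(&mut len_buf)?;
    let declared = i32::from_be_bytes(len_buf);
    if declared < 0 || declared as usize > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "bad frame length"));
    }
    let frame_len = declared as usize;

    // Grow the body chunk by chunk instead of reserving the declared length up front.
    let mut body = Vec::new();
    let mut chunk = [0u8; CHUNK];
    while body.len() < frame_len {
        let want = (frame_len - body.len()).min(CHUNK);
        let n = match reader.read(&mut chunk[..want]) {
            Ok(0) => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "peer closed mid-frame",
                ))
            }
            Ok(n) => n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        body.extend_from_slice(&chunk[..n]);
    }
    Ok(body)
}
```

Calling `read_frame` twice on a buffer holding two back-to-back frames returns each body in turn, because the slice passed to `read` never extends past the current frame.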
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## master #3519 +/- ##
=============================================
- Coverage 74.27% 16.49% -57.78%
Complexity 937 937
=============================================
Files 1259 1256 -3
Lines 125969 109914 -16055
Branches 101644 85633 -16011
=============================================
- Hits 93558 18130 -75428
- Misses 29396 91334 +61938
+ Partials 3015 450 -2565
Adjust trailing newline/whitespace in .github/config/publish.yml and .github/dependabot.yml. These are non-functional formatting fixes that normalize end-of-file newlines and do not change any configuration values. Also updates the documentation.
Co-authored-by: Cursor <cursoragent@cursor.com>
Which issue does this PR address?
Closes #3421
Rationale
Existing Kafka clients cannot talk to Iggy without a protocol translation layer. Issue #3421 calls for a foundation-layer TCP listener on the Kafka wire port (9093) so that any Kafka-compatible client can eventually route messages into Iggy streams without any client-side changes. This PR delivers that foundation: decode, version-firewall, and stub responses with no Iggy backend integration yet.
What changed?
Before this PR, Iggy had no surface that spoke the Kafka binary protocol. Kafka clients connecting on port 9093 would receive no response or a connection reset.
This PR adds a new `gateways/kafka/` workspace crate (`iggy-gateway-kafka`) with a `tokio`-based TCP listener that accepts Kafka wire connections, auto-detects request header v1/v2, enforces a version firewall (`SUPPORTED_RANGES`), decodes and stub-encodes six API keys (ApiVersions 18, Metadata 3, Produce 0, Fetch 1, ListOffsets 2, CreateTopics 19), and rejects unsupported keys or versions with `UNSUPPORTED_VERSION` (error 35) without dropping the connection. A `kafka-message-gen` helper tool generates golden wire fixtures used by the regression suite.

Key implementation notes:
- `Bytes`: no RecordBatch decode in this layer.
- `BrokerAdvertise` is `Arc`-cloned per connection; no per-connection string allocation.
- `BytesMut` + `read_buf` loop; no zero-initialised heap allocation.
- `SUPPORTED_RANGES` is the single source of truth; `ApiVersions` advertises exactly what the firewall allows.

103 regression tests across 12 suites cover: primitive codec round-trips, adversarial wire inputs, header v1/v2 parsing, version boundary matrix, per-API golden byte fixtures, stub error codes, and full TCP round-trips via a real `KafkaServer` listener.

Local Execution
All 103 tests pass. `clippy -D warnings` clean. Manual smoke-test procedure followed per `gateways/kafka/docs/MANUAL_TESTING.md` (ApiVersions, Metadata, Produce, Fetch round-trips; version firewall; oversized frame rejection).

AI Usage
- `requests.rs`, `responses.rs`, `codec.rs`, `header.rs`), server framing (`server.rs`), error type (`error.rs`), regression test suites, documentation (`SCOPE.md`, `TEST_SUITE.md`, `MANUAL_TESTING.md`, `kafka_api_keys_reference.md`), and the `kafka-tool` fixture generator.
- `MANUAL_TESTING.md` using `kcat` and the `kafka-message-gen` tool against a live listener. Golden byte fixtures pin exact wire output against known-correct Kafka responses.
- `SUPPORTED_RANGES` governance model, `Arc<BrokerAdvertise>` per-connection, `varint=0` null compact array) are documented inline or in `SCOPE.md`.