From b44e8515221ed5d5bfb94220f5148ab3e61946f1 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Thu, 4 Jun 2026 10:55:57 +0800 Subject: [PATCH 1/3] docs: comprehensive README update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add crates.io, docs.rs, and license badges - Add feature matrix and language bindings overview tables - Add project structure tree diagram - Add KV table (upsert + lookup) code example - Update log table example to use current Config API - Add configuration reference table - Add all 5 examples with descriptions - Add development setup instructions - Fix outdated API usage (Args → Config, ConnectionConfig → Config) - Add proper error handling (Result) instead of unwrap() --- README.md | 370 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 284 insertions(+), 86 deletions(-) diff --git a/README.md b/README.md index a88ec2f3..055537af 100644 --- a/README.md +++ b/README.md @@ -1,125 +1,323 @@ -# Apache Fluss™ Rust (Incubating) + + +# Apache Fluss™ Rust Client (Incubating) ![Experimental](https://img.shields.io/badge/status-experimental-orange) +[![crates.io](https://img.shields.io/crates/v/fluss-rs.svg)](https://crates.io/crates/fluss-rs) +[![docs.rs](https://img.shields.io/docsrs/fluss-rs)](https://docs.rs/fluss-rs/) +[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0) + +The official Rust client for [Apache Fluss™](https://fluss.apache.org/) (Incubating) — a streaming storage built for real-time analytics, serving as the real-time data layer for Lakehouse architectures. + +This repository contains the **Rust core client** (`fluss-rs`) and **language bindings** for Python, C++, and Elixir. + +--- + +## What is Fluss? + +[Fluss](https://fluss.apache.org/) bridges the gap between streaming data and the data Lakehouse by enabling **low-latency, high-throughput data ingestion and processing** while seamlessly integrating with popular compute engines (Flink, Spark, Trino). + +Key concepts: -Rust implementation of [Apache Fluss™](https://fluss.apache.org/). +- **Log table** — append-only table (no primary key). Immutable records, ideal for event streams and audit trails. +- **Primary Key (KV) table** — keyed table supporting upsert, delete, and point/prefixed lookups. +- **Bucket** — parallelism unit within a table (similar to Kafka partitions). +- **Partition** — data organization by column values (e.g., by date or region). +--- -## Why Fluss? -[Fluss](https://fluss.apache.org/) is a streaming storage built for real-time analytics which can serve as the real-time data layer for Lakehouse architectures. -It bridges the gap between streaming data and the data Lakehouse by enabling low-latency, high-throughput data ingestion and processing while seamlessly integrating with popular compute engines. +## Features + +### Core Client (`fluss-rs`) + +| Category | Capabilities | +| --------------- | ------------------------------------------------------------------------- | +| **Connection** | Bootstrap to Fluss cluster, SASL authentication, graceful shutdown | +| **Admin** | Create/drop/list databases & tables, manage partitions, list offsets | +| **Log Tables** | Append (single-row + Arrow `RecordBatch`), scan with subscribe/poll | +| **KV Tables** | Upsert, delete, point lookup, **prefix lookup**, partitioned KV support | +| **Data Types** | Int, BigInt, String, Float, Double, Boolean, Bytes, Decimal, Date, Time, Timestamp, TimestampLTZ, Char, Binary | +| **Config** | Batch sizing, buffering, retries, compression, timeouts, prefetch, concurrency | +| **Storage** | Memory, Filesystem, S3, OSS (via [OpenDAL](https://opendal.apache.org/)) | +| **WASM** | Compiles for `wasm32` target | + +### Language Bindings + +| Language | Package / Build | Async Runtime | Data Format | +| ---------- | ------------------------ | ------------------------ | --------------------------- | +| **Rust** | [fluss-rs](https://crates.io/crates/fluss-rs) (crates.io) | Tokio | Arrow `RecordBatch` / `GenericRow` | +| **Python** | Build from source (PyO3) | asyncio | PyArrow / Pandas / dict | +| **C++** | CMake / Bazel (FFI) | Synchronous (Tokio internally) | Arrow RecordBatch / GenericRow | +| **Elixir** | [Rustler](https://github.com/rusterlium/rustler) NIFs | Erlang processes | Elixir values | + +--- + +## Project Structure + +``` +fluss-rust/ +├── crates/ +│ ├── fluss/ # Core Rust client (fluss-rs) +│ │ ├── src/client/ # Connection, Admin, Table, Scan, Upsert, Lookup +│ │ ├── src/metadata/ # Schema, TableDescriptor, DataTypes, Partitions +│ │ ├── src/row/ # GenericRow, InternalRow, Arrow integration +│ │ ├── src/rpc/ # gRPC transport layer +│ │ └── src/config.rs # Client configuration +│ ├── examples/ # 5 runnable examples (log, KV, partitioned, prefix lookup) +│ └── fluss-test-cluster/ # Test harness for integration tests +├── bindings/ +│ ├── python/ # Python binding (PyO3) +│ ├── cpp/ # C++ binding (FFI + header) +│ └── elixir/ # Elixir binding (Rustler NIF) +├── website/ # Docusaurus documentation site +├── docs/ # Supplementary documentation +└── scripts/ # Release & version management +``` -## Why Fluss Rust Client -It's an official Rust client for interacting with Fluss. This client provides foundational capabilities for table management and log streaming operations, enabling developers to explore Fluss within Rust ecosystems. +--- -## Quick-Start +## Quick Start -### Step1 Start Fluss cluster -#### Requirements -Fluss runs on all UNIX-like environments, e.g. Linux, Mac OS X. Before you start to setup the system, make sure you have the following software installed on your test machine: +### Prerequisites -Java 17 or higher (Java 8 and Java 11 are not recommended) -If your cluster does not fulfill these software requirements you will need to install/upgrade it. +- [Java 17+](https://adoptium.net/) for running the Fluss cluster +- [Rust](https://www.rust-lang.org/tools/install) (latest stable) +- Linux or macOS (Windows is not currently supported) -Fluss requires the JAVA_HOME environment variable to be set on all nodes and point to the directory of your Java installation. +### 1. Start a Fluss Cluster -#### Fluss Setup -Go to the [downloads](https://fluss.apache.org/downloads/) page and download the latest Fluss release (currently 0.8.0). Make sure to pick the Fluss package matching your Java version. After downloading the latest release, extract it: ```shell +# Download and extract Fluss (0.8.0+) +curl -LO https://dlcdn.apache.org/incubator/fluss/0.8.0/fluss-0.8.0-incubating-bin.tgz tar -xzf fluss-0.8.0-incubating-bin.tgz cd fluss-0.8.0-incubating/ -``` -You can start Fluss local cluster by running the following command: -```shell + +# Start a local cluster ./bin/local-cluster.sh start ``` -After that, the Fluss local cluster is started. -### Run Provided Example -Only supports Linux or macOs. You will need to [install Rust](https://www.rust-lang.org/tools/install) firstly. +### 2. Add `fluss-rs` to Your Project -After that, go the project directory, build it and run the example: -```shell -cargo build --example example-table --release -cd target/release/examples -./example-table +```toml +[dependencies] +fluss = { package = "fluss-rs", version = "0.2" } +tokio = "1" ``` -The example code is as follows: + +### 3. Write Code + +#### Log Table: Append + Scan + ```rust +use fluss::client::{EARLIEST_OFFSET, FlussConnection}; +use fluss::config::Config; +use fluss::error::Result; +use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; +use fluss::row::{DataGetters, GenericRow}; +use std::time::Duration; + #[tokio::main] -pub async fn main() -> Result<()> { - // 1: create the table; - let mut args = Args::default(); - args.bootstrap_servers = "127.0.0.1:9123".to_string(); - let conn_config = ConnectionConfig::from_args(args); - let conn = FlussConnection::new(conn_config).await; - - let admin = conn.get_admin(); - - let table_descriptor = TableDescriptor::builder() - .schema( - Schema::builder() - .column("c1", DataTypes::int()) - .column("c2", DataTypes::string()) - .build(), - ) - .build(); - - let table_path = TablePath::new("fluss".to_owned(), "rust_test".to_owned()); - - admin - .create_table(&table_path, &table_descriptor, true) - .await - .unwrap(); - - // 2: get the table - let table_info = admin.get_table_info(&table_path).await.unwrap(); - print!("Get created table:\n {}\n", table_info); - - // let's sleep 2 seconds to wait leader ready - thread::sleep(Duration::from_secs(2)); - - // 3: append log to the table - let table = conn.get_table(&table_path).await; - let append_writer = table.new_append().create_writer(); - let batch = record_batch!(("c1", Int32, [1, 2, 3, 4, 5, 6]), ("c2", Utf8, ["a1", "a2", "a3", "a4", "a5", "a6"])).unwrap(); - append_writer.append(batch)?; - append_writer.flush().await?; - println!("Start to scan log records......"); - // 4: scan the records - let log_scanner = table.new_scan().create_log_scanner(); - log_scanner.subscribe(0, 0).await; +async fn main() -> Result<()> { + let mut config = Config::default(); + config.bootstrap_servers = "127.0.0.1:9123".to_string(); + let conn = FlussConnection::new(config).await?; + let admin = conn.get_admin()?; + // Create a log table + let table_path = TablePath::new("my_db", "events"); + let schema = Schema::builder() + .column("ts", DataTypes::bigint()) + .column("message", DataTypes::string()) + .build()?; + let descriptor = TableDescriptor::builder().schema(schema).build()?; + admin.create_table(&table_path, &descriptor, true).await?; + + // Append rows + let table = conn.get_table(&table_path).await?; + let writer = table.new_append()?.create_writer()?; + let mut row = GenericRow::new(2); + row.set_field(0, 1_700_000_000_000i64); + row.set_field(1, "hello fluss"); + writer.append(&row)?; + writer.flush().await?; + + // Scan logs + let scanner = table.new_scan()?.create_log_scanner()?; + scanner.subscribe(0, EARLIEST_OFFSET).await?; loop { - let scan_records = log_scanner.poll(Duration::from_secs(10)).await?; - println!("Start to poll records......"); - for record in scan_records { + let records = scanner.poll(Duration::from_secs(5)).await?; + for record in records { let row = record.row(); - println!( - "{{{}, {}}}@{}", - row.get_int(0), - row.get_string(1), - record.offset() - ); + println!("offset={}, c1={}, c2={}", + record.offset(), row.get_long(0)?, row.get_string(1)?); } } +} +``` + +#### KV Table: Upsert + Lookup + +```rust +use fluss::client::FlussConnection; +use fluss::config::Config; +use fluss::error::Result; +use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; +use fluss::row::{DataGetters, GenericRow}; + +#[tokio::main] +async fn main() -> Result<()> { + let mut config = Config::default(); + config.bootstrap_servers = "127.0.0.1:9123".to_string(); + let conn = FlussConnection::new(config).await?; + let admin = conn.get_admin()?; + + // Create a KV table + let table_path = TablePath::new("my_db", "users"); + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .column("score", DataTypes::bigint()) + .primary_key(vec!["id"]) + .build()?; + let descriptor = TableDescriptor::builder().schema(schema).build()?; + admin.create_table(&table_path, &descriptor, true).await?; + + // Upsert rows + let table = conn.get_table(&table_path).await?; + let writer = table.new_upsert()?.create_writer()?; + for (id, name, score) in [(1, "Alice", 95i64), (2, "Bob", 87)] { + let mut row = GenericRow::new(3); + row.set_field(0, id); + row.set_field(1, name); + row.set_field(2, score); + writer.upsert(&row)?; + } + writer.flush().await?; + + // Point lookup by primary key + let lookuper = table.new_lookup()?.create_lookuper()?; + let mut key = GenericRow::new(1); + key.set_field(0, 1i32); + if let Some(row) = lookuper.lookup(&key).await?.get_single_row()? { + println!("id={}, name={}, score={}", + row.get_int(0)?, row.get_string(1)?, row.get_long(2)?); + } + Ok(()) } ``` -You can change it according to your needs, have fun! +#### More Examples + +| Example | Description | +| ---------------------------------------- | ---------------------------------------------- | +| `example-table` | Log table: append + scan with Arrow batch | +| `example-kv-table` | KV table: upsert + point lookup | +| `example-partitioned-kv-table` | KV table with partitions | +| `example-prefix-lookup` | Prefix lookup on bucket keys | +| `example-partitioned-prefix-lookup` | Prefix lookup on partitioned tables | + +Build and run any example: -#### Clear environment -Then, stop your Fluss cluster. Go to your Fluss home, stop it via the following commands: ```shell -./bin/local-cluster.sh stop +cargo build --example example-table --release +./target/release/examples/example-table ``` +--- + +## Configuration + +`Config` supports the following key options (all with sensible defaults): + +| Option | Default | Description | +| ------------------------------------- | ----------------- | --------------------------------------------- | +| `bootstrap_servers` | `127.0.0.1:9123` | Fluss coordinator address | +| `writer.batch.size` | 2 MB | Max batch size before flushing | +| `writer.batch.timeout_ms` | 100 ms | Max time before auto-flush | +| `writer.buffer.memory` | 64 MB | Total buffer memory for pending writes | +| `writer.retries` | `i32::MAX` | Max write retries | +| `scanner.log.fetch.max.bytes` | 16 MB | Max bytes per fetch request | +| `scanner.log.fetch.wait.max.time_ms` | 500 ms | Max wait time for fetch | +| `scanner.remote_log.read.concurrency` | 4 | Concurrency for remote log reads | +| `connect.timeout_ms` | 120 s | Connection timeout | +| `security.sasl.username` / `password` | — | SASL PLAIN authentication | + +Configuration can be set programmatically or via CLI flags (using [`clap`](https://docs.rs/clap)). + +--- + ## Documentation -- [Development Guide](DEVELOPMENT.md) – Build, test, and contribute to fluss-rust. -- [Release Guide](website/docs/release/create-release.md) – How to build, release, and sign official Fluss client packages (Rust, Python, C++). +- **[User Guide](https://clients.fluss.apache.org/)** — Full documentation for Rust, Python, C++, and Elixir clients +- **[API Docs (docs.rs)](https://docs.rs/fluss-rs/)** — Rust crate API reference +- **[Development Guide](DEVELOPMENT.md)** — Build, test, and contribute +- **[Release Guide](website/docs/release/create-release.md)** — How to create an official release + +--- + +## Development + +```shell +# Build +cargo build + +# Run tests +cargo test + +# Run integration tests (requires Docker for test cluster) +cargo test --features integration_tests + +# Build C++ bindings +cd bindings/cpp && mkdir build && cd build && cmake .. && cmake --build . + +# Build Python bindings +cd bindings/python && maturin develop + +# Elixir tests +cd bindings/elixir && mix test +``` + +--- + +## Contributing + +This project is part of the Apache Fluss (Incubating) community. Contributions are welcome! + +- Join the [dev mailing list](https://fluss.apache.org/community/welcome/) +- Check out [DEVELOPMENT.md](DEVELOPMENT.md) for setup instructions +- Submit PRs following the [Apache contribution guidelines](https://www.apache.org/foundation/policies/conduct.html) + +--- ## License -Licensed under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0) \ No newline at end of file +Licensed under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0). + +``` +Copyright 2025-2026 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (https://www.apache.org/). +``` + +Apache Fluss, Fluss, Apache, the Apache feather logo, and the Apache Fluss project logo +are either registered trademarks or trademarks of The Apache Software Foundation +in the United States and other countries. \ No newline at end of file From 3dcb1719b7f62528115335fe04be1dbf9abbc3db Mon Sep 17 00:00:00 2001 From: warmbupt Date: Thu, 4 Jun 2026 11:38:41 +0800 Subject: [PATCH 2/3] docs: comprehensive README update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add crates.io, docs.rs, and license badges - Add feature matrix and language bindings overview tables - Add project structure tree diagram - Add KV table (upsert + lookup) code example - Update log table example to use current Config API - Add configuration reference table - Add all 5 examples with descriptions - Add development setup instructions - Fix outdated API usage (Args → Config, ConnectionConfig → Config) - Add proper error handling (Result) instead of unwrap() - Clarify this is a client SDK, not the server --- README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 055537af..437881aa 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,12 @@ [![docs.rs](https://img.shields.io/docsrs/fluss-rs)](https://docs.rs/fluss-rs/) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0) -The official Rust client for [Apache Fluss™](https://fluss.apache.org/) (Incubating) — a streaming storage built for real-time analytics, serving as the real-time data layer for Lakehouse architectures. +The official Rust **client** library for [Apache Fluss™](https://fluss.apache.org/) (Incubating) — a streaming storage built for real-time analytics, serving as the real-time data layer for Lakehouse architectures. This is a **client SDK**, not the Fluss server itself. -This repository contains the **Rust core client** (`fluss-rs`) and **language bindings** for Python, C++, and Elixir. +This repository contains: + +- **`fluss-rs`** — the Rust core client (crates.io: [`fluss-rs`](https://crates.io/crates/fluss-rs)) +- **Language bindings** — Python, C++, and Elixir clients built on top of `fluss-rs` --- From 484e94ae91f2f3f98a26ec4a5145c9fd715ca6bd Mon Sep 17 00:00:00 2001 From: warmbupt Date: Sat, 6 Jun 2026 17:26:47 +0800 Subject: [PATCH 3/3] fix: address Copilot review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix tokio dep to include full features (#[tokio::main] requirement) - Use default 'fluss' database instead of non-existent 'my_db' - Remove incorrect '?' on new_scan() (returns TableScan, not Result) - Fix column labels in scan output (c1/c2 → ts/message) - Fix example names to match Cargo.toml targets - Fix config option names to match actual Rust snake_case fields --- README.md | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 437881aa..63d6a9d3 100644 --- a/README.md +++ b/README.md @@ -121,7 +121,7 @@ cd fluss-0.8.0-incubating/ ```toml [dependencies] fluss = { package = "fluss-rs", version = "0.2" } -tokio = "1" +tokio = { version = "1", features = ["full"] } ``` ### 3. Write Code @@ -144,7 +144,7 @@ async fn main() -> Result<()> { let admin = conn.get_admin()?; // Create a log table - let table_path = TablePath::new("my_db", "events"); + let table_path = TablePath::new("fluss", "events"); let schema = Schema::builder() .column("ts", DataTypes::bigint()) .column("message", DataTypes::string()) @@ -162,13 +162,13 @@ async fn main() -> Result<()> { writer.flush().await?; // Scan logs - let scanner = table.new_scan()?.create_log_scanner()?; + let scanner = table.new_scan().create_log_scanner()?; scanner.subscribe(0, EARLIEST_OFFSET).await?; loop { let records = scanner.poll(Duration::from_secs(5)).await?; for record in records { let row = record.row(); - println!("offset={}, c1={}, c2={}", + println!("offset={}, ts={}, message={}", record.offset(), row.get_long(0)?, row.get_string(1)?); } } @@ -192,7 +192,7 @@ async fn main() -> Result<()> { let admin = conn.get_admin()?; // Create a KV table - let table_path = TablePath::new("my_db", "users"); + let table_path = TablePath::new("fluss", "users"); let schema = Schema::builder() .column("id", DataTypes::int()) .column("name", DataTypes::string()) @@ -232,8 +232,8 @@ async fn main() -> Result<()> { | Example | Description | | ---------------------------------------- | ---------------------------------------------- | | `example-table` | Log table: append + scan with Arrow batch | -| `example-kv-table` | KV table: upsert + point lookup | -| `example-partitioned-kv-table` | KV table with partitions | +| `example-upsert-lookup` | KV table: upsert + point lookup | +| `example-partitioned-upsert-lookup` | KV table with partitions | | `example-prefix-lookup` | Prefix lookup on bucket keys | | `example-partitioned-prefix-lookup` | Prefix lookup on partitioned tables | @@ -252,16 +252,16 @@ cargo build --example example-table --release | Option | Default | Description | | ------------------------------------- | ----------------- | --------------------------------------------- | -| `bootstrap_servers` | `127.0.0.1:9123` | Fluss coordinator address | -| `writer.batch.size` | 2 MB | Max batch size before flushing | -| `writer.batch.timeout_ms` | 100 ms | Max time before auto-flush | -| `writer.buffer.memory` | 64 MB | Total buffer memory for pending writes | -| `writer.retries` | `i32::MAX` | Max write retries | -| `scanner.log.fetch.max.bytes` | 16 MB | Max bytes per fetch request | -| `scanner.log.fetch.wait.max.time_ms` | 500 ms | Max wait time for fetch | -| `scanner.remote_log.read.concurrency` | 4 | Concurrency for remote log reads | -| `connect.timeout_ms` | 120 s | Connection timeout | -| `security.sasl.username` / `password` | — | SASL PLAIN authentication | +| `bootstrap_servers` | `127.0.0.1:9123` | Fluss coordinator address | +| `writer_batch_size` | 2 MB | Max batch size before flushing | +| `writer_batch_timeout_ms` | 100 ms | Max time before auto-flush | +| `writer_buffer_memory_size` | 64 MB | Total buffer memory for pending writes | +| `writer_retries` | `i32::MAX` | Max write retries | +| `scanner_log_fetch_max_bytes` | 16 MB | Max bytes per fetch request | +| `scanner_log_fetch_wait_max_time_ms` | 500 ms | Max wait time for fetch | +| `scanner_remote_log_read_concurrency` | 4 | Concurrency for remote log reads | +| `connect_timeout_ms` | 120 s | Connection timeout | +| `security_sasl_username` / `security_sasl_password` | — | SASL PLAIN authentication | Configuration can be set programmatically or via CLI flags (using [`clap`](https://docs.rs/clap)).