
[FIP-37] Add bitmap aggregate functions: rb_build_agg, rb_or_agg, rb_and_agg and register via FlussCatalog #3398

Open
Prajwal-banakar wants to merge 2 commits into apache:main from Prajwal-banakar:RoaringBitmap-UDFs-pr2



@Prajwal-banakar
Contributor

Purpose

Linked issue: Part of #3289

This PR adds the three Phase 1 bitmap aggregate functions and registers them as built-in catalog functions in FlussCatalog, building on the infrastructure from PR #3319.

After USE CATALOG fluss_catalog, users can call these functions directly in Flink SQL without any CREATE TEMPORARY FUNCTION statement.

Brief change log

New files in fluss-flink/fluss-flink-common:

  • RbBuildAggFunction.java: rb_build_agg(INT) -> BYTES. Aggregates a stream of 32-bit integers into a serialized RoaringBitmap. Extends AbstractRbAggFunction from PR #3319 ([FIP-37] Add bitmap infrastructure: BitmapUtils, RoaringBitmapSerializer, AbstractRbAggFunction).

  • RbOrAggFunction.java: rb_or_agg(BYTES) -> BYTES. Unions multiple serialized RoaringBitmaps via bitwise OR across rows. Extends AbstractRbAggFunction.

  • RbAndAggFunction.java: rb_and_agg(BYTES) -> BYTES. Intersects multiple serialized RoaringBitmaps via bitwise AND across rows. Unlike the OR/build variants, AND requires a custom Accumulator POJO with an initialized flag because an empty bitmap cannot serve as a "not yet initialized" sentinel: once the AND result becomes empty, it must remain empty. The inner AccumulatorSerializer follows the same pattern as RoaringBitmapSerializer.
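
To illustrate why the AND aggregate needs the initialized flag, here is a minimal standalone sketch. It is not the code from this PR: java.util.BitSet stands in for RoaringBitmap so the example has no external dependencies, and every class and method name (AndAccumulatorSketch, naiveAccumulate, bits) is hypothetical. Returning null when no input has been seen is also an assumption; the PR only states that an empty intersection returns null.

```java
import java.util.BitSet;

// Stand-in sketch: java.util.BitSet plays the role of a RoaringBitmap so the
// example needs no external dependency. All names here are hypothetical.
final class AndAccumulatorSketch {

    // Accumulator with an explicit flag, mirroring the shape described above.
    static final class Accumulator {
        BitSet bitmap = new BitSet();
        boolean initialized = false;
    }

    // Flag-based AND: the first non-null input seeds the bitmap, later inputs intersect.
    static void accumulate(Accumulator acc, BitSet input) {
        if (input == null) {
            return; // null inputs are ignored
        }
        if (!acc.initialized) {
            acc.bitmap = (BitSet) input.clone();
            acc.initialized = true;
        } else {
            acc.bitmap.and(input);
        }
    }

    // Assumption: null is returned for "no input seen" as well as for an empty intersection.
    static BitSet getValue(Accumulator acc) {
        if (!acc.initialized || acc.bitmap.isEmpty()) {
            return null;
        }
        return (BitSet) acc.bitmap.clone();
    }

    // Broken variant for contrast: treats an empty bitmap as "not yet initialized".
    static BitSet naiveAccumulate(BitSet acc, BitSet input) {
        BitSet next = (BitSet) input.clone();
        if (!acc.isEmpty()) {
            next.and(acc);
        }
        return next;
    }

    // Helper that builds a BitSet from the given values.
    static BitSet bits(int... values) {
        BitSet set = new BitSet();
        for (int v : values) {
            set.set(v);
        }
        return set;
    }

    public static void main(String[] args) {
        Accumulator acc = new Accumulator();
        BitSet naive = new BitSet();
        BitSet[] rows = {bits(1, 2), bits(3), bits(3, 4)};
        for (BitSet row : rows) {
            accumulate(acc, row);
            naive = naiveAccumulate(naive, row);
        }
        System.out.println("flag-based: " + getValue(acc));
        System.out.println("naive:      " + naive);
    }
}
```

With the rows {1, 2}, {3}, {3, 4}, the intersection becomes empty after the second row. The flag-based accumulator stays empty, while the naive variant mistakes the empty result for a fresh start and ends up holding the third row.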

Modified file: FlinkCatalog.java

Overrides listFunctions(), functionExists(), and getFunction() to serve the three aggregate functions as built-in catalog functions via a static BUILTIN_BITMAP_FUNCTIONS map using CatalogFunctionImpl from flink-table-api-java.

Test file: RbAggFunctionsTest.java

Unit tests covering:

  • rb_build_agg: basic accumulation, null handling, duplicate deduplication, merge, retract throws
  • rb_or_agg: union correctness, null/empty input handling, merge
  • rb_and_agg: intersection correctness, null inputs ignored, empty intersection returns null, merge, reset, Accumulator serializer round-trip

Catalog registration verified in FlinkCatalogTest.testBitmapFunctionsRegistered

Note: AccumulatorSerializer in RbAndAggFunction uses TypeSerializerSingleton, which is deprecated in Flink 1.20 but is the same approach used in RoaringBitmapSerializer (PR #3319, reviewed and approved). This will be revisited when upgrading to Flink 2.x.

The scalar functions (rb_cardinality, rb_build, rb_contains, rb_to_array, rb_or, rb_and) will follow in the next PR.

Tests

Unit tests: RbAggFunctionsTest — 14 tests, all passing.

Catalog integration: FlinkCatalogTest.testBitmapFunctionsRegistered and testViewsAndFunctions — verified against a live Fluss cluster.
Verified locally:

  • ./mvnw spotless:apply — 0 violations
  • ./mvnw test -pl fluss-flink/fluss-flink-common -Dtest="BitmapUtilsTest,RbAggFunctionsTest,RoaringBitmapSerializerTest,AbstractRbAggFunctionITCase,FlinkCatalogTest" — BUILD SUCCESS

Full test run: Tests run: 68, Failures: 0, Errors: 0, Skipped: 0

API and Format

This change does not affect any storage format or wire protocol. The functions operate on existing BYTES columns using the standard RoaringBitmap serialization format already used by the server-side FieldRoaringBitmap32Agg.

Documentation

No user-facing documentation yet. Documentation will be added after the scalar functions are implemented and the full Phase 1 function set is available.

@Prajwal-banakar
Contributor Author

Prajwal-banakar commented May 29, 2026

Hi @polyzos @wuchong could you please help review this?

return false;
public CatalogFunction getFunction(ObjectPath functionPath)
throws FunctionNotExistException, CatalogException {
String className = BUILTIN_BITMAP_FUNCTIONS.get(functionPath.getObjectName().toLowerCase());

toLowerCase() uses the default locale here; maybe use toLowerCase(Locale.ROOT)?
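
A small standalone sketch of the locale concern, using only the JDK. The class and method names (LocaleLookupSketch, normalize) are hypothetical. Under a Turkish locale, an uppercase "I" lowercases to the dotless "ı" (U+0131), so an upper-case spelling of rb_build_agg would no longer match its map key.

```java
import java.util.Locale;

// Hypothetical helper showing locale-independent normalization of function names.
final class LocaleLookupSketch {

    // Locale.ROOT gives the same result regardless of the JVM's default locale.
    static String normalize(String functionName) {
        return functionName.toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        Locale turkish = Locale.forLanguageTag("tr-TR");
        String name = "RB_BUILD_AGG";

        // Simulates what toLowerCase() would do if the default locale were Turkish.
        String turkishLower = name.toLowerCase(turkish);

        System.out.println(turkishLower.equals("rb_build_agg"));    // false: contains dotless i
        System.out.println(normalize(name).equals("rb_build_agg")); // true
    }
}
```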

*/
@FunctionHint(
accumulator = @DataTypeHint(value = "RAW", bridgedTo = RbAndAggFunction.Accumulator.class))
public class RbAndAggFunction extends AggregateFunction<byte[], RbAndAggFunction.Accumulator> {

RbAndAggFunction has no retract; the other two throw a clear UnsupportedOperationException. In a non-append-only GROUP BY, Flink will fail with a less obvious message. Consider adding an explicit throwing retract for symmetry and clearer errors.
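
A rough sketch of what an explicit throwing retract could look like. This standalone class only mirrors the method shape and does not extend Flink's AggregateFunction; the class name, the accumulator fields, and the error message are all hypothetical. In Flink, accumulate and retract are public instance methods that the planner looks up by name, and an intersection cannot be undone from its result alone, which is why retraction is rejected rather than implemented.

```java
// Standalone shape-only sketch; in the real function this method would sit
// inside the AggregateFunction subclass. All names are hypothetical.
final class RetractSketch {

    // Placeholder accumulator; the field names are assumptions.
    static final class Accumulator {
        byte[] serializedBitmap;
        boolean initialized;
    }

    // Intersection is not invertible, so fail fast with a descriptive message.
    public void retract(Accumulator acc, byte[] value) {
        throw new UnsupportedOperationException(
                "rb_and_agg does not support retraction; use it on append-only input.");
    }
}
```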

protected Connection connection;
protected Admin admin;

private static final Map<String, String> BUILTIN_BITMAP_FUNCTIONS;

could be wrapped in Collections.unmodifiableMap for safety.
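
One way this could look, as a sketch only: the map is filled in a static initializer and then published through Collections.unmodifiableMap. The fully qualified class names below are placeholders, not the real package paths from the PR.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder class; the "example.udf" package names are placeholders.
final class BuiltinFunctionsSketch {

    static final Map<String, String> BUILTIN_BITMAP_FUNCTIONS;

    static {
        Map<String, String> functions = new LinkedHashMap<>();
        functions.put("rb_build_agg", "example.udf.RbBuildAggFunction");
        functions.put("rb_or_agg", "example.udf.RbOrAggFunction");
        functions.put("rb_and_agg", "example.udf.RbAndAggFunction");
        // Callers get a read-only view; any put/remove throws UnsupportedOperationException.
        BUILTIN_BITMAP_FUNCTIONS = Collections.unmodifiableMap(functions);
    }

    public static void main(String[] args) {
        System.out.println(BUILTIN_BITMAP_FUNCTIONS.keySet());
    }
}
```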

@polyzos
Contributor

polyzos commented Jun 5, 2026

Thanks for the clean, well-documented PR @Prajwal-banakar.
The code is in good shape and the rationale comments make it easy to follow.

I have a few suggestions before this is ready to merge, mostly one important test gap and one design question.

1. Add an end-to-end SQL ITCase (most important)

The current tests call accumulate()/getValue() directly, which bypasses the Flink planner. The riskiest part of this feature is the @FunctionHint(accumulator = @DataTypeHint("RAW")) type inference and the custom accumulator serializer flowing through real codegen. In addition, function resolution through the new catalog registration isn't exercised yet. RAW-accumulator UDFs commonly pass unit tests but break at planner time. Could we add an ITCase (the existing AbstractRbAggFunctionITCase from #3319 is a natural home) that runs something like:

USE CATALOG fluss_catalog;
SELECT rb_build_agg(id), rb_or_agg(b), rb_and_agg(b) FROM t GROUP BY k;

through a TableEnvironment? That would directly validate the "register via FlussCatalog" claim in the PR title.

2. Built-in registration

I think hardcoding a name -> className map inside the general-purpose FlinkCatalog means the functions get reported under every database via listFunctions(dbName). Since we only have these functions, I think it's safe and this list won't keep growing; however, I would like more people's thoughts just in case. @platinumhamburg @wuchong WDYT?

I left a few more minor comments.

Once the end-to-end ITCase is added (1) and we align on the registration approach (2), this should be a quick approve. Thanks again!
