[FIP-37] Add bitmap aggregate functions: rb_build_agg, rb_or_agg, rb_and_agg and register via FlussCatalog#3398
Conversation
| return false; | ||
| public CatalogFunction getFunction(ObjectPath functionPath) | ||
| throws FunctionNotExistException, CatalogException { | ||
| String className = BUILTIN_BITMAP_FUNCTIONS.get(functionPath.getObjectName().toLowerCase()); |
There was a problem hiding this comment.
uses the default locale.. maybe use toLowerCase(Locale.ROOT)?
| */ | ||
| @FunctionHint( | ||
| accumulator = @DataTypeHint(value = "RAW", bridgedTo = RbAndAggFunction.Accumulator.class)) | ||
| public class RbAndAggFunction extends AggregateFunction<byte[], RbAndAggFunction.Accumulator> { |
There was a problem hiding this comment.
RbAndAggFunction has no retract; the other two throw a clear UnsupportedOperationException. In a non-append-only GROUP BY, Flink will fail with a less obvious message. Consider adding an explicit throwing retract for symmetry and clearer errors.
| protected Connection connection; | ||
| protected Admin admin; | ||
|
|
||
| private static final Map<String, String> BUILTIN_BITMAP_FUNCTIONS; |
There was a problem hiding this comment.
could be wrapped in Collections.unmodifiableMap for safety.
|
Thanks for the clean, well-documented PR @Prajwal-banakar. I have a few suggestions before this is ready to merge, mostly one important test gap and one design question. 1. Add an end-to-end SQL ITCase (most important)The current tests call USE CATALOG fluss_catalog;
SELECT rb_build_agg(id), rb_or_agg(b), rb_and_agg(b) FROM t GROUP BY k;through a 2. Built-in registrationionI think hardcoding a I left a few more minor comments/ Once the end-to-end ITCase is added (1) and we align on the registration approach (2), this should be a quick approve. Thanks again! |
Purpose
Linked issue: Part of #3289
This PR adds the three Phase 1 bitmap aggregate functions and registers them as built-in catalog functions in FlussCatalog, building on the infrastructure from PR #3319
After
USE CATALOG fluss_catalog, users can call these functions directly in Flink SQL without anyCREATE TEMPORARY FUNCTIONstatement.Brief change log
New files in
fluss-flink/fluss-flink-common:RbBuildAggFunction.java—rb_build_agg(INT) -> BYTES. Aggregates a stream of 32-bit integers into a serialized RoaringBitmap. ExtendsAbstractRbAggFunctionfrom PR [FIP-37] Add bitmap infrastructure: BitmapUtils, RoaringBitmapSerializer, AbstractRbAggFunction #3319.RbOrAggFunction.java—rb_or_agg(BYTES) -> BYTES. Unions multiple serialized RoaringBitmaps via bitwise OR across rows. ExtendsAbstractRbAggFunction.RbAndAggFunction.java—rb_and_agg(BYTES) -> BYTES. Intersects multiple serialized RoaringBitmaps via bitwise AND across rows. Unlike the OR/build variants, AND requires a customAccumulatorPOJO with aninitializedflag because an empty bitmap cannot serve as a "not yet initialized" sentinel — once the AND result becomes empty, it must remain empty. The innerAccumulatorSerializerfollows the same pattern asRoaringBitmapSerializer.Modified file:
FlinkCatalog.javaOverrides
listFunctions(),functionExists(), andgetFunction()to serve the three aggregate functions as built-in catalog functions via a staticBUILTIN_BITMAP_FUNCTIONSmap usingCatalogFunctionImplfromflink-table-api-java.Test file:
RbAggFunctionsTest.javaUnit tests covering:
rb_build_agg: basic accumulation, null handling, duplicate deduplication, merge, retract throwsrb_or_agg: union correctness, null/empty input handling, mergerb_and_agg: intersection correctness, null inputs ignored, empty intersection returns null, merge, reset, Accumulator serializer round-tripCatalog registration verified in
FlinkCatalogTest.testBitmapFunctionsRegisteredNote:
AccumulatorSerializerinRbAndAggFunctionusesTypeSerializerSingleton, which is deprecated in Flink 1.20 but is the same approach used inRoaringBitmapSerializer(PR #3319, reviewed and approved). This will be revisited when upgrading to Flink 2.x.The scalar functions (rb_cardinality, rb_build, rb_contains, rb_to_array, rb_or, rb_and) will follow in the next PR.
Tests
Unit tests:
RbAggFunctionsTest— 14 tests, all passing.Catalog integration:
FlinkCatalogTest.testBitmapFunctionsRegisteredandtestViewsAndFunctions— verified against a live Fluss cluster.Verified locally:
./mvnw spotless:apply— 0 violations./mvnw test -pl fluss-flink/fluss-flink-common -Dtest="BitmapUtilsTest,RbAggFunctionsTest,RoaringBitmapSerializerTest,AbstractRbAggFunctionITCase,FlinkCatalogTest"— BUILD SUCCESSFull test run: Tests run: 68, Failures: 0, Errors: 0, Skipped: 0
API and Format
This change does not affect any storage format or wire protocol. The functions operate on existing
BYTEScolumns using the standard RoaringBitmap serialization format already used by the server-sideFieldRoaringBitmap32Agg.Documentation
No user-facing documentation yet. Documentation will be added after the scalar functions are implemented and the full Phase 1 function set is available.