Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ public enum AggFunctionType {

// Boolean aggregation
BOOL_AND,
BOOL_OR;
BOOL_OR,

// Roaring bitmap aggregation
RBM32,
RBM64;

/** Parameter name for delimiter used in LISTAGG and STRING_AGG functions. */
public static final String PARAM_DELIMITER = "delimiter";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,32 @@ public static AggFunction BOOL_OR() {
return new AggFunction(AggFunctionType.BOOL_OR, null);
}

// ===================================================================================
// Roaring Bitmap Aggregation Functions
// ===================================================================================

/**
* Creates a RBM32 aggregation function that merges serialized 32-bit roaring bitmaps.
*
* <p>Supported data types: BYTES
*
* @return a RBM32 aggregation function
*/
public static AggFunction RBM32() {
return new AggFunction(AggFunctionType.RBM32, null);
}

/**
* Creates a RBM64 aggregation function that merges serialized 64-bit roaring bitmaps.
*
* <p>Supported data types: BYTES
*
* @return a RBM64 aggregation function
*/
public static AggFunction RBM64() {
return new AggFunction(AggFunctionType.RBM64, null);
}

// ===================================================================================
// Internal Factory Methods
// ===================================================================================
Expand Down
10 changes: 10 additions & 0 deletions fluss-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@
<artifactId>fluss-shaded-zookeeper</artifactId>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>${roaringbitmap.version}</version>
</dependency>

<!--
we will start zookeeper server when start a local cluster,
need include this module for starting zookeeper requires it -->
Expand Down Expand Up @@ -138,6 +144,10 @@
<pattern>org.apache.commons</pattern>
<shadedPattern>org.apache.fluss.shaded.org.apache.commons</shadedPattern>
</relocation>
<relocation>
<pattern>org.roaringbitmap</pattern>
<shadedPattern>org.apache.fluss.shaded.org.roaringbitmap</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.kv.rowmerger.aggregate.factory;

/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */

import org.apache.fluss.metadata.AggFunction;
import org.apache.fluss.metadata.AggFunctionType;
import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldRoaringBitmap32Agg;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeRoot;

import static org.apache.fluss.utils.Preconditions.checkArgument;

/** Factory for {@link FieldRoaringBitmap32Agg}. */
public class FieldRoaringBitmap32AggFactory implements FieldAggregatorFactory {

@Override
public FieldRoaringBitmap32Agg create(DataType fieldType, AggFunction aggFunction) {
checkArgument(
fieldType.getTypeRoot() == DataTypeRoot.BYTES,
"Data type for rbm32 column must be 'BytesType' but was '%s'.",
fieldType);
return new FieldRoaringBitmap32Agg(fieldType);
}

@Override
public String identifier() {
return AggFunctionType.RBM32.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.kv.rowmerger.aggregate.factory;

/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */

import org.apache.fluss.metadata.AggFunction;
import org.apache.fluss.metadata.AggFunctionType;
import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldRoaringBitmap64Agg;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeRoot;

import static org.apache.fluss.utils.Preconditions.checkArgument;

/** Factory for {@link FieldRoaringBitmap64Agg}. */
public class FieldRoaringBitmap64AggFactory implements FieldAggregatorFactory {

@Override
public FieldRoaringBitmap64Agg create(DataType fieldType, AggFunction aggFunction) {
checkArgument(
fieldType.getTypeRoot() == DataTypeRoot.BYTES,
"Data type for rbm64 column must be 'BytesType' but was '%s'.",
fieldType);
return new FieldRoaringBitmap64Agg(fieldType);
}

@Override
public String identifier() {
return AggFunctionType.RBM64.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.kv.rowmerger.aggregate.functions;

/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */

import org.apache.fluss.server.utils.RoaringBitmapUtils;
import org.apache.fluss.types.DataType;

import org.roaringbitmap.RoaringBitmap;

import java.io.IOException;

/** Roaring bitmap aggregator for serialized 32-bit bitmaps. */
public class FieldRoaringBitmap32Agg extends FieldAggregator {

private static final long serialVersionUID = 1L;
private final RoaringBitmap roaringBitmapAcc;
private final RoaringBitmap roaringBitmapInput;

public FieldRoaringBitmap32Agg(DataType dataType) {
super(dataType);
this.roaringBitmapAcc = new RoaringBitmap();
this.roaringBitmapInput = new RoaringBitmap();
}

@Override
public Object agg(Object accumulator, Object inputField) {
if (accumulator == null || inputField == null) {
return accumulator == null ? inputField : accumulator;
}

try {
RoaringBitmapUtils.deserializeRoaringBitmap32(roaringBitmapAcc, (byte[]) accumulator);
RoaringBitmapUtils.deserializeRoaringBitmap32(roaringBitmapInput, (byte[]) inputField);
roaringBitmapAcc.or(roaringBitmapInput);
return RoaringBitmapUtils.serializeRoaringBitmap32(roaringBitmapAcc);
} catch (IOException e) {
throw new RuntimeException("Unable to se/deserialize roaring bitmap.", e);
} finally {
roaringBitmapAcc.clear();
roaringBitmapInput.clear();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.kv.rowmerger.aggregate.functions;

/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */

import org.apache.fluss.server.utils.RoaringBitmapUtils;
import org.apache.fluss.types.DataType;

import org.roaringbitmap.longlong.Roaring64Bitmap;

import java.io.IOException;

/** Roaring bitmap aggregator for serialized 64-bit bitmaps. */
public class FieldRoaringBitmap64Agg extends FieldAggregator {

private static final long serialVersionUID = 1L;
private final Roaring64Bitmap roaringBitmapAcc;
private final Roaring64Bitmap roaringBitmapInput;

public FieldRoaringBitmap64Agg(DataType dataType) {
super(dataType);
this.roaringBitmapAcc = new Roaring64Bitmap();
this.roaringBitmapInput = new Roaring64Bitmap();
}

@Override
public Object agg(Object accumulator, Object inputField) {
if (accumulator == null || inputField == null) {
return accumulator == null ? inputField : accumulator;
}

try {
RoaringBitmapUtils.deserializeRoaringBitmap64(roaringBitmapAcc, (byte[]) accumulator);
RoaringBitmapUtils.deserializeRoaringBitmap64(roaringBitmapInput, (byte[]) inputField);
roaringBitmapAcc.or(roaringBitmapInput);
return RoaringBitmapUtils.serializeRoaringBitmap64(roaringBitmapAcc);
} catch (IOException e) {
throw new RuntimeException("Unable to se/deserialize roaring bitmap.", e);
} finally {
roaringBitmapAcc.clear();
roaringBitmapInput.clear();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.utils;

import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.longlong.Roaring64Bitmap;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

/** Utility methods for serializing roaring bitmaps. */
public final class RoaringBitmapUtils {

private RoaringBitmapUtils() {
// Utility class, no instantiation
}

/**
* Serializes a 32-bit RoaringBitmap to a byte array using ByteBuffer.
*
* <p>Uses ByteBuffer as recommended by the RoaringBitmap Javadoc: "This is the preferred method
* to serialize to a byte array (byte[])".
*/
public static byte[] serializeRoaringBitmap32(RoaringBitmap bitmap) throws IOException {
bitmap.runOptimize();
ByteBuffer buffer = ByteBuffer.allocate(bitmap.serializedSizeInBytes());
bitmap.serialize(buffer);
return buffer.array();
}

public static void deserializeRoaringBitmap32(RoaringBitmap bitmap, byte[] bytes)
throws IOException {
bitmap.deserialize(ByteBuffer.wrap(bytes));
}

/**
* Serializes a 64-bit Roaring64Bitmap to a byte array using DataOutputStream.
*
* <p>Note: Unlike RoaringBitmap (32-bit), Roaring64Bitmap does not provide a
* serialize(ByteBuffer) method. It only supports serialize(DataOutput), hence the different
* serialization strategy.
*/
public static byte[] serializeRoaringBitmap64(Roaring64Bitmap bitmap) throws IOException {
bitmap.runOptimize();
try (ByteArrayOutputStream output = new ByteArrayOutputStream();
DataOutputStream dataOutput = new DataOutputStream(output)) {
bitmap.serialize(dataOutput);
Comment thread
platinumhamburg marked this conversation as resolved.
return output.toByteArray();
}
}

public static void deserializeRoaringBitmap64(Roaring64Bitmap bitmap, byte[] bytes)
throws IOException {
bitmap.deserialize(ByteBuffer.wrap(bytes));
}
}
1 change: 1 addition & 0 deletions fluss-server/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This project bundles the following dependencies under the Apache Software Licens
- commons-cli:commons-cli:1.5.0
- org.apache.commons:commons-lang3:3.18.0
- org.apache.commons:commons-math3:3.6.1
- org.roaringbitmap:RoaringBitmap:1.3.0
- at.yawk.lz4:lz4-java:1.10.2
- org.xerial.snappy:snappy-java:1.1.10.4

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldBoolAndAggFactory
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldBoolOrAggFactory
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldListaggAggFactory
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldStringAggFactory
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldRoaringBitmap32AggFactory
org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldRoaringBitmap64AggFactory
Loading