Skip to content

Commit bcebe7f

Browse files
[kv] Implement basic Aggregate Merge Engine (#2255)
Co-authored-by: Jark Wu <[email protected]>
1 parent a316c3d commit bcebe7f

File tree

63 files changed

+7428
-56
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+7428
-56
lines changed

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.fluss.exception.TooManyPartitionsException;
5353
import org.apache.fluss.fs.FsPath;
5454
import org.apache.fluss.fs.FsPathAndFileName;
55+
import org.apache.fluss.metadata.AggFunctions;
5556
import org.apache.fluss.metadata.DatabaseDescriptor;
5657
import org.apache.fluss.metadata.DatabaseInfo;
5758
import org.apache.fluss.metadata.DeleteBehavior;
@@ -512,6 +513,47 @@ void testCreateTableWithDeleteBehavior() {
512513
TableInfo tableInfo2 = admin.getTableInfo(tablePath2).join();
513514
assertThat(tableInfo2.getTableConfig().getDeleteBehavior()).hasValue(DeleteBehavior.IGNORE);
514515

516+
// Test 2.5: AGGREGATION merge engine - should set delete behavior to IGNORE
517+
TablePath tablePathAggregate = TablePath.of("fluss", "test_ignore_delete_for_aggregate");
518+
Schema aggregateSchema =
519+
Schema.newBuilder()
520+
.column("id", DataTypes.INT())
521+
.column("count", DataTypes.BIGINT(), AggFunctions.SUM())
522+
.primaryKey("id")
523+
.build();
524+
Map<String, String> propertiesAggregate = new HashMap<>();
525+
propertiesAggregate.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
526+
TableDescriptor tableDescriptorAggregate =
527+
TableDescriptor.builder()
528+
.schema(aggregateSchema)
529+
.comment("aggregate merge engine table")
530+
.properties(propertiesAggregate)
531+
.build();
532+
admin.createTable(tablePathAggregate, tableDescriptorAggregate, false).join();
533+
// Get the table and verify delete behavior is changed to IGNORE
534+
TableInfo tableInfoAggregate = admin.getTableInfo(tablePathAggregate).join();
535+
assertThat(tableInfoAggregate.getTableConfig().getDeleteBehavior())
536+
.hasValue(DeleteBehavior.IGNORE);
537+
538+
// Test 2.6: AGGREGATION merge engine with delete behavior explicitly set to ALLOW - should
539+
// be allowed
540+
TablePath tablePathAggregateAllow =
541+
TablePath.of("fluss", "test_allow_delete_for_aggregate");
542+
Map<String, String> propertiesAggregateAllow = new HashMap<>();
543+
propertiesAggregateAllow.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
544+
propertiesAggregateAllow.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "ALLOW");
545+
TableDescriptor tableDescriptorAggregateAllow =
546+
TableDescriptor.builder()
547+
.schema(aggregateSchema)
548+
.comment("aggregate merge engine table with allow delete")
549+
.properties(propertiesAggregateAllow)
550+
.build();
551+
admin.createTable(tablePathAggregateAllow, tableDescriptorAggregateAllow, false).join();
552+
// Get the table and verify delete behavior is set to ALLOW
553+
TableInfo tableInfoAggregateAllow = admin.getTableInfo(tablePathAggregateAllow).join();
554+
assertThat(tableInfoAggregateAllow.getTableConfig().getDeleteBehavior())
555+
.hasValue(DeleteBehavior.ALLOW);
556+
515557
// Test 3: FIRST_ROW merge engine with delete behavior explicitly set to ALLOW
516558
TablePath tablePath3 = TablePath.of("fluss", "test_allow_delete_for_first_row");
517559
Map<String, String> properties3 = new HashMap<>();

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1420,9 +1420,10 @@ public class ConfigOptions {
14201420
.noDefaultValue()
14211421
.withDescription(
14221422
"Defines the merge engine for the primary key table. By default, primary key table doesn't have merge engine. "
1423-
+ "The supported merge engines are `first_row` and `versioned`. "
1423+
+ "The supported merge engines are `first_row`, `versioned`, and `aggregation`. "
14241424
+ "The `first_row` merge engine will keep the first row of the same primary key. "
1425-
+ "The `versioned` merge engine will keep the row with the largest version of the same primary key.");
1425+
+ "The `versioned` merge engine will keep the row with the largest version of the same primary key. "
1426+
+ "The `aggregation` merge engine will aggregate rows with the same primary key using field-level aggregate functions.");
14261427

14271428
public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
14281429
// we may need to introduce "del-column" in the future to support delete operation
@@ -1440,10 +1441,11 @@ public class ConfigOptions {
14401441
.withDescription(
14411442
"Defines the delete behavior for the primary key table. "
14421443
+ "The supported delete behaviors are `allow`, `ignore`, and `disable`. "
1443-
+ "The `allow` behavior allows normal delete operations (default). "
1444+
+ "The `allow` behavior allows normal delete operations (default for default merge engine). "
14441445
+ "The `ignore` behavior silently skips delete requests without error. "
14451446
+ "The `disable` behavior rejects delete requests with a clear error message. "
1446-
+ "For tables with FIRST_ROW or VERSIONED merge engines, this option defaults to `ignore`.");
1447+
+ "For tables with FIRST_ROW, VERSIONED, or AGGREGATION merge engines, this option defaults to `ignore`. "
1448+
+ "Note: For AGGREGATION merge engine, when set to `allow`, delete operations will remove the entire record.");
14471449

14481450
public static final ConfigOption<String> TABLE_AUTO_INCREMENT_FIELDS =
14491451
key("table.auto-increment.fields")
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.metadata;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import javax.annotation.Nullable;
23+
24+
import java.io.Serializable;
25+
import java.util.Collections;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
import java.util.Objects;
29+
30+
/**
31+
* Aggregation function with optional parameters for aggregate merge engine.
32+
*
33+
* <p>This class represents a parameterized aggregation function that can be applied to non-primary
34+
* key columns in aggregation merge engine tables. It encapsulates both the function type and
35+
* function-specific parameters (e.g., delimiter for LISTAGG).
36+
*
37+
* <p>Use {@link AggFunctions} utility class to create instances:
38+
*
39+
* <pre>{@code
40+
* AggFunction sumFunc = AggFunctions.SUM();
41+
* AggFunction listaggFunc = AggFunctions.LISTAGG(";");
42+
* }</pre>
43+
*
44+
* @since 0.9
45+
*/
46+
@PublicEvolving
47+
public final class AggFunction implements Serializable {
48+
49+
private static final long serialVersionUID = 1L;
50+
51+
private final AggFunctionType type;
52+
private final Map<String, String> parameters;
53+
54+
/**
55+
* Creates an aggregation function with the specified type and parameters.
56+
*
57+
* @param type the aggregation function type
58+
* @param parameters the function parameters (nullable)
59+
*/
60+
AggFunction(AggFunctionType type, @Nullable Map<String, String> parameters) {
61+
this.type = Objects.requireNonNull(type, "Aggregation function type must not be null");
62+
this.parameters =
63+
parameters == null || parameters.isEmpty()
64+
? Collections.emptyMap()
65+
: Collections.unmodifiableMap(new HashMap<>(parameters));
66+
}
67+
68+
/**
69+
* Returns the aggregation function type.
70+
*
71+
* @return the function type
72+
*/
73+
public AggFunctionType getType() {
74+
return type;
75+
}
76+
77+
/**
78+
* Returns the function parameters.
79+
*
80+
* @return an immutable map of parameters
81+
*/
82+
public Map<String, String> getParameters() {
83+
return parameters;
84+
}
85+
86+
/**
87+
* Gets a specific parameter value.
88+
*
89+
* @param key the parameter key
90+
* @return the parameter value, or null if not found
91+
*/
92+
@Nullable
93+
public String getParameter(String key) {
94+
return parameters.get(key);
95+
}
96+
97+
/**
98+
* Checks if this function has any parameters.
99+
*
100+
* @return true if parameters are present, false otherwise
101+
*/
102+
public boolean hasParameters() {
103+
return !parameters.isEmpty();
104+
}
105+
106+
/**
107+
* Validates all parameters of this aggregation function.
108+
*
109+
* <p>This method checks that:
110+
*
111+
* <ul>
112+
* <li>All parameter names are supported by the function type
113+
* <li>All parameter values are valid
114+
* </ul>
115+
*
116+
* @throws IllegalArgumentException if any parameter is invalid
117+
*/
118+
public void validate() {
119+
for (Map.Entry<String, String> entry : parameters.entrySet()) {
120+
type.validateParameter(entry.getKey(), entry.getValue());
121+
}
122+
}
123+
124+
@Override
125+
public boolean equals(Object o) {
126+
if (this == o) {
127+
return true;
128+
}
129+
if (o == null || getClass() != o.getClass()) {
130+
return false;
131+
}
132+
AggFunction that = (AggFunction) o;
133+
return type == that.type && parameters.equals(that.parameters);
134+
}
135+
136+
@Override
137+
public int hashCode() {
138+
return Objects.hash(type, parameters);
139+
}
140+
141+
@Override
142+
public String toString() {
143+
if (parameters.isEmpty()) {
144+
return type.toString();
145+
}
146+
return type.toString() + parameters;
147+
}
148+
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.metadata;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import java.util.Collections;
23+
import java.util.Locale;
24+
import java.util.Set;
25+
26+
/**
27+
* Aggregation function type for aggregate merge engine.
28+
*
29+
* <p>This enum represents all supported aggregation function types that can be applied to
30+
* non-primary key columns in aggregation merge engine tables.
31+
*/
32+
@PublicEvolving
33+
public enum AggFunctionType {
    // Numeric aggregation
    SUM,
    PRODUCT,
    MAX,
    MIN,

    // Value selection
    LAST_VALUE,
    LAST_VALUE_IGNORE_NULLS,
    FIRST_VALUE,
    FIRST_VALUE_IGNORE_NULLS,

    // String aggregation
    LISTAGG,
    STRING_AGG, // Alias for LISTAGG - maps to same factory

    // Boolean aggregation
    BOOL_AND,
    BOOL_OR;

    /** Parameter name for delimiter used in LISTAGG and STRING_AGG functions. */
    public static final String PARAM_DELIMITER = "delimiter";

    /**
     * Lists the parameter names this aggregation function accepts.
     *
     * @return an immutable set holding {@link #PARAM_DELIMITER} for LISTAGG and STRING_AGG, and
     *     an empty immutable set for every other function
     */
    public Set<String> getSupportedParameters() {
        if (acceptsDelimiter()) {
            return Collections.singleton(PARAM_DELIMITER);
        }
        return Collections.emptySet();
    }

    /**
     * Checks one parameter name/value pair against this aggregation function.
     *
     * <p>The name must be one of {@link #getSupportedParameters()}. In addition, the delimiter of
     * LISTAGG and STRING_AGG must be a non-empty string.
     *
     * @param parameterName the parameter name
     * @param parameterValue the parameter value
     * @throws IllegalArgumentException if the name is not supported or the value is invalid
     */
    public void validateParameter(String parameterName, String parameterValue) {
        Set<String> supported = getSupportedParameters();
        if (!supported.contains(parameterName)) {
            Object supportedText = supported.isEmpty() ? "none" : supported;
            throw new IllegalArgumentException(
                    String.format(
                            "Parameter '%s' is not supported for aggregation function '%s'. "
                                    + "Supported parameters: %s",
                            parameterName, this, supportedText));
        }

        // Only the delimiter of the string aggregations carries a value constraint.
        boolean isDelimiter = acceptsDelimiter() && PARAM_DELIMITER.equals(parameterName);
        boolean isBlankValue = parameterValue == null || parameterValue.isEmpty();
        if (isDelimiter && isBlankValue) {
            throw new IllegalArgumentException(
                    String.format(
                            "Parameter '%s' for aggregation function '%s' must be a non-empty string",
                            parameterName, this));
        }
    }

    /**
     * Resolves a textual function name to its enum constant.
     *
     * <p>Surrounding whitespace is ignored, hyphens are treated as underscores, and matching is
     * case insensitive, so {@code "last_value_ignore_nulls"}, {@code "last-value-ignore-nulls"}
     * and {@code "LAST_VALUE_IGNORE_NULLS"} all resolve to the same constant.
     *
     * <p>Note: {@code "string_agg"} resolves to {@link #STRING_AGG}, not to {@link #LISTAGG}.
     *
     * @param name the aggregation function type name
     * @return the matching constant, or null if {@code name} is null, blank or unknown
     */
    public static AggFunctionType fromString(String name) {
        if (name == null) {
            return null;
        }
        String stripped = name.trim();
        if (stripped.isEmpty()) {
            return null;
        }

        String wanted = stripped.replace('-', '_').toUpperCase(Locale.ROOT);
        for (AggFunctionType candidate : values()) {
            if (candidate.name().equals(wanted)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Returns the string identifier of this type.
     *
     * <p>The identifier is the constant name in lower case, e.g., "sum" or "last_value".
     *
     * @return the identifier string
     */
    @Override
    public String toString() {
        return name().toLowerCase(Locale.ROOT);
    }

    /** Tells whether this function is one of the string aggregations that take a delimiter. */
    private boolean acceptsDelimiter() {
        return this == LISTAGG || this == STRING_AGG;
    }
}

0 commit comments

Comments
 (0)