Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1556,4 +1556,102 @@ public void testAddAndRemoveServerTags() throws Exception {
"Server tag PERMANENT_OFFLINE not exists for server 2, the current "
+ "server tag of this server is TEMPORARY_OFFLINE.");
}

@Test
public void testCreateTableWithInvalidAggFunctionDataType() throws Exception {
TablePath tablePath =
TablePath.of(
DEFAULT_TABLE_PATH.getDatabaseName(),
"test_invalid_data_type_for_aggfunction");
Map<String, String> propertiesAggregate = new HashMap<>();
propertiesAggregate.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");

Schema schema1 =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("and_value", DataTypes.STRING(), AggFunctions.BOOL_AND())
.primaryKey("id")
.build();
TableDescriptor t1 =
TableDescriptor.builder()
.schema(schema1)
.comment("aggregate merge engine table")
.properties(propertiesAggregate)
.build();
assertThatThrownBy(() -> admin.createTable(tablePath, t1, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessageContaining(
"Data type for bool_and column must be 'BooleanType' but was 'STRING'");

Schema schema2 =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("count", DataTypes.STRING(), AggFunctions.SUM())
.primaryKey("id")
.build();
TableDescriptor t2 =
TableDescriptor.builder()
.schema(schema2)
.comment("aggregate merge engine table")
.properties(propertiesAggregate)
.build();
assertThatThrownBy(() -> admin.createTable(tablePath, t2, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessageContaining("Data type for sum column must be part of [NUMERIC]");

Schema schema3 =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("max_value", DataTypes.BOOLEAN(), AggFunctions.MAX())
.primaryKey("id")
.build();
TableDescriptor t3 =
TableDescriptor.builder()
.schema(schema3)
.comment("aggregate merge engine table")
.properties(propertiesAggregate)
.build();
assertThatThrownBy(() -> admin.createTable(tablePath, t3, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessageContaining(
"Data type for max column must be part of [CHARACTER_STRING, NUMERIC, DATETIME]");

Schema schema4 =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("list_agg_value", DataTypes.BOOLEAN(), AggFunctions.LISTAGG())
.primaryKey("id")
.build();
TableDescriptor t4 =
TableDescriptor.builder()
.schema(schema4)
.comment("aggregate merge engine table")
.properties(propertiesAggregate)
.build();
assertThatThrownBy(() -> admin.createTable(tablePath, t4, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessageContaining(
"Data type for listagg column must be part of [CHARACTER_STRING]");

Schema schema5 =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column(
"first",
DataTypes.ARRAY(DataTypes.BIGINT()),
AggFunctions.FIRST_VALUE())
.primaryKey("id")
.build();
TableDescriptor t5 =
TableDescriptor.builder()
.schema(schema5)
.comment("aggregate merge engine table")
.properties(propertiesAggregate)
.build();
admin.createTable(tablePath, t5, false).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.fluss.metadata;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.types.DataType;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -104,18 +106,31 @@ public boolean hasParameters() {
}

/**
* Validates all parameters of this aggregation function.
* Validates all parameters and data type of this aggregation function.
*
* <p>This method checks that:
*
* <ul>
* <li>All parameter names are supported by the function type
* <li>All parameter values are valid
* <li>The field data type is valid
* </ul>
*
* @throws IllegalArgumentException if any parameter is invalid
* @param fieldType the field data type
* @throws IllegalArgumentException if any parameter is invalid or data type is invalid
*/
public void validate() {
public void validate(DataType fieldType) {
validateParameters();
validateDataType(fieldType);
}

@VisibleForTesting
void validateDataType(DataType fieldType) {
type.validateDataType(fieldType);
}

@VisibleForTesting
void validateParameters() {
for (Map.Entry<String, String> entry : parameters.entrySet()) {
type.validateParameter(entry.getKey(), entry.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@
package org.apache.fluss.metadata;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.types.BooleanType;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeFamily;

import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Set;

import static org.apache.fluss.utils.Preconditions.checkArgument;

/**
* Aggregation function type for aggregate merge engine.
*
Expand Down Expand Up @@ -111,6 +117,60 @@ public void validateParameter(String parameterName, String parameterValue) {
}
}

/**
* Validates a data type for this aggregation function.
*
* @param fieldType the field data type
* @throws IllegalArgumentException if the data type is invalid
*/
public void validateDataType(DataType fieldType) {
switch (this) {
// The bool_and and bool_or don't have specific DataFamily, validate them by
// dataType directly.
case BOOL_AND:
case BOOL_OR:
checkArgument(
fieldType instanceof BooleanType,
"Data type for %s column must be 'BooleanType' but was '%s'.",
toString(),
fieldType);
break;
default:
DataTypeFamily[] dataTypeFamilies = getSupportedDataFamilies();
checkArgument(
fieldType.isAnyOf(dataTypeFamilies),
"Data type for %s column must be part of %s but was '%s'.",
toString(),
Arrays.deepToString(dataTypeFamilies),
fieldType);
break;
}
}

private DataTypeFamily[] getSupportedDataFamilies() {
switch (this) {
case SUM:
case PRODUCT:
return new DataTypeFamily[] {DataTypeFamily.NUMERIC};
case MAX:
case MIN:
return new DataTypeFamily[] {
DataTypeFamily.CHARACTER_STRING, DataTypeFamily.NUMERIC, DataTypeFamily.DATETIME
};
case LAST_VALUE:
case LAST_VALUE_IGNORE_NULLS:
case FIRST_VALUE:
case FIRST_VALUE_IGNORE_NULLS:
return DataTypeFamily.values();
case LISTAGG:
case STRING_AGG:
return new DataTypeFamily[] {DataTypeFamily.CHARACTER_STRING};
default:
throw new IllegalArgumentException(
String.format("%s doesn't support any data type", this));
}
}

/**
* Converts a string to an AggFunctionType enum value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ void testPartitionedTable() {
@Test
void testInvalidListaggParameterEmptyDelimiter() {
// LISTAGG with empty delimiter - should fail
assertThatThrownBy(() -> AggFunctions.LISTAGG("").validate())
assertThatThrownBy(() -> AggFunctions.LISTAGG("").validateParameters())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("must be a non-empty string");
}
Expand All @@ -332,7 +332,8 @@ void testInvalidListaggParameterUnknownParameter() {
Map<String, String> params = new HashMap<>();
params.put("unknown_param", "value");

assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.LISTAGG, params).validate())
assertThatThrownBy(
() -> AggFunctions.of(AggFunctionType.LISTAGG, params).validateParameters())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("unknown_param")
.hasMessageContaining("not supported");
Expand All @@ -344,7 +345,7 @@ void testInvalidSumFunctionWithParameters() {
Map<String, String> params = new HashMap<>();
params.put("some_param", "value");

assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.SUM, params).validate())
assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.SUM, params).validateParameters())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("some_param")
.hasMessageContaining("not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,12 @@ private static void checkMergeEngine(
/**
* Validates aggregation function parameters in the schema.
*
* <p>This method delegates to {@link AggFunction#validate()} to ensure all parameters are valid
* according to the function's requirements.
* <p>This method delegates to {@link AggFunction#validate(DataType)} to ensure all parameters
* and data type are valid according to the function's requirements.
*
* @param schema the schema to validate
* @throws InvalidConfigException if any aggregation function has invalid parameters
* @throws InvalidConfigException if any aggregation function has invalid parameters or data
* types
*/
private static void validateAggregationFunctionParameters(Schema schema) {
// Get primary key columns for early exit
Expand All @@ -317,9 +318,9 @@ private static void validateAggregationFunctionParameters(Schema schema) {
continue;
}

// Validate aggregation function parameters
// Validate aggregation function parameters and data type
try {
aggFunctionOpt.get().validate();
aggFunctionOpt.get().validate(column.getDataType());
} catch (IllegalArgumentException e) {
throw new InvalidConfigException(
String.format(
Expand Down
14 changes: 7 additions & 7 deletions website/docs/table-design/merge-engines/aggregation.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ TableDescriptor.builder()

Computes the product of values across multiple rows.

- **Supported Data Types**: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL
- **Supported Data Types**: `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL`
- **Behavior**: Multiplies incoming values with the accumulator
- **Null Handling**: Null values are ignored

Expand Down Expand Up @@ -314,7 +314,7 @@ TableDescriptor.builder()

Identifies and retains the maximum value.

- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
- **Supported Data Types**: `CHAR`, `STRING`, `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL`, `DATE`, `TIME`, `TIMESTAMP`, `TIMESTAMP_LTZ`
- **Behavior**: Keeps the larger value between accumulator and incoming value
- **Null Handling**: Null values are ignored

Expand Down Expand Up @@ -372,7 +372,7 @@ TableDescriptor.builder()

Identifies and retains the minimum value.

- **Supported Data Types**: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
- **Supported Data Types**: `CHAR`, `STRING`, `TINYINT`, `SMALLINT`, `INT`, `BIGINT`, `FLOAT`, `DOUBLE`, `DECIMAL`, `DATE`, `TIME`, `TIMESTAMP`, `TIMESTAMP_LTZ`
- **Behavior**: Keeps the smaller value between accumulator and incoming value
- **Null Handling**: Null values are ignored

Expand Down Expand Up @@ -695,7 +695,7 @@ TableDescriptor.builder()

Concatenates multiple string values into a single string with a delimiter.

- **Supported Data Types**: STRING, CHAR
- **Supported Data Types**: `STRING`, `CHAR`
- **Behavior**: Concatenates values using the specified delimiter
- **Null Handling**: Null values are skipped
- **Delimiter**: Specify delimiter directly in the aggregation function (default is comma `,`)
Expand Down Expand Up @@ -757,7 +757,7 @@ TableDescriptor.builder()

Alias for `listagg`. Concatenates multiple string values into a single string with a delimiter.

- **Supported Data Types**: STRING, CHAR
- **Supported Data Types**: `STRING`, `CHAR`
- **Behavior**: Same as `listagg` - concatenates values using the specified delimiter
- **Null Handling**: Null values are skipped
- **Delimiter**: Specify delimiter directly in the aggregation function (default is comma `,`)
Expand Down Expand Up @@ -824,7 +824,7 @@ TableDescriptor.builder()

Evaluates whether all boolean values in a set are true (logical AND).

- **Supported Data Types**: BOOLEAN
- **Supported Data Types**: `BOOLEAN`
- **Behavior**: Returns true only if all values are true
- **Null Handling**: Null values are ignored

Expand Down Expand Up @@ -881,7 +881,7 @@ TableDescriptor.builder()

Checks if at least one boolean value in a set is true (logical OR).

- **Supported Data Types**: BOOLEAN
- **Supported Data Types**: `BOOLEAN`
- **Behavior**: Returns true if any value is true
- **Null Handling**: Null values are ignored

Expand Down