Skip to content

Commit b03cec4

Browse files
[common] Add V1 log batch format with statistics collection (#2886)
1 parent 219054e commit b03cec4

40 files changed

+4865
-129
lines changed

fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@
2323
import org.apache.fluss.memory.MemorySegment;
2424
import org.apache.fluss.metadata.PhysicalTablePath;
2525
import org.apache.fluss.record.ChangeType;
26+
import org.apache.fluss.record.LogRecordBatchStatisticsCollector;
2627
import org.apache.fluss.record.MemoryLogRecordsArrowBuilder;
2728
import org.apache.fluss.record.bytesview.BytesView;
2829
import org.apache.fluss.row.InternalRow;
2930
import org.apache.fluss.row.arrow.ArrowWriter;
3031
import org.apache.fluss.rpc.messages.ProduceLogRequest;
3132

33+
import javax.annotation.Nullable;
3234
import javax.annotation.concurrent.NotThreadSafe;
3335

3436
import java.io.IOException;
@@ -55,11 +57,13 @@ public ArrowLogWriteBatch(
5557
int schemaId,
5658
ArrowWriter arrowWriter,
5759
AbstractPagedOutputView outputView,
58-
long createdMs) {
60+
long createdMs,
61+
@Nullable LogRecordBatchStatisticsCollector statisticsCollector) {
5962
super(bucketId, physicalTablePath, createdMs);
6063
this.outputView = outputView;
6164
this.recordsBuilder =
62-
MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter, outputView, true);
65+
MemoryLogRecordsArrowBuilder.builder(
66+
schemaId, arrowWriter, outputView, true, statisticsCollector);
6367
}
6468

6569
@Override

fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.fluss.metadata.TableBucket;
3333
import org.apache.fluss.metadata.TableInfo;
3434
import org.apache.fluss.metrics.MetricNames;
35+
import org.apache.fluss.record.LogRecordBatchStatisticsCollector;
3536
import org.apache.fluss.row.arrow.ArrowWriter;
3637
import org.apache.fluss.row.arrow.ArrowWriterPool;
3738
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
@@ -628,13 +629,20 @@ private WriteBatch createWriteBatch(
628629
outputView.getPreAllocatedSize(),
629630
tableInfo.getRowType(),
630631
tableInfo.getTableConfig().getArrowCompressionInfo());
632+
LogRecordBatchStatisticsCollector statisticsCollector = null;
633+
if (tableInfo.isStatisticsEnabled()) {
634+
statisticsCollector =
635+
new LogRecordBatchStatisticsCollector(
636+
tableInfo.getRowType(), tableInfo.getStatsIndexMapping());
637+
}
631638
return new ArrowLogWriteBatch(
632639
bucketId,
633640
physicalTablePath,
634641
tableInfo.getSchemaId(),
635642
arrowWriter,
636643
outputView,
637-
clock.milliseconds());
644+
clock.milliseconds(),
645+
statisticsCollector);
638646

639647
case COMPACTED_LOG:
640648
return new CompactedLogWriteBatch(

fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ void testAppendWithPreAllocatedMemorySegments() throws Exception {
136136
DATA1_ROW_TYPE,
137137
DEFAULT_COMPRESSION),
138138
new PreAllocatedPagedOutputView(memorySegmentList),
139-
System.currentTimeMillis());
139+
System.currentTimeMillis(),
140+
null);
140141
assertThat(arrowLogWriteBatch.pooledMemorySegments()).isEqualTo(memorySegmentList);
141142

142143
int count = 0;
@@ -210,7 +211,8 @@ void testArrowCompressionRatioEstimated() throws Exception {
210211
DATA1_TABLE_INFO.getSchemaId(),
211212
arrowWriter,
212213
new PreAllocatedPagedOutputView(memorySegmentList),
213-
System.currentTimeMillis());
214+
System.currentTimeMillis(),
215+
null);
214216

215217
int recordCount = 0;
216218
while (arrowLogWriteBatch.tryAppend(
@@ -310,7 +312,8 @@ private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int maxSizeI
310312
DATA1_ROW_TYPE,
311313
DEFAULT_COMPRESSION),
312314
new UnmanagedPagedOutputView(128),
313-
System.currentTimeMillis());
315+
System.currentTimeMillis(),
316+
null);
314317
}
315318

316319
private WriteCallback newWriteCallback() {

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1579,6 +1579,22 @@ public class ConfigOptions {
15791579
+ "This mode reduces storage and transmission costs but loses the ability to track previous values. "
15801580
+ "This option only affects primary key tables.");
15811581

1582+
/**
 * Table option selecting the columns for which column-level statistics are collected.
 *
 * <p>The option has no default value: when it is not set, no column statistics are collected.
 * The value {@code *} selects all supported columns; otherwise the value is a comma-separated
 * list of column names. See the option description below for the supported types and the
 * batch-format compatibility note.
 */
public static final ConfigOption<String> TABLE_STATISTICS_COLUMNS =
1583+
key("table.statistics.columns")
1584+
.stringType()
1585+
.noDefaultValue()
1586+
.withDescription(
1587+
"Configures column-level statistics collection for the table. "
1588+
+ "By default this option is not set and no column statistics are collected. "
1589+
+ "The value '*' means collect statistics for all supported columns. "
1590+
+ "A comma-separated list of column names means collect statistics only for the specified columns. "
1591+
+ "Supported types include: BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, "
1592+
+ "STRING, CHAR, DECIMAL, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ. "
1593+
+ "Example: 'id,name,timestamp' to collect statistics only for specified columns. "
1594+
+ "Note: enabling column statistics requires the V1 batch format. "
1595+
+ "Downstream consumers must be upgraded to Fluss v1.0+ before enabling this option, "
1596+
+ "as older versions cannot parse the extended batch format.");
1597+
15821598
// ------------------------------------------------------------------------
15831599
// ConfigOptions for Kv
15841600
// ------------------------------------------------------------------------
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.config;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
25+
26+
import static org.apache.fluss.utils.Preconditions.checkNotNull;
27+
28+
/**
29+
* Configuration for statistics columns collection with three states:
30+
*
31+
* <ul>
32+
* <li>{@link Mode#DISABLED}: Statistics collection is disabled (config not set).
33+
* <li>{@link Mode#ALL}: Collect statistics for all supported columns ("*" configuration).
34+
* <li>{@link Mode#SPECIFIED}: Collect statistics for specific columns only.
35+
* </ul>
36+
*
37+
* @since 1.0
38+
*/
39+
@PublicEvolving
40+
public class StatisticsColumnsConfig {
41+
42+
/** The mode of statistics columns collection. */
43+
public enum Mode {
44+
/** Statistics collection is disabled. */
45+
DISABLED,
46+
/** Collect statistics for all supported columns. */
47+
ALL,
48+
/** Collect statistics for specified columns only. */
49+
SPECIFIED
50+
}
51+
52+
private static final StatisticsColumnsConfig DISABLED_INSTANCE =
53+
new StatisticsColumnsConfig(Mode.DISABLED, Collections.emptyList());
54+
55+
private static final StatisticsColumnsConfig ALL_INSTANCE =
56+
new StatisticsColumnsConfig(Mode.ALL, Collections.emptyList());
57+
58+
private final Mode mode;
59+
private final List<String> columns;
60+
61+
private StatisticsColumnsConfig(Mode mode, List<String> columns) {
62+
this.mode = mode;
63+
this.columns = columns;
64+
}
65+
66+
/** Creates a disabled statistics columns configuration. */
67+
public static StatisticsColumnsConfig disabled() {
68+
return DISABLED_INSTANCE;
69+
}
70+
71+
/** Creates a configuration that collects statistics for all supported columns. */
72+
public static StatisticsColumnsConfig all() {
73+
return ALL_INSTANCE;
74+
}
75+
76+
/** Creates a configuration that collects statistics for the specified columns. */
77+
public static StatisticsColumnsConfig of(List<String> columns) {
78+
checkNotNull(columns, "columns must not be null");
79+
return new StatisticsColumnsConfig(Mode.SPECIFIED, Collections.unmodifiableList(columns));
80+
}
81+
82+
/** Returns the mode of statistics columns collection. */
83+
public Mode getMode() {
84+
return mode;
85+
}
86+
87+
/** Returns the specified columns. Only meaningful when mode is {@link Mode#SPECIFIED}. */
88+
public List<String> getColumns() {
89+
return columns;
90+
}
91+
92+
/** Returns whether statistics collection is enabled (mode is not DISABLED). */
93+
public boolean isEnabled() {
94+
return mode != Mode.DISABLED;
95+
}
96+
97+
@Override
98+
public boolean equals(Object o) {
99+
if (this == o) {
100+
return true;
101+
}
102+
if (o == null || getClass() != o.getClass()) {
103+
return false;
104+
}
105+
StatisticsColumnsConfig that = (StatisticsColumnsConfig) o;
106+
return mode == that.mode && Objects.equals(columns, that.columns);
107+
}
108+
109+
@Override
110+
public int hashCode() {
111+
return Objects.hash(mode, columns);
112+
}
113+
114+
@Override
115+
public String toString() {
116+
switch (mode) {
117+
case DISABLED:
118+
return "StatisticsColumnsConfig{DISABLED}";
119+
case ALL:
120+
return "StatisticsColumnsConfig{ALL}";
121+
case SPECIFIED:
122+
return "StatisticsColumnsConfig{SPECIFIED: " + columns + "}";
123+
default:
124+
return "StatisticsColumnsConfig{UNKNOWN}";
125+
}
126+
}
127+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.config;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.exception.InvalidConfigException;
22+
import org.apache.fluss.metadata.TableDescriptor;
23+
import org.apache.fluss.types.DataField;
24+
import org.apache.fluss.types.DataType;
25+
import org.apache.fluss.types.DataTypeChecks;
26+
import org.apache.fluss.types.RowType;
27+
28+
import java.util.Map;
29+
import java.util.stream.Collectors;
30+
31+
/**
32+
* Utility class for validating table statistics configuration.
33+
*
34+
* <p>This provides simple validation methods that can be called during CREATE TABLE operations to
35+
* ensure statistics configuration is valid and compatible with the table schema.
36+
*/
37+
@Internal
38+
public class StatisticsConfigUtils {
39+
40+
private StatisticsConfigUtils() {}
41+
42+
/**
43+
* Validates statistics configuration for a table descriptor.
44+
*
45+
* @param tableDescriptor the table descriptor to validate
46+
* @throws InvalidConfigException if the statistics configuration is invalid
47+
*/
48+
public static void validateStatisticsConfig(TableDescriptor tableDescriptor) {
49+
Map<String, String> properties = tableDescriptor.getProperties();
50+
String statisticsColumns = properties.get(ConfigOptions.TABLE_STATISTICS_COLUMNS.key());
51+
52+
// Not set means statistics disabled - no validation needed
53+
if (statisticsColumns == null) {
54+
return;
55+
}
56+
57+
RowType rowType = tableDescriptor.getSchema().getRowType();
58+
59+
// Wildcard means all supported columns - no validation needed
60+
if ("*".equals(statisticsColumns.trim())) {
61+
return;
62+
}
63+
64+
// Parse using TableConfig's logic via StatisticsColumnsConfig
65+
Configuration config = new Configuration();
66+
config.setString(ConfigOptions.TABLE_STATISTICS_COLUMNS.key(), statisticsColumns);
67+
StatisticsColumnsConfig columnsConfig = new TableConfig(config).getStatisticsColumns();
68+
69+
if (columnsConfig.getMode() == StatisticsColumnsConfig.Mode.SPECIFIED
70+
&& columnsConfig.getColumns().isEmpty()) {
71+
throw new InvalidConfigException(
72+
"Statistics columns configuration cannot be empty. "
73+
+ "Use '*' to collect statistics for all supported columns, "
74+
+ "or remove the property to disable statistics collection.");
75+
}
76+
77+
if (columnsConfig.getMode() == StatisticsColumnsConfig.Mode.SPECIFIED) {
78+
validateColumns(rowType, columnsConfig);
79+
}
80+
}
81+
82+
/**
83+
* Validates that the specified columns exist in the schema and are of supported types.
84+
*
85+
* @param rowType the table schema
86+
* @param columnsConfig the statistics columns configuration
87+
* @throws InvalidConfigException if validation fails
88+
*/
89+
private static void validateColumns(RowType rowType, StatisticsColumnsConfig columnsConfig) {
90+
Map<String, DataType> columnTypeMap = buildColumnTypeMap(rowType);
91+
92+
for (String columnName : columnsConfig.getColumns()) {
93+
// Check if column exists
94+
if (!columnTypeMap.containsKey(columnName)) {
95+
throw new InvalidConfigException(
96+
String.format(
97+
"Column '%s' specified in statistics collection does not exist in table schema",
98+
columnName));
99+
}
100+
101+
// Check if column type is supported (whitelist approach)
102+
DataType dataType = columnTypeMap.get(columnName);
103+
if (!DataTypeChecks.isSupportedStatisticsType(dataType)) {
104+
throw new InvalidConfigException(
105+
String.format(
106+
"Column '%s' of type '%s' is not supported for statistics collection. "
107+
+ "Supported types are: BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, "
108+
+ "FLOAT, DOUBLE, STRING, CHAR, DECIMAL, DATE, TIME, TIMESTAMP, "
109+
+ "and TIMESTAMP_LTZ.",
110+
columnName, dataType.asSummaryString()));
111+
}
112+
}
113+
}
114+
115+
/**
116+
* Builds a map from column name to data type for quick lookup.
117+
*
118+
* @param rowType the table schema
119+
* @return map of column name to data type
120+
*/
121+
private static Map<String, DataType> buildColumnTypeMap(RowType rowType) {
122+
return rowType.getFields().stream()
123+
.collect(Collectors.toMap(DataField::getName, DataField::getType));
124+
}
125+
}

0 commit comments

Comments
 (0)