Commit b5b5f6f

[Kernel][Clustering #4] add withClusteringColumn api (delta-io#4327)
#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description

Split out of the main PR delta-io#4265 for faster review.

This PR implements the `withClusteringColumns` API in Kernel to support creating a table as a clustered table. It takes the steps below:

1. `withClusteringColumns` takes logical columns as input.
2. Validate the columns (each must exist in the schema, and clustering columns cannot be present together with partition columns).
3. Update the protocol to include the `clustering` writer feature.
4. Convert the logical column names to physical column names and use them to create a metadata domain.
5. Add the domain metadata to `domainMetadatasAdded`.
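A minimal sketch of the new API in use (illustrative only: the table path, engine-info string, and schema are placeholders, and `engine` is assumed to be an already-constructed `Engine`):

```java
import io.delta.kernel.*;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.types.*;
import java.util.Arrays;

public class CreateClusteredTableExample {
  // Creates a new table clustered by a top-level and a nested column.
  public static Transaction createClusteredTable(Engine engine, String tablePath) {
    StructType schema =
        new StructType()
            .add("id", IntegerType.INTEGER)
            .add("user", new StructType().add("country", StringType.STRING));

    return Table.forPath(engine, tablePath)
        .createTransactionBuilder(engine, "exampleEngine", Operation.CREATE_TABLE)
        .withSchema(engine, schema)
        // Both top-level and nested columns may be clustered; clustering and
        // partition columns cannot be set on the same table.
        .withClusteringColumns(
            engine,
            Arrays.asList(new Column("id"), new Column(new String[] {"user", "country"})))
        .build(engine); // adds the `clustering` writer feature to the protocol
  }
}
```

Note that `withClusteringColumns` takes `Column` (not `String`) precisely so nested columns can be addressed by path.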
1 parent 591624a commit b5b5f6f

File tree: 9 files changed, +492 -22 lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/TransactionBuilder.java

Lines changed: 14 additions & 1 deletion
@@ -21,6 +21,7 @@
 import io.delta.kernel.exceptions.DomainDoesNotExistException;
 import io.delta.kernel.exceptions.InvalidConfigurationValueException;
 import io.delta.kernel.exceptions.UnknownConfigurationException;
+import io.delta.kernel.expressions.Column;
 import io.delta.kernel.internal.TableConfig;
 import io.delta.kernel.types.StructType;
 import java.util.List;
@@ -65,11 +66,23 @@ public interface TransactionBuilder {
    *
    * @param engine {@link Engine} instance to use.
    * @param partitionColumns The partition columns of the table. These should be a subset of the
-   *     columns in the schema.
+   *     columns in the schema. Only top-level columns are allowed to be partitioned. Note:
+   *     Clustering columns and partition columns cannot coexist in a table.
    * @return updated {@link TransactionBuilder} instance.
    */
   TransactionBuilder withPartitionColumns(Engine engine, List<String> partitionColumns);

+  /**
+   * Set the list of clustering columns when create a new clustered table.
+   *
+   * @param engine {@link Engine} instance to use.
+   * @param clusteringColumns The clustering columns of the table. These should be a subset of the
+   *     columns in the schema. Both top-level and nested columns are allowed to be clustered. Note:
+   *     Clustering columns and partition columns cannot coexist in a table.
+   * @return updated {@link TransactionBuilder} instance.
+   */
+  TransactionBuilder withClusteringColumns(Engine engine, List<Column> clusteringColumns);
+
   /**
    * Set the transaction identifier for idempotent writes. Incremental processing systems (e.g.,
    * streaming systems) that track progress using their own application-specific versions need to

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

Lines changed: 50 additions & 4 deletions
@@ -24,14 +24,17 @@
 import static io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames;
 import static io.delta.kernel.internal.util.VectorUtils.buildArrayValue;
 import static io.delta.kernel.internal.util.VectorUtils.stringStringMapValue;
+import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;

 import io.delta.kernel.*;
 import io.delta.kernel.engine.Engine;
 import io.delta.kernel.exceptions.KernelException;
 import io.delta.kernel.exceptions.TableNotFoundException;
+import io.delta.kernel.expressions.Column;
 import io.delta.kernel.internal.actions.*;
+import io.delta.kernel.internal.clustering.ClusteringUtils;
 import io.delta.kernel.internal.fs.Path;
 import io.delta.kernel.internal.icebergcompat.IcebergCompatV2MetadataValidatorAndUpdater;
 import io.delta.kernel.internal.icebergcompat.IcebergWriterCompatV1MetadataValidatorAndUpdater;
@@ -61,6 +64,7 @@ public class TransactionBuilderImpl implements TransactionBuilder {
   private final Operation operation;
   private Optional<StructType> schema = Optional.empty();
   private Optional<List<String>> partitionColumns = Optional.empty();
+  private Optional<List<Column>> clusteringColumns = Optional.empty();
   private Optional<SetTransaction> setTxnOpt = Optional.empty();
   private Optional<Map<String, String>> tableProperties = Optional.empty();
   private boolean needDomainMetadataSupport = false;
@@ -92,6 +96,14 @@ public TransactionBuilder withPartitionColumns(Engine engine, List<String> parti
     return this;
   }

+  @Override
+  public TransactionBuilder withClusteringColumns(Engine engine, List<Column> clusteringColumns) {
+    if (!clusteringColumns.isEmpty()) {
+      this.clusteringColumns = Optional.of(clusteringColumns);
+    }
+    return this;
+  }
+
   @Override
   public TransactionBuilder withTransactionId(
       Engine engine, String applicationId, long transactionVersion) {
@@ -174,6 +186,9 @@ public Transaction build(Engine engine) {
     if (needDomainMetadataSupport) {
       manuallyEnabledFeatures.add(TableFeatures.DOMAIN_METADATA_W_FEATURE);
     }
+    if (clusteringColumns.isPresent()) {
+      manuallyEnabledFeatures.add(TableFeatures.CLUSTERING_W_FEATURE);
+    }

     Tuple2<Set<TableFeature>, Optional<Metadata>> newFeaturesAndMetadata =
         TableFeatures.extractFeaturePropertyOverrides(newMetadata.orElse(snapshotMetadata));
@@ -234,8 +249,15 @@ public Transaction build(Engine engine) {

     /* ----- 5: Validate the metadata change ----- */
     // Now that all the config and schema changes have been made validate the old vs new metadata
-    newMetadata.ifPresent(
-        metadata -> validateMetadataChange(snapshotMetadata, metadata, isNewTable));
+    if (newMetadata.isPresent()) {
+      validateMetadataChange(snapshot, snapshotMetadata, newMetadata.get(), isNewTable);
+    }
+
+    /* ----- 6: Additional validation and adjustment ----- */
+    List<Column> casePreservingClusteringColumns =
+        SchemaUtils.casePreservingEligibleClusterColumns(
+            newMetadata.orElse(snapshotMetadata).getSchema(),
+            clusteringColumns.orElse(Collections.emptyList()));

     return new TransactionImpl(
         isNewTable,
@@ -247,6 +269,7 @@ public Transaction build(Engine engine) {
         newProtocol.orElse(snapshotProtocol),
         newMetadata.orElse(snapshotMetadata),
         setTxnOpt,
+        casePreservingClusteringColumns,
         newMetadata.isPresent() /* shouldUpdateMetadata */,
         newProtocol.isPresent() /* shouldUpdateProtocol */,
         maxRetries,
@@ -259,7 +282,8 @@
    * <ul>
    *   <li>Ensures that the table, as defined by the protocol and metadata of its latest version, is
    *       writable by Kernel
-   *   <li>Partition columns are not specified for an existing table
+   *   <li>Partition columns and clustering columns are not specified for an existing table
+   *   <li>Partition columns and clustering columns cannot be set together
    *   <li>The provided schema is valid (e.g. no duplicate columns, valid names)
    *   <li>Partition columns provided are valid (e.g. they exist, valid data types)
    *   <li>Concurrent txn has not already committed to the table with same txnId
@@ -278,7 +302,19 @@ private void validateTransactionInputs(Engine engine, SnapshotImpl snapshot, boo
             "Table already exists, but provided new partition columns. "
                 + "Partition columns can only be set on a new table.");
       }
+      if (clusteringColumns.isPresent()) {
+        throw tableAlreadyExists(
+            tablePath,
+            format(
+                "Table already exists, but provided new clustering columns %s. "
+                    + "Clustering columns can only be set on a new table for now.",
+                clusteringColumns.get()));
+      }
     } else {
+      checkArgument(
+          !(partitionColumns.isPresent() && clusteringColumns.isPresent()),
+          "Partition Columns and Clustering Columns cannot be set at the same time");
+
       // New table verify the given schema and partition columns
       ColumnMappingMode mappingMode =
           ColumnMapping.getColumnMappingMode(tableProperties.orElse(Collections.emptyMap()));
@@ -310,7 +346,7 @@
    * </ul>
    */
   private void validateMetadataChange(
-      Metadata oldMetadata, Metadata newMetadata, boolean isNewTable) {
+      SnapshotImpl snapshot, Metadata oldMetadata, Metadata newMetadata, boolean isNewTable) {
     ColumnMapping.verifyColumnMappingChange(
         oldMetadata.getConfiguration(), newMetadata.getConfiguration(), isNewTable);
     IcebergWriterCompatV1MetadataValidatorAndUpdater.validateIcebergWriterCompatV1Change(
@@ -330,6 +366,16 @@ private void validateMetadataChange(
       throw new KernelException("Cannot update schema for table when column mapping is disabled");
     }

+    // TODO: revisit this once we want to support schema evolution with clustering columns
+    Optional<List<Column>> clusteringColumns =
+        ClusteringUtils.getClusteringColumnsOptional(snapshot);
+    if (clusteringColumns.isPresent() && !clusteringColumns.get().isEmpty()) {
+      throw new KernelException(
+          format(
+              "Update schema for table with clustering columns %s is not yet supported",
+              clusteringColumns.get()));
+    }
+
     SchemaUtils.validateUpdatedSchema(
         oldMetadata.getSchema(),
         newMetadata.getSchema(),
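To make the new validation paths concrete, a hypothetical sketch of the two failure modes added above (paths, engine info, and schema are placeholders; the quoted messages are the ones introduced in this diff):

```java
import io.delta.kernel.*;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableAlreadyExistsException;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.types.StructType;
import java.util.Arrays;

public class ClusteringValidationExample {
  static void demonstrate(Engine engine, StructType schema) {
    try {
      // New table: partition columns and clustering columns are mutually exclusive.
      Table.forPath(engine, "/tmp/newTable")
          .createTransactionBuilder(engine, "exampleEngine", Operation.CREATE_TABLE)
          .withSchema(engine, schema)
          .withPartitionColumns(engine, Arrays.asList("id"))
          .withClusteringColumns(engine, Arrays.asList(new Column("id")))
          .build(engine);
    } catch (IllegalArgumentException e) {
      // "Partition Columns and Clustering Columns cannot be set at the same time"
    }

    try {
      // Existing table: clustering columns can only be set at creation time for now.
      Table.forPath(engine, "/tmp/existingTable")
          .createTransactionBuilder(engine, "exampleEngine", Operation.WRITE)
          .withClusteringColumns(engine, Arrays.asList(new Column("id")))
          .build(engine);
    } catch (TableAlreadyExistsException e) {
      // "Clustering columns can only be set on a new table for now."
    }
  }
}
```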

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

Lines changed: 17 additions & 1 deletion
@@ -35,6 +35,7 @@
 import io.delta.kernel.internal.actions.*;
 import io.delta.kernel.internal.annotation.VisibleForTesting;
 import io.delta.kernel.internal.checksum.CRCInfo;
+import io.delta.kernel.internal.clustering.ClusteringUtils;
 import io.delta.kernel.internal.data.TransactionStateRow;
 import io.delta.kernel.internal.fs.Path;
 import io.delta.kernel.internal.hook.CheckpointHook;
@@ -79,6 +80,7 @@ public class TransactionImpl implements Transaction {
   private final Protocol protocol;
   private final SnapshotImpl readSnapshot;
   private final Optional<SetTransaction> setTxnOpt;
+  private final List<Column> clusteringColumns;
   private final boolean shouldUpdateProtocol;
   private final Clock clock;
   private final Map<String, DomainMetadata> domainMetadatasAdded = new HashMap<>();
@@ -100,6 +102,7 @@ public TransactionImpl(
       Protocol protocol,
       Metadata metadata,
       Optional<SetTransaction> setTxnOpt,
+      List<Column> clusteringColumns,
       boolean shouldUpdateMetadata,
       boolean shouldUpdateProtocol,
       int maxRetries,
@@ -113,6 +116,7 @@
     this.protocol = protocol;
     this.metadata = metadata;
     this.setTxnOpt = setTxnOpt;
+    this.clusteringColumns = clusteringColumns;
     this.shouldUpdateMetadata = shouldUpdateMetadata;
     this.shouldUpdateProtocol = shouldUpdateProtocol;
     this.maxRetries = maxRetries;
@@ -198,7 +202,7 @@ public List<DomainMetadata> getDomainMetadatas() {
     if (domainMetadatas.isPresent()) {
       return domainMetadatas.get();
     }
-
+    generateClusteringDomainMetadataIfNeeded();
     if (domainMetadatasAdded.isEmpty() && domainMetadatasRemoved.isEmpty()) {
       // If no domain metadatas are added or removed, return an empty list. This is to avoid
       // unnecessary loading of the domain metadatas from the snapshot (which is an expensive
@@ -585,6 +589,18 @@ private Optional<CRCInfo> buildPostCommitCrcInfoIfCurrentCrcAvailable(
             Optional.of(txnId.toString())));
   }

+  /**
+   * Generate the domain metadata for the clustering columns if they are present in the transaction.
+   */
+  private void generateClusteringDomainMetadataIfNeeded() {
+    if (TableFeatures.isClusteringTableFeatureSupported(protocol) && !clusteringColumns.isEmpty()) {
+      DomainMetadata clusteringDomainMetadata =
+          ClusteringUtils.getClusteringDomainMetadata(clusteringColumns);
+      addDomainMetadataInternal(
+          clusteringDomainMetadata.getDomain(), clusteringDomainMetadata.getConfiguration());
+    }
+  }
+
   /**
    * Get the part of the schema of the table that needs the statistics to be collected per file.
    *

kernel/kernel-api/src/main/java/io/delta/kernel/internal/clustering/ClusteringUtils.java

Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.kernel.internal.clustering;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.internal.actions.DomainMetadata;
+import java.util.List;
+import java.util.Optional;
+
+public class ClusteringUtils {
+
+  private ClusteringUtils() {
+    // Empty private constructor to prevent instantiation
+  }
+
+  /**
+   * Get the domain metadata for the clustering columns. If column mapping is enabled, pass the list
+   * of physical names assigned; otherwise, use the logical column names.
+   */
+  public static DomainMetadata getClusteringDomainMetadata(List<Column> clusteringColumns) {
+    ClusteringMetadataDomain clusteringMetadataDomain =
+        ClusteringMetadataDomain.fromClusteringColumns(clusteringColumns);
+    return clusteringMetadataDomain.toDomainMetadata();
+  }
+
+  /**
+   * Extract ClusteringColumns from a given snapshot. Return None if the clustering domain metadata
+   * is missing.
+   */
+  public static Optional<List<Column>> getClusteringColumnsOptional(SnapshotImpl snapshot) {
+    return ClusteringMetadataDomain.fromSnapshot(snapshot)
+        .map(ClusteringMetadataDomain::getClusteringColumns);
+  }
+}
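For reference, a sketch of how these helpers could be exercised (column names are made up; when column mapping is enabled the physical names appear instead of the logical ones). The `delta.clustering` domain name and `clusteringColumns` configuration key follow the clustered-table portion of the Delta protocol:

```java
import io.delta.kernel.expressions.Column;
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.internal.clustering.ClusteringUtils;
import java.util.Arrays;

public class ClusteringDomainExample {
  static void inspect() {
    DomainMetadata dm =
        ClusteringUtils.getClusteringDomainMetadata(
            Arrays.asList(new Column("id"), new Column(new String[] {"user", "country"})));

    // Domain name defined by the clustered-table feature:
    System.out.println(dm.getDomain()); // delta.clustering
    // Configuration is a JSON string, roughly:
    // {"clusteringColumns":[["id"],["user","country"]]}
    System.out.println(dm.getConfiguration());
  }
}
```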

kernel/kernel-api/src/main/java/io/delta/kernel/internal/skipping/StatsSchemaHelper.java

Lines changed: 9 additions & 9 deletions
@@ -51,6 +51,15 @@ public static boolean isSkippingEligibleLiteral(Literal literal) {
     return isSkippingEligibleDataType(literal.getDataType());
   }

+  /** Returns true if the given data type is eligible for MIN/MAX data skipping. */
+  public static boolean isSkippingEligibleDataType(DataType dataType) {
+    return SKIPPING_ELIGIBLE_TYPE_NAMES.contains(dataType.toString())
+        ||
+        // DecimalType is eligible but since its string includes scale + precision it needs to
+        // be matched separately
+        dataType instanceof DecimalType;
+  }
+
   /**
    * Returns the expected statistics schema given a table schema.
    *
@@ -222,15 +231,6 @@ public boolean isSkippingEligibleNullCountColumn(Column column) {
         }
       };

-  /** Returns true if the given data type is eligible for MIN/MAX data skipping. */
-  private static boolean isSkippingEligibleDataType(DataType dataType) {
-    return SKIPPING_ELIGIBLE_TYPE_NAMES.contains(dataType.toString())
-        ||
-        // DecimalType is eligible but since its string includes scale + precision it needs to
-        // be matched separately
-        dataType instanceof DecimalType;
-  }
-
   /**
    * Given a data schema returns the expected schema for a min or max statistics column. This means
    * 1) replace logical names with physical names 2) set nullable=true 3) only keep stats eligible
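The method body is unchanged; it only moves and becomes `public` so that `SchemaUtils` (below) can reuse the same MIN/MAX eligibility rule for clustering columns. Roughly, with illustrative inputs:

```java
import io.delta.kernel.internal.skipping.StatsSchemaHelper;
import io.delta.kernel.types.*;

public class EligibilityExample {
  static void examples() {
    // Primitive types with MIN/MAX stats are eligible.
    boolean ints = StatsSchemaHelper.isSkippingEligibleDataType(IntegerType.INTEGER); // true
    // Decimals are matched separately because their string form carries precision/scale.
    boolean decimals = StatsSchemaHelper.isSkippingEligibleDataType(new DecimalType(10, 2)); // true
    // Complex types carry no MIN/MAX stats.
    boolean arrays =
        StatsSchemaHelper.isSkippingEligibleDataType(new ArrayType(IntegerType.INTEGER, true)); // false
  }
}
```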

kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java

Lines changed: 33 additions & 0 deletions
@@ -18,12 +18,14 @@
 import static io.delta.kernel.internal.DeltaErrors.*;
 import static io.delta.kernel.internal.util.ColumnMapping.*;
 import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+import static java.lang.String.format;

 import io.delta.kernel.exceptions.KernelException;
 import io.delta.kernel.expressions.Column;
 import io.delta.kernel.expressions.Literal;
 import io.delta.kernel.internal.DeltaErrors;
 import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.skipping.StatsSchemaHelper;
 import io.delta.kernel.types.*;
 import java.util.*;
 import java.util.function.Function;
@@ -196,6 +198,37 @@ public static Map<String, Literal> casePreservingPartitionColNames(
                 Map.Entry::getValue));
   }

+  /**
+   * Verify the clustering columns exists in the table schema.
+   *
+   * @param schema The schema of the table
+   * @param clusteringCols List of clustering columns
+   */
+  public static List<Column> casePreservingEligibleClusterColumns(
+      StructType schema, List<Column> clusteringCols) {
+
+    List<Tuple2<Column, DataType>> physicalColumnsWithTypes =
+        clusteringCols.stream()
+            .map(col -> ColumnMapping.getPhysicalColumnNameAndDataType(schema, col))
+            .collect(Collectors.toList());
+
+    List<String> nonSkippingEligibleColumns =
+        physicalColumnsWithTypes.stream()
+            .filter(tuple -> !StatsSchemaHelper.isSkippingEligibleDataType(tuple._2))
+            .map(tuple -> tuple._1.toString() + " : " + tuple._2)
+            .collect(Collectors.toList());
+
+    if (!nonSkippingEligibleColumns.isEmpty()) {
+      throw new KernelException(
+          format(
+              "Clustering is not supported because the following column(s): %s "
+                  + "don't support data skipping",
+              nonSkippingEligibleColumns));
+    }
+
+    return physicalColumnsWithTypes.stream().map(tuple -> tuple._1).collect(Collectors.toList());
+  }
+
   /**
    * Search (case-insensitive) for the given {@code colName} in the {@code schema} and return its
    * position in the {@code schema}.
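A sketch of the new helper's contract (hypothetical schema and columns; with column mapping enabled the returned columns carry physical names):

```java
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.internal.util.SchemaUtils;
import io.delta.kernel.types.*;
import java.util.Arrays;
import java.util.List;

public class ClusterColumnValidationExample {
  static void examples() {
    StructType schema =
        new StructType()
            .add("Id", IntegerType.INTEGER)
            .add("tags", new ArrayType(StringType.STRING, true));

    // Eligible columns come back resolved against the schema (physical form
    // when column mapping is enabled).
    List<Column> physical =
        SchemaUtils.casePreservingEligibleClusterColumns(schema, Arrays.asList(new Column("Id")));

    try {
      // Array columns carry no MIN/MAX stats, so they are rejected.
      SchemaUtils.casePreservingEligibleClusterColumns(schema, Arrays.asList(new Column("tags")));
    } catch (KernelException e) {
      // "Clustering is not supported because the following column(s): ... don't support data skipping"
    }
  }
}
```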
