Skip to content

Commit f0b516c

Browse files
[Kernel] Minor refactor to column mapping related code for clarity (delta-io#4348)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Rename some fields, improve docs, and move some code for better clarity / readability. ## How was this patch tested? Existing tests suffice. ## Does this PR introduce _any_ user-facing changes? No
1 parent 836d5b6 commit f0b516c

File tree

5 files changed

+42
-32
lines changed

5 files changed

+42
-32
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import io.delta.kernel.internal.tablefeatures.TableFeatures;
4848
import io.delta.kernel.internal.util.*;
4949
import io.delta.kernel.internal.util.Clock;
50-
import io.delta.kernel.internal.util.ColumnMapping;
5150
import io.delta.kernel.internal.util.FileNames;
5251
import io.delta.kernel.internal.util.InCommitTimestampUtils;
5352
import io.delta.kernel.internal.util.VectorUtils;
@@ -122,14 +121,7 @@ public TransactionImpl(
122121

123122
@Override
124123
public Row getTransactionState(Engine engine) {
125-
ColumnMapping.ColumnMappingMode mappingMode =
126-
ColumnMapping.getColumnMappingMode(metadata.getConfiguration());
127-
StructType basePhysicalSchema = metadata.getSchema();
128-
StructType physicalSchema =
129-
ColumnMapping.convertToPhysicalSchema(
130-
metadata.getSchema(), basePhysicalSchema, mappingMode);
131-
132-
return TransactionStateRow.of(metadata, dataPath.toString(), physicalSchema);
124+
return TransactionStateRow.of(metadata, dataPath.toString());
133125
}
134126

135127
@Override

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.delta.kernel.internal.data.GenericRow;
2424
import io.delta.kernel.internal.lang.Lazy;
2525
import io.delta.kernel.internal.types.DataTypeJsonSerDe;
26+
import io.delta.kernel.internal.util.ColumnMapping;
2627
import io.delta.kernel.internal.util.VectorUtils;
2728
import io.delta.kernel.types.*;
2829
import java.util.*;
@@ -239,6 +240,18 @@ public Map<String, String> getConfiguration() {
239240
return Collections.unmodifiableMap(configuration.get());
240241
}
241242

243+
/**
244+
* The full schema (including partition columns) with the field names converted to their physical
245+
* names (column names used in the data files) based on the table's column mapping mode. When
246+
* column mapping mode is ID, fieldId metadata is preserved in the field metadata; all column
247+
* metadata is otherwise removed.
248+
*/
249+
public StructType getPhysicalSchema() {
250+
ColumnMapping.ColumnMappingMode mappingMode =
251+
ColumnMapping.getColumnMappingMode(getConfiguration());
252+
return ColumnMapping.convertToPhysicalSchema(schema, schema, mappingMode);
253+
}
254+
242255
/**
243256
* Filter out the key-value pair matches exactly with the old properties.
244257
*

kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ public class TransactionStateRow extends GenericRow {
4646
.boxed()
4747
.collect(toMap(i -> SCHEMA.at(i).getName(), i -> i));
4848

49-
public static TransactionStateRow of(
50-
Metadata metadata, String tablePath, StructType physicalSchema) {
49+
public static TransactionStateRow of(Metadata metadata, String tablePath) {
5150
HashMap<Integer, Object> valueMap = new HashMap<>();
5251
valueMap.put(COL_NAME_TO_ORDINAL.get("logicalSchemaString"), metadata.getSchemaString());
53-
valueMap.put(COL_NAME_TO_ORDINAL.get("physicalSchemaString"), physicalSchema.toJson());
52+
valueMap.put(
53+
COL_NAME_TO_ORDINAL.get("physicalSchemaString"), metadata.getPhysicalSchema().toJson());
5454
valueMap.put(COL_NAME_TO_ORDINAL.get("partitionColumns"), metadata.getPartitionColumns());
5555
valueMap.put(COL_NAME_TO_ORDINAL.get("configuration"), metadata.getConfigurationMapValue());
5656
valueMap.put(COL_NAME_TO_ORDINAL.get("tablePath"), tablePath);

kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ColumnMapping.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -88,21 +88,26 @@ public static ColumnMappingMode getColumnMappingMode(Map<String, String> configu
8888

8989
/**
9090
* Helper method that converts the logical schema (requested by the connector) to physical schema
91-
* of the data stored in data files based on the table's column mapping mode.
91+
* of the data stored in data files based on the table's column mapping mode. Field-id column
92+
* metadata is preserved when cmMode = ID, all column metadata is otherwise removed.
9293
*
93-
* @param logicalSchema Logical schema of the scan
94-
* @param physicalSchema Physical schema of the scan
94+
* <p>We require {@code fullSchema} in addition to the pruned schema we want to convert since we
95+
* need the complete field metadata as it is stored in the schema in the _delta_log. We cannot be
96+
* sure (and do not enforce) that this metadata is preserved by the connector.
97+
*
98+
* @param prunedSchema the logical read schema requested by the connector
99+
* @param fullSchema the full delta schema (with complete metadata) as read from the _delta_log
95100
* @param columnMappingMode Column mapping mode
96101
*/
97102
public static StructType convertToPhysicalSchema(
98-
StructType logicalSchema, StructType physicalSchema, ColumnMappingMode columnMappingMode) {
103+
StructType prunedSchema, StructType fullSchema, ColumnMappingMode columnMappingMode) {
99104
switch (columnMappingMode) {
100105
case NONE:
101-
return logicalSchema;
106+
return prunedSchema;
102107
case ID: // fall through
103108
case NAME:
104109
boolean includeFieldIds = columnMappingMode == ColumnMappingMode.ID;
105-
return convertToPhysicalSchema(logicalSchema, physicalSchema, includeFieldIds);
110+
return convertToPhysicalSchema(prunedSchema, fullSchema, includeFieldIds);
106111
default:
107112
throw new UnsupportedOperationException(
108113
"Unsupported column mapping mode: " + columnMappingMode);
@@ -239,20 +244,20 @@ private static int findMaxColumnId(StructField field, int maxColumnId) {
239244
/**
240245
* Utility method to convert the given logical schema to physical schema, recursively converting
241246
* sub-types in case of complex types. When {@code includeFieldId} is true, converted physical
242-
* schema will have field ids in the metadata.
247+
* schema will have field ids in the metadata. Column metadata is otherwise removed.
243248
*/
244249
private static StructType convertToPhysicalSchema(
245-
StructType logicalSchema, StructType physicalSchema, boolean includeFieldId) {
250+
StructType prunedSchema, StructType fullSchema, boolean includeFieldId) {
246251
StructType newSchema = new StructType();
247-
for (StructField logicalField : logicalSchema.fields()) {
248-
DataType logicalType = logicalField.getDataType();
249-
StructField physicalField = physicalSchema.get(logicalField.getName());
252+
for (StructField prunedField : prunedSchema.fields()) {
253+
StructField completeField = fullSchema.get(prunedField.getName());
250254
DataType physicalType =
251-
convertToPhysicalType(logicalType, physicalField.getDataType(), includeFieldId);
252-
String physicalName = physicalField.getMetadata().getString(COLUMN_MAPPING_PHYSICAL_NAME_KEY);
255+
convertToPhysicalType(
256+
prunedField.getDataType(), completeField.getDataType(), includeFieldId);
257+
String physicalName = completeField.getMetadata().getString(COLUMN_MAPPING_PHYSICAL_NAME_KEY);
253258

254259
if (includeFieldId) {
255-
Long fieldId = physicalField.getMetadata().getLong(COLUMN_MAPPING_ID_KEY);
260+
Long fieldId = completeField.getMetadata().getLong(COLUMN_MAPPING_ID_KEY);
256261
FieldMetadata.Builder builder =
257262
FieldMetadata.builder().putLong(PARQUET_FIELD_ID_KEY, fieldId);
258263

@@ -261,15 +266,15 @@ private static StructType convertToPhysicalSchema(
261266
// the 'element' and 'key'/'value' fields of Arrays/Maps haven been written,
262267
// then IcebergCompatV2 is enabled because the schema we are looking at is from
263268
// the DeltaLog and has nested field IDs setup
264-
if (hasNestedColumnIds(physicalField)) {
269+
if (hasNestedColumnIds(completeField)) {
265270
builder.putFieldMetadata(
266-
PARQUET_FIELD_NESTED_IDS_METADATA_KEY, getNestedColumnIds(physicalField));
271+
PARQUET_FIELD_NESTED_IDS_METADATA_KEY, getNestedColumnIds(completeField));
267272
}
268273

269274
newSchema =
270-
newSchema.add(physicalName, physicalType, logicalField.isNullable(), builder.build());
275+
newSchema.add(physicalName, physicalType, prunedField.isNullable(), builder.build());
271276
} else {
272-
newSchema = newSchema.add(physicalName, physicalType, logicalField.isNullable());
277+
newSchema = newSchema.add(physicalName, physicalType, prunedField.isNullable());
273278
}
274279
}
275280
return newSchema;

kernel/kernel-api/src/test/scala/io/delta/kernel/TransactionSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ class TransactionSuite extends AnyFunSuite with VectorTestUtils with MockEngineU
156156
VectorUtils.buildArrayValue(Seq.empty.asJava, StringType.STRING),
157157
Optional.empty(),
158158
stringStringMapValue(configMap.asJava))
159-
val txnState = TransactionStateRow.of(metadata, "table path", schema)
159+
val txnState = TransactionStateRow.of(metadata, "table path")
160160

161161
// Get statistics columns and define expected result
162162
val statsColumns = TransactionImpl.getStatisticsColumns(txnState)
@@ -262,7 +262,7 @@ object TransactionSuite extends VectorTestUtils with MockEngineUtils {
262262
Optional.empty(), // createdTime
263263
stringStringMapValue(configurationMap.asJava) // configurationMap
264264
)
265-
TransactionStateRow.of(metadata, "table path", schema)
265+
TransactionStateRow.of(metadata, "table path")
266266
}
267267

268268
def testStats(numRowsOpt: Option[Long]): Option[DataFileStatistics] = {

0 commit comments

Comments
 (0)