@@ -4,6 +4,7 @@
public final class CatalogConstants {
public static final String SNAPSHOTS_JSON_KEY = "snapshotsJsonToBePut";
public static final String SNAPSHOTS_REFS_KEY = "snapshotsRefs";
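/** Key for the JSON-serialized list of intermediate schema JSON strings in a multi-schema commit. */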
public static final String INTERMEDIATE_SCHEMAS_KEY = "newIntermediateSchemas";
public static final String SORT_ORDER_KEY = "sortOrder";
public static final String IS_STAGE_CREATE_KEY = "isStageCreate";
public static final String OPENHOUSE_TABLE_VERSION = "openhouse.tableVersion";
@@ -4,7 +4,9 @@

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.linkedin.openhouse.cluster.metrics.micrometer.MetricsReporter;
import com.linkedin.openhouse.cluster.storage.Storage;
import com.linkedin.openhouse.cluster.storage.StorageClient;
@@ -188,40 +190,38 @@ public void commit(TableMetadata base, TableMetadata metadata) {
}
}

- /** An internal helper method to rebuild the {@link TableMetadata} object. */
- private TableMetadata rebuildTblMetaWithSchema(
- TableMetadata newMetadata, String schemaKey, boolean reuseMetadata) {
- Schema writerSchema = SchemaParser.fromJson(newMetadata.properties().get(schemaKey));
/**
* An internal helper method to rebuild the {@link TableMetadata} object with a parsed schema.
*
* @param newMetadata The current table metadata
* @param schemaJson The JSON string of the schema to apply
* @param reuseMetadata Whether to reuse existing metadata or build from empty
* @return Table metadata builder with the new schema set as current
*/
private TableMetadata.Builder rebuildTblMetaWithSchemaBuilder(
TableMetadata newMetadata, String schemaJson, boolean reuseMetadata) {
Schema writerSchema = SchemaParser.fromJson(schemaJson);

if (reuseMetadata) {
return TableMetadata.buildFrom(newMetadata)
- .setCurrentSchema(writerSchema, writerSchema.highestFieldId())
- .build();
.setCurrentSchema(writerSchema, writerSchema.highestFieldId());
} else {
return TableMetadata.buildFromEmpty()
.setLocation(newMetadata.location())
.setCurrentSchema(writerSchema, newMetadata.lastColumnId())
.addPartitionSpec(
rebuildPartitionSpec(newMetadata.spec(), newMetadata.schema(), writerSchema))
.addSortOrder(rebuildSortOrder(newMetadata.sortOrder(), writerSchema))
- .setProperties(newMetadata.properties())
- .build();
.setProperties(newMetadata.properties());
}
}

@SuppressWarnings("checkstyle:MissingSwitchDefault")
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {

- /**
- * During table creation, the table metadata object that arrives here has the field-ids
- * reassigned from the client supplied schema.This code block creates a new table metadata
- * object using the client supplied schema by preserving its field-ids.
- */
- if (base == null && metadata.properties().get(CatalogConstants.CLIENT_TABLE_SCHEMA) != null) {
- metadata = rebuildTblMetaWithSchema(metadata, CatalogConstants.CLIENT_TABLE_SCHEMA, false);
- } else if (metadata.properties().get(CatalogConstants.EVOLVED_SCHEMA_KEY) != null) {
- metadata = rebuildTblMetaWithSchema(metadata, CatalogConstants.EVOLVED_SCHEMA_KEY, true);
- }
// Handle all schema-related processing (client schema, evolved schema, intermediate schemas)
metadata = processSchemas(base, metadata);

int version = currentVersion() + 1;
CommitStatus commitStatus = CommitStatus.FAILURE;
@@ -254,6 +254,11 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
if (properties.containsKey(CatalogConstants.EVOLVED_SCHEMA_KEY)) {
properties.remove(CatalogConstants.EVOLVED_SCHEMA_KEY);
}

if (properties.containsKey(CatalogConstants.INTERMEDIATE_SCHEMAS_KEY)) {
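// Like the evolved-schema key above, the intermediate-schemas key is transient commit metadata;
// strip it so it is not persisted as a table property.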
properties.remove(CatalogConstants.INTERMEDIATE_SCHEMAS_KEY);
}

String serializedSnapshotsToPut = properties.remove(CatalogConstants.SNAPSHOTS_JSON_KEY);
String serializedSnapshotRefs = properties.remove(CatalogConstants.SNAPSHOTS_REFS_KEY);
boolean isStageCreate =
@@ -549,6 +554,67 @@ private void failIfRetryUpdate(Map<String, String> properties) {
}
}

/**
* Process all schema-related operations including client schema (for new tables), evolved schema
* (for updates), and intermediate schemas (for replication scenarios). This consolidates all
* schema handling logic into a single method.
*
* @param base The base table metadata (null for new tables)
* @param metadata The current table metadata
* @return Updated table metadata with all schema changes applied
*/
private TableMetadata processSchemas(TableMetadata base, TableMetadata metadata) {
boolean isNewTable = (base == null);
String finalSchemaUpdate =
isNewTable && metadata.properties().get(CatalogConstants.CLIENT_TABLE_SCHEMA) != null
? metadata.properties().get(CatalogConstants.CLIENT_TABLE_SCHEMA)
: metadata.properties().get(CatalogConstants.EVOLVED_SCHEMA_KEY);
String serializedNewIntermediateSchemas =
metadata.properties().get(CatalogConstants.INTERMEDIATE_SCHEMAS_KEY);
// If there is no schema update, return the original metadata
if (finalSchemaUpdate == null) {
return metadata;
}
TableMetadata.Builder updatedMetadataBuilder;

// Process intermediate schemas first if present
if (serializedNewIntermediateSchemas != null) {
List<String> newIntermediateSchemas =
new GsonBuilder()
.create()
.fromJson(
serializedNewIntermediateSchemas, new TypeToken<List<String>>() {}.getType());

// Process schemas in order
int startingSchemaId = metadata.currentSchemaId();
updatedMetadataBuilder =
rebuildTblMetaWithSchemaBuilder(metadata, newIntermediateSchemas.get(0), !isNewTable);
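// The first intermediate schema rebuilds the metadata (reusing the existing metadata for
// established tables); each later schema is applied via setCurrentSchema so it is recorded
// under its own schema ID in the metadata's schema history.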
for (int i = 1; i < newIntermediateSchemas.size(); i++) {
String schemaJson = newIntermediateSchemas.get(i);
int nextSchemaId = startingSchemaId + i + 1;
try {
Schema writerSchema = SchemaParser.fromJson(schemaJson);
updatedMetadataBuilder =
updatedMetadataBuilder.setCurrentSchema(writerSchema, writerSchema.highestFieldId());
} catch (Exception e) {
log.error(
"Failed to process intermediate schema with ID {} for table {}",
nextSchemaId,
tableIdentifier,
e);
}
}
// Then apply the final schema (either client schema for new tables or evolved schema for
// updates)
Schema finalSchema = SchemaParser.fromJson(finalSchemaUpdate);
updatedMetadataBuilder =
updatedMetadataBuilder.setCurrentSchema(finalSchema, finalSchema.highestFieldId());
return updatedMetadataBuilder.build();
} else {
return rebuildTblMetaWithSchemaBuilder(metadata, finalSchemaUpdate, !isNewTable).build();
}
}
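// Illustrative sketch, not part of this change: the INTERMEDIATE_SCHEMAS_KEY value is assumed to
// be a JSON-serialized List<String> of schema JSON that round-trips as below. The helper name and
// the use of Gson on the writer side are assumptions; only the parsing mirrors processSchemas.
private static List<String> exampleIntermediateSchemasRoundTrip(Schema first, Schema second) {
  List<String> schemaJsonList =
      java.util.Arrays.asList(SchemaParser.toJson(first), SchemaParser.toJson(second));
  // Serialize as the writer side is assumed to do before attaching the value to the commit.
  String serialized = new GsonBuilder().create().toJson(schemaJsonList);
  // Parse exactly the way processSchemas above does.
  return new GsonBuilder()
      .create()
      .fromJson(serialized, new TypeToken<List<String>>() {}.getType());
}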

/** Helper function to dump contents for map in debugging mode. */
private void logPropertiesMap(Map<String, String> map) {
log.debug(" === Printing the table properties within doCommit method === ");
@@ -14,6 +14,8 @@
import com.linkedin.openhouse.tables.client.model.GetTableResponseBody;
import com.linkedin.openhouse.tables.client.model.IcebergSnapshotsRequestBody;
import com.linkedin.openhouse.tables.client.model.Policies;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -186,6 +188,17 @@ protected CreateUpdateTableRequestBody constructMetadataRequestBody(
&& metadata.properties().containsKey(OPENHOUSE_TABLE_TYPE_KEY)) {
createUpdateTableRequestBody.setTableType(getTableType(base, metadata));
}
// TODO: consider allowing this for any table type, not just replica tables
if (isMultiSchemaUpdateCommit(base, metadata)
&& getTableType(base, metadata)
== CreateUpdateTableRequestBody.TableTypeEnum.REPLICA_TABLE) {
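// Collect every schema strictly between the base's current schema and the new current schema;
// the new current schema itself travels in the request body's existing schema field.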
List<String> newIntermediateSchemas = new ArrayList<>();
int startSchemaId = base == null ? 0 : base.currentSchemaId() + 1;
for (int i = startSchemaId; i < metadata.currentSchemaId(); i++) {
newIntermediateSchemas.add(SchemaParser.toJson(metadata.schemasById().get(i), false));
}
createUpdateTableRequestBody.setNewIntermediateSchemas(newIntermediateSchemas);
}
// If base table is a replicated table, retain the property from base table
if (base != null && base.properties().containsKey(OPENHOUSE_IS_TABLE_REPLICATED_KEY)) {
Map<String, String> newTblProperties = createUpdateTableRequestBody.getTableProperties();
@@ -297,6 +310,11 @@ protected boolean areSnapshotsUpdated(TableMetadata base, TableMetadata newMetadata) {
|| !base.refs().equals(newMetadata.refs());
}

protected boolean isMultiSchemaUpdateCommit(TableMetadata base, TableMetadata newMetadata) {
return (base == null && newMetadata.currentSchemaId() > 0)
|| (base != null && newMetadata.currentSchemaId() > base.currentSchemaId() + 1);
}
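// Hypothetical illustration, not part of this change (it mirrors SparkMultiSchemaEvolutionTest
// below): isMultiSchemaUpdateCommit detects commits whose current schema ID advances more than
// one step past the base, which is exactly what metadata built like this produces, assuming the
// base's current schema is also its newest.
private static TableMetadata exampleMultiSchemaCommit(
    TableMetadata base, org.apache.iceberg.Schema intermediate, org.apache.iceberg.Schema latest) {
  TableMetadata withIntermediate =
      TableMetadata.buildFrom(base)
          .addSchema(intermediate, intermediate.highestFieldId())
          .build();
  return TableMetadata.buildFrom(withIntermediate)
      .addSchema(latest, latest.highestFieldId())
      .setCurrentSchema(base.currentSchemaId() + 2)
      .build();
}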

/**
* A wrapper for a remote REST call to put snapshot.
*
@@ -0,0 +1,150 @@
package com.linkedin.openhouse.spark.catalogtest.e2e;

import com.linkedin.openhouse.javaclient.OpenHouseCatalog;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.collection.JavaConverters;

public class SparkMultiSchemaEvolutionTest extends OpenHouseSparkITest {

@Test
void testMultiSchemaEvolution() throws Exception {
SparkSession spark = null;
try {
spark = getSparkSession();
spark.sql(
"CREATE TABLE openhouse.multiSchemaTest.t1 (name string, id int) TBLPROPERTIES ('openhouse.tableType' = 'REPLICA_TABLE');");
spark.sql("INSERT INTO openhouse.multiSchemaTest.t1 VALUES ('Alice', 1)");
spark.sql("INSERT INTO openhouse.multiSchemaTest.t1 VALUES ('Bob', 2), ('Charlie', 3)");
TableIdentifier tableIdentifier = TableIdentifier.of("multiSchemaTest", "t1");
OpenHouseCatalog ohCatalog = (OpenHouseCatalog) getOpenHouseCatalog(spark);
TableOperations ops = ohCatalog.newTableOps(tableIdentifier);
Schema evolvedSchema =
new Schema(
Types.NestedField.optional(1, "name", Types.StringType.get()),
Types.NestedField.optional(2, "id", Types.IntegerType.get()),
Types.NestedField.optional(3, "newCol", Types.IntegerType.get()));
Schema finalEvolvedSchema =
new Schema(
Types.NestedField.optional(1, "name", Types.StringType.get()),
Types.NestedField.optional(2, "id", Types.IntegerType.get()),
Types.NestedField.optional(3, "newCol1", Types.IntegerType.get()),
Types.NestedField.optional(4, "newCol2", Types.IntegerType.get()));

TableMetadata metadata = ops.current();
TableMetadata evolvedMetadata =
TableMetadata.buildFrom(metadata)
.addSchema(evolvedSchema, evolvedSchema.highestFieldId())
.build();
TableMetadata finalEvolvedMetadata =
TableMetadata.buildFrom(evolvedMetadata)
.addSchema(finalEvolvedSchema, finalEvolvedSchema.highestFieldId())
.setCurrentSchema(2)
.build();

Assertions.assertEquals(finalEvolvedMetadata.schemas().size(), 3);
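// A single commit that carries both newly added schemas exercises the multi-schema update path.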
ops.commit(metadata, finalEvolvedMetadata);
TableMetadata result = ops.current();
Assertions.assertEquals(3, result.schemas().size());
Assertions.assertTrue(result.schema().sameSchema(finalEvolvedSchema));
} finally {
if (spark != null) {
spark.sql("DROP TABLE openhouse.multiSchemaTest.t1");
}
}
}

@Test
void testMultiSchemaEvolutionColumnOrderingOnCreate() throws Exception {
SparkSession spark = null;
try {
spark = getSparkSession();
TableIdentifier tableIdentifier = TableIdentifier.of("multiSchemaTest", "t2");
OpenHouseCatalog ohCatalog = (OpenHouseCatalog) getOpenHouseCatalog(spark);
Schema schemaColumnOrdering =
new Schema(
Types.NestedField.optional(2, "name", Types.StringType.get()),
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(4, "newCol1", Types.IntegerType.get()),
Types.NestedField.optional(3, "newCol2", Types.IntegerType.get()));
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put("openhouse.tableType", "REPLICA_TABLE");
tableProperties.put("openhouse.isTableReplicated", "true");
tableProperties.put("client.table.schema", SchemaParser.toJson(schemaColumnOrdering));
ohCatalog.createTable(tableIdentifier, schemaColumnOrdering, null, tableProperties);
TableOperations ops = ohCatalog.newTableOps(tableIdentifier);
TableMetadata metadata = ops.current();
Assertions.assertEquals(metadata.schema().findColumnName(2), "name");
Assertions.assertTrue(metadata.schema().sameSchema(schemaColumnOrdering));
Schema schemaColumnOrdering2 =
new Schema(
Types.NestedField.optional(2, "name", Types.StringType.get()),
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(4, "newCol1", Types.IntegerType.get()),
Types.NestedField.optional(3, "newCol2", Types.IntegerType.get()),
Types.NestedField.optional(5, "newCol3", Types.IntegerType.get()));

Schema schemaColumnOrdering3 =
new Schema(
Types.NestedField.optional(2, "name", Types.StringType.get()),
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(4, "newCol1", Types.IntegerType.get()),
Types.NestedField.optional(3, "newCol2", Types.IntegerType.get()),
Types.NestedField.optional(5, "newCol3", Types.IntegerType.get()),
Types.NestedField.optional(6, "newCol4", Types.IntegerType.get()),
Types.NestedField.optional(7, "newCol5", Types.IntegerType.get()));

TableMetadata evolvedMetadata =
TableMetadata.buildFrom(metadata)
.addSchema(schemaColumnOrdering2, schemaColumnOrdering2.highestFieldId())
.build();
TableMetadata finalEvolvedMetadata =
TableMetadata.buildFrom(evolvedMetadata)
.addSchema(schemaColumnOrdering3, schemaColumnOrdering3.highestFieldId())
.setCurrentSchema(2)
.build();

Assertions.assertEquals(finalEvolvedMetadata.schemas().size(), 3);
ops.commit(metadata, finalEvolvedMetadata);
TableMetadata result = ops.current();
Assertions.assertEquals(3, result.schemas().size());
// Validate ordering of columns persists on creation
Assertions.assertEquals(result.schema().findColumnName(2), "name");
Assertions.assertTrue(result.schema().sameSchema(schemaColumnOrdering3));
} finally {
if (spark != null) {
spark.sql("DROP TABLE openhouse.multiSchemaTest.t2");
}
}
}

private Catalog getOpenHouseCatalog(SparkSession spark) {
final Map<String, String> catalogProperties = new HashMap<>();
final String catalogPropertyPrefix = "spark.sql.catalog.openhouse.";
final Map<String, String> sparkProperties = JavaConverters.mapAsJavaMap(spark.conf().getAll());
for (Map.Entry<String, String> entry : sparkProperties.entrySet()) {
if (entry.getKey().startsWith(catalogPropertyPrefix)) {
catalogProperties.put(
entry.getKey().substring(catalogPropertyPrefix.length()), entry.getValue());
}
}
// this initializes the catalog based on runtime Catalog class passed in catalog-impl conf.
return CatalogUtil.loadCatalog(
sparkProperties.get("spark.sql.catalog.openhouse.catalog-impl"),
"openhouse",
catalogProperties,
spark.sparkContext().hadoopConfiguration());
}
}
@@ -61,11 +61,18 @@ public class CreateUpdateTableRequestBody {
example =
"{\"type\": \"struct\", "
+ "\"fields\": [{\"id\": 1,\"required\": true,\"name\": \"id\",\"type\": \"string\"}, "
+ "{\"id\": 2,\"required\": true,\"name\": \"name\", \"type\": \"string\"}, "
+ "{\"id\": 3,\"required\": true,\"name\":\"timestampColumn\",\"type\": \"timestamp\"}]}")
@NotEmpty(message = "schema cannot be empty")
private String schema;

@Schema(
nullable = true,
description =
"List of schema JSON strings for intermediate schemas created during this operation. "
+ "This is used to preserve schema history when multiple schema updates occur in a single commit.",
example = "[\"{\\\"type\\\": \\\"struct\\\", \\\"fields\\\": [...]}\", \"{...}\"]")
private List<String> newIntermediateSchemas;

@Schema(
nullable = true,
description = "Time partitioning of the table",
@@ -56,6 +56,7 @@ public interface TablesMapper {
@Mapping(source = "requestBody.timePartitioning", target = "timePartitioning"),
@Mapping(source = "requestBody.clustering", target = "clustering"),
@Mapping(source = "requestBody.tableProperties", target = "tableProperties"),
@Mapping(source = "requestBody.newIntermediateSchemas", target = "newIntermediateSchemas"),
@Mapping(source = "requestBody.policies", target = "policies", qualifiedByName = "mapPolicies"),
@Mapping(source = "requestBody.stageCreate", target = "stageCreate"),
@Mapping(
@@ -75,6 +75,8 @@ public class TableDto {

@ElementCollection private Map<String, String> tableProperties;

@ElementCollection private List<String> newIntermediateSchemas;

private boolean stageCreate;

/**