@@ -4,6 +4,7 @@
public final class CatalogConstants {
public static final String SNAPSHOTS_JSON_KEY = "snapshotsJsonToBePut";
public static final String SNAPSHOTS_REFS_KEY = "snapshotsRefs";
public static final String INTERMEDIATE_SCHEMAS_KEY = "newIntermediateSchemas";
public static final String SORT_ORDER_KEY = "sortOrder";
public static final String IS_STAGE_CREATE_KEY = "isStageCreate";
public static final String OPENHOUSE_TABLE_VERSION = "openhouse.tableVersion";
@@ -4,7 +4,9 @@

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.linkedin.openhouse.cluster.metrics.micrometer.MetricsReporter;
import com.linkedin.openhouse.cluster.storage.Storage;
import com.linkedin.openhouse.cluster.storage.StorageClient;
@@ -23,6 +25,7 @@
import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -188,40 +191,38 @@ public void commit(TableMetadata base, TableMetadata metadata) {
}
}

/** An internal helper method to rebuild the {@link TableMetadata} object. */
private TableMetadata rebuildTblMetaWithSchema(
TableMetadata newMetadata, String schemaKey, boolean reuseMetadata) {
Schema writerSchema = SchemaParser.fromJson(newMetadata.properties().get(schemaKey));
/**
* An internal helper method to rebuild the {@link TableMetadata} object with a parsed schema.
*
* @param newMetadata The current table metadata
* @param schemaJson The schema as a JSON string
* @param reuseMetadata Whether to reuse existing metadata or build from empty
* @return Table metadata builder with the new schema set as current
*/
private TableMetadata.Builder rebuildTblMetaWithSchemaBuilder(
TableMetadata newMetadata, String schemaJson, boolean reuseMetadata) {
Schema writerSchema = SchemaParser.fromJson(schemaJson);

if (reuseMetadata) {
return TableMetadata.buildFrom(newMetadata)
.setCurrentSchema(writerSchema, writerSchema.highestFieldId())
.build();
.setCurrentSchema(writerSchema, writerSchema.highestFieldId());
} else {
return TableMetadata.buildFromEmpty()
.setLocation(newMetadata.location())
.setCurrentSchema(writerSchema, newMetadata.lastColumnId())
.addPartitionSpec(
rebuildPartitionSpec(newMetadata.spec(), newMetadata.schema(), writerSchema))
.addSortOrder(rebuildSortOrder(newMetadata.sortOrder(), writerSchema))
.setProperties(newMetadata.properties())
.build();
.setProperties(newMetadata.properties());
}
}
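
Note: the helper now returns a TableMetadata.Builder instead of a built TableMetadata (the old rebuildTblMetaWithSchema called build() itself), so the caller can keep layering schema updates before finalizing. A minimal sketch of the intended call pattern, with hypothetical firstSchemaJson/nextSchemaJson values:

// Seed the builder with the first schema, then chain further schema updates.
TableMetadata.Builder builder =
rebuildTblMetaWithSchemaBuilder(metadata, firstSchemaJson, true);
Schema next = SchemaParser.fromJson(nextSchemaJson);
builder.setCurrentSchema(next, next.highestFieldId());
TableMetadata updated = builder.build(); // build once, after all schemas are applied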

@SuppressWarnings("checkstyle:MissingSwitchDefault")
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {

/**
* During table creation, the table metadata object that arrives here has the field-ids
* reassigned from the client-supplied schema. This code block creates a new table metadata
* object using the client-supplied schema, preserving its field-ids.
*/
if (base == null && metadata.properties().get(CatalogConstants.CLIENT_TABLE_SCHEMA) != null) {
metadata = rebuildTblMetaWithSchema(metadata, CatalogConstants.CLIENT_TABLE_SCHEMA, false);
} else if (metadata.properties().get(CatalogConstants.EVOLVED_SCHEMA_KEY) != null) {
metadata = rebuildTblMetaWithSchema(metadata, CatalogConstants.EVOLVED_SCHEMA_KEY, true);
}
// Handle all schema-related processing (client schema, evolved schema, intermediate schemas)
metadata = processSchemas(base, metadata);

int version = currentVersion() + 1;
CommitStatus commitStatus = CommitStatus.FAILURE;
@@ -254,6 +255,11 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
if (properties.containsKey(CatalogConstants.EVOLVED_SCHEMA_KEY)) {
properties.remove(CatalogConstants.EVOLVED_SCHEMA_KEY);
}

if (properties.containsKey(CatalogConstants.INTERMEDIATE_SCHEMAS_KEY)) {
properties.remove(CatalogConstants.INTERMEDIATE_SCHEMAS_KEY);
}

String serializedSnapshotsToPut = properties.remove(CatalogConstants.SNAPSHOTS_JSON_KEY);
String serializedSnapshotRefs = properties.remove(CatalogConstants.SNAPSHOTS_REFS_KEY);
boolean isStageCreate =
@@ -549,6 +555,49 @@ private void failIfRetryUpdate(Map<String, String> properties) {
}
}

/**
* Processes all schema-related operations: the client schema (for new tables), the evolved
* schema (for updates), and intermediate schemas (for replication scenarios). This consolidates
* all schema-handling logic into a single method.
*
* @param base The base table metadata (null for new tables)
* @param metadata The current table metadata
* @return Updated table metadata with all schema changes applied
*/
private TableMetadata processSchemas(TableMetadata base, TableMetadata metadata) {
boolean isNewTable = (base == null);
String finalSchemaUpdate =
isNewTable && metadata.properties().get(CatalogConstants.CLIENT_TABLE_SCHEMA) != null
? metadata.properties().get(CatalogConstants.CLIENT_TABLE_SCHEMA)
: metadata.properties().get(CatalogConstants.EVOLVED_SCHEMA_KEY);
// If there is no schema update, return the original metadata
if (finalSchemaUpdate == null) {
return metadata;
}
List<String> newSchemas = getIntermediateSchemasFromProps(metadata);
newSchemas.add(finalSchemaUpdate);
// Seed the builder with the first schema in the chain; the remaining schemas
// (including the final one) are applied in order below.
TableMetadata.Builder updatedMetadataBuilder =
rebuildTblMetaWithSchemaBuilder(metadata, newSchemas.get(0), !isNewTable);

newSchemas.stream()
.skip(1) // Skip the initialization schema
.forEach(
schemaJson -> {
try {
Schema schema = SchemaParser.fromJson(schemaJson);
updatedMetadataBuilder.setCurrentSchema(schema, schema.highestFieldId());
} catch (Exception e) {
log.error(
"Failed to process schema: {} for table {}", schemaJson, tableIdentifier, e);
}
});

return updatedMetadataBuilder.build();
}
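
A sketch of the intended effect for replication, assuming schema ids are assigned sequentially (hypothetical inputs s1Json and s2Json):

// The source table evolved twice since the last sync, so the commit carries
// one intermediate schema plus the final evolved schema:
// properties: newIntermediateSchemas = [s1Json], EVOLVED_SCHEMA_KEY = s2Json
TableMetadata updated = processSchemas(base, metadata);
// updated.schemasById() now contains the intermediate schema as well as the
// final one, so snapshots written against either schema version still resolve;
// updated.schema() is the schema parsed from s2Json.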

/** Helper function to dump contents for map in debugging mode. */
private void logPropertiesMap(Map<String, String> map) {
log.debug(" === Printing the table properties within doCommit method === ");
@@ -607,4 +656,15 @@ private boolean isReplicatedTableCreate(Map<String, String> properties) {
CatalogConstants.OPENHOUSE_TABLE_VERSION, CatalogConstants.INITIAL_VERSION)
.equals(CatalogConstants.INITIAL_VERSION);
}

/** Deserializes the JSON list of intermediate schema strings from table properties, if any. */
private List<String> getIntermediateSchemasFromProps(TableMetadata metadata) {
String serializedNewIntermediateSchemas =
metadata.properties().get(CatalogConstants.INTERMEDIATE_SCHEMAS_KEY);
if (serializedNewIntermediateSchemas == null) {
return new ArrayList<>();
}
return new GsonBuilder()
.create()
.fromJson(serializedNewIntermediateSchemas, new TypeToken<List<String>>() {}.getType());
}
}
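
For reference, the value stored under INTERMEDIATE_SCHEMAS_KEY is a JSON array of schema JSON strings, matching the TypeToken above. A minimal round-trip sketch with a hypothetical single-field schema (assumes the Gson and TypeToken imports above, plus java.util.Arrays):

// Writer side: serialize a List<String> of Iceberg schema JSONs.
List<String> schemas =
Arrays.asList(
"{\"type\":\"struct\",\"schema-id\":1,\"fields\":"
+ "[{\"id\":1,\"name\":\"id\",\"required\":true,\"type\":\"long\"}]}");
String serialized = new Gson().toJson(schemas);
// Reader side: the TypeToken recovers List<String> rather than a raw List.
List<String> parsed =
new Gson().fromJson(serialized, new TypeToken<List<String>>() {}.getType());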
@@ -14,6 +14,8 @@
import com.linkedin.openhouse.tables.client.model.GetTableResponseBody;
import com.linkedin.openhouse.tables.client.model.IcebergSnapshotsRequestBody;
import com.linkedin.openhouse.tables.client.model.Policies;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -186,6 +188,17 @@ protected CreateUpdateTableRequestBody constructMetadataRequestBody(
&& metadata.properties().containsKey(OPENHOUSE_TABLE_TYPE_KEY)) {
createUpdateTableRequestBody.setTableType(getTableType(base, metadata));
}
// TODO: consider allowing this for any table type, not just replica tables
if (isMultiSchemaUpdateCommit(base, metadata)
&& getTableType(base, metadata)
== CreateUpdateTableRequestBody.TableTypeEnum.REPLICA_TABLE) {
List<String> newIntermediateSchemas = new ArrayList<>();
int startSchemaId = base == null ? 0 : base.currentSchemaId() + 1;
for (int i = startSchemaId; i < metadata.currentSchemaId(); i++) {
newIntermediateSchemas.add(SchemaParser.toJson(metadata.schemasById().get(i), false));
}
createUpdateTableRequestBody.setNewIntermediateSchemas(newIntermediateSchemas);
}
// If base table is a replicated table, retain the property from base table
if (base != null && base.properties().containsKey(OPENHOUSE_IS_TABLE_REPLICATED_KEY)) {
Map<String, String> newTblProperties = createUpdateTableRequestBody.getTableProperties();
@@ -297,6 +310,11 @@ protected boolean areSnapshotsUpdated(TableMetadata base, TableMetadata newMetadata) {
|| !base.refs().equals(newMetadata.refs());
}

/**
* Returns true when the commit advances the current schema id by more than one, i.e. it carries
* intermediate schema versions in addition to the latest schema.
*/
protected boolean isMultiSchemaUpdateCommit(TableMetadata base, TableMetadata newMetadata) {
return (base == null && newMetadata.currentSchemaId() > 0)
|| (base != null && newMetadata.currentSchemaId() > base.currentSchemaId() + 1);
}
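
A worked example of the id arithmetic here and in constructMetadataRequestBody above (hypothetical ids):

// base.currentSchemaId() == 1, newMetadata.currentSchemaId() == 4
// isMultiSchemaUpdateCommit: 4 > 1 + 1 -> true
// startSchemaId = base.currentSchemaId() + 1 = 2
// The loop runs for i = 2, 3, so schemas 2 and 3 are serialized into
// newIntermediateSchemas; schema 4 (the current one) travels separately as the
// evolved schema. For a new table (base == null), the loop starts at 0 and
// includes every schema below the current one.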

/**
* A wrapper for a remote REST call to put snapshot.
*