Add multi schema update support for tables during replica table commits #407
Conversation
This is a similar requirement to snapshots in the snapshots API: we pass List<Snapshot> and List<SnapshotRef> via the table DTO and deserialize them on the server, and the server also has access to the existing baseMetadata object. I think it would be useful for code reuse and logically simpler to implement the same API, but for schemas: pass List<Schema> containing all schemas, then do a truncate (though I don't think schemas can ever be truncated) and an append (only add new schemas), which would preserve ordering.
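To make the idea concrete, here is a minimal, hypothetical sketch (not the PR's actual code; the class and method names are made up) of what an append-only merge of a full List<Schema> against the schemas already in the server's baseMetadata could look like:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;

// Hypothetical helper: append only the schemas the base metadata does not yet know
// about, never truncating or reordering the existing list.
public final class SchemaAppendSketch {
  static List<Schema> appendNewSchemas(List<Schema> baseSchemas, List<Schema> requestSchemas) {
    Set<Integer> knownIds =
        baseSchemas.stream().map(Schema::schemaId).collect(Collectors.toSet());
    List<Schema> merged = new ArrayList<>(baseSchemas);
    for (Schema schema : requestSchemas) {
      if (!knownIds.contains(schema.schemaId())) {
        merged.add(schema); // append-only, so existing ordering is preserved
      }
    }
    return merged;
  }
}
```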
Does a schema require a pointer per branch, or is it global? Is the last schema in the list always the current schema?
The table metadata object that we must serialize has these attributes:
private final String metadataFileLocation;
private final int formatVersion;
private final String uuid;
private final String location;
private final long lastSequenceNumber;
private final long lastUpdatedMillis;
private final int lastColumnId;
private final int currentSchemaId;
private final List<Schema> schemas;
private final int defaultSpecId;
private final List<PartitionSpec> specs;
private final int lastAssignedPartitionId;
private final int defaultSortOrderId;
private final List<SortOrder> sortOrders;
private final Map<String, String> properties;
private final long currentSnapshotId;
private final Map<Integer, Schema> schemasById;
private final Map<Integer, PartitionSpec> specsById;
private final Map<Integer, SortOrder> sortOrdersById;
private final List<HistoryEntry> snapshotLog;
private final List<MetadataLogEntry> previousFiles;
private final List<StatisticsFile> statisticsFiles;
private final List<PartitionStatisticsFile> partitionStatisticsFiles;
private final List<MetadataUpdate> changes;
private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
private volatile List<Snapshot> snapshots;
private volatile Map<Long, Snapshot> snapshotsById;
private volatile Map<String, SnapshotRef> refs;
private volatile boolean snapshotsLoaded;

We know that the schemas list has to be serialized, but what about these?

private final int currentSchemaId;
private final Map<Integer, Schema> schemasById;

One idea for why currentSchemaId is required: if we ever support schema rollback, the API will be ready. schemasById <- I'm not sure what this is, but maybe it would be helpful.
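For what it's worth, schemasById appears to be just a lookup index keyed by schema ID that can be rebuilt from the schemas list itself, so it likely does not need to travel over the wire separately. A rough illustration, with invented names:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;

// Illustration only: the by-ID index and the current schema can both be derived
// on the server from the schemas list plus currentSchemaId.
class SchemaIndexSketch {
  static Map<Integer, Schema> indexById(List<Schema> schemas) {
    return schemas.stream().collect(Collectors.toMap(Schema::schemaId, Function.identity()));
  }

  static Schema currentSchema(List<Schema> schemas, int currentSchemaId) {
    return indexById(schemas).get(currentSchemaId);
  }
}
```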
@cbb330 good questions, so I'll go over them one at a time. I checked through the docs here based on your questions: https://iceberg.apache.org/docs/nightly/branching/#usage

I didn't want to always send all schemas in the request, similarly to snapshots, because like you said, schemas don't go through truncation. Also, schemas never get expired in Iceberg, even when the snapshots referencing a schema are expired, so copying all the schemas from the metadata into each request can blow up the payload size. Hence the code here only looks at deltas. There is a concern, though, that some tables already have a mismatch between source and destination schemas, in which case the proposed approach of ensuring equality is better. It becomes a tradeoff between handling very large schemas that have been evolved a lot and fixing existing tables that are already corrupted. I think it is safer to optimize for only sending the schema deltas, especially since schemas can't be expired, unlike snapshots. Let me know your thoughts.
cbb330 left a comment:
Strategy-wise, the intermediate-schemas approach seems like an OK tradeoff vs sending all schemas, given that the schemas list is append-only and the content per schema is huge. I have some concerns on rollout:
- What will be the impact on destination tables which are already in a "bad" state?
- Are there any source tables which are in a bad state? In theory it's possible if a client evolved the schema twice before committing.
The answers to these two questions may dictate that we ideate on the rollout strategy, because we will be "correcting" the old behavior, which changes the contract with existing tables.
Starting the conversation on (1): assume the destination table has schemas [0,1] but the source table has [0,1,2,3,4]. This works now because we don't calculate "newSchemas", and destination table snapshots which refer to 4 are somehow fine in Spark (but Trino doesn't work). The only way to fix this is a true merge, because newSchemas can't be calculated (see the sketch below).
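For reference, a hedged sketch of what such a "true merge" by schema ID might look like, assuming a given schema ID always maps to the same schema content on source and destination (the class and method names are invented for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.iceberg.Schema;

// Sketch: merging by ID fills any holes in the destination's list
// (e.g. destination [0,1] vs source [0,1,2,3,4]) and keeps the result
// ordered by schema ID.
class SchemaTrueMergeSketch {
  static List<Schema> mergeById(List<Schema> destination, List<Schema> source) {
    Map<Integer, Schema> byId = new TreeMap<>();
    destination.forEach(s -> byId.put(s.schemaId(), s));
    source.forEach(s -> byId.putIfAbsent(s.schemaId(), s)); // fill missing IDs from source
    return new ArrayList<>(byId.values());
  }
}
```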
Strategy-wise, I will perform analysis to determine the number of corrupted tables given this limitation by checking all replica tables against their latest snapshot. Then I'll need to fix those tables manually; once they are fixed, new evolutions going forward will be safe.
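A possible shape for that audit, sketched against the standard Iceberg Table API (the class and method names here are hypothetical and not part of this PR):

```java
import java.util.Set;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Hypothetical check: a replica table is suspect if any of its snapshots points at
// a schema ID that its metadata no longer (or never) contained.
// Snapshot.schemaId() can be null on older format versions, so guard for that.
class ReplicaSchemaAuditSketch {
  static boolean hasMissingSchemaIds(Table table) {
    Set<Integer> knownIds = table.schemas().keySet(); // Map<Integer, Schema> keyed by schema ID
    for (Snapshot snapshot : table.snapshots()) {
      Integer schemaId = snapshot.schemaId();
      if (schemaId != null && !knownIds.contains(schemaId)) {
        return true;
      }
    }
    return false;
  }
}
```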
This bug will do this for replicated tables:

Desired: [1a, 2b, 3c, 4d] (where the number is the schema ID and the letter is the schema JSON)

and after the fix will do:

Desired: [1a, 2b, 3c, 4d]

I'm thinking about the effect it has on manifests, which point at their schemaId, if the history looks not perfect but mostly accurate. Since only a minimal number of replica tables have broken in the before state, I think it should be ok, wdyt?
After the fix the actual should be: ...

If fixed by hand (what I'll need to do pre-rollout), the actual will look like: ...

which will mean that, going forward from the timestamp where the metadata is fixed, it will be accurate.
cbb330 left a comment:
Thanks for sorting through the complexity here and coming up with a good solution.
Thanks @Will-Lo for the work on this and handling the schema update scenario properly!
Summary
When committing table metadata after multiple schema evolutions have been performed within a single commit, there can be issues ensuring that the schema IDs remain consistent. This is more commonly seen in replicated tables.
This causes issues because replication uses table snapshots generated directly on the source table, where each snapshot contains a reference to its corresponding schema ID; if that schema ID is missing on the replica, the replica table becomes unreadable for that specific snapshot. This affects certain compute engines such as Trino, as well as time-travel queries.
To resolve this, we add support for multiple schema updates within a single commit on the server side. This introduces an extra field that sends the delta of schemas, identified by schema ID, when performing a replicated table commit (this can be generalized beyond replica tables). Each schema needs to be serialized in order to ensure that the column IDs are consistent.
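As a hedged illustration of why each schema is serialized in full (the field layout below is an arbitrary example, not OpenHouse code), Iceberg's SchemaParser round-trips a schema together with its assigned column IDs:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.types.Types;

// Serializing a schema with SchemaParser carries the assigned column IDs in the
// JSON payload, which is what lets the replica reuse the source table's IDs verbatim.
class SchemaSerializationSketch {
  public static void main(String[] args) {
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()));
    String json = SchemaParser.toJson(schema);       // field IDs are part of the JSON
    Schema roundTripped = SchemaParser.fromJson(json);
    System.out.println(roundTripped.sameSchema(schema)); // true: column IDs preserved
  }
}
```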