Conversation
6782e91 to
39782c8
Compare
|
Is the second commit "add support for Iceberg v3 deletion vectors" added intentionally? |
Yep. The first two commits are the base from another PR. Row lineage needs deletions to work to fully test that rowid works for update commands. |
chenjian2664
left a comment
There was a problem hiding this comment.
Reviewed: "Add support for reading $row_id and $last_updated_sequence_number"
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java
Show resolved
Hide resolved
| @@ -647,6 +647,7 @@ void testV3InsertProducesRowLineageMetadata() | |||
| assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, v VARCHAR) WITH (format = 'PARQUET', format_version = 3)"); | |||
There was a problem hiding this comment.
ideally, we should also test AVRO and ORC format
There was a problem hiding this comment.
I don't think this is necessary. These columns are synthetic and handled by the engine directly. I don't think file format has any impact on this feature.
There was a problem hiding this comment.
where is the test case exercise the IcebergPageSourceProvider changes (for AVRO and ORC)
There was a problem hiding this comment.
I added that comment after I pushed. I just finally got through applying all of the comments below. This is now covered.
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
Show resolved
Hide resolved
| if (column.isLastUpdatedSequenceNumberColumn()) { | ||
| transforms.transform(new DataSequenceNumberTransform(dataSequenceNumber, ordinal)); | ||
| } | ||
| else if (column.isRowIdColumn() && fileFirstRowId.isPresent()) { | ||
| appendRowNumberColumn = true; | ||
| transforms.transform(new RowIdTransform(fileFirstRowId.get(), ordinal)); |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java
Show resolved
Hide resolved
chenjian2664
left a comment
There was a problem hiding this comment.
Reviewed: "Allow table procedures to declare which columns to read"
| Optional<TableLayout> layout = metadata.getLayoutForTableExecute(session, executeHandle); | ||
|
|
||
| List<Symbol> symbols = visibleFields(tableScanPlan); | ||
| Set<String> expectedColumnNames = metadata.getColumnNamesForTableExecute(session, executeHandle); |
There was a problem hiding this comment.
What about returns Optional<List<ColumnHandle>> ?
There was a problem hiding this comment.
This API is required and null is not an allowed response. The connector metadata has a default implementation so this is backwards compatible.
There was a problem hiding this comment.
Actually, @electrum pointed out that we could be using ColumHandles here instead of names. I was reacting to the optional part not the column handle part.
| String procedureName, | ||
| Map<String, Object> executeProperties); | ||
|
|
||
| Set<String> getColumnNamesForTableExecute(Session session, TableExecuteHandle tableExecuteHandle); |
There was a problem hiding this comment.
Involve @Praveen2112 @ebyhr
Please help to review this commit
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergParquetConnectorTest.java
Show resolved
Hide resolved
|
This pull request has gone a while without any activity. Ask for help on #core-dev on Trino slack. |
f6e342b to
72789e0
Compare
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Outdated
Show resolved
Hide resolved
| else if (!fileColumnsByIcebergId.containsKey(column.getBaseColumnIdentity().getId())) { | ||
| Object initialDefault = getInitialDefault(tableSchema, column.getBaseColumnIdentity().getId()); | ||
| transforms.constantValue(nativeValueToBlock(column.getType(), initialDefault)); | ||
| if (column.isLastUpdatedSequenceNumberColumn()) { |
There was a problem hiding this comment.
These should probably be else if on the outer level, after the other if (column.isXxx)
There was a problem hiding this comment.
Actually, this is correct. This handles the case where the row id or LUSN is not present in file and we must synthesize them, otherwise we must read them from the file.
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java
Outdated
Show resolved
Hide resolved
| @@ -647,6 +647,7 @@ void testV3InsertProducesRowLineageMetadata() | |||
| assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, v VARCHAR) WITH (format = 'PARQUET', format_version = 3)"); | |||
There was a problem hiding this comment.
where is the test case exercise the IcebergPageSourceProvider changes (for AVRO and ORC)
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
Outdated
Show resolved
Hide resolved
| } | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
add a check to verify that the pendingSourceRowId is null - that we don't have unhandled row ids
| private static Block createRowIdBlock(Page inputPage, int dataColumnCount, int[] additionPositions, int additionCount) | ||
| { | ||
| // For V3, we need to extract source_row_id from the merge row ID for UPDATE_INSERT rows. | ||
| // UPDATE_DELETE is immediately followed by UPDATE_INSERT, so we track pending source row IDs. |
There was a problem hiding this comment.
UPDATE_DELETE is immediately followed by UPDATE_INSERT, where is the logic that guarantee it?
There was a problem hiding this comment.
I rewrote this with more explict handling verification. Generally, none of these update systems actually verify this stuff, but I am happy to add it.
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
Show resolved
Hide resolved
ebyhr
left a comment
There was a problem hiding this comment.
I don't think this PR is ready for merge yet. Please request another review round before you merge this PR. The repeated UPDATE scenario is broken:
CREATE TABLE test (name varchar) WITH (format_version = 3);
INSERT INTO test VALUES 'alice', 'bob';
INSERT INTO test VALUES 'carol', 'david';
SELECT name, "$row_id", "$last_updated_sequence_number" FROM test;
name | $row_id | $last_updated_sequence_number
-------+---------+-------------------------------
alice | 0 | 2
carol | 2 | 3
bob | 1 | 2
david | 3 | 3
UPDATE test SET name = 'BOB' WHERE name = 'bob';
SELECT name, "$row_id", "$last_updated_sequence_number" FROM test;
name | $row_id | $last_updated_sequence_number
-------+---------+-------------------------------
carol | 2 | 3
david | 3 | 3
BOB | 1 | 4
alice | 0 | 2
UPDATE test SET name = 'BOB1' WHERE name = 'BOB';
SELECT name, "$row_id", "$last_updated_sequence_number" FROM test;
name | $row_id | $last_updated_sequence_number
-------+---------+-------------------------------
carol | 2 | 3
BOB1 | 4 | 5
alice | 0 | 2
david | 3 | 3The bottom BOB1 row should return 1 on $row_id column.
| protected TimeUnit storageTimePrecision; | ||
|
|
||
| protected BaseIcebergConnectorTest(IcebergFileFormat format) | ||
| protected BaseIcebergConnectorTest(IcebergFileFormat format, int formatVersion) |
There was a problem hiding this comment.
Please test SHOW STATS with the new metadata columns. I believe it returns incorrect results.
There was a problem hiding this comment.
Also, add MV test case:
@Test
void testRowLineageWithMaterializedViews()
{
try (TestTable table = newTrinoTable("test_materialized_views", "(id int, name varchar) WITH (format_version = 3)")) {
assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 'Alice'), (2, 'Bob')", 2);
String materializedViewName = "test_materialized_view_" + randomNameSuffix();
assertUpdate("CREATE MATERIALIZED VIEW " + materializedViewName + " AS SELECT id, name, \"$row_id\", \"$last_updated_sequence_number\" FROM " + table.getName());
assertUpdate("REFRESH MATERIALIZED VIEW " + materializedViewName, 2);
assertThat(query("SELECT id, name, \"$row_id\", \"$last_updated_sequence_number\" FROM " + materializedViewName))
.matches("""
VALUES (1, VARCHAR 'Alice', BIGINT '0', BIGINT '2'),
(2, 'Bob', BIGINT '1', BIGINT '2')
""");
assertUpdate("UPDATE " + table.getName() + " SET name = 'Alice Updated' WHERE id = 1", 1);
assertUpdate("REFRESH MATERIALIZED VIEW " + materializedViewName, 2);
assertThat(query("SELECT id, name, \"$row_id\", \"$last_updated_sequence_number\" FROM " + materializedViewName))
.matches("""
VALUES (1, VARCHAR 'Alice Updated', BIGINT '0', BIGINT '3'),
(2, 'Bob', BIGINT '1', BIGINT '2')
""");
assertUpdate("DROP MATERIALIZED VIEW " + materializedViewName);
}
}Column name and variable were not updated to be merge specific when the merge PR was being reviewed.
|
@ebyhr I believe this is ready to go now |
|
@dain Thanks for addressing the comments. I'll review this PR again tomorrow or shortly later. |
|
@ebyhr any updates? |
|
@dain Sorry, I had to work on a different issue yesterday. It looks like there’s still a bug with the Avro format. CREATE TABLE test (name varchar) WITH (format_version = 3, format = 'AVRO');
INSERT INTO test VALUES 'alice', 'bob';
INSERT INTO test VALUES 'carol', 'david';
UPDATE test SET name = 'BOB' WHERE name = 'bob';
SELECT name, "$row_id", "$last_updated_sequence_number" FROM test;BOB should return 1 as $row_id. |
|
The partition table has a bug regardless of the file format: CREATE TABLE test (name varchar, x bigint) WITH (format_version = 3, partitioning = ARRAY['x']);
INSERT INTO test VALUES ('alice', 1), ('bob', 2);
INSERT INTO test VALUES ('carol', 1), ('david', 2);
UPDATE test SET name = 'BOB' WHERE name = 'bob';
SELECT name, "$row_id" FROM test;BOB should return 0 as $row_id. |
Description
This PR adds comprehensive support for Iceberg v3 row lineage, enabling Trino to read and preserve row identity metadata (
$row_idand$last_updated_sequence_number).Changes include:
$row_idand$last_updated_sequence_numberas queryable columns for v3 tables$row_idfor modified rows, maintaining row identity across updatesRelease notes
(X) Release notes are required, with the following suggested text: