Skip to content

Commit 091a0bd

Browse files
committed
Reuse IcebergPageSourceProvider across splits for a scan in Lakehouse
This is necessary to reuse equality deletes in an iceberg scan across splits in a worker
1 parent f1a0f2a commit 091a0bd

File tree

1 file changed

+37
-2
lines changed

1 file changed

+37
-2
lines changed

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehousePageSourceProviderFactory.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,17 @@
2323
import io.trino.plugin.iceberg.IcebergPageSourceProviderFactory;
2424
import io.trino.plugin.iceberg.IcebergTableHandle;
2525
import io.trino.plugin.iceberg.system.files.FilesTableSplit;
26+
import io.trino.spi.connector.ColumnHandle;
27+
import io.trino.spi.connector.ConnectorPageSource;
2628
import io.trino.spi.connector.ConnectorPageSourceProvider;
2729
import io.trino.spi.connector.ConnectorPageSourceProviderFactory;
30+
import io.trino.spi.connector.ConnectorSession;
2831
import io.trino.spi.connector.ConnectorSplit;
2932
import io.trino.spi.connector.ConnectorTableHandle;
33+
import io.trino.spi.connector.ConnectorTransactionHandle;
34+
import io.trino.spi.connector.DynamicFilter;
35+
36+
import java.util.List;
3037

3138
import static java.util.Objects.requireNonNull;
3239

@@ -54,8 +61,36 @@ public LakehousePageSourceProviderFactory(
5461
@Override
5562
public ConnectorPageSourceProvider createPageSourceProvider()
5663
{
57-
return (transaction, session, split, table, columns, dynamicFilter) ->
58-
forHandle(split, table).createPageSource(transaction, session, split, table, columns, dynamicFilter);
64+
// createPageSourceProvider is called for each scan within a query
65+
// we hold on to ConnectorPageSourceProvider instance to allow IcebergPageSourceProvider to reuse equality deletes between splits of the same scan
66+
return new ConnectorPageSourceProvider()
67+
{
68+
private volatile ConnectorPageSourceProvider delegate;
69+
70+
@Override
71+
public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<ColumnHandle> columns, DynamicFilter dynamicFilter)
72+
{
73+
if (delegate == null) {
74+
synchronized (this) {
75+
if (delegate == null) {
76+
delegate = forHandle(split, table);
77+
}
78+
}
79+
}
80+
return delegate.createPageSource(transaction, session, split, table, columns, dynamicFilter);
81+
}
82+
83+
@Override
84+
public long getMemoryUsage()
85+
{
86+
ConnectorPageSourceProvider provider = delegate;
87+
if (provider == null) {
88+
// No page source was created, so no memory is used
89+
return 0;
90+
}
91+
return provider.getMemoryUsage();
92+
}
93+
};
5994
}
6095

6196
private ConnectorPageSourceProvider forHandle(ConnectorSplit split, ConnectorTableHandle handle)

0 commit comments

Comments
 (0)