Skip to content

Commit 8f60842

Browse files
committed
Support 475 Iceberg Glue REST endpoint support
1 parent a2f7a5b commit 8f60842

File tree

699 files changed

+37028
-33466
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

699 files changed

+37028
-33466
lines changed

core/trino-main/src/main/java/io/trino/operator/ChangeOnlyUpdatedColumnsMergeProcessor.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
package io.trino.operator;
1515

1616
import io.trino.spi.Page;
17+
import io.trino.spi.PageBlockUtil;
1718
import io.trino.spi.block.Block;
18-
import io.trino.spi.block.ColumnarRow;
1919
import io.trino.spi.block.RunLengthEncodedBlock;
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
2323

2424
import static com.google.common.base.Preconditions.checkArgument;
25-
import static io.trino.spi.block.ColumnarRow.toColumnarRow;
25+
import static io.trino.spi.block.RowBlock.getRowFieldsFromBlock;
2626
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
2727
import static io.trino.spi.type.TinyintType.TINYINT;
2828
import static java.util.Objects.requireNonNull;
@@ -67,26 +67,24 @@ public Page transformPage(Page inputPage)
6767
// TODO: Check with Karol to see if we can get empty pages
6868
checkArgument(positionCount > 0, "positionCount should be > 0, but is %s", positionCount);
6969

70-
ColumnarRow mergeRow = toColumnarRow(inputPage.getBlock(mergeRowChannel));
71-
checkArgument(!mergeRow.mayHaveNull(), "The mergeRow may not have null rows");
72-
73-
// We've verified that the mergeRow block has no null rows, so it's okay to get the field blocks
74-
75-
List<Block> builder = new ArrayList<>(dataColumnChannels.size() + 3);
76-
70+
Block mergeRow = inputPage.getBlock(mergeRowChannel).getLoadedBlock();
71+
List<Block> fields = getRowFieldsFromBlock(mergeRow);
72+
List<Block> builder = new ArrayList<>(dataColumnChannels.size() + 4);
7773
for (int channel : dataColumnChannels) {
78-
builder.add(mergeRow.getField(channel));
74+
builder.add(fields.get(channel));
7975
}
80-
Block operationChannelBlock = mergeRow.getField(mergeRow.getFieldCount() - 2);
76+
Block operationChannelBlock = fields.get(fields.size() - 2);
8177
builder.add(operationChannelBlock);
78+
Block caseNumberChannelBlock = fields.get(fields.size() - 1);
79+
builder.add(caseNumberChannelBlock);
8280
builder.add(inputPage.getBlock(rowIdChannel));
8381
builder.add(RunLengthEncodedBlock.create(INSERT_FROM_UPDATE_BLOCK, positionCount));
8482

8583
Page result = new Page(builder.toArray(Block[]::new));
8684

8785
int defaultCaseCount = 0;
8886
for (int position = 0; position < positionCount; position++) {
89-
if (TINYINT.getByte(operationChannelBlock, position) == DEFAULT_CASE_OPERATION_NUMBER) {
87+
if (mergeRow.isNull(position)) {
9088
defaultCaseCount++;
9189
}
9290
}
@@ -97,14 +95,14 @@ public Page transformPage(Page inputPage)
9795
int usedCases = 0;
9896
int[] positions = new int[positionCount - defaultCaseCount];
9997
for (int position = 0; position < positionCount; position++) {
100-
if (TINYINT.getByte(operationChannelBlock, position) != DEFAULT_CASE_OPERATION_NUMBER) {
98+
if (!mergeRow.isNull(position)) {
10199
positions[usedCases] = position;
102100
usedCases++;
103101
}
104102
}
105103

106104
checkArgument(usedCases + defaultCaseCount == positionCount, "usedCases (%s) + defaultCaseCount (%s) != positionCount (%s)", usedCases, defaultCaseCount, positionCount);
107105

108-
return result.getPositions(positions, 0, usedCases);
106+
return PageBlockUtil.getPositions(result, positions, 0, usedCases);
109107
}
110108
}

core/trino-main/src/main/java/io/trino/operator/DeleteAndInsertMergeProcessor.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,19 @@
1919
import io.trino.spi.PageBuilder;
2020
import io.trino.spi.block.Block;
2121
import io.trino.spi.block.BlockBuilder;
22-
import io.trino.spi.block.ColumnarRow;
2322
import io.trino.spi.type.Type;
2423

2524
import java.util.List;
2625

2726
import static com.google.common.base.Preconditions.checkArgument;
2827
import static com.google.common.base.Verify.verify;
29-
import static io.trino.spi.block.ColumnarRow.toColumnarRow;
28+
import static io.trino.spi.block.RowBlock.getRowFieldsFromBlock;
3029
import static io.trino.spi.connector.ConnectorMergeSink.DELETE_OPERATION_NUMBER;
3130
import static io.trino.spi.connector.ConnectorMergeSink.INSERT_OPERATION_NUMBER;
3231
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_DELETE_OPERATION_NUMBER;
3332
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_INSERT_OPERATION_NUMBER;
3433
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_OPERATION_NUMBER;
34+
import static io.trino.spi.type.IntegerType.INTEGER;
3535
import static io.trino.spi.type.TinyintType.TINYINT;
3636
import static java.util.Objects.requireNonNull;
3737

@@ -100,8 +100,9 @@ public Page transformPage(Page inputPage)
100100
int originalPositionCount = inputPage.getPositionCount();
101101
checkArgument(originalPositionCount > 0, "originalPositionCount should be > 0, but is %s", originalPositionCount);
102102

103-
ColumnarRow mergeRow = toColumnarRow(inputPage.getBlock(mergeRowChannel));
104-
Block operationChannelBlock = mergeRow.getField(mergeRow.getFieldCount() - 2);
103+
Block mergeRow = inputPage.getBlock(mergeRowChannel);
104+
List<Block> fields = getRowFieldsFromBlock(mergeRow);
105+
Block operationChannelBlock = fields.get(fields.size() - 2);
105106

106107
int updatePositions = 0;
107108
int insertPositions = 0;
@@ -123,6 +124,7 @@ public Page transformPage(Page inputPage)
123124
List<Type> pageTypes = ImmutableList.<Type>builder()
124125
.addAll(dataColumnTypes)
125126
.add(TINYINT)
127+
.add(INTEGER)
126128
.add(rowIdType)
127129
.add(TINYINT)
128130
.build();
@@ -137,7 +139,7 @@ public Page transformPage(Page inputPage)
137139
}
138140
// Insert and update because both create an insert row
139141
if (operation == INSERT_OPERATION_NUMBER || operation == UPDATE_OPERATION_NUMBER) {
140-
addInsertRow(pageBuilder, mergeRow, position, operation != INSERT_OPERATION_NUMBER);
142+
addInsertRow(pageBuilder, fields, position, operation != INSERT_OPERATION_NUMBER);
141143
}
142144
}
143145
}
@@ -170,33 +172,39 @@ private void addDeleteRow(PageBuilder pageBuilder, Page originalPage, int positi
170172
// Add the operation column == deleted
171173
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size()), causedByUpdate ? UPDATE_DELETE_OPERATION_NUMBER : DELETE_OPERATION_NUMBER);
172174

175+
// Add the dummy case number, delete and insert won't use it, use -1 to mark it shouldn't be used
176+
INTEGER.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1), -1);
177+
173178
// Copy row ID column
174-
rowIdType.appendTo(originalPage.getBlock(rowIdChannel), position, pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1));
179+
rowIdType.appendTo(originalPage.getBlock(rowIdChannel), position, pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2));
175180

176181
// Write 0, meaning this row is not an insert derived from an update
177-
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2), 0);
182+
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 3), 0);
178183

179184
pageBuilder.declarePosition();
180185
}
181186

182-
private void addInsertRow(PageBuilder pageBuilder, ColumnarRow mergeCaseBlock, int position, boolean causedByUpdate)
187+
private void addInsertRow(PageBuilder pageBuilder, List<Block> fields, int position, boolean causedByUpdate)
183188
{
184189
// Copy the values from the merge block
185190
for (int targetChannel : dataColumnChannels) {
186191
Type columnType = dataColumnTypes.get(targetChannel);
187192
BlockBuilder targetBlock = pageBuilder.getBlockBuilder(targetChannel);
188193
// The value comes from that column of the page
189-
columnType.appendTo(mergeCaseBlock.getField(targetChannel), position, targetBlock);
194+
columnType.appendTo(fields.get(targetChannel), position, targetBlock);
190195
}
191196

192197
// Add the operation column == insert
193198
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size()), causedByUpdate ? UPDATE_INSERT_OPERATION_NUMBER : INSERT_OPERATION_NUMBER);
194199

200+
// Add the dummy case number, delete and insert won't use it
201+
INTEGER.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1), 0);
202+
195203
// Add null row ID column
196-
pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1).appendNull();
204+
pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2).appendNull();
197205

198206
// Write 1 if this row is an insert derived from an update, 0 otherwise
199-
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2), causedByUpdate ? 1 : 0);
207+
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 3), causedByUpdate ? 1 : 0);
200208

201209
pageBuilder.declarePosition();
202210
}

core/trino-main/src/main/java/io/trino/sql/planner/QueryPlanner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,8 @@ public PlanNode plan(Delete node)
556556
List<Symbol> columnSymbols = columnSymbolsBuilder.build();
557557
Symbol operationSymbol = symbolAllocator.newSymbol("operation", TINYINT);
558558
assignmentsBuilder.put(operationSymbol, new GenericLiteral("TINYINT", String.valueOf(DELETE_OPERATION_NUMBER)));
559+
Symbol caseNumberSymbol = symbolAllocator.newSymbol("case_number", INTEGER);
560+
assignmentsBuilder.put(caseNumberSymbol, new GenericLiteral("INTEGER", String.valueOf(0)));
559561
Symbol projectedRowIdSymbol = symbolAllocator.newSymbol(rowIdSymbol.getName(), rowIdType);
560562
assignmentsBuilder.put(projectedRowIdSymbol, rowIdSymbol.toSymbolReference());
561563
assignmentsBuilder.put(symbolAllocator.newSymbol("insert_from_update", TINYINT), new GenericLiteral("TINYINT", "0"));
@@ -929,11 +931,13 @@ private MergeWriterNode createMergePipeline(Table table, RelationPlan relationPl
929931
}
930932

931933
Symbol operationSymbol = symbolAllocator.newSymbol("operation", TINYINT);
934+
Symbol caseNumberSymbol = symbolAllocator.newSymbol("case_number", INTEGER);
932935
Symbol insertFromUpdateSymbol = symbolAllocator.newSymbol("insert_from_update", TINYINT);
933936

934937
List<Symbol> projectedSymbols = ImmutableList.<Symbol>builder()
935938
.addAll(columnSymbols)
936939
.add(operationSymbol)
940+
.add(caseNumberSymbol)
937941
.add(rowIdSymbol)
938942
.add(insertFromUpdateSymbol)
939943
.build();

0 commit comments

Comments
 (0)