Skip to content

Commit 5465274

Browse files
committed
Support 475 Iceberg Glue REST endpoint support
1 parent a2f7a5b commit 5465274

File tree

806 files changed

+41915
-36661
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

806 files changed

+41915
-36661
lines changed

core/trino-main/src/main/java/io/trino/operator/ChangeOnlyUpdatedColumnsMergeProcessor.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
package io.trino.operator;
1515

1616
import io.trino.spi.Page;
17+
import io.trino.spi.PageBlockUtil;
1718
import io.trino.spi.block.Block;
18-
import io.trino.spi.block.ColumnarRow;
1919
import io.trino.spi.block.RunLengthEncodedBlock;
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
2323

2424
import static com.google.common.base.Preconditions.checkArgument;
25-
import static io.trino.spi.block.ColumnarRow.toColumnarRow;
25+
import static io.trino.spi.block.RowBlock.getRowFieldsFromBlock;
2626
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
2727
import static io.trino.spi.type.TinyintType.TINYINT;
2828
import static java.util.Objects.requireNonNull;
@@ -67,26 +67,24 @@ public Page transformPage(Page inputPage)
6767
// TODO: Check with Karol to see if we can get empty pages
6868
checkArgument(positionCount > 0, "positionCount should be > 0, but is %s", positionCount);
6969

70-
ColumnarRow mergeRow = toColumnarRow(inputPage.getBlock(mergeRowChannel));
71-
checkArgument(!mergeRow.mayHaveNull(), "The mergeRow may not have null rows");
72-
73-
// We've verified that the mergeRow block has no null rows, so it's okay to get the field blocks
74-
75-
List<Block> builder = new ArrayList<>(dataColumnChannels.size() + 3);
76-
70+
Block mergeRow = inputPage.getBlock(mergeRowChannel).getLoadedBlock();
71+
List<Block> fields = getRowFieldsFromBlock(mergeRow);
72+
List<Block> builder = new ArrayList<>(dataColumnChannels.size() + 4);
7773
for (int channel : dataColumnChannels) {
78-
builder.add(mergeRow.getField(channel));
74+
builder.add(fields.get(channel));
7975
}
80-
Block operationChannelBlock = mergeRow.getField(mergeRow.getFieldCount() - 2);
76+
Block operationChannelBlock = fields.get(fields.size() - 2);
8177
builder.add(operationChannelBlock);
78+
Block caseNumberChannelBlock = fields.get(fields.size() - 1);
79+
builder.add(caseNumberChannelBlock);
8280
builder.add(inputPage.getBlock(rowIdChannel));
8381
builder.add(RunLengthEncodedBlock.create(INSERT_FROM_UPDATE_BLOCK, positionCount));
8482

8583
Page result = new Page(builder.toArray(Block[]::new));
8684

8785
int defaultCaseCount = 0;
8886
for (int position = 0; position < positionCount; position++) {
89-
if (TINYINT.getByte(operationChannelBlock, position) == DEFAULT_CASE_OPERATION_NUMBER) {
87+
if (mergeRow.isNull(position)) {
9088
defaultCaseCount++;
9189
}
9290
}
@@ -97,14 +95,14 @@ public Page transformPage(Page inputPage)
9795
int usedCases = 0;
9896
int[] positions = new int[positionCount - defaultCaseCount];
9997
for (int position = 0; position < positionCount; position++) {
100-
if (TINYINT.getByte(operationChannelBlock, position) != DEFAULT_CASE_OPERATION_NUMBER) {
98+
if (!mergeRow.isNull(position)) {
10199
positions[usedCases] = position;
102100
usedCases++;
103101
}
104102
}
105103

106104
checkArgument(usedCases + defaultCaseCount == positionCount, "usedCases (%s) + defaultCaseCount (%s) != positionCount (%s)", usedCases, defaultCaseCount, positionCount);
107105

108-
return result.getPositions(positions, 0, usedCases);
106+
return PageBlockUtil.getPositions(result, positions, 0, usedCases);
109107
}
110108
}

core/trino-main/src/main/java/io/trino/operator/DeleteAndInsertMergeProcessor.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,19 @@
1919
import io.trino.spi.PageBuilder;
2020
import io.trino.spi.block.Block;
2121
import io.trino.spi.block.BlockBuilder;
22-
import io.trino.spi.block.ColumnarRow;
2322
import io.trino.spi.type.Type;
2423

2524
import java.util.List;
2625

2726
import static com.google.common.base.Preconditions.checkArgument;
2827
import static com.google.common.base.Verify.verify;
29-
import static io.trino.spi.block.ColumnarRow.toColumnarRow;
28+
import static io.trino.spi.block.RowBlock.getRowFieldsFromBlock;
3029
import static io.trino.spi.connector.ConnectorMergeSink.DELETE_OPERATION_NUMBER;
3130
import static io.trino.spi.connector.ConnectorMergeSink.INSERT_OPERATION_NUMBER;
3231
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_DELETE_OPERATION_NUMBER;
3332
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_INSERT_OPERATION_NUMBER;
3433
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_OPERATION_NUMBER;
34+
import static io.trino.spi.type.IntegerType.INTEGER;
3535
import static io.trino.spi.type.TinyintType.TINYINT;
3636
import static java.util.Objects.requireNonNull;
3737

@@ -100,8 +100,9 @@ public Page transformPage(Page inputPage)
100100
int originalPositionCount = inputPage.getPositionCount();
101101
checkArgument(originalPositionCount > 0, "originalPositionCount should be > 0, but is %s", originalPositionCount);
102102

103-
ColumnarRow mergeRow = toColumnarRow(inputPage.getBlock(mergeRowChannel));
104-
Block operationChannelBlock = mergeRow.getField(mergeRow.getFieldCount() - 2);
103+
Block mergeRow = inputPage.getBlock(mergeRowChannel);
104+
List<Block> fields = getRowFieldsFromBlock(mergeRow);
105+
Block operationChannelBlock = fields.get(fields.size() - 2);
105106

106107
int updatePositions = 0;
107108
int insertPositions = 0;
@@ -123,6 +124,7 @@ public Page transformPage(Page inputPage)
123124
List<Type> pageTypes = ImmutableList.<Type>builder()
124125
.addAll(dataColumnTypes)
125126
.add(TINYINT)
127+
.add(INTEGER)
126128
.add(rowIdType)
127129
.add(TINYINT)
128130
.build();
@@ -137,7 +139,7 @@ public Page transformPage(Page inputPage)
137139
}
138140
// Insert and update because both create an insert row
139141
if (operation == INSERT_OPERATION_NUMBER || operation == UPDATE_OPERATION_NUMBER) {
140-
addInsertRow(pageBuilder, mergeRow, position, operation != INSERT_OPERATION_NUMBER);
142+
addInsertRow(pageBuilder, fields, position, operation != INSERT_OPERATION_NUMBER);
141143
}
142144
}
143145
}
@@ -170,33 +172,39 @@ private void addDeleteRow(PageBuilder pageBuilder, Page originalPage, int positi
170172
// Add the operation column == deleted
171173
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size()), causedByUpdate ? UPDATE_DELETE_OPERATION_NUMBER : DELETE_OPERATION_NUMBER);
172174

175+
// Add the dummy case number, delete and insert won't use it, use -1 to mark it shouldn't be used
176+
INTEGER.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1), -1);
177+
173178
// Copy row ID column
174-
rowIdType.appendTo(originalPage.getBlock(rowIdChannel), position, pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1));
179+
rowIdType.appendTo(originalPage.getBlock(rowIdChannel), position, pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2));
175180

176181
// Write 0, meaning this row is not an insert derived from an update
177-
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2), 0);
182+
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 3), 0);
178183

179184
pageBuilder.declarePosition();
180185
}
181186

182-
private void addInsertRow(PageBuilder pageBuilder, ColumnarRow mergeCaseBlock, int position, boolean causedByUpdate)
187+
private void addInsertRow(PageBuilder pageBuilder, List<Block> fields, int position, boolean causedByUpdate)
183188
{
184189
// Copy the values from the merge block
185190
for (int targetChannel : dataColumnChannels) {
186191
Type columnType = dataColumnTypes.get(targetChannel);
187192
BlockBuilder targetBlock = pageBuilder.getBlockBuilder(targetChannel);
188193
// The value comes from that column of the page
189-
columnType.appendTo(mergeCaseBlock.getField(targetChannel), position, targetBlock);
194+
columnType.appendTo(fields.get(targetChannel), position, targetBlock);
190195
}
191196

192197
// Add the operation column == insert
193198
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size()), causedByUpdate ? UPDATE_INSERT_OPERATION_NUMBER : INSERT_OPERATION_NUMBER);
194199

200+
// Add the dummy case number, delete and insert won't use it
201+
INTEGER.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1), 0);
202+
195203
// Add null row ID column
196-
pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1).appendNull();
204+
pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2).appendNull();
197205

198206
// Write 1 if this row is an insert derived from an update, 0 otherwise
199-
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2), causedByUpdate ? 1 : 0);
207+
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 3), causedByUpdate ? 1 : 0);
200208

201209
pageBuilder.declarePosition();
202210
}

core/trino-main/src/main/java/io/trino/operator/output/RowPositionsAppender.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,14 @@
2727
import static io.airlift.slice.SizeOf.sizeOf;
2828
import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize;
2929
import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize;
30-
import static io.trino.spi.block.RowBlock.fromFieldBlocks;
30+
import static io.trino.spi.block.RowBlock.fromNotNullSuppressedFieldBlocks;
3131
import static java.util.Objects.requireNonNull;
3232

3333
public class RowPositionsAppender
3434
implements PositionsAppender
3535
{
3636
private static final int INSTANCE_SIZE = instanceSize(RowPositionsAppender.class);
37+
private final RowType type;
3738
private final PositionsAppender[] fieldAppenders;
3839
private int initialEntryCount;
3940
private boolean initialized;
@@ -55,11 +56,12 @@ public static RowPositionsAppender createRowAppender(
5556
for (int i = 0; i < fields.length; i++) {
5657
fields[i] = positionsAppenderFactory.create(type.getFields().get(i).getType(), expectedPositions, maxPageSizeInBytes);
5758
}
58-
return new RowPositionsAppender(fields, expectedPositions);
59+
return new RowPositionsAppender(type, fields, expectedPositions);
5960
}
6061

61-
private RowPositionsAppender(PositionsAppender[] fieldAppenders, int expectedPositions)
62+
private RowPositionsAppender(RowType type, PositionsAppender[] fieldAppenders, int expectedPositions)
6263
{
64+
this.type = type;
6365
this.fieldAppenders = requireNonNull(fieldAppenders, "fields is null");
6466
this.initialEntryCount = expectedPositions;
6567
resetSize();
@@ -88,14 +90,17 @@ public void append(IntArrayList positions, Block block)
8890

8991
List<Block> fieldBlocks = sourceRowBlock.getChildren();
9092
for (int i = 0; i < fieldAppenders.length; i++) {
91-
fieldAppenders[i].append(nonNullPositions, fieldBlocks.get(i));
93+
fieldAppenders[i].append(positions, fieldBlocks.get(i));
9294
}
9395
}
9496
else if (allPositionsNull(positions, block)) {
9597
// all input positions are null. We can handle that even if block type is not RowBLock.
9698
// append positions.size() nulls
9799
Arrays.fill(rowIsNull, positionCount, positionCount + positions.size(), true);
98100
hasNullRow = true;
101+
for (int i = 0; i < fieldAppenders.length; i++) {
102+
fieldAppenders[i].append(positions, block);
103+
}
99104
}
100105
else {
101106
throw new IllegalArgumentException("unsupported block type: " + block);
@@ -113,6 +118,9 @@ public void appendRle(Block value, int rlePositionCount)
113118
if (sourceRowBlock.isNull(0)) {
114119
// append rlePositionCount nulls
115120
Arrays.fill(rowIsNull, positionCount, positionCount + rlePositionCount, true);
121+
for (int i = 0; i < fieldAppenders.length; i++) {
122+
fieldAppenders[i].appendRle(value.getSingleValueBlock(0), rlePositionCount);
123+
}
116124
hasNullRow = true;
117125
}
118126
else {
@@ -128,6 +136,9 @@ public void appendRle(Block value, int rlePositionCount)
128136
else if (value.isNull(0)) {
129137
// append rlePositionCount nulls
130138
Arrays.fill(rowIsNull, positionCount, positionCount + rlePositionCount, true);
139+
for (int i = 0; i < fieldAppenders.length; i++) {
140+
fieldAppenders[i].appendRle(value.getSingleValueBlock(0), rlePositionCount);
141+
}
131142
hasNullRow = true;
132143
}
133144
else {
@@ -145,6 +156,9 @@ public void append(int position, Block value)
145156
if (sourceRowBlock.isNull(position)) {
146157
rowIsNull[positionCount] = true;
147158
hasNullRow = true;
159+
for (int i = 0; i < fieldAppenders.length; i++) {
160+
fieldAppenders[i].append(position, value);
161+
}
148162
}
149163
else {
150164
// append not null row value
@@ -159,6 +173,9 @@ public void append(int position, Block value)
159173
else if (value.isNull(position)) {
160174
rowIsNull[positionCount] = true;
161175
hasNullRow = true;
176+
for (int i = 0; i < fieldAppenders.length; i++) {
177+
fieldAppenders[i].append(position, value);
178+
}
162179
}
163180
else {
164181
throw new IllegalArgumentException("unsupported block type: " + value);
@@ -176,10 +193,10 @@ public Block build()
176193
}
177194
Block result;
178195
if (hasNonNullRow) {
179-
result = fromFieldBlocks(positionCount, hasNullRow ? Optional.of(rowIsNull) : Optional.empty(), fieldBlocks);
196+
result = fromNotNullSuppressedFieldBlocks(positionCount, hasNullRow ? Optional.of(rowIsNull) : Optional.empty(), fieldBlocks);
180197
}
181198
else {
182-
Block nullRowBlock = fromFieldBlocks(1, Optional.of(new boolean[] {true}), fieldBlocks);
199+
Block nullRowBlock = type.createNullBlock();
183200
result = RunLengthEncodedBlock.create(nullRowBlock, positionCount);
184201
}
185202

core/trino-main/src/main/java/io/trino/sql/planner/QueryPlanner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,8 @@ public PlanNode plan(Delete node)
556556
List<Symbol> columnSymbols = columnSymbolsBuilder.build();
557557
Symbol operationSymbol = symbolAllocator.newSymbol("operation", TINYINT);
558558
assignmentsBuilder.put(operationSymbol, new GenericLiteral("TINYINT", String.valueOf(DELETE_OPERATION_NUMBER)));
559+
Symbol caseNumberSymbol = symbolAllocator.newSymbol("case_number", INTEGER);
560+
assignmentsBuilder.put(caseNumberSymbol, new GenericLiteral("INTEGER", String.valueOf(0)));
559561
Symbol projectedRowIdSymbol = symbolAllocator.newSymbol(rowIdSymbol.getName(), rowIdType);
560562
assignmentsBuilder.put(projectedRowIdSymbol, rowIdSymbol.toSymbolReference());
561563
assignmentsBuilder.put(symbolAllocator.newSymbol("insert_from_update", TINYINT), new GenericLiteral("TINYINT", "0"));
@@ -929,11 +931,13 @@ private MergeWriterNode createMergePipeline(Table table, RelationPlan relationPl
929931
}
930932

931933
Symbol operationSymbol = symbolAllocator.newSymbol("operation", TINYINT);
934+
Symbol caseNumberSymbol = symbolAllocator.newSymbol("case_number", INTEGER);
932935
Symbol insertFromUpdateSymbol = symbolAllocator.newSymbol("insert_from_update", TINYINT);
933936

934937
List<Symbol> projectedSymbols = ImmutableList.<Symbol>builder()
935938
.addAll(columnSymbols)
936939
.add(operationSymbol)
940+
.add(caseNumberSymbol)
937941
.add(rowIdSymbol)
938942
.add(insertFromUpdateSymbol)
939943
.build();

0 commit comments

Comments
 (0)