diff --git a/pom.xml b/pom.xml index 047fc2841f919..6ae7ff08f088b 100644 --- a/pom.xml +++ b/pom.xml @@ -237,12 +237,12 @@ 3.9.1 - - + + org.jetbrains.kotlin kotlin-stdlib-jdk8 1.9.25 - + org.eclipse.jetty diff --git a/presto-common/src/main/java/com/facebook/presto/common/transaction/TransactionId.java b/presto-common/src/main/java/com/facebook/presto/common/transaction/TransactionId.java index 5befd2ef4abc5..7933539d8863d 100644 --- a/presto-common/src/main/java/com/facebook/presto/common/transaction/TransactionId.java +++ b/presto-common/src/main/java/com/facebook/presto/common/transaction/TransactionId.java @@ -23,18 +23,24 @@ import java.util.UUID; import static java.util.Objects.requireNonNull; +import static java.util.UUID.fromString; @ThriftStruct public final class TransactionId { private final UUID uuid; - @ThriftConstructor public TransactionId(UUID uuid) { this.uuid = requireNonNull(uuid, "uuid is null"); } + @ThriftConstructor + public TransactionId(String uuid) + { + this(fromString(uuid)); + } + public static TransactionId create() { return new TransactionId(UUID.randomUUID()); @@ -43,7 +49,7 @@ public static TransactionId create() @JsonCreator public static TransactionId valueOf(String value) { - return new TransactionId(UUID.fromString(value)); + return new TransactionId(fromString(value)); } @Override @@ -67,12 +73,12 @@ public boolean equals(Object obj) @Override @JsonValue + @ThriftField(value = 1, name = "uuid") public String toString() { return uuid.toString(); } - @ThriftField(value = 1, name = "uuid") public UUID getUuid() { return uuid; diff --git a/presto-common/src/main/java/com/facebook/presto/common/type/TypeSignature.java b/presto-common/src/main/java/com/facebook/presto/common/type/TypeSignature.java index 7cdf0b91aee2f..0e47b01600af2 100644 --- a/presto-common/src/main/java/com/facebook/presto/common/type/TypeSignature.java +++ b/presto-common/src/main/java/com/facebook/presto/common/type/TypeSignature.java @@ -101,7 +101,6 @@ public TypeSignature(String base, List parameters) this(TypeSignatureBase.of(base), parameters); } - @ThriftConstructor public TypeSignature(TypeSignatureBase typeSignatureBase, List parameters) { this.base = typeSignatureBase; @@ -111,12 +110,18 @@ public TypeSignature(TypeSignatureBase typeSignatureBase, List getParameters() { return parameters; @@ -700,6 +704,7 @@ private static boolean isValidStartOfIdentifier(char c) @Override @JsonValue + @ThriftField(value = 1, name = "signature") public String toString() { String baseString = base.toString(); @@ -723,6 +728,12 @@ public String toString() return typeName.toString(); } + @ThriftField(2) + public boolean getIgnore() + { + return true; + } + private static void checkArgument(boolean argument, String format, Object... args) { if (!argument) { diff --git a/presto-common/src/main/java/com/facebook/presto/common/type/TypeSignatureParameterUnion.java b/presto-common/src/main/java/com/facebook/presto/common/type/TypeSignatureParameterUnion.java index 031a9063ad61c..f342255eab3ac 100644 --- a/presto-common/src/main/java/com/facebook/presto/common/type/TypeSignatureParameterUnion.java +++ b/presto-common/src/main/java/com/facebook/presto/common/type/TypeSignatureParameterUnion.java @@ -21,7 +21,6 @@ import java.util.Objects; -import static com.facebook.drift.annotations.ThriftField.Requiredness.OPTIONAL; import static com.facebook.presto.common.type.BigintEnumType.LongEnumMap; import static com.facebook.presto.common.type.ParameterKind.DISTINCT_TYPE; import static com.facebook.presto.common.type.ParameterKind.LONG; @@ -50,7 +49,7 @@ public TypeSignatureParameterUnion(TypeSignature typeSignature) this.id = (short) TYPE.getValue(); } - @ThriftField(value = 1, requiredness = OPTIONAL) + @ThriftField(1) public TypeSignature getTypeSignature() { return typeSignature; @@ -63,11 +62,12 @@ public TypeSignatureParameterUnion(NamedTypeSignature namedTypeSignature) this.id = (short) NAMED_TYPE.getValue(); } - @ThriftField(value = 2, requiredness = OPTIONAL) + @ThriftField(2) public NamedTypeSignature getNamedTypeSignature() { return namedTypeSignature; } + @ThriftConstructor public TypeSignatureParameterUnion(Long longLiteral) { @@ -75,7 +75,7 @@ public TypeSignatureParameterUnion(Long longLiteral) this.id = (short) LONG.getValue(); } - @ThriftField(value = 3, requiredness = OPTIONAL) + @ThriftField(3) public Long getLongLiteral() { return longLiteral; @@ -88,7 +88,7 @@ public TypeSignatureParameterUnion(String variable) this.id = (short) VARIABLE.getValue(); } - @ThriftField(value = 4, requiredness = OPTIONAL) + @ThriftField(4) public String getVariable() { return variable; @@ -101,7 +101,7 @@ public TypeSignatureParameterUnion(LongEnumMap longEnumMap) this.id = (short) LONG_ENUM.getValue(); } - @ThriftField(value = 5, requiredness = OPTIONAL) + @ThriftField(5) public LongEnumMap getLongEnumMap() { return longEnumMap; @@ -114,7 +114,7 @@ public TypeSignatureParameterUnion(VarcharEnumMap varcharEnumMap) this.id = (short) VARCHAR_ENUM.getValue(); } - @ThriftField(value = 6, requiredness = OPTIONAL) + @ThriftField(6) public VarcharEnumMap getVarcharEnumMap() { return varcharEnumMap; @@ -127,7 +127,7 @@ public TypeSignatureParameterUnion(DistinctTypeInfo distinctTypeInfo) this.id = (short) DISTINCT_TYPE.getValue(); } - @ThriftField(value = 7, requiredness = OPTIONAL) + @ThriftField(7) public DistinctTypeInfo getDistinctTypeInfo() { return distinctTypeInfo; diff --git a/presto-main-base/pom.xml b/presto-main-base/pom.xml index 259fdad0d025e..b9a5b2d3f5a6b 100644 --- a/presto-main-base/pom.xml +++ b/presto-main-base/pom.xml @@ -302,7 +302,6 @@ jts-core - org.apache.datasketches datasketches-memory diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/ExecutionFailureInfo.java b/presto-main-base/src/main/java/com/facebook/presto/execution/ExecutionFailureInfo.java index 88c0f957a4e98..c9da4a4ba2196 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/ExecutionFailureInfo.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/ExecutionFailureInfo.java @@ -15,6 +15,7 @@ import com.facebook.drift.annotations.ThriftConstructor; import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftIdlAnnotation; import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.client.ErrorLocation; import com.facebook.presto.client.FailureInfo; @@ -99,7 +100,8 @@ public String getMessage() @Nullable @JsonProperty - @ThriftField(value = 3, isRecursive = TRUE, requiredness = OPTIONAL) + @ThriftField(value = 3, isRecursive = TRUE, requiredness = OPTIONAL, idlAnnotations = { + @ThriftIdlAnnotation(key = "cpp.ref_type", value = "\"shared\"")}) public ExecutionFailureInfo getCause() { return cause; diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/ScheduledSplit.java b/presto-main-base/src/main/java/com/facebook/presto/execution/ScheduledSplit.java index ae1d2a01bfd59..89f782bda2114 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/ScheduledSplit.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/ScheduledSplit.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.execution; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.metadata.Split; import com.facebook.presto.spi.plan.PlanNodeId; import com.fasterxml.jackson.annotation.JsonCreator; @@ -22,6 +25,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; +@ThriftStruct public class ScheduledSplit { private final long sequenceId; @@ -29,6 +33,7 @@ public class ScheduledSplit private final Split split; @JsonCreator + @ThriftConstructor public ScheduledSplit( @JsonProperty("sequenceId") long sequenceId, @JsonProperty("planNodeId") PlanNodeId planNodeId, @@ -40,18 +45,21 @@ public ScheduledSplit( } @JsonProperty + @ThriftField(1) public long getSequenceId() { return sequenceId; } @JsonProperty + @ThriftField(2) public PlanNodeId getPlanNodeId() { return planNodeId; } @JsonProperty + @ThriftField(3) public Split getSplit() { return split; diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/TaskSource.java b/presto-main-base/src/main/java/com/facebook/presto/execution/TaskSource.java index 93384ebf7eb46..965305d6a5847 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/TaskSource.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/TaskSource.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.execution; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.spi.plan.PlanNodeId; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -24,6 +27,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +@ThriftStruct public class TaskSource { private final PlanNodeId planNodeId; @@ -32,6 +36,7 @@ public class TaskSource private final boolean noMoreSplits; @JsonCreator + @ThriftConstructor public TaskSource( @JsonProperty("planNodeId") PlanNodeId planNodeId, @JsonProperty("splits") Set splits, @@ -50,24 +55,28 @@ public TaskSource(PlanNodeId planNodeId, Set splits, boolean noM } @JsonProperty + @ThriftField(1) public PlanNodeId getPlanNodeId() { return planNodeId; } @JsonProperty + @ThriftField(2) public Set getSplits() { return splits; } @JsonProperty + @ThriftField(3) public Set getNoMoreSplitsForLifespan() { return noMoreSplitsForLifespan; } @JsonProperty + @ThriftField(value = 4, name = "noMoreSplits") public boolean isNoMoreSplits() { return noMoreSplits; diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/buffer/OutputBuffers.java b/presto-main-base/src/main/java/com/facebook/presto/execution/buffer/OutputBuffers.java index af214fce156fc..a861782f60c1e 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/buffer/OutputBuffers.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/buffer/OutputBuffers.java @@ -15,6 +15,7 @@ import com.facebook.drift.annotations.ThriftConstructor; import com.facebook.drift.annotations.ThriftEnum; +import com.facebook.drift.annotations.ThriftEnumValue; import com.facebook.drift.annotations.ThriftField; import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.spi.plan.PartitioningHandle; @@ -81,11 +82,24 @@ public static OutputBuffers createSpoolingOutputBuffers() @ThriftEnum public enum BufferType { - PARTITIONED, - BROADCAST, - ARBITRARY, - DISCARDING, - SPOOLING, + PARTITIONED(0), + BROADCAST(1), + ARBITRARY(2), + DISCARDING(3), + SPOOLING(4); + + private final int value; + + BufferType(int value) + { + this.value = value; + } + + @ThriftEnumValue + public int getValue() + { + return value; + } } private final BufferType type; diff --git a/presto-main-base/src/main/java/com/facebook/presto/operator/BlockedReason.java b/presto-main-base/src/main/java/com/facebook/presto/operator/BlockedReason.java index 1fc155bc9684a..6e8af1e7d7c24 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/operator/BlockedReason.java +++ b/presto-main-base/src/main/java/com/facebook/presto/operator/BlockedReason.java @@ -19,11 +19,18 @@ @ThriftEnum public enum BlockedReason { - WAITING_FOR_MEMORY; + WAITING_FOR_MEMORY(0); + + private final int value; + + BlockedReason(int value) + { + this.value = value; + } @ThriftEnumValue public int getValue() { - return 1; + return value; } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/operator/OperatorInfoUnion.java b/presto-main-base/src/main/java/com/facebook/presto/operator/OperatorInfoUnion.java index 5bd9ac0cfc86d..887eca18bec98 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/operator/OperatorInfoUnion.java +++ b/presto-main-base/src/main/java/com/facebook/presto/operator/OperatorInfoUnion.java @@ -20,8 +20,6 @@ import com.facebook.presto.operator.exchange.LocalExchangeBufferInfo; import com.facebook.presto.operator.repartition.PartitionedOutputInfo; -import static com.facebook.drift.annotations.ThriftField.Requiredness.OPTIONAL; - @ThriftUnion public class OperatorInfoUnion { @@ -54,7 +52,7 @@ public OperatorInfoUnion(ExchangeClientStatus exchangeClientStatus) this.id = 1; } - @ThriftField(value = 1, requiredness = OPTIONAL) + @ThriftField(1) public ExchangeClientStatus getExchangeClientStatus() { return exchangeClientStatus; @@ -67,7 +65,7 @@ public OperatorInfoUnion(LocalExchangeBufferInfo localExchangeBufferInfo) this.id = 2; } - @ThriftField(value = 2, requiredness = OPTIONAL) + @ThriftField(2) public LocalExchangeBufferInfo getLocalExchangeBufferInfo() { return localExchangeBufferInfo; @@ -80,7 +78,7 @@ public OperatorInfoUnion(TableFinishInfo tableFinishInfo) this.id = 3; } - @ThriftField(value = 3, requiredness = OPTIONAL) + @ThriftField(3) public TableFinishInfo getTableFinishInfo() { return tableFinishInfo; @@ -93,7 +91,7 @@ public OperatorInfoUnion(SplitOperatorInfo splitOperatorInfo) this.id = 4; } - @ThriftField(value = 4, requiredness = OPTIONAL) + @ThriftField(4) public SplitOperatorInfo getSplitOperatorInfo() { return splitOperatorInfo; @@ -106,7 +104,7 @@ public OperatorInfoUnion(HashCollisionsInfo hashCollisionsInfo) this.id = 5; } - @ThriftField(value = 5, requiredness = OPTIONAL) + @ThriftField(5) public HashCollisionsInfo getHashCollisionsInfo() { return hashCollisionsInfo; @@ -119,7 +117,7 @@ public OperatorInfoUnion(PartitionedOutputInfo partitionedOutputInfo) this.id = 6; } - @ThriftField(value = 6, requiredness = OPTIONAL) + @ThriftField(6) public PartitionedOutputInfo getPartitionedOutputInfo() { return partitionedOutputInfo; @@ -132,7 +130,7 @@ public OperatorInfoUnion(JoinOperatorInfo joinOperatorInfo) this.id = 7; } - @ThriftField(value = 7, requiredness = OPTIONAL) + @ThriftField(7) public JoinOperatorInfo getJoinOperatorInfo() { return joinOperatorInfo; @@ -145,7 +143,7 @@ public OperatorInfoUnion(WindowInfo windowInfo) this.id = 8; } - @ThriftField(value = 8, requiredness = OPTIONAL) + @ThriftField(8) public WindowInfo getWindowInfo() { return windowInfo; @@ -158,7 +156,7 @@ public OperatorInfoUnion(TableWriterOperator.TableWriterInfo tableWriterInfo) this.id = 9; } - @ThriftField(value = 9, requiredness = OPTIONAL) + @ThriftField(9) public TableWriterOperator.TableWriterInfo getTableWriterInfo() { return tableWriterInfo; @@ -171,7 +169,7 @@ public OperatorInfoUnion(TableWriterMergeInfo tableWriterMergeInfo) this.id = 10; } - @ThriftField(value = 10, requiredness = OPTIONAL) + @ThriftField(10) public TableWriterMergeInfo getTableWriterMergeInfo() { return tableWriterMergeInfo; diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/InternalCommunicationConfig.java b/presto-main-base/src/main/java/com/facebook/presto/server/InternalCommunicationConfig.java index ba60a7cd76134..d76ff77dd1a03 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/server/InternalCommunicationConfig.java +++ b/presto-main-base/src/main/java/com/facebook/presto/server/InternalCommunicationConfig.java @@ -42,6 +42,8 @@ public class InternalCommunicationConfig private boolean binaryTransportEnabled; private boolean thriftTransportEnabled; private boolean taskInfoThriftTransportEnabled; + private boolean taskUpdateRequestThriftSerdeEnabled; + private boolean taskInfoResponseThriftSerdeEnabled; private Protocol thriftProtocol = Protocol.BINARY; private DataSize maxTaskUpdateSize = new DataSize(16, MEGABYTE); private CommunicationProtocol taskCommunicationProtocol = CommunicationProtocol.HTTP; @@ -187,6 +189,32 @@ public InternalCommunicationConfig setThriftTransportEnabled(boolean thriftTrans return this; } + public boolean isTaskUpdateRequestThriftSerdeEnabled() + { + return taskUpdateRequestThriftSerdeEnabled; + } + + @Config("experimental.internal-communication.task-update-request-thrift-serde-enabled") + @ConfigDescription("Enables thrift encoding support for Task Update Request") + public InternalCommunicationConfig setTaskUpdateRequestThriftSerdeEnabled(boolean taskUpdateRequestThriftSerdeEnabled) + { + this.taskUpdateRequestThriftSerdeEnabled = taskUpdateRequestThriftSerdeEnabled; + return this; + } + + public boolean isTaskInfoResponseThriftSerdeEnabled() + { + return taskInfoResponseThriftSerdeEnabled; + } + + @Config("experimental.internal-communication.task-info-response-thrift-serde-enabled") + @ConfigDescription("Enables thrift encoding support for Task Info Response") + public InternalCommunicationConfig setTaskInfoResponseThriftSerdeEnabled(boolean taskInfoResponseThriftSerdeEnabled) + { + this.taskInfoResponseThriftSerdeEnabled = taskInfoResponseThriftSerdeEnabled; + return this; + } + public boolean isTaskInfoThriftTransportEnabled() { return taskInfoThriftTransportEnabled; diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/TaskResourceUtils.java b/presto-main-base/src/main/java/com/facebook/presto/server/TaskResourceUtils.java index 409bc41e1b025..ebf53381f606f 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/server/TaskResourceUtils.java +++ b/presto-main-base/src/main/java/com/facebook/presto/server/TaskResourceUtils.java @@ -40,7 +40,7 @@ private TaskResourceUtils() { } - public static boolean isThriftRequest(HttpHeaders httpHeaders) + public static boolean isThriftAcceptable(HttpHeaders httpHeaders) { return httpHeaders.getAcceptableMediaTypes().stream() .anyMatch(mediaType -> mediaType.toString().contains("application/x-thrift")); diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/TaskUpdateRequest.java b/presto-main-base/src/main/java/com/facebook/presto/server/TaskUpdateRequest.java index df993dcee1673..634e71b6cc130 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/server/TaskUpdateRequest.java +++ b/presto-main-base/src/main/java/com/facebook/presto/server/TaskUpdateRequest.java @@ -13,6 +13,9 @@ */ package com.facebook.presto.server; +import com.facebook.drift.annotations.ThriftConstructor; +import com.facebook.drift.annotations.ThriftField; +import com.facebook.drift.annotations.ThriftStruct; import com.facebook.presto.SessionRepresentation; import com.facebook.presto.execution.TaskSource; import com.facebook.presto.execution.buffer.OutputBuffers; @@ -30,6 +33,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; +@ThriftStruct public class TaskUpdateRequest { private final SessionRepresentation session; @@ -40,6 +44,7 @@ public class TaskUpdateRequest private final OutputBuffers outputIds; private final Optional tableWriteInfo; + @ThriftConstructor @JsonCreator public TaskUpdateRequest( @JsonProperty("session") SessionRepresentation session, @@ -65,12 +70,14 @@ public TaskUpdateRequest( } @JsonProperty + @ThriftField(1) public SessionRepresentation getSession() { return session; } @JsonProperty + @ThriftField(2) public Map getExtraCredentials() { return extraCredentials; @@ -78,24 +85,28 @@ public Map getExtraCredentials() @JsonInclude(NON_ABSENT) @JsonProperty + @ThriftField(3) public Optional getFragment() { return fragment; } @JsonProperty + @ThriftField(4) public List getSources() { return sources; } @JsonProperty + @ThriftField(5) public OutputBuffers getOutputIds() { return outputIds; } @JsonProperty + @ThriftField(6) public Optional getTableWriteInfo() { return tableWriteInfo; diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/CustomCodecUtils.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/CustomCodecUtils.java new file mode 100644 index 0000000000000..a6e4b7a32df5e --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/CustomCodecUtils.java @@ -0,0 +1,115 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.drift.TException; +import com.facebook.drift.codec.metadata.DefaultThriftTypeReference; +import com.facebook.drift.codec.metadata.FieldKind; +import com.facebook.drift.codec.metadata.ThriftCatalog; +import com.facebook.drift.codec.metadata.ThriftFieldExtractor; +import com.facebook.drift.codec.metadata.ThriftFieldMetadata; +import com.facebook.drift.codec.metadata.ThriftStructMetadata; +import com.facebook.drift.codec.metadata.ThriftType; +import com.facebook.drift.protocol.TField; +import com.facebook.drift.protocol.TProtocolException; +import com.facebook.drift.protocol.TProtocolReader; +import com.facebook.drift.protocol.TProtocolWriter; +import com.facebook.drift.protocol.TStruct; +import com.facebook.drift.protocol.TType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.Optional; + +import static com.facebook.drift.annotations.ThriftField.Requiredness.NONE; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.String.format; + +/*** + * When we need a custom codec for a primitive type, we need a wrapper to pass the needsCodec check within ThriftCodecByteCodeGenerator.java + */ +public class CustomCodecUtils +{ + private CustomCodecUtils() {} + + public static ThriftStructMetadata createSyntheticMetadata(ThriftCatalog thriftCatalog, short fieldId, String fieldName, Class originalType, Class referencedType) + { + checkNotNull(thriftCatalog.getThriftType(referencedType), "Can not find corresponding thrift type for type %s", referencedType); + + ThriftType thriftType = thriftCatalog.getThriftType(referencedType); + + ThriftFieldMetadata fieldMetaData = new ThriftFieldMetadata( + fieldId, + false, false, NONE, ImmutableMap.of(), + new DefaultThriftTypeReference(thriftType), + fieldName, + FieldKind.THRIFT_FIELD, + ImmutableList.of(), + Optional.empty(), + Optional.empty(), + Optional.of(new ThriftFieldExtractor( + fieldId, + fieldName, + FieldKind.THRIFT_FIELD, + originalType.getDeclaredFields()[0], // Any field should work since we are handing extraction in codec on our own + referencedType)), + Optional.empty()); + return new ThriftStructMetadata( + originalType.getSimpleName() + "Wrapper", + ImmutableMap.of(), + originalType, null, + ThriftStructMetadata.MetadataType.STRUCT, + Optional.empty(), ImmutableList.of(), ImmutableList.of(fieldMetaData), Optional.empty(), ImmutableList.of()); + } + + public static T readSingleJsonField(TProtocolReader protocol, JsonCodec jsonCodec, short fieldId, String fieldName) + throws TException + { + protocol.readStructBegin(); + String jsonValue = null; + TField field = protocol.readFieldBegin(); + while (field.getType() != TType.STOP) { + if (field.getId() == fieldId) { + if (field.getType() == TType.STRING) { + jsonValue = protocol.readString(); + } + else { + throw new TProtocolException(format("Unexpected field type: %s for field %s", field.getType(), fieldName)); + } + } + protocol.readFieldEnd(); + field = protocol.readFieldBegin(); + } + protocol.readStructEnd(); + + if (jsonValue == null) { + throw new TProtocolException(format("Required field '%s' was not found", fieldName)); + } + return jsonCodec.fromJson(jsonValue); + } + + public static void writeSingleJsonField(T value, TProtocolWriter protocol, JsonCodec jsonCodec, short fieldId, String fieldName, String structName) + throws TException + { + protocol.writeStructBegin(new TStruct(structName)); + + protocol.writeFieldBegin(new TField(fieldName, TType.STRING, fieldId)); + protocol.writeString(jsonCodec.toJson(value)); + protocol.writeFieldEnd(); + + protocol.writeFieldStop(); + protocol.writeStructEnd(); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/MetadataUpdatesCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/MetadataUpdatesCodec.java new file mode 100644 index 0000000000000..d53fe9986a01f --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/MetadataUpdatesCodec.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.drift.codec.ThriftCodec; +import com.facebook.drift.codec.metadata.ThriftCatalog; +import com.facebook.drift.codec.metadata.ThriftStructMetadata; +import com.facebook.drift.codec.metadata.ThriftType; +import com.facebook.drift.protocol.TProtocolReader; +import com.facebook.drift.protocol.TProtocolWriter; +import com.facebook.presto.metadata.MetadataUpdates; + +import javax.inject.Inject; + +import static com.facebook.presto.server.thrift.CustomCodecUtils.createSyntheticMetadata; +import static com.facebook.presto.server.thrift.CustomCodecUtils.readSingleJsonField; +import static com.facebook.presto.server.thrift.CustomCodecUtils.writeSingleJsonField; +import static java.util.Objects.requireNonNull; + +public class MetadataUpdatesCodec + implements ThriftCodec +{ + private static final short METADATA_UPDATES_DATA_FIELD_ID = 1; + private static final String METADATA_UPDATES_DATA_FIELD_NAME = "metadataUpdates"; + private static final String METADATA_UPDATES_STRUCT_NAME = "MetadataUpdates"; + + private final ThriftCatalog thriftCatalog; + private final ThriftType syntheticStructType; + private final JsonCodec jsonCodec; + + @Inject + public MetadataUpdatesCodec(JsonCodec jsonCodec, ThriftCatalog thriftCatalog) + { + this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); + this.thriftCatalog = requireNonNull(thriftCatalog, "thriftCatalog is null"); + + ThriftStructMetadata structMetadata = createSyntheticMetadata(thriftCatalog, METADATA_UPDATES_DATA_FIELD_ID, METADATA_UPDATES_DATA_FIELD_NAME, MetadataUpdates.class, String.class); + this.syntheticStructType = ThriftType.struct(structMetadata); + + thriftCatalog.addThriftType(syntheticStructType); + } + + public ThriftType getType() + { + return syntheticStructType; + } + + @Override + public MetadataUpdates read(TProtocolReader protocol) + throws Exception + { + return readSingleJsonField(protocol, jsonCodec, METADATA_UPDATES_DATA_FIELD_ID, METADATA_UPDATES_DATA_FIELD_NAME); + } + + @Override + public void write(MetadataUpdates value, TProtocolWriter protocol) + throws Exception + { + writeSingleJsonField(value, protocol, jsonCodec, METADATA_UPDATES_DATA_FIELD_ID, METADATA_UPDATES_DATA_FIELD_NAME, METADATA_UPDATES_STRUCT_NAME); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/SplitCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/SplitCodec.java new file mode 100644 index 0000000000000..911b094aebf46 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/SplitCodec.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.drift.codec.ThriftCodec; +import com.facebook.drift.codec.metadata.ThriftCatalog; +import com.facebook.drift.codec.metadata.ThriftStructMetadata; +import com.facebook.drift.codec.metadata.ThriftType; +import com.facebook.drift.protocol.TProtocolReader; +import com.facebook.drift.protocol.TProtocolWriter; +import com.facebook.presto.metadata.Split; + +import javax.inject.Inject; + +import static com.facebook.presto.server.thrift.CustomCodecUtils.createSyntheticMetadata; +import static com.facebook.presto.server.thrift.CustomCodecUtils.readSingleJsonField; +import static com.facebook.presto.server.thrift.CustomCodecUtils.writeSingleJsonField; +import static java.util.Objects.requireNonNull; + +public class SplitCodec + implements ThriftCodec +{ + private static final short SPLIT_DATA_FIELD_ID = 1; + private static final String SPLIT_DATA_FIELD_NAME = "split"; + private static final String SPLIT_DATA_STRUCT_NAME = "Split"; + + private final ThriftCatalog thriftCatalog; + private final ThriftType syntheticStructType; + private final JsonCodec jsonCodec; + + @Inject + public SplitCodec(JsonCodec jsonCodec, ThriftCatalog thriftCatalog) + { + this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); + this.thriftCatalog = requireNonNull(thriftCatalog, "thriftCatalog is null"); + + ThriftStructMetadata structMetadata = createSyntheticMetadata(thriftCatalog, SPLIT_DATA_FIELD_ID, SPLIT_DATA_FIELD_NAME, Split.class, String.class); + this.syntheticStructType = ThriftType.struct(structMetadata); + + thriftCatalog.addThriftType(syntheticStructType); + } + + public ThriftType getType() + { + return syntheticStructType; + } + + @Override + public Split read(TProtocolReader protocol) + throws Exception + { + return readSingleJsonField(protocol, jsonCodec, SPLIT_DATA_FIELD_ID, SPLIT_DATA_FIELD_NAME); + } + + @Override + public void write(Split value, TProtocolWriter protocol) + throws Exception + { + writeSingleJsonField(value, protocol, jsonCodec, SPLIT_DATA_FIELD_ID, SPLIT_DATA_FIELD_NAME, SPLIT_DATA_STRUCT_NAME); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableWriteInfoCodec.java b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableWriteInfoCodec.java new file mode 100644 index 0000000000000..9b6953ca8055f --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/server/thrift/TableWriteInfoCodec.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.drift.codec.ThriftCodec; +import com.facebook.drift.codec.metadata.ThriftCatalog; +import com.facebook.drift.codec.metadata.ThriftStructMetadata; +import com.facebook.drift.codec.metadata.ThriftType; +import com.facebook.drift.protocol.TProtocolReader; +import com.facebook.drift.protocol.TProtocolWriter; +import com.facebook.presto.execution.scheduler.TableWriteInfo; + +import javax.inject.Inject; + +import static com.facebook.presto.server.thrift.CustomCodecUtils.createSyntheticMetadata; +import static com.facebook.presto.server.thrift.CustomCodecUtils.readSingleJsonField; +import static com.facebook.presto.server.thrift.CustomCodecUtils.writeSingleJsonField; +import static java.util.Objects.requireNonNull; + +public class TableWriteInfoCodec + implements ThriftCodec +{ + private static final short TABLE_WRITE_INFO_DATA_FIELD_ID = 1; + private static final String TABLE_WRITE_INFO_DATA_FIELD_NAME = "tableWriteInfo"; + private static final String TABLE_WRITE_INFO_STRUCT_NAME = "TableWriteInfo"; + + private final ThriftCatalog thriftCatalog; + private final ThriftType syntheticStructType; + private final JsonCodec jsonCodec; + + @Inject + public TableWriteInfoCodec(JsonCodec jsonCodec, ThriftCatalog thriftCatalog) + { + this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); + this.thriftCatalog = requireNonNull(thriftCatalog, "thriftCatalog is null"); + + ThriftStructMetadata structMetadata = createSyntheticMetadata(thriftCatalog, TABLE_WRITE_INFO_DATA_FIELD_ID, TABLE_WRITE_INFO_DATA_FIELD_NAME, TableWriteInfo.class, String.class); + this.syntheticStructType = ThriftType.struct(structMetadata); + + thriftCatalog.addThriftType(syntheticStructType); + } + + public ThriftType getType() + { + return syntheticStructType; + } + + @Override + public TableWriteInfo read(TProtocolReader protocol) + throws Exception + { + return readSingleJsonField(protocol, jsonCodec, TABLE_WRITE_INFO_DATA_FIELD_ID, TABLE_WRITE_INFO_DATA_FIELD_NAME); + } + + @Override + public void write(TableWriteInfo value, TProtocolWriter protocol) + throws Exception + { + writeSingleJsonField(value, protocol, jsonCodec, TABLE_WRITE_INFO_DATA_FIELD_ID, TABLE_WRITE_INFO_DATA_FIELD_NAME, TABLE_WRITE_INFO_STRUCT_NAME); + } +} diff --git a/presto-main-base/src/test/java/com/facebook/presto/server/TestInternalCommunicationConfig.java b/presto-main-base/src/test/java/com/facebook/presto/server/TestInternalCommunicationConfig.java index 75df9f97f9471..b171d5fdda8c3 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/server/TestInternalCommunicationConfig.java +++ b/presto-main-base/src/test/java/com/facebook/presto/server/TestInternalCommunicationConfig.java @@ -50,6 +50,8 @@ public void testDefaults() .setThriftProtocol(Protocol.BINARY) .setMemoizeDeadNodesEnabled(false) .setSharedSecret(null) + .setTaskUpdateRequestThriftSerdeEnabled(false) + .setTaskInfoResponseThriftSerdeEnabled(false) .setInternalJwtEnabled(false)); } @@ -76,6 +78,8 @@ public void testExplicitPropertyMappings() .put("internal-communication.memoize-dead-nodes-enabled", "true") .put("internal-communication.shared-secret", "secret") .put("internal-communication.jwt.enabled", "true") + .put("experimental.internal-communication.task-update-request-thrift-serde-enabled", "true") + .put("experimental.internal-communication.task-info-response-thrift-serde-enabled", "true") .build(); InternalCommunicationConfig expected = new InternalCommunicationConfig() @@ -97,7 +101,9 @@ public void testExplicitPropertyMappings() .setThriftProtocol(Protocol.COMPACT) .setMemoizeDeadNodesEnabled(true) .setSharedSecret("secret") - .setInternalJwtEnabled(true); + .setInternalJwtEnabled(true) + .setTaskUpdateRequestThriftSerdeEnabled(true) + .setTaskInfoResponseThriftSerdeEnabled(true); assertFullMapping(properties, expected); } diff --git a/presto-main/src/main/java/com/facebook/presto/server/RequestHelpers.java b/presto-main/src/main/java/com/facebook/presto/server/RequestHelpers.java index ad70b778f8097..65e1165de99cc 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/RequestHelpers.java +++ b/presto-main/src/main/java/com/facebook/presto/server/RequestHelpers.java @@ -15,6 +15,7 @@ import com.facebook.airlift.http.client.Request.Builder; +import static com.facebook.airlift.http.client.thrift.ThriftRequestUtils.APPLICATION_THRIFT_BINARY; import static com.facebook.presto.PrestoMediaTypes.APPLICATION_JACKSON_SMILE; import static com.google.common.net.HttpHeaders.ACCEPT; import static com.google.common.net.MediaType.JSON_UTF_8; @@ -38,6 +39,35 @@ public static Builder setContentTypeHeaders(boolean isBinaryTransportEnabled, Bu return getJsonTransportBuilder(requestBuilder); } + public static Builder setTaskUpdateRequestContentTypeHeaders(boolean isTaskUpdateRequestThriftTransportEnabled, boolean isBinaryTransportEnabled, Builder requestBuilder) + { + if (isTaskUpdateRequestThriftTransportEnabled) { + requestBuilder.setHeader(CONTENT_TYPE, APPLICATION_THRIFT_BINARY); + } + else if (isBinaryTransportEnabled) { + requestBuilder.setHeader(CONTENT_TYPE, APPLICATION_JACKSON_SMILE); + } + else { + requestBuilder.setHeader(CONTENT_TYPE, JSON_UTF_8.toString()); + } + + return requestBuilder; + } + + public static Builder setTaskInfoAcceptTypeHeaders(boolean isTaskInfoThriftTransportEnabled, boolean isBinaryTransportEnabled, Builder requestBuilder) + { + if (isTaskInfoThriftTransportEnabled) { + requestBuilder.setHeader(ACCEPT, APPLICATION_THRIFT_BINARY); + } + else if (isBinaryTransportEnabled) { + requestBuilder.setHeader(ACCEPT, APPLICATION_JACKSON_SMILE); + } + else { + requestBuilder.setHeader(ACCEPT, JSON_UTF_8.toString()); + } + return requestBuilder; + } + public static Builder getBinaryTransportBuilder(Builder requestBuilder) { return requestBuilder diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index ecce8a92ce850..7c9f8742ccea7 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -100,6 +100,7 @@ import com.facebook.presto.metadata.MetadataUpdates; import com.facebook.presto.metadata.SchemaPropertyManager; import com.facebook.presto.metadata.SessionPropertyManager; +import com.facebook.presto.metadata.Split; import com.facebook.presto.metadata.StaticCatalogStore; import com.facebook.presto.metadata.StaticCatalogStoreConfig; import com.facebook.presto.metadata.StaticFunctionNamespaceStore; @@ -140,10 +141,14 @@ import com.facebook.presto.resourcemanager.ResourceManagerResourceGroupService; import com.facebook.presto.server.remotetask.HttpLocationFactory; import com.facebook.presto.server.thrift.FixedAddressSelector; +import com.facebook.presto.server.thrift.MetadataUpdatesCodec; +import com.facebook.presto.server.thrift.SplitCodec; +import com.facebook.presto.server.thrift.TableWriteInfoCodec; import com.facebook.presto.server.thrift.ThriftServerInfoClient; import com.facebook.presto.server.thrift.ThriftServerInfoService; import com.facebook.presto.server.thrift.ThriftTaskClient; import com.facebook.presto.server.thrift.ThriftTaskService; +import com.facebook.presto.server.thrift.ThriftTaskUpdateRequestBodyReader; import com.facebook.presto.sessionpropertyproviders.JavaWorkerSessionPropertyProvider; import com.facebook.presto.sessionpropertyproviders.NativeWorkerSessionPropertyProvider; import com.facebook.presto.spi.ConnectorMetadataUpdateHandle; @@ -417,6 +422,8 @@ else if (serverConfig.isCoordinator()) { // task execution jaxrsBinder(binder).bind(TaskResource.class); + jaxrsBinder(binder).bind(ThriftTaskUpdateRequestBodyReader.class); + newExporter(binder).export(TaskResource.class).withGeneratedName(); jaxrsBinder(binder).bind(TaskExecutorResource.class); newExporter(binder).export(TaskExecutorResource.class).withGeneratedName(); @@ -425,6 +432,9 @@ else if (serverConfig.isCoordinator()) { install(new DefaultThriftCodecsModule()); thriftCodecBinder(binder).bindCustomThriftCodec(SqlInvokedFunctionCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(SqlFunctionIdCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(MetadataUpdatesCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(SplitCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TableWriteInfoCodec.class); jsonCodecBinder(binder).bindListJsonCodec(TaskMemoryReservationSummary.class); binder.bind(SqlTaskManager.class).in(Scopes.SINGLETON); @@ -558,6 +568,7 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon jsonCodecBinder(binder).bindJsonCodec(TableCommitContext.class); jsonCodecBinder(binder).bindJsonCodec(SqlInvokedFunction.class); jsonCodecBinder(binder).bindJsonCodec(TaskSource.class); + jsonCodecBinder(binder).bindJsonCodec(Split.class); jsonCodecBinder(binder).bindJsonCodec(TableWriteInfo.class); smileCodecBinder(binder).bindSmileCodec(TaskStatus.class); smileCodecBinder(binder).bindSmileCodec(TaskInfo.class); @@ -704,6 +715,7 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon jsonBinder(binder).addSerializerBinding(Expression.class).to(ExpressionSerializer.class); jsonBinder(binder).addDeserializerBinding(Expression.class).to(ExpressionDeserializer.class); jsonBinder(binder).addDeserializerBinding(FunctionCall.class).to(FunctionCallDeserializer.class); + thriftCodecBinder(binder).bindThriftCodec(TaskUpdateRequest.class); // metadata updates jsonCodecBinder(binder).bindJsonCodec(MetadataUpdates.class); diff --git a/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java b/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java index 17dc3450790e7..dc0c288d4cb37 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java @@ -69,7 +69,7 @@ import static com.facebook.presto.client.PrestoHeaders.PRESTO_CURRENT_STATE; import static com.facebook.presto.client.PrestoHeaders.PRESTO_MAX_WAIT; import static com.facebook.presto.server.TaskResourceUtils.convertToThriftTaskInfo; -import static com.facebook.presto.server.TaskResourceUtils.isThriftRequest; +import static com.facebook.presto.server.TaskResourceUtils.isThriftAcceptable; import static com.facebook.presto.server.security.RoleType.INTERNAL; import static com.facebook.presto.util.TaskUtils.randomizeWaitTime; import static com.google.common.collect.Iterables.transform; @@ -129,9 +129,11 @@ public List getAllTaskInfo(@Context UriInfo uriInfo) @POST @Path("{taskId}") - @Consumes({APPLICATION_JSON, APPLICATION_JACKSON_SMILE}) - @Produces({APPLICATION_JSON, APPLICATION_JACKSON_SMILE}) - public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo) + @Consumes({APPLICATION_JSON, APPLICATION_JACKSON_SMILE, APPLICATION_THRIFT_BINARY}) + @Produces({APPLICATION_JSON, APPLICATION_JACKSON_SMILE, APPLICATION_THRIFT_BINARY}) + public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, + TaskUpdateRequest taskUpdateRequest, + @Context UriInfo uriInfo) { requireNonNull(taskUpdateRequest, "taskUpdateRequest is null"); @@ -146,7 +148,6 @@ public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdat if (shouldSummarize(uriInfo)) { taskInfo = taskInfo.summarize(); } - return Response.ok().entity(taskInfo).build(); } @@ -164,7 +165,7 @@ public void getTaskInfo( { requireNonNull(taskId, "taskId is null"); - boolean isThriftRequest = isThriftRequest(httpHeaders); + boolean isThriftRequest = isThriftAcceptable(httpHeaders); if (currentState == null || maxWait == null) { TaskInfo taskInfo = taskManager.getTaskInfo(taskId); @@ -272,7 +273,7 @@ public TaskInfo deleteTask( taskInfo = taskInfo.summarize(); } - if (isThriftRequest(httpHeaders)) { + if (isThriftAcceptable(httpHeaders)) { taskInfo = convertToThriftTaskInfo(taskInfo, connectorTypeSerdeManager, handleResolver); } diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java index 651e5d1961d5e..7a1518e1f468c 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java @@ -58,6 +58,7 @@ import com.facebook.presto.server.SimpleHttpResponseHandler; import com.facebook.presto.server.TaskUpdateRequest; import com.facebook.presto.server.smile.BaseResponse; +import com.facebook.presto.server.thrift.ThriftHttpResponseHandler; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SplitWeight; import com.facebook.presto.spi.plan.PlanNode; @@ -118,6 +119,8 @@ import static com.facebook.presto.server.RequestErrorTracker.isExpectedError; import static com.facebook.presto.server.RequestErrorTracker.taskRequestErrorTracker; import static com.facebook.presto.server.RequestHelpers.setContentTypeHeaders; +import static com.facebook.presto.server.RequestHelpers.setTaskInfoAcceptTypeHeaders; +import static com.facebook.presto.server.RequestHelpers.setTaskUpdateRequestContentTypeHeaders; import static com.facebook.presto.server.TaskResourceUtils.convertFromThriftTaskInfo; import static com.facebook.presto.server.smile.AdaptingJsonResponseHandler.createAdaptingJsonResponseHandler; import static com.facebook.presto.server.smile.FullSmileResponseHandler.createFullSmileResponseHandler; @@ -206,6 +209,7 @@ public final class HttpRemoteTask //Json codec required for TaskUpdateRequest endpoint which uses JSON and returns a TaskInfo private final Codec taskInfoJsonCodec; private final Codec taskUpdateRequestCodec; + private final Codec taskInfoResponseCodec; private final Codec planFragmentCodec; private final RequestErrorTracker updateErrorTracker; @@ -221,6 +225,8 @@ public final class HttpRemoteTask private final boolean binaryTransportEnabled; private final boolean thriftTransportEnabled; private final boolean taskInfoThriftTransportEnabled; + private final boolean taskUpdateRequestThriftSerdeEnabled; + private final boolean taskInfoResponseThriftSerdeEnabled; private final Protocol thriftProtocol; private final ConnectorTypeSerdeManager connectorTypeSerdeManager; private final HandleResolver handleResolver; @@ -255,6 +261,7 @@ public HttpRemoteTask( Codec taskInfoCodec, Codec taskInfoJsonCodec, Codec taskUpdateRequestCodec, + Codec taskInfoResponseCodec, Codec planFragmentCodec, Codec metadataUpdatesCodec, NodeStatsTracker nodeStatsTracker, @@ -262,6 +269,8 @@ public HttpRemoteTask( boolean binaryTransportEnabled, boolean thriftTransportEnabled, boolean taskInfoThriftTransportEnabled, + boolean taskUpdateRequestThriftSerdeEnabled, + boolean taskInfoResponseThriftSerdeEnabled, Protocol thriftProtocol, TableWriteInfo tableWriteInfo, long maxTaskUpdateSizeInBytes, @@ -314,6 +323,7 @@ public HttpRemoteTask( this.taskInfoCodec = taskInfoCodec; this.taskInfoJsonCodec = taskInfoJsonCodec; this.taskUpdateRequestCodec = taskUpdateRequestCodec; + this.taskInfoResponseCodec = taskInfoResponseCodec; this.planFragmentCodec = planFragmentCodec; this.updateErrorTracker = taskRequestErrorTracker(taskId, location, maxErrorDuration, errorScheduledExecutor, "updating task"); this.nodeStatsTracker = requireNonNull(nodeStatsTracker, "nodeStatsTracker is null"); @@ -322,6 +332,8 @@ public HttpRemoteTask( this.binaryTransportEnabled = binaryTransportEnabled; this.thriftTransportEnabled = thriftTransportEnabled; this.taskInfoThriftTransportEnabled = taskInfoThriftTransportEnabled; + this.taskUpdateRequestThriftSerdeEnabled = taskUpdateRequestThriftSerdeEnabled; + this.taskInfoResponseThriftSerdeEnabled = taskInfoResponseThriftSerdeEnabled; this.thriftProtocol = thriftProtocol; this.connectorTypeSerdeManager = connectorTypeSerdeManager; this.handleResolver = handleResolver; @@ -880,44 +892,51 @@ private synchronized void sendUpdate() outputBuffers.get(), writeInfo); long serializeStartCpuTimeNanos = THREAD_MX_BEAN.getCurrentThreadCpuTime(); - byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toBytes(updateRequest); + + Request.Builder requestBuilder; + HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus); + + byte[] taskUpdateRequestBytes = taskUpdateRequestCodec.toBytes(updateRequest); schedulerStatsTracker.recordTaskUpdateSerializedCpuTime(THREAD_MX_BEAN.getCurrentThreadCpuTime() - serializeStartCpuTimeNanos); - if (taskUpdateRequestJson.length > maxTaskUpdateSizeInBytes) { - failTask(new PrestoException(EXCEEDED_TASK_UPDATE_SIZE_LIMIT, getExceededTaskUpdateSizeMessage(taskUpdateRequestJson))); + if (taskUpdateRequestBytes.length > maxTaskUpdateSizeInBytes) { + failTask(new PrestoException(EXCEEDED_TASK_UPDATE_SIZE_LIMIT, getExceededTaskUpdateSizeMessage(taskUpdateRequestBytes))); } if (taskUpdateSizeTrackingEnabled) { - taskUpdateRequestSize.add(taskUpdateRequestJson.length); + taskUpdateRequestSize.add(taskUpdateRequestBytes.length); if (fragment.isPresent()) { - stats.updateWithPlanSize(taskUpdateRequestJson.length); + stats.updateWithPlanSize(taskUpdateRequestBytes.length); } else { if (ThreadLocalRandom.current().nextDouble() < UPDATE_WITHOUT_PLAN_STATS_SAMPLE_RATE) { // This is to keep track of the task update size even when the plan fragment is NOT present - stats.updateWithoutPlanSize(taskUpdateRequestJson.length); + stats.updateWithoutPlanSize(taskUpdateRequestBytes.length); } } } - - HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus); - Request request = setContentTypeHeaders(binaryTransportEnabled, preparePost()) + requestBuilder = setTaskUpdateRequestContentTypeHeaders(taskUpdateRequestThriftSerdeEnabled, binaryTransportEnabled, preparePost()); + requestBuilder = setTaskInfoAcceptTypeHeaders(taskInfoResponseThriftSerdeEnabled, binaryTransportEnabled, requestBuilder); + Request request = requestBuilder .setUri(uriBuilder.build()) - .setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson)) + .setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestBytes)) .build(); ResponseHandler responseHandler; - if (binaryTransportEnabled) { - responseHandler = createFullSmileResponseHandler((SmileCodec) taskInfoCodec); + if (taskInfoResponseThriftSerdeEnabled) { + responseHandler = new ThriftResponseHandler(unwrapThriftCodec(taskInfoResponseCodec)); + } + else if (binaryTransportEnabled) { + responseHandler = createFullSmileResponseHandler((SmileCodec) taskInfoResponseCodec); } else { - responseHandler = createAdaptingJsonResponseHandler((JsonCodec) taskInfoJsonCodec); + responseHandler = createAdaptingJsonResponseHandler((JsonCodec) taskInfoResponseCodec); } updateErrorTracker.startRequest(); - ListenableFuture> future = httpClient.executeAsync(request, responseHandler); + ListenableFuture future = httpClient.executeAsync(request, responseHandler); currentRequest = future; currentRequestStartNanos = System.nanoTime(); if (!taskUpdateTimeline.isEmpty()) { @@ -928,10 +947,18 @@ private synchronized void sendUpdate() // and does so without grabbing the instance lock. needsUpdate.set(false); - Futures.addCallback( - future, - new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats.getHttpResponseStats(), REMOTE_TASK_ERROR), - executor); + if (taskInfoResponseThriftSerdeEnabled) { + Futures.addCallback( + (ListenableFuture>) future, + new ThriftHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats.getHttpResponseStats(), REMOTE_TASK_ERROR), + executor); + } + else { + Futures.addCallback( + (ListenableFuture>) future, + new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats.getHttpResponseStats(), REMOTE_TASK_ERROR), + executor); + } } private String getExceededTaskUpdateSizeMessage(byte[] taskUpdateRequestJson) diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java index 169ba6419311e..9fe54c899f722 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java @@ -84,6 +84,7 @@ public class HttpRemoteTaskFactory //Json codec required for TaskUpdateRequest endpoint which uses JSON and returns a TaskInfo private final Codec taskInfoJsonCodec; private final Codec taskUpdateRequestCodec; + private final Codec taskInfoResponseCodec; private final Codec planFragmentCodec; private final Codec metadataUpdatesCodec; private final Duration maxErrorDuration; @@ -102,6 +103,8 @@ public class HttpRemoteTaskFactory private final boolean binaryTransportEnabled; private final boolean thriftTransportEnabled; private final boolean taskInfoThriftTransportEnabled; + private final boolean taskUpdateRequestThriftSerdeEnabled; + private final boolean taskInfoResponseThriftSerdeEnabled; private final Protocol thriftProtocol; private final int maxTaskUpdateSizeInBytes; private final MetadataManager metadataManager; @@ -125,6 +128,7 @@ public HttpRemoteTaskFactory( ThriftCodec taskInfoThriftCodec, JsonCodec taskUpdateRequestJsonCodec, SmileCodec taskUpdateRequestSmileCodec, + ThriftCodec taskUpdateRequestThriftCodec, JsonCodec planFragmentJsonCodec, SmileCodec planFragmentSmileCodec, JsonCodec metadataUpdatesJsonCodec, @@ -153,6 +157,9 @@ public HttpRemoteTaskFactory( binaryTransportEnabled = communicationConfig.isBinaryTransportEnabled(); thriftTransportEnabled = communicationConfig.isThriftTransportEnabled(); taskInfoThriftTransportEnabled = communicationConfig.isTaskInfoThriftTransportEnabled(); + taskUpdateRequestThriftSerdeEnabled = communicationConfig.isTaskUpdateRequestThriftSerdeEnabled(); + taskInfoResponseThriftSerdeEnabled = communicationConfig.isTaskInfoResponseThriftSerdeEnabled(); + thriftProtocol = communicationConfig.getThriftProtocol(); this.maxTaskUpdateSizeInBytes = toIntExact(requireNonNull(communicationConfig, "communicationConfig is null").getMaxTaskUpdateSize().toBytes()); @@ -176,13 +183,31 @@ else if (binaryTransportEnabled) { this.taskInfoCodec = taskInfoJsonCodec; } + if (taskUpdateRequestThriftSerdeEnabled) { + this.taskUpdateRequestCodec = wrapThriftCodec(taskUpdateRequestThriftCodec); + } + else if (binaryTransportEnabled) { + this.taskUpdateRequestCodec = taskUpdateRequestSmileCodec; + } + else { + this.taskUpdateRequestCodec = taskUpdateRequestJsonCodec; + } + + if (taskInfoResponseThriftSerdeEnabled) { + this.taskInfoResponseCodec = wrapThriftCodec(taskInfoThriftCodec); + } + else if (binaryTransportEnabled) { + this.taskInfoResponseCodec = taskInfoSmileCodec; + } + else { + this.taskInfoResponseCodec = taskInfoJsonCodec; + } + this.taskInfoJsonCodec = taskInfoJsonCodec; if (binaryTransportEnabled) { - this.taskUpdateRequestCodec = taskUpdateRequestSmileCodec; this.metadataUpdatesCodec = metadataUpdatesSmileCodec; } else { - this.taskUpdateRequestCodec = taskUpdateRequestJsonCodec; this.metadataUpdatesCodec = metadataUpdatesJsonCodec; } this.planFragmentCodec = planFragmentJsonCodec; @@ -264,6 +289,7 @@ public RemoteTask createRemoteTask( taskInfoCodec, taskInfoJsonCodec, taskUpdateRequestCodec, + taskInfoResponseCodec, planFragmentCodec, metadataUpdatesCodec, nodeStatsTracker, @@ -271,6 +297,8 @@ public RemoteTask createRemoteTask( binaryTransportEnabled, thriftTransportEnabled, taskInfoThriftTransportEnabled, + taskUpdateRequestThriftSerdeEnabled, + taskInfoResponseThriftSerdeEnabled, thriftProtocol, tableWriteInfo, maxTaskUpdateSizeInBytes, @@ -306,6 +334,7 @@ public RemoteTask createRemoteTask( taskInfoCodec, taskInfoJsonCodec, taskUpdateRequestCodec, + taskInfoResponseCodec, planFragmentCodec, metadataUpdatesCodec, nodeStatsTracker, @@ -313,6 +342,8 @@ public RemoteTask createRemoteTask( binaryTransportEnabled, thriftTransportEnabled, taskInfoThriftTransportEnabled, + taskUpdateRequestThriftSerdeEnabled, + taskInfoResponseThriftSerdeEnabled, thriftProtocol, tableWriteInfo, maxTaskUpdateSizeInBytes, diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskWithEventLoop.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskWithEventLoop.java index d41ecd3502921..713ab1302023f 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskWithEventLoop.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskWithEventLoop.java @@ -58,6 +58,7 @@ import com.facebook.presto.server.SimpleHttpResponseHandler; import com.facebook.presto.server.TaskUpdateRequest; import com.facebook.presto.server.smile.BaseResponse; +import com.facebook.presto.server.thrift.ThriftHttpResponseHandler; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SplitWeight; import com.facebook.presto.spi.plan.PlanNode; @@ -113,6 +114,8 @@ import static com.facebook.presto.server.RequestErrorTracker.isExpectedError; import static com.facebook.presto.server.RequestErrorTracker.taskRequestErrorTracker; import static com.facebook.presto.server.RequestHelpers.setContentTypeHeaders; +import static com.facebook.presto.server.RequestHelpers.setTaskInfoAcceptTypeHeaders; +import static com.facebook.presto.server.RequestHelpers.setTaskUpdateRequestContentTypeHeaders; import static com.facebook.presto.server.TaskResourceUtils.convertFromThriftTaskInfo; import static com.facebook.presto.server.smile.AdaptingJsonResponseHandler.createAdaptingJsonResponseHandler; import static com.facebook.presto.server.smile.FullSmileResponseHandler.createFullSmileResponseHandler; @@ -203,6 +206,7 @@ public final class HttpRemoteTaskWithEventLoop //Json codec required for TaskUpdateRequest endpoint which uses JSON and returns a TaskInfo private final Codec taskInfoJsonCodec; private final Codec taskUpdateRequestCodec; + private final Codec taskInfoResponseCodec; private final Codec planFragmentCodec; private final RequestErrorTracker updateErrorTracker; @@ -218,6 +222,8 @@ public final class HttpRemoteTaskWithEventLoop private final boolean binaryTransportEnabled; private final boolean thriftTransportEnabled; private final boolean taskInfoThriftTransportEnabled; + private final boolean taskUpdateRequestThriftSerdeEnabled; + private final boolean taskInfoResponseThriftSerdeEnabled; private final Protocol thriftProtocol; private final ConnectorTypeSerdeManager connectorTypeSerdeManager; private final HandleResolver handleResolver; @@ -253,6 +259,7 @@ public static HttpRemoteTaskWithEventLoop createHttpRemoteTaskWithEventLoop( Codec taskInfoCodec, Codec taskInfoJsonCodec, Codec taskUpdateRequestCodec, + Codec taskInfoResponseCodec, Codec planFragmentCodec, Codec metadataUpdatesCodec, NodeStatsTracker nodeStatsTracker, @@ -260,6 +267,8 @@ public static HttpRemoteTaskWithEventLoop createHttpRemoteTaskWithEventLoop( boolean binaryTransportEnabled, boolean thriftTransportEnabled, boolean taskInfoThriftTransportEnabled, + boolean taskUpdateRequestThriftTransportEnabled, + boolean taskInfoResponseThriftTransportEnabled, Protocol thriftProtocol, TableWriteInfo tableWriteInfo, int maxTaskUpdateSizeInBytes, @@ -290,6 +299,7 @@ public static HttpRemoteTaskWithEventLoop createHttpRemoteTaskWithEventLoop( taskInfoCodec, taskInfoJsonCodec, taskUpdateRequestCodec, + taskInfoResponseCodec, planFragmentCodec, metadataUpdatesCodec, nodeStatsTracker, @@ -297,6 +307,8 @@ public static HttpRemoteTaskWithEventLoop createHttpRemoteTaskWithEventLoop( binaryTransportEnabled, thriftTransportEnabled, taskInfoThriftTransportEnabled, + taskUpdateRequestThriftTransportEnabled, + taskInfoResponseThriftTransportEnabled, thriftProtocol, tableWriteInfo, maxTaskUpdateSizeInBytes, @@ -330,6 +342,7 @@ private HttpRemoteTaskWithEventLoop(Session session, Codec taskInfoCodec, Codec taskInfoJsonCodec, Codec taskUpdateRequestCodec, + Codec taskInfoResponseCodec, Codec planFragmentCodec, Codec metadataUpdatesCodec, NodeStatsTracker nodeStatsTracker, @@ -337,6 +350,8 @@ private HttpRemoteTaskWithEventLoop(Session session, boolean binaryTransportEnabled, boolean thriftTransportEnabled, boolean taskInfoThriftTransportEnabled, + boolean taskUpdateRequestThriftSerdeEnabled, + boolean taskInfoResponseThriftSerdeEnabled, Protocol thriftProtocol, TableWriteInfo tableWriteInfo, int maxTaskUpdateSizeInBytes, @@ -388,6 +403,7 @@ private HttpRemoteTaskWithEventLoop(Session session, this.taskInfoCodec = taskInfoCodec; this.taskInfoJsonCodec = taskInfoJsonCodec; this.taskUpdateRequestCodec = taskUpdateRequestCodec; + this.taskInfoResponseCodec = taskInfoResponseCodec; this.planFragmentCodec = planFragmentCodec; this.updateErrorTracker = taskRequestErrorTracker(taskId, location, maxErrorDuration, taskEventLoop, "updating task"); this.nodeStatsTracker = requireNonNull(nodeStatsTracker, "nodeStatsTracker is null"); @@ -396,6 +412,8 @@ private HttpRemoteTaskWithEventLoop(Session session, this.binaryTransportEnabled = binaryTransportEnabled; this.thriftTransportEnabled = thriftTransportEnabled; this.taskInfoThriftTransportEnabled = taskInfoThriftTransportEnabled; + this.taskUpdateRequestThriftSerdeEnabled = taskUpdateRequestThriftSerdeEnabled; + this.taskInfoResponseThriftSerdeEnabled = taskInfoResponseThriftSerdeEnabled; this.thriftProtocol = thriftProtocol; this.connectorTypeSerdeManager = connectorTypeSerdeManager; this.handleResolver = handleResolver; @@ -989,47 +1007,51 @@ private void sendUpdate() outputBuffers, writeInfo); long serializeStartCpuTimeNanos = THREAD_MX_BEAN.getCurrentThreadCpuTime(); - byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toBytes(updateRequest); - schedulerStatsTracker.recordTaskUpdateSerializedCpuTime(THREAD_MX_BEAN.getCurrentThreadCpuTime() - serializeStartCpuTimeNanos); + Request.Builder requestBuilder; + HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus); - taskUpdateRequestSize.add(taskUpdateRequestJson.length); + byte[] taskUpdateRequestBytes = taskUpdateRequestCodec.toBytes(updateRequest); + schedulerStatsTracker.recordTaskUpdateSerializedCpuTime(THREAD_MX_BEAN.getCurrentThreadCpuTime() - serializeStartCpuTimeNanos); - if (taskUpdateRequestJson.length > maxTaskUpdateSizeInBytes) { - failTask(new PrestoException(EXCEEDED_TASK_UPDATE_SIZE_LIMIT, getExceededTaskUpdateSizeMessage(taskUpdateRequestJson))); - return; + if (taskUpdateRequestBytes.length > maxTaskUpdateSizeInBytes) { + failTask(new PrestoException(EXCEEDED_TASK_UPDATE_SIZE_LIMIT, getExceededTaskUpdateSizeMessage(taskUpdateRequestBytes))); } if (taskUpdateSizeTrackingEnabled) { - taskUpdateRequestSize.add(taskUpdateRequestJson.length); + taskUpdateRequestSize.add(taskUpdateRequestBytes.length); if (fragment.isPresent()) { - stats.updateWithPlanSize(taskUpdateRequestJson.length); + stats.updateWithPlanSize(taskUpdateRequestBytes.length); } else { if (ThreadLocalRandom.current().nextDouble() < UPDATE_WITHOUT_PLAN_STATS_SAMPLE_RATE) { // This is to keep track of the task update size even when the plan fragment is NOT present - stats.updateWithoutPlanSize(taskUpdateRequestJson.length); + stats.updateWithoutPlanSize(taskUpdateRequestBytes.length); } } } - HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus); - Request request = setContentTypeHeaders(binaryTransportEnabled, preparePost()) + requestBuilder = setTaskUpdateRequestContentTypeHeaders(taskUpdateRequestThriftSerdeEnabled, binaryTransportEnabled, preparePost()); + requestBuilder = setTaskInfoAcceptTypeHeaders(taskInfoResponseThriftSerdeEnabled, binaryTransportEnabled, requestBuilder); + Request request = requestBuilder .setUri(uriBuilder.build()) - .setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson)) + .setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestBytes)) .build(); ResponseHandler responseHandler; - if (binaryTransportEnabled) { - responseHandler = createFullSmileResponseHandler((SmileCodec) taskInfoCodec); + if (taskInfoResponseThriftSerdeEnabled) { + responseHandler = new ThriftResponseHandler(unwrapThriftCodec(taskInfoResponseCodec)); + } + else if (binaryTransportEnabled) { + responseHandler = createFullSmileResponseHandler((SmileCodec) taskInfoResponseCodec); } else { - responseHandler = createAdaptingJsonResponseHandler((JsonCodec) taskInfoJsonCodec); + responseHandler = createAdaptingJsonResponseHandler((JsonCodec) taskInfoResponseCodec); } updateErrorTracker.startRequest(); - ListenableFuture> future = httpClient.executeAsync(request, responseHandler); + ListenableFuture future = httpClient.executeAsync(request, responseHandler); currentRequest = future; currentRequestStartNanos = System.nanoTime(); if (!taskUpdateTimeline.isEmpty()) { @@ -1040,10 +1062,18 @@ private void sendUpdate() // and does so without grabbing the instance lock. needsUpdate = false; - Futures.addCallback( - future, - new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats.getHttpResponseStats(), REMOTE_TASK_ERROR), - taskEventLoop); + if (taskInfoResponseThriftSerdeEnabled) { + Futures.addCallback( + (ListenableFuture>) future, + new ThriftHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats.getHttpResponseStats(), REMOTE_TASK_ERROR), + taskEventLoop); + } + else { + Futures.addCallback( + (ListenableFuture>) future, + new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats.getHttpResponseStats(), REMOTE_TASK_ERROR), + taskEventLoop); + } }, "sendUpdate"); } diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ThriftCodecWrapper.java b/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftCodecWrapper.java similarity index 63% rename from presto-main-base/src/main/java/com/facebook/presto/server/thrift/ThriftCodecWrapper.java rename to presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftCodecWrapper.java index c4109c41db70f..e6f85644e2770 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/server/thrift/ThriftCodecWrapper.java +++ b/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftCodecWrapper.java @@ -13,12 +13,20 @@ */ package com.facebook.presto.server.thrift; +import com.facebook.airlift.http.client.thrift.ThriftProtocolException; +import com.facebook.airlift.http.client.thrift.ThriftProtocolUtils; import com.facebook.airlift.json.Codec; import com.facebook.drift.codec.ThriftCodec; +import com.facebook.drift.transport.netty.codec.Protocol; +import com.facebook.presto.spi.PrestoException; +import io.airlift.slice.DynamicSliceOutput; +import io.airlift.slice.SliceOutput; +import io.airlift.slice.Slices; import java.io.InputStream; import java.io.OutputStream; +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.google.common.base.Verify.verify; import static java.util.Objects.requireNonNull; @@ -46,13 +54,25 @@ public static ThriftCodec unwrapThriftCodec(Codec codec) @Override public byte[] toBytes(T instance) { - throw new UnsupportedOperationException("Operation not supported"); + try { + SliceOutput sliceOutput = new DynamicSliceOutput(1024); + ThriftProtocolUtils.write(instance, thriftCodec, Protocol.BINARY, sliceOutput); + return sliceOutput.slice().getBytes(); + } + catch (ThriftProtocolException e) { + throw new PrestoException(NOT_SUPPORTED, "Can not serialize instance to bytes", e); + } } @Override public T fromBytes(byte[] bytes) { - throw new UnsupportedOperationException("Operation not supported"); + try { + return ThriftProtocolUtils.read(thriftCodec, Protocol.BINARY, Slices.wrappedBuffer(bytes).getInput()); + } + catch (ThriftProtocolException e) { + throw new PrestoException(NOT_SUPPORTED, "Can not deserialize instance from bytes", e); + } } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftTaskUpdateRequestBodyReader.java b/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftTaskUpdateRequestBodyReader.java new file mode 100644 index 0000000000000..9bfe517a4feef --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftTaskUpdateRequestBodyReader.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server.thrift; + +import com.facebook.airlift.json.Codec; +import com.facebook.drift.codec.ThriftCodec; +import com.facebook.presto.server.TaskUpdateRequest; +import com.google.common.io.ByteStreams; + +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyReader; +import javax.ws.rs.ext.Provider; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +import static com.facebook.airlift.http.client.thrift.ThriftRequestUtils.APPLICATION_THRIFT_BINARY; +import static com.facebook.presto.server.thrift.ThriftCodecWrapper.wrapThriftCodec; +import static java.util.Objects.requireNonNull; + +@Provider +@Consumes(APPLICATION_THRIFT_BINARY) +public class ThriftTaskUpdateRequestBodyReader + implements MessageBodyReader +{ + private final Codec codec; + + @Inject + public ThriftTaskUpdateRequestBodyReader(ThriftCodec thriftCodec) + { + this.codec = wrapThriftCodec(requireNonNull(thriftCodec, "thriftCodec is null")); + } + + @Override + public boolean isReadable(Class aClass, Type type, Annotation[] annotations, MediaType mediaType) + { + return type.equals(TaskUpdateRequest.class) && mediaType.isCompatible(MediaType.valueOf(APPLICATION_THRIFT_BINARY)); + } + + @Override + public TaskUpdateRequest readFrom(Class aClass, Type type, Annotation[] annotations, MediaType mediaType, MultivaluedMap multivaluedMap, InputStream inputStream) + throws IOException, WebApplicationException + { + return codec.fromBytes(ByteStreams.toByteArray(inputStream)); + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java index 6c93591583411..b703ec779f460 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java +++ b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java @@ -27,6 +27,7 @@ import com.facebook.drift.codec.utils.DataSizeToBytesThriftCodec; import com.facebook.drift.codec.utils.DurationToMillisThriftCodec; import com.facebook.drift.codec.utils.JodaDateTimeToEpochMillisThriftCodec; +import com.facebook.drift.codec.utils.LocaleToLanguageTagCodec; import com.facebook.presto.SessionTestUtils; import com.facebook.presto.client.NodeVersion; import com.facebook.presto.common.ErrorCode; @@ -58,6 +59,9 @@ import com.facebook.presto.server.ConnectorMetadataUpdateHandleJsonSerde; import com.facebook.presto.server.InternalCommunicationConfig; import com.facebook.presto.server.TaskUpdateRequest; +import com.facebook.presto.server.thrift.MetadataUpdatesCodec; +import com.facebook.presto.server.thrift.SplitCodec; +import com.facebook.presto.server.thrift.TableWriteInfoCodec; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.relation.VariableReferenceExpression; @@ -371,10 +375,17 @@ public void configure(Binder binder) jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class); jsonCodecBinder(binder).bindJsonCodec(PlanFragment.class); jsonCodecBinder(binder).bindJsonCodec(MetadataUpdates.class); + jsonCodecBinder(binder).bindJsonCodec(TableWriteInfo.class); + jsonCodecBinder(binder).bindJsonCodec(Split.class); jsonBinder(binder).addKeySerializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionSerializer.class); jsonBinder(binder).addKeyDeserializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionDeserializer.class); thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class); thriftCodecBinder(binder).bindThriftCodec(TaskInfo.class); + thriftCodecBinder(binder).bindThriftCodec(TaskUpdateRequest.class); + thriftCodecBinder(binder).bindCustomThriftCodec(MetadataUpdatesCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(SplitCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TableWriteInfoCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(LocaleToLanguageTagCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(JodaDateTimeToEpochMillisThriftCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(DurationToMillisThriftCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(DataSizeToBytesThriftCodec.class); @@ -392,6 +403,7 @@ private HttpRemoteTaskFactory createHttpRemoteTaskFactory( SmileCodec taskInfoSmileCodec, JsonCodec taskUpdateRequestJsonCodec, SmileCodec taskUpdateRequestSmileCodec, + ThriftCodec taskUpdateRequestThriftCodec, JsonCodec planFragmentJsonCodec, SmileCodec planFragmentSmileCodec, JsonCodec metadataUpdatesJsonCodec, @@ -413,6 +425,7 @@ private HttpRemoteTaskFactory createHttpRemoteTaskFactory( taskInfoThriftCodec, taskUpdateRequestJsonCodec, taskUpdateRequestSmileCodec, + taskUpdateRequestThriftCodec, planFragmentJsonCodec, planFragmentSmileCodec, metadataUpdatesJsonCodec, diff --git a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java index d301958eec80c..ca08715f8c1b4 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java +++ b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTaskWithEventLoop.java @@ -27,6 +27,7 @@ import com.facebook.drift.codec.utils.DataSizeToBytesThriftCodec; import com.facebook.drift.codec.utils.DurationToMillisThriftCodec; import com.facebook.drift.codec.utils.JodaDateTimeToEpochMillisThriftCodec; +import com.facebook.drift.codec.utils.LocaleToLanguageTagCodec; import com.facebook.presto.client.NodeVersion; import com.facebook.presto.common.ErrorCode; import com.facebook.presto.common.type.Type; @@ -56,6 +57,9 @@ import com.facebook.presto.server.ConnectorMetadataUpdateHandleJsonSerde; import com.facebook.presto.server.InternalCommunicationConfig; import com.facebook.presto.server.TaskUpdateRequest; +import com.facebook.presto.server.thrift.MetadataUpdatesCodec; +import com.facebook.presto.server.thrift.SplitCodec; +import com.facebook.presto.server.thrift.TableWriteInfoCodec; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.relation.VariableReferenceExpression; @@ -379,10 +383,17 @@ public void configure(Binder binder) jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class); jsonCodecBinder(binder).bindJsonCodec(PlanFragment.class); jsonCodecBinder(binder).bindJsonCodec(MetadataUpdates.class); + jsonCodecBinder(binder).bindJsonCodec(TableWriteInfo.class); + jsonCodecBinder(binder).bindJsonCodec(Split.class); jsonBinder(binder).addKeySerializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionSerializer.class); jsonBinder(binder).addKeyDeserializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionDeserializer.class); thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class); thriftCodecBinder(binder).bindThriftCodec(TaskInfo.class); + thriftCodecBinder(binder).bindThriftCodec(TaskUpdateRequest.class); + thriftCodecBinder(binder).bindCustomThriftCodec(MetadataUpdatesCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(SplitCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(TableWriteInfoCodec.class); + thriftCodecBinder(binder).bindCustomThriftCodec(LocaleToLanguageTagCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(JodaDateTimeToEpochMillisThriftCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(DurationToMillisThriftCodec.class); thriftCodecBinder(binder).bindCustomThriftCodec(DataSizeToBytesThriftCodec.class); @@ -400,6 +411,7 @@ private HttpRemoteTaskFactory createHttpRemoteTaskFactory( SmileCodec taskInfoSmileCodec, JsonCodec taskUpdateRequestJsonCodec, SmileCodec taskUpdateRequestSmileCodec, + ThriftCodec taskUpdateRequestThriftCodec, JsonCodec planFragmentJsonCodec, SmileCodec planFragmentSmileCodec, JsonCodec metadataUpdatesJsonCodec, @@ -421,6 +433,7 @@ private HttpRemoteTaskFactory createHttpRemoteTaskFactory( taskInfoThriftCodec, taskUpdateRequestJsonCodec, taskUpdateRequestSmileCodec, + taskUpdateRequestThriftCodec, planFragmentJsonCodec, planFragmentSmileCodec, metadataUpdatesJsonCodec, diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorSplit.java b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorSplit.java index 0048a7919786e..9fc16316f5ab0 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorSplit.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorSplit.java @@ -14,6 +14,7 @@ package com.facebook.presto.spi; import com.facebook.presto.spi.schedule.NodeSelectionStrategy; +import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.List; import java.util.Map; @@ -43,6 +44,7 @@ public interface ConnectorSplit *

Instead use {@link #getInfoMap()} method which returns a

{@code Map}
*/ @Deprecated + @JsonIgnore Object getInfo(); default Map getInfoMap() diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/FunctionKind.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/FunctionKind.java index a554a1c2248f7..26ccab46ab46b 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/function/FunctionKind.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/FunctionKind.java @@ -19,9 +19,9 @@ @ThriftEnum public enum FunctionKind { - SCALAR(1), - AGGREGATE(2), - WINDOW(3); + SCALAR(0), + AGGREGATE(1), + WINDOW(2); private final int value; diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/RoutineCharacteristics.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/RoutineCharacteristics.java index 7a5b1a159be3b..66bf6969519d4 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/function/RoutineCharacteristics.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/RoutineCharacteristics.java @@ -88,8 +88,8 @@ public String toString() @ThriftEnum public enum Determinism { - DETERMINISTIC(1), - NOT_DETERMINISTIC(2); + DETERMINISTIC(0), + NOT_DETERMINISTIC(1); private final int value; private Determinism(int value) @@ -107,8 +107,8 @@ public int getValue() @ThriftEnum public enum NullCallClause { - RETURNS_NULL_ON_NULL_INPUT(1), - CALLED_ON_NULL_INPUT(2); + RETURNS_NULL_ON_NULL_INPUT(0), + CALLED_ON_NULL_INPUT(1); private final int value; private NullCallClause(int value) diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/SqlFunctionId.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/SqlFunctionId.java index 9e103fd419023..2f0e18e3c5e2b 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/function/SqlFunctionId.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/SqlFunctionId.java @@ -40,19 +40,22 @@ public class SqlFunctionId private final List argumentTypes; @ThriftConstructor + public SqlFunctionId(String signature) + { + this(parseSqlFunctionId(signature).getFunctionName(), parseSqlFunctionId(signature).getArgumentTypes()); + } + public SqlFunctionId(QualifiedObjectName functionName, List argumentTypes) { this.functionName = requireNonNull(functionName, "functionName is null"); this.argumentTypes = requireNonNull(argumentTypes, "argumentTypes is null"); } - @ThriftField(1) public QualifiedObjectName getFunctionName() { return functionName; } - @ThriftField(2) public List getArgumentTypes() { return argumentTypes; @@ -93,6 +96,7 @@ public String toString() } @JsonValue + @ThriftField(value = 1, name = "signature") public String toJsonString() { return format("%s;%s", functionName.toString(), argumentTypes.stream().map(TypeSignature::toString).collect(joining(";"))); diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/SqlInvokedFunction.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/SqlInvokedFunction.java index 2b09e511dcd67..42fb3607f5e09 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/function/SqlInvokedFunction.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/SqlInvokedFunction.java @@ -208,7 +208,7 @@ public SqlInvokedFunction withVersion(String version) } @Override - @ThriftField(1) + @ThriftField(6) @JsonProperty public Signature getSignature() { @@ -241,28 +241,28 @@ public String getDescription() return description; } - @ThriftField(3) + @ThriftField(1) @JsonProperty public List getParameters() { return parameters; } - @ThriftField(4) + @ThriftField(3) @JsonProperty public RoutineCharacteristics getRoutineCharacteristics() { return routineCharacteristics; } - @ThriftField(5) + @ThriftField(4) @JsonProperty public String getBody() { return body; } - @ThriftField(6) + @ThriftField(7) @JsonProperty public SqlFunctionId getFunctionId() { @@ -284,6 +284,7 @@ public FunctionVersion getVersion() return functionVersion; } + @ThriftField(5) public boolean getVariableArity() { return variableArity; diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/security/SelectedRole.java b/presto-spi/src/main/java/com/facebook/presto/spi/security/SelectedRole.java index d51d04e9e4d9a..aef7d0b93765f 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/security/SelectedRole.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/security/SelectedRole.java @@ -34,7 +34,7 @@ public class SelectedRole @ThriftEnum public enum Type { - ROLE(1), ALL(2), NONE(3); + ROLE(0), ALL(1), NONE(2); private final int value;