Skip to content

Commit 3d99a78

Browse files
authored
feat(connector): Add support for custom connector-provided serialization codecs (#26257)
## Description
Add support for custom connector-provided serialization codecs

## Motivation and Context
This will allow plugin connectors to be written in C++.
RFC: prestodb/rfcs#49
End to end changes migrating TPCH to the new framework: #26026

## Impact
No immediate impact

## Test Plan
Included tests

## Contributor checklist
- [ ] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards).
- [ ] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
- [ ] Documented new properties (with its default value), SQL syntax, functions, or other functionality.
- [ ] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [ ] Adequate tests were added if applicable.
- [ ] CI passed.

## Release Notes
```
== NO RELEASE NOTE ==
```
1 parent 7c80949 commit 3d99a78

File tree

38 files changed

+2751
-42
lines changed

38 files changed

+2751
-42
lines changed

presto-delta/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,11 @@
342342
<artifactId>jakarta.servlet-api</artifactId>
343343
<scope>test</scope>
344344
</dependency>
345+
346+
<dependency>
347+
<groupId>com.facebook.airlift.drift</groupId>
348+
<artifactId>drift-codec</artifactId>
349+
</dependency>
345350
</dependencies>
346351

347352
<build>
@@ -353,6 +358,7 @@
353358
<ignoredDependencies>
354359
<ignoredDependency>org.scala-lang:scala-library:jar</ignoredDependency>
355360
<ignoredDependency>commons-io:commons-io:jar</ignoredDependency>
361+
<ignoredDependency>com.facebook.airlift.drift:drift-codec:jar</ignoredDependency>
356362
</ignoredDependencies>
357363
</configuration>
358364
</plugin>

presto-delta/src/test/java/com/facebook/presto/delta/TestDeltaTableHandle.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.airlift.bootstrap.Bootstrap;
1717
import com.facebook.airlift.json.JsonCodec;
1818
import com.facebook.airlift.json.JsonModule;
19+
import com.facebook.drift.codec.guice.ThriftCodecModule;
1920
import com.facebook.presto.block.BlockJsonSerde;
2021
import com.facebook.presto.common.block.Block;
2122
import com.facebook.presto.common.block.BlockEncoding;
@@ -24,6 +25,7 @@
2425
import com.facebook.presto.common.type.StandardTypes;
2526
import com.facebook.presto.common.type.Type;
2627
import com.facebook.presto.common.type.TypeManager;
28+
import com.facebook.presto.connector.ConnectorManager;
2729
import com.facebook.presto.metadata.FunctionAndTypeManager;
2830
import com.facebook.presto.metadata.HandleJsonModule;
2931
import com.facebook.presto.metadata.HandleResolver;
@@ -92,6 +94,8 @@ private JsonCodec<DeltaTableHandle> getJsonCodec()
9294
Module module = binder -> {
9395
binder.install(new JsonModule());
9496
binder.install(new HandleJsonModule());
97+
binder.bind(ConnectorManager.class).toProvider(() -> null).in(Scopes.SINGLETON);
98+
binder.install(new ThriftCodecModule());
9599
configBinder(binder).bindConfig(FeaturesConfig.class);
96100
FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager();
97101
binder.bind(TypeManager.class).toInstance(functionAndTypeManager);

presto-docs/src/main/sphinx/admin/properties.rst

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,24 @@ shared across all of the partitioned consumers. Increasing this value may
571571
improve network throughput for data transferred between stages if the
572572
network has high latency or if there are many nodes in the cluster.
573573

574+
``use-connector-provided-serialization-codecs``
575+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
576+
577+
* **Type:** ``boolean``
578+
* **Default value:** ``false``
579+
580+
Enables the use of custom connector-provided serialization codecs for handles.
581+
This feature allows connectors to use their own serialization format for
582+
handle objects (such as table handles, column handles, and splits) instead
583+
of standard JSON serialization.
584+
585+
When enabled, connectors that provide a ``ConnectorCodecProvider`` with
586+
appropriate codecs will have their handles serialized using custom binary
587+
formats, which are then Base64-encoded for transport. Connectors without
588+
codec support automatically fall back to standard JSON serialization.
589+
Internal Presto handles (prefixed with ``$``) always use JSON serialization
590+
regardless of this setting.
591+
574592
.. _task-properties:
575593

576594
Task Properties

presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplit.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
import com.facebook.airlift.bootstrap.Bootstrap;
1717
import com.facebook.airlift.json.JsonCodec;
1818
import com.facebook.airlift.json.JsonModule;
19+
import com.facebook.drift.codec.guice.ThriftCodecModule;
1920
import com.facebook.presto.block.BlockJsonSerde;
2021
import com.facebook.presto.common.block.Block;
2122
import com.facebook.presto.common.block.BlockEncoding;
2223
import com.facebook.presto.common.block.BlockEncodingManager;
2324
import com.facebook.presto.common.block.BlockEncodingSerde;
2425
import com.facebook.presto.common.type.Type;
2526
import com.facebook.presto.common.type.TypeManager;
27+
import com.facebook.presto.connector.ConnectorManager;
2628
import com.facebook.presto.hive.metastore.Column;
2729
import com.facebook.presto.hive.metastore.Storage;
2830
import com.facebook.presto.hive.metastore.StorageFormat;
@@ -153,8 +155,10 @@ private JsonCodec<HiveSplit> getJsonCodec()
153155
{
154156
Module module = binder -> {
155157
binder.install(new JsonModule());
158+
binder.install(new ThriftCodecModule());
156159
binder.install(new HandleJsonModule());
157160
configBinder(binder).bindConfig(FeaturesConfig.class);
161+
binder.bind(ConnectorManager.class).toProvider(() -> null);
158162
FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager();
159163
binder.bind(TypeManager.class).toInstance(functionAndTypeManager);
160164
jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);

presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,16 @@ private Connector createConnector(ConnectorId connectorId, ConnectorFactory fact
393393
}
394394
}
395395

396+
public Optional<ConnectorCodecProvider> getConnectorCodecProvider(ConnectorId connectorId)
397+
{
398+
requireNonNull(connectorId, "connectorId is null");
399+
MaterializedConnector materializedConnector = connectors.get(connectorId);
400+
if (materializedConnector == null) {
401+
return Optional.empty();
402+
}
403+
return materializedConnector.getConnectorCodecProvider();
404+
}
405+
396406
private static class MaterializedConnector
397407
{
398408
private final ConnectorId connectorId;

presto-main-base/src/main/java/com/facebook/presto/index/IndexHandleJacksonModule.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,47 @@
1313
*/
1414
package com.facebook.presto.index;
1515

16+
import com.facebook.presto.connector.ConnectorManager;
1617
import com.facebook.presto.metadata.AbstractTypedJacksonModule;
1718
import com.facebook.presto.metadata.HandleResolver;
19+
import com.facebook.presto.spi.ConnectorCodec;
20+
import com.facebook.presto.spi.ConnectorId;
1821
import com.facebook.presto.spi.ConnectorIndexHandle;
22+
import com.facebook.presto.spi.connector.ConnectorCodecProvider;
23+
import com.facebook.presto.sql.analyzer.FeaturesConfig;
1924
import jakarta.inject.Inject;
25+
import jakarta.inject.Provider;
26+
27+
import java.util.Optional;
28+
import java.util.function.Function;
2029

2130
public class IndexHandleJacksonModule
2231
extends AbstractTypedJacksonModule<ConnectorIndexHandle>
2332
{
2433
@Inject
25-
public IndexHandleJacksonModule(HandleResolver handleResolver)
34+
public IndexHandleJacksonModule(
35+
HandleResolver handleResolver,
36+
Provider<ConnectorManager> connectorManagerProvider,
37+
FeaturesConfig featuresConfig)
38+
{
39+
super(ConnectorIndexHandle.class,
40+
handleResolver::getId,
41+
handleResolver::getIndexHandleClass,
42+
featuresConfig.isUseConnectorProvidedSerializationCodecs(),
43+
connectorId -> connectorManagerProvider.get()
44+
.getConnectorCodecProvider(connectorId)
45+
.flatMap(ConnectorCodecProvider::getConnectorIndexHandleCodec));
46+
}
47+
48+
public IndexHandleJacksonModule(
49+
HandleResolver handleResolver,
50+
FeaturesConfig featuresConfig,
51+
Function<ConnectorId, Optional<ConnectorCodec<ConnectorIndexHandle>>> codecExtractor)
2652
{
2753
super(ConnectorIndexHandle.class,
2854
handleResolver::getId,
29-
handleResolver::getIndexHandleClass);
55+
handleResolver::getIndexHandleClass,
56+
featuresConfig.isUseConnectorProvidedSerializationCodecs(),
57+
codecExtractor);
3058
}
3159
}

presto-main-base/src/main/java/com/facebook/presto/metadata/AbstractTypedJacksonModule.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
*/
1414
package com.facebook.presto.metadata;
1515

16+
import com.facebook.presto.spi.ConnectorCodec;
17+
import com.facebook.presto.spi.ConnectorId;
1618
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
1719
import com.fasterxml.jackson.core.JsonGenerator;
1820
import com.fasterxml.jackson.core.JsonParser;
@@ -38,6 +40,7 @@
3840
import com.google.common.cache.CacheBuilder;
3941

4042
import java.io.IOException;
43+
import java.util.Optional;
4144
import java.util.concurrent.ExecutionException;
4245
import java.util.function.Function;
4346

@@ -49,18 +52,38 @@ public abstract class AbstractTypedJacksonModule<T>
4952
extends SimpleModule
5053
{
5154
private static final String TYPE_PROPERTY = "@type";
55+
private static final String DATA_PROPERTY = "customSerializedValue";
5256

5357
/**
 * Registers the serializer/deserializer pair for {@code baseClass}.
 *
 * <p>When {@code binarySerializationEnabled} is true, a {@code CodecSerializer} and
 * {@code CodecDeserializer} are registered and given {@code codecExtractor} so they can look
 * up a connector-specific codec. Otherwise the existing typed JSON serializer/deserializer
 * pair is registered and {@code codecExtractor} is not used.
 *
 * @param baseClass handle base type this module is responsible for
 * @param nameResolver maps a handle instance to the id written in the {@code @type} property
 * @param classResolver maps an {@code @type} id back to the concrete handle class
 * @param binarySerializationEnabled whether to register the codec-based serializers
 * @param codecExtractor maps a connector id to its codec for this handle type, if any
 * @throws NullPointerException if any reference argument is null
 */
protected AbstractTypedJacksonModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver,
        boolean binarySerializationEnabled,
        Function<ConnectorId, Optional<ConnectorCodec<T>>> codecExtractor)
{
    // baseClass is dereferenced while building the super(...) arguments, so it has to be
    // validated right here; a check placed after this call could never be reached with null.
    super(requireNonNull(baseClass, "baseClass is null").getSimpleName() + "Module", Version.unknownVersion());

    requireNonNull(nameResolver, "nameResolver is null");
    requireNonNull(classResolver, "classResolver is null");
    requireNonNull(codecExtractor, "codecExtractor is null");

    if (binarySerializationEnabled) {
        // Use codec serialization
        addSerializer(baseClass, new CodecSerializer<>(
                TYPE_PROPERTY,
                DATA_PROPERTY,
                codecExtractor,
                nameResolver,
                new InternalTypeResolver<>(nameResolver, classResolver)));
        addDeserializer(baseClass, new CodecDeserializer<>(TYPE_PROPERTY, DATA_PROPERTY, codecExtractor, classResolver));
    }
    else {
        // Use legacy typed serialization
        TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);
        addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver));
        addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver));
    }
}
6588

6689
private static class InternalTypeDeserializer<T>
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.metadata;
15+
16+
import com.facebook.presto.spi.ConnectorCodec;
17+
import com.facebook.presto.spi.ConnectorId;
18+
import com.fasterxml.jackson.core.JsonParser;
19+
import com.fasterxml.jackson.core.JsonToken;
20+
import com.fasterxml.jackson.core.TreeNode;
21+
import com.fasterxml.jackson.databind.DeserializationContext;
22+
import com.fasterxml.jackson.databind.JsonDeserializer;
23+
import com.fasterxml.jackson.databind.jsontype.TypeDeserializer;
24+
import com.fasterxml.jackson.databind.node.ObjectNode;
25+
26+
import java.io.IOException;
27+
import java.util.Base64;
28+
import java.util.Optional;
29+
import java.util.function.Function;
30+
31+
import static java.util.Objects.requireNonNull;
32+
33+
class CodecDeserializer<T>
34+
extends JsonDeserializer<T>
35+
{
36+
private final Function<String, Class<? extends T>> classResolver;
37+
private final Function<ConnectorId, Optional<ConnectorCodec<T>>> codecExtractor;
38+
private final String typePropertyName;
39+
private final String dataPropertyName;
40+
41+
public CodecDeserializer(
42+
String typePropertyName,
43+
String dataPropertyName,
44+
Function<ConnectorId, Optional<ConnectorCodec<T>>> codecExtractor,
45+
Function<String, Class<? extends T>> classResolver)
46+
{
47+
this.classResolver = requireNonNull(classResolver, "classResolver is null");
48+
this.codecExtractor = requireNonNull(codecExtractor, "codecExtractor is null");
49+
this.typePropertyName = requireNonNull(typePropertyName, "typePropertyName is null");
50+
this.dataPropertyName = requireNonNull(dataPropertyName, "dataPropertyName is null");
51+
}
52+
53+
@Override
54+
public T deserialize(JsonParser parser, DeserializationContext context)
55+
throws IOException
56+
{
57+
if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
58+
return null;
59+
}
60+
61+
if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
62+
throw new IOException("Expected START_OBJECT, got " + parser.getCurrentToken());
63+
}
64+
65+
// Parse the JSON tree
66+
TreeNode tree = parser.readValueAsTree();
67+
68+
if (tree instanceof ObjectNode) {
69+
ObjectNode node = (ObjectNode) tree;
70+
71+
// Get the @type field
72+
if (!node.has(typePropertyName)) {
73+
throw new IOException("Missing " + typePropertyName + " field");
74+
}
75+
String connectorIdString = node.get(typePropertyName).asText();
76+
// Check if @data field is present (binary serialization)
77+
if (node.has(dataPropertyName)) {
78+
// Binary data is present, we need a codec to deserialize it
79+
// Special handling for internal handles like "$remote"
80+
if (!connectorIdString.startsWith("$")) {
81+
ConnectorId connectorId = new ConnectorId(connectorIdString);
82+
Optional<ConnectorCodec<T>> codec = codecExtractor.apply(connectorId);
83+
if (codec.isPresent()) {
84+
String base64Data = node.get(dataPropertyName).asText();
85+
byte[] data = Base64.getDecoder().decode(base64Data);
86+
return codec.get().deserialize(data);
87+
}
88+
}
89+
// @data field present but no codec available or internal handle
90+
throw new IOException("Type " + connectorIdString + " has binary data (" + dataPropertyName + " field) but no codec available to deserialize it");
91+
}
92+
93+
// No @data field - use standard JSON deserialization
94+
Class<? extends T> handleClass = classResolver.apply(connectorIdString);
95+
96+
// Remove the @type field and deserialize the remaining content
97+
node.remove(typePropertyName);
98+
return context.readTreeAsValue(node, handleClass);
99+
}
100+
101+
throw new IOException("Unable to deserialize");
102+
}
103+
104+
@Override
105+
public T deserializeWithType(JsonParser p, DeserializationContext ctxt,
106+
TypeDeserializer typeDeserializer)
107+
throws IOException
108+
{
109+
// We handle the type ourselves
110+
return deserialize(p, ctxt);
111+
}
112+
113+
@Override
114+
public T deserializeWithType(JsonParser p, DeserializationContext ctxt,
115+
TypeDeserializer typeDeserializer, T intoValue)
116+
throws IOException
117+
{
118+
// We handle the type ourselves
119+
return deserialize(p, ctxt);
120+
}
121+
}

0 commit comments

Comments
 (0)