# **RFC0 for Presto**

See [CONTRIBUTING.md](CONTRIBUTING.md) for instructions on creating your RFC and the process surrounding it.

## [Thrift Serialization for TaskStatus, TaskInfo and TaskUpdateRequest]

Proposers

* Shang Ma
* Vivian Hsu

## [Related Issues]

Related issues may include GitHub issues, PRs, or other RFCs.

- prestodb/presto#25020
- prestodb/presto#25079

## Summary

Support Thrift serialization of the TaskStatus, TaskInfo, and TaskUpdateRequest classes for the getTaskStatus and createOrUpdateTask APIs, for both Java and C++ worker types, to reduce CPU overhead.

## Background

The Presto coordinator sends updates to workers, and workers respond with a TaskInfo. Both the TaskUpdateRequest and the TaskInfo are currently serialized using JSON, which can be CPU intensive. In the case of high task concurrency, this can become a bottleneck for the coordinator, which in turn becomes a bottleneck for the whole cluster.

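For reference, a minimal sketch of the current JSON path, assuming Airlift's JsonCodec and Presto's TaskInfo class (the wrapper class below is illustrative, not existing code):

```java
import static io.airlift.json.JsonCodec.jsonCodec;

import io.airlift.json.JsonCodec;

public final class TaskInfoJsonExample
{
    // Illustrative only: the coordinator serializes a TaskUpdateRequest and
    // deserializes the worker's TaskInfo response with JSON codecs like this one.
    private static final JsonCodec<TaskInfo> TASK_INFO_CODEC = jsonCodec(TaskInfo.class);

    static byte[] toJsonBytes(TaskInfo taskInfo)
    {
        // Jackson-based serialization: CPU cost grows with task count and update rate
        return TASK_INFO_CODEC.toJsonBytes(taskInfo);
    }

    static TaskInfo fromJsonBytes(byte[] bytes)
    {
        return TASK_INFO_CODEC.fromJson(bytes);
    }
}
```
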
### [Optional] Goals

1. Support Thrift serde for the TaskStatus, TaskInfo, and TaskUpdateRequest classes for both Java and C++ workers
2. Maintain backward compatibility with existing JSON serialization
3. Use the Drift IDL generator to produce the IDL file and use it to generate C++ classes for native workers (see the sketch after this list)
4. Allow multiple serialization formats to coexist
5. Support future serialization formats without SPI changes
6. Allow gradual migration from the current design to the new design

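To make goal 3 concrete, a hedged sketch of a Drift-annotated class that the IDL generator could consume to emit a `.thrift` definition for the C++ workers (the struct and its fields are hypothetical, not the final schema):

```java
import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;

// Hypothetical Drift-annotated struct; the IDL generator would emit a matching
// Thrift struct definition, from which the C++ worker classes are generated.
@ThriftStruct
public class ThriftTaskStatusExample
{
    private final long taskInstanceIdLeastSignificantBits;
    private final String state;

    @ThriftConstructor
    public ThriftTaskStatusExample(long taskInstanceIdLeastSignificantBits, String state)
    {
        this.taskInstanceIdLeastSignificantBits = taskInstanceIdLeastSignificantBits;
        this.state = state;
    }

    @ThriftField(1)
    public long getTaskInstanceIdLeastSignificantBits()
    {
        return taskInstanceIdLeastSignificantBits;
    }

    @ThriftField(2)
    public String getState()
    {
        return state;
    }
}
```
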
## Proposed Implementation

### Disclaimer: The code below is pseudocode and will differ in the real implementation.

### Current Architecture for JSON Serde

```java
// Use Jackson annotations
public class Split
{
    @JsonProperty
    private final ConnectorSplit connectorSplit;

    // ... other fields and methods

    @JsonCreator
    public Split(...);
}
```

#### For Polymorphic Types, e.g. ConnectorSplit
```java
// A handle resolver to return the correct type info at runtime
public HandleResolver()
{
    handleResolvers.put(REMOTE_CONNECTOR_ID.toString(), new MaterializedHandleResolver(new RemoteHandleResolver()));
    handleResolvers.put("$system", new MaterializedHandleResolver(new SystemHandleResolver()));
    handleResolvers.put("$info_schema", new MaterializedHandleResolver(new InformationSchemaHandleResolver()));
    handleResolvers.put("$empty", new MaterializedHandleResolver(new EmptySplitHandleResolver()));

    functionHandleResolvers.put("$static", new MaterializedFunctionHandleResolver(new BuiltInFunctionNamespaceHandleResolver()));
    functionHandleResolvers.put("$session", new MaterializedFunctionHandleResolver(new SessionFunctionHandleResolver()));
}

// Register the correct serde methods for different types
protected AbstractTypedJacksonModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver)
{
    super(baseClass.getSimpleName() + "Module", Version.unknownVersion());

    TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);

    addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver));
    addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver));
}

// A class to bind the two above together
public class SplitJacksonModule
        extends AbstractTypedJacksonModule<ConnectorSplit>
{
    @Inject
    public SplitJacksonModule(HandleResolver handleResolver)
    {
        super(ConnectorSplit.class,
                handleResolver::getId,
                handleResolver::getSplitClass);
    }
}
```

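For context, a module like SplitJacksonModule takes effect once it is registered with the ObjectMapper used for serde; a minimal illustrative sketch (the wrapper class is hypothetical):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

public final class SplitSerdeWiringExample
{
    // Illustrative wiring: once the module is registered, Jackson consults its
    // serializer/deserializer whenever a ConnectorSplit is encountered.
    static ObjectMapper createObjectMapper(HandleResolver handleResolver)
    {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new SplitJacksonModule(handleResolver));
        return objectMapper;
    }
}
```
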
### Option 1: Extend Current Architecture for Thrift Serde
```java
// Add Drift annotations alongside the existing Jackson annotations
public class Split
{
    @JsonProperty
    @ThriftField
    private final ConnectorSplit connectorSplit;

    // ... other fields and methods

    @JsonCreator
    @ThriftConstructor
    public Split(...);
}
```

#### For Polymorphic Types, e.g. ConnectorSplit
```java
// Similarly, we register the correct method for a given type using the existing handle resolver
protected AbstractTypedThriftModule(
        Class<T> baseClass,
        Function<T, String> nameResolver,
        Function<String, Class<? extends T>> classResolver)
{
    TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);

    addThriftSerializer(baseClass, new InternalTypeThriftSerializer<>(baseClass, typeResolver));
    addThriftDeserializer(baseClass, new InternalTypeThriftDeserializer<>(baseClass, typeResolver));
}
```

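If Option 1 is adopted, obtaining a binary codec could look like the following hedged sketch, assuming Drift's ThriftCodecManager (illustrative wiring, not the final design):

```java
import com.facebook.drift.codec.ThriftCodec;
import com.facebook.drift.codec.ThriftCodecManager;

public final class ThriftSerdeUsageExample
{
    // Illustrative: with Drift annotations in place, a codec for Split can be
    // obtained from a codec manager and used for binary serde.
    static ThriftCodec<Split> splitCodec()
    {
        ThriftCodecManager codecManager = new ThriftCodecManager();
        return codecManager.getCodec(Split.class);
    }
}
```
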
#### Pros
- Follows the existing design in the code base

#### Cons
- If we want to change to a different binary serde, we will have to redo the whole process.

### Option 2: Pluggable Serde for Polymorphic Types

```java
import java.util.HashMap;
import java.util.Map;

public class Split
{
    private final ConnectorId connectorId;
    private final ConnectorSplit connectorSplit;
    // ... other fields and methods

    public Split(...);
}

// In presto-spi, an interface for split serde
public interface Serializer<T>
{
    String getType();

    byte[] serialize(T object);

    T deserialize(byte[] data);
}

// A JSON serializer for HiveSplit
public class HiveSplitJsonSerializer
        implements Serializer<HiveSplit>
{
    private final ObjectMapper mapper;

    @Override
    public String getType()
    {
        return "json";
    }

    @Override
    public byte[] serialize(HiveSplit split)
    {
        return mapper.writeValueAsBytes(new HiveSplitSerializable(split));
    }

    @Override
    public HiveSplit deserialize(byte[] data)
    {
        HiveSplitSerializable serializable = mapper.readValue(data, HiveSplitSerializable.class);
        return serializable.toHiveSplit();
    }
}

// A Thrift serializer for HiveSplit
public class HiveSplitThriftSerializer
        implements Serializer<HiveSplit>
{
    private final ThriftCodec<ThriftHiveSplit> codec;

    @Override
    public String getType()
    {
        return "thrift";
    }

    @Override
    public byte[] serialize(HiveSplit split)
    {
        ThriftHiveSplit thriftSplit = new ThriftHiveSplit();
        // ... populate fields ...
        return codec.serialize(thriftSplit);
    }

    @Override
    public HiveSplit deserialize(byte[] data)
    {
        ThriftHiveSplit thriftSplit = codec.deserialize(data);
        return new HiveSplit(/* construct from thrift object */);
    }
}

public class ConnectorManager
{
    private synchronized void addConnectorInternal(MaterializedConnector connector)
    {
        // existing code
        // ...
        connector.getSplitSerializerProvider()
                .ifPresent(splitSerializerProvider ->
                        connectorTypeSerdeManager.registerSerializer(connectorId, splitSerializerProvider));
    }
}

// Acts as a registry that holds the serde methods, keyed by connector id and serde type
public class ConnectorTypeSerdeManager
{
    private final Map<String, Serializer> serializers = new HashMap<>();

    // Add a custom serializer for a given connector type
    public void registerSerializer(String connectorId, SerializerProvider serializerProvider) {...}

    public Serializer getSerializer(String connectorId) {...}
}

// Register the correct serde method within the corresponding connector factory
public class HiveMetadata
        implements TransactionalMetadata
{
    private final Serializer splitSerializer;

    public HiveMetadata(...)
    {
        this.splitSerializer = new HiveSplitThriftSerializer();
    }

    public Serializer getSplitSerializer()
    {
        return this.splitSerializer;
    }
}

// Use a custom codec for ConnectorSplit
public class SplitCodec
        implements ThriftCodec<Split>
{
    private final ConnectorTypeSerdeManager serdeManager;

    @Override
    public void write(...)
    {
        TMemoryBuffer transport = new TMemoryBuffer(1024);
        TProtocolWriter writer = new TBinaryProtocol(transport);

        // write the connector id/type
        writer.writeStructBegin(new TStruct("Split"));
        writer.writeFieldBegin(new TField("connectorId", TType.STRING, (short) 2));
        writer.writeString("hive");
        writer.writeFieldEnd();

        // write the real data (pseudocode)
        writer.writeBinary(ByteBuffer.wrap(serdeManager.getSerializer(HiveTransactionHandle.class).serialize(aHiveTransactionHandle)));
        writer.writeBinary(ByteBuffer.wrap(serdeManager.getSerializer(HiveSplit.class).serialize(aHiveSplitObject)));
        writer.writeBinary(aLifespan);
        writer.writeBinary(aSplitContext);
        writer.writeStructEnd();
    }

    @Override
    public Split read(...)
    {
        // first, read the connector id to know what type of split we are dealing with
        reader.read(connectorId);

        // say it is a Hive split; then
        reader.read(aHiveTransactionHandle);
        reader.read(aHiveSplit);

        // lastly, read the rest
        reader.read(aLifespan);
        reader.read(aSplitContext);

        // ... construct and return the Split (pseudocode)
    }
}

// Conceptually, the serialized byte array contains:
//   String connectorId;  // "hive" or another connector
//   byte[] data;         // the real data of a serialized split
```

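To show how the pieces fit together, a hedged usage sketch under the Option 2 design (names follow the pseudocode above and are not final):

```java
// Hypothetical coordinator-side flow under Option 2: look up the serializer
// registered for the connector, then serialize the connector-specific split.
public final class PluggableSerdeUsageExample
{
    static byte[] serializeSplit(ConnectorTypeSerdeManager serdeManager, String connectorId, HiveSplit split)
    {
        @SuppressWarnings("unchecked")
        Serializer<HiveSplit> serializer = (Serializer<HiveSplit>) serdeManager.getSerializer(connectorId);
        return serializer.serialize(split);
    }

    static HiveSplit deserializeSplit(ConnectorTypeSerdeManager serdeManager, String connectorId, byte[] data)
    {
        @SuppressWarnings("unchecked")
        Serializer<HiveSplit> serializer = (Serializer<HiveSplit>) serdeManager.getSerializer(connectorId);
        return serializer.deserialize(data);
    }
}
```
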
#### Pros
- Each connector can choose its own serialization format
- The internal details of how serialization is handled are hidden
- Connectors can evolve their serialization format independently
- New connectors can adopt newer, more efficient serialization formats without waiting for the entire system to upgrade
- Existing connectors can migrate to better formats without forcing other connectors to change
- Performance optimizations can be made on a per-connector basis

#### Cons
- This design differs from the existing Jackson serde flow in the code base.

### Q & A
1. What modules are involved?
   * presto-spi
   * presto-main-base
   * presto-main
   * presto-hive
2. Any new terminologies/concepts/SQL language additions?
   * N/A
3. Method/class/interface contracts which you deem fit for implementation.
   * See the code examples above
4. Code flow using bullet points or pseudocode as applicable.
   * See the code examples above
5. Any new user-facing metrics that can be shown on CLI or UI?
   * N/A

## [Optional] Metrics

How can we measure the impact of this feature?
1. taskUpdateSerializedCpuNanos
2. taskUpdateDeliveredWallTimeNanos
3. CPU usage for task update serde

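As a hedged sketch, such counters could be exposed through Airlift stats and JMX like other coordinator metrics (the class name and wiring below are illustrative, not part of this proposal's API):

```java
import io.airlift.stats.CounterStat;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

// Illustrative holder for the proposed task-update serde metrics.
public class TaskUpdateSerdeStats
{
    private final CounterStat taskUpdateSerializedCpuNanos = new CounterStat();
    private final CounterStat taskUpdateDeliveredWallTimeNanos = new CounterStat();

    public void recordSerializedCpuNanos(long nanos)
    {
        taskUpdateSerializedCpuNanos.update(nanos);
    }

    public void recordDeliveredWallTimeNanos(long nanos)
    {
        taskUpdateDeliveredWallTimeNanos.update(nanos);
    }

    @Managed
    @Nested
    public CounterStat getTaskUpdateSerializedCpuNanos()
    {
        return taskUpdateSerializedCpuNanos;
    }

    @Managed
    @Nested
    public CounterStat getTaskUpdateDeliveredWallTimeNanos()
    {
        return taskUpdateDeliveredWallTimeNanos;
    }
}
```
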
## [Optional] Other Approaches Considered

1. See Option 1

## Adoption Plan

### Rollout
* As the first step, we will use Drift to annotate all primitive types within the three classes mentioned before, while keeping complicated data types, e.g. Split, MetadataUpdate, and TableWriteInfo, as JSON (see the sketch after this list)
* During the second step, we will add Thrift support for those complicated data classes using one of the two options proposed above.
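
A hedged sketch of what step one could look like for a TaskStatus-like class, with Drift annotations on the primitives and a complicated nested type carried as a JSON string (the field choices are illustrative):

```java
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;

// Illustrative step-1 shape: primitives become Thrift fields; a complicated
// type (e.g. a failure list) travels as a JSON string inside the Thrift struct.
@ThriftStruct
public class TaskStatusStepOneExample
{
    @ThriftField(1)
    public long queuedPartitionedDrivers;

    @ThriftField(2)
    public long runningPartitionedDrivers;

    // Complicated nested type kept as JSON for now
    @ThriftField(3)
    public String failuresJson;
}
```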

- What impact (if any) will there be on existing users? Are there any new session parameters, configurations, SPI updates, client API updates, or SQL grammar?
  * The Thrift serde will be disabled by default and can be enabled by a config.
- If we are changing behaviour, how will we phase out the older behaviour?
  * N/A
- If we need special migration tools, describe them here.
  * N/A
- When will we remove the existing behaviour, if applicable?
  * N/A
- How should this feature be taught to new and existing users? Basically, mention if documentation changes/new blog are needed.
  * This feature will be documented in the Presto documentation.
- What related issues do you consider out of scope for this RFC that could be addressed in the future independently of the solution that comes out of this RFC?
  * N/A

## Test Plan

How do we ensure the feature works as expected? Mention if any functional tests/integration tests are needed. Special mention for product-test changes. If any PoC has been done already, please mention the relevant test results here that you think will bolster your case of getting this RFC approved.

- A PoC for step 1, covering primitive types, can be found in the following two PRs:
  * prestodb/presto#25020
  * prestodb/presto#25079
