Skip to content

Commit 5c80e80

Browse files
authored
[FLINK-37385][table] Add support for smile format
1 parent 940d746 commit 5c80e80

29 files changed

+202
-88
lines changed

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ public interface CompiledPlan extends Explainable<CompiledPlan>, Executable {
6565
/** Convert the plan to a JSON string representation. */
6666
String asJsonString();
6767

68+
/** Convert the plan to a Smile binary representation. */
69+
byte[] asSmileBytes();
70+
6871
/**
6972
* @see #writeToFile(File)
7073
*/

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
package org.apache.flink.table.api;
2020

2121
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.annotation.PublicEvolving;
2223

2324
import java.io.File;
2425
import java.nio.file.Path;
2526
import java.nio.file.Paths;
27+
import java.util.Arrays;
2628
import java.util.Objects;
2729

2830
/**
@@ -67,7 +69,13 @@ public static PlanReference fromFile(File file) {
6769
/** Create a reference starting from a JSON string. */
6870
public static PlanReference fromJsonString(String jsonString) {
6971
Objects.requireNonNull(jsonString, "Json string cannot be null");
70-
return new ContentPlanReference(jsonString);
72+
return new JsonContentPlanReference(jsonString);
73+
}
74+
75+
/** Create a reference starting from a Smile binary representation. */
76+
public static PlanReference fromSmileBytes(byte[] smileBytes) {
77+
Objects.requireNonNull(smileBytes, "Smile bytes cannot be null");
78+
return new BytesContentPlanReference(smileBytes);
7179
}
7280

7381
/**
@@ -124,12 +132,12 @@ public String toString() {
124132
}
125133

126134
/** Plan reference to a string containing the serialized persisted plan in JSON. */
127-
@Experimental
128-
public static class ContentPlanReference extends PlanReference {
135+
@PublicEvolving
136+
public static class JsonContentPlanReference extends PlanReference {
129137

130138
private final String content;
131139

132-
private ContentPlanReference(String content) {
140+
private JsonContentPlanReference(String content) {
133141
this.content = content;
134142
}
135143

@@ -145,7 +153,7 @@ public boolean equals(Object o) {
145153
if (o == null || getClass() != o.getClass()) {
146154
return false;
147155
}
148-
ContentPlanReference that = (ContentPlanReference) o;
156+
JsonContentPlanReference that = (JsonContentPlanReference) o;
149157
return Objects.equals(content, that.content);
150158
}
151159

@@ -160,6 +168,43 @@ public String toString() {
160168
}
161169
}
162170

171+
/** Plan reference to a string containing the serialized persisted plan in Smile. */
172+
@PublicEvolving
173+
public static class BytesContentPlanReference extends PlanReference {
174+
175+
private final byte[] content;
176+
177+
private BytesContentPlanReference(byte[] content) {
178+
this.content = content;
179+
}
180+
181+
public byte[] getContent() {
182+
return content;
183+
}
184+
185+
@Override
186+
public boolean equals(Object o) {
187+
if (this == o) {
188+
return true;
189+
}
190+
if (o == null || getClass() != o.getClass()) {
191+
return false;
192+
}
193+
BytesContentPlanReference that = (BytesContentPlanReference) o;
194+
return Arrays.equals(content, that.content);
195+
}
196+
197+
@Override
198+
public int hashCode() {
199+
return Arrays.hashCode(content);
200+
}
201+
202+
@Override
203+
public String toString() {
204+
return "Plan:\n" + Arrays.toString(content);
205+
}
206+
}
207+
163208
/** Plan reference to a file in the provided {@link ClassLoader}. */
164209
@Experimental
165210
public static class ResourcePlanReference extends PlanReference {

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CompiledPlanImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ public String asJsonString() {
5050
return internalPlan.asJsonString();
5151
}
5252

53+
@Override
54+
public byte[] asSmileBytes() {
55+
return internalPlan.asSmileBytes();
56+
}
57+
5358
@Override
5459
public void writeToFile(File file, boolean ignoreIfExists) {
5560
internalPlan.writeToFile(

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/InternalPlan.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ public interface InternalPlan {
3939
*/
4040
String asJsonString();
4141

42+
/**
43+
* @see CompiledPlan#asSmileBytes()
44+
*/
45+
byte[] asSmileBytes();
46+
4247
/**
4348
* Note that {@code ignoreIfExists} has precedence over {@code failIfExists}.
4449
*

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,18 @@
4242
public class ExecNodeGraphInternalPlan implements InternalPlan {
4343

4444
private final Supplier<String> serializedPlanSupplier;
45+
private final Supplier<byte[]> smileSerializedPlanSupplier;
4546
private final ExecNodeGraph execNodeGraph;
4647

47-
private String serializedPlan;
48+
private String jsonSerializedPlan;
49+
private byte[] smileSerializedPlan;
4850

4951
public ExecNodeGraphInternalPlan(
50-
Supplier<String> serializedPlanSupplier, ExecNodeGraph execNodeGraph) {
51-
this.serializedPlanSupplier = serializedPlanSupplier;
52+
Supplier<String> jsonSerializedPlanSupplier,
53+
Supplier<byte[]> smileSerializedPlanSupplier,
54+
ExecNodeGraph execNodeGraph) {
55+
this.serializedPlanSupplier = jsonSerializedPlanSupplier;
56+
this.smileSerializedPlanSupplier = smileSerializedPlanSupplier;
5257
this.execNodeGraph = execNodeGraph;
5358
}
5459

@@ -58,10 +63,18 @@ public ExecNodeGraph getExecNodeGraph() {
5863

5964
@Override
6065
public String asJsonString() {
61-
if (serializedPlan == null) {
62-
serializedPlan = serializedPlanSupplier.get();
66+
if (jsonSerializedPlan == null) {
67+
jsonSerializedPlan = serializedPlanSupplier.get();
6368
}
64-
return serializedPlan;
69+
return jsonSerializedPlan;
70+
}
71+
72+
@Override
73+
public byte[] asSmileBytes() {
74+
if (smileSerializedPlan == null) {
75+
smileSerializedPlan = smileSerializedPlanSupplier.get();
76+
}
77+
return smileSerializedPlan;
6578
}
6679

6780
@Override

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@
4545
import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.KIND_PHYSICAL;
4646
import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.METADATA_KEY;
4747
import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.NAME;
48-
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.deserializeOptionalField;
49-
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.traverse;
48+
import static org.apache.flink.table.planner.plan.nodes.exec.serde.CompiledPlanSerdeUtil.deserializeOptionalField;
49+
import static org.apache.flink.table.planner.plan.nodes.exec.serde.CompiledPlanSerdeUtil.traverse;
5050

5151
/**
5252
* JSON deserializer for {@link Column}.

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
import java.io.IOException;
2929

30-
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.serializeOptionalField;
30+
import static org.apache.flink.table.planner.plan.nodes.exec.serde.CompiledPlanSerdeUtil.serializeOptionalField;
3131

3232
/**
3333
* JSON serializer for {@link Column}.

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java renamed to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CompiledPlanSerdeUtil.java

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@
6161
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType;
6262
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
6363
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
64+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.smile.SmileFactory;
65+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.smile.SmileGenerator;
6466

6567
import org.apache.calcite.rel.core.AggregateCall;
6668
import org.apache.calcite.rel.type.RelDataType;
@@ -71,46 +73,74 @@
7173
import java.io.IOException;
7274
import java.util.Optional;
7375

74-
/** A utility class that provide abilities for JSON serialization and deserialization. */
76+
/** A utility class that provide abilities for JSON and Smile serialization and deserialization. */
7577
@Internal
76-
public class JsonSerdeUtil {
78+
public class CompiledPlanSerdeUtil {
7779

7880
/**
7981
* Object mapper shared instance to serialize and deserialize the plan. Note that creating and
8082
* copying of object mappers is expensive and should be avoided.
8183
*
8284
* <p>This is not exposed to avoid bad usages, like adding new modules. If you need to read and
83-
* write json persisted plans, use {@link #createObjectWriter(SerdeContext)} and {@link
84-
* #createObjectReader(SerdeContext)}.
85+
* write json or smile persisted plans, use {@link #createJsonObjectWriter(SerdeContext)} and
86+
* {@link #createJsonObjectReader(SerdeContext)}.
8587
*/
86-
private static final ObjectMapper OBJECT_MAPPER_INSTANCE;
88+
private static final ObjectMapper JSON_OBJECT_MAPPER_INSTANCE;
89+
90+
private static final ObjectMapper SMILE_OBJECT_MAPPER_INSTANCE;
8791

8892
static {
89-
OBJECT_MAPPER_INSTANCE = JacksonMapperFactory.createObjectMapper();
90-
91-
OBJECT_MAPPER_INSTANCE.setTypeFactory(
92-
// Make sure to register the classloader of the planner
93-
OBJECT_MAPPER_INSTANCE
94-
.getTypeFactory()
95-
.withClassLoader(JsonSerdeUtil.class.getClassLoader()));
96-
OBJECT_MAPPER_INSTANCE.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
97-
OBJECT_MAPPER_INSTANCE.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
98-
OBJECT_MAPPER_INSTANCE.registerModule(createFlinkTableJacksonModule());
93+
JSON_OBJECT_MAPPER_INSTANCE = JacksonMapperFactory.createObjectMapper();
94+
JSON_OBJECT_MAPPER_INSTANCE
95+
.setTypeFactory(
96+
// Make sure to register the classloader of the planner
97+
JSON_OBJECT_MAPPER_INSTANCE
98+
.getTypeFactory()
99+
.withClassLoader(CompiledPlanSerdeUtil.class.getClassLoader()))
100+
.disable(MapperFeature.USE_GETTERS_AS_SETTERS)
101+
.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
102+
.registerModule(createFlinkTableJacksonModule());
103+
104+
SMILE_OBJECT_MAPPER_INSTANCE = JacksonMapperFactory.createObjectMapper(new SmileFactory());
105+
SMILE_OBJECT_MAPPER_INSTANCE
106+
.setTypeFactory(
107+
// Make sure to register the classloader of the planner
108+
SMILE_OBJECT_MAPPER_INSTANCE
109+
.getTypeFactory()
110+
.withClassLoader(CompiledPlanSerdeUtil.class.getClassLoader()))
111+
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
112+
.disable(MapperFeature.USE_GETTERS_AS_SETTERS)
113+
.registerModule(createFlinkTableJacksonModule());
99114
}
100115

101-
public static ObjectReader createObjectReader(SerdeContext serdeContext) {
102-
return OBJECT_MAPPER_INSTANCE
116+
public static ObjectReader createJsonObjectReader(SerdeContext serdeContext) {
117+
return JSON_OBJECT_MAPPER_INSTANCE
103118
.reader()
104119
.withAttribute(SerdeContext.SERDE_CONTEXT_KEY, serdeContext)
105120
.with(defaultInjectedValues());
106121
}
107122

108-
public static ObjectWriter createObjectWriter(SerdeContext serdeContext) {
109-
return OBJECT_MAPPER_INSTANCE
123+
public static ObjectWriter createJsonObjectWriter(SerdeContext serdeContext) {
124+
return JSON_OBJECT_MAPPER_INSTANCE
110125
.writer()
111126
.withAttribute(SerdeContext.SERDE_CONTEXT_KEY, serdeContext);
112127
}
113128

129+
public static ObjectWriter createSmileObjectWriter(SerdeContext serdeContext) {
130+
return SMILE_OBJECT_MAPPER_INSTANCE
131+
.writer()
132+
.withAttribute(SerdeContext.SERDE_CONTEXT_KEY, serdeContext)
133+
.with(SmileGenerator.Feature.CHECK_SHARED_STRING_VALUES);
134+
}
135+
136+
public static ObjectReader createSmileObjectReader(SerdeContext serdeContext) {
137+
return SMILE_OBJECT_MAPPER_INSTANCE
138+
.reader()
139+
.withAttribute(SerdeContext.SERDE_CONTEXT_KEY, serdeContext)
140+
.with(SmileGenerator.Feature.CHECK_SHARED_STRING_VALUES)
141+
.with(defaultInjectedValues());
142+
}
143+
114144
private static InjectableValues defaultInjectedValues() {
115145
return new InjectableValues.Std().addValue("isDeserialize", true);
116146
}
@@ -244,5 +274,5 @@ static Class<?> loadClass(String className, SerdeContext serdeContext, String ex
244274
}
245275
}
246276

247-
private JsonSerdeUtil() {}
277+
private CompiledPlanSerdeUtil() {}
248278
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,15 @@ public ContextResolvedTable deserialize(JsonParser jsonParser, DeserializationCo
7373

7474
// Deserialize the two fields, if available
7575
final ObjectIdentifier identifier =
76-
JsonSerdeUtil.deserializeOptionalField(
76+
CompiledPlanSerdeUtil.deserializeOptionalField(
7777
objectNode,
7878
FIELD_NAME_IDENTIFIER,
7979
ObjectIdentifier.class,
8080
jsonParser.getCodec(),
8181
ctx)
8282
.orElse(null);
8383
final ResolvedCatalogTable resolvedCatalogTable =
84-
JsonSerdeUtil.deserializeOptionalField(
84+
CompiledPlanSerdeUtil.deserializeOptionalField(
8585
objectNode,
8686
FIELD_NAME_CATALOG_TABLE,
8787
ResolvedCatalogTable.class,

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,14 @@
4444
import java.util.stream.Collectors;
4545
import java.util.stream.IntStream;
4646

47+
import static org.apache.flink.table.planner.plan.nodes.exec.serde.CompiledPlanSerdeUtil.loadClass;
4748
import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_CONVERSION_CLASS;
4849
import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_ELEMENT_CLASS;
4950
import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_FIELDS;
5051
import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_FIELD_NAME;
5152
import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_KEY_CLASS;
5253
import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_TYPE;
5354
import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_VALUE_CLASS;
54-
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.loadClass;
5555

5656
/**
5757
* JSON deserializer for {@link DataType}.

0 commit comments

Comments
 (0)