Skip to content

Commit 9c97cfb

Browse files
authored
Add JSON_EXTRACT_ALL internal function for Calcite PPL (#4489)
* Add JSON_EXTRACT_ALL internal function for Calcite PPL Signed-off-by: Tomoyuki Morita <[email protected]> * Address comments Signed-off-by: Tomoyuki Morita <[email protected]> * Minor fix Signed-off-by: Tomoyuki Morita <[email protected]> --------- Signed-off-by: Tomoyuki Morita <[email protected]>
1 parent 89dbc31 commit 9c97cfb

File tree

6 files changed

+956
-0
lines changed

6 files changed

+956
-0
lines changed

core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ public enum BuiltinFunctionName {
248248
JSON_ARRAY(FunctionName.of("json_array")),
249249
JSON_ARRAY_LENGTH(FunctionName.of("json_array_length")),
250250
JSON_EXTRACT(FunctionName.of("json_extract")),
251+
JSON_EXTRACT_ALL(FunctionName.of("json_extract_all"), true),
251252
JSON_KEYS(FunctionName.of("json_keys")),
252253
JSON_SET(FunctionName.of("json_set")),
253254
JSON_DELETE(FunctionName.of("json_delete")),

core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.opensearch.sql.expression.function.jsonUDF.JsonArrayLengthFunctionImpl;
5454
import org.opensearch.sql.expression.function.jsonUDF.JsonDeleteFunctionImpl;
5555
import org.opensearch.sql.expression.function.jsonUDF.JsonExtendFunctionImpl;
56+
import org.opensearch.sql.expression.function.jsonUDF.JsonExtractAllFunctionImpl;
5657
import org.opensearch.sql.expression.function.jsonUDF.JsonExtractFunctionImpl;
5758
import org.opensearch.sql.expression.function.jsonUDF.JsonFunctionImpl;
5859
import org.opensearch.sql.expression.function.jsonUDF.JsonKeysFunctionImpl;
@@ -112,6 +113,8 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
112113
new JsonArrayLengthFunctionImpl().toUDF("JSON_ARRAY_LENGTH");
113114
public static final SqlOperator JSON_EXTRACT =
114115
new JsonExtractFunctionImpl().toUDF("JSON_EXTRACT");
116+
public static final SqlOperator JSON_EXTRACT_ALL =
117+
new JsonExtractAllFunctionImpl().toUDF("JSON_EXTRACT_ALL");
115118
public static final SqlOperator JSON_KEYS = new JsonKeysFunctionImpl().toUDF("JSON_KEYS");
116119
public static final SqlOperator JSON_SET = new JsonSetFunctionImpl().toUDF("JSON_SET");
117120
public static final SqlOperator JSON_DELETE = new JsonDeleteFunctionImpl().toUDF("JSON_DELETE");

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_DELETE;
100100
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_EXTEND;
101101
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_EXTRACT;
102+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_EXTRACT_ALL;
102103
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_KEYS;
103104
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_OBJECT;
104105
import static org.opensearch.sql.expression.function.BuiltinFunctionName.JSON_SET;
@@ -930,6 +931,7 @@ void populate() {
930931
registerOperator(JSON_DELETE, PPLBuiltinOperators.JSON_DELETE);
931932
registerOperator(JSON_APPEND, PPLBuiltinOperators.JSON_APPEND);
932933
registerOperator(JSON_EXTEND, PPLBuiltinOperators.JSON_EXTEND);
934+
registerOperator(JSON_EXTRACT_ALL, PPLBuiltinOperators.JSON_EXTRACT_ALL); // internal
933935

934936
// Register operators with a different type checker
935937

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.expression.function.jsonUDF;
7+
8+
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;
9+
10+
import com.fasterxml.jackson.core.JsonFactory;
11+
import com.fasterxml.jackson.core.JsonParser;
12+
import com.fasterxml.jackson.core.JsonToken;
13+
import java.io.IOException;
14+
import java.util.Collection;
15+
import java.util.HashMap;
16+
import java.util.LinkedList;
17+
import java.util.List;
18+
import java.util.Map;
19+
import java.util.Stack;
20+
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
21+
import org.apache.calcite.adapter.enumerable.NullPolicy;
22+
import org.apache.calcite.adapter.enumerable.RexImpTable;
23+
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
24+
import org.apache.calcite.linq4j.tree.Expression;
25+
import org.apache.calcite.linq4j.tree.Types;
26+
import org.apache.calcite.rex.RexCall;
27+
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
28+
import org.apache.calcite.sql.type.OperandTypes;
29+
import org.apache.calcite.sql.type.ReturnTypes;
30+
import org.apache.calcite.sql.type.SqlReturnTypeInference;
31+
import org.apache.calcite.sql.type.SqlTypeFamily;
32+
import org.apache.calcite.sql.type.SqlTypeName;
33+
import org.opensearch.sql.expression.function.ImplementorUDF;
34+
import org.opensearch.sql.expression.function.UDFOperandMetadata;
35+
36+
/**
37+
* UDF which extract all the fields from JSON to a MAP. Items are collected from input JSON and
38+
* stored with the key of their path in the JSON. This UDF is designed to be used for `spath`
39+
* command without path param. See {@ref JsonExtractAllFunctionImplTest} for the detailed spec.
40+
*/
41+
public class JsonExtractAllFunctionImpl extends ImplementorUDF {
42+
private static final String ARRAY_SUFFIX = "{}";
43+
private static final JsonFactory JSON_FACTORY = new JsonFactory();
44+
45+
public JsonExtractAllFunctionImpl() {
46+
super(new JsonExtractAllImplementor(), NullPolicy.ANY);
47+
}
48+
49+
@Override
50+
public SqlReturnTypeInference getReturnTypeInference() {
51+
return ReturnTypes.explicit(
52+
TYPE_FACTORY.createMapType(
53+
TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR),
54+
TYPE_FACTORY.createSqlType(SqlTypeName.ANY),
55+
true));
56+
}
57+
58+
@Override
59+
public UDFOperandMetadata getOperandMetadata() {
60+
return UDFOperandMetadata.wrap(OperandTypes.family(SqlTypeFamily.STRING));
61+
}
62+
63+
public static class JsonExtractAllImplementor implements NotNullImplementor {
64+
@Override
65+
public Expression implement(
66+
RexToLixTranslator translator, RexCall call, List<Expression> translatedOperands) {
67+
ScalarFunctionImpl function =
68+
(ScalarFunctionImpl)
69+
ScalarFunctionImpl.create(
70+
Types.lookupMethod(JsonExtractAllFunctionImpl.class, "eval", Object[].class));
71+
return function.getImplementor().implement(translator, call, RexImpTable.NullAs.NULL);
72+
}
73+
}
74+
75+
public static Object eval(Object... args) {
76+
if (args.length < 1) {
77+
return null;
78+
}
79+
80+
String jsonStr = (String) args[0];
81+
if (jsonStr == null || jsonStr.trim().isEmpty()) {
82+
return null;
83+
}
84+
85+
return parseJson(jsonStr);
86+
}
87+
88+
private static Map<String, Object> parseJson(String jsonStr) {
89+
Map<String, Object> resultMap = new HashMap<>();
90+
Stack<String> pathStack = new Stack<>();
91+
92+
try (JsonParser parser = JSON_FACTORY.createParser(jsonStr)) {
93+
JsonToken token;
94+
95+
while ((token = parser.nextToken()) != null) {
96+
switch (token) {
97+
case START_OBJECT:
98+
break;
99+
100+
case END_OBJECT:
101+
if (!pathStack.isEmpty() && !isInArray(pathStack)) {
102+
pathStack.pop();
103+
}
104+
break;
105+
106+
case START_ARRAY:
107+
pathStack.push(ARRAY_SUFFIX);
108+
break;
109+
110+
case END_ARRAY:
111+
pathStack.pop();
112+
if (!pathStack.isEmpty() && !isInArray(pathStack)) {
113+
pathStack.pop();
114+
}
115+
break;
116+
117+
case FIELD_NAME:
118+
String fieldName = parser.currentName();
119+
pathStack.push(fieldName);
120+
break;
121+
122+
case VALUE_STRING:
123+
case VALUE_NUMBER_INT:
124+
case VALUE_NUMBER_FLOAT:
125+
case VALUE_TRUE:
126+
case VALUE_FALSE:
127+
case VALUE_NULL:
128+
if (pathStack.isEmpty()) {
129+
// ignore top level value
130+
return null;
131+
}
132+
133+
appendValue(resultMap, buildPath(pathStack), extractValue(parser, token));
134+
135+
if (!isInArray(pathStack)) {
136+
pathStack.pop();
137+
}
138+
break;
139+
default:
140+
// Skip other tokens
141+
break;
142+
}
143+
}
144+
} catch (IOException e) {
145+
// ignore exception, and current result will be returned
146+
}
147+
return resultMap;
148+
}
149+
150+
@SuppressWarnings("unchecked")
151+
private static void appendValue(Map<String, Object> resultMap, String path, Object value) {
152+
Object existingValue = resultMap.get(path);
153+
if (existingValue == null) {
154+
resultMap.put(path, value);
155+
} else if (existingValue instanceof List) {
156+
((List<Object>) existingValue).add(value);
157+
} else {
158+
resultMap.put(path, list(existingValue, value));
159+
}
160+
}
161+
162+
private static List<Object> list(Object... args) {
163+
List<Object> result = new LinkedList<>();
164+
for (Object arg : args) {
165+
result.add(arg);
166+
}
167+
return result;
168+
}
169+
170+
private static boolean isInArray(List<String> path) {
171+
return path.size() >= 1 && path.getLast().equals(ARRAY_SUFFIX);
172+
}
173+
174+
private static Object extractValue(JsonParser parser, JsonToken token) throws IOException {
175+
switch (token) {
176+
case VALUE_STRING:
177+
return parser.getValueAsString();
178+
case VALUE_NUMBER_INT:
179+
return getIntValue(parser);
180+
case VALUE_NUMBER_FLOAT:
181+
return parser.getDoubleValue();
182+
case VALUE_TRUE:
183+
return true;
184+
case VALUE_FALSE:
185+
return false;
186+
case VALUE_NULL:
187+
return null;
188+
default:
189+
return parser.getValueAsString();
190+
}
191+
}
192+
193+
private static Object getIntValue(JsonParser parser) throws IOException {
194+
if (parser.getNumberType() == JsonParser.NumberType.INT) {
195+
return parser.getIntValue();
196+
} else if (parser.getNumberType() == JsonParser.NumberType.LONG) {
197+
return parser.getLongValue();
198+
} else {
199+
// store as double in case of BIG_INTEGER (exceed LONG precision)
200+
return parser.getBigIntegerValue().doubleValue();
201+
}
202+
}
203+
204+
private static String buildPath(Collection<String> pathStack) {
205+
StringBuilder builder = new StringBuilder();
206+
for (String path : pathStack) {
207+
if (ARRAY_SUFFIX.equals(path)) {
208+
builder.append(ARRAY_SUFFIX);
209+
} else if (!path.isEmpty()) {
210+
if (!builder.isEmpty()) {
211+
builder.append(".");
212+
}
213+
builder.append(path);
214+
}
215+
}
216+
return builder.toString();
217+
}
218+
}

0 commit comments

Comments
 (0)