Skip to content

Commit e92bf68

Browse files
authored
Fix ClassCastException for value-storing aggregates on nested PPL fields (#4360)
* fix first/last Signed-off-by: Kai Huang <[email protected]> # Conflicts: # integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java * fix Signed-off-by: Kai Huang <[email protected]> * add tests Signed-off-by: Kai Huang <[email protected]> * update approach Signed-off-by: Kai Huang <[email protected]> --------- Signed-off-by: Kai Huang <[email protected]>
1 parent 06fa7d7 commit e92bf68

File tree

6 files changed

+332
-1
lines changed

6 files changed

+332
-1
lines changed

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DATATYPE_NUMERIC;
1212
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DATE_FORMATS;
1313
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS;
14+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_TELEMETRY;
1415
import static org.opensearch.sql.util.MatcherUtils.assertJsonEquals;
1516
import static org.opensearch.sql.util.MatcherUtils.rows;
1617
import static org.opensearch.sql.util.MatcherUtils.schema;
@@ -41,6 +42,7 @@ public void init() throws Exception {
4142
loadIndex(Index.DATE_FORMATS);
4243
loadIndex(Index.DATA_TYPE_NUMERIC);
4344
loadIndex(Index.LOGS);
45+
loadIndex(Index.TELEMETRY);
4446
loadIndex(Index.TIME_TEST_DATA);
4547
}
4648

@@ -1246,4 +1248,255 @@ public void testLimitAfterAggregation() throws IOException {
12461248
verifySchema(response, schema("count()", "bigint"), schema("age", "int"));
12471249
verifyDataRows(response, rows(1, 39), rows(2, 36), rows(1, 34));
12481250
}
1251+
1252+
@Test
1253+
public void testFirstLastWithSimpleField() throws IOException {
1254+
// This should work - testing simple field first
1255+
JSONObject actual =
1256+
executeQuery(
1257+
String.format("source=%s | stats first(severityNumber)", TEST_INDEX_TELEMETRY));
1258+
verifySchema(actual, schema("first(severityNumber)", "int"));
1259+
verifyDataRows(actual, rows(9));
1260+
}
1261+
1262+
@Test
1263+
public void testFirstLastWithDeepNestedField() throws IOException {
1264+
// This test should now work with the fix for ClassCastException
1265+
JSONObject actual =
1266+
executeQuery(
1267+
String.format(
1268+
"source=%s | stats first(`resource.attributes.telemetry.sdk.language`)",
1269+
TEST_INDEX_TELEMETRY));
1270+
verifySchema(actual, schema("first(`resource.attributes.telemetry.sdk.language`)", "string"));
1271+
verifyDataRows(actual, rows("java"));
1272+
}
1273+
1274+
@Test
1275+
public void testLastWithDeepNestedField() throws IOException {
1276+
// This test should now work with the fix for ClassCastException
1277+
JSONObject actual =
1278+
executeQuery(
1279+
String.format(
1280+
"source=%s | stats last(`resource.attributes.telemetry.sdk.language`)",
1281+
TEST_INDEX_TELEMETRY));
1282+
verifySchema(actual, schema("last(`resource.attributes.telemetry.sdk.language`)", "string"));
1283+
verifyDataRows(actual, rows("rust"));
1284+
}
1285+
1286+
@Test
1287+
public void testFirstLastWithDeepNestedFieldByGroup() throws IOException {
1288+
// This test should now work with the fix for ClassCastException
1289+
JSONObject actual =
1290+
executeQuery(
1291+
String.format(
1292+
"source=%s | stats first(`resource.attributes.telemetry.sdk.language`) by"
1293+
+ " severityNumber",
1294+
TEST_INDEX_TELEMETRY));
1295+
verifySchema(
1296+
actual,
1297+
schema("first(`resource.attributes.telemetry.sdk.language`)", "string"),
1298+
schema("severityNumber", "int"));
1299+
verifyDataRows(actual, rows("java", 9), rows("python", 12), rows("go", 16));
1300+
}
1301+
1302+
@Test
1303+
public void testMinWithDeepNestedField() throws IOException {
1304+
// Test that min() works with deeply nested fields after the ClassCastException fix
1305+
JSONObject actual =
1306+
executeQuery(
1307+
String.format(
1308+
"source=%s | stats min(`resource.attributes.telemetry.sdk.language`)",
1309+
TEST_INDEX_TELEMETRY));
1310+
verifySchema(actual, schema("min(`resource.attributes.telemetry.sdk.language`)", "string"));
1311+
verifyDataRows(
1312+
actual, rows("go")); // Alphabetically first: go < java < javascript < python < rust
1313+
}
1314+
1315+
@Test
1316+
public void testMaxWithDeepNestedField() throws IOException {
1317+
// Test that max() works with deeply nested fields after the ClassCastException fix
1318+
JSONObject actual =
1319+
executeQuery(
1320+
String.format(
1321+
"source=%s | stats max(`resource.attributes.telemetry.sdk.language`)",
1322+
TEST_INDEX_TELEMETRY));
1323+
verifySchema(actual, schema("max(`resource.attributes.telemetry.sdk.language`)", "string"));
1324+
verifyDataRows(
1325+
actual, rows("rust")); // Alphabetically last: go < java < javascript < python < rust
1326+
}
1327+
1328+
@Test
1329+
public void testMinMaxWithDeepNestedFieldByGroup() throws IOException {
1330+
// Test that min() and max() work with deeply nested fields and grouping
1331+
JSONObject actual =
1332+
executeQuery(
1333+
String.format(
1334+
"source=%s | stats min(`resource.attributes.telemetry.sdk.language`) by"
1335+
+ " severityNumber | sort severityNumber",
1336+
TEST_INDEX_TELEMETRY));
1337+
verifySchema(
1338+
actual,
1339+
schema("min(`resource.attributes.telemetry.sdk.language`)", "string"),
1340+
schema("severityNumber", "int"));
1341+
// severityNumber 9: java, javascript -> min = java
1342+
// severityNumber 12: python, rust -> min = python
1343+
// severityNumber 16: go -> min = go
1344+
verifyDataRows(actual, rows("java", 9), rows("python", 12), rows("go", 16));
1345+
}
1346+
1347+
@Test
1348+
public void testMinMaxMultipleNestedFields() throws IOException {
1349+
// Test min/max with multiple nested field aggregations in one query
1350+
JSONObject actual =
1351+
executeQuery(
1352+
String.format(
1353+
"source=%s | stats min(`resource.attributes.telemetry.sdk.language`) as min_lang,"
1354+
+ " max(`resource.attributes.telemetry.sdk.language`) as max_lang",
1355+
TEST_INDEX_TELEMETRY));
1356+
verifySchema(actual, schema("min_lang", "string"), schema("max_lang", "string"));
1357+
verifyDataRows(actual, rows("go", "rust"));
1358+
}
1359+
1360+
@Test
1361+
public void testMinWithIntegerNestedField() throws IOException {
1362+
// Test that min() works with deeply nested integer fields
1363+
JSONObject actual =
1364+
executeQuery(
1365+
String.format(
1366+
"source=%s | stats min(`resource.attributes.telemetry.sdk.version`)",
1367+
TEST_INDEX_TELEMETRY));
1368+
verifySchema(actual, schema("min(`resource.attributes.telemetry.sdk.version`)", "int"));
1369+
verifyDataRows(actual, rows(10)); // Minimum version is 10
1370+
}
1371+
1372+
@Test
1373+
public void testMaxWithIntegerNestedField() throws IOException {
1374+
// Test that max() works with deeply nested integer fields
1375+
JSONObject actual =
1376+
executeQuery(
1377+
String.format(
1378+
"source=%s | stats max(`resource.attributes.telemetry.sdk.version`)",
1379+
TEST_INDEX_TELEMETRY));
1380+
verifySchema(actual, schema("max(`resource.attributes.telemetry.sdk.version`)", "int"));
1381+
verifyDataRows(actual, rows(14)); // Maximum version is 14
1382+
}
1383+
1384+
@Test
1385+
public void testMinMaxIntegerNestedFieldsByGroup() throws IOException {
1386+
// Test min/max on integer nested fields with grouping
1387+
JSONObject actual =
1388+
executeQuery(
1389+
String.format(
1390+
"source=%s | stats min(`resource.attributes.telemetry.sdk.version`) as min_ver,"
1391+
+ " max(`resource.attributes.telemetry.sdk.version`) as max_ver by"
1392+
+ " severityNumber",
1393+
TEST_INDEX_TELEMETRY));
1394+
verifySchema(
1395+
actual,
1396+
schema("min_ver", "int"),
1397+
schema("max_ver", "int"),
1398+
schema("severityNumber", "int"));
1399+
// severityNumber 9: versions 10, 12 -> min=10, max=12
1400+
// severityNumber 12: versions 11, 14 -> min=11, max=14
1401+
// severityNumber 16: version 13 -> min=13, max=13
1402+
verifyDataRows(actual, rows(10, 12, 9), rows(11, 14, 12), rows(13, 13, 16));
1403+
}
1404+
1405+
@Test
1406+
public void testFirstLastWithIntegerNestedField() throws IOException {
1407+
// Test first/last with deeply nested integer fields
1408+
JSONObject actual =
1409+
executeQuery(
1410+
String.format(
1411+
"source=%s | stats first(`resource.attributes.telemetry.sdk.version`) as first_ver,"
1412+
+ " last(`resource.attributes.telemetry.sdk.version`) as last_ver",
1413+
TEST_INDEX_TELEMETRY));
1414+
verifySchema(actual, schema("first_ver", "int"), schema("last_ver", "int"));
1415+
verifyDataRows(actual, rows(10, 14));
1416+
}
1417+
1418+
@Test
1419+
public void testFirstLastWithBooleanNestedField() throws IOException {
1420+
// Test first/last with deeply nested boolean fields
1421+
JSONObject actual =
1422+
executeQuery(
1423+
String.format(
1424+
"source=%s | stats first(`resource.attributes.telemetry.sdk.enabled`) as"
1425+
+ " first_enabled, last(`resource.attributes.telemetry.sdk.enabled`) as"
1426+
+ " last_enabled",
1427+
TEST_INDEX_TELEMETRY));
1428+
verifySchema(actual, schema("first_enabled", "boolean"), schema("last_enabled", "boolean"));
1429+
verifyDataRows(actual, rows(true, true)); // First record is true, last record is true
1430+
}
1431+
1432+
@Test
1433+
public void testCountWithBooleanNestedFieldGroupBy() throws IOException {
1434+
// Test count aggregation grouped by boolean nested field
1435+
JSONObject actual =
1436+
executeQuery(
1437+
String.format(
1438+
"source=%s | stats count() as cnt by `resource.attributes.telemetry.sdk.enabled`",
1439+
TEST_INDEX_TELEMETRY));
1440+
verifySchema(
1441+
actual,
1442+
schema("cnt", "bigint"),
1443+
schema("resource.attributes.telemetry.sdk.enabled", "boolean"));
1444+
verifyDataRows(actual, rows(2L, false), rows(3L, true)); // 2 false, 3 true values
1445+
}
1446+
1447+
@Test
1448+
public void testMinMaxWithBooleanNestedField() throws IOException {
1449+
// Test min/max with deeply nested boolean fields
1450+
JSONObject actual =
1451+
executeQuery(
1452+
String.format(
1453+
"source=%s | stats min(`resource.attributes.telemetry.sdk.enabled`) as min_enabled,"
1454+
+ " max(`resource.attributes.telemetry.sdk.enabled`) as max_enabled",
1455+
TEST_INDEX_TELEMETRY));
1456+
verifySchema(actual, schema("min_enabled", "boolean"), schema("max_enabled", "boolean"));
1457+
verifyDataRows(actual, rows(false, true)); // Min is false, max is true
1458+
}
1459+
1460+
@Test
1461+
public void testBooleanNestedFieldByGroup() throws IOException {
1462+
// Test boolean nested fields with grouping by other fields
1463+
JSONObject actual =
1464+
executeQuery(
1465+
String.format(
1466+
"source=%s | stats count() as cnt,"
1467+
+ " first(`resource.attributes.telemetry.sdk.enabled`) as enabled by"
1468+
+ " severityNumber",
1469+
TEST_INDEX_TELEMETRY));
1470+
verifySchema(
1471+
actual,
1472+
schema("cnt", "bigint"),
1473+
schema("enabled", "boolean"),
1474+
schema("severityNumber", "int"));
1475+
// severityNumber 9: java (true), javascript (true) -> 2 records, first is true
1476+
// severityNumber 12: python (false), rust (true) -> 2 records, first is false
1477+
// severityNumber 16: go (false) -> 1 record, first is false
1478+
verifyDataRows(actual, rows(2L, true, 9), rows(2L, false, 12), rows(1L, false, 16));
1479+
}
1480+
1481+
@Test
1482+
public void testMixedTypesNestedFieldAggregations() throws IOException {
1483+
// Test aggregating multiple nested field types in one query
1484+
JSONObject actual =
1485+
executeQuery(
1486+
String.format(
1487+
"source=%s | stats min(`resource.attributes.telemetry.sdk.version`) as min_ver,"
1488+
+ " max(`resource.attributes.telemetry.sdk.version`) as max_ver,"
1489+
+ " min(`resource.attributes.telemetry.sdk.enabled`) as min_enabled,"
1490+
+ " max(`resource.attributes.telemetry.sdk.enabled`) as max_enabled,"
1491+
+ " first(`resource.attributes.telemetry.sdk.language`) as first_lang",
1492+
TEST_INDEX_TELEMETRY));
1493+
verifySchema(
1494+
actual,
1495+
schema("min_ver", "int"),
1496+
schema("max_ver", "int"),
1497+
schema("min_enabled", "boolean"),
1498+
schema("max_enabled", "boolean"),
1499+
schema("first_lang", "string"));
1500+
verifyDataRows(actual, rows(10, 14, false, true, "java"));
1501+
}
12491502
}

integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,11 @@ public enum Index {
660660
"_doc",
661661
getDeepNestedIndexMapping(),
662662
"src/test/resources/deep_nested_index_data.json"),
663+
TELEMETRY(
664+
TestsConstants.TEST_INDEX_TELEMETRY,
665+
"_doc",
666+
getMappingFile("telemetry_test_mapping.json"),
667+
"src/test/resources/telemetry_test_data.json"),
663668
DATA_TYPE_NUMERIC(
664669
TestsConstants.TEST_INDEX_DATATYPE_NUMERIC,
665670
"_doc",

integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class TestsConstants {
4747
public static final String TEST_INDEX_DATE = TEST_INDEX + "_date";
4848
public static final String TEST_INDEX_DATE_TIME = TEST_INDEX + "_datetime";
4949
public static final String TEST_INDEX_DEEP_NESTED = TEST_INDEX + "_deep_nested";
50+
public static final String TEST_INDEX_TELEMETRY = TEST_INDEX + "_telemetry";
5051
public static final String TEST_INDEX_STRINGS = TEST_INDEX + "_strings";
5152
public static final String TEST_INDEX_DATATYPE_NUMERIC = TEST_INDEX + "_datatypes_numeric";
5253
public static final String TEST_INDEX_DATATYPE_NONNUMERIC = TEST_INDEX + "_datatypes_nonnumeric";
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{
2+
"mappings": {
3+
"properties": {
4+
"resource": {
5+
"properties": {
6+
"attributes": {
7+
"properties": {
8+
"telemetry": {
9+
"properties": {
10+
"sdk": {
11+
"properties": {
12+
"language": {
13+
"type": "keyword",
14+
"ignore_above": 256
15+
},
16+
"name": {
17+
"type": "keyword",
18+
"ignore_above": 256
19+
},
20+
"version": {
21+
"type": "integer"
22+
},
23+
"enabled": {
24+
"type": "boolean"
25+
}
26+
}
27+
}
28+
}
29+
}
30+
}
31+
}
32+
}
33+
},
34+
"severityNumber": {
35+
"type": "integer"
36+
}
37+
}
38+
}
39+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{"index":{"_id":"1"}}
2+
{"resource": {"attributes": {"telemetry": {"sdk": {"language": "java", "name": "opentelemetry", "version": 10, "enabled": true}}}}, "severityNumber": 9}
3+
{"index":{"_id":"2"}}
4+
{"resource": {"attributes": {"telemetry": {"sdk": {"language": "python", "name": "opentelemetry", "version": 11, "enabled": false}}}}, "severityNumber": 12}
5+
{"index":{"_id":"3"}}
6+
{"resource": {"attributes": {"telemetry": {"sdk": {"language": "javascript", "name": "opentelemetry", "version": 12, "enabled": true}}}}, "severityNumber": 9}
7+
{"index":{"_id":"4"}}
8+
{"resource": {"attributes": {"telemetry": {"sdk": {"language": "go", "name": "opentelemetry", "version": 13, "enabled": false}}}}, "severityNumber": 16}
9+
{"index":{"_id":"5"}}
10+
{"resource": {"attributes": {"telemetry": {"sdk": {"language": "rust", "name": "opentelemetry", "version": 14, "enabled": true}}}}, "severityNumber": 12}

opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,8 @@ public ExprValue construct(String jsonString, boolean supportArrays) {
182182
* @return ExprValue
183183
*/
184184
public ExprValue construct(String field, Object value, boolean supportArrays) {
185-
return parse(new ObjectContent(value), field, type(field), supportArrays);
185+
Object extractedValue = extractFinalPrimitiveValue(value);
186+
return parse(new ObjectContent(extractedValue), field, type(field), supportArrays);
186187
}
187188

188189
private ExprValue parse(
@@ -531,4 +532,26 @@ private ExprValue parseInnerArrayValue(
531532
private String makeField(String path, String field) {
532533
return path.equalsIgnoreCase(TOP_PATH) ? field : String.join(".", path, field);
533534
}
535+
536+
/**
537+
* Recursively extracts the final primitive value from nested Map structures. For example:
538+
* {attributes={telemetry={sdk={language=java}}}} -> "java"
539+
*
540+
* @param value The value to extract from
541+
* @return The extracted primitive value, or the original value if extraction is not possible
542+
*/
543+
@SuppressWarnings("unchecked")
544+
private Object extractFinalPrimitiveValue(Object value) {
545+
if (value == null || !(value instanceof Map)) {
546+
return value;
547+
}
548+
549+
Map<String, Object> map = (Map<String, Object>) value;
550+
if (map.size() == 1) {
551+
Object singleValue = map.values().iterator().next();
552+
return extractFinalPrimitiveValue(singleValue);
553+
}
554+
555+
return value;
556+
}
534557
}

0 commit comments

Comments
 (0)