Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_SUBSEARCH;
import static org.opensearch.sql.calcite.utils.PlanUtils.getRelation;
import static org.opensearch.sql.calcite.utils.PlanUtils.getRexCall;
import static org.opensearch.sql.calcite.utils.PlanUtils.transformPlanToAttachChild;
import static org.opensearch.sql.utils.SystemIndexUtils.DATASOURCES_TABLE_NAME;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -2434,9 +2433,22 @@ public RelNode visitAppendCol(AppendCol node, CalcitePlanContext context) {
context.relBuilder.projectPlus(
context.relBuilder.alias(mainRowNumber, ROW_NUMBER_COLUMN_FOR_MAIN));

// 3. build subsearch tree (attach relation to subsearch)
UnresolvedPlan relation = getRelation(node);
transformPlanToAttachChild(node.getSubSearch(), relation);
// 3. build subsearch tree: attach source plan that includes spath transformations
// so the subsearch can resolve MAP sub-paths like doc.user.city (fixes #5186)
UnresolvedPlan subsearchSource = buildAppendColSubsearchSource(node);
// Walk down the subsearch to find the deepest node that should receive the source.
// The anonymizer may have already attached a raw Relation. We find the node whose
// child is a Relation (or is empty) and (re-)attach our subsearchSource there.
UnresolvedPlan parent = node.getSubSearch();
while (parent.getChild() != null && !parent.getChild().isEmpty()) {
UnresolvedPlan child = (UnresolvedPlan) parent.getChild().getFirst();
if (child instanceof Relation) {
// Parent's child is the Relation -- replace it with our source chain
break;
}
parent = child;
}
parent.attach(subsearchSource);
// 4. resolve subsearch plan
node.getSubSearch().accept(this, context);
// 5. add row_number() column to subsearch
Expand Down Expand Up @@ -2528,6 +2540,41 @@ public RelNode visitAppendCol(AppendCol node, CalcitePlanContext context) {
}
}

/**
 * Builds the source plan for the appendcol subsearch.
 *
 * <p>Collects the leaf Relation and any SPath nodes from the main plan's child chain, so that
 * schema-transforming commands (spath converts STRING fields to MAP) are inherited by the
 * subsearch. Other commands (eval, stats, filter, etc.) are NOT propagated because they would
 * alter the available fields unexpectedly.
 *
 * <p>The SPath nodes are re-attached in the same relative order they have in the main plan, so an
 * spath that reads the output of an earlier spath still sees that output in the subsearch.
 *
 * @param node the appendcol node whose main-plan child chain is inspected
 * @return the relation itself when the main plan has no SPath nodes; otherwise a chain of fresh
 *     SPath copies ending in the relation
 */
private UnresolvedPlan buildAppendColSubsearchSource(AppendCol node) {
  UnresolvedPlan relation = getRelation(node);

  // Walk the first-child chain from AppendCol down to the leaf, collecting SPath nodes.
  // Nodes are met outermost-first, i.e. the LAST spath of the pipeline is collected first.
  List<SPath> spathNodes = new ArrayList<>();
  UnresolvedPlan current =
      (node.getChild() == null || node.getChild().isEmpty())
          ? null
          : (UnresolvedPlan) node.getChild().getFirst();
  while (current != null) {
    if (current instanceof SPath spath) {
      spathNodes.add(spath);
    }
    boolean hasChild = current.getChild() != null && !current.getChild().isEmpty();
    current = hasChild ? (UnresolvedPlan) current.getChild().getFirst() : null;
  }

  // Rebuild the chain bottom-up: SPath(n) -> ... -> SPath(1) -> Relation.
  // The list is outermost-first, so it must be traversed backwards: the innermost spath (the
  // first one in the pipeline) has to sit directly on top of the relation. Iterating forwards
  // would reverse the order in which the spath commands are applied.
  // Fresh SPath copies are created to avoid sharing mutable AST nodes with the main plan.
  UnresolvedPlan source = relation;
  for (int i = spathNodes.size() - 1; i >= 0; i--) {
    SPath original = spathNodes.get(i);
    SPath copy = new SPath(original.getInField(), original.getOutField(), original.getPath());
    copy.attach(source);
    source = copy;
  }
  return source;
}

@Override
public RelNode visitAppend(Append node, CalcitePlanContext context) {
// 1. Resolve main plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ public void init() throws Exception {
Request nullDoc2 = new Request("PUT", "/test_spath_null/_doc/2?refresh=true");
nullDoc2.setJsonEntity("{\"doc\": null}");
client().performRequest(nullDoc2);

// Index for testing spath + appendcol (#5186)
Request appendcolDoc1 = new Request("PUT", "/test_spath_appendcol/_doc/1?refresh=true");
appendcolDoc1.setJsonEntity(
"{\"doc\":"
+ " \"{\\\"user\\\":{\\\"name\\\":\\\"John\\\",\\\"age\\\":30,\\\"city\\\":\\\"NYC\\\"}}\"}");
client().performRequest(appendcolDoc1);

Request appendcolDoc2 = new Request("PUT", "/test_spath_appendcol/_doc/2?refresh=true");
appendcolDoc2.setJsonEntity(
"{\"doc\":"
+ " \"{\\\"user\\\":{\\\"name\\\":\\\"Alice\\\",\\\"age\\\":25,\\\"city\\\":\\\"LA\\\"}}\"}");
client().performRequest(appendcolDoc2);
}

@Test
Expand Down Expand Up @@ -216,4 +229,17 @@ public void testSpathAutoExtractWithSort() throws IOException {
verifySchema(result, schema("doc.user.name", "string"));
verifyDataRowsInOrder(result, rows("Alice"), rows("John"));
}

@Test
public void testSpathAutoExtractWithAppendcol() throws IOException {
  // Regression test for #5186: a field path such as doc.user.city, produced by spath in the
  // main search, must also be resolvable inside the appendcol subsearch.
  // The main search is sorted by doc.user.name so the row-number based appendcol pairing has a
  // stable left-hand side.
  String query =
      "source=test_spath_appendcol"
          + " | spath input=doc"
          + " | sort doc.user.name"
          + " | appendcol [ sort doc.user.city | stats count() as cnt by doc.user.city ]"
          + " | fields doc.user.name, cnt";

  JSONObject response = executeQuery(query);

  verifySchema(response, schema("doc.user.name", "string"), schema("cnt", "bigint"));
  verifyDataRows(response, rows("Alice", 1), rows("John", 1));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Shared setup for the issue #5186 REST test:
#   1. sets the transient setting plugins.calcite.enabled to true,
#   2. creates index test_issue_5186 with a text field `doc` (plus a keyword sub-field),
#   3. bulk-indexes two documents whose `doc` value is a JSON document encoded as a string.
setup:
- skip:
features:
- headers
- allowed_warnings
- do:
query.settings:
body:
transient:
plugins.calcite.enabled: true
- do:
indices.create:
index: test_issue_5186
body:
settings:
number_of_shards: 1
number_of_replicas: 0
mappings:
properties:
doc:
type: text
fields:
keyword:
type: keyword
- do:
bulk:
index: test_issue_5186
# refresh so the documents are searchable by the time the test query runs
refresh: true
body:
- '{"index": {}}'
- '{"doc": "{\"user\":{\"name\":\"John\",\"age\":30,\"city\":\"NYC\"}}"}'
- '{"index": {}}'
- '{"doc": "{\"user\":{\"name\":\"Alice\",\"age\":25,\"city\":\"LA\"}}"}'

---
# Teardown: sets the transient setting plugins.calcite.enabled back to false.
teardown:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled: false

---
# Runs spath on `doc` followed by an appendcol subsearch that groups by doc.user.city.
# Only the result size is asserted (total and number of datarows), not the row contents.
"appendcol subquery resolves MAP sub-paths after spath (#5186)":
- do:
headers:
Content-Type: application/json
ppl:
body:
query: "source=test_issue_5186 | spath input=doc | appendcol [ stats count() as cnt by doc.user.city ] | fields doc.user.name, doc.user.city, cnt"
- match: { total: 2 }
- length: { datarows: 2 }
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,33 @@ public CalcitePPLAppendcolTest() {
super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL);
}

/**
 * Regression test for #5186: appendcol subsearch should inherit spath transformations.
 *
 * <p>Builds the logical plan for a query that applies spath to ENAME (output field {@code
 * result}) and then groups by {@code result.user.city} inside an appendcol subsearch, and
 * compares the plan text against the expected string with {@code verifyLogical}.
 */
@Test
public void testAppendcolWithSpath() {
String ppl =
"source=EMP | spath input=ENAME output=result"
+ " | appendcol [ stats count() as cnt by result.user.city ]"
+ " | fields result.user.name, result.user.city, cnt";
RelNode root = getRelNode(ppl);
// The subsearch should include spath (JSON_EXTRACT_ALL) on ENAME so that
// result.user.city resolves as ITEM(JSON_EXTRACT_ALL(ENAME), 'user.city')
// Expected shape: a full join of the main branch and the subsearch branch on their
// row-number columns ($9 = _row_number_main_, $12 = _row_number_subsearch_); both branches
// scan EMP and both contain JSON_EXTRACT_ALL($1).
String expectedLogical =
"LogicalProject(result.user.name=[ITEM($8, 'user.name')],"
+ " result.user.city=[$11], cnt=[$10])\n"
+ " LogicalJoin(condition=[=($9, $12)], joinType=[full])\n"
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], result=[JSON_EXTRACT_ALL($1)],"
+ " _row_number_main_=[ROW_NUMBER() OVER ()])\n"
+ " LogicalTableScan(table=[[scott, EMP]])\n"
+ " LogicalProject(cnt=[$1], result.user.city=[$0],"
+ " _row_number_subsearch_=[ROW_NUMBER() OVER ()])\n"
+ " LogicalAggregate(group=[{0}], cnt=[COUNT()])\n"
+ " LogicalProject(result.user.city=[ITEM(JSON_EXTRACT_ALL($1),"
+ " 'user.city')])\n"
+ " LogicalTableScan(table=[[scott, EMP]])\n";
verifyLogical(root, expectedLogical);
}

@Test
public void testAppendcol() {
String ppl = "source=EMP | appendcol [ where DEPTNO = 20 ]";
Expand Down
Loading