Skip to content

Commit 59c00e1

Browse files
committed
Support join with dynamic fields
Signed-off-by: Tomoyuki Morita <[email protected]>
1 parent 8fe17a0 commit 59c00e1

File tree

9 files changed

+723
-82
lines changed

9 files changed

+723
-82
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
12231223
context.relBuilder.peek(),
12241224
context.relBuilder.literal(context.sysLimit.joinSubsearchLimit())));
12251225
}
1226+
List<String> leftAllFields = context.fieldBuilder.getAllFieldNames(1);
1227+
List<String> rightAllFields = context.fieldBuilder.getAllFieldNames(0);
12261228
if (node.getJoinCondition().isEmpty()) {
12271229
// join-with-field-list grammar
12281230
List<String> leftColumns = context.fieldBuilder.getStaticFieldNames(1);
@@ -1254,12 +1256,12 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
12541256
|| (node.getArgumentMap().get("overwrite").equals(Literal.TRUE))) {
12551257
toBeRemovedFields =
12561258
duplicatedFieldNames.stream()
1257-
.map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, true, context))
1259+
.map(field -> JoinAndLookupUtils.analyzeFieldsInRight(field, context))
12581260
.toList();
12591261
} else {
12601262
toBeRemovedFields =
12611263
duplicatedFieldNames.stream()
1262-
.map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, false, context))
1264+
.map(field -> JoinAndLookupUtils.analyzeFieldsInLeft(field, context))
12631265
.toList();
12641266
}
12651267
Literal max = node.getArgumentMap().get("max");
@@ -1284,13 +1286,16 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
12841286
if (!toBeRemovedFields.isEmpty()) {
12851287
context.relBuilder.projectExcept(toBeRemovedFields);
12861288
}
1289+
context.fieldBuilder.reorganizeDynamicFields(leftAllFields, rightAllFields);
1290+
12871291
return context.relBuilder.peek();
12881292
}
12891293
// The join-with-criteria grammar doesn't allow empty join condition
12901294
RexNode joinCondition =
12911295
node.getJoinCondition()
12921296
.map(c -> rexVisitor.analyzeJoinCondition(c, context))
12931297
.orElse(context.relBuilder.literal(true));
1298+
JoinAndLookupUtils.verifyJoinConditionNotUseAnyType(joinCondition, context);
12941299
if (node.getJoinType() == SEMI || node.getJoinType() == ANTI) {
12951300
// semi and anti join only return left table outputs
12961301
context.relBuilder.join(
@@ -1301,7 +1306,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
13011306
// when a new project add to stack. To avoid `id0`, we will rename the `id0` to `alias.id`
13021307
// or `tableIdentifier.id`:
13031308
List<String> leftColumns = context.fieldBuilder.getStaticFieldNames(1);
1304-
List<String> rightColumns = context.fieldBuilder.getStaticFieldNames();
1309+
List<String> rightColumns = context.fieldBuilder.getStaticFieldNames(0);
13051310
List<String> rightTableName =
13061311
PlanUtils.findTable(context.relBuilder.peek()).getQualifiedName();
13071312
// Using `table.column` instead of `catalog.database.table.column` as column prefix because
@@ -1336,6 +1341,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
13361341
}
13371342
context.relBuilder.join(
13381343
JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition);
1344+
1345+
context.fieldBuilder.reorganizeDynamicFields(leftAllFields, rightAllFields);
13391346
JoinAndLookupUtils.renameToExpectedFields(
13401347
rightColumnsWithAliasIfConflict, leftColumns.size(), context);
13411348
}
@@ -1368,8 +1375,13 @@ public Void visitInputRef(RexInputRef inputRef) {
13681375

13691376
private static RexNode buildJoinConditionByFieldName(
13701377
CalcitePlanContext context, String fieldName) {
1371-
RexNode lookupKey = JoinAndLookupUtils.analyzeFieldsForLookUp(fieldName, false, context);
1372-
RexNode sourceKey = JoinAndLookupUtils.analyzeFieldsForLookUp(fieldName, true, context);
1378+
RexNode lookupKey = JoinAndLookupUtils.analyzeFieldsInRight(fieldName, context);
1379+
RexNode sourceKey = JoinAndLookupUtils.analyzeFieldsInLeft(fieldName, context);
1380+
if (context.fieldBuilder.isAnyType(sourceKey)) {
1381+
throw new IllegalArgumentException(
1382+
String.format(
1383+
"Source key `%s` needs to be specific type. Please cast explicitly.", fieldName));
1384+
}
13731385
return context.rexBuilder.equals(sourceKey, lookupKey);
13741386
}
13751387

@@ -1396,6 +1408,10 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) {
13961408
// Get lookupColumns from top of stack (after above potential projection).
13971409
List<String> lookupTableFieldNames = context.fieldBuilder.getStaticFieldNames();
13981410

1411+
// For merging with dynamic fields later
1412+
List<String> leftAllFields = context.fieldBuilder.getAllFieldNames(1);
1413+
List<String> rightAllFields = context.fieldBuilder.getAllFieldNames(0);
1414+
13991415
// 3. Find fields which should be removed in lookup-table.
14001416
// For lookup table, the mapping fields should be dropped after join
14011417
// unless they are explicitly put in the output fields
@@ -1409,6 +1425,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) {
14091425
.toList();
14101426
List<RexNode> toBeRemovedLookupFields =
14111427
toBeRemovedLookupFieldNames.stream()
1428+
.filter(d -> lookupTableFieldNames.contains(d))
14121429
.map(d -> (RexNode) context.fieldBuilder.staticField(2, 1, d))
14131430
.toList();
14141431
List<RexNode> toBeRemovedFields = new ArrayList<>(toBeRemovedLookupFields);
@@ -1420,7 +1437,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) {
14201437

14211438
List<RexNode> duplicatedSourceFields =
14221439
duplicatedFieldNamesMap.keySet().stream()
1423-
.map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, true, context))
1440+
.map(field -> JoinAndLookupUtils.analyzeFieldsInLeft(field, context))
14241441
.toList();
14251442
// Duplicated fields in source-field should always be removed.
14261443
toBeRemovedFields.addAll(duplicatedSourceFields);
@@ -1432,7 +1449,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) {
14321449
if (!duplicatedFieldNamesMap.isEmpty() && node.getOutputStrategy() == OutputStrategy.APPEND) {
14331450
List<RexNode> duplicatedProvidedFields =
14341451
duplicatedFieldNamesMap.values().stream()
1435-
.map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, false, context))
1452+
.map(field -> JoinAndLookupUtils.analyzeFieldsInRight(field, context))
14361453
.toList();
14371454
for (int i = 0; i < duplicatedProvidedFields.size(); ++i) {
14381455
newCoalesceList.add(
@@ -1469,7 +1486,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) {
14691486
context.relBuilder.projectExcept(toBeRemovedFields);
14701487
}
14711488

1472-
// TODO: dedupe dynamic fields
1489+
context.fieldBuilder.reorganizeDynamicFields(leftAllFields, rightAllFields);
14731490

14741491
// 7. Rename the fields to the expected names.
14751492
JoinAndLookupUtils.renameToExpectedFields(

core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.calcite.rel.core.Project;
2222
import org.apache.calcite.rel.logical.LogicalAggregate;
2323
import org.apache.calcite.rel.logical.LogicalProject;
24-
import org.apache.calcite.rex.RexBuilder;
2524
import org.apache.calcite.rex.RexCall;
2625
import org.apache.calcite.rex.RexInputRef;
2726
import org.apache.calcite.rex.RexLiteral;
@@ -34,6 +33,7 @@
3433
import org.apache.calcite.util.mapping.Mappings;
3534
import org.apache.commons.lang3.tuple.Pair;
3635
import org.immutables.value.Value;
36+
import org.opensearch.sql.calcite.ExtendedRexBuilder;
3737
import org.opensearch.sql.calcite.rel.RelBuilderWrapper;
3838
import org.opensearch.sql.calcite.rel.RelFieldBuilder;
3939

@@ -78,7 +78,8 @@ public void apply(RelOptRuleCall call, LogicalAggregate aggregate, LogicalProjec
7878

7979
final RelBuilder rawRelBuilder = call.builder();
8080
final RelBuilderWrapper relBuilder = new RelBuilderWrapper(rawRelBuilder);
81-
final RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder();
81+
final ExtendedRexBuilder rexBuilder =
82+
(ExtendedRexBuilder) aggregate.getCluster().getRexBuilder();
8283
final RelFieldBuilder fieldBuilder = new RelFieldBuilder(rawRelBuilder, rexBuilder);
8384
relBuilder.push(project.getInput());
8485

core/src/main/java/org/opensearch/sql/calcite/rel/QualifiedNameResolver.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,29 @@ public static Optional<RexNode> resolveField(
3838
if (inputFieldNames.contains(fieldName)) {
3939
return Optional.of(context.fieldBuilder.staticField(inputCount, inputOrdinal, fieldName));
4040
} else if (context.fieldBuilder.isDynamicFieldsExist()) {
41-
return Optional.of(context.fieldBuilder.dynamicField(fieldName));
41+
return Optional.of(context.fieldBuilder.dynamicField(inputCount, inputOrdinal, fieldName));
4242
}
4343
return Optional.empty();
4444
}
4545

46+
/** Resolve field in the top of the stack */
47+
public static Optional<RexNode> resolveField(String fieldName, CalcitePlanContext context) {
48+
return resolveField(1, 0, fieldName, context);
49+
}
50+
51+
/** Resolve field in the specified input. Throw exception if not found. */
4652
public static RexNode resolveFieldOrThrow(
4753
int inputCount, int inputOrdinal, String fieldName, CalcitePlanContext context) {
4854
return resolveField(inputCount, inputOrdinal, fieldName, context)
4955
.orElseThrow(
5056
() -> new IllegalArgumentException(String.format("Field [%s] not found.", fieldName)));
5157
}
5258

59+
/** Resolve field in the top of the stack. Throw exception if not found. */
60+
public static RexNode resolveFieldOrThrow(String fieldName, CalcitePlanContext context) {
61+
return resolveFieldOrThrow(1, 0, fieldName, context);
62+
}
63+
5364
/**
5465
* Resolves a qualified name to a RexNode based on the current context.
5566
*
@@ -78,6 +89,8 @@ private static RexNode resolveInJoinCondition(
7889

7990
return resolveFieldWithAlias(nameNode, context, 2)
8091
.or(() -> resolveFieldWithoutAlias(nameNode, context, 2))
92+
.or(() -> resolveDynamicFieldsWithAlias(nameNode, context, 2))
93+
.or(() -> resolveDynamicFields(nameNode, context, 2))
8194
.orElseThrow(() -> getNotFoundException(nameNode));
8295
}
8396

@@ -139,6 +152,28 @@ private static Optional<RexNode> resolveFieldWithAlias(
139152
return Optional.empty();
140153
}
141154

155+
private static Optional<RexNode> resolveDynamicFieldsWithAlias(
156+
QualifiedName nameNode, CalcitePlanContext context, int inputCount) {
157+
List<String> parts = nameNode.getParts();
158+
log.debug(
159+
"resolveDynamicFieldsWithAlias() called with nameNode={}, parts={}, inputCount={}",
160+
nameNode,
161+
parts,
162+
inputCount);
163+
164+
if (parts.size() >= 2) {
165+
// Consider first part as table alias
166+
String alias = parts.get(0);
167+
168+
String fieldName = String.join(".", parts.subList(1, parts.size()));
169+
Optional<RexNode> dynamicField =
170+
tryToResolveField(alias, DYNAMIC_FIELDS_MAP, context, inputCount);
171+
return dynamicField.map(field -> createItemAccess(field, fieldName, context));
172+
}
173+
174+
return Optional.empty();
175+
}
176+
142177
private static Optional<RexNode> resolveDynamicFields(
143178
QualifiedName nameNode, CalcitePlanContext context, int inputCount) {
144179
List<String> parts = nameNode.getParts();

0 commit comments

Comments
 (0)