Skip to content

Commit bff6aa5

Browse files
committed
Support join with dynamic fields
Signed-off-by: Tomoyuki Morita <[email protected]>
1 parent 7495a39 commit bff6aa5

File tree

9 files changed

+723
-82
lines changed

9 files changed

+723
-82
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,6 +1224,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
12241224
context.relBuilder.peek(),
12251225
context.relBuilder.literal(context.sysLimit.joinSubsearchLimit())));
12261226
}
1227+
List<String> leftAllFields = context.fieldBuilder.getAllFieldNames(1);
1228+
List<String> rightAllFields = context.fieldBuilder.getAllFieldNames(0);
12271229
if (node.getJoinCondition().isEmpty()) {
12281230
// join-with-field-list grammar
12291231
List<String> leftColumns = context.fieldBuilder.getStaticFieldNames(1);
@@ -1255,12 +1257,12 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
12551257
|| (node.getArgumentMap().get("overwrite").equals(Literal.TRUE))) {
12561258
toBeRemovedFields =
12571259
duplicatedFieldNames.stream()
1258-
.map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, true, context))
1260+
.map(field -> JoinAndLookupUtils.analyzeFieldsInRight(field, context))
12591261
.toList();
12601262
} else {
12611263
toBeRemovedFields =
12621264
duplicatedFieldNames.stream()
1263-
.map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, false, context))
1265+
.map(field -> JoinAndLookupUtils.analyzeFieldsInLeft(field, context))
12641266
.toList();
12651267
}
12661268
Literal max = node.getArgumentMap().get("max");
@@ -1285,13 +1287,16 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
12851287
if (!toBeRemovedFields.isEmpty()) {
12861288
context.relBuilder.projectExcept(toBeRemovedFields);
12871289
}
1290+
context.fieldBuilder.reorganizeDynamicFields(leftAllFields, rightAllFields);
1291+
12881292
return context.relBuilder.peek();
12891293
}
12901294
// The join-with-criteria grammar doesn't allow empty join condition
12911295
RexNode joinCondition =
12921296
node.getJoinCondition()
12931297
.map(c -> rexVisitor.analyzeJoinCondition(c, context))
12941298
.orElse(context.relBuilder.literal(true));
1299+
JoinAndLookupUtils.verifyJoinConditionNotUseAnyType(joinCondition, context);
12951300
if (node.getJoinType() == SEMI || node.getJoinType() == ANTI) {
12961301
// semi and anti join only return left table outputs
12971302
context.relBuilder.join(
@@ -1302,7 +1307,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
13021307
// when a new project add to stack. To avoid `id0`, we will rename the `id0` to `alias.id`
13031308
// or `tableIdentifier.id`:
13041309
List<String> leftColumns = context.fieldBuilder.getStaticFieldNames(1);
1305-
List<String> rightColumns = context.fieldBuilder.getStaticFieldNames();
1310+
List<String> rightColumns = context.fieldBuilder.getStaticFieldNames(0);
13061311
List<String> rightTableName =
13071312
PlanUtils.findTable(context.relBuilder.peek()).getQualifiedName();
13081313
// Using `table.column` instead of `catalog.database.table.column` as column prefix because
@@ -1337,6 +1342,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
13371342
}
13381343
context.relBuilder.join(
13391344
JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition);
1345+
1346+
context.fieldBuilder.reorganizeDynamicFields(leftAllFields, rightAllFields);
13401347
JoinAndLookupUtils.renameToExpectedFields(
13411348
rightColumnsWithAliasIfConflict, leftColumns.size(), context);
13421349
}
@@ -1369,8 +1376,13 @@ public Void visitInputRef(RexInputRef inputRef) {
13691376

13701377
private static RexNode buildJoinConditionByFieldName(
13711378
CalcitePlanContext context, String fieldName) {
1372-
RexNode lookupKey = JoinAndLookupUtils.analyzeFieldsForLookUp(fieldName, false, context);
1373-
RexNode sourceKey = JoinAndLookupUtils.analyzeFieldsForLookUp(fieldName, true, context);
1379+
RexNode lookupKey = JoinAndLookupUtils.analyzeFieldsInRight(fieldName, context);
1380+
RexNode sourceKey = JoinAndLookupUtils.analyzeFieldsInLeft(fieldName, context);
1381+
if (context.fieldBuilder.isAnyType(sourceKey)) {
1382+
throw new IllegalArgumentException(
1383+
String.format(
1384+
"Source key `%s` needs to be specific type. Please cast explicitly.", fieldName));
1385+
}
13741386
return context.rexBuilder.equals(sourceKey, lookupKey);
13751387
}
13761388

@@ -1397,6 +1409,10 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) {
13971409
// Get lookupColumns from top of stack (after above potential projection).
13981410
List<String> lookupTableFieldNames = context.fieldBuilder.getStaticFieldNames();
13991411

1412+
// For merging with dynamic fields later
1413+
List<String> leftAllFields = context.fieldBuilder.getAllFieldNames(1);
1414+
List<String> rightAllFields = context.fieldBuilder.getAllFieldNames(0);
1415+
14001416
// 3. Find fields which should be removed in lookup-table.
14011417
// For lookup table, the mapping fields should be dropped after join
14021418
// unless they are explicitly put in the output fields
@@ -1410,6 +1426,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) {
14101426
.toList();
14111427
List<RexNode> toBeRemovedLookupFields =
14121428
toBeRemovedLookupFieldNames.stream()
1429+
.filter(d -> lookupTableFieldNames.contains(d))
14131430
.map(d -> (RexNode) context.fieldBuilder.staticField(2, 1, d))
14141431
.toList();
14151432
List<RexNode> toBeRemovedFields = new ArrayList<>(toBeRemovedLookupFields);
@@ -1421,7 +1438,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) {
14211438

14221439
List<RexNode> duplicatedSourceFields =
14231440
duplicatedFieldNamesMap.keySet().stream()
1424-
.map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, true, context))
1441+
.map(field -> JoinAndLookupUtils.analyzeFieldsInLeft(field, context))
14251442
.toList();
14261443
// Duplicated fields in source-field should always be removed.
14271444
toBeRemovedFields.addAll(duplicatedSourceFields);
@@ -1433,7 +1450,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) {
14331450
if (!duplicatedFieldNamesMap.isEmpty() && node.getOutputStrategy() == OutputStrategy.APPEND) {
14341451
List<RexNode> duplicatedProvidedFields =
14351452
duplicatedFieldNamesMap.values().stream()
1436-
.map(field -> JoinAndLookupUtils.analyzeFieldsForLookUp(field, false, context))
1453+
.map(field -> JoinAndLookupUtils.analyzeFieldsInRight(field, context))
14371454
.toList();
14381455
for (int i = 0; i < duplicatedProvidedFields.size(); ++i) {
14391456
newCoalesceList.add(
@@ -1470,7 +1487,7 @@ public RelNode visitLookup(Lookup node, CalcitePlanContext context) {
14701487
context.relBuilder.projectExcept(toBeRemovedFields);
14711488
}
14721489

1473-
// TODO: dedupe dynamic fields
1490+
context.fieldBuilder.reorganizeDynamicFields(leftAllFields, rightAllFields);
14741491

14751492
// 7. Rename the fields to the expected names.
14761493
JoinAndLookupUtils.renameToExpectedFields(

core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.calcite.rel.core.Project;
2222
import org.apache.calcite.rel.logical.LogicalAggregate;
2323
import org.apache.calcite.rel.logical.LogicalProject;
24-
import org.apache.calcite.rex.RexBuilder;
2524
import org.apache.calcite.rex.RexCall;
2625
import org.apache.calcite.rex.RexInputRef;
2726
import org.apache.calcite.rex.RexLiteral;
@@ -34,6 +33,7 @@
3433
import org.apache.calcite.util.mapping.Mappings;
3534
import org.apache.commons.lang3.tuple.Pair;
3635
import org.immutables.value.Value;
36+
import org.opensearch.sql.calcite.ExtendedRexBuilder;
3737
import org.opensearch.sql.calcite.rel.RelBuilderWrapper;
3838
import org.opensearch.sql.calcite.rel.RelFieldBuilder;
3939

@@ -78,7 +78,8 @@ public void apply(RelOptRuleCall call, LogicalAggregate aggregate, LogicalProjec
7878

7979
final RelBuilder rawRelBuilder = call.builder();
8080
final RelBuilderWrapper relBuilder = new RelBuilderWrapper(rawRelBuilder);
81-
final RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder();
81+
final ExtendedRexBuilder rexBuilder =
82+
(ExtendedRexBuilder) aggregate.getCluster().getRexBuilder();
8283
final RelFieldBuilder fieldBuilder = new RelFieldBuilder(rawRelBuilder, rexBuilder);
8384
relBuilder.push(project.getInput());
8485

core/src/main/java/org/opensearch/sql/calcite/rel/QualifiedNameResolver.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,29 @@ public static Optional<RexNode> resolveField(
3838
if (inputFieldNames.contains(fieldName)) {
3939
return Optional.of(context.fieldBuilder.staticField(inputCount, inputOrdinal, fieldName));
4040
} else if (context.fieldBuilder.isDynamicFieldsExist()) {
41-
return Optional.of(context.fieldBuilder.dynamicField(fieldName));
41+
return Optional.of(context.fieldBuilder.dynamicField(inputCount, inputOrdinal, fieldName));
4242
}
4343
return Optional.empty();
4444
}
4545

46+
/** Resolve field in the top of the stack */
47+
public static Optional<RexNode> resolveField(String fieldName, CalcitePlanContext context) {
48+
return resolveField(1, 0, fieldName, context);
49+
}
50+
51+
/** Resolve field in the specified input. Throw exception if not found. */
4652
public static RexNode resolveFieldOrThrow(
4753
int inputCount, int inputOrdinal, String fieldName, CalcitePlanContext context) {
4854
return resolveField(inputCount, inputOrdinal, fieldName, context)
4955
.orElseThrow(
5056
() -> new IllegalArgumentException(String.format("Field [%s] not found.", fieldName)));
5157
}
5258

59+
/** Resolve field in the top of the stack. Throw exception if not found. */
60+
public static RexNode resolveFieldOrThrow(String fieldName, CalcitePlanContext context) {
61+
return resolveFieldOrThrow(1, 0, fieldName, context);
62+
}
63+
5364
/**
5465
* Resolves a qualified name to a RexNode based on the current context.
5566
*
@@ -78,6 +89,8 @@ private static RexNode resolveInJoinCondition(
7889

7990
return resolveFieldWithAlias(nameNode, context, 2)
8091
.or(() -> resolveFieldWithoutAlias(nameNode, context, 2))
92+
.or(() -> resolveDynamicFieldsWithAlias(nameNode, context, 2))
93+
.or(() -> resolveDynamicFields(nameNode, context, 2))
8194
.orElseThrow(() -> getNotFoundException(nameNode));
8295
}
8396

@@ -139,6 +152,28 @@ private static Optional<RexNode> resolveFieldWithAlias(
139152
return Optional.empty();
140153
}
141154

155+
private static Optional<RexNode> resolveDynamicFieldsWithAlias(
156+
QualifiedName nameNode, CalcitePlanContext context, int inputCount) {
157+
List<String> parts = nameNode.getParts();
158+
log.debug(
159+
"resolveDynamicFieldsWithAlias() called with nameNode={}, parts={}, inputCount={}",
160+
nameNode,
161+
parts,
162+
inputCount);
163+
164+
if (parts.size() >= 2) {
165+
// Consider first part as table alias
166+
String alias = parts.get(0);
167+
168+
String fieldName = String.join(".", parts.subList(1, parts.size()));
169+
Optional<RexNode> dynamicField =
170+
tryToResolveField(alias, DYNAMIC_FIELDS_MAP, context, inputCount);
171+
return dynamicField.map(field -> createItemAccess(field, fieldName, context));
172+
}
173+
174+
return Optional.empty();
175+
}
176+
142177
private static Optional<RexNode> resolveDynamicFields(
143178
QualifiedName nameNode, CalcitePlanContext context, int inputCount) {
144179
List<String> parts = nameNode.getParts();

0 commit comments

Comments
 (0)