Skip to content

Commit 062e6a4

Browse files
gustavodemoraisdawidwys
authored andcommitted
[FLINK-38211][table] Update explain for MultiJoin node
1 parent d567a9d commit 062e6a4

File tree

1 file changed

+111
-9
lines changed

1 file changed

+111
-9
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java

Lines changed: 111 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
2828
import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef;
2929
import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
30+
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
3031
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
3132
import org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor.ConditionAttributeRef;
3233
import org.apache.flink.table.runtime.operators.join.stream.keyselector.JoinKeyExtractor;
@@ -52,6 +53,8 @@
5253
import java.util.Collections;
5354
import java.util.List;
5455
import java.util.Map;
56+
import java.util.Objects;
57+
import java.util.Optional;
5558
import java.util.Set;
5659
import java.util.stream.Collectors;
5760

@@ -161,13 +164,25 @@ public RelWriter explainTerms(final RelWriter pw) {
161164
for (final Ord<RelNode> ord : Ord.zip(inputs)) {
162165
pw.input("input#" + ord.i, ord.e);
163166
}
164-
return pw.item("joinFilter", joinFilter)
165-
.item("joinTypes", joinTypes)
166-
.item("joinConditions", joinConditions)
167-
.item("joinAttributeMap", joinAttributeMap)
168-
.itemIf("postJoinFilter", postJoinFilter, postJoinFilter != null)
167+
168+
return pw.item("commonJoinKey", getCommonJoinKeyFieldNames())
169+
.item(
170+
"joinTypes",
171+
joinTypes.stream()
172+
.map(JoinRelType::toString)
173+
.collect(Collectors.joining(", ")))
174+
.item("inputUniqueKeys", formatInputUniqueKeysWithFieldNames())
175+
.item("joinConditions", formatJoinConditionsWithFieldNames(pw))
176+
.itemIf(
177+
"joinFilter",
178+
formatExpressionWithFieldNames(joinFilter, pw),
179+
joinFilter != null)
180+
.itemIf(
181+
"postJoinFilter",
182+
formatExpressionWithFieldNames(postJoinFilter, pw),
183+
postJoinFilter != null)
169184
.item("select", String.join(",", getRowType().getFieldNames()))
170-
.item("rowType", getRowType())
185+
.item("outputRowType", getRowType())
171186
.itemIf("stateTtlHints", RelExplainUtil.hintsToString(hints), !hints.isEmpty());
172187
}
173188

@@ -216,7 +231,7 @@ private RexNode createMultiJoinCondition() {
216231

217232
public List<List<int[]>> getUniqueKeysForInputs() {
218233
if (inputUniqueKeys == null) {
219-
final List<List<int[]>> computed =
234+
inputUniqueKeys =
220235
inputs.stream()
221236
.map(
222237
input -> {
@@ -231,8 +246,7 @@ public List<List<int[]>> getUniqueKeysForInputs() {
231246
.map(ImmutableBitSet::toArray)
232247
.collect(Collectors.toList());
233248
})
234-
.collect(Collectors.toList());
235-
inputUniqueKeys = Collections.unmodifiableList(computed);
249+
.collect(Collectors.toUnmodifiableList());
236250
}
237251
return inputUniqueKeys;
238252
}
@@ -274,6 +288,94 @@ public List<JoinRelType> getJoinTypes() {
274288
return joinTypes;
275289
}
276290

291+
/**
292+
* Returns the common join key field names as a comma-separated string. Uses the field names
293+
* from the first input to map the common join key indices.
294+
*
295+
* @return comma-separated string of common join key field names, or empty string if no common
296+
* join key
297+
*/
298+
private String getCommonJoinKeyFieldNames() {
299+
final int[] commonJoinKeyIndices = keyExtractor.getCommonJoinKeyIndices(0);
300+
final RelNode firstInput = inputs.get(0);
301+
final List<String> fieldNames = firstInput.getRowType().getFieldNames();
302+
final List<String> commonJoinKey = new ArrayList<>();
303+
304+
for (final int index : commonJoinKeyIndices) {
305+
if (index < fieldNames.size()) {
306+
commonJoinKey.add(fieldNames.get(index));
307+
}
308+
}
309+
310+
if (commonJoinKey.isEmpty()) {
311+
return "noCommonJoinKey";
312+
}
313+
314+
return String.join(", ", commonJoinKey);
315+
}
316+
317+
/**
318+
* Formats a RexNode expression with field names for better readability in explain output.
319+
*
320+
* @param expression the expression to format
321+
* @param pw the RelWriter for determining format preferences
322+
* @return formatted expression string with field names
323+
*/
324+
private String formatExpressionWithFieldNames(final RexNode expression, final RelWriter pw) {
325+
if (expression == null) {
326+
return "";
327+
}
328+
329+
return getExpressionString(
330+
expression,
331+
JavaScalaConversionUtil.toScala(getRowType().getFieldNames()).toList(),
332+
JavaScalaConversionUtil.toScala(Optional.empty()),
333+
RelExplainUtil.preferExpressionFormat(pw),
334+
RelExplainUtil.preferExpressionDetail(pw));
335+
}
336+
337+
/**
338+
* Formats join conditions with field names for better readability in explain output.
339+
*
340+
* @param pw the RelWriter for determining format preferences
341+
* @return formatted join conditions string with field names
342+
*/
343+
private String formatJoinConditionsWithFieldNames(final RelWriter pw) {
344+
return joinConditions.stream()
345+
.filter(Objects::nonNull)
346+
.map(condition -> formatExpressionWithFieldNames(condition, pw))
347+
.collect(Collectors.joining(", "));
348+
}
349+
350+
private String formatInputUniqueKeysWithFieldNames() {
351+
final List<String> inputUniqueKeyStrings = new ArrayList<>();
352+
for (final RelNode input : inputs) {
353+
final Set<ImmutableBitSet> uniqueKeys = getUniqueKeys(input);
354+
355+
if (uniqueKeys != null && !uniqueKeys.isEmpty()) {
356+
final List<String> fieldNames = input.getRowType().getFieldNames();
357+
final List<String> uniqueKeyStrings = new ArrayList<>();
358+
for (final ImmutableBitSet uniqueKey : uniqueKeys) {
359+
final List<String> keyFieldNames = new ArrayList<>();
360+
for (final int index : uniqueKey.toArray()) {
361+
if (index < fieldNames.size()) {
362+
keyFieldNames.add(fieldNames.get(index));
363+
}
364+
}
365+
if (!keyFieldNames.isEmpty()) {
366+
uniqueKeyStrings.add("(" + String.join(", ", keyFieldNames) + ")");
367+
}
368+
}
369+
370+
inputUniqueKeyStrings.add(String.join(", ", uniqueKeyStrings));
371+
} else {
372+
inputUniqueKeyStrings.add("noUniqueKey");
373+
}
374+
}
375+
376+
return String.join(", ", inputUniqueKeyStrings);
377+
}
378+
277379
/**
278380
* This is mainly used in `FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor`. If
279381
* the unique key of input is a superset of the common join key, then we can ignore

0 commit comments

Comments
 (0)