Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -443,43 +443,18 @@ protected LogicalPlan doRule(LogicalPlan plan) {
childrenOutput.addAll(output);
}

if (plan instanceof Aggregate aggregate) {
return resolveAggregate(aggregate, childrenOutput);
}

if (plan instanceof Drop d) {
return resolveDrop(d, childrenOutput);
}

if (plan instanceof Rename r) {
return resolveRename(r, childrenOutput);
}

if (plan instanceof Keep p) {
return resolveKeep(p, childrenOutput);
}

if (plan instanceof Eval p) {
return resolveEval(p, childrenOutput);
}

if (plan instanceof Enrich p) {
return resolveEnrich(p, childrenOutput);
}

if (plan instanceof MvExpand p) {
return resolveMvExpand(p, childrenOutput);
}

if (plan instanceof Lookup l) {
return resolveLookup(l, childrenOutput);
}

if (plan instanceof LookupJoin j) {
return resolveLookupJoin(j);
}

return plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
return switch (plan) {
case Aggregate aggregate -> resolveAggregate(aggregate, childrenOutput);
case Drop d -> resolveDrop(d, childrenOutput);
case Rename r -> resolveRename(r, childrenOutput);
case Keep p -> resolveKeep(p, childrenOutput);
case Eval p -> resolveEval(p, childrenOutput);
case Enrich p -> resolveEnrich(p, childrenOutput);
case MvExpand p -> resolveMvExpand(p, childrenOutput);
case Lookup l -> resolveLookup(l, childrenOutput);
case LookupJoin j -> resolveLookupJoin(j);
default -> plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
};
}

private Aggregate resolveAggregate(Aggregate aggregate, List<Attribute> childrenOutput) {
Expand Down Expand Up @@ -1091,7 +1066,7 @@ private static Expression cast(org.elasticsearch.xpack.esql.core.expression.func
return processScalarOrGroupingFunction(f, registry);
}
if (f instanceof EsqlArithmeticOperation || f instanceof BinaryComparison) {
return processBinaryOperator((BinaryOperator) f);
return processBinaryOperator((BinaryOperator<?, ?, ?, ?>) f);
}
return f;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,57 +193,37 @@ public LocalExecutionPlan plan(FoldContext foldCtx, PhysicalPlan localPhysicalPl
}

private PhysicalOperation plan(PhysicalPlan node, LocalExecutionPlannerContext context) {
if (node instanceof AggregateExec aggregate) {
return planAggregation(aggregate, context);
} else if (node instanceof FieldExtractExec fieldExtractExec) {
return planFieldExtractNode(fieldExtractExec, context);
} else if (node instanceof ExchangeExec exchangeExec) {
return planExchange(exchangeExec, context);
} else if (node instanceof TopNExec topNExec) {
return planTopN(topNExec, context);
} else if (node instanceof EvalExec eval) {
return planEval(eval, context);
} else if (node instanceof DissectExec dissect) {
return planDissect(dissect, context);
} else if (node instanceof GrokExec grok) {
return planGrok(grok, context);
} else if (node instanceof ProjectExec project) {
return planProject(project, context);
} else if (node instanceof FilterExec filter) {
return planFilter(filter, context);
} else if (node instanceof LimitExec limit) {
return planLimit(limit, context);
} else if (node instanceof MvExpandExec mvExpand) {
return planMvExpand(mvExpand, context);
}
// source nodes
else if (node instanceof EsQueryExec esQuery) {
return planEsQueryNode(esQuery, context);
} else if (node instanceof EsStatsQueryExec statsQuery) {
return planEsStats(statsQuery, context);
} else if (node instanceof LocalSourceExec localSource) {
return planLocal(localSource, context);
} else if (node instanceof ShowExec show) {
return planShow(show);
} else if (node instanceof ExchangeSourceExec exchangeSource) {
return planExchangeSource(exchangeSource, context);
}
// lookups and joins
else if (node instanceof EnrichExec enrich) {
return planEnrich(enrich, context);
} else if (node instanceof HashJoinExec join) {
return planHashJoin(join, context);
} else if (node instanceof LookupJoinExec join) {
return planLookupJoin(join, context);
}
// output
else if (node instanceof OutputExec outputExec) {
return planOutput(outputExec, context);
} else if (node instanceof ExchangeSinkExec exchangeSink) {
return planExchangeSink(exchangeSink, context);
}

throw new EsqlIllegalArgumentException("unknown physical plan node [" + node.nodeName() + "]");
return switch (node) {
case AggregateExec aggregate -> planAggregation(aggregate, context);
case FieldExtractExec fieldExtractExec -> planFieldExtractNode(fieldExtractExec, context);
case ExchangeExec exchangeExec -> planExchange(exchangeExec, context);
case TopNExec topNExec -> planTopN(topNExec, context);
case EvalExec eval -> planEval(eval, context);
case DissectExec dissect -> planDissect(dissect, context);
case GrokExec grok -> planGrok(grok, context);
case ProjectExec project -> planProject(project, context);
case FilterExec filter -> planFilter(filter, context);
case LimitExec limit -> planLimit(limit, context);
case MvExpandExec mvExpand -> planMvExpand(mvExpand, context);

// source nodes
case EsQueryExec esQuery -> planEsQueryNode(esQuery, context);
case EsStatsQueryExec statsQuery -> planEsStats(statsQuery, context);
case LocalSourceExec localSource -> planLocal(localSource, context);
case ShowExec show -> planShow(show);
case ExchangeSourceExec exchangeSource -> planExchangeSource(exchangeSource, context);

// lookups and joins
case EnrichExec enrich -> planEnrich(enrich, context);
case HashJoinExec join -> planHashJoin(join, context);
case LookupJoinExec join -> planLookupJoin(join, context);

// output
case OutputExec outputExec -> planOutput(outputExec, context);
case ExchangeSinkExec exchangeSink -> planExchangeSink(exchangeSink, context);
default -> throw new EsqlIllegalArgumentException("unknown physical plan node [" + node.nodeName() + "]");
};

}

private PhysicalOperation planAggregation(AggregateExec aggregate, LocalExecutionPlannerContext context) {
Expand Down