Skip to content

Commit 633db9d

Browse files
authored
feat: Enhance connector optimizer to work on plan with multiple connectors (#26246)
## Description Previously, connector optimizers in Presto could only optimize subplans containing tables from a single connector. For example, a Hive connector optimizer could only process subplans that contained Hive table scans exclusively, which ruled out optimizations for federated queries that span multiple data sources. This PR extends the connector optimization framework to support cross-connector optimizations for federated queries. A new method, `getSupportedConnectorIds()`, has been added to the `ConnectorPlanOptimizer` interface; it lets an optimizer declare the set of connectors it can optimize together. * When `getSupportedConnectorIds()` returns an empty list, the optimizer keeps the existing behavior: it only processes subplans from the connector it is registered with. * When `getSupportedConnectorIds()` returns a non-empty list, the optimizer is applied to subplans that contain table scans from **exactly** the specified set of connectors. For example, an optimizer that specifies `[hive, thrift]` is applied to subplans that contain both Hive and Thrift tables, but not to subplans with only Hive tables, nor to subplans with Hive, Thrift, and MySQL tables. ## Motivation and Context Allow connector optimizers to work on subplans that involve multiple connectors. ## Impact Connector optimizers can now be applied to subplans that involve multiple connectors. ## Test Plan Unit tests ## Contributor checklist - [ ] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards). - [ ] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced. 
- [ ] Documented new properties (with their default values), SQL syntax, functions, or other functionality. - [ ] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines). - [ ] Adequate tests were added if applicable. - [ ] CI passed. ## Release Notes Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below. ``` == RELEASE NOTES == General Changes * Add support for connector optimizers to work on subplans that span multiple connectors ```
1 parent f81a14c commit 633db9d

File tree

3 files changed

+435
-61
lines changed

3 files changed

+435
-61
lines changed

presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/ApplyConnectorOptimization.java

Lines changed: 81 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.util.HashMap;
5555
import java.util.HashSet;
5656
import java.util.LinkedList;
57+
import java.util.List;
5758
import java.util.Map;
5859
import java.util.Optional;
5960
import java.util.Queue;
@@ -133,77 +134,97 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider
133134
// In order to preserve the fixpoint, we will "pretend" the newly added C2 table scan is part of C1's job to maintain.
134135
for (ConnectorId connectorId : connectorIds.build()) {
135136
Set<ConnectorPlanOptimizer> optimizers = connectorOptimizers.get(connectorId);
136-
if (optimizers == null) {
137+
if (optimizers == null || optimizers.isEmpty()) {
137138
continue;
138139
}
139140

141+
ImmutableMap.Builder<List<ConnectorId>, Set<ConnectorPlanOptimizer>> optimizersWithConnectorRange = ImmutableMap.builder();
142+
List<ConnectorId> currentConnectors = null;
143+
ImmutableSet.Builder<ConnectorPlanOptimizer> currentGroup = null;
144+
for (ConnectorPlanOptimizer optimizer : optimizers) {
145+
List<ConnectorId> supportedConnectors = optimizer.getSupportedConnectorIds().isEmpty()
146+
? ImmutableList.of(connectorId)
147+
: optimizer.getSupportedConnectorIds();
148+
149+
if (!supportedConnectors.equals(currentConnectors)) {
150+
if (currentGroup != null) {
151+
optimizersWithConnectorRange.put(currentConnectors, currentGroup.build());
152+
}
153+
currentConnectors = supportedConnectors;
154+
currentGroup = ImmutableSet.builder();
155+
}
156+
currentGroup.add(optimizer);
157+
}
158+
optimizersWithConnectorRange.put(currentConnectors, currentGroup.build());
159+
140160
ImmutableMap.Builder<PlanNode, ConnectorPlanNodeContext> contextMapBuilder = ImmutableMap.builder();
141161
buildConnectorPlanNodeContext(plan, null, contextMapBuilder);
142162
Map<PlanNode, ConnectorPlanNodeContext> contextMap = contextMapBuilder.build();
163+
for (Map.Entry<List<ConnectorId>, Set<ConnectorPlanOptimizer>> entry : optimizersWithConnectorRange.build().entrySet()) {
164+
// keep track of changed nodes; the keys are original nodes and the values are the new nodes
165+
Map<PlanNode, PlanNode> updates = new HashMap<>();
166+
167+
// process connector optimizers
168+
for (PlanNode node : contextMap.keySet()) {
169+
// For a subtree with root `node` to be a max closure, the following conditions must hold:
170+
// * The subtree with root `node` is a closure.
171+
// * `node` has no parent, or the subtree with root as `node`'s parent is not a closure.
172+
ConnectorPlanNodeContext context = contextMap.get(node);
173+
if (!context.isClosure(connectorId, session, entry.getKey()) ||
174+
!context.getParent().isPresent() ||
175+
contextMap.get(context.getParent().get()).isClosure(connectorId, session, entry.getKey())) {
176+
continue;
177+
}
143178

144-
// keep track of changed nodes; the keys are original nodes and the values are the new nodes
145-
Map<PlanNode, PlanNode> updates = new HashMap<>();
146-
147-
// process connector optimizers
148-
for (PlanNode node : contextMap.keySet()) {
149-
// For a subtree with root `node` to be a max closure, the following conditions must hold:
150-
// * The subtree with root `node` is a closure.
151-
// * `node` has no parent, or the subtree with root as `node`'s parent is not a closure.
152-
ConnectorPlanNodeContext context = contextMap.get(node);
153-
if (!context.isClosure(connectorId, session) ||
154-
!context.getParent().isPresent() ||
155-
contextMap.get(context.getParent().get()).isClosure(connectorId, session)) {
156-
continue;
157-
}
158-
159-
PlanNode newNode = node;
179+
PlanNode newNode = node;
160180

161-
// the returned node is still a max closure (only if there is no new connector added, which does happen but ignored here)
162-
for (ConnectorPlanOptimizer optimizer : optimizers) {
163-
long start = System.nanoTime();
164-
newNode = optimizer.optimize(newNode, session.toConnectorSession(connectorId), variableAllocator, idAllocator);
165-
if (enableVerboseRuntimeStats || trackOptimizerRuntime(session, optimizer)) {
166-
session.getRuntimeStats().addMetricValue(String.format("optimizer%sTimeNanos", getOptimizerNameForLog(optimizer)), NANO, System.nanoTime() - start);
181+
// the returned node is still a max closure (only if there is no new connector added, which does happen but ignored here)
182+
for (ConnectorPlanOptimizer optimizer : entry.getValue()) {
183+
long start = System.nanoTime();
184+
newNode = optimizer.optimize(newNode, session.toConnectorSession(connectorId), variableAllocator, idAllocator);
185+
if (enableVerboseRuntimeStats || trackOptimizerRuntime(session, optimizer)) {
186+
session.getRuntimeStats().addMetricValue(String.format("optimizer%sTimeNanos", getOptimizerNameForLog(optimizer)), NANO, System.nanoTime() - start);
187+
}
167188
}
168-
}
169189

170-
if (node != newNode) {
171-
// the optimizer has allocated a new PlanNode
172-
checkState(
173-
containsAll(ImmutableSet.copyOf(newNode.getOutputVariables()), node.getOutputVariables()),
174-
"the connector optimizer from %s returns a node that does not cover all output before optimization",
175-
connectorId);
190+
if (node != newNode) {
191+
// the optimizer has allocated a new PlanNode
192+
checkState(
193+
containsAll(ImmutableSet.copyOf(newNode.getOutputVariables()), node.getOutputVariables()),
194+
"the connector optimizer from %s returns a node that does not cover all output before optimization",
195+
connectorId);
176196

177-
updates.put(node, newNode);
178-
}
179-
}
180-
// up to this point, we have a set of updated nodes; need to recursively update their parents
181-
182-
// alter the plan with a bottom-up approach (but does not have to be strict bottom-up to guarantee the correctness of the algorithm)
183-
// use "original nodes" to keep track of the plan structure and "updates" to keep track of the new nodes
184-
Queue<PlanNode> originalNodes = new LinkedList<>(updates.keySet());
185-
while (!originalNodes.isEmpty()) {
186-
PlanNode originalNode = originalNodes.poll();
187-
188-
if (!contextMap.get(originalNode).getParent().isPresent()) {
189-
// originalNode must be the root; update the plan
190-
plan = updates.get(originalNode);
191-
continue;
197+
updates.put(node, newNode);
198+
}
192199
}
200+
// up to this point, we have a set of updated nodes; need to recursively update their parents
201+
202+
// alter the plan with a bottom-up approach (but does not have to be strict bottom-up to guarantee the correctness of the algorithm)
203+
// use "original nodes" to keep track of the plan structure and "updates" to keep track of the new nodes
204+
Queue<PlanNode> originalNodes = new LinkedList<>(updates.keySet());
205+
while (!originalNodes.isEmpty()) {
206+
PlanNode originalNode = originalNodes.poll();
207+
208+
if (!contextMap.get(originalNode).getParent().isPresent()) {
209+
// originalNode must be the root; update the plan
210+
plan = updates.get(originalNode);
211+
continue;
212+
}
193213

194-
PlanNode originalParent = contextMap.get(originalNode).getParent().get();
214+
PlanNode originalParent = contextMap.get(originalNode).getParent().get();
195215

196-
// need to create a new parent given the child has changed; the new parent needs to point to the new child.
197-
// if a node has been updated, it will occur in `updates`; otherwise, just use the original node
198-
ImmutableList.Builder<PlanNode> newChildren = ImmutableList.builder();
199-
originalParent.getSources().forEach(child -> newChildren.add(updates.getOrDefault(child, child)));
200-
PlanNode newParent = originalParent.replaceChildren(newChildren.build());
216+
// need to create a new parent given the child has changed; the new parent needs to point to the new child.
217+
// if a node has been updated, it will occur in `updates`; otherwise, just use the original node
218+
ImmutableList.Builder<PlanNode> newChildren = ImmutableList.builder();
219+
originalParent.getSources().forEach(child -> newChildren.add(updates.getOrDefault(child, child)));
220+
PlanNode newParent = originalParent.replaceChildren(newChildren.build());
201221

202-
// mark the new parent as updated
203-
updates.put(originalParent, newParent);
222+
// mark the new parent as updated
223+
updates.put(originalParent, newParent);
204224

205-
// enqueue the parent node in order to recursively update its ancestors
206-
originalNodes.add(originalParent);
225+
// enqueue the parent node in order to recursively update its ancestors
226+
originalNodes.add(originalParent);
227+
}
207228
}
208229
}
209230

@@ -306,17 +327,16 @@ public Set<Class<? extends PlanNode>> getReachablePlanNodeTypes()
306327
return reachablePlanNodeTypes;
307328
}
308329

309-
boolean isClosure(ConnectorId connectorId, Session session)
330+
boolean isClosure(ConnectorId connectorId, Session session, List<ConnectorId> supportedConnectorId)
310331
{
311332
// check if all children can reach the only connector
312333
boolean includeValuesNode = isIncludeValuesNodeInConnectorOptimizer(session);
313334
Set<ConnectorId> connectorIds = includeValuesNode ? reachableConnectors.stream().filter(x -> !x.equals(EMPTY_CONNECTOR_ID)).collect(toImmutableSet()) : reachableConnectors;
314-
if (connectorIds.size() != 1 || !connectorIds.contains(connectorId)) {
315-
return false;
335+
if (connectorIds.contains(connectorId) && new HashSet<>(supportedConnectorId).containsAll(connectorIds) && supportedConnectorId.size() == connectorIds.size()) {
336+
// check if all children are accessible by connectors
337+
return containsAll(CONNECTOR_ACCESSIBLE_PLAN_NODES, reachablePlanNodeTypes);
316338
}
317-
318-
// check if all children are accessible by connectors
319-
return containsAll(CONNECTOR_ACCESSIBLE_PLAN_NODES, reachablePlanNodeTypes);
339+
return false;
320340
}
321341
}
322342

0 commit comments

Comments
 (0)