Skip to content

Commit 7f70124

Browse files
authored
Make MSE worker manager pluggable (#17787)
1 parent 6c5d200 commit 7f70124

File tree

13 files changed

+118
-45
lines changed

13 files changed

+118
-45
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,13 @@
8787
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
8888
import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
8989
import org.apache.pinot.core.routing.MultiClusterRoutingContext;
90+
import org.apache.pinot.core.routing.RoutingManager;
9091
import org.apache.pinot.core.transport.ListenerConfig;
9192
import org.apache.pinot.core.transport.NettyInspector;
9293
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
9394
import org.apache.pinot.core.util.ListenerConfigUtil;
9495
import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
96+
import org.apache.pinot.query.routing.WorkerManager;
9597
import org.apache.pinot.query.runtime.operator.factory.DefaultQueryOperatorFactoryProvider;
9698
import org.apache.pinot.query.runtime.operator.factory.QueryOperatorFactoryProvider;
9799
import org.apache.pinot.segment.local.function.GroovyFunctionEvaluator;
@@ -138,7 +140,6 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
138140
protected String _instanceId;
139141
private volatile boolean _isStarting = false;
140142
private volatile boolean _isShuttingDown = false;
141-
142143
// Dedicated handler for listening to cluster config changes
143144
protected final DefaultClusterConfigChangeHandler _clusterConfigChangeHandler =
144145
new DefaultClusterConfigChangeHandler();
@@ -237,6 +238,14 @@ protected QueryOperatorFactoryProvider createQueryOperatorFactoryProvider(PinotC
237238
return DefaultQueryOperatorFactoryProvider.INSTANCE;
238239
}
239240

241+
/**
242+
* Override to customize the {@link WorkerManager} used for multi-stage query engine worker assignment.
243+
*/
244+
protected WorkerManager createWorkerManager(String brokerId, String hostname, int port,
245+
RoutingManager routingManager) {
246+
return new WorkerManager(brokerId, hostname, port, routingManager);
247+
}
248+
240249
private void setupHelixSystemProperties() {
241250
// NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
242251
// from ZooKeeper). Setting the flapping time window to a small value can prevent this from happening. Helix ignores the
@@ -438,10 +447,21 @@ public void start()
438447
// multi-stage request handler uses both Netty and GRPC ports.
439448
// worker requires both the "Netty port" for protocol transport and the "GRPC port" for mailbox transport.
440449
// TODO: decouple protocol and engine selection.
450+
String queryRunnerHostname = _brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
451+
int queryRunnerPort = Integer.parseInt(_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
452+
WorkerManager workerManager =
453+
createWorkerManager(brokerId, queryRunnerHostname, queryRunnerPort, _routingManager);
454+
WorkerManager multiClusterWorkerManager;
455+
if (multiClusterRoutingContext != null) {
456+
multiClusterWorkerManager = createWorkerManager(brokerId, queryRunnerHostname, queryRunnerPort,
457+
multiClusterRoutingContext.getMultiClusterRoutingManager());
458+
} else {
459+
multiClusterWorkerManager = workerManager;
460+
}
441461
multiStageBrokerRequestHandler =
442462
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
443463
_accessControlFactory, _queryQuotaManager, _tableCache, _multiStageQueryThrottler, _failureDetector,
444-
_threadAccountant, multiClusterRoutingContext);
464+
_threadAccountant, multiClusterRoutingContext, workerManager, multiClusterWorkerManager);
445465
MultiStageBrokerRequestHandler finalHandler = multiStageBrokerRequestHandler;
446466
_routingManager.setServerReenableCallback(
447467
serverInstance -> finalHandler.getQueryDispatcher().resetClientConnectionBackoff(serverInstance));
@@ -724,7 +744,7 @@ private String getDefaultBrokerId() {
724744
}
725745
}
726746

727-
private boolean updatePortIfNeeded(Map<String, String> instanceConfigSimpleFields, String key, int port) {
747+
protected boolean updatePortIfNeeded(Map<String, String> instanceConfigSimpleFields, String key, int port) {
728748
String existingPortStr = instanceConfigSimpleFields.get(key);
729749
if (port > 0) {
730750
String portStr = Integer.toString(port);

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ public BrokerRequestHandlerDelegate(BaseSingleStageBrokerRequestHandler singleSt
6262
_responseStore = responseStore;
6363
}
6464

65+
@Nullable
66+
public MultiStageBrokerRequestHandler getMultiStageBrokerRequestHandler() {
67+
return _multiStageBrokerRequestHandler;
68+
}
69+
6570
@Override
6671
public void start() {
6772
_singleStageBrokerRequestHandler.start();

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -148,20 +148,15 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
148148
BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager,
149149
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
150150
MultiStageQueryThrottler queryThrottler, FailureDetector failureDetector, ThreadAccountant threadAccountant,
151-
MultiClusterRoutingContext multiClusterRoutingContext) {
151+
MultiClusterRoutingContext multiClusterRoutingContext,
152+
WorkerManager workerManager, WorkerManager multiClusterWorkerManager) {
152153
super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache,
153154
threadAccountant, multiClusterRoutingContext);
154155
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
155156
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
156157

157-
_workerManager = new WorkerManager(_brokerId, hostname, port, _routingManager);
158-
if (multiClusterRoutingContext != null) {
159-
_multiClusterWorkerManager = new WorkerManager(_brokerId, hostname, port,
160-
multiClusterRoutingContext.getMultiClusterRoutingManager());
161-
} else {
162-
// if multi-cluster routing is not enabled, use the same worker manager.
163-
_multiClusterWorkerManager = _workerManager;
164-
}
158+
_workerManager = workerManager;
159+
_multiClusterWorkerManager = multiClusterWorkerManager;
165160

166161
TlsConfig tlsConfig = config.getProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED,
167162
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ? TlsUtils.extractTlsConfig(config,

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.commons.io.FileUtils;
2727
import org.apache.helix.model.HelixConfigScope;
2828
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
29-
import org.apache.pinot.server.starter.helix.SendStatsPredicate;
29+
import org.apache.pinot.query.runtime.SendStatsPredicate;
3030
import org.apache.pinot.spi.config.table.FieldConfig;
3131
import org.apache.pinot.spi.config.table.TableConfig;
3232
import org.apache.pinot.spi.config.table.TableType;

pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ private static boolean isLeafPlan(DispatchablePlanMetadata metadata) {
205205
// --------------------------------------------------------------------------
206206
// Intermediate stage assign logic
207207
// --------------------------------------------------------------------------
208-
private void assignWorkersToIntermediateFragment(PlanFragment fragment, DispatchablePlanContext context) {
208+
protected void assignWorkersToIntermediateFragment(PlanFragment fragment, DispatchablePlanContext context) {
209209
List<PlanFragment> children = fragment.getChildren();
210210
Map<Integer, DispatchablePlanMetadata> metadataMap = context.getDispatchablePlanMetadataMap();
211211
DispatchablePlanMetadata metadata = metadataMap.get(fragment.getFragmentId());
@@ -300,11 +300,13 @@ private void assignWorkersToIntermediateFragment(PlanFragment fragment, Dispatch
300300
childWorkerIdToSegmentsMap.put(workerId, replicatedSegments);
301301
}
302302
} else {
303-
int numWorkers = candidateServers.size();
303+
List<QueryServerInstance> replicatedLeafServers =
304+
getCandidateServersForReplicatedLeaf(context, candidateServers);
305+
int numWorkers = replicatedLeafServers.size();
304306
childWorkerIdToServerInstanceMap = Maps.newHashMapWithExpectedSize(numWorkers);
305307
childWorkerIdToSegmentsMap = Maps.newHashMapWithExpectedSize(numWorkers);
306308
for (int workerId = 0; workerId < numWorkers; workerId++) {
307-
childWorkerIdToServerInstanceMap.put(workerId, candidateServers.get(workerId));
309+
childWorkerIdToServerInstanceMap.put(workerId, replicatedLeafServers.get(workerId));
308310
childWorkerIdToSegmentsMap.put(workerId, replicatedSegments);
309311
}
310312
}
@@ -359,7 +361,7 @@ private boolean isPrePartitionAssignment(List<PlanFragment> children,
359361
/**
360362
* Returns the servers serving any segment of the tables in the query.
361363
*/
362-
private List<QueryServerInstance> getCandidateServers(DispatchablePlanContext context) {
364+
protected List<QueryServerInstance> getCandidateServers(DispatchablePlanContext context) {
363365
List<QueryServerInstance> candidateServers;
364366
if (context.isUseLeafServerForIntermediateStage()) {
365367
Set<QueryServerInstance> leafServerInstances = context.getLeafServerInstances();
@@ -385,7 +387,7 @@ private List<QueryServerInstance> getCandidateServers(DispatchablePlanContext co
385387
return candidateServers;
386388
}
387389

388-
private List<QueryServerInstance> getCandidateServersPerTables(DispatchablePlanContext context) {
390+
protected List<QueryServerInstance> getCandidateServersPerTables(DispatchablePlanContext context) {
389391
Set<String> nonLookupTables = context.getNonLookupTables();
390392
assert !nonLookupTables.isEmpty();
391393
Set<String> servers = new HashSet<>();
@@ -437,6 +439,18 @@ private List<QueryServerInstance> getCandidateServersPerTables(DispatchablePlanC
437439
return candidateServers;
438440
}
439441

442+
/**
443+
* Returns the instances to assign to replicated leaf stage children when there is no local exchange peer. By default,
444+
* uses the same candidates as the intermediate stage.
445+
*
446+
* <p>Subclasses can override to use different instances for replicated leaf stages (e.g., when intermediate stages
447+
* run on non-server instances that cannot scan segments).</p>
448+
*/
449+
protected List<QueryServerInstance> getCandidateServersForReplicatedLeaf(DispatchablePlanContext context,
450+
List<QueryServerInstance> intermediateStageWorkers) {
451+
return intermediateStageWorkers;
452+
}
453+
440454
private void assignWorkersToLeafFragment(PlanFragment fragment, DispatchablePlanContext context) {
441455
DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
442456

pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java renamed to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/KeepPipelineBreakerStatsPredicate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pinot.server.starter.helix;
19+
package org.apache.pinot.query.runtime;
2020

2121
import java.util.Map;
2222
import java.util.Set;

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public class QueryRunner {
111111
private ExecutorService _executorService;
112112
private OpChainSchedulerService _opChainScheduler;
113113
private MailboxService _mailboxService;
114+
private boolean _ownsMailboxService = true;
114115
private QueryExecutor _leafQueryExecutor;
115116

116117
// Group-by settings
@@ -143,10 +144,14 @@ public class QueryRunner {
143144
private BooleanSupplier _keepPipelineBreakerStats;
144145

145146
/**
146-
* Initializes the query executor.
147+
* Initializes the query runner.
147148
* <p>Should be called only once and before calling any other method.
149+
*
150+
* @param instanceDataManager when non-null, the leaf query executor and time series visitor are initialized
151+
* for processing leaf-stage queries. When null, only intermediate-stage execution
152+
* is supported.
148153
*/
149-
public void init(PinotConfiguration serverConf, InstanceDataManager instanceDataManager,
154+
public void init(PinotConfiguration serverConf, String instanceId, @Nullable InstanceDataManager instanceDataManager,
150155
@Nullable TlsConfig tlsConfig, BooleanSupplier sendStats, BooleanSupplier keepPipelineBreakerStats) {
151156
String hostname = serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
152157
if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
@@ -221,19 +226,25 @@ public void init(PinotConfiguration serverConf, InstanceDataManager instanceData
221226
_executorService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
222227
_executorService, serverConf, "multi-stage executor");
223228

224-
_opChainScheduler = new OpChainSchedulerService(instanceDataManager.getInstanceId(), _executorService, serverConf);
225-
_mailboxService = new MailboxService(hostname, port, InstanceType.SERVER, serverConf, tlsConfig);
226-
try {
227-
_leafQueryExecutor = new ServerQueryExecutorV1Impl();
228-
_leafQueryExecutor.init(serverConf.subset(Server.QUERY_EXECUTOR_CONFIG_PREFIX), instanceDataManager,
229-
serverMetrics);
230-
} catch (Exception e) {
231-
throw new RuntimeException(e);
229+
_opChainScheduler = new OpChainSchedulerService(instanceId, _executorService, serverConf);
230+
if (_mailboxService == null) {
231+
_mailboxService = new MailboxService(hostname, port, InstanceType.SERVER, serverConf, tlsConfig);
232232
}
233-
if (StringUtils.isNotBlank(serverConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
234-
_timeSeriesPhysicalPlanVisitor =
235-
new PhysicalTimeSeriesServerPlanVisitor(_leafQueryExecutor, _executorService, serverMetrics);
236-
TimeSeriesBuilderFactoryProvider.init(serverConf);
233+
234+
if (instanceDataManager != null) {
235+
try {
236+
_leafQueryExecutor = new ServerQueryExecutorV1Impl();
237+
_leafQueryExecutor.init(serverConf.subset(Server.QUERY_EXECUTOR_CONFIG_PREFIX), instanceDataManager,
238+
serverMetrics);
239+
} catch (Exception e) {
240+
throw new RuntimeException(e);
241+
}
242+
if (StringUtils.isNotBlank(
243+
serverConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
244+
_timeSeriesPhysicalPlanVisitor =
245+
new PhysicalTimeSeriesServerPlanVisitor(_leafQueryExecutor, _executorService, serverMetrics);
246+
TimeSeriesBuilderFactoryProvider.init(serverConf);
247+
}
237248
}
238249

239250
_sendStats = sendStats;
@@ -242,14 +253,35 @@ public void init(PinotConfiguration serverConf, InstanceDataManager instanceData
242253
LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", hostname, port);
243254
}
244255

256+
/**
257+
* Initializes the query runner with a shared {@link MailboxService}.
258+
*/
259+
public void init(PinotConfiguration serverConf, String instanceId, @Nullable InstanceDataManager instanceDataManager,
260+
@Nullable TlsConfig tlsConfig, BooleanSupplier sendStats, BooleanSupplier keepPipelineBreakerStats,
261+
@Nullable MailboxService sharedMailboxService) {
262+
if (sharedMailboxService != null) {
263+
_mailboxService = sharedMailboxService;
264+
_ownsMailboxService = false;
265+
}
266+
init(serverConf, instanceId, instanceDataManager, tlsConfig, sendStats, keepPipelineBreakerStats);
267+
}
268+
245269
public void start() {
246-
_mailboxService.start();
247-
_leafQueryExecutor.start();
270+
if (_ownsMailboxService) {
271+
_mailboxService.start();
272+
}
273+
if (_leafQueryExecutor != null) {
274+
_leafQueryExecutor.start();
275+
}
248276
}
249277

250278
public void shutDown() {
251-
_leafQueryExecutor.shutDown();
252-
_mailboxService.shutdown();
279+
if (_leafQueryExecutor != null) {
280+
_leafQueryExecutor.shutDown();
281+
}
282+
if (_ownsMailboxService) {
283+
_mailboxService.shutdown();
284+
}
253285
ExecutorServiceUtils.close(_executorService);
254286
}
255287

pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java renamed to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/SendStatsPredicate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pinot.server.starter.helix;
19+
package org.apache.pinot.query.runtime;
2020

2121
import java.util.HashMap;
2222
import java.util.List;

pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ public QueryDispatcher(MailboxService mailboxService, FailureDetector failureDet
133133
}
134134
}
135135

136+
public MailboxService getMailboxService() {
137+
return _mailboxService;
138+
}
139+
136140
public void start() {
137141
_mailboxService.start();
138142
}

pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public QueryServerEnclosure(MockInstanceDataManagerFactory factory, Map<String,
6262
runnerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, _queryRunnerPort);
6363
InstanceDataManager instanceDataManager = factory.buildInstanceDataManager();
6464
_queryRunner = new QueryRunner();
65-
_queryRunner.init(new PinotConfiguration(runnerConfig), instanceDataManager, null, () -> true, () -> true);
65+
_queryRunner.init(new PinotConfiguration(runnerConfig), instanceDataManager.getInstanceId(), instanceDataManager,
66+
null, () -> true, () -> true);
6667
}
6768

6869
public int getPort() {

0 commit comments

Comments
 (0)