Skip to content

Commit c82bb16

Browse files
committed
Add system tables support with registry
SystemTableDataProvider now exposes getDataSource() returning an IndexSegment. Add InMemorySystemTableSegment and update system.tables/system.instances providers Broker runs system table queries using the v1 query engine and reduce path.
1 parent 9ef03d0 commit c82bb16

File tree

35 files changed

+4368
-22
lines changed

35 files changed

+4368
-22
lines changed

pinot-broker/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
<pinot.root>${basedir}/..</pinot.root>
3434
</properties>
3535
<dependencies>
36+
<dependency>
37+
<groupId>org.apache.pinot</groupId>
38+
<artifactId>pinot-java-client</artifactId>
39+
</dependency>
3640
<dependency>
3741
<groupId>org.apache.pinot</groupId>
3842
<artifactId>pinot-query-runtime</artifactId>
@@ -90,6 +94,11 @@
9094
<artifactId>pinot-yammer</artifactId>
9195
<scope>test</scope>
9296
</dependency>
97+
<dependency>
98+
<groupId>org.apache.pinot</groupId>
99+
<artifactId>pinot-system-table</artifactId>
100+
<scope>test</scope>
101+
</dependency>
93102
<dependency>
94103
<groupId>org.testng</groupId>
95104
<artifactId>testng</artifactId>
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.broker.api.resources;
20+
21+
import com.fasterxml.jackson.databind.JsonNode;
22+
import javax.inject.Inject;
23+
import javax.ws.rs.POST;
24+
import javax.ws.rs.Path;
25+
import javax.ws.rs.Produces;
26+
import javax.ws.rs.core.Context;
27+
import javax.ws.rs.core.HttpHeaders;
28+
import javax.ws.rs.core.MediaType;
29+
import javax.ws.rs.core.Response;
30+
import org.apache.pinot.broker.requesthandler.SystemTableBrokerRequestHandler;
31+
import org.apache.pinot.common.datatable.DataTable;
32+
import org.apache.pinot.common.datatable.DataTableImplV4;
33+
import org.apache.pinot.core.auth.ManualAuthorization;
34+
import org.apache.pinot.spi.exception.QueryErrorCode;
35+
import org.apache.pinot.spi.trace.RequestScope;
36+
import org.apache.pinot.spi.trace.Tracing;
37+
import org.apache.pinot.spi.utils.JsonUtils;
38+
39+
import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.SQL;
40+
41+
42+
/**
43+
* Internal endpoint used for broker-to-broker scatter-gather for system tables.
44+
* <p>
45+
* Returns a serialized {@link DataTable} payload (application/octet-stream) that represents the local broker's shard
46+
* of a system table query.
47+
*/
48+
@Path("/")
49+
public class SystemTableDataTableResource {
50+
@Inject
51+
private SystemTableBrokerRequestHandler _systemTableBrokerRequestHandler;
52+
53+
@POST
54+
@Produces(MediaType.APPLICATION_OCTET_STREAM)
55+
@Path("query/systemTable/datatable")
56+
@ManualAuthorization
57+
public Response processSystemTableDataTable(String requestBody,
58+
@Context org.glassfish.grizzly.http.server.Request requestContext, @Context HttpHeaders httpHeaders) {
59+
DataTable dataTable;
60+
try {
61+
JsonNode requestJson = JsonUtils.stringToJsonNode(requestBody);
62+
if (requestJson == null || !requestJson.isObject() || !requestJson.has(SQL)) {
63+
dataTable = new DataTableImplV4();
64+
dataTable.addException(QueryErrorCode.JSON_PARSING, "Payload is missing the query string field 'sql'");
65+
} else {
66+
try (RequestScope requestScope = Tracing.getTracer().createRequestScope()) {
67+
requestScope.setRequestArrivalTimeMillis(System.currentTimeMillis());
68+
dataTable = _systemTableBrokerRequestHandler.handleSystemTableDataTableRequest(requestJson,
69+
PinotClientRequest.makeHttpIdentity(requestContext), requestScope, httpHeaders);
70+
}
71+
}
72+
} catch (Exception e) {
73+
dataTable = new DataTableImplV4();
74+
dataTable.addException(QueryErrorCode.QUERY_EXECUTION, e.getMessage());
75+
}
76+
77+
try {
78+
return Response.ok(dataTable.toBytes(), MediaType.APPLICATION_OCTET_STREAM).build();
79+
} catch (Exception e) {
80+
// As a last resort, return an empty body; the caller will treat this as a failure.
81+
return Response.ok(new byte[0], MediaType.APPLICATION_OCTET_STREAM).build();
82+
}
83+
}
84+
}

pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.helix.HelixManager;
3535
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
3636
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
37+
import org.apache.pinot.broker.requesthandler.SystemTableBrokerRequestHandler;
3738
import org.apache.pinot.broker.routing.manager.BrokerRoutingManager;
3839
import org.apache.pinot.common.audit.AuditLogFilter;
3940
import org.apache.pinot.common.cursors.AbstractResponseStore;
@@ -78,6 +79,7 @@ public class BrokerAdminApiApplication extends ResourceConfig {
7879
public BrokerAdminApiApplication(BrokerRoutingManager routingManager, BrokerRequestHandler brokerRequestHandler,
7980
BrokerMetrics brokerMetrics, PinotConfiguration brokerConf, SqlQueryExecutor sqlQueryExecutor,
8081
ServerRoutingStatsManager serverRoutingStatsManager, AccessControlFactory accessFactory,
82+
SystemTableBrokerRequestHandler systemTableBrokerRequestHandler,
8183
HelixManager helixManager, QueryQuotaManager queryQuotaManager, ThreadAccountant threadAccountant,
8284
AbstractResponseStore responseStore) {
8385
_brokerResourcePackages = brokerConf.getProperty(CommonConstants.Broker.BROKER_RESOURCE_PACKAGES,
@@ -108,6 +110,7 @@ protected void configure() {
108110
bind(sqlQueryExecutor).to(SqlQueryExecutor.class);
109111
bind(routingManager).to(BrokerRoutingManager.class);
110112
bind(brokerRequestHandler).to(BrokerRequestHandler.class);
113+
bind(systemTableBrokerRequestHandler).to(SystemTableBrokerRequestHandler.class);
111114
bind(brokerMetrics).to(BrokerMetrics.class);
112115
String loggerRootDir = brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_LOGGER_ROOT_DIR);
113116
if (loggerRootDir != null) {

pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
5656
import org.apache.pinot.broker.requesthandler.MultiStageQueryThrottler;
5757
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
58+
import org.apache.pinot.broker.requesthandler.SystemTableBrokerRequestHandler;
5859
import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler;
5960
import org.apache.pinot.broker.routing.manager.BrokerRoutingManager;
6061
import org.apache.pinot.common.Utils;
@@ -73,6 +74,7 @@
7374
import org.apache.pinot.common.metrics.BrokerMeter;
7475
import org.apache.pinot.common.metrics.BrokerMetrics;
7576
import org.apache.pinot.common.metrics.BrokerTimer;
77+
import org.apache.pinot.common.systemtable.SystemTableRegistry;
7678
import org.apache.pinot.common.utils.PinotAppConfigs;
7779
import org.apache.pinot.common.utils.ServiceStartableUtils;
7880
import org.apache.pinot.common.utils.ServiceStatus;
@@ -157,6 +159,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
157159
protected BrokerMetrics _brokerMetrics;
158160
protected BrokerRoutingManager _routingManager;
159161
protected AccessControlFactory _accessControlFactory;
162+
protected SystemTableBrokerRequestHandler _systemTableBrokerRequestHandler;
160163
protected BrokerRequestHandler _brokerRequestHandler;
161164
protected SqlQueryExecutor _sqlQueryExecutor;
162165
protected BrokerAdminApiApplication _brokerAdminApplication;
@@ -353,6 +356,7 @@ public void start()
353356
boolean caseInsensitive =
354357
_brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
355358
_tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
359+
SystemTableRegistry.init(_tableCache, _helixAdmin, _clusterName, _brokerConf);
356360

357361
LOGGER.info("Initializing Broker Event Listener Factory");
358362
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
@@ -460,9 +464,13 @@ public void start()
460464
_responseStore.init(responseStoreConfiguration.subset(_responseStore.getType()), _hostname, _port, brokerId,
461465
_brokerMetrics, expirationTime);
462466

467+
_systemTableBrokerRequestHandler =
468+
new SystemTableBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
469+
_accessControlFactory, _queryQuotaManager, _tableCache, _threadAccountant, multiClusterRoutingContext,
470+
_spectatorHelixManager);
463471
_brokerRequestHandler =
464-
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler,
465-
timeSeriesRequestHandler, _responseStore);
472+
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, _systemTableBrokerRequestHandler,
473+
multiStageBrokerRequestHandler, timeSeriesRequestHandler, _responseStore);
466474
_brokerRequestHandler.start();
467475

468476
String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
@@ -775,6 +783,11 @@ public void stop() {
775783
} catch (IOException e) {
776784
LOGGER.error("Caught exception when shutting down PinotFsFactory", e);
777785
}
786+
try {
787+
SystemTableRegistry.close();
788+
} catch (Exception e) {
789+
LOGGER.warn("Failed to close system table registry cleanly", e);
790+
}
778791

779792
LOGGER.info("Disconnecting spectator Helix manager");
780793
_spectatorHelixManager.disconnect();
@@ -821,7 +834,8 @@ public BrokerRequestHandler getBrokerRequestHandler() {
821834
protected BrokerAdminApiApplication createBrokerAdminApp() {
822835
BrokerAdminApiApplication brokerAdminApiApplication =
823836
new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler, _brokerMetrics, _brokerConf,
824-
_sqlQueryExecutor, _serverRoutingStatsManager, _accessControlFactory, _spectatorHelixManager,
837+
_sqlQueryExecutor, _serverRoutingStatsManager, _accessControlFactory, _systemTableBrokerRequestHandler,
838+
_spectatorHelixManager,
825839
_queryQuotaManager, _threadAccountant, _responseStore);
826840
brokerAdminApiApplication.register(
827841
new AuditServiceBinder(_defaultClusterConfigChangeHandler, getServiceRole(), _brokerMetrics));

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,27 +43,29 @@
4343

4444
/**
4545
* {@code BrokerRequestHandlerDelegate} delegates the inbound broker request to one of the enabled
46-
* {@link BrokerRequestHandler} based on the requested handle type.
47-
*
48-
* {@see: @CommonConstant
46+
* {@link BrokerRequestHandler} implementations based on the request type.
4947
*/
5048
public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
5149
private final BaseSingleStageBrokerRequestHandler _singleStageBrokerRequestHandler;
50+
private final SystemTableBrokerRequestHandler _systemTableBrokerRequestHandler;
5251
private final MultiStageBrokerRequestHandler _multiStageBrokerRequestHandler;
5352
private final TimeSeriesRequestHandler _timeSeriesRequestHandler;
5453
private final AbstractResponseStore _responseStore;
5554

5655
public BrokerRequestHandlerDelegate(BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler,
56+
SystemTableBrokerRequestHandler systemTableBrokerRequestHandler,
5757
@Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler,
5858
@Nullable TimeSeriesRequestHandler timeSeriesRequestHandler, AbstractResponseStore responseStore) {
5959
_singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
60+
_systemTableBrokerRequestHandler = systemTableBrokerRequestHandler;
6061
_multiStageBrokerRequestHandler = multiStageBrokerRequestHandler;
6162
_timeSeriesRequestHandler = timeSeriesRequestHandler;
6263
_responseStore = responseStore;
6364
}
6465

6566
@Override
6667
public void start() {
68+
_systemTableBrokerRequestHandler.start();
6769
_singleStageBrokerRequestHandler.start();
6870
if (_multiStageBrokerRequestHandler != null) {
6971
_multiStageBrokerRequestHandler.start();
@@ -75,6 +77,7 @@ public void start() {
7577

7678
@Override
7779
public void shutDown() {
80+
_systemTableBrokerRequestHandler.shutDown();
7881
_singleStageBrokerRequestHandler.shutDown();
7982
if (_multiStageBrokerRequestHandler != null) {
8083
_multiStageBrokerRequestHandler.shutDown();
@@ -108,11 +111,19 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
108111

109112
BaseBrokerRequestHandler requestHandler = _singleStageBrokerRequestHandler;
110113
if (QueryOptionsUtils.isUseMultistageEngine(sqlNodeAndOptions.getOptions())) {
114+
if (isSystemTableQuery(sqlNodeAndOptions)) {
115+
requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
116+
return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
117+
"System tables require the single-stage query engine. Remove the `useMultistageEngine=true` query option "
118+
+ "to execute this query.");
119+
}
111120
if (_multiStageBrokerRequestHandler != null) {
112121
requestHandler = _multiStageBrokerRequestHandler;
113122
} else {
114123
return new BrokerResponseNative(QueryErrorCode.INTERNAL, "V2 Multi-Stage query engine not enabled.");
115124
}
125+
} else if (isSystemTableQuery(sqlNodeAndOptions)) {
126+
requestHandler = _systemTableBrokerRequestHandler;
116127
}
117128

118129
BrokerResponse response = requestHandler.handleRequest(request, sqlNodeAndOptions, requesterIdentity,
@@ -160,7 +171,7 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC
160171
throws Exception {
161172
if (_multiStageBrokerRequestHandler != null && _multiStageBrokerRequestHandler.cancelQuery(
162173
queryId, timeoutMs, executor, connMgr, serverResponses)) {
163-
return true;
174+
return true;
164175
}
165176
return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses);
166177
}
@@ -188,6 +199,10 @@ public OptionalLong getRequestIdByClientId(String clientQueryId) {
188199
return _singleStageBrokerRequestHandler.getRequestIdByClientId(clientQueryId);
189200
}
190201

202+
private static boolean isSystemTableQuery(SqlNodeAndOptions sqlNodeAndOptions) {
203+
return SystemTableQueryDetector.containsSystemTableReference(sqlNodeAndOptions.getSqlNode());
204+
}
205+
191206
private CursorResponse getCursorResponse(Integer numRows, BrokerResponse response)
192207
throws Exception {
193208
if (numRows == null) {

0 commit comments

Comments
 (0)