Skip to content

Commit 3fae010

Browse files
committed
Add system tables support with registry
SystemTableDataProvider now exposes getDataSource() returning an IndexSegment. Add InMemorySystemTableSegment and update system.tables/system.instances providers Broker runs system table queries using the v1 query engine and reduce path.
1 parent dea5e9c commit 3fae010

File tree

35 files changed

+4368
-22
lines changed

35 files changed

+4368
-22
lines changed

pinot-broker/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
<pinot.root>${basedir}/..</pinot.root>
3434
</properties>
3535
<dependencies>
36+
<dependency>
37+
<groupId>org.apache.pinot</groupId>
38+
<artifactId>pinot-java-client</artifactId>
39+
</dependency>
3640
<dependency>
3741
<groupId>org.apache.pinot</groupId>
3842
<artifactId>pinot-query-runtime</artifactId>
@@ -90,6 +94,11 @@
9094
<artifactId>pinot-yammer</artifactId>
9195
<scope>test</scope>
9296
</dependency>
97+
<dependency>
98+
<groupId>org.apache.pinot</groupId>
99+
<artifactId>pinot-system-table</artifactId>
100+
<scope>test</scope>
101+
</dependency>
93102
<dependency>
94103
<groupId>org.testng</groupId>
95104
<artifactId>testng</artifactId>
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.broker.api.resources;
20+
21+
import com.fasterxml.jackson.databind.JsonNode;
22+
import javax.inject.Inject;
23+
import javax.ws.rs.POST;
24+
import javax.ws.rs.Path;
25+
import javax.ws.rs.Produces;
26+
import javax.ws.rs.core.Context;
27+
import javax.ws.rs.core.HttpHeaders;
28+
import javax.ws.rs.core.MediaType;
29+
import javax.ws.rs.core.Response;
30+
import org.apache.pinot.broker.requesthandler.SystemTableBrokerRequestHandler;
31+
import org.apache.pinot.common.datatable.DataTable;
32+
import org.apache.pinot.common.datatable.DataTableImplV4;
33+
import org.apache.pinot.core.auth.ManualAuthorization;
34+
import org.apache.pinot.spi.exception.QueryErrorCode;
35+
import org.apache.pinot.spi.trace.RequestScope;
36+
import org.apache.pinot.spi.trace.Tracing;
37+
import org.apache.pinot.spi.utils.JsonUtils;
38+
39+
import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.SQL;
40+
41+
42+
/**
43+
* Internal endpoint used for broker-to-broker scatter-gather for system tables.
44+
* <p>
45+
* Returns a serialized {@link DataTable} payload (application/octet-stream) that represents the local broker's shard
46+
* of a system table query.
47+
*/
48+
@Path("/")
49+
public class SystemTableDataTableResource {
50+
@Inject
51+
private SystemTableBrokerRequestHandler _systemTableBrokerRequestHandler;
52+
53+
@POST
54+
@Produces(MediaType.APPLICATION_OCTET_STREAM)
55+
@Path("query/systemTable/datatable")
56+
@ManualAuthorization
57+
public Response processSystemTableDataTable(String requestBody,
58+
@Context org.glassfish.grizzly.http.server.Request requestContext, @Context HttpHeaders httpHeaders) {
59+
DataTable dataTable;
60+
try {
61+
JsonNode requestJson = JsonUtils.stringToJsonNode(requestBody);
62+
if (requestJson == null || !requestJson.isObject() || !requestJson.has(SQL)) {
63+
dataTable = new DataTableImplV4();
64+
dataTable.addException(QueryErrorCode.JSON_PARSING, "Payload is missing the query string field 'sql'");
65+
} else {
66+
try (RequestScope requestScope = Tracing.getTracer().createRequestScope()) {
67+
requestScope.setRequestArrivalTimeMillis(System.currentTimeMillis());
68+
dataTable = _systemTableBrokerRequestHandler.handleSystemTableDataTableRequest(requestJson,
69+
PinotClientRequest.makeHttpIdentity(requestContext), requestScope, httpHeaders);
70+
}
71+
}
72+
} catch (Exception e) {
73+
dataTable = new DataTableImplV4();
74+
dataTable.addException(QueryErrorCode.QUERY_EXECUTION, e.getMessage());
75+
}
76+
77+
try {
78+
return Response.ok(dataTable.toBytes(), MediaType.APPLICATION_OCTET_STREAM).build();
79+
} catch (Exception e) {
80+
// As a last resort, return an empty body; the caller will treat this as a failure.
81+
return Response.ok(new byte[0], MediaType.APPLICATION_OCTET_STREAM).build();
82+
}
83+
}
84+
}

pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.helix.HelixManager;
3535
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
3636
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
37+
import org.apache.pinot.broker.requesthandler.SystemTableBrokerRequestHandler;
3738
import org.apache.pinot.broker.routing.manager.BrokerRoutingManager;
3839
import org.apache.pinot.common.audit.AuditLogFilter;
3940
import org.apache.pinot.common.cursors.AbstractResponseStore;
@@ -78,6 +79,7 @@ public class BrokerAdminApiApplication extends ResourceConfig {
7879
public BrokerAdminApiApplication(BrokerRoutingManager routingManager, BrokerRequestHandler brokerRequestHandler,
7980
BrokerMetrics brokerMetrics, PinotConfiguration brokerConf, SqlQueryExecutor sqlQueryExecutor,
8081
ServerRoutingStatsManager serverRoutingStatsManager, AccessControlFactory accessFactory,
82+
SystemTableBrokerRequestHandler systemTableBrokerRequestHandler,
8183
HelixManager helixManager, QueryQuotaManager queryQuotaManager, ThreadAccountant threadAccountant,
8284
AbstractResponseStore responseStore) {
8385
_brokerResourcePackages = brokerConf.getProperty(CommonConstants.Broker.BROKER_RESOURCE_PACKAGES,
@@ -108,6 +110,7 @@ protected void configure() {
108110
bind(sqlQueryExecutor).to(SqlQueryExecutor.class);
109111
bind(routingManager).to(BrokerRoutingManager.class);
110112
bind(brokerRequestHandler).to(BrokerRequestHandler.class);
113+
bind(systemTableBrokerRequestHandler).to(SystemTableBrokerRequestHandler.class);
111114
bind(brokerMetrics).to(BrokerMetrics.class);
112115
String loggerRootDir = brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_LOGGER_ROOT_DIR);
113116
if (loggerRootDir != null) {

pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
5454
import org.apache.pinot.broker.requesthandler.MultiStageQueryThrottler;
5555
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
56+
import org.apache.pinot.broker.requesthandler.SystemTableBrokerRequestHandler;
5657
import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler;
5758
import org.apache.pinot.broker.routing.manager.BrokerRoutingManager;
5859
import org.apache.pinot.common.Utils;
@@ -71,6 +72,7 @@
7172
import org.apache.pinot.common.metrics.BrokerMeter;
7273
import org.apache.pinot.common.metrics.BrokerMetrics;
7374
import org.apache.pinot.common.metrics.BrokerTimer;
75+
import org.apache.pinot.common.systemtable.SystemTableRegistry;
7476
import org.apache.pinot.common.utils.PinotAppConfigs;
7577
import org.apache.pinot.common.utils.ServiceStartableUtils;
7678
import org.apache.pinot.common.utils.ServiceStatus;
@@ -154,6 +156,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
154156
protected BrokerMetrics _brokerMetrics;
155157
protected BrokerRoutingManager _routingManager;
156158
protected AccessControlFactory _accessControlFactory;
159+
protected SystemTableBrokerRequestHandler _systemTableBrokerRequestHandler;
157160
protected BrokerRequestHandler _brokerRequestHandler;
158161
protected SqlQueryExecutor _sqlQueryExecutor;
159162
protected BrokerAdminApiApplication _brokerAdminApplication;
@@ -349,6 +352,7 @@ public void start()
349352
boolean caseInsensitive =
350353
_brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
351354
_tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
355+
SystemTableRegistry.init(_tableCache, _helixAdmin, _clusterName, _brokerConf);
352356

353357
LOGGER.info("Initializing Broker Event Listener Factory");
354358
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
@@ -440,9 +444,13 @@ public void start()
440444
_responseStore.init(responseStoreConfiguration.subset(_responseStore.getType()), _hostname, _port, brokerId,
441445
_brokerMetrics, expirationTime);
442446

447+
_systemTableBrokerRequestHandler =
448+
new SystemTableBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
449+
_accessControlFactory, _queryQuotaManager, _tableCache, _threadAccountant, multiClusterRoutingContext,
450+
_spectatorHelixManager);
443451
_brokerRequestHandler =
444-
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler,
445-
timeSeriesRequestHandler, _responseStore);
452+
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, _systemTableBrokerRequestHandler,
453+
multiStageBrokerRequestHandler, timeSeriesRequestHandler, _responseStore);
446454
_brokerRequestHandler.start();
447455

448456
String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
@@ -755,6 +763,11 @@ public void stop() {
755763
} catch (IOException e) {
756764
LOGGER.error("Caught exception when shutting down PinotFsFactory", e);
757765
}
766+
try {
767+
SystemTableRegistry.close();
768+
} catch (Exception e) {
769+
LOGGER.warn("Failed to close system table registry cleanly", e);
770+
}
758771

759772
LOGGER.info("Disconnecting spectator Helix manager");
760773
_spectatorHelixManager.disconnect();
@@ -801,7 +814,8 @@ public BrokerRequestHandler getBrokerRequestHandler() {
801814
protected BrokerAdminApiApplication createBrokerAdminApp() {
802815
BrokerAdminApiApplication brokerAdminApiApplication =
803816
new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler, _brokerMetrics, _brokerConf,
804-
_sqlQueryExecutor, _serverRoutingStatsManager, _accessControlFactory, _spectatorHelixManager,
817+
_sqlQueryExecutor, _serverRoutingStatsManager, _accessControlFactory, _systemTableBrokerRequestHandler,
818+
_spectatorHelixManager,
805819
_queryQuotaManager, _threadAccountant, _responseStore);
806820
brokerAdminApiApplication.register(
807821
new AuditServiceBinder(_defaultClusterConfigChangeHandler, getServiceRole(), _brokerMetrics));

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,27 +43,29 @@
4343

4444
/**
4545
* {@code BrokerRequestHandlerDelegate} delegates the inbound broker request to one of the enabled
46-
* {@link BrokerRequestHandler} based on the requested handle type.
47-
*
48-
* {@see: @CommonConstant
46+
* {@link BrokerRequestHandler} implementations based on the request type.
4947
*/
5048
public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
5149
private final BaseSingleStageBrokerRequestHandler _singleStageBrokerRequestHandler;
50+
private final SystemTableBrokerRequestHandler _systemTableBrokerRequestHandler;
5251
private final MultiStageBrokerRequestHandler _multiStageBrokerRequestHandler;
5352
private final TimeSeriesRequestHandler _timeSeriesRequestHandler;
5453
private final AbstractResponseStore _responseStore;
5554

5655
public BrokerRequestHandlerDelegate(BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler,
56+
SystemTableBrokerRequestHandler systemTableBrokerRequestHandler,
5757
@Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler,
5858
@Nullable TimeSeriesRequestHandler timeSeriesRequestHandler, AbstractResponseStore responseStore) {
5959
_singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
60+
_systemTableBrokerRequestHandler = systemTableBrokerRequestHandler;
6061
_multiStageBrokerRequestHandler = multiStageBrokerRequestHandler;
6162
_timeSeriesRequestHandler = timeSeriesRequestHandler;
6263
_responseStore = responseStore;
6364
}
6465

6566
@Override
6667
public void start() {
68+
_systemTableBrokerRequestHandler.start();
6769
_singleStageBrokerRequestHandler.start();
6870
if (_multiStageBrokerRequestHandler != null) {
6971
_multiStageBrokerRequestHandler.start();
@@ -75,6 +77,7 @@ public void start() {
7577

7678
@Override
7779
public void shutDown() {
80+
_systemTableBrokerRequestHandler.shutDown();
7881
_singleStageBrokerRequestHandler.shutDown();
7982
if (_multiStageBrokerRequestHandler != null) {
8083
_multiStageBrokerRequestHandler.shutDown();
@@ -108,11 +111,19 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
108111

109112
BaseBrokerRequestHandler requestHandler = _singleStageBrokerRequestHandler;
110113
if (QueryOptionsUtils.isUseMultistageEngine(sqlNodeAndOptions.getOptions())) {
114+
if (isSystemTableQuery(sqlNodeAndOptions)) {
115+
requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
116+
return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
117+
"System tables require the single-stage query engine. Remove the `useMultistageEngine=true` query option "
118+
+ "to execute this query.");
119+
}
111120
if (_multiStageBrokerRequestHandler != null) {
112121
requestHandler = _multiStageBrokerRequestHandler;
113122
} else {
114123
return new BrokerResponseNative(QueryErrorCode.INTERNAL, "V2 Multi-Stage query engine not enabled.");
115124
}
125+
} else if (isSystemTableQuery(sqlNodeAndOptions)) {
126+
requestHandler = _systemTableBrokerRequestHandler;
116127
}
117128

118129
BrokerResponse response = requestHandler.handleRequest(request, sqlNodeAndOptions, requesterIdentity,
@@ -151,7 +162,7 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC
151162
throws Exception {
152163
if (_multiStageBrokerRequestHandler != null && _multiStageBrokerRequestHandler.cancelQuery(
153164
queryId, timeoutMs, executor, connMgr, serverResponses)) {
154-
return true;
165+
return true;
155166
}
156167
return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses);
157168
}
@@ -179,6 +190,10 @@ public OptionalLong getRequestIdByClientId(String clientQueryId) {
179190
return _singleStageBrokerRequestHandler.getRequestIdByClientId(clientQueryId);
180191
}
181192

193+
private static boolean isSystemTableQuery(SqlNodeAndOptions sqlNodeAndOptions) {
194+
return SystemTableQueryDetector.containsSystemTableReference(sqlNodeAndOptions.getSqlNode());
195+
}
196+
182197
private CursorResponse getCursorResponse(Integer numRows, BrokerResponse response)
183198
throws Exception {
184199
if (numRows == null) {

0 commit comments

Comments
 (0)