Skip to content

Commit 1811dfa

Browse files
committed
Add system tables support with registry
1 parent 227de91 commit 1811dfa

File tree

17 files changed

+1858
-18
lines changed

17 files changed

+1858
-18
lines changed

pinot-broker/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
<pinot.root>${basedir}/..</pinot.root>
3434
</properties>
3535
<dependencies>
36+
<dependency>
37+
<groupId>org.apache.pinot</groupId>
38+
<artifactId>pinot-java-client</artifactId>
39+
</dependency>
3640
<dependency>
3741
<groupId>org.apache.pinot</groupId>
3842
<artifactId>pinot-query-runtime</artifactId>

pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
5454
import org.apache.pinot.broker.requesthandler.MultiStageQueryThrottler;
5555
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
56+
import org.apache.pinot.broker.requesthandler.SystemTableBrokerRequestHandler;
5657
import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler;
5758
import org.apache.pinot.broker.routing.BrokerRoutingManager;
5859
import org.apache.pinot.common.Utils;
@@ -71,6 +72,7 @@
7172
import org.apache.pinot.common.metrics.BrokerMeter;
7273
import org.apache.pinot.common.metrics.BrokerMetrics;
7374
import org.apache.pinot.common.metrics.BrokerTimer;
75+
import org.apache.pinot.common.systemtable.SystemTableRegistry;
7476
import org.apache.pinot.common.utils.PinotAppConfigs;
7577
import org.apache.pinot.common.utils.ServiceStartableUtils;
7678
import org.apache.pinot.common.utils.ServiceStatus;
@@ -354,6 +356,7 @@ public void start()
354356
boolean caseInsensitive =
355357
_brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
356358
TableCache tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
359+
SystemTableRegistry.INSTANCE.init(tableCache, _helixAdmin, _clusterName);
357360

358361
LOGGER.info("Initializing Broker Event Listener Factory");
359362
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
@@ -443,9 +446,12 @@ public void start()
443446
_responseStore.init(responseStoreConfiguration.subset(_responseStore.getType()), _hostname, _port, brokerId,
444447
_brokerMetrics, expirationTime);
445448

449+
SystemTableBrokerRequestHandler systemTableBrokerRequestHandler =
450+
new SystemTableBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
451+
_accessControlFactory, _queryQuotaManager, tableCache, SystemTableRegistry.INSTANCE, _threadAccountant);
446452
_brokerRequestHandler =
447-
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler,
448-
timeSeriesRequestHandler, _responseStore);
453+
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, systemTableBrokerRequestHandler,
454+
multiStageBrokerRequestHandler, timeSeriesRequestHandler, _responseStore);
449455
_brokerRequestHandler.start();
450456

451457
String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
@@ -742,6 +748,11 @@ public void stop() {
742748
} catch (IOException e) {
743749
LOGGER.error("Caught exception when shutting down PinotFsFactory", e);
744750
}
751+
try {
752+
SystemTableRegistry.INSTANCE.close();
753+
} catch (Exception e) {
754+
LOGGER.warn("Failed to close system table registry cleanly", e);
755+
}
745756

746757
LOGGER.info("Disconnecting spectator Helix manager");
747758
_spectatorHelixManager.disconnect();

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.HashMap;
2323
import java.util.Map;
2424
import java.util.OptionalLong;
25+
import java.util.Set;
2526
import java.util.concurrent.Executor;
2627
import javax.annotation.Nullable;
2728
import javax.ws.rs.core.HttpHeaders;
@@ -37,6 +38,7 @@
3738
import org.apache.pinot.spi.exception.QueryException;
3839
import org.apache.pinot.spi.trace.RequestContext;
3940
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
41+
import org.apache.pinot.sql.parsers.CalciteSqlParser;
4042
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
4143
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
4244

@@ -49,21 +51,25 @@
4951
*/
5052
public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
5153
private final BaseSingleStageBrokerRequestHandler _singleStageBrokerRequestHandler;
54+
private final SystemTableBrokerRequestHandler _systemTableBrokerRequestHandler;
5255
private final MultiStageBrokerRequestHandler _multiStageBrokerRequestHandler;
5356
private final TimeSeriesRequestHandler _timeSeriesRequestHandler;
5457
private final AbstractResponseStore _responseStore;
5558

5659
public BrokerRequestHandlerDelegate(BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler,
60+
SystemTableBrokerRequestHandler systemTableBrokerRequestHandler,
5761
@Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler,
5862
@Nullable TimeSeriesRequestHandler timeSeriesRequestHandler, AbstractResponseStore responseStore) {
5963
_singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
64+
_systemTableBrokerRequestHandler = systemTableBrokerRequestHandler;
6065
_multiStageBrokerRequestHandler = multiStageBrokerRequestHandler;
6166
_timeSeriesRequestHandler = timeSeriesRequestHandler;
6267
_responseStore = responseStore;
6368
}
6469

6570
@Override
6671
public void start() {
72+
_systemTableBrokerRequestHandler.start();
6773
_singleStageBrokerRequestHandler.start();
6874
if (_multiStageBrokerRequestHandler != null) {
6975
_multiStageBrokerRequestHandler.start();
@@ -75,6 +81,7 @@ public void start() {
7581

7682
@Override
7783
public void shutDown() {
84+
_systemTableBrokerRequestHandler.shutDown();
7885
_singleStageBrokerRequestHandler.shutDown();
7986
if (_multiStageBrokerRequestHandler != null) {
8087
_multiStageBrokerRequestHandler.shutDown();
@@ -106,7 +113,20 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
106113
}
107114
}
108115

116+
Set<String> tableNames = null;
117+
if (_systemTableBrokerRequestHandler != null) {
118+
try {
119+
tableNames = RequestUtils.getTableNames(CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions));
120+
} catch (Exception e) {
121+
// Ignore compilation exceptions here; the selected request handler will surface them appropriately.
122+
}
123+
}
109124
BaseBrokerRequestHandler requestHandler = _singleStageBrokerRequestHandler;
125+
if (tableNames != null && tableNames.size() == 1
126+
&& _systemTableBrokerRequestHandler != null
127+
&& _systemTableBrokerRequestHandler.canHandle(tableNames.iterator().next())) {
128+
requestHandler = _systemTableBrokerRequestHandler;
129+
}
110130
if (QueryOptionsUtils.isUseMultistageEngine(sqlNodeAndOptions.getOptions())) {
111131
if (_multiStageBrokerRequestHandler != null) {
112132
requestHandler = _multiStageBrokerRequestHandler;
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.broker.requesthandler;
20+
21+
import com.fasterxml.jackson.databind.JsonNode;
22+
import java.util.Collections;
23+
import java.util.Locale;
24+
import java.util.Map;
25+
import java.util.OptionalLong;
26+
import java.util.Set;
27+
import java.util.concurrent.Executor;
28+
import javax.annotation.Nullable;
29+
import javax.ws.rs.core.HttpHeaders;
30+
import org.apache.commons.collections.CollectionUtils;
31+
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
32+
import org.apache.pinot.broker.api.AccessControl;
33+
import org.apache.pinot.broker.broker.AccessControlFactory;
34+
import org.apache.pinot.broker.querylog.QueryLogger;
35+
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
36+
import org.apache.pinot.common.config.provider.TableCache;
37+
import org.apache.pinot.common.metrics.BrokerMeter;
38+
import org.apache.pinot.common.request.PinotQuery;
39+
import org.apache.pinot.common.response.BrokerResponse;
40+
import org.apache.pinot.common.response.broker.BrokerResponseNative;
41+
import org.apache.pinot.common.systemtable.SystemTableDataProvider;
42+
import org.apache.pinot.common.systemtable.SystemTableRegistry;
43+
import org.apache.pinot.common.utils.request.RequestUtils;
44+
import org.apache.pinot.core.routing.RoutingManager;
45+
import org.apache.pinot.spi.accounting.ThreadAccountant;
46+
import org.apache.pinot.spi.auth.AuthorizationResult;
47+
import org.apache.pinot.spi.auth.broker.RequesterIdentity;
48+
import org.apache.pinot.spi.env.PinotConfiguration;
49+
import org.apache.pinot.spi.exception.BadQueryRequestException;
50+
import org.apache.pinot.spi.exception.QueryErrorCode;
51+
import org.apache.pinot.spi.trace.RequestContext;
52+
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
53+
import org.apache.pinot.sql.parsers.CalciteSqlParser;
54+
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
55+
import org.slf4j.Logger;
56+
import org.slf4j.LoggerFactory;
57+
58+
/**
59+
* Broker request handler for system tables (handled entirely on the broker).
60+
*/
61+
public class SystemTableBrokerRequestHandler extends BaseBrokerRequestHandler {
62+
private static final Logger LOGGER = LoggerFactory.getLogger(SystemTableBrokerRequestHandler.class);
63+
64+
private final SystemTableRegistry _systemTableRegistry;
65+
66+
public SystemTableBrokerRequestHandler(PinotConfiguration config, String brokerId,
67+
BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager,
68+
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
69+
SystemTableRegistry systemTableRegistry, ThreadAccountant threadAccountant) {
70+
super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache,
71+
threadAccountant);
72+
_systemTableRegistry = systemTableRegistry;
73+
}
74+
75+
@Override
76+
public void start() {
77+
}
78+
79+
@Override
80+
public void shutDown() {
81+
}
82+
83+
public boolean canHandle(String tableName) {
84+
return isSystemTable(tableName) && _systemTableRegistry.isRegistered(tableName);
85+
}
86+
87+
@Override
88+
protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
89+
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
90+
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
91+
throws Exception {
92+
PinotQuery pinotQuery;
93+
try {
94+
pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
95+
} catch (Exception e) {
96+
requestContext.setErrorCode(QueryErrorCode.SQL_PARSING);
97+
return new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e.getMessage());
98+
}
99+
100+
Set<String> tableNames = RequestUtils.getTableNames(pinotQuery);
101+
if (tableNames == null || tableNames.isEmpty()) {
102+
requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
103+
return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Failed to extract table name");
104+
}
105+
if (tableNames.size() != 1) {
106+
requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
107+
return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "System tables do not support joins");
108+
}
109+
String tableName = tableNames.iterator().next();
110+
if (!isSystemTable(tableName)) {
111+
requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
112+
return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Not a system table query");
113+
}
114+
AuthorizationResult authorizationResult =
115+
hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, httpHeaders);
116+
if (!authorizationResult.hasAccess()) {
117+
requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED);
118+
return new BrokerResponseNative(QueryErrorCode.ACCESS_DENIED, authorizationResult.getFailureMessage());
119+
}
120+
121+
return handleSystemTableQuery(pinotQuery, tableName, requestContext, requesterIdentity, query);
122+
}
123+
124+
@Override
125+
protected boolean handleCancel(long queryId, int timeoutMs, Executor executor,
126+
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) {
127+
return false;
128+
}
129+
130+
@Override
131+
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
132+
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
133+
throws Exception {
134+
return false;
135+
}
136+
137+
@Override
138+
public Map<Long, String> getRunningQueries() {
139+
return Collections.emptyMap();
140+
}
141+
142+
@Override
143+
public OptionalLong getRequestIdByClientId(String clientQueryId) {
144+
return OptionalLong.empty();
145+
}
146+
147+
private boolean isSystemTable(String tableName) {
148+
return tableName != null && tableName.toLowerCase(Locale.ROOT).startsWith("system.");
149+
}
150+
151+
private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String tableName, RequestContext requestContext,
152+
@Nullable RequesterIdentity requesterIdentity, String query) {
153+
if (pinotQuery.isExplain()) {
154+
return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
155+
}
156+
SystemTableDataProvider provider = _systemTableRegistry.get(tableName);
157+
if (provider == null) {
158+
requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
159+
return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
160+
}
161+
try {
162+
if (!isSupportedSystemTableQuery(pinotQuery)) {
163+
requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
164+
return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
165+
"System tables only support simple projection/filter/limit queries");
166+
}
167+
BrokerResponseNative brokerResponse = provider.getBrokerResponse(pinotQuery);
168+
if (CollectionUtils.isEmpty(brokerResponse.getTablesQueried())) {
169+
brokerResponse.setTablesQueried(Set.of(TableNameBuilder.extractRawTableName(tableName)));
170+
}
171+
brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis());
172+
_queryLogger.log(new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse,
173+
QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, null));
174+
return brokerResponse;
175+
} catch (BadQueryRequestException e) {
176+
requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
177+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
178+
return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage());
179+
} catch (Exception e) {
180+
LOGGER.warn("Caught exception while handling system table query {}: {}", tableName, e.getMessage(), e);
181+
requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
182+
return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, e.getMessage());
183+
}
184+
}
185+
186+
private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) {
187+
return (pinotQuery.getGroupByList() == null || pinotQuery.getGroupByList().isEmpty())
188+
&& pinotQuery.getHavingExpression() == null
189+
&& (pinotQuery.getOrderByList() == null || pinotQuery.getOrderByList().isEmpty());
190+
}
191+
}

pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -192,24 +192,24 @@ public void testCancelQuery() {
192192
new BaseSingleStageBrokerRequestHandler(config, "testBrokerId", new BrokerRequestIdGenerator(), routingManager,
193193
new AllowAllAccessControlFactory(), queryQuotaManager, tableCache,
194194
ThreadAccountantUtils.getNoOpAccountant()) {
195-
@Override
196-
public void start() {
197-
}
195+
@Override
196+
public void start() {
197+
}
198198

199-
@Override
200-
public void shutDown() {
201-
}
199+
@Override
200+
public void shutDown() {
201+
}
202202

203-
@Override
204-
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
205-
BrokerRequest serverBrokerRequest, TableRouteInfo route, long timeoutMs, ServerStats serverStats,
206-
RequestContext requestContext)
207-
throws Exception {
208-
testRequestId[0] = requestId;
209-
latch.await();
210-
return null;
211-
}
212-
};
203+
@Override
204+
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
205+
BrokerRequest serverBrokerRequest, TableRouteInfo route, long timeoutMs, ServerStats serverStats,
206+
RequestContext requestContext)
207+
throws Exception {
208+
testRequestId[0] = requestId;
209+
latch.await();
210+
return null;
211+
}
212+
};
213213
CompletableFuture.runAsync(() -> {
214214
try {
215215
requestHandler.handleRequest(String.format("select * from %s limit 10", tableName));

0 commit comments

Comments
 (0)