Skip to content

Commit 34faf04

Browse files
authored
IGNITE-26485 Sql. Introduce system view exposing cached query plans (#6773)
1 parent fd08268 commit 34faf04

File tree

10 files changed

+358
-17
lines changed

10 files changed

+358
-17
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.sql.engine.systemviews;
19+
20+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
21+
22+
import java.time.ZoneId;
23+
import org.apache.ignite.internal.TestWrappers;
24+
import org.apache.ignite.internal.catalog.CatalogManager;
25+
import org.apache.ignite.internal.sql.engine.util.MetadataMatcher;
26+
import org.apache.ignite.internal.type.NativeTypes;
27+
import org.apache.ignite.sql.ColumnType;
28+
import org.apache.ignite.tx.Transaction;
29+
import org.junit.jupiter.api.Test;
30+
31+
/**
32+
* End-to-end tests to verify {@code SQL_CACHED_QUERY_PLANS} view.
33+
*/
34+
public class ItCachedQueryPlansSystemViewTest extends AbstractSystemViewTest {
35+
36+
@Override
37+
protected int initialNodes() {
38+
return 2;
39+
}
40+
41+
@Test
42+
public void testMetadata() {
43+
assertQuery("SELECT * FROM SYSTEM.SQL_CACHED_QUERY_PLANS")
44+
.columnMetadata(
45+
new MetadataMatcher().name("NODE_ID").type(ColumnType.STRING).nullable(false),
46+
new MetadataMatcher().name("PLAN_ID").type(ColumnType.STRING).nullable(true),
47+
new MetadataMatcher().name("CATALOG_VERSION").type(ColumnType.INT32).nullable(true),
48+
new MetadataMatcher().name("QUERY_DEFAULT_SCHEMA").type(ColumnType.STRING).nullable(true),
49+
new MetadataMatcher().name("SQL").type(ColumnType.STRING).nullable(true),
50+
new MetadataMatcher().name("QUERY_TYPE").type(ColumnType.STRING).nullable(true),
51+
new MetadataMatcher().name("QUERY_PLAN").type(ColumnType.STRING).nullable(true),
52+
new MetadataMatcher().name("QUERY_PREPARE_TIME").type(ColumnType.TIMESTAMP)
53+
.precision(NativeTypes.MAX_TIME_PRECISION).nullable(true)
54+
)
55+
.check();
56+
}
57+
58+
@Test
59+
public void test() throws InterruptedException {
60+
CatalogManager catalogManager = TestWrappers.unwrapIgniteImpl(node(0)).catalogManager();
61+
62+
int v1 = catalogManager.latestCatalogVersion();
63+
64+
String plan1 = executeAndGetPlan(0, "PUBLIC", "SELECT 1");
65+
String plan2 = executeAndGetPlan(1, "PUBLIC", "SELECT 2");
66+
67+
// increase catalog version
68+
sql("CREATE SCHEMA test_schema");
69+
sql("CREATE TABLE test_table (id INT, val INT, PRIMARY KEY(id))");
70+
71+
int v2 = catalogManager.latestCatalogVersion();
72+
assertNotEquals(v1, v2, "catalog versions should differ");
73+
74+
String plan3 = executeAndGetPlan(0, "TEST_SCHEMA", "SELECT 3");
75+
String plan4 = executeAndGetPlan(1, "PUBLIC", "INSERT INTO test_table (id, val) SELECT x, y FROM (VALUES(1, 2)) t(x, y)");
76+
String plan5 = executeAndGetPlan(0, "PUBLIC", "SELECT * FROM test_table WHERE id = 1");
77+
78+
String node1 = node(0).name();
79+
String node2 = node(1).name();
80+
81+
String selectCachedQueries = "SELECT "
82+
+ "node_id, catalog_version, query_default_schema, sql, query_type, query_plan "
83+
+ "FROM system.sql_cached_query_plans "
84+
// exclude this query out the result.
85+
+ "WHERE sql not like '%SQL_CACHED_QUERY_PLANS%' "
86+
+ "ORDER BY sql";
87+
88+
assertQuery(selectCachedQueries)
89+
.returns(node2, v2, "PUBLIC", "INSERT INTO `TEST_TABLE` (`ID`, `VAL`)\n"
90+
+ "SELECT `X`, `Y`\n"
91+
+ "FROM (VALUES ROW(1, 2)) AS `T` (`X`, `Y`)", "DML", plan4)
92+
.returns(node1, v2, "PUBLIC", "SELECT *\n"
93+
+ "FROM `TEST_TABLE`\n"
94+
+ "WHERE `ID` = 1", "QUERY", plan5)
95+
.returns(node1, v1, "PUBLIC", "SELECT 1", "QUERY", plan1)
96+
.returns(node2, v1, "PUBLIC", "SELECT 2", "QUERY", plan2)
97+
.returns(node1, v2, "TEST_SCHEMA", "SELECT 3", "QUERY", plan3)
98+
.ordered()
99+
.check();
100+
}
101+
102+
private static String executeAndGetPlan(int nodeIndex, String schema, String query) {
103+
return sql(node(nodeIndex), (Transaction) null, schema, ZoneId.systemDefault(),
104+
"EXPLAIN PLAN FOR " + query)
105+
.get(0).get(0).toString();
106+
}
107+
}

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueriesViewProvider.java

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,18 @@
2020
import static org.apache.ignite.internal.type.NativeTypes.stringOf;
2121

2222
import java.time.Instant;
23+
import java.util.List;
2324
import java.util.UUID;
2425
import java.util.concurrent.CompletableFuture;
2526
import java.util.concurrent.Flow.Publisher;
2627
import org.apache.ignite.internal.sql.engine.exec.fsm.ExecutionPhase;
2728
import org.apache.ignite.internal.sql.engine.exec.fsm.QueryExecutor;
2829
import org.apache.ignite.internal.sql.engine.exec.fsm.QueryInfo;
30+
import org.apache.ignite.internal.sql.engine.prepare.ExplainablePlan;
31+
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
32+
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
33+
import org.apache.ignite.internal.sql.engine.prepare.PreparedPlan;
34+
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
2935
import org.apache.ignite.internal.systemview.api.SystemView;
3036
import org.apache.ignite.internal.systemview.api.SystemViews;
3137
import org.apache.ignite.internal.type.NativeType;
@@ -37,24 +43,34 @@
3743
public class SqlQueriesViewProvider {
3844
public static final String SCRIPT_QUERY_TYPE = "SCRIPT";
3945

46+
private static final NativeType TIMESTAMP_TYPE = NativeTypes.timestamp(NativeTypes.MAX_TIME_PRECISION);
47+
4048
private final CompletableFuture<QueryExecutor> queryExecutorFuture = new CompletableFuture<>();
4149

50+
private final CompletableFuture<PrepareService> prepareServiceFuture = new CompletableFuture<>();
51+
4252
/** Initializes provided with query executor used as datasource of running queries. */
43-
public void init(QueryExecutor queryExecutor) {
53+
public void init(QueryExecutor queryExecutor, PrepareServiceImpl prepareSvc) {
4454
queryExecutorFuture.complete(queryExecutor);
55+
prepareServiceFuture.complete(prepareSvc);
56+
}
57+
58+
/** Returns system views. */
59+
public List<SystemView<?>> getViews() {
60+
return List.of(
61+
queries(),
62+
cachedPlans()
63+
);
4564
}
4665

47-
/** Returns system view exposing running queries. */
48-
public SystemView<?> get() {
66+
private SystemView<?> queries() {
4967
Publisher<QueryInfo> viewDataPublisher = SubscriptionUtils.fromIterable(
5068
queryExecutorFuture.thenApply(queryExecutor -> () -> queryExecutor.runningQueries().iterator())
5169
);
5270

5371
NativeType stringType = stringOf(Short.MAX_VALUE);
5472
NativeType idType = stringOf(36);
5573

56-
NativeType timestampType = NativeTypes.timestamp(NativeTypes.MAX_TIME_PRECISION);
57-
5874
return SystemViews.<QueryInfo>nodeViewBuilder()
5975
.name("SQL_QUERIES")
6076
.nodeNameColumnAlias("INITIATOR_NODE")
@@ -63,7 +79,7 @@ public SystemView<?> get() {
6379
.<String>addColumn("QUERY_TYPE", stringOf(10), SqlQueriesViewProvider::deriveQueryType)
6480
.<String>addColumn("QUERY_DEFAULT_SCHEMA", stringType, QueryInfo::schema)
6581
.<String>addColumn("SQL", stringType, QueryInfo::sql)
66-
.<Instant>addColumn("QUERY_START_TIME", timestampType, QueryInfo::startTime)
82+
.<Instant>addColumn("QUERY_START_TIME", TIMESTAMP_TYPE, QueryInfo::startTime)
6783
.<String>addColumn("TRANSACTION_ID", idType, info -> mapId(info.transactionId()))
6884
.<String>addColumn("PARENT_QUERY_ID", idType, info -> mapId(info.parentId()))
6985
.<Integer>addColumn("QUERY_STATEMENT_ORDINAL", NativeTypes.INT32, info -> mapStatementNum(info.statementNum()))
@@ -73,7 +89,7 @@ public SystemView<?> get() {
7389
.<String>addColumn("PHASE", stringOf(10), info -> mapPhase(info.phase()))
7490
.<String>addColumn("TYPE", stringOf(10), SqlQueriesViewProvider::deriveQueryType)
7591
.<String>addColumn("SCHEMA", stringType, QueryInfo::schema)
76-
.<Instant>addColumn("START_TIME", timestampType, QueryInfo::startTime)
92+
.<Instant>addColumn("START_TIME", TIMESTAMP_TYPE, QueryInfo::startTime)
7793
.<String>addColumn("PARENT_ID", idType, info -> mapId(info.parentId()))
7894
.<Integer>addColumn("STATEMENT_NUM", NativeTypes.INT32, info -> mapStatementNum(info.statementNum()))
7995
// End of legacy columns list. New columns must be added below this line.
@@ -92,7 +108,7 @@ public SystemView<?> get() {
92108

93109
SqlQueryType queryType = info.queryType();
94110

95-
return queryType == null ? null : queryType.toString();
111+
return queryType == null ? null : mapQueryType(queryType);
96112
}
97113

98114
private static String mapPhase(ExecutionPhase phase) {
@@ -116,4 +132,36 @@ private static String mapPhase(ExecutionPhase phase) {
116132
private static @Nullable Integer mapStatementNum(int statementNum) {
117133
return statementNum >= 0 ? statementNum : null;
118134
}
135+
136+
private SystemView<?> cachedPlans() {
137+
Publisher<PreparedPlan> viewDataPublisher = SubscriptionUtils.fromIterable(
138+
prepareServiceFuture.thenApply(queryExecutor -> () -> queryExecutor.preparedPlans().iterator())
139+
);
140+
141+
return SystemViews.<PreparedPlan>nodeViewBuilder()
142+
.name("SQL_CACHED_QUERY_PLANS")
143+
.nodeNameColumnAlias("NODE_ID")
144+
.addColumn("PLAN_ID", NativeTypes.STRING, (v) -> v.queryPlan().id().toString())
145+
.addColumn("CATALOG_VERSION", NativeTypes.INT32, PreparedPlan::catalogVersion)
146+
.addColumn("QUERY_DEFAULT_SCHEMA", NativeTypes.STRING, PreparedPlan::defaultSchemaName)
147+
.addColumn("SQL", NativeTypes.STRING, PreparedPlan::sql)
148+
.addColumn("QUERY_TYPE", NativeTypes.STRING, (v) -> mapQueryType(v.queryPlan().type()))
149+
.addColumn("QUERY_PLAN", NativeTypes.STRING, (v) -> mapQueryPlan(v.queryPlan()))
150+
.addColumn("QUERY_PREPARE_TIME", TIMESTAMP_TYPE, PreparedPlan::timestamp)
151+
.dataProvider(viewDataPublisher)
152+
.build();
153+
}
154+
155+
private static String mapQueryType(SqlQueryType type) {
156+
return type.toString();
157+
}
158+
159+
private static @Nullable String mapQueryPlan(QueryPlan plan) {
160+
if (plan instanceof ExplainablePlan) {
161+
ExplainablePlan explainablePlan = (ExplainablePlan) plan;
162+
return explainablePlan.explain();
163+
} else {
164+
return null;
165+
}
166+
}
119167
}

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ public synchronized CompletableFuture<Void> startAsync(ComponentContext componen
392392
sqlQueryMetricSource
393393
));
394394

395-
queriesViewProvider.init(queryExecutor);
395+
queriesViewProvider.init(queryExecutor, prepareSvc);
396396

397397
clusterSrvc.topologyService().addEventHandler(executionSrvc);
398398
clusterSrvc.topologyService().addEventHandler(mailboxRegistry);
@@ -607,7 +607,7 @@ public List<QueryInfo> runningQueries() {
607607

608608
@Override
609609
public List<SystemView<?>> systemViews() {
610-
return List.of(queriesViewProvider.get());
610+
return queriesViewProvider.getViews();
611611
}
612612

613613
@Override

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class CacheKey {
4343
*
4444
* @param catalogVersion Catalog version.
4545
* @param schemaName Schema name.
46-
* @param query Query string.
46+
* @param query Query string.
4747
* @param paramTypes Types of all dynamic parameters, no any type can be {@code null}.
4848
*/
4949
public CacheKey(int catalogVersion, String schemaName, String query, ColumnType[] paramTypes) {
@@ -65,6 +65,10 @@ ColumnType[] paramTypes() {
6565
return paramTypes;
6666
}
6767

68+
String query() {
69+
return query;
70+
}
71+
6872
/** {@inheritDoc} */
6973
@Override
7074
public boolean equals(Object o) {

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanId.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,9 @@ public int hashCode() {
6060
result = 31 * result + (int) (planNumber ^ (planNumber >>> 32));
6161
return result;
6262
}
63+
64+
@Override
65+
public String toString() {
66+
return prepareServiceId + "-" + planNumber;
67+
}
6368
}

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,11 @@ public interface PrepareService extends LifecycleAware {
4848
default CompletableFuture<Void> invalidateCache(Set<String> tableNames) {
4949
return CompletableFutures.nullCompletedFuture();
5050
}
51+
52+
/**
53+
* Returns prepared plans stored in cached.
54+
*
55+
* @return Cached prepared plans.
56+
*/
57+
Set<PreparedPlan> preparedPlans();
5158
}

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
3232
import it.unimi.dsi.fastutil.ints.IntSet;
3333
import java.time.Duration;
34+
import java.time.Instant;
3435
import java.util.ArrayList;
3536
import java.util.Collections;
3637
import java.util.List;
@@ -116,6 +117,7 @@
116117
import org.apache.ignite.table.QualifiedName;
117118
import org.apache.ignite.table.QualifiedNameHelper;
118119
import org.jetbrains.annotations.Nullable;
120+
import org.jetbrains.annotations.TestOnly;
119121

120122
/**
121123
* An implementation of the {@link PrepareService} that uses a Calcite-based query planner to validate and optimize a given query.
@@ -398,6 +400,27 @@ public CompletableFuture<Void> invalidateCache(Set<String> tableNames) {
398400
}, planningPool);
399401
}
400402

403+
/** {@inheritDoc} */
404+
@Override
405+
public Set<PreparedPlan> preparedPlans() {
406+
return cache.entrySet().stream()
407+
.filter(e -> {
408+
CompletableFuture<PlanInfo> f = e.getValue();
409+
return f.isDone() && !f.isCompletedExceptionally() && !f.isCancelled();
410+
})
411+
.map(e -> {
412+
CacheKey key = e.getKey();
413+
PlanInfo value = e.getValue().getNow(null);
414+
Instant timestamp = value.timestamp;
415+
return new PreparedPlan(key, value.queryPlan, timestamp);
416+
}).collect(Collectors.toSet());
417+
}
418+
419+
@TestOnly
420+
UUID prepareServiceId() {
421+
return this.prepareServiceId;
422+
}
423+
401424
/** Check if the given query plan matches the given predicate. */
402425
public static boolean planMatches(QueryPlan plan, Predicate<QualifiedName> predicate) {
403426
assert plan instanceof ExplainablePlan;
@@ -1306,6 +1329,7 @@ static class PlanInfo {
13061329
@Nullable
13071330
private final IntSet sources;
13081331
private volatile boolean needToInvalidate;
1332+
private final Instant timestamp = Instant.now();
13091333

13101334
private PlanInfo(
13111335
QueryPlan plan,

0 commit comments

Comments
 (0)