Skip to content

Commit 338c201

Browse files
IGNITE-27313 SQL: Add information about query initiator id to log
1 parent d9a5476 commit 338c201

File tree

12 files changed

+149
-27
lines changed

12 files changed

+149
-27
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,8 @@ private <T> T processQuery(
752752
(q, ex) -> qryReg.unregister(q.id(), ex),
753753
log,
754754
qryPlannerTimeout,
755-
timeout
755+
timeout,
756+
fldsQry != null ? fldsQry.getQueryInitiatorId() : null
756757
);
757758

758759
if (qrys != null)

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.concurrent.ConcurrentHashMap;
2323
import java.util.concurrent.ConcurrentMap;
2424
import org.apache.ignite.cache.query.QueryCancelledException;
25-
import org.apache.ignite.cache.query.SqlFieldsQuery;
2625
import org.apache.ignite.internal.GridKernalContext;
2726
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
2827
import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -58,12 +57,8 @@ public QueryRegistryImpl(GridKernalContext ctx) {
5857

5958
RunningQueryManager qryMgr = kctx.query().runningQueryManager();
6059

61-
SqlFieldsQuery fieldsQry = rootQry.context().unwrap(SqlFieldsQuery.class);
62-
63-
String initiatorId = fieldsQry != null ? fieldsQry.getQueryInitiatorId() : null;
64-
6560
long locId = qryMgr.register(rootQry.sql(), GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(),
66-
false, createCancelToken(qry), initiatorId, false, true, false);
61+
false, createCancelToken(qry), rootQry.initiatorId(), false, true, false);
6762

6863
rootQry.localQueryId(locId);
6964

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ public class RootQuery<RowT> extends Query<RowT> implements TrackableQuery {
9191
/** */
9292
private final long totalTimeout;
9393

94+
/** */
95+
private final String initiatorId;
96+
9497
/** */
9598
private volatile long locQryId;
9699

@@ -113,7 +116,8 @@ public RootQuery(
113116
BiConsumer<Query<RowT>, Throwable> unregister,
114117
IgniteLogger log,
115118
long plannerTimeout,
116-
long totalTimeout
119+
long totalTimeout,
120+
String initiatorId
117121
) {
118122
super(
119123
UUID.randomUUID(),
@@ -135,6 +139,7 @@ public RootQuery(
135139

136140
this.plannerTimeout = totalTimeout > 0 ? Math.min(plannerTimeout, totalTimeout) : plannerTimeout;
137141
this.totalTimeout = totalTimeout;
142+
this.initiatorId = initiatorId;
138143

139144
Context parent = Commons.convert(qryCtx);
140145

@@ -172,7 +177,9 @@ public RootQuery<RowT> childQuery(SchemaPlus schema) {
172177
unregister,
173178
log,
174179
plannerTimeout,
175-
totalTimeout);
180+
totalTimeout,
181+
initiatorId
182+
);
176183
}
177184

178185
/** */
@@ -444,6 +451,7 @@ else if (state == QueryState.CLOSING)
444451
.append(", type=CALCITE")
445452
.append(", state=").append(state)
446453
.append(", schema=").append(ctx.schemaName())
454+
.append(", initiatorId=").append(initiatorId)
447455
.append(", sql='").append(sql);
448456

449457
msgSb.append(']');
@@ -468,6 +476,13 @@ public long remainingTime() {
468476
return curTimeout <= 0 ? 0 : curTimeout;
469477
}
470478

479+
/**
480+
* @return Query initiator ID.
481+
*/
482+
public String initiatorId() {
483+
return initiatorId;
484+
}
485+
471486
/** */
472487
@Override public String toString() {
473488
return S.toString(RootQuery.class, this);

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1002,7 +1002,7 @@ private void cancelAllQueriesAndWaitForCompletion(IgniteEx ignite, IgniteInterna
10021002
}
10031003
}
10041004

1005-
/** Verifies that user-defined query initiator ID is present in the SQL_QUERY_HISTORY system view. */
1005+
/** Verifies that user-defined query initiator ID is present in the SQL_QUERY_HISTORY system view and logs. */
10061006
@Test
10071007
public void testSqlFieldsQueryWithInitiatorId() throws Exception {
10081008
IgniteEx grid = grid(0);
@@ -1027,6 +1027,15 @@ public void testSqlFieldsQueryWithInitiatorId() throws Exception {
10271027
return testId.equals(view.initiatorId());
10281028
}, 3_000));
10291029
}
1030+
1031+
LogListener logLsnr = LogListener.matches(LONG_QUERY_FINISHED_MSG).andMatches("initiatorId=testId2").build();
1032+
1033+
log.registerListener(logLsnr);
1034+
1035+
cache.query(new SqlFieldsQuery("SELECT sleep(?)").setArgs(LONG_QRY_TIMEOUT).setQueryInitiatorId("testId2"))
1036+
.getAll();
1037+
1038+
assertTrue(logLsnr.check(1000));
10301039
}
10311040

10321041
/**
@@ -1136,6 +1145,16 @@ public static boolean waitLatch(long time) {
11361145
return true;
11371146
}
11381147

1148+
/** */
1149+
@QuerySqlFunction
1150+
public static boolean sleep(int sleep) {
1151+
doSleep(sleep);
1152+
1153+
GridTestClockTimer.update();
1154+
1155+
return true;
1156+
}
1157+
11391158
/** */
11401159
@QuerySqlFunction
11411160
public static String innerSql(String ignite, String cache, String val) {

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ public class H2QueryInfo implements TrackableQuery {
7272
private final UUID nodeId;
7373

7474
/** Query id. */
75-
private final long queryId;
75+
private final long qryId;
76+
77+
/** Query initiator ID. */
78+
private final String initiatorId;
7679

7780
/** Query SQL plan. */
7881
private volatile String plan;
@@ -82,16 +85,18 @@ public class H2QueryInfo implements TrackableQuery {
8285
* @param stmt Query statement.
8386
* @param sql Query statement.
8487
* @param nodeId Originator node id.
85-
* @param queryId Query id.
88+
* @param qryId Query id.
89+
* @param initiatorId Query initiator id.
8690
*/
87-
public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql, UUID nodeId, long queryId) {
91+
public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql, UUID nodeId, long qryId, String initiatorId) {
8892
try {
8993
assert stmt != null;
9094

9195
this.type = type;
9296
this.sql = sql;
9397
this.nodeId = nodeId;
94-
this.queryId = queryId;
98+
this.qryId = qryId;
99+
this.initiatorId = initiatorId;
95100

96101
beginTs = U.currentTimeMillis();
97102

@@ -116,7 +121,7 @@ public UUID nodeId() {
116121

117122
/** */
118123
public long queryId() {
119-
return queryId;
124+
return qryId;
120125
}
121126

122127
/** */
@@ -183,10 +188,10 @@ public synchronized void resumeTracking() {
183188
@Override public String queryInfo(@Nullable String additionalInfo) {
184189
StringBuilder msgSb = new StringBuilder();
185190

186-
if (queryId == RunningQueryManager.UNDEFINED_QUERY_ID)
191+
if (qryId == RunningQueryManager.UNDEFINED_QUERY_ID)
187192
msgSb.append(" [globalQueryId=(undefined), node=").append(nodeId);
188193
else
189-
msgSb.append(" [globalQueryId=").append(QueryUtils.globalQueryId(nodeId, queryId));
194+
msgSb.append(" [globalQueryId=").append(QueryUtils.globalQueryId(nodeId, qryId));
190195

191196
if (additionalInfo != null)
192197
msgSb.append(", ").append(additionalInfo);
@@ -197,6 +202,7 @@ public synchronized void resumeTracking() {
197202
.append(", enforceJoinOrder=").append(enforceJoinOrder)
198203
.append(", lazy=").append(lazy)
199204
.append(", schema=").append(schema)
205+
.append(", initiatorId=").append(initiatorId)
200206
.append(", sql='").append(sql)
201207
.append("', plan=").append(plan());
202208

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ private GridQueryFieldsResult executeSelectLocal(
435435
H2Utils.bindParameters(stmt, F.asList(params));
436436

437437
qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry,
438-
ctx.localNodeId(), qryId);
438+
ctx.localNodeId(), qryId, qryDesc.queryInitiatorId());
439439

440440
heavyQryTracker.startTracking(qryInfo);
441441

@@ -1436,6 +1436,7 @@ private Iterable<List<?>> executeSelectDistributed(
14361436
return IgniteH2Indexing.this.rdcQryExec.query(
14371437
qryId,
14381438
qryDesc.schemaName(),
1439+
qryDesc.queryInitiatorId(),
14391440
twoStepQry,
14401441
keepBinary,
14411442
qryDesc.enforceJoinOrder(),

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,20 @@ public class MapH2QueryInfo extends H2QueryInfo {
3535
* @param sql Query statement.
3636
* @param nodeId Originator node id.
3737
* @param qryId Query id.
38+
* @param initiatorId Query initiator id.
3839
* @param reqId Request ID.
3940
* @param segment Segment.
4041
*/
41-
public MapH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, long qryId, long reqId,
42-
int segment) {
43-
super(QueryType.MAP, stmt, sql, nodeId, qryId);
42+
public MapH2QueryInfo(
43+
PreparedStatement stmt,
44+
String sql,
45+
UUID nodeId,
46+
long qryId,
47+
String initiatorId,
48+
long reqId,
49+
int segment
50+
) {
51+
super(QueryType.MAP, stmt, sql, nodeId, qryId, initiatorId);
4452

4553
this.reqId = reqId;
4654
this.segment = segment;

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ public class ReduceH2QueryInfo extends H2QueryInfo {
3232
* @param sql Query statement.
3333
* @param nodeId Originator node id.
3434
* @param qryId Query id.
35+
* @param initiatorId Query initiator id.
3536
* @param reqId Request ID.
3637
*/
37-
public ReduceH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, long qryId, long reqId) {
38-
super(QueryType.REDUCE, stmt, sql, nodeId, qryId);
38+
public ReduceH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, long qryId, String initiatorId, long reqId) {
39+
super(QueryType.REDUCE, stmt, sql, nodeId, qryId, initiatorId);
3940

4041
this.reqId = reqId;
4142
}

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ public void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req)
257257
req.requestId(),
258258
segment0,
259259
req.schemaName(),
260+
req.queryInitiatorId(),
260261
req.queries(),
261262
cacheIds,
262263
req.topologyVersion(),
@@ -285,6 +286,7 @@ public void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req)
285286
req.requestId(),
286287
firstSegment,
287288
req.schemaName(),
289+
req.queryInitiatorId(),
288290
req.queries(),
289291
cacheIds,
290292
req.topologyVersion(),
@@ -312,6 +314,7 @@ public void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req)
312314
* @param reqId Request ID.
313315
* @param segmentId index segment ID.
314316
* @param schemaName Schema name.
317+
* @param qryInitiatorId Query initiator ID.
315318
* @param qrys Queries to execute.
316319
* @param cacheIds Caches which will be affected by these queries.
317320
* @param topVer Topology version.
@@ -332,6 +335,7 @@ private void onQueryRequest0(
332335
final long reqId,
333336
final int segmentId,
334337
final String schemaName,
338+
final String qryInitiatorId,
335339
final Collection<GridCacheSqlQuery> qrys,
336340
final List<Integer> cacheIds,
337341
final AffinityTopologyVersion topVer,
@@ -464,7 +468,7 @@ private void onQueryRequest0(
464468

465469
H2Utils.bindParameters(stmt, params0);
466470

467-
qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId);
471+
qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, qryInitiatorId, reqId, segmentId);
468472

469473
h2.heavyQueriesTracker().startTracking(qryInfo);
470474

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ else if (msg.page() == 0) // Count down only on each first page received.
326326
/**
327327
* @param qryId Query ID.
328328
* @param schemaName Schema name.
329+
* @param qryInitiatorId Query initiator ID.
329330
* @param qry Query.
330331
* @param keepBinary Keep binary.
331332
* @param enforceJoinOrder Enforce join order of tables.
@@ -342,6 +343,7 @@ else if (msg.page() == 0) // Count down only on each first page received.
342343
public Iterator<List<?>> query(
343344
long qryId,
344345
String schemaName,
346+
String qryInitiatorId,
345347
final GridCacheTwoStepQuery qry,
346348
boolean keepBinary,
347349
boolean enforceJoinOrder,
@@ -438,7 +440,8 @@ public Iterator<List<?>> query(
438440
.flags(queryFlags(qry, enforceJoinOrder, lazy, dataPageScanEnabled))
439441
.timeout(timeoutMillis)
440442
.explicitTimeout(true)
441-
.schemaName(schemaName);
443+
.schemaName(schemaName)
444+
.queryInitiatorId(qryInitiatorId);
442445

443446
final C2<ClusterNode, Message, Message> spec =
444447
parts == null ? null : new ReducePartitionsSpecializer(mapping.queryPartitionsMap());
@@ -513,7 +516,7 @@ else if (QueryUtils.wasCancelled(err))
513516
H2Utils.bindParameters(stmt, F.asList(rdc.parameters(params)));
514517

515518
qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(),
516-
ctx.localNodeId(), qryId, qryReqId);
519+
ctx.localNodeId(), qryId, qryInitiatorId, qryReqId);
517520

518521
h2.heavyQueriesTracker().startTracking(qryInfo);
519522

@@ -1257,7 +1260,6 @@ private static Map<UUID, int[]> convert(Map<ClusterNode, IntArray> m) {
12571260
* @return Table.
12581261
* @throws IgniteCheckedException If failed.
12591262
*/
1260-
@SuppressWarnings("unchecked")
12611263
private ReduceTable createMergeTable(H2PooledConnection conn, GridCacheSqlQuery qry, boolean explain)
12621264
throws IgniteCheckedException {
12631265
try {

0 commit comments

Comments
 (0)