Skip to content

Commit e2a24fc

Browse files
IGNITE-27313 SQL: Add information about query initiator id to log - Fixes #12573.
Signed-off-by: Aleksey Plekhanov <[email protected]>
1 parent 098ddec commit e2a24fc

File tree

14 files changed

+164
-29
lines changed

14 files changed

+164
-29
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,8 @@ private <T> T processQuery(
752752
(q, ex) -> qryReg.unregister(q.id(), ex),
753753
log,
754754
qryPlannerTimeout,
755-
timeout
755+
timeout,
756+
fldsQry != null ? fldsQry.getQueryInitiatorId() : null
756757
);
757758

758759
if (qrys != null)

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.concurrent.ConcurrentHashMap;
2323
import java.util.concurrent.ConcurrentMap;
2424
import org.apache.ignite.cache.query.QueryCancelledException;
25-
import org.apache.ignite.cache.query.SqlFieldsQuery;
2625
import org.apache.ignite.internal.GridKernalContext;
2726
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
2827
import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -58,12 +57,8 @@ public QueryRegistryImpl(GridKernalContext ctx) {
5857

5958
RunningQueryManager qryMgr = kctx.query().runningQueryManager();
6059

61-
SqlFieldsQuery fieldsQry = rootQry.context().unwrap(SqlFieldsQuery.class);
62-
63-
String initiatorId = fieldsQry != null ? fieldsQry.getQueryInitiatorId() : null;
64-
6560
long locId = qryMgr.register(rootQry.sql(), GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(),
66-
false, createCancelToken(qry), initiatorId, false, true, false);
61+
false, createCancelToken(qry), rootQry.initiatorId(), false, true, false);
6762

6863
rootQry.localQueryId(locId);
6964

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ public class RootQuery<RowT> extends Query<RowT> implements TrackableQuery {
9191
/** */
9292
private final long totalTimeout;
9393

94+
/** */
95+
private final String initiatorId;
96+
9497
/** */
9598
private volatile long locQryId;
9699

@@ -113,7 +116,8 @@ public RootQuery(
113116
BiConsumer<Query<RowT>, Throwable> unregister,
114117
IgniteLogger log,
115118
long plannerTimeout,
116-
long totalTimeout
119+
long totalTimeout,
120+
String initiatorId
117121
) {
118122
super(
119123
UUID.randomUUID(),
@@ -135,6 +139,7 @@ public RootQuery(
135139

136140
this.plannerTimeout = totalTimeout > 0 ? Math.min(plannerTimeout, totalTimeout) : plannerTimeout;
137141
this.totalTimeout = totalTimeout;
142+
this.initiatorId = initiatorId;
138143

139144
Context parent = Commons.convert(qryCtx);
140145

@@ -172,7 +177,9 @@ public RootQuery<RowT> childQuery(SchemaPlus schema) {
172177
unregister,
173178
log,
174179
plannerTimeout,
175-
totalTimeout);
180+
totalTimeout,
181+
initiatorId
182+
);
176183
}
177184

178185
/** */
@@ -444,6 +451,7 @@ else if (state == QueryState.CLOSING)
444451
.append(", type=CALCITE")
445452
.append(", state=").append(state)
446453
.append(", schema=").append(ctx.schemaName())
454+
.append(", initiatorId=").append(initiatorId)
447455
.append(", sql='").append(sql);
448456

449457
msgSb.append(']');
@@ -468,6 +476,13 @@ public long remainingTime() {
468476
return curTimeout <= 0 ? 0 : curTimeout;
469477
}
470478

479+
/**
480+
* @return Query initiator ID.
481+
*/
482+
public String initiatorId() {
483+
return initiatorId;
484+
}
485+
471486
/** */
472487
@Override public String toString() {
473488
return S.toString(RootQuery.class, this);

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1002,7 +1002,7 @@ private void cancelAllQueriesAndWaitForCompletion(IgniteEx ignite, IgniteInterna
10021002
}
10031003
}
10041004

1005-
/** Verifies that user-defined query initiator ID is present in the SQL_QUERY_HISTORY system view. */
1005+
/** Verifies that user-defined query initiator ID is present in the SQL_QUERY_HISTORY system view and logs. */
10061006
@Test
10071007
public void testSqlFieldsQueryWithInitiatorId() throws Exception {
10081008
IgniteEx grid = grid(0);
@@ -1027,6 +1027,18 @@ public void testSqlFieldsQueryWithInitiatorId() throws Exception {
10271027
return testId.equals(view.initiatorId());
10281028
}, 3_000));
10291029
}
1030+
1031+
String initiatorId = "testId2";
1032+
1033+
LogListener logLsnr = LogListener.matches(LONG_QUERY_FINISHED_MSG)
1034+
.andMatches("initiatorId=" + initiatorId).build();
1035+
1036+
log.registerListener(logLsnr);
1037+
1038+
cache.query(new SqlFieldsQuery("SELECT sleep(?)").setArgs(LONG_QRY_TIMEOUT).setQueryInitiatorId(initiatorId))
1039+
.getAll();
1040+
1041+
assertTrue(logLsnr.check(1000));
10301042
}
10311043

10321044
/**
@@ -1136,6 +1148,16 @@ public static boolean waitLatch(long time) {
11361148
return true;
11371149
}
11381150

1151+
/** */
1152+
@QuerySqlFunction
1153+
public static boolean sleep(int sleep) {
1154+
doSleep(sleep);
1155+
1156+
GridTestClockTimer.update();
1157+
1158+
return true;
1159+
}
1160+
11391161
/** */
11401162
@QuerySqlFunction
11411163
public static String innerSql(String ignite, String cache, String val) {

modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public class WarningOnBigQueryResultsBaseTest extends AbstractIndexingCommonTest
6565
/** Log message pattern. */
6666
private static final Pattern logPtrn = Pattern.compile(
6767
"fetched=([0-9]+), duration=([0-9]+)ms, type=(MAP|LOCAL|REDUCE), distributedJoin=(true|false), " +
68-
"enforceJoinOrder=(true|false), lazy=(true|false), schema=(\\S+), sql");
68+
"enforceJoinOrder=(true|false), lazy=(true|false), schema=(\\S+), initiatorId=(\\S+), sql");
6969

7070
/** Test log. */
7171
private static Map<String, BigResultsLogListener> logListeners = new HashMap<>();

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public class H2DmlInfo implements TrackableQuery {
3838
/** Schema name. */
3939
private final String schema;
4040

41+
/** Query initiator id. */
42+
private final String initiatorId;
43+
4144
/** Dml command. */
4245
private final String sql;
4346

@@ -48,11 +51,12 @@ public class H2DmlInfo implements TrackableQuery {
4851
* @param schema Schema name.
4952
* @param sql Dml command.
5053
*/
51-
public H2DmlInfo(long beginTs, long qryId, UUID initNodeId, String schema, String sql) {
54+
public H2DmlInfo(long beginTs, long qryId, UUID initNodeId, String schema, String initiatorId, String sql) {
5255
this.beginTs = beginTs;
5356
this.qryId = qryId;
5457
this.initNodeId = initNodeId;
5558
this.schema = schema;
59+
this.initiatorId = initiatorId;
5660
this.sql = sql;
5761
}
5862

@@ -76,6 +80,7 @@ public H2DmlInfo(long beginTs, long qryId, UUID initNodeId, String schema, Strin
7680
msgSb.append(", duration=").append(time()).append("ms")
7781
.append(", type=DML")
7882
.append(", schema=").append(schema)
83+
.append(", initiatorId=").append(initiatorId)
7984
.append(", sql='").append(sql).append("']");
8085

8186
return msgSb.toString();

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ public class H2QueryInfo implements TrackableQuery {
7272
private final UUID nodeId;
7373

7474
/** Query id. */
75-
private final long queryId;
75+
private final long qryId;
76+
77+
/** Query initiator ID. */
78+
private final String initiatorId;
7679

7780
/** Query SQL plan. */
7881
private volatile String plan;
@@ -82,16 +85,18 @@ public class H2QueryInfo implements TrackableQuery {
8285
* @param stmt Query statement.
8386
* @param sql Query statement.
8487
* @param nodeId Originator node id.
85-
* @param queryId Query id.
88+
* @param qryId Query id.
89+
* @param initiatorId Query initiator id.
8690
*/
87-
public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql, UUID nodeId, long queryId) {
91+
public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql, UUID nodeId, long qryId, String initiatorId) {
8892
try {
8993
assert stmt != null;
9094

9195
this.type = type;
9296
this.sql = sql;
9397
this.nodeId = nodeId;
94-
this.queryId = queryId;
98+
this.qryId = qryId;
99+
this.initiatorId = initiatorId;
95100

96101
beginTs = U.currentTimeMillis();
97102

@@ -116,7 +121,7 @@ public UUID nodeId() {
116121

117122
/** */
118123
public long queryId() {
119-
return queryId;
124+
return qryId;
120125
}
121126

122127
/** */
@@ -183,10 +188,10 @@ public synchronized void resumeTracking() {
183188
@Override public String queryInfo(@Nullable String additionalInfo) {
184189
StringBuilder msgSb = new StringBuilder();
185190

186-
if (queryId == RunningQueryManager.UNDEFINED_QUERY_ID)
191+
if (qryId == RunningQueryManager.UNDEFINED_QUERY_ID)
187192
msgSb.append(" [globalQueryId=(undefined), node=").append(nodeId);
188193
else
189-
msgSb.append(" [globalQueryId=").append(QueryUtils.globalQueryId(nodeId, queryId));
194+
msgSb.append(" [globalQueryId=").append(QueryUtils.globalQueryId(nodeId, qryId));
190195

191196
if (additionalInfo != null)
192197
msgSb.append(", ").append(additionalInfo);
@@ -197,6 +202,7 @@ public synchronized void resumeTracking() {
197202
.append(", enforceJoinOrder=").append(enforceJoinOrder)
198203
.append(", lazy=").append(lazy)
199204
.append(", schema=").append(schema)
205+
.append(", initiatorId=").append(initiatorId)
200206
.append(", sql='").append(sql)
201207
.append("', plan=").append(plan());
202208

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ private GridQueryFieldsResult executeSelectLocal(
435435
H2Utils.bindParameters(stmt, F.asList(params));
436436

437437
qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry,
438-
ctx.localNodeId(), qryId);
438+
ctx.localNodeId(), qryId, qryDesc.queryInitiatorId());
439439

440440
heavyQryTracker.startTracking(qryInfo);
441441

@@ -1064,6 +1064,7 @@ private List<? extends FieldsQueryCursor<List<?>>> executeDml(
10641064
qryId,
10651065
ctx.localNodeId(),
10661066
qryDesc.schemaName(),
1067+
qryDesc.queryInitiatorId(),
10671068
qryDesc.sql()
10681069
);
10691070

@@ -1436,6 +1437,7 @@ private Iterable<List<?>> executeSelectDistributed(
14361437
return IgniteH2Indexing.this.rdcQryExec.query(
14371438
qryId,
14381439
qryDesc.schemaName(),
1440+
qryDesc.queryInitiatorId(),
14391441
twoStepQry,
14401442
keepBinary,
14411443
qryDesc.enforceJoinOrder(),

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,20 @@ public class MapH2QueryInfo extends H2QueryInfo {
3535
* @param sql Query statement.
3636
* @param nodeId Originator node id.
3737
* @param qryId Query id.
38+
* @param initiatorId Query initiator id.
3839
* @param reqId Request ID.
3940
* @param segment Segment.
4041
*/
41-
public MapH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, long qryId, long reqId,
42-
int segment) {
43-
super(QueryType.MAP, stmt, sql, nodeId, qryId);
42+
public MapH2QueryInfo(
43+
PreparedStatement stmt,
44+
String sql,
45+
UUID nodeId,
46+
long qryId,
47+
String initiatorId,
48+
long reqId,
49+
int segment
50+
) {
51+
super(QueryType.MAP, stmt, sql, nodeId, qryId, initiatorId);
4452

4553
this.reqId = reqId;
4654
this.segment = segment;

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ public class ReduceH2QueryInfo extends H2QueryInfo {
3232
* @param sql Query statement.
3333
* @param nodeId Originator node id.
3434
* @param qryId Query id.
35+
* @param initiatorId Query initiator id.
3536
* @param reqId Request ID.
3637
*/
37-
public ReduceH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, long qryId, long reqId) {
38-
super(QueryType.REDUCE, stmt, sql, nodeId, qryId);
38+
public ReduceH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, long qryId, String initiatorId, long reqId) {
39+
super(QueryType.REDUCE, stmt, sql, nodeId, qryId, initiatorId);
3940

4041
this.reqId = reqId;
4142
}

0 commit comments

Comments
 (0)