Skip to content

Commit e7414f3

Browse files
authored
Added transaction tracer (#76)
2 parents 1e83223 + 3d21311 commit e7414f3

File tree

14 files changed

+250
-81
lines changed

14 files changed

+250
-81
lines changed

jdbc/src/main/java/tech/ydb/jdbc/YdbDriver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ public boolean acceptsURL(String url) {
9898
return YdbConfig.isYdb(url);
9999
}
100100

101+
@Override
102+
public String toString() {
103+
return YdbDriverInfo.DRIVER_FULL_NAME;
104+
}
105+
101106
@Override
102107
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
103108
YdbConfig config = YdbConfig.from(url, info);
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package tech.ydb.jdbc;
2+
3+
import java.util.ArrayList;
4+
import java.util.Date;
5+
import java.util.List;
6+
import java.util.concurrent.atomic.AtomicLong;
7+
import java.util.logging.Level;
8+
import java.util.logging.Logger;
9+
10+
11+
12+
/**
13+
*
14+
* @author Aleksandr Gorshenin
15+
*/
16+
public class YdbTracer {
17+
private static final Logger LOGGER = Logger.getLogger(YdbTracer.class.getName());
18+
private static final ThreadLocal<YdbTracer> LOCAL = new ThreadLocal<>();
19+
private static final AtomicLong ANONYMOUS_COUNTER = new AtomicLong(0);
20+
21+
private final Date startDate = new Date();
22+
private final long startedAt = System.currentTimeMillis();
23+
private final List<Record> records = new ArrayList<>();
24+
25+
private String txID = null;
26+
private boolean isMarked = false;
27+
private boolean isClosed = false;
28+
29+
private class Record {
30+
private final long executedAt = System.currentTimeMillis();
31+
private final String message;
32+
33+
Record(String message) {
34+
this.message = message;
35+
}
36+
}
37+
38+
public static void clear() {
39+
LOCAL.remove();
40+
}
41+
42+
public static YdbTracer current() {
43+
YdbTracer tracer = LOCAL.get();
44+
if (tracer == null || tracer.isClosed) {
45+
tracer = new YdbTracer();
46+
LOCAL.set(tracer);
47+
}
48+
49+
return tracer;
50+
}
51+
52+
public void trace(String message) {
53+
records.add(new Record(message));
54+
}
55+
56+
public void setId(String id) {
57+
this.txID = id;
58+
trace("set-id " + id);
59+
}
60+
61+
public void markToPrint() {
62+
this.isMarked = true;
63+
trace("markToPrint");
64+
}
65+
66+
public void close() {
67+
isClosed = true;
68+
69+
LOCAL.remove();
70+
71+
final Level level = isMarked ? Level.INFO : Level.FINE;
72+
if (!LOGGER.isLoggable(level) || records.isEmpty()) {
73+
return;
74+
}
75+
76+
long finishedAt = System.currentTimeMillis();
77+
78+
final String id = txID != null ? txID : "anonymous-" + ANONYMOUS_COUNTER.incrementAndGet();
79+
LOGGER.log(level, "Trace[{0}] started at {1}", new Object[] {id, startDate});
80+
long last = startedAt;
81+
for (Record record: records) {
82+
long ms = record.executedAt - last;
83+
LOGGER.log(level, "Trace[{0}] {1} ms {2}", new Object[] {id, ms, record.message});
84+
last = record.executedAt;
85+
}
86+
LOGGER.log(level, "Trace[{0}] finished in {1} ms", new Object[] {id, finishedAt - startedAt});
87+
}
88+
}

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import tech.ydb.core.grpc.GrpcReadStream;
1212
import tech.ydb.jdbc.YdbConst;
1313
import tech.ydb.jdbc.YdbStatement;
14+
import tech.ydb.jdbc.YdbTracer;
1415
import tech.ydb.jdbc.exception.ExceptionFactory;
1516
import tech.ydb.jdbc.impl.YdbQueryResult;
1617
import tech.ydb.jdbc.query.QueryType;
@@ -33,9 +34,11 @@ public abstract class BaseYdbExecutor implements YdbExecutor {
3334
private final Duration sessionTimeout;
3435
private final TableClient tableClient;
3536
private final AtomicReference<YdbQueryResult> currResult;
37+
protected final boolean traceEnabled;
3638

3739
public BaseYdbExecutor(YdbContext ctx) {
3840
this.retryCtx = ctx.getRetryCtx();
41+
this.traceEnabled = ctx.isTxTracerEnabled();
3942
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
4043
this.tableClient = ctx.getTableClient();
4144
this.currResult = new AtomicReference<>();
@@ -74,6 +77,16 @@ public void ensureOpened() throws SQLException {
7477
}
7578
}
7679

80+
@Override
81+
public YdbTracer trace(String message) {
82+
if (!traceEnabled) {
83+
return null;
84+
}
85+
YdbTracer tracer = YdbTracer.current();
86+
tracer.trace(message);
87+
return tracer;
88+
}
89+
7790
@Override
7891
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
7992
ensureOpened();
@@ -83,11 +96,16 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
8396
YdbValidator validator = statement.getValidator();
8497

8598
// Scheme query does not affect transactions or result sets
99+
YdbTracer tracer = trace("--> scheme >>\n" + yql);
86100
ExecuteSchemeQuerySettings settings = ctx.withDefaultTimeout(new ExecuteSchemeQuerySettings());
87-
validator.execute(QueryType.SCHEME_QUERY + " >>\n" + yql,
101+
validator.execute(QueryType.SCHEME_QUERY + " >>\n" + yql, tracer,
88102
() -> retryCtx.supplyStatus(session -> session.executeSchemeQuery(yql, settings))
89103
);
90104

105+
if (tracer != null && !isInsideTransaction()) {
106+
tracer.close();
107+
}
108+
91109
return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
92110
}
93111

@@ -98,10 +116,15 @@ public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query,
98116

99117
String yql = query.getPreparedYql();
100118
YdbValidator validator = statement.getValidator();
101-
validator.execute(QueryType.BULK_QUERY + " >>\n" + yql,
119+
YdbTracer tracer = trace("--> bulk upsert >>\n" + yql);
120+
validator.execute(QueryType.BULK_QUERY + " >>\n" + yql, tracer,
102121
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
103122
);
104123

124+
if (tracer != null && !isInsideTransaction()) {
125+
tracer.close();
126+
}
127+
105128
return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
106129
}
107130

@@ -116,11 +139,11 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
116139
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
117140
.withRequestTimeout(scanQueryTimeout)
118141
.build();
119-
142+
String msg = QueryType.SCAN_QUERY + " >>\n" + yql;
143+
final YdbTracer tracer = trace("--> scan query >>\n" + yql);
120144
final Session session = createNewTableSession(validator);
121145

122-
String msg = QueryType.SCAN_QUERY + " >>\n" + yql;
123-
StreamQueryResult lazy = validator.call(msg, () -> {
146+
StreamQueryResult lazy = validator.call(msg, null, () -> {
124147
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
125148
final GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
126149
final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
@@ -134,11 +157,21 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
134157
if (th != null) {
135158
result.onStreamFinished(th);
136159
future.completeExceptionally(th);
160+
161+
if (tracer != null) {
162+
tracer.trace("<-- " + th.getMessage());
163+
tracer.close();
164+
}
137165
}
138166
if (st != null) {
139167
validator.addStatusIssues(st);
140168
result.onStreamFinished(st);
141169
future.complete(st.isSuccess() ? Result.success(result) : Result.fail(st));
170+
171+
if (tracer != null) {
172+
tracer.trace("<-- " + st.toString());
173+
tracer.close();
174+
}
142175
}
143176
});
144177

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import tech.ydb.jdbc.YdbConst;
2020
import tech.ydb.jdbc.YdbResultSet;
2121
import tech.ydb.jdbc.YdbStatement;
22+
import tech.ydb.jdbc.YdbTracer;
2223
import tech.ydb.jdbc.exception.ExceptionFactory;
2324
import tech.ydb.jdbc.impl.YdbQueryResult;
2425
import tech.ydb.jdbc.impl.YdbStaticResultSet;
@@ -187,14 +188,18 @@ public void commit(YdbContext ctx, YdbValidator validator) throws SQLException {
187188
return;
188189
}
189190

191+
YdbTracer tracer = trace("--> commit");
190192
CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build();
191193
try {
192194
validator.clearWarnings();
193-
validator.call("Commit TxId: " + localTx.getId(), () -> localTx.commit(settings));
195+
validator.call("Commit TxId: " + localTx.getId(), tracer, () -> localTx.commit(settings));
194196
} finally {
195197
if (tx.compareAndSet(localTx, null)) {
196198
localTx.getSession().close();
197199
}
200+
if (tracer != null) {
201+
tracer.close();
202+
}
198203
}
199204
}
200205

@@ -207,16 +212,20 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
207212
return;
208213
}
209214

215+
YdbTracer tracer = trace("--> rollback");
210216
RollbackTransactionSettings settings = ctx.withRequestTimeout(RollbackTransactionSettings.newBuilder())
211217
.build();
212218

213219
try {
214220
validator.clearWarnings();
215-
validator.execute("Rollback TxId: " + localTx.getId(), () -> localTx.rollback(settings));
221+
validator.execute("Rollback TxId: " + localTx.getId(), tracer, () -> localTx.rollback(settings));
216222
} finally {
217223
if (tx.compareAndSet(localTx, null)) {
218224
localTx.getSession().close();
219225
}
226+
if (tracer != null) {
227+
tracer.close();
228+
}
220229
}
221230
}
222231

@@ -246,8 +255,9 @@ public YdbQueryResult executeDataQuery(
246255
final QueryTransaction localTx = nextTx;
247256

248257
if (useStreamResultSet) {
258+
YdbTracer tracer = trace("--> stream query >>\n" + yql);
249259
String msg = "STREAM_QUERY >>\n" + yql;
250-
StreamQueryResult lazy = validator.call(msg, () -> {
260+
StreamQueryResult lazy = validator.call(msg, null, () -> {
251261
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
252262
final QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings);
253263
final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
@@ -274,11 +284,27 @@ public void onNextPart(QueryResultPart part) {
274284
if (th != null) {
275285
future.completeExceptionally(th);
276286
result.onStreamFinished(th);
287+
if (tracer != null) {
288+
tracer.trace("<-- " + th.getMessage());
289+
if (localTx.isActive()) {
290+
tracer.setId(localTx.getId());
291+
} else {
292+
tracer.close();
293+
}
294+
}
277295
}
278296
if (res != null) {
279297
validator.addStatusIssues(res.getStatus());
280298
future.complete(res.isSuccess() ? Result.success(result) : Result.fail(res.getStatus()));
281299
result.onStreamFinished(res.getStatus());
300+
if (tracer != null) {
301+
tracer.trace("<-- " + res.getStatus().toString());
302+
if (localTx.isActive()) {
303+
tracer.setId(localTx.getId());
304+
} else {
305+
tracer.close();
306+
}
307+
}
282308
}
283309
});
284310

@@ -288,8 +314,9 @@ public void onNextPart(QueryResultPart part) {
288314
return updateCurrentResult(lazy);
289315
}
290316

317+
YdbTracer tracer = trace("--> data query >>\n" + yql);
291318
try {
292-
QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql,
319+
QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql, tracer,
293320
() -> QueryReader.readFrom(localTx.createQuery(yql, isAutoCommit, params, settings))
294321
);
295322
validator.addStatusIssues(result.getIssueList());
@@ -305,6 +332,14 @@ public void onNextPart(QueryResultPart part) {
305332
localTx.getSession().close();
306333
}
307334
}
335+
336+
if (tracer != null) {
337+
if (localTx.isActive()) {
338+
tracer.setId(localTx.getId());
339+
} else {
340+
tracer.close();
341+
}
342+
}
308343
}
309344
}
310345

@@ -317,14 +352,20 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
317352
YdbValidator validator = statement.getValidator();
318353

319354
// Scheme query does not affect transactions or result sets
355+
YdbTracer tracer = trace("--> scheme query >>\n" + yql);
320356
ExecuteQuerySettings settings = ctx.withRequestTimeout(ExecuteQuerySettings.newBuilder()).build();
321357
try (QuerySession session = createNewQuerySession(validator)) {
322-
validator.call(QueryType.SCHEME_QUERY + " >>\n" + yql, () -> session
358+
validator.call(QueryType.SCHEME_QUERY + " >>\n" + yql, tracer, () -> session
323359
.createQuery(yql, TxMode.NONE, Params.empty(), settings)
324360
.execute(new IssueHandler(validator))
325361
);
362+
} finally {
363+
if (tracer != null && tx.get() == null) {
364+
tracer.close();
365+
}
326366
}
327367

368+
328369
return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
329370
}
330371

@@ -340,9 +381,10 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query
340381
ExecuteQuerySettings settings = ctx.withRequestTimeout(ExecuteQuerySettings.newBuilder())
341382
.withExecMode(QueryExecMode.EXPLAIN)
342383
.build();
384+
YdbTracer tracer = trace("--> explain query >>\n" + yql);
343385

344386
try (QuerySession session = createNewQuerySession(validator)) {
345-
QueryInfo res = validator.call(QueryType.EXPLAIN_QUERY + " >>\n" + yql, () -> session
387+
QueryInfo res = validator.call(QueryType.EXPLAIN_QUERY + " >>\n" + yql, tracer, () -> session
346388
.createQuery(yql, TxMode.NONE, Params.empty(), settings)
347389
.execute(new IssueHandler(validator))
348390
);
@@ -354,6 +396,10 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query
354396
return updateCurrentResult(
355397
new StaticQueryResult(statement, res.getStats().getQueryAst(), res.getStats().getQueryPlan())
356398
);
399+
} finally {
400+
if (tracer != null && tx.get() == null) {
401+
tracer.close();
402+
}
357403
}
358404
}
359405

0 commit comments

Comments
 (0)