Skip to content

Commit 7c3358f

Browse files
committed
Move all caches to separate class
1 parent 48bed70 commit 7c3358f

File tree

6 files changed

+326
-250
lines changed

6 files changed

+326
-250
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
158158
try {
159159
Collection<ResultSetReader> resultSets = new LinkedBlockingQueue<>();
160160

161-
ctx.traceQuery(query, yql);
161+
ctx.traceQueryByFullScanDetector(query, yql);
162162
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql, tracer,
163163
() -> session.executeScanQuery(yql, params, settings).start(resultSets::add)
164164
);
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
package tech.ydb.jdbc.context;
2+
3+
import java.nio.charset.StandardCharsets;
4+
import java.sql.SQLDataException;
5+
import java.sql.SQLException;
6+
import java.util.ArrayList;
7+
import java.util.Collection;
8+
import java.util.Collections;
9+
import java.util.Comparator;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.concurrent.TimeUnit;
13+
14+
import com.google.common.base.Supplier;
15+
import com.google.common.base.Suppliers;
16+
import com.google.common.cache.Cache;
17+
import com.google.common.cache.CacheBuilder;
18+
19+
import tech.ydb.core.Result;
20+
import tech.ydb.core.UnexpectedResultException;
21+
import tech.ydb.jdbc.YdbConst;
22+
import tech.ydb.jdbc.YdbPrepareMode;
23+
import tech.ydb.jdbc.YdbTracer;
24+
import tech.ydb.jdbc.exception.ExceptionFactory;
25+
import tech.ydb.jdbc.query.QueryKey;
26+
import tech.ydb.jdbc.query.QueryType;
27+
import tech.ydb.jdbc.query.YdbPreparedQuery;
28+
import tech.ydb.jdbc.query.YdbQuery;
29+
import tech.ydb.jdbc.query.YqlBatcher;
30+
import tech.ydb.jdbc.query.params.BatchedQuery;
31+
import tech.ydb.jdbc.query.params.BulkUpsertQuery;
32+
import tech.ydb.jdbc.query.params.InMemoryQuery;
33+
import tech.ydb.jdbc.query.params.PreparedQuery;
34+
import tech.ydb.jdbc.settings.YdbQueryProperties;
35+
import tech.ydb.table.SessionRetryContext;
36+
import tech.ydb.table.description.TableDescription;
37+
import tech.ydb.table.query.DataQuery;
38+
import tech.ydb.table.query.DataQueryResult;
39+
import tech.ydb.table.query.ExplainDataQueryResult;
40+
import tech.ydb.table.result.ResultSetReader;
41+
import tech.ydb.table.settings.DescribeTableSettings;
42+
import tech.ydb.table.settings.ExplainDataQuerySettings;
43+
import tech.ydb.table.settings.PrepareDataQuerySettings;
44+
import tech.ydb.table.transaction.TxControl;
45+
import tech.ydb.table.values.Type;
46+
47+
/**
48+
*
49+
* @author Aleksandr Gorshenin
50+
*/
51+
public class YdbCache {
52+
private final YdbContext ctx;
53+
private final SessionRetryContext retryCtx;
54+
private final YdbQueryProperties queryOptions;
55+
56+
private final Cache<QueryKey, YdbQuery> queriesCache;
57+
private final Cache<String, QueryStat> statsCache;
58+
private final Cache<String, Map<String, Type>> queryParamsCache;
59+
private final Cache<String, TableDescription> tableDescribeCache;
60+
61+
private final Supplier<String> version = Suppliers.memoizeWithExpiration(this::readVersion, 1, TimeUnit.HOURS);
62+
63+
public YdbCache(YdbContext ctx, YdbQueryProperties queryOptions, int cacheSize, boolean fullScanDetector) {
64+
this.ctx = ctx;
65+
this.retryCtx = SessionRetryContext.create(ctx.getTableClient()).build();
66+
this.queryOptions = queryOptions;
67+
68+
if (cacheSize > 0) {
69+
queriesCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
70+
queryParamsCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
71+
tableDescribeCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
72+
if (fullScanDetector) {
73+
statsCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
74+
} else {
75+
statsCache = null;
76+
}
77+
} else {
78+
queriesCache = null;
79+
statsCache = null;
80+
queryParamsCache = null;
81+
tableDescribeCache = null;
82+
}
83+
}
84+
85+
String getDatabaseVersion() {
86+
return version.get();
87+
}
88+
89+
Cache<String, TableDescription> getTableDescriptionCache() {
90+
return tableDescribeCache;
91+
}
92+
93+
YdbQueryProperties getQueryOptions() {
94+
return this.queryOptions;
95+
}
96+
97+
public boolean queryStatsEnabled() {
98+
return statsCache != null;
99+
}
100+
101+
public void resetQueryStats() {
102+
if (statsCache != null) {
103+
statsCache.invalidateAll();
104+
}
105+
}
106+
107+
public Collection<QueryStat> getQueryStats() {
108+
if (statsCache == null) {
109+
return Collections.emptyList();
110+
}
111+
List<QueryStat> sorted = new ArrayList<>(statsCache.asMap().values());
112+
Collections.sort(sorted,
113+
Comparator
114+
.comparingLong(QueryStat::getUsageCounter).reversed()
115+
.thenComparing(QueryStat::getPreparedYQL)
116+
);
117+
return sorted;
118+
}
119+
120+
private String readVersion() {
121+
Result<DataQueryResult> res = retryCtx.supplyResult(
122+
s -> s.executeDataQuery("SELECT version();", TxControl.snapshotRo())
123+
).join();
124+
125+
if (res.isSuccess()) {
126+
ResultSetReader rs = res.getValue().getResultSet(0);
127+
if (rs.next()) {
128+
return rs.getColumn(0).getBytesAsString(StandardCharsets.UTF_8);
129+
}
130+
}
131+
return "unknown";
132+
}
133+
134+
public void traceQuery(YdbQuery query, String yql) {
135+
if (statsCache == null) {
136+
return;
137+
}
138+
139+
QueryStat stat = statsCache.getIfPresent(yql);
140+
if (stat == null) {
141+
final ExplainDataQuerySettings settings = ctx.withDefaultTimeout(new ExplainDataQuerySettings());
142+
Result<ExplainDataQueryResult> res = retryCtx.supplyResult(
143+
session -> session.explainDataQuery(yql, settings)
144+
).join();
145+
146+
if (res.isSuccess()) {
147+
ExplainDataQueryResult exp = res.getValue();
148+
stat = new QueryStat(query.getOriginQuery(), yql, exp.getQueryAst(), exp.getQueryPlan());
149+
} else {
150+
stat = new QueryStat(query.getOriginQuery(), yql, res.getStatus());
151+
}
152+
153+
statsCache.put(yql, stat);
154+
}
155+
156+
stat.incrementUsage();
157+
}
158+
159+
public YdbQuery parseYdbQuery(QueryKey key) throws SQLException {
160+
if (queriesCache == null) {
161+
return YdbQuery.parseQuery(key, queryOptions, ctx.getTypes());
162+
}
163+
164+
YdbQuery cached = queriesCache.getIfPresent(key);
165+
if (cached == null) {
166+
cached = YdbQuery.parseQuery(key, queryOptions, ctx.getTypes());
167+
queriesCache.put(key, cached);
168+
}
169+
170+
return cached;
171+
}
172+
173+
public YdbPreparedQuery prepareYdbQuery(YdbQuery query, YdbPrepareMode mode) throws SQLException {
174+
if (statsCache != null) {
175+
if (QueryStat.isPrint(query.getOriginQuery()) || QueryStat.isReset(query.getOriginQuery())) {
176+
return new InMemoryQuery(query, queryOptions.isDeclareJdbcParameters());
177+
}
178+
}
179+
180+
QueryType type = query.getType();
181+
YqlBatcher batcher = query.getYqlBatcher();
182+
183+
if (type == QueryType.BULK_QUERY) {
184+
if (batcher == null || batcher.getCommand() != YqlBatcher.Cmd.UPSERT) {
185+
throw new SQLException(YdbConst.BULKS_UNSUPPORTED);
186+
}
187+
}
188+
189+
if (type == QueryType.EXPLAIN_QUERY || type == QueryType.SCHEME_QUERY ||
190+
!queryOptions.isPrepareDataQueries() || mode == YdbPrepareMode.IN_MEMORY) {
191+
return new InMemoryQuery(query, queryOptions.isDeclareJdbcParameters());
192+
}
193+
194+
if (batcher != null && (mode == YdbPrepareMode.AUTO || type == QueryType.BULK_QUERY)) {
195+
YdbPreparedQuery batched = createBatchQuery(query, batcher);
196+
if (batched != null) {
197+
return batched;
198+
}
199+
}
200+
201+
if (!query.isPlainYQL()) {
202+
return new InMemoryQuery(query, queryOptions.isDeclareJdbcParameters());
203+
}
204+
205+
// try to prepare data query
206+
Map<String, Type> queryTypes = queryParamsCache.getIfPresent(query.getOriginQuery());
207+
if (queryTypes == null) {
208+
String yql = ctx.getPrefixPragma() + query.getPreparedYql();
209+
YdbTracer tracer = ctx.getTracer();
210+
tracer.trace("--> prepare data query");
211+
tracer.trace(yql);
212+
213+
PrepareDataQuerySettings settings = ctx.withDefaultTimeout(new PrepareDataQuerySettings());
214+
Result<DataQuery> result = retryCtx.supplyResult(
215+
session -> session.prepareDataQuery(yql, settings)
216+
).join();
217+
218+
tracer.trace("<-- " + result.getStatus());
219+
if (!result.isSuccess()) {
220+
tracer.close();
221+
throw ExceptionFactory.createException("Cannot prepare data query: " + result.getStatus(),
222+
new UnexpectedResultException("Unexpected status", result.getStatus()));
223+
}
224+
225+
queryTypes = result.getValue().types();
226+
queryParamsCache.put(query.getOriginQuery(), queryTypes);
227+
}
228+
229+
boolean requireBatch = mode == YdbPrepareMode.DATA_QUERY_BATCH;
230+
if (requireBatch || (mode == YdbPrepareMode.AUTO && queryOptions.isDetectBatchQueries())) {
231+
BatchedQuery params = BatchedQuery.tryCreateBatched(ctx.getTypes(), query, queryTypes);
232+
if (params != null) {
233+
return params;
234+
}
235+
236+
if (requireBatch) {
237+
throw new SQLDataException(YdbConst.STATEMENT_IS_NOT_A_BATCH + query.getOriginQuery());
238+
}
239+
}
240+
return new PreparedQuery(ctx.getTypes(), query, queryTypes);
241+
}
242+
243+
private YdbPreparedQuery createBatchQuery(YdbQuery query, YqlBatcher batcher) throws SQLException {
244+
String tablePath = YdbContext.joined(ctx.getPrefixPath(), batcher.getTableName());
245+
Result<TableDescription> description = describeTable(tablePath);
246+
247+
if (query.getType() == QueryType.BULK_QUERY) {
248+
if (query.getReturning() != null) {
249+
throw new SQLException(YdbConst.BULK_NOT_SUPPORT_RETURNING);
250+
}
251+
if (!description.isSuccess()) {
252+
throw new SQLException(YdbConst.BULK_DESCRIBE_ERROR + description.getStatus());
253+
}
254+
return BulkUpsertQuery.build(ctx.getTypes(), tablePath, batcher.getColumns(), description.getValue());
255+
}
256+
257+
if (description.isSuccess()) {
258+
BatchedQuery params = BatchedQuery.createAutoBatched(ctx.getTypes(), query, description.getValue());
259+
if (params != null) {
260+
return params;
261+
}
262+
}
263+
264+
return null;
265+
}
266+
267+
private Result<TableDescription> describeTable(String tablePath) {
268+
TableDescription cached = tableDescribeCache.getIfPresent(tablePath);
269+
if (cached != null) {
270+
return Result.success(cached);
271+
}
272+
273+
YdbTracer tracer = ctx.getTracer();
274+
tracer.trace("--> describe table");
275+
tracer.trace(tablePath);
276+
277+
DescribeTableSettings settings = ctx.withDefaultTimeout(new DescribeTableSettings());
278+
Result<TableDescription> result = retryCtx.supplyResult(session -> session.describeTable(tablePath, settings))
279+
.join();
280+
281+
tracer.trace("<-- " + result.getStatus());
282+
283+
if (result.isSuccess()) {
284+
tableDescribeCache.put(tablePath, result.getValue());
285+
}
286+
287+
return result;
288+
}
289+
}

0 commit comments

Comments
 (0)