Skip to content

Commit 9284b4d

Browse files
authored
Added support of query rewriting (#139)
2 parents 48bed70 + d5df236 commit 9284b4d

File tree

12 files changed

+732
-253
lines changed

12 files changed

+732
-253
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
158158
try {
159159
Collection<ResultSetReader> resultSets = new LinkedBlockingQueue<>();
160160

161-
ctx.traceQuery(query, yql);
161+
ctx.traceQueryByFullScanDetector(query, yql);
162162
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql, tracer,
163163
() -> session.executeScanQuery(yql, params, settings).start(resultSets::add)
164164
);
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
package tech.ydb.jdbc.context;
2+
3+
import java.nio.charset.StandardCharsets;
4+
import java.sql.SQLDataException;
5+
import java.sql.SQLException;
6+
import java.util.ArrayList;
7+
import java.util.Collection;
8+
import java.util.Collections;
9+
import java.util.Comparator;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.concurrent.TimeUnit;
13+
14+
import com.google.common.base.Supplier;
15+
import com.google.common.base.Suppliers;
16+
import com.google.common.cache.Cache;
17+
import com.google.common.cache.CacheBuilder;
18+
19+
import tech.ydb.core.Result;
20+
import tech.ydb.core.UnexpectedResultException;
21+
import tech.ydb.jdbc.YdbConst;
22+
import tech.ydb.jdbc.YdbPrepareMode;
23+
import tech.ydb.jdbc.YdbTracer;
24+
import tech.ydb.jdbc.exception.ExceptionFactory;
25+
import tech.ydb.jdbc.query.QueryKey;
26+
import tech.ydb.jdbc.query.QueryType;
27+
import tech.ydb.jdbc.query.YdbPreparedQuery;
28+
import tech.ydb.jdbc.query.YdbQuery;
29+
import tech.ydb.jdbc.query.YqlBatcher;
30+
import tech.ydb.jdbc.query.params.BatchedQuery;
31+
import tech.ydb.jdbc.query.params.BulkUpsertQuery;
32+
import tech.ydb.jdbc.query.params.InMemoryQuery;
33+
import tech.ydb.jdbc.query.params.PreparedQuery;
34+
import tech.ydb.jdbc.settings.YdbQueryProperties;
35+
import tech.ydb.table.SessionRetryContext;
36+
import tech.ydb.table.description.TableDescription;
37+
import tech.ydb.table.query.DataQuery;
38+
import tech.ydb.table.query.DataQueryResult;
39+
import tech.ydb.table.query.ExplainDataQueryResult;
40+
import tech.ydb.table.result.ResultSetReader;
41+
import tech.ydb.table.settings.DescribeTableSettings;
42+
import tech.ydb.table.settings.ExplainDataQuerySettings;
43+
import tech.ydb.table.settings.PrepareDataQuerySettings;
44+
import tech.ydb.table.transaction.TxControl;
45+
import tech.ydb.table.values.Type;
46+
47+
/**
48+
*
49+
* @author Aleksandr Gorshenin
50+
*/
51+
public class YdbCache {
52+
private final YdbContext ctx;
53+
protected final SessionRetryContext retryCtx;
54+
private final YdbQueryProperties queryOptions;
55+
56+
private final Cache<QueryKey, YdbQuery> queriesCache;
57+
private final Cache<String, QueryStat> statsCache;
58+
private final Cache<String, Map<String, Type>> queryParamsCache;
59+
protected final Cache<String, TableDescription> tableDescribeCache;
60+
61+
private final Supplier<String> version = Suppliers.memoizeWithExpiration(this::readVersion, 1, TimeUnit.HOURS);
62+
63+
public YdbCache(YdbContext ctx, YdbQueryProperties queryOptions, int cacheSize, boolean fullScanDetector) {
64+
this.ctx = ctx;
65+
this.retryCtx = SessionRetryContext.create(ctx.getTableClient()).idempotent(true).build();
66+
this.queryOptions = queryOptions;
67+
68+
if (cacheSize > 0) {
69+
queriesCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
70+
queryParamsCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
71+
tableDescribeCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
72+
if (fullScanDetector) {
73+
statsCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
74+
} else {
75+
statsCache = null;
76+
}
77+
} else {
78+
queriesCache = null;
79+
statsCache = null;
80+
queryParamsCache = null;
81+
tableDescribeCache = null;
82+
}
83+
}
84+
85+
String getDatabaseVersion() {
86+
return version.get();
87+
}
88+
89+
Cache<String, TableDescription> getTableDescriptionCache() {
90+
return tableDescribeCache;
91+
}
92+
93+
YdbQueryProperties getQueryOptions() {
94+
return this.queryOptions;
95+
}
96+
97+
public boolean queryStatsEnabled() {
98+
return statsCache != null;
99+
}
100+
101+
public void resetQueryStats() {
102+
if (statsCache != null) {
103+
statsCache.invalidateAll();
104+
}
105+
}
106+
107+
public void validate() throws SQLException {
108+
// nothing
109+
}
110+
111+
public Collection<QueryStat> getQueryStats() {
112+
if (statsCache == null) {
113+
return Collections.emptyList();
114+
}
115+
List<QueryStat> sorted = new ArrayList<>(statsCache.asMap().values());
116+
Collections.sort(sorted,
117+
Comparator
118+
.comparingLong(QueryStat::getUsageCounter).reversed()
119+
.thenComparing(QueryStat::getPreparedYQL)
120+
);
121+
return sorted;
122+
}
123+
124+
private String readVersion() {
125+
Result<DataQueryResult> res = retryCtx.supplyResult(
126+
s -> s.executeDataQuery("SELECT version();", TxControl.snapshotRo())
127+
).join();
128+
129+
if (res.isSuccess()) {
130+
ResultSetReader rs = res.getValue().getResultSet(0);
131+
if (rs.next()) {
132+
return rs.getColumn(0).getBytesAsString(StandardCharsets.UTF_8);
133+
}
134+
}
135+
return "unknown";
136+
}
137+
138+
public void traceQuery(YdbQuery query, String yql) {
139+
if (statsCache == null) {
140+
return;
141+
}
142+
143+
QueryStat stat = statsCache.getIfPresent(yql);
144+
if (stat == null) {
145+
final ExplainDataQuerySettings settings = ctx.withDefaultTimeout(new ExplainDataQuerySettings());
146+
Result<ExplainDataQueryResult> res = retryCtx.supplyResult(
147+
session -> session.explainDataQuery(yql, settings)
148+
).join();
149+
150+
if (res.isSuccess()) {
151+
ExplainDataQueryResult exp = res.getValue();
152+
stat = new QueryStat(query.getOriginQuery(), yql, exp.getQueryAst(), exp.getQueryPlan());
153+
} else {
154+
stat = new QueryStat(query.getOriginQuery(), yql, res.getStatus());
155+
}
156+
157+
statsCache.put(yql, stat);
158+
}
159+
160+
stat.incrementUsage();
161+
}
162+
163+
public YdbQuery parseYdbQuery(QueryKey key) throws SQLException {
164+
if (queriesCache == null) {
165+
return YdbQuery.parseQuery(key, queryOptions, ctx.getTypes());
166+
}
167+
168+
YdbQuery cached = queriesCache.getIfPresent(key);
169+
if (cached == null) {
170+
cached = YdbQuery.parseQuery(key, queryOptions, ctx.getTypes());
171+
queriesCache.put(key, cached);
172+
}
173+
174+
return cached;
175+
}
176+
177+
public YdbPreparedQuery prepareYdbQuery(YdbQuery query, YdbPrepareMode mode) throws SQLException {
178+
if (statsCache != null) {
179+
if (QueryStat.isPrint(query.getOriginQuery()) || QueryStat.isReset(query.getOriginQuery())) {
180+
return new InMemoryQuery(query, queryOptions.isDeclareJdbcParameters());
181+
}
182+
}
183+
184+
QueryType type = query.getType();
185+
YqlBatcher batcher = query.getYqlBatcher();
186+
187+
if (type == QueryType.BULK_QUERY) {
188+
if (batcher == null || batcher.getCommand() != YqlBatcher.Cmd.UPSERT) {
189+
throw new SQLException(YdbConst.BULKS_UNSUPPORTED);
190+
}
191+
}
192+
193+
if (type == QueryType.EXPLAIN_QUERY || type == QueryType.SCHEME_QUERY ||
194+
!queryOptions.isPrepareDataQueries() || mode == YdbPrepareMode.IN_MEMORY) {
195+
return new InMemoryQuery(query, queryOptions.isDeclareJdbcParameters());
196+
}
197+
198+
if (batcher != null && (mode == YdbPrepareMode.AUTO || type == QueryType.BULK_QUERY)) {
199+
YdbPreparedQuery batched = createBatchQuery(query, batcher);
200+
if (batched != null) {
201+
return batched;
202+
}
203+
}
204+
205+
if (!query.isPlainYQL()) {
206+
return new InMemoryQuery(query, queryOptions.isDeclareJdbcParameters());
207+
}
208+
209+
// try to prepare data query
210+
Map<String, Type> queryTypes = queryParamsCache.getIfPresent(query.getOriginQuery());
211+
if (queryTypes == null) {
212+
String yql = ctx.getPrefixPragma() + query.getPreparedYql();
213+
YdbTracer tracer = ctx.getTracer();
214+
tracer.trace("--> prepare data query");
215+
tracer.trace(yql);
216+
217+
PrepareDataQuerySettings settings = ctx.withDefaultTimeout(new PrepareDataQuerySettings());
218+
Result<DataQuery> result = retryCtx.supplyResult(
219+
session -> session.prepareDataQuery(yql, settings)
220+
).join();
221+
222+
tracer.trace("<-- " + result.getStatus());
223+
if (!result.isSuccess()) {
224+
tracer.close();
225+
throw ExceptionFactory.createException("Cannot prepare data query: " + result.getStatus(),
226+
new UnexpectedResultException("Unexpected status", result.getStatus()));
227+
}
228+
229+
queryTypes = result.getValue().types();
230+
queryParamsCache.put(query.getOriginQuery(), queryTypes);
231+
}
232+
233+
boolean requireBatch = mode == YdbPrepareMode.DATA_QUERY_BATCH;
234+
if (requireBatch || (mode == YdbPrepareMode.AUTO && queryOptions.isDetectBatchQueries())) {
235+
BatchedQuery params = BatchedQuery.tryCreateBatched(ctx.getTypes(), query, queryTypes);
236+
if (params != null) {
237+
return params;
238+
}
239+
240+
if (requireBatch) {
241+
throw new SQLDataException(YdbConst.STATEMENT_IS_NOT_A_BATCH + query.getOriginQuery());
242+
}
243+
}
244+
return new PreparedQuery(ctx.getTypes(), query, queryTypes);
245+
}
246+
247+
private YdbPreparedQuery createBatchQuery(YdbQuery query, YqlBatcher batcher) throws SQLException {
248+
String tablePath = YdbContext.joined(ctx.getPrefixPath(), batcher.getTableName());
249+
Result<TableDescription> description = describeTable(tablePath);
250+
251+
if (query.getType() == QueryType.BULK_QUERY) {
252+
if (query.getReturning() != null) {
253+
throw new SQLException(YdbConst.BULK_NOT_SUPPORT_RETURNING);
254+
}
255+
if (!description.isSuccess()) {
256+
throw new SQLException(YdbConst.BULK_DESCRIBE_ERROR + description.getStatus());
257+
}
258+
return BulkUpsertQuery.build(ctx.getTypes(), tablePath, batcher.getColumns(), description.getValue());
259+
}
260+
261+
if (description.isSuccess()) {
262+
BatchedQuery params = BatchedQuery.createAutoBatched(ctx.getTypes(), query, description.getValue());
263+
if (params != null) {
264+
return params;
265+
}
266+
}
267+
268+
return null;
269+
}
270+
271+
private Result<TableDescription> describeTable(String tablePath) {
272+
TableDescription cached = tableDescribeCache.getIfPresent(tablePath);
273+
if (cached != null) {
274+
return Result.success(cached);
275+
}
276+
277+
YdbTracer tracer = ctx.getTracer();
278+
tracer.trace("--> describe table");
279+
tracer.trace(tablePath);
280+
281+
DescribeTableSettings settings = ctx.withDefaultTimeout(new DescribeTableSettings());
282+
Result<TableDescription> result = retryCtx.supplyResult(session -> session.describeTable(tablePath, settings))
283+
.join();
284+
285+
tracer.trace("<-- " + result.getStatus());
286+
287+
if (result.isSuccess()) {
288+
tableDescribeCache.put(tablePath, result.getValue());
289+
}
290+
291+
return result;
292+
}
293+
}

0 commit comments

Comments
 (0)