Skip to content

Commit 181b2e1

Browse files
committed
Added QueryRewrite option
1 parent 7c3358f commit 181b2e1

File tree

8 files changed

+410
-7
lines changed

8 files changed

+410
-7
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/YdbCache.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@
5050
*/
5151
public class YdbCache {
5252
private final YdbContext ctx;
53-
private final SessionRetryContext retryCtx;
53+
protected final SessionRetryContext retryCtx;
5454
private final YdbQueryProperties queryOptions;
5555

5656
private final Cache<QueryKey, YdbQuery> queriesCache;
5757
private final Cache<String, QueryStat> statsCache;
5858
private final Cache<String, Map<String, Type>> queryParamsCache;
59-
private final Cache<String, TableDescription> tableDescribeCache;
59+
protected final Cache<String, TableDescription> tableDescribeCache;
6060

6161
private final Supplier<String> version = Suppliers.memoizeWithExpiration(this::readVersion, 1, TimeUnit.HOURS);
6262

@@ -104,6 +104,10 @@ public void resetQueryStats() {
104104
}
105105
}
106106

107+
public void validate() throws SQLException {
108+
// nothing
109+
}
110+
107111
public Collection<QueryStat> getQueryStats() {
108112
if (statsCache == null) {
109113
return Collections.emptyList();

jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,16 @@ private YdbContext(
8686
}
8787

8888
this.types = new YdbTypes(operationProperties.getForceNewDatetypes());
89-
this.cache = new YdbCache(this, queryProperties, config.getPreparedStatementsCachecSize(),
90-
config.isFullScanDetectorEnabled());
89+
90+
String queryRewriteTable = operationOptions.getQueryRewriteTable();
91+
if (queryRewriteTable != null && !queryRewriteTable.isEmpty()) {
92+
String tablePath = joined(prefixPath, queryRewriteTable);
93+
this.cache = new YdbQueryRewriteCache(this, tablePath, operationOptions.getQueryRewriteTtl(),
94+
queryProperties, config.getPreparedStatementsCachecSize(), config.isFullScanDetectorEnabled());
95+
} else {
96+
this.cache = new YdbCache(this,
97+
queryProperties, config.getPreparedStatementsCachecSize(), config.isFullScanDetectorEnabled());
98+
}
9199
}
92100

93101
public YdbTypes getTypes() {
@@ -139,6 +147,7 @@ public String getUsername() {
139147
}
140148

141149
public YdbExecutor createExecutor() throws SQLException {
150+
cache.validate();
142151
if (config.isUseQueryService()) {
143152
String txValidationTable = operationOptions.getTxValidationTable();
144153
if (txValidationTable != null && !txValidationTable.isEmpty()) {
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package tech.ydb.jdbc.context;
2+
3+
import java.sql.SQLException;
4+
import java.time.Duration;
5+
import java.time.Instant;
6+
import java.util.concurrent.atomic.AtomicReference;
7+
import java.util.concurrent.locks.ReentrantLock;
8+
import java.util.logging.Level;
9+
import java.util.logging.Logger;
10+
11+
import com.google.common.cache.Cache;
12+
import com.google.common.cache.CacheBuilder;
13+
import com.google.common.hash.Hashing;
14+
15+
import tech.ydb.core.Result;
16+
import tech.ydb.core.Status;
17+
import tech.ydb.core.StatusCode;
18+
import tech.ydb.core.UnexpectedResultException;
19+
import tech.ydb.jdbc.exception.ExceptionFactory;
20+
import tech.ydb.jdbc.query.QueryKey;
21+
import tech.ydb.jdbc.query.YdbQuery;
22+
import tech.ydb.jdbc.settings.YdbQueryProperties;
23+
import tech.ydb.table.description.TableDescription;
24+
import tech.ydb.table.query.DataQueryResult;
25+
import tech.ydb.table.query.Params;
26+
import tech.ydb.table.result.ResultSetReader;
27+
import tech.ydb.table.transaction.TxControl;
28+
import tech.ydb.table.values.PrimitiveValue;
29+
30+
/**
31+
*
32+
* @author Aleksandr Gorshenin
33+
*/
34+
public class YdbQueryRewriteCache extends YdbCache {
35+
private static final Logger LOGGER = Logger.getLogger(YdbQueryRewriteCache.class.getName());
36+
private static final ReentrantLock VALIDATE_LOCK = new ReentrantLock();
37+
38+
private static final String CREATE_SQL = ""
39+
+ "CREATE TABLE IF NOT EXISTS `%s` ("
40+
+ " hash Text NOT NULL,"
41+
+ " query Text NOT NULL,"
42+
+ " rewritten Text,"
43+
+ " used_at Timestamp,"
44+
+ " PRIMARY KEY (hash)"
45+
+ ") WITH ("
46+
+ " AUTO_PARTITIONING_BY_LOAD=ENABLED,"
47+
+ " AUTO_PARTITIONING_BY_SIZE=ENABLED,"
48+
+ " AUTO_PARTITIONING_PARTITION_SIZE_MB=100"
49+
+ ");";
50+
51+
private static final String UPDATE_SQL = ""
52+
+ "DECLARE $h AS Text; "
53+
+ "DECLARE $q AS Text; "
54+
+ "UPSERT INTO `%s` (hash, query, used_at) VALUES ($h, $q, CurrentUtcTimestamp()) RETURNING rewritten;";
55+
56+
private final String rewriteTable;
57+
private final Duration rewriteTtl;
58+
private final Cache<QueryKey, CachedQuery> rewriteCache;
59+
60+
public YdbQueryRewriteCache(YdbContext ctx, String tableName, Duration ttl, YdbQueryProperties options,
61+
int cacheSize, boolean fullScanDetector) {
62+
super(ctx, options, cacheSize, fullScanDetector);
63+
this.rewriteTable = tableName;
64+
this.rewriteTtl = ttl;
65+
this.rewriteCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build();
66+
}
67+
68+
@Override
69+
public YdbQuery parseYdbQuery(QueryKey key) throws SQLException {
70+
CachedQuery cached = rewriteCache.getIfPresent(key);
71+
if (cached == null) {
72+
cached = new CachedQuery(key);
73+
rewriteCache.put(key, cached);
74+
}
75+
76+
return super.parseYdbQuery(cached.update(key));
77+
}
78+
79+
@Override
80+
public void validate() throws SQLException {
81+
if (tableDescribeCache.getIfPresent(rewriteTable) != null) {
82+
return;
83+
}
84+
85+
VALIDATE_LOCK.lock();
86+
try {
87+
LOGGER.log(Level.INFO, "Validate QueryRewrite {0}", rewriteTable);
88+
if (tableDescribeCache.getIfPresent(rewriteTable) != null) {
89+
return;
90+
}
91+
92+
// validate table name
93+
Result<TableDescription> res = retryCtx.supplyResult(s -> s.describeTable(rewriteTable)).join();
94+
LOGGER.log(Level.INFO, "Describe QueryRewrite {0} -> {1}", new Object[] {rewriteTable, res.getStatus()});
95+
if (res.isSuccess()) {
96+
tableDescribeCache.put(rewriteTable, res.getValue());
97+
return;
98+
}
99+
100+
if (res.getStatus().getCode() != StatusCode.SCHEME_ERROR) {
101+
throw ExceptionFactory.createException("Cannot initialize executor with rewrite table " + rewriteTable,
102+
new UnexpectedResultException("Cannot describe", res.getStatus()));
103+
}
104+
105+
// Try to create a table
106+
String query = String.format(CREATE_SQL, rewriteTable);
107+
Status status = retryCtx.supplyStatus(session -> session.executeSchemeQuery(query)).join();
108+
LOGGER.log(Level.INFO, "Create rewrite table {0} -> {1}", new Object[] {rewriteTable, status});
109+
if (!status.isSuccess()) {
110+
throw ExceptionFactory.createException("Cannot initialize executor with rewrite table " + rewriteTable,
111+
new UnexpectedResultException("Cannot create table", status));
112+
}
113+
114+
Result<TableDescription> res2 = retryCtx.supplyResult(s -> s.describeTable(rewriteTable)).join();
115+
LOGGER.log(Level.INFO, "Validate rewrite table {0} -> {1}", new Object[] {rewriteTable, res2.getStatus()});
116+
if (!res2.isSuccess()) {
117+
throw ExceptionFactory.createException("Cannot initialize executor with rewrite table " + rewriteTable,
118+
new UnexpectedResultException("Cannot describe after creating", res2.getStatus()));
119+
}
120+
121+
tableDescribeCache.put(rewriteTable, res2.getValue());
122+
} finally {
123+
VALIDATE_LOCK.unlock();
124+
}
125+
}
126+
127+
private class CachedQuery {
128+
private final String hash;
129+
private final String query;
130+
private final AtomicReference<QueryKey> rewritten;
131+
private final AtomicReference<Instant> ttl;
132+
133+
CachedQuery(QueryKey origin) {
134+
this.query = origin.getReturning() != null ? origin.getQuery() + origin.getReturning() : origin.getQuery();
135+
this.hash = Hashing.sha256().hashBytes(query.getBytes()).toString();
136+
this.rewritten = new AtomicReference<>();
137+
this.ttl = new AtomicReference<>(Instant.MIN);
138+
}
139+
140+
public QueryKey update(QueryKey origin) {
141+
Instant now = Instant.now();
142+
Instant localTtl = ttl.get();
143+
while (localTtl.isBefore(now)) {
144+
if (ttl.compareAndSet(localTtl, now.plus(rewriteTtl))) {
145+
Params params = Params.of(
146+
"$h", PrimitiveValue.newText(hash),
147+
"$q", PrimitiveValue.newText(query)
148+
);
149+
String updateQuery = String.format(UPDATE_SQL, rewriteTable);
150+
Result<DataQueryResult> res = retryCtx.supplyResult(
151+
session -> session.executeDataQuery(updateQuery, TxControl.serializableRw(), params)
152+
).join();
153+
154+
if (res.isSuccess()) {
155+
ResultSetReader rs = res.getValue().getResultSet(0);
156+
if (rs.next() && rs.getColumn(0).isOptionalItemPresent()) {
157+
String rewrittenQuery = rs.getColumn(0).getText();
158+
rewritten.set(new QueryKey(rewrittenQuery));
159+
}
160+
} else {
161+
LOGGER.log(Level.WARNING, "Cannot read table {0} -> {1}", new Object[] {
162+
rewriteTable, res.getStatus()
163+
});
164+
}
165+
}
166+
localTtl = ttl.get();
167+
}
168+
169+
QueryKey local = rewritten.get();
170+
return local != null ? local : origin;
171+
}
172+
}
173+
}

jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ public List<QueryStatement> getStatements() {
7272
public static YdbQuery parseQuery(QueryKey query, YdbQueryProperties opts, YdbTypes types) throws SQLException {
7373
YdbQueryParser parser = new YdbQueryParser(types, query, opts);
7474
String preparedYQL = parser.parseSQL();
75-
boolean writing = false;
7675

7776
QueryType type = null;
7877
YqlBatcher batcher = parser.getYqlBatcher();

jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ public class YdbOperationProperties {
6464
static final YdbProperty<String> TX_VALIDATION_TABLE = YdbProperty.string("withTxValidationTable",
6565
"Name of working table to store transactions to avoid UNDETERMINED errors");
6666

67+
static final YdbProperty<String> QUERY_REWRITE_TABLE = YdbProperty.string("withQueryRewriteTable",
68+
"Name of working table to hot replacemnt of queies");
69+
70+
static final YdbProperty<Duration> QUERY_REWRITE_TABLE_TTL = YdbProperty.duration("queryRewriteTtl",
71+
"Name of working table to hot replacemnt of queies", "300s");
72+
6773
private static final int MAX_ROWS = 1000; // TODO: how to figure out the max rows of current connection?
6874

6975
private final YdbValue<Duration> joinDuration;
@@ -82,6 +88,8 @@ public class YdbOperationProperties {
8288
private final YdbValue<Boolean> useStreamResultSets;
8389
private final YdbValue<Boolean> forceNewDatetypes;
8490
private final YdbValue<String> txValidationTable;
91+
private final YdbValue<String> queryRewriteTable;
92+
private final YdbValue<Duration> queryRewriteTTL;
8593

8694
public YdbOperationProperties(YdbConfig config) throws SQLException {
8795
Properties props = config.getProperties();
@@ -102,6 +110,8 @@ public YdbOperationProperties(YdbConfig config) throws SQLException {
102110
this.useStreamResultSets = USE_STREAM_RESULT_SETS.readValue(props);
103111
this.forceNewDatetypes = FORCE_NEW_DATETYPES.readValue(props);
104112
this.txValidationTable = TX_VALIDATION_TABLE.readValue(props);
113+
this.queryRewriteTable = QUERY_REWRITE_TABLE.readValue(props);
114+
this.queryRewriteTTL = QUERY_REWRITE_TABLE_TTL.readValue(props);
105115
}
106116

107117
public Duration getJoinDuration() {
@@ -163,4 +173,12 @@ public int getMaxRows() {
163173
public String getTxValidationTable() {
164174
return txValidationTable.getValue();
165175
}
176+
177+
public String getQueryRewriteTable() {
178+
return queryRewriteTable.getValue();
179+
}
180+
181+
public Duration getQueryRewriteTtl() {
182+
return queryRewriteTTL.getValue();
183+
}
166184
}

0 commit comments

Comments
 (0)