Skip to content

Commit 2a5d8bb

Browse files
committed
Added executeBulkQuery implementation to YdbConnection
1 parent 593704f commit 2a5d8bb

File tree

9 files changed

+103
-29
lines changed

9 files changed

+103
-29
lines changed

jdbc/src/main/java/tech/ydb/jdbc/YdbConnection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import tech.ydb.jdbc.query.YdbQuery;
1313
import tech.ydb.table.query.Params;
1414
import tech.ydb.table.result.ResultSetReader;
15+
import tech.ydb.table.values.ListValue;
1516

1617
public interface YdbConnection extends Connection {
1718
/**
@@ -34,6 +35,16 @@ public interface YdbConnection extends Connection {
3435
*/
3536
void executeSchemeQuery(String yql, YdbValidator validator) throws SQLException;
3637

38+
/**
39+
* Explicitly execute bulk upsert to the table
40+
*
41+
* @param tablePath path to table
42+
* @param validator handler for logging and warnings
43+
* @param rows bulk rows
44+
* @throws SQLException if query cannot be executed
45+
*/
46+
void executeBulkUpsertQuery(String tablePath, YdbValidator validator, ListValue rows) throws SQLException;
47+
3748
/**
3849
* Explicitly execute query as a data query
3950
*

jdbc/src/main/java/tech/ydb/jdbc/YdbConst.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ public final class YdbConst {
6565
public static final String PARAMETER_NOT_FOUND = "Parameter not found: ";
6666
public static final String PARAMETER_TYPE_UNKNOWN = "Unable to convert sqlType %s to YDB type for parameter: %s";
6767
public static final String INVALID_ROW = "Current row index is out of bounds: ";
68-
public static final String BATCH_UNSUPPORTED = "Batches are not supported in simple prepared statements";
69-
public static final String BATCH_INVALID = "Batches are not supported for query type: ";
68+
public static final String BULKS_UNSUPPORTED = "Bulk upserts are supported only in prepared statements";
7069
public static final String METADATA_RS_UNSUPPORTED_IN_PS = "ResultSet metadata is not supported " +
7170
"in prepared statements";
7271
public static final String CANNOT_UNWRAP_TO = "Cannot unwrap to ";
@@ -92,6 +91,8 @@ public final class YdbConst {
9291
+ "transaction. This behavior may be changed by property scanQueryTxMode";
9392
public static final String SCHEME_QUERY_INSIDE_TRANSACTION = "Scheme query cannot be executed inside active "
9493
+ "transaction. This behavior may be changed by property schemeQueryTxMode";
94+
public static final String BULK_QUERY_INSIDE_TRANSACTION = "Bulk upsert query cannot be executed inside active "
95+
+ "transaction. This behavior may be changed by property bulkUpsertQueryTxMode";
9596

9697
// Cast errors
9798

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,49 +5,45 @@
55
import java.util.Collection;
66
import java.util.concurrent.LinkedBlockingQueue;
77

8-
import tech.ydb.core.Result;
9-
import tech.ydb.core.UnexpectedResultException;
10-
import tech.ydb.jdbc.exception.ExceptionFactory;
118
import tech.ydb.jdbc.query.QueryType;
129
import tech.ydb.jdbc.query.YdbQuery;
13-
import tech.ydb.table.Session;
14-
import tech.ydb.table.TableClient;
10+
import tech.ydb.table.SessionRetryContext;
1511
import tech.ydb.table.query.Params;
1612
import tech.ydb.table.result.ResultSetReader;
1713
import tech.ydb.table.result.impl.ProtoValueReaders;
1814
import tech.ydb.table.settings.ExecuteScanQuerySettings;
1915
import tech.ydb.table.settings.ExecuteSchemeQuerySettings;
16+
import tech.ydb.table.values.ListValue;
2017

2118
/**
2219
*
2320
* @author Aleksandr Gorshenin
2421
*/
2522
public abstract class BaseYdbExecutor implements YdbExecutor {
26-
private final Duration sessionTimeout;
27-
private final TableClient tableClient;
23+
private final SessionRetryContext retryCtx;
2824

2925
public BaseYdbExecutor(YdbContext ctx) {
30-
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
31-
this.tableClient = ctx.getTableClient();
32-
}
33-
34-
protected Session createNewTableSession(YdbValidator validator) throws SQLException {
35-
try {
36-
Result<Session> session = tableClient.createSession(sessionTimeout).join();
37-
validator.addStatusIssues(session.getStatus());
38-
return session.getValue();
39-
} catch (UnexpectedResultException ex) {
40-
throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex);
41-
}
26+
this.retryCtx = ctx.getRetryCtx();
4227
}
4328

4429
@Override
4530
public void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yql) throws SQLException {
31+
ensureOpened();
32+
4633
// Scheme query does not affect transactions or result sets
4734
ExecuteSchemeQuerySettings settings = ctx.withDefaultTimeout(new ExecuteSchemeQuerySettings());
48-
try (Session session = createNewTableSession(validator)) {
49-
validator.execute(QueryType.SCHEME_QUERY + " >>\n" + yql, () -> session.executeSchemeQuery(yql, settings));
50-
}
35+
validator.execute(QueryType.SCHEME_QUERY + " >>\n" + yql,
36+
() -> retryCtx.supplyStatus(session -> session.executeSchemeQuery(yql, settings))
37+
);
38+
}
39+
40+
@Override
41+
public void executeBulkUpsert(YdbContext ctx, YdbValidator validator, String tablePath, ListValue rows)
42+
throws SQLException {
43+
ensureOpened();
44+
validator.execute(QueryType.BULK_QUERY + " >>\n" + tablePath,
45+
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
46+
);
5147
}
5248

5349
@Override
@@ -63,10 +59,12 @@ public ResultSetReader executeScanQuery(
6359
.build();
6460

6561
ctx.traceQuery(query, yql);
66-
try (Session session = createNewTableSession(validator)) {
67-
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql,
68-
() -> session.executeScanQuery(yql, params, settings).start(resultSets::add));
69-
}
62+
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql,
63+
() -> retryCtx.supplyStatus(session -> {
64+
resultSets.clear();
65+
return session.executeScanQuery(yql, params, settings).start(resultSets::add);
66+
})
67+
);
7068

7169
return ProtoValueReaders.forResultSets(resultSets);
7270
}

jdbc/src/main/java/tech/ydb/jdbc/context/SchemeExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class SchemeExecutor {
2020

2121
public SchemeExecutor(YdbContext ctx) {
2222
this.schemeClient = ctx.getSchemeClient();
23-
this.retryCtx = SessionRetryContext.create(ctx.getTableClient()).build();
23+
this.retryCtx = ctx.getRetryCtx();
2424
}
2525

2626
public CompletableFuture<Result<ListDirectoryResult>> listDirectory(String path) {

jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@
77
import java.util.ArrayList;
88
import java.util.List;
99

10+
import tech.ydb.core.Result;
11+
import tech.ydb.core.UnexpectedResultException;
1012
import tech.ydb.jdbc.YdbConst;
13+
import tech.ydb.jdbc.exception.ExceptionFactory;
1114
import tech.ydb.jdbc.query.ExplainedQuery;
1215
import tech.ydb.jdbc.query.QueryType;
1316
import tech.ydb.jdbc.query.YdbQuery;
1417
import tech.ydb.table.Session;
18+
import tech.ydb.table.TableClient;
1519
import tech.ydb.table.query.DataQueryResult;
1620
import tech.ydb.table.query.ExplainDataQueryResult;
1721
import tech.ydb.table.query.Params;
@@ -28,10 +32,14 @@
2832
* @author Aleksandr Gorshenin
2933
*/
3034
public class TableServiceExecutor extends BaseYdbExecutor {
35+
private final Duration sessionTimeout;
36+
private final TableClient tableClient;
3137
private volatile TxState tx;
3238

3339
public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
3440
super(ctx);
41+
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
42+
this.tableClient = ctx.getTableClient();
3543
this.tx = createTx(transactionLevel, autoCommit);
3644
}
3745

@@ -47,6 +55,16 @@ private void updateState(TxState newTx) {
4755
this.tx = newTx;
4856
}
4957

58+
protected Session createNewTableSession(YdbValidator validator) throws SQLException {
59+
try {
60+
Result<Session> session = tableClient.createSession(sessionTimeout).join();
61+
validator.addStatusIssues(session.getStatus());
62+
return session.getValue();
63+
} catch (UnexpectedResultException ex) {
64+
throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex);
65+
}
66+
}
67+
5068
@Override
5169
public void setTransactionLevel(int level) throws SQLException {
5270
updateState(tx.withTransactionLevel(level));

jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ public QueryClient getQueryClient() {
144144
return queryClient;
145145
}
146146

147+
public SessionRetryContext getRetryCtx() {
148+
return retryCtx;
149+
}
150+
147151
public String getUrl() {
148152
return config.getUrl();
149153
}

jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import tech.ydb.jdbc.query.YdbQuery;
99
import tech.ydb.table.query.Params;
1010
import tech.ydb.table.result.ResultSetReader;
11+
import tech.ydb.table.values.ListValue;
1112

1213
/**
1314
*
@@ -35,6 +36,9 @@ default void ensureOpened() throws SQLException {
3536

3637
void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yql) throws SQLException;
3738

39+
void executeBulkUpsert(YdbContext ctx, YdbValidator validator, String tablePath, ListValue rows)
40+
throws SQLException;
41+
3842
List<ResultSetReader> executeDataQuery(YdbContext ctx, YdbValidator validator, YdbQuery query, String yql,
3943
long timeout, boolean poolable, Params params) throws SQLException;
4044

jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import tech.ydb.jdbc.settings.YdbOperationProperties;
3838
import tech.ydb.table.query.Params;
3939
import tech.ydb.table.result.ResultSetReader;
40+
import tech.ydb.table.values.ListValue;
4041

4142
public class YdbConnectionImpl implements YdbConnection {
4243
private static final Logger LOGGER = Logger.getLogger(YdbConnectionImpl.class.getName());
@@ -46,13 +47,15 @@ public class YdbConnectionImpl implements YdbConnection {
4647
private final YdbExecutor executor;
4748
private final FakeTxMode scanQueryTxMode;
4849
private final FakeTxMode schemeQueryTxMode;
50+
private final FakeTxMode bulkQueryTxMode;
4951

5052
public YdbConnectionImpl(YdbContext context) throws SQLException {
5153
this.ctx = context;
5254

5355
YdbOperationProperties props = ctx.getOperationProperties();
5456
this.scanQueryTxMode = props.getScanQueryTxMode();
5557
this.schemeQueryTxMode = props.getSchemeQueryTxMode();
58+
this.bulkQueryTxMode = props.getBulkQueryTxMode();
5659

5760
this.validator = new YdbValidator(LOGGER);
5861
this.executor = ctx.createExecutor();
@@ -206,6 +209,7 @@ public void executeSchemeQuery(String yql, YdbValidator validator) throws SQLExc
206209
@Override
207210
public List<ResultSetReader> executeDataQuery(YdbQuery query, String yql, YdbValidator validator,
208211
int timeout, boolean poolable, Params params) throws SQLException {
212+
executor.ensureOpened();
209213
return executor.executeDataQuery(ctx, validator, query, yql, timeout, poolable, params);
210214
}
211215

@@ -230,8 +234,29 @@ public ResultSetReader executeScanQuery(YdbQuery query, String yql, YdbValidator
230234
return executor.executeScanQuery(ctx, validator, query, yql, params);
231235
}
232236

237+
@Override
238+
public void executeBulkUpsertQuery(String tablePath, YdbValidator validator, ListValue rows) throws SQLException {
239+
executor.ensureOpened();
240+
241+
if (executor.isInsideTransaction()) {
242+
switch (bulkQueryTxMode) {
243+
case FAKE_TX:
244+
break;
245+
case SHADOW_COMMIT:
246+
commit();
247+
break;
248+
case ERROR:
249+
default:
250+
throw new SQLException(YdbConst.BULK_QUERY_INSIDE_TRANSACTION);
251+
}
252+
}
253+
254+
executor.executeBulkUpsert(ctx, validator, tablePath, rows);
255+
}
256+
233257
@Override
234258
public ExplainedQuery executeExplainQuery(String yql, YdbValidator validator) throws SQLException {
259+
executor.ensureOpened();
235260
return executor.executeExplainQuery(ctx, validator, yql);
236261
}
237262

jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ public class YdbOperationProperties {
4646
FakeTxMode.ERROR
4747
);
4848

49+
static final YdbProperty<FakeTxMode> BULK_QUERY_TX_MODE = YdbProperty.enums("bulkUpsertQueryTxMode",
50+
FakeTxMode.class,
51+
"Mode of execution bulk upsert query inside transaction. Possible values - "
52+
+ "ERROR(by default), FAKE_TX and SHADOW_COMMIT",
53+
FakeTxMode.ERROR
54+
);
55+
4956
private static final int MAX_ROWS = 1000; // TODO: how to figure out the max rows of current connection?
5057

5158
private final YdbValue<Duration> joinDuration;
@@ -59,6 +66,7 @@ public class YdbOperationProperties {
5966

6067
private final YdbValue<FakeTxMode> scanQueryTxMode;
6168
private final YdbValue<FakeTxMode> schemeQueryTxMode;
69+
private final YdbValue<FakeTxMode> bulkQueryTxMode;
6270

6371
public YdbOperationProperties(YdbConfig config) throws SQLException {
6472
Properties props = config.getProperties();
@@ -74,6 +82,7 @@ public YdbOperationProperties(YdbConfig config) throws SQLException {
7482

7583
this.scanQueryTxMode = SCAN_QUERY_TX_MODE.readValue(props);
7684
this.schemeQueryTxMode = SCHEME_QUERY_TX_MODE.readValue(props);
85+
this.bulkQueryTxMode = BULK_QUERY_TX_MODE.readValue(props);
7786
}
7887

7988
public Duration getJoinDuration() {
@@ -100,6 +109,10 @@ public FakeTxMode getSchemeQueryTxMode() {
100109
return schemeQueryTxMode.getValue();
101110
}
102111

112+
public FakeTxMode getBulkQueryTxMode() {
113+
return bulkQueryTxMode.getValue();
114+
}
115+
103116
public Duration getSessionTimeout() {
104117
return sessionTimeout.getValue();
105118
}

0 commit comments

Comments
 (0)