Skip to content

Commit c698ab4

Browse files
authored
Support of BulkUpserts (#64)
2 parents ae07138 + 94880cc commit c698ab4

23 files changed

+580
-175
lines changed

jdbc/src/main/java/tech/ydb/jdbc/YdbConnection.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import tech.ydb.jdbc.query.YdbQuery;
1313
import tech.ydb.table.query.Params;
1414
import tech.ydb.table.result.ResultSetReader;
15+
import tech.ydb.table.values.ListValue;
1516

1617
public interface YdbConnection extends Connection {
1718
/**
@@ -34,6 +35,18 @@ public interface YdbConnection extends Connection {
3435
*/
3536
void executeSchemeQuery(String yql, YdbValidator validator) throws SQLException;
3637

38+
/**
39+
* Explicitly execute bulk upsert to the table
40+
*
41+
* @param yql description of request
42+
* @param tablePath path to table
43+
* @param validator handler for logging and warnings
44+
* @param rows bulk rows
45+
* @throws SQLException if query cannot be executed
46+
*/
47+
void executeBulkUpsertQuery(String yql, String tablePath, YdbValidator validator, ListValue rows)
48+
throws SQLException;
49+
3750
/**
3851
* Explicitly execute query as a data query
3952
*

jdbc/src/main/java/tech/ydb/jdbc/YdbConst.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,9 @@ public final class YdbConst {
6565
public static final String PARAMETER_NOT_FOUND = "Parameter not found: ";
6666
public static final String PARAMETER_TYPE_UNKNOWN = "Unable to convert sqlType %s to YDB type for parameter: %s";
6767
public static final String INVALID_ROW = "Current row index is out of bounds: ";
68-
public static final String BATCH_UNSUPPORTED = "Batches are not supported in simple prepared statements";
69-
public static final String BATCH_INVALID = "Batches are not supported for query type: ";
68+
public static final String BULKS_UNSUPPORTED = "BULK mode is available only for prepared statement with one UPSERT";
69+
public static final String INVALID_BATCH_COLUMN = "Cannot prepared batch request: cannot find a column";
70+
public static final String BULKS_DESCRIBE_ERROR = "Cannot parse BULK upsert: ";
7071
public static final String METADATA_RS_UNSUPPORTED_IN_PS = "ResultSet metadata is not supported " +
7172
"in prepared statements";
7273
public static final String CANNOT_UNWRAP_TO = "Cannot unwrap to ";
@@ -92,6 +93,8 @@ public final class YdbConst {
9293
+ "transaction. This behavior may be changed by property scanQueryTxMode";
9394
public static final String SCHEME_QUERY_INSIDE_TRANSACTION = "Scheme query cannot be executed inside active "
9495
+ "transaction. This behavior may be changed by property schemeQueryTxMode";
96+
public static final String BULK_QUERY_INSIDE_TRANSACTION = "Bulk upsert query cannot be executed inside active "
97+
+ "transaction. This behavior may be changed by property bulkUpsertQueryTxMode";
9598

9699
// Cast errors
97100

jdbc/src/main/java/tech/ydb/jdbc/common/MappingSetters.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,12 @@ private static PrimitiveValue castAsUuid(PrimitiveType type, Object x) throws SQ
241241
private static byte castAsByte(PrimitiveType type, Object x) throws SQLException {
242242
if (x instanceof Byte) {
243243
return (Byte) x;
244+
} else if (x instanceof Short) {
245+
return ((Short) x).byteValue();
246+
} else if (x instanceof Integer) {
247+
return ((Integer) x).byteValue();
248+
} else if (x instanceof Long) {
249+
return ((Long) x).byteValue();
244250
} else if (x instanceof Boolean) {
245251
return (byte) (((Boolean) x) ? 1 : 0);
246252
}
@@ -252,6 +258,10 @@ private static short castAsShort(PrimitiveType type, Object x) throws SQLExcepti
252258
return (Short) x;
253259
} else if (x instanceof Byte) {
254260
return (Byte) x;
261+
} else if (x instanceof Integer) {
262+
return ((Integer) x).shortValue();
263+
} else if (x instanceof Long) {
264+
return ((Long) x).shortValue();
255265
} else if (x instanceof Boolean) {
256266
return (short) (((Boolean) x) ? 1 : 0);
257267
}

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,49 +5,45 @@
55
import java.util.Collection;
66
import java.util.concurrent.LinkedBlockingQueue;
77

8-
import tech.ydb.core.Result;
9-
import tech.ydb.core.UnexpectedResultException;
10-
import tech.ydb.jdbc.exception.ExceptionFactory;
118
import tech.ydb.jdbc.query.QueryType;
129
import tech.ydb.jdbc.query.YdbQuery;
13-
import tech.ydb.table.Session;
14-
import tech.ydb.table.TableClient;
10+
import tech.ydb.table.SessionRetryContext;
1511
import tech.ydb.table.query.Params;
1612
import tech.ydb.table.result.ResultSetReader;
1713
import tech.ydb.table.result.impl.ProtoValueReaders;
1814
import tech.ydb.table.settings.ExecuteScanQuerySettings;
1915
import tech.ydb.table.settings.ExecuteSchemeQuerySettings;
16+
import tech.ydb.table.values.ListValue;
2017

2118
/**
2219
*
2320
* @author Aleksandr Gorshenin
2421
*/
2522
public abstract class BaseYdbExecutor implements YdbExecutor {
26-
private final Duration sessionTimeout;
27-
private final TableClient tableClient;
23+
private final SessionRetryContext retryCtx;
2824

2925
public BaseYdbExecutor(YdbContext ctx) {
30-
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
31-
this.tableClient = ctx.getTableClient();
32-
}
33-
34-
protected Session createNewTableSession(YdbValidator validator) throws SQLException {
35-
try {
36-
Result<Session> session = tableClient.createSession(sessionTimeout).join();
37-
validator.addStatusIssues(session.getStatus());
38-
return session.getValue();
39-
} catch (UnexpectedResultException ex) {
40-
throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex);
41-
}
26+
this.retryCtx = ctx.getRetryCtx();
4227
}
4328

4429
@Override
4530
public void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yql) throws SQLException {
31+
ensureOpened();
32+
4633
// Scheme query does not affect transactions or result sets
4734
ExecuteSchemeQuerySettings settings = ctx.withDefaultTimeout(new ExecuteSchemeQuerySettings());
48-
try (Session session = createNewTableSession(validator)) {
49-
validator.execute(QueryType.SCHEME_QUERY + " >>\n" + yql, () -> session.executeSchemeQuery(yql, settings));
50-
}
35+
validator.execute(QueryType.SCHEME_QUERY + " >>\n" + yql,
36+
() -> retryCtx.supplyStatus(session -> session.executeSchemeQuery(yql, settings))
37+
);
38+
}
39+
40+
@Override
41+
public void executeBulkUpsert(YdbContext ctx, YdbValidator validator, String yql, String tablePath, ListValue rows)
42+
throws SQLException {
43+
ensureOpened();
44+
validator.execute(QueryType.BULK_QUERY + " >>\n" + yql,
45+
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
46+
);
5147
}
5248

5349
@Override
@@ -63,10 +59,12 @@ public ResultSetReader executeScanQuery(
6359
.build();
6460

6561
ctx.traceQuery(query, yql);
66-
try (Session session = createNewTableSession(validator)) {
67-
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql,
68-
() -> session.executeScanQuery(yql, params, settings).start(resultSets::add));
69-
}
62+
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql,
63+
() -> retryCtx.supplyStatus(session -> {
64+
resultSets.clear();
65+
return session.executeScanQuery(yql, params, settings).start(resultSets::add);
66+
})
67+
);
7068

7169
return ProtoValueReaders.forResultSets(resultSets);
7270
}

jdbc/src/main/java/tech/ydb/jdbc/context/SchemeExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class SchemeExecutor {
2020

2121
public SchemeExecutor(YdbContext ctx) {
2222
this.schemeClient = ctx.getSchemeClient();
23-
this.retryCtx = SessionRetryContext.create(ctx.getTableClient()).build();
23+
this.retryCtx = ctx.getRetryCtx();
2424
}
2525

2626
public CompletableFuture<Result<ListDirectoryResult>> listDirectory(String path) {

jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@
77
import java.util.ArrayList;
88
import java.util.List;
99

10+
import tech.ydb.core.Result;
11+
import tech.ydb.core.UnexpectedResultException;
1012
import tech.ydb.jdbc.YdbConst;
13+
import tech.ydb.jdbc.exception.ExceptionFactory;
1114
import tech.ydb.jdbc.query.ExplainedQuery;
1215
import tech.ydb.jdbc.query.QueryType;
1316
import tech.ydb.jdbc.query.YdbQuery;
1417
import tech.ydb.table.Session;
18+
import tech.ydb.table.TableClient;
1519
import tech.ydb.table.query.DataQueryResult;
1620
import tech.ydb.table.query.ExplainDataQueryResult;
1721
import tech.ydb.table.query.Params;
@@ -28,10 +32,14 @@
2832
* @author Aleksandr Gorshenin
2933
*/
3034
public class TableServiceExecutor extends BaseYdbExecutor {
35+
private final Duration sessionTimeout;
36+
private final TableClient tableClient;
3137
private volatile TxState tx;
3238

3339
public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
3440
super(ctx);
41+
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
42+
this.tableClient = ctx.getTableClient();
3543
this.tx = createTx(transactionLevel, autoCommit);
3644
}
3745

@@ -47,6 +55,16 @@ private void updateState(TxState newTx) {
4755
this.tx = newTx;
4856
}
4957

58+
protected Session createNewTableSession(YdbValidator validator) throws SQLException {
59+
try {
60+
Result<Session> session = tableClient.createSession(sessionTimeout).join();
61+
validator.addStatusIssues(session.getStatus());
62+
return session.getValue();
63+
} catch (UnexpectedResultException ex) {
64+
throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex);
65+
}
66+
}
67+
5068
@Override
5169
public void setTransactionLevel(int level) throws SQLException {
5270
updateState(tx.withTransactionLevel(level));

jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import tech.ydb.jdbc.query.YdbPreparedQuery;
3131
import tech.ydb.jdbc.query.YdbQuery;
3232
import tech.ydb.jdbc.query.params.BatchedQuery;
33+
import tech.ydb.jdbc.query.params.BulkUpsertQuery;
3334
import tech.ydb.jdbc.query.params.InMemoryQuery;
3435
import tech.ydb.jdbc.query.params.PreparedQuery;
3536
import tech.ydb.jdbc.settings.YdbClientProperties;
@@ -144,6 +145,10 @@ public QueryClient getQueryClient() {
144145
return queryClient;
145146
}
146147

148+
public SessionRetryContext getRetryCtx() {
149+
return retryCtx;
150+
}
151+
147152
public String getUrl() {
148153
return config.getUrl();
149154
}
@@ -346,16 +351,23 @@ public YdbPreparedQuery findOrPrepareParams(YdbQuery query, YdbPrepareMode mode)
346351
}
347352
}
348353

349-
if (query.getType() == QueryType.EXPLAIN_QUERY || query.getType() == QueryType.SCHEME_QUERY) {
354+
QueryType type = query.getType();
355+
356+
if (type == QueryType.BULK_QUERY) {
357+
if (query.getYqlBatcher() == null || query.getYqlBatcher().isInsert()) {
358+
throw new SQLException(YdbConst.BULKS_UNSUPPORTED);
359+
}
360+
}
361+
362+
if (type == QueryType.EXPLAIN_QUERY || type == QueryType.SCHEME_QUERY) {
350363
return new InMemoryQuery(query, queryOptions.isDeclareJdbcParameters());
351364
}
352365

353-
if (query.getYqlBatcher() != null && mode == YdbPrepareMode.AUTO) {
366+
if (query.getYqlBatcher() != null && (mode == YdbPrepareMode.AUTO || type == QueryType.BULK_QUERY)) {
367+
String tableName = query.getYqlBatcher().getTableName();
368+
String tablePath = tableName.startsWith("/") ? tableName : getDatabase() + "/" + tableName;
354369
Map<String, Type> types = queryParamsCache.getIfPresent(query.getOriginQuery());
355370
if (types == null) {
356-
String tableName = query.getYqlBatcher().getTableName();
357-
String tablePath = tableName.startsWith("/") ? tableName : getDatabase() + "/" + tableName;
358-
359371
DescribeTableSettings settings = withDefaultTimeout(new DescribeTableSettings());
360372
Result<TableDescription> result = retryCtx.supplyResult(
361373
session -> session.describeTable(tablePath, settings)
@@ -366,8 +378,16 @@ public YdbPreparedQuery findOrPrepareParams(YdbQuery query, YdbPrepareMode mode)
366378
types = descrtiption.getColumns().stream()
367379
.collect(Collectors.toMap(TableColumn::getName, TableColumn::getType));
368380
queryParamsCache.put(query.getOriginQuery(), types);
381+
} else {
382+
if (type == QueryType.BULK_QUERY) {
383+
throw new SQLException(YdbConst.BULKS_DESCRIBE_ERROR + result.getStatus());
384+
}
369385
}
370386
}
387+
if (type == QueryType.BULK_QUERY) {
388+
return BulkUpsertQuery.build(tablePath, query.getYqlBatcher().getColumns(), types);
389+
}
390+
371391
if (types != null) {
372392
BatchedQuery params = BatchedQuery.createAutoBatched(query.getYqlBatcher(), types);
373393
if (params != null) {

jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import tech.ydb.jdbc.query.YdbQuery;
99
import tech.ydb.table.query.Params;
1010
import tech.ydb.table.result.ResultSetReader;
11+
import tech.ydb.table.values.ListValue;
1112

1213
/**
1314
*
@@ -35,6 +36,9 @@ default void ensureOpened() throws SQLException {
3536

3637
void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yql) throws SQLException;
3738

39+
void executeBulkUpsert(YdbContext ctx, YdbValidator validator, String yql, String tablePath, ListValue rows)
40+
throws SQLException;
41+
3842
List<ResultSetReader> executeDataQuery(YdbContext ctx, YdbValidator validator, YdbQuery query, String yql,
3943
long timeout, boolean poolable, Params params) throws SQLException;
4044

jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbStatement.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import tech.ydb.jdbc.settings.YdbOperationProperties;
2626
import tech.ydb.table.query.Params;
2727
import tech.ydb.table.result.ResultSetReader;
28+
import tech.ydb.table.values.ListValue;
2829

2930
/**
3031
*
@@ -167,6 +168,18 @@ protected boolean updateState(List<YdbResult> results) {
167168
return state.hasResultSets();
168169
}
169170

171+
protected List<YdbResult> executeBulkUpsert(YdbQuery query, String yql, String tablePath, ListValue rows)
172+
throws SQLException {
173+
connection.executeBulkUpsertQuery(yql, tablePath, validator, rows);
174+
175+
int expressionsCount = query.getStatements().isEmpty() ? 1 : query.getStatements().size();
176+
List<YdbResult> results = new ArrayList<>();
177+
for (int i = 0; i < expressionsCount; i++) {
178+
results.add(HAS_UPDATED);
179+
}
180+
return results;
181+
}
182+
170183
protected List<YdbResult> executeSchemeQuery(YdbQuery query) throws SQLException {
171184
connection.executeSchemeQuery(query.getPreparedYql(), validator);
172185

jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import tech.ydb.jdbc.settings.YdbOperationProperties;
3838
import tech.ydb.table.query.Params;
3939
import tech.ydb.table.result.ResultSetReader;
40+
import tech.ydb.table.values.ListValue;
4041

4142
public class YdbConnectionImpl implements YdbConnection {
4243
private static final Logger LOGGER = Logger.getLogger(YdbConnectionImpl.class.getName());
@@ -46,13 +47,15 @@ public class YdbConnectionImpl implements YdbConnection {
4647
private final YdbExecutor executor;
4748
private final FakeTxMode scanQueryTxMode;
4849
private final FakeTxMode schemeQueryTxMode;
50+
private final FakeTxMode bulkQueryTxMode;
4951

5052
public YdbConnectionImpl(YdbContext context) throws SQLException {
5153
this.ctx = context;
5254

5355
YdbOperationProperties props = ctx.getOperationProperties();
5456
this.scanQueryTxMode = props.getScanQueryTxMode();
5557
this.schemeQueryTxMode = props.getSchemeQueryTxMode();
58+
this.bulkQueryTxMode = props.getBulkQueryTxMode();
5659

5760
this.validator = new YdbValidator(LOGGER);
5861
this.executor = ctx.createExecutor();
@@ -206,6 +209,7 @@ public void executeSchemeQuery(String yql, YdbValidator validator) throws SQLExc
206209
@Override
207210
public List<ResultSetReader> executeDataQuery(YdbQuery query, String yql, YdbValidator validator,
208211
int timeout, boolean poolable, Params params) throws SQLException {
212+
executor.ensureOpened();
209213
return executor.executeDataQuery(ctx, validator, query, yql, timeout, poolable, params);
210214
}
211215

@@ -230,8 +234,30 @@ public ResultSetReader executeScanQuery(YdbQuery query, String yql, YdbValidator
230234
return executor.executeScanQuery(ctx, validator, query, yql, params);
231235
}
232236

237+
@Override
238+
public void executeBulkUpsertQuery(String yql, String tablePath, YdbValidator validator, ListValue rows)
239+
throws SQLException {
240+
executor.ensureOpened();
241+
242+
if (executor.isInsideTransaction()) {
243+
switch (bulkQueryTxMode) {
244+
case FAKE_TX:
245+
break;
246+
case SHADOW_COMMIT:
247+
commit();
248+
break;
249+
case ERROR:
250+
default:
251+
throw new SQLException(YdbConst.BULK_QUERY_INSIDE_TRANSACTION);
252+
}
253+
}
254+
255+
executor.executeBulkUpsert(ctx, validator, yql, tablePath, rows);
256+
}
257+
233258
@Override
234259
public ExplainedQuery executeExplainQuery(String yql, YdbValidator validator) throws SQLException {
260+
executor.ensureOpened();
235261
return executor.executeExplainQuery(ctx, validator, yql);
236262
}
237263

0 commit comments

Comments
 (0)