Skip to content

Commit 94880cc

Browse files
committed
Added implementation for BULK upserts
1 parent 2a5d8bb commit 94880cc

File tree

14 files changed

+257
-65
lines changed

14 files changed

+257
-65
lines changed

jdbc/src/main/java/tech/ydb/jdbc/YdbConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,14 @@ public interface YdbConnection extends Connection {
3838
/**
3939
* Explicitly execute bulk upsert to the table
4040
*
41+
* @param yql description of request
4142
* @param tablePath path to table
4243
* @param validator handler for logging and warnings
4344
* @param rows bulk rows
4445
* @throws SQLException if query cannot be executed
4546
*/
46-
void executeBulkUpsertQuery(String tablePath, YdbValidator validator, ListValue rows) throws SQLException;
47+
void executeBulkUpsertQuery(String yql, String tablePath, YdbValidator validator, ListValue rows)
48+
throws SQLException;
4749

4850
/**
4951
* Explicitly execute query as a data query

jdbc/src/main/java/tech/ydb/jdbc/YdbConst.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ public final class YdbConst {
6565
public static final String PARAMETER_NOT_FOUND = "Parameter not found: ";
6666
public static final String PARAMETER_TYPE_UNKNOWN = "Unable to convert sqlType %s to YDB type for parameter: %s";
6767
public static final String INVALID_ROW = "Current row index is out of bounds: ";
68-
public static final String BULKS_UNSUPPORTED = "Bulk upserts are supported only in prepared statements";
68+
public static final String BULKS_UNSUPPORTED = "BULK mode is available only for prepared statement with one UPSERT";
69+
public static final String INVALID_BATCH_COLUMN = "Cannot prepared batch request: cannot find a column";
70+
public static final String BULKS_DESCRIBE_ERROR = "Cannot parse BULK upsert: ";
6971
public static final String METADATA_RS_UNSUPPORTED_IN_PS = "ResultSet metadata is not supported " +
7072
"in prepared statements";
7173
public static final String CANNOT_UNWRAP_TO = "Cannot unwrap to ";

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ public void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yq
3838
}
3939

4040
@Override
41-
public void executeBulkUpsert(YdbContext ctx, YdbValidator validator, String tablePath, ListValue rows)
41+
public void executeBulkUpsert(YdbContext ctx, YdbValidator validator, String yql, String tablePath, ListValue rows)
4242
throws SQLException {
4343
ensureOpened();
44-
validator.execute(QueryType.BULK_QUERY + " >>\n" + tablePath,
44+
validator.execute(QueryType.BULK_QUERY + " >>\n" + yql,
4545
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
4646
);
4747
}

jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import tech.ydb.jdbc.query.YdbPreparedQuery;
3131
import tech.ydb.jdbc.query.YdbQuery;
3232
import tech.ydb.jdbc.query.params.BatchedQuery;
33+
import tech.ydb.jdbc.query.params.BulkUpsertQuery;
3334
import tech.ydb.jdbc.query.params.InMemoryQuery;
3435
import tech.ydb.jdbc.query.params.PreparedQuery;
3536
import tech.ydb.jdbc.settings.YdbClientProperties;
@@ -350,16 +351,23 @@ public YdbPreparedQuery findOrPrepareParams(YdbQuery query, YdbPrepareMode mode)
350351
}
351352
}
352353

353-
if (query.getType() == QueryType.EXPLAIN_QUERY || query.getType() == QueryType.SCHEME_QUERY) {
354+
QueryType type = query.getType();
355+
356+
if (type == QueryType.BULK_QUERY) {
357+
if (query.getYqlBatcher() == null || query.getYqlBatcher().isInsert()) {
358+
throw new SQLException(YdbConst.BULKS_UNSUPPORTED);
359+
}
360+
}
361+
362+
if (type == QueryType.EXPLAIN_QUERY || type == QueryType.SCHEME_QUERY) {
354363
return new InMemoryQuery(query, queryOptions.isDeclareJdbcParameters());
355364
}
356365

357-
if (query.getYqlBatcher() != null && mode == YdbPrepareMode.AUTO) {
366+
if (query.getYqlBatcher() != null && (mode == YdbPrepareMode.AUTO || type == QueryType.BULK_QUERY)) {
367+
String tableName = query.getYqlBatcher().getTableName();
368+
String tablePath = tableName.startsWith("/") ? tableName : getDatabase() + "/" + tableName;
358369
Map<String, Type> types = queryParamsCache.getIfPresent(query.getOriginQuery());
359370
if (types == null) {
360-
String tableName = query.getYqlBatcher().getTableName();
361-
String tablePath = tableName.startsWith("/") ? tableName : getDatabase() + "/" + tableName;
362-
363371
DescribeTableSettings settings = withDefaultTimeout(new DescribeTableSettings());
364372
Result<TableDescription> result = retryCtx.supplyResult(
365373
session -> session.describeTable(tablePath, settings)
@@ -370,8 +378,16 @@ public YdbPreparedQuery findOrPrepareParams(YdbQuery query, YdbPrepareMode mode)
370378
types = descrtiption.getColumns().stream()
371379
.collect(Collectors.toMap(TableColumn::getName, TableColumn::getType));
372380
queryParamsCache.put(query.getOriginQuery(), types);
381+
} else {
382+
if (type == QueryType.BULK_QUERY) {
383+
throw new SQLException(YdbConst.BULKS_DESCRIBE_ERROR + result.getStatus());
384+
}
373385
}
374386
}
387+
if (type == QueryType.BULK_QUERY) {
388+
return BulkUpsertQuery.build(tablePath, query.getYqlBatcher().getColumns(), types);
389+
}
390+
375391
if (types != null) {
376392
BatchedQuery params = BatchedQuery.createAutoBatched(query.getYqlBatcher(), types);
377393
if (params != null) {

jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ default void ensureOpened() throws SQLException {
3636

3737
void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yql) throws SQLException;
3838

39-
void executeBulkUpsert(YdbContext ctx, YdbValidator validator, String tablePath, ListValue rows)
39+
void executeBulkUpsert(YdbContext ctx, YdbValidator validator, String yql, String tablePath, ListValue rows)
4040
throws SQLException;
4141

4242
List<ResultSetReader> executeDataQuery(YdbContext ctx, YdbValidator validator, YdbQuery query, String yql,

jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbStatement.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import tech.ydb.jdbc.settings.YdbOperationProperties;
2626
import tech.ydb.table.query.Params;
2727
import tech.ydb.table.result.ResultSetReader;
28+
import tech.ydb.table.values.ListValue;
2829

2930
/**
3031
*
@@ -167,6 +168,18 @@ protected boolean updateState(List<YdbResult> results) {
167168
return state.hasResultSets();
168169
}
169170

171+
protected List<YdbResult> executeBulkUpsert(YdbQuery query, String yql, String tablePath, ListValue rows)
172+
throws SQLException {
173+
connection.executeBulkUpsertQuery(yql, tablePath, validator, rows);
174+
175+
int expressionsCount = query.getStatements().isEmpty() ? 1 : query.getStatements().size();
176+
List<YdbResult> results = new ArrayList<>();
177+
for (int i = 0; i < expressionsCount; i++) {
178+
results.add(HAS_UPDATED);
179+
}
180+
return results;
181+
}
182+
170183
protected List<YdbResult> executeSchemeQuery(YdbQuery query) throws SQLException {
171184
connection.executeSchemeQuery(query.getPreparedYql(), validator);
172185

jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,8 @@ public ResultSetReader executeScanQuery(YdbQuery query, String yql, YdbValidator
235235
}
236236

237237
@Override
238-
public void executeBulkUpsertQuery(String tablePath, YdbValidator validator, ListValue rows) throws SQLException {
238+
public void executeBulkUpsertQuery(String yql, String tablePath, YdbValidator validator, ListValue rows)
239+
throws SQLException {
239240
executor.ensureOpened();
240241

241242
if (executor.isInsideTransaction()) {
@@ -251,7 +252,7 @@ public void executeBulkUpsertQuery(String tablePath, YdbValidator validator, Lis
251252
}
252253
}
253254

254-
executor.executeBulkUpsert(ctx, validator, tablePath, rows);
255+
executor.executeBulkUpsert(ctx, validator, yql, tablePath, rows);
255256
}
256257

257258
@Override

jdbc/src/main/java/tech/ydb/jdbc/impl/YdbPreparedStatementImpl.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
import tech.ydb.jdbc.YdbPreparedStatement;
3333
import tech.ydb.jdbc.YdbResultSet;
3434
import tech.ydb.jdbc.common.MappingSetters;
35+
import tech.ydb.jdbc.query.QueryType;
3536
import tech.ydb.jdbc.query.YdbPreparedQuery;
3637
import tech.ydb.jdbc.query.YdbQuery;
38+
import tech.ydb.jdbc.query.params.BulkUpsertQuery;
3739
import tech.ydb.table.query.Params;
3840
import tech.ydb.table.values.Type;
3941

@@ -90,8 +92,14 @@ public int[] executeBatch() throws SQLException {
9092
}
9193

9294
try {
93-
for (Params prm: prepared.getBatchParams()) {
94-
executeDataQuery(query, prepared.getQueryText(prm), prm);
95+
if (query.getType() == QueryType.BULK_QUERY && (prepared instanceof BulkUpsertQuery)) {
96+
BulkUpsertQuery bulk = (BulkUpsertQuery) prepared;
97+
String yql = bulk.getQueryText(null);
98+
executeBulkUpsert(query, yql, bulk.getTablePath(), bulk.getBatchedBulk());
99+
} else {
100+
for (Params prm: prepared.getBatchParams()) {
101+
executeDataQuery(query, prepared.getQueryText(prm), prm);
102+
}
95103
}
96104
} finally {
97105
clearBatch();
@@ -138,6 +146,17 @@ public boolean execute() throws SQLException {
138146
case EXPLAIN_QUERY:
139147
newState = executeExplainQuery(query);
140148
break;
149+
case BULK_QUERY:
150+
if (prepared instanceof BulkUpsertQuery) {
151+
BulkUpsertQuery bulk = (BulkUpsertQuery) prepared;
152+
String yql = bulk.getQueryText(null);
153+
newState = executeBulkUpsert(query, yql, bulk.getTablePath(), bulk.getCurrentBulk());
154+
} else {
155+
throw new IllegalStateException(
156+
"Internal error. Incorrect class of bulk prepared query " + prepared.getClass()
157+
);
158+
}
159+
break;
141160
default:
142161
throw new IllegalStateException("Internal error. Unsupported query type " + query.getType());
143162
}

jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ public boolean execute(String sql) throws SQLException {
9494
case EXPLAIN_QUERY:
9595
newState = executeExplainQuery(query);
9696
break;
97+
case BULK_QUERY:
98+
throw new SQLException(YdbConst.BULKS_UNSUPPORTED);
9799
default:
98100
throw new IllegalStateException("Internal error. Unsupported query type " + query.getType());
99101
}

jdbc/src/main/java/tech/ydb/jdbc/query/QueryType.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ public enum QueryType {
1313
// EXPLAIN
1414
EXPLAIN_QUERY,
1515

16-
// BULK UPSERT
17-
BULK_QUERY,
16+
// BULK
17+
BULK_QUERY;
1818
}

0 commit comments

Comments
 (0)