Skip to content

Commit d47c519

Browse files
committed
Added customization of BulkUpsert data
1 parent 06eac34 commit d47c519

File tree

5 files changed

+51
-19
lines changed

5 files changed

+51
-19
lines changed

table/src/main/java/tech/ydb/table/Session.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import tech.ydb.core.grpc.GrpcReadStream;
1212
import tech.ydb.table.description.TableDescription;
1313
import tech.ydb.table.description.TableOptionDescription;
14+
import tech.ydb.table.query.BulkUpsertData;
1415
import tech.ydb.table.query.DataQuery;
1516
import tech.ydb.table.query.DataQueryResult;
1617
import tech.ydb.table.query.ExplainDataQueryResult;
@@ -178,7 +179,15 @@ default CompletableFuture<Status> executeScanQuery(String query, Params params,
178179

179180
CompletableFuture<Result<State>> keepAlive(KeepAliveSessionSettings settings);
180181

181-
CompletableFuture<Status> executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings);
182+
default CompletableFuture<Status> executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings) {
183+
return executeBulkUpsert(tablePath, new BulkUpsertData(rows), settings);
184+
}
185+
186+
CompletableFuture<Status> executeBulkUpsert(String tablePath, BulkUpsertData data, BulkUpsertSettings settings);
187+
188+
default CompletableFuture<Status> executeBulkUpsert(String tablePath, BulkUpsertData data) {
189+
return executeBulkUpsert(tablePath, data, new BulkUpsertSettings());
190+
}
182191

183192
default CompletableFuture<Status> createTable(String path, TableDescription tableDescriptions) {
184193
return createTable(path, tableDescriptions, new CreateTableSettings());

table/src/main/java/tech/ydb/table/impl/BaseSession.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import tech.ydb.table.description.TableIndex;
5858
import tech.ydb.table.description.TableOptionDescription;
5959
import tech.ydb.table.description.TableTtl;
60+
import tech.ydb.table.query.BulkUpsertData;
6061
import tech.ydb.table.query.DataQuery;
6162
import tech.ydb.table.query.DataQueryResult;
6263
import tech.ydb.table.query.ExplainDataQueryResult;
@@ -99,7 +100,6 @@
99100
import tech.ydb.table.transaction.Transaction;
100101
import tech.ydb.table.transaction.TxControl;
101102
import tech.ydb.table.values.ListType;
102-
import tech.ydb.table.values.ListValue;
103103
import tech.ydb.table.values.PrimitiveValue;
104104
import tech.ydb.table.values.StructValue;
105105
import tech.ydb.table.values.TupleValue;
@@ -1394,19 +1394,13 @@ public CompletableFuture<Result<State>> keepAlive(KeepAliveSessionSettings setti
13941394
}
13951395

13961396
@Override
1397-
public CompletableFuture<Status> executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings) {
1398-
ValueProtos.TypedValue typedRows = ValueProtos.TypedValue.newBuilder()
1399-
.setType(rows.getType().toPb())
1400-
.setValue(rows.toPb())
1401-
.build();
1402-
1403-
YdbTable.BulkUpsertRequest request = YdbTable.BulkUpsertRequest.newBuilder()
1397+
public CompletableFuture<Status> executeBulkUpsert(String tablePath, BulkUpsertData data, BulkUpsertSettings st) {
1398+
YdbTable.BulkUpsertRequest.Builder request = YdbTable.BulkUpsertRequest.newBuilder()
14041399
.setTable(tablePath)
1405-
.setRows(typedRows)
1406-
.setOperationParams(Operation.buildParams(settings.toOperationSettings()))
1407-
.build();
1400+
.setOperationParams(Operation.buildParams(st.toOperationSettings()));
14081401

1409-
return interceptStatus(rpc.bulkUpsert(request, makeOptions(settings).build()));
1402+
data.applyToRequest(request);
1403+
return interceptStatus(rpc.bulkUpsert(request.build(), makeOptions(st).build()));
14101404
}
14111405

14121406
private static State mapSessionStatus(YdbTable.KeepAliveResult result) {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package tech.ydb.table.query;
2+
3+
4+
import tech.ydb.proto.ValueProtos;
5+
import tech.ydb.proto.table.YdbTable;
6+
import tech.ydb.table.values.ListValue;
7+
8+
/**
9+
*
10+
* @author Aleksandr Gorshenin
11+
*/
12+
public class BulkUpsertData {
13+
private final ValueProtos.TypedValue rows;
14+
15+
public BulkUpsertData(ListValue rows) {
16+
this.rows = ValueProtos.TypedValue.newBuilder()
17+
.setType(rows.getType().toPb())
18+
.setValue(rows.toPb())
19+
.build();
20+
}
21+
22+
public BulkUpsertData(ValueProtos.TypedValue rows) {
23+
this.rows = rows;
24+
}
25+
26+
public void applyToRequest(YdbTable.BulkUpsertRequest.Builder builder) {
27+
builder.setRows(rows);
28+
}
29+
}

table/src/test/java/tech/ydb/table/SessionStub.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import tech.ydb.core.utils.FutureTools;
1111
import tech.ydb.table.description.TableDescription;
1212
import tech.ydb.table.description.TableOptionDescription;
13+
import tech.ydb.table.query.BulkUpsertData;
1314
import tech.ydb.table.query.DataQuery;
1415
import tech.ydb.table.query.DataQueryResult;
1516
import tech.ydb.table.query.ExplainDataQueryResult;
@@ -40,7 +41,6 @@
4041
import tech.ydb.table.transaction.TableTransaction;
4142
import tech.ydb.table.transaction.Transaction;
4243
import tech.ydb.table.transaction.TxControl;
43-
import tech.ydb.table.values.ListValue;
4444

4545

4646
/**
@@ -178,7 +178,7 @@ public CompletableFuture<Result<Session.State>> keepAlive(KeepAliveSessionSettin
178178
}
179179

180180
@Override
181-
public CompletableFuture<Status> executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings) {
181+
public CompletableFuture<Status> executeBulkUpsert(String tablePath, BulkUpsertData rows, BulkUpsertSettings settings) {
182182
return notImplemented("bulkUpsert()");
183183
}
184184

table/src/test/java/tech/ydb/table/integration/ReadTableTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
import tech.ydb.table.SessionRetryContext;
2929
import tech.ydb.table.description.TableDescription;
3030
import tech.ydb.table.impl.SimpleTableClient;
31+
import tech.ydb.table.query.BulkUpsertData;
3132
import tech.ydb.table.query.ReadTablePart;
3233
import tech.ydb.table.result.ResultSetReader;
3334
import tech.ydb.table.rpc.grpc.GrpcTableRpc;
34-
import tech.ydb.table.settings.BulkUpsertSettings;
3535
import tech.ydb.table.settings.ReadTableSettings;
3636
import tech.ydb.table.values.ListType;
3737
import tech.ydb.table.values.PrimitiveType;
@@ -98,9 +98,9 @@ public static void prepareTable() {
9898
);
9999
}).collect(Collectors.toList());
100100

101-
retryCtx.supplyStatus(session -> session.executeBulkUpsert(
102-
tablePath, ListType.of(batchType).newValue(batchData), new BulkUpsertSettings())
103-
).join().expectSuccess("bulk upsert problem in table " + tablePath);
101+
retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath,
102+
new BulkUpsertData(ProtoValue.toTypedValue(ListType.of(batchType).newValue(batchData)))
103+
)).join().expectSuccess("bulk upsert problem in table " + tablePath);
104104
}
105105

106106
@AfterClass

0 commit comments

Comments
 (0)