Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions table/src/main/java/tech/ydb/table/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.proto.ValueProtos;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.description.TableOptionDescription;
import tech.ydb.table.query.BulkUpsertData;
import tech.ydb.table.query.DataQuery;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.ExplainDataQueryResult;
import tech.ydb.table.query.Params;
import tech.ydb.table.query.ReadRowsResult;
import tech.ydb.table.query.ReadTablePart;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.result.impl.ProtoValueReaders;
import tech.ydb.table.settings.AlterTableSettings;
import tech.ydb.table.settings.BeginTxSettings;
import tech.ydb.table.settings.BulkUpsertSettings;
Expand Down Expand Up @@ -146,7 +149,24 @@ default CompletableFuture<Result<TableTransaction>> beginTransaction(TxMode txMo

GrpcReadStream<ReadTablePart> executeReadTable(String tablePath, ReadTableSettings settings);

GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params params, ExecuteScanQuerySettings settings);
GrpcReadStream<ValueProtos.ResultSet> executeScanQueryRaw(String query, Params params,
ExecuteScanQuerySettings settings);

default GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params params,
ExecuteScanQuerySettings settings) {
GrpcReadStream<ValueProtos.ResultSet> stream = executeScanQueryRaw(query, params, settings);
return new GrpcReadStream<ResultSetReader>() {
@Override
public CompletableFuture<Status> start(GrpcReadStream.Observer<ResultSetReader> observer) {
return stream.start(part -> observer.onNext(ProtoValueReaders.forResultSet(part)));
}

@Override
public void cancel() {
stream.cancel();
}
};
}

@Deprecated
default GrpcReadStream<ResultSetReader> readTable(String tablePath, ReadTableSettings settings) {
Expand Down Expand Up @@ -178,7 +198,15 @@ default CompletableFuture<Status> executeScanQuery(String query, Params params,

CompletableFuture<Result<State>> keepAlive(KeepAliveSessionSettings settings);

CompletableFuture<Status> executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings);
default CompletableFuture<Status> executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings) {
return executeBulkUpsert(tablePath, new BulkUpsertData(rows), settings);
}

CompletableFuture<Status> executeBulkUpsert(String tablePath, BulkUpsertData data, BulkUpsertSettings settings);

default CompletableFuture<Status> executeBulkUpsert(String tablePath, BulkUpsertData data) {
return executeBulkUpsert(tablePath, data, new BulkUpsertSettings());
}

default CompletableFuture<Status> createTable(String path, TableDescription tableDescriptions) {
return createTable(path, tableDescriptions, new CreateTableSettings());
Expand Down
42 changes: 17 additions & 25 deletions table/src/main/java/tech/ydb/table/impl/BaseSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,13 @@
import tech.ydb.table.description.TableIndex;
import tech.ydb.table.description.TableOptionDescription;
import tech.ydb.table.description.TableTtl;
import tech.ydb.table.query.BulkUpsertData;
import tech.ydb.table.query.DataQuery;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.ExplainDataQueryResult;
import tech.ydb.table.query.Params;
import tech.ydb.table.query.ReadRowsResult;
import tech.ydb.table.query.ReadTablePart;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.result.impl.ProtoValueReaders;
import tech.ydb.table.rpc.TableRpc;
import tech.ydb.table.settings.AlterTableSettings;
import tech.ydb.table.settings.AutoPartitioningPolicy;
Expand Down Expand Up @@ -99,7 +98,6 @@
import tech.ydb.table.transaction.Transaction;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.table.values.ListType;
import tech.ydb.table.values.ListValue;
import tech.ydb.table.values.PrimitiveValue;
import tech.ydb.table.values.StructValue;
import tech.ydb.table.values.TupleValue;
Expand Down Expand Up @@ -1266,23 +1264,23 @@ public GrpcReadStream<ReadTablePart> executeReadTable(String tablePath, ReadTabl
.setBatchLimitBytes(settings.batchLimitBytes())
.setBatchLimitRows(settings.batchLimitRows());

Value<?> fromKey = settings.getFromKey();
ValueProtos.TypedValue fromKey = settings.getFromKeyRaw();
if (fromKey != null) {
YdbTable.KeyRange.Builder range = request.getKeyRangeBuilder();
if (settings.isFromInclusive()) {
range.setGreaterOrEqual(ProtoValue.toTypedValue(fromKey));
range.setGreaterOrEqual(fromKey);
} else {
range.setGreater(ProtoValue.toTypedValue(fromKey));
range.setGreater(fromKey);
}
}

Value<?> toKey = settings.getToKey();
ValueProtos.TypedValue toKey = settings.getToKeyRaw();
if (toKey != null) {
YdbTable.KeyRange.Builder range = request.getKeyRangeBuilder();
if (settings.isToInclusive()) {
range.setLessOrEqual(ProtoValue.toTypedValue(toKey));
range.setLessOrEqual(toKey);
} else {
range.setLess(ProtoValue.toTypedValue(toKey));
range.setLess(toKey);
}
}

Expand All @@ -1309,13 +1307,13 @@ List<YdbIssueMessage.IssueMessage> readIssues(YdbTable.ReadTableResponse message

@Override
ReadTablePart readValue(YdbTable.ReadTableResponse message) {
return new ReadTablePart(message.getResult(), message.getSnapshot());
return new ReadTablePart(message);
}
};
}

@Override
public GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params params,
public GrpcReadStream<ValueProtos.ResultSet> executeScanQueryRaw(String query, Params params,
ExecuteScanQuerySettings settings) {
YdbTable.ExecuteScanQueryRequest req = YdbTable.ExecuteScanQueryRequest.newBuilder()
.setQuery(YdbTable.Query.newBuilder().setYqlText(query))
Expand All @@ -1330,7 +1328,7 @@ public GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params par
}

GrpcReadStream<YdbTable.ExecuteScanQueryPartialResponse> origin = rpc.streamExecuteScanQuery(req, opts.build());
return new ProxyStream<YdbTable.ExecuteScanQueryPartialResponse, ResultSetReader>(origin) {
return new ProxyStream<YdbTable.ExecuteScanQueryPartialResponse, ValueProtos.ResultSet>(origin) {
@Override
StatusIds.StatusCode readStatusCode(YdbTable.ExecuteScanQueryPartialResponse message) {
return message.getStatus();
Expand All @@ -1342,8 +1340,8 @@ List<YdbIssueMessage.IssueMessage> readIssues(YdbTable.ExecuteScanQueryPartialRe
}

@Override
ResultSetReader readValue(YdbTable.ExecuteScanQueryPartialResponse message) {
return ProtoValueReaders.forResultSet(message.getResult().getResultSet());
ValueProtos.ResultSet readValue(YdbTable.ExecuteScanQueryPartialResponse message) {
return message.getResult().getResultSet();
}
};
}
Expand Down Expand Up @@ -1394,19 +1392,13 @@ public CompletableFuture<Result<State>> keepAlive(KeepAliveSessionSettings setti
}

@Override
public CompletableFuture<Status> executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings) {
ValueProtos.TypedValue typedRows = ValueProtos.TypedValue.newBuilder()
.setType(rows.getType().toPb())
.setValue(rows.toPb())
.build();

YdbTable.BulkUpsertRequest request = YdbTable.BulkUpsertRequest.newBuilder()
public CompletableFuture<Status> executeBulkUpsert(String tablePath, BulkUpsertData data, BulkUpsertSettings st) {
YdbTable.BulkUpsertRequest.Builder request = YdbTable.BulkUpsertRequest.newBuilder()
.setTable(tablePath)
.setRows(typedRows)
.setOperationParams(Operation.buildParams(settings.toOperationSettings()))
.build();
.setOperationParams(Operation.buildParams(st.toOperationSettings()));

return interceptStatus(rpc.bulkUpsert(request, makeOptions(settings).build()));
data.applyToRequest(request);
return interceptStatus(rpc.bulkUpsert(request.build(), makeOptions(st).build()));
}

private static State mapSessionStatus(YdbTable.KeepAliveResult result) {
Expand Down
29 changes: 29 additions & 0 deletions table/src/main/java/tech/ydb/table/query/BulkUpsertData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package tech.ydb.table.query;


import tech.ydb.proto.ValueProtos;
import tech.ydb.proto.table.YdbTable;
import tech.ydb.table.values.ListValue;

/**
*
* @author Aleksandr Gorshenin
*/
public class BulkUpsertData {
private final ValueProtos.TypedValue rows;

public BulkUpsertData(ListValue rows) {
this.rows = ValueProtos.TypedValue.newBuilder()
.setType(rows.getType().toPb())
.setValue(rows.toPb())
.build();
}

public BulkUpsertData(ValueProtos.TypedValue rows) {
this.rows = rows;
}

public void applyToRequest(YdbTable.BulkUpsertRequest.Builder builder) {
builder.setRows(rows);
}
}
4 changes: 4 additions & 0 deletions table/src/main/java/tech/ydb/table/query/DataQueryResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public ResultSetReader getResultSet(int index) {
return ProtoValueReaders.forResultSet(resultSets.get(index));
}

public ValueProtos.ResultSet getRawResultSet(int index) {
return resultSets.get(index);
}

public boolean isTruncated(int index) {
return resultSets.get(index).getTruncated();
}
Expand Down
21 changes: 16 additions & 5 deletions table/src/main/java/tech/ydb/table/query/ReadTablePart.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,30 @@ public long getTxId() {
}
}

private final ResultSetReader resultSetReader;
private final VirtualTimestamp timestamp;
private final YdbTable.ReadTableResponse part;
private ResultSetReader resultSetReader = null;
private VirtualTimestamp timestamp = null;

public ReadTablePart(YdbTable.ReadTableResult result, CommonProtos.VirtualTimestamp snapshot) {
this.resultSetReader = ProtoValueReaders.forResultSet(result.getResultSet());
this.timestamp = new VirtualTimestamp(snapshot.getPlanStep(), snapshot.getTxId());
public ReadTablePart(YdbTable.ReadTableResponse part) {
this.part = part;
}

public YdbTable.ReadTableResponse getReadTableResponse() {
return part;
}

public ResultSetReader getResultSetReader() {
if (resultSetReader == null) {
resultSetReader = ProtoValueReaders.forResultSet(part.getResult().getResultSet());
}
return resultSetReader;
}

public VirtualTimestamp getVirtualTimestamp() {
if (timestamp == null) {
CommonProtos.VirtualTimestamp vt = part.getSnapshot();
timestamp = new VirtualTimestamp(vt.getPlanStep(), vt.getTxId());
}
return timestamp;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,17 @@ public static ResultSetReader forResultSets(Collection<ResultSetReader> resultSe
return new ProtoResultSetReader(builder.build());
}

@Deprecated
public static ValueReader forType(ValueProtos.Type type) {
return forTypeImpl(type);
}

public static ValueReader forTypedValue(ValueProtos.TypedValue tv) {
AbstractValueReader vr = forTypeImpl(tv.getType());
vr.setProtoValue(tv.getValue());
return vr;
}

static AbstractValueReader forTypeImpl(ValueProtos.Type type) {
switch (type.getTypeCase()) {
case TYPE_ID:
Expand Down
33 changes: 29 additions & 4 deletions table/src/main/java/tech/ydb/table/settings/ReadTableSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@

import tech.ydb.core.grpc.GrpcFlowControl;
import tech.ydb.core.settings.BaseRequestSettings;
import tech.ydb.proto.ValueProtos;
import tech.ydb.table.result.impl.ProtoValueReaders;
import tech.ydb.table.values.PrimitiveValue;
import tech.ydb.table.values.TupleValue;
import tech.ydb.table.values.proto.ProtoValue;

import static com.google.common.base.Preconditions.checkArgument;

Expand All @@ -24,9 +27,9 @@
public class ReadTableSettings extends BaseRequestSettings {

private final boolean ordered;
private final TupleValue fromKey;
private final ValueProtos.TypedValue fromKey;
private final boolean fromInclusive;
private final TupleValue toKey;
private final ValueProtos.TypedValue toKey;
private final boolean toInclusive;
private final int rowLimit;
private final ImmutableList<String> columns;
Expand Down Expand Up @@ -58,6 +61,11 @@ public boolean isOrdered() {

@Nullable
public TupleValue getFromKey() {
return fromKey == null ? null : ProtoValueReaders.forTypedValue(fromKey).getValue().asTuple();
}

@Nullable
public ValueProtos.TypedValue getFromKeyRaw() {
return fromKey;
}

Expand All @@ -67,6 +75,11 @@ public boolean isFromInclusive() {

@Nullable
public TupleValue getToKey() {
return toKey == null ? null : ProtoValueReaders.forTypedValue(toKey).getValue().asTuple();
}

@Nullable
public ValueProtos.TypedValue getToKeyRaw() {
return toKey;
}

Expand Down Expand Up @@ -99,9 +112,9 @@ public GrpcFlowControl getGrpcFlowControl() {
*/
public static final class Builder extends BaseBuilder<Builder> {
private boolean ordered = false;
private TupleValue fromKey = null;
private ValueProtos.TypedValue fromKey = null;
private boolean fromInclusive = false;
private TupleValue toKey = null;
private ValueProtos.TypedValue toKey = null;
private boolean toInclusive = false;
private int rowLimit = 0;
private List<String> columns = Collections.emptyList();
Expand All @@ -115,12 +128,24 @@ public Builder orderedRead(boolean ordered) {
}

public Builder fromKey(TupleValue value, boolean inclusive) {
this.fromKey = ProtoValue.toTypedValue(value);
this.fromInclusive = inclusive;
return this;
}

public Builder fromKey(ValueProtos.TypedValue value, boolean inclusive) {
this.fromKey = value;
this.fromInclusive = inclusive;
return this;
}

public Builder toKey(TupleValue value, boolean inclusive) {
this.toKey = ProtoValue.toTypedValue(value);
this.toInclusive = inclusive;
return this;
}

public Builder toKey(ValueProtos.TypedValue value, boolean inclusive) {
this.toKey = value;
this.toInclusive = inclusive;
return this;
Expand Down
4 changes: 4 additions & 0 deletions table/src/main/java/tech/ydb/table/values/Value.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ default StructValue asStruct() {
return (StructValue) this;
}

default TupleValue asTuple() {
return (TupleValue) this;
}

default VariantValue asVariant() {
return (VariantValue) this;
}
Expand Down
Loading