diff --git a/table/src/main/java/tech/ydb/table/Session.java b/table/src/main/java/tech/ydb/table/Session.java index 819f2a659..7793c972e 100644 --- a/table/src/main/java/tech/ydb/table/Session.java +++ b/table/src/main/java/tech/ydb/table/Session.java @@ -9,8 +9,10 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.proto.ValueProtos; import tech.ydb.table.description.TableDescription; import tech.ydb.table.description.TableOptionDescription; +import tech.ydb.table.query.BulkUpsertData; import tech.ydb.table.query.DataQuery; import tech.ydb.table.query.DataQueryResult; import tech.ydb.table.query.ExplainDataQueryResult; @@ -18,6 +20,7 @@ import tech.ydb.table.query.ReadRowsResult; import tech.ydb.table.query.ReadTablePart; import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.result.impl.ProtoValueReaders; import tech.ydb.table.settings.AlterTableSettings; import tech.ydb.table.settings.BeginTxSettings; import tech.ydb.table.settings.BulkUpsertSettings; @@ -146,7 +149,24 @@ default CompletableFuture> beginTransaction(TxMode txMo GrpcReadStream executeReadTable(String tablePath, ReadTableSettings settings); - GrpcReadStream executeScanQuery(String query, Params params, ExecuteScanQuerySettings settings); + GrpcReadStream executeScanQueryRaw(String query, Params params, + ExecuteScanQuerySettings settings); + + default GrpcReadStream executeScanQuery(String query, Params params, + ExecuteScanQuerySettings settings) { + GrpcReadStream stream = executeScanQueryRaw(query, params, settings); + return new GrpcReadStream() { + @Override + public CompletableFuture start(GrpcReadStream.Observer observer) { + return stream.start(part -> observer.onNext(ProtoValueReaders.forResultSet(part))); + } + + @Override + public void cancel() { + stream.cancel(); + } + }; + } @Deprecated default GrpcReadStream readTable(String tablePath, ReadTableSettings settings) { @@ -178,7 +198,15 @@ default CompletableFuture executeScanQuery(String query, Params params, CompletableFuture> keepAlive(KeepAliveSessionSettings settings); - CompletableFuture executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings); + default CompletableFuture executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings) { + return executeBulkUpsert(tablePath, new BulkUpsertData(rows), settings); + } + + CompletableFuture executeBulkUpsert(String tablePath, BulkUpsertData data, BulkUpsertSettings settings); + + default CompletableFuture executeBulkUpsert(String tablePath, BulkUpsertData data) { + return executeBulkUpsert(tablePath, data, new BulkUpsertSettings()); + } default CompletableFuture createTable(String path, TableDescription tableDescriptions) { return createTable(path, tableDescriptions, new CreateTableSettings()); diff --git a/table/src/main/java/tech/ydb/table/impl/BaseSession.java b/table/src/main/java/tech/ydb/table/impl/BaseSession.java index 62df08568..975956f10 100644 --- a/table/src/main/java/tech/ydb/table/impl/BaseSession.java +++ b/table/src/main/java/tech/ydb/table/impl/BaseSession.java @@ -57,14 +57,13 @@ import tech.ydb.table.description.TableIndex; import tech.ydb.table.description.TableOptionDescription; import tech.ydb.table.description.TableTtl; +import tech.ydb.table.query.BulkUpsertData; import tech.ydb.table.query.DataQuery; import tech.ydb.table.query.DataQueryResult; import tech.ydb.table.query.ExplainDataQueryResult; import tech.ydb.table.query.Params; import tech.ydb.table.query.ReadRowsResult; import tech.ydb.table.query.ReadTablePart; -import tech.ydb.table.result.ResultSetReader; -import tech.ydb.table.result.impl.ProtoValueReaders; import tech.ydb.table.rpc.TableRpc; import tech.ydb.table.settings.AlterTableSettings; import tech.ydb.table.settings.AutoPartitioningPolicy; @@ -99,7 +98,6 @@ import tech.ydb.table.transaction.Transaction; import tech.ydb.table.transaction.TxControl; import tech.ydb.table.values.ListType; -import tech.ydb.table.values.ListValue; import tech.ydb.table.values.PrimitiveValue; import tech.ydb.table.values.StructValue; import tech.ydb.table.values.TupleValue; @@ -1266,23 +1264,23 @@ public GrpcReadStream executeReadTable(String tablePath, ReadTabl .setBatchLimitBytes(settings.batchLimitBytes()) .setBatchLimitRows(settings.batchLimitRows()); - Value fromKey = settings.getFromKey(); + ValueProtos.TypedValue fromKey = settings.getFromKeyRaw(); if (fromKey != null) { YdbTable.KeyRange.Builder range = request.getKeyRangeBuilder(); if (settings.isFromInclusive()) { - range.setGreaterOrEqual(ProtoValue.toTypedValue(fromKey)); + range.setGreaterOrEqual(fromKey); } else { - range.setGreater(ProtoValue.toTypedValue(fromKey)); + range.setGreater(fromKey); } } - Value toKey = settings.getToKey(); + ValueProtos.TypedValue toKey = settings.getToKeyRaw(); if (toKey != null) { YdbTable.KeyRange.Builder range = request.getKeyRangeBuilder(); if (settings.isToInclusive()) { - range.setLessOrEqual(ProtoValue.toTypedValue(toKey)); + range.setLessOrEqual(toKey); } else { - range.setLess(ProtoValue.toTypedValue(toKey)); + range.setLess(toKey); } } @@ -1309,13 +1307,13 @@ List readIssues(YdbTable.ReadTableResponse message @Override ReadTablePart readValue(YdbTable.ReadTableResponse message) { - return new ReadTablePart(message.getResult(), message.getSnapshot()); + return new ReadTablePart(message); } }; } @Override - public GrpcReadStream executeScanQuery(String query, Params params, + public GrpcReadStream executeScanQueryRaw(String query, Params params, ExecuteScanQuerySettings settings) { YdbTable.ExecuteScanQueryRequest req = YdbTable.ExecuteScanQueryRequest.newBuilder() .setQuery(YdbTable.Query.newBuilder().setYqlText(query)) @@ -1330,7 +1328,7 @@ public GrpcReadStream executeScanQuery(String query, Params par } GrpcReadStream origin = rpc.streamExecuteScanQuery(req, opts.build()); - return new ProxyStream(origin) { + return new ProxyStream(origin) { @Override StatusIds.StatusCode readStatusCode(YdbTable.ExecuteScanQueryPartialResponse message) { return message.getStatus(); @@ -1342,8 +1340,8 @@ List readIssues(YdbTable.ExecuteScanQueryPartialRe } @Override - ResultSetReader readValue(YdbTable.ExecuteScanQueryPartialResponse message) { - return ProtoValueReaders.forResultSet(message.getResult().getResultSet()); + ValueProtos.ResultSet readValue(YdbTable.ExecuteScanQueryPartialResponse message) { + return message.getResult().getResultSet(); } }; } @@ -1394,19 +1392,13 @@ public CompletableFuture> keepAlive(KeepAliveSessionSettings setti } @Override - public CompletableFuture executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings) { - ValueProtos.TypedValue typedRows = ValueProtos.TypedValue.newBuilder() - .setType(rows.getType().toPb()) - .setValue(rows.toPb()) - .build(); - - YdbTable.BulkUpsertRequest request = YdbTable.BulkUpsertRequest.newBuilder() + public CompletableFuture executeBulkUpsert(String tablePath, BulkUpsertData data, BulkUpsertSettings st) { + YdbTable.BulkUpsertRequest.Builder request = YdbTable.BulkUpsertRequest.newBuilder() .setTable(tablePath) - .setRows(typedRows) - .setOperationParams(Operation.buildParams(settings.toOperationSettings())) - .build(); + .setOperationParams(Operation.buildParams(st.toOperationSettings())); - return interceptStatus(rpc.bulkUpsert(request, makeOptions(settings).build())); + data.applyToRequest(request); + return interceptStatus(rpc.bulkUpsert(request.build(), makeOptions(st).build())); } private static State mapSessionStatus(YdbTable.KeepAliveResult result) { diff --git a/table/src/main/java/tech/ydb/table/query/BulkUpsertData.java b/table/src/main/java/tech/ydb/table/query/BulkUpsertData.java new file mode 100644 index 000000000..73d170ffc --- /dev/null +++ b/table/src/main/java/tech/ydb/table/query/BulkUpsertData.java @@ -0,0 +1,29 @@ +package tech.ydb.table.query; + + +import tech.ydb.proto.ValueProtos; +import tech.ydb.proto.table.YdbTable; +import tech.ydb.table.values.ListValue; + +/** + * + * @author Aleksandr Gorshenin + */ +public class BulkUpsertData { + private final ValueProtos.TypedValue rows; + + public BulkUpsertData(ListValue rows) { + this.rows = ValueProtos.TypedValue.newBuilder() + .setType(rows.getType().toPb()) + .setValue(rows.toPb()) + .build(); + } + + public BulkUpsertData(ValueProtos.TypedValue rows) { + this.rows = rows; + } + + public void applyToRequest(YdbTable.BulkUpsertRequest.Builder builder) { + builder.setRows(rows); + } +} diff --git a/table/src/main/java/tech/ydb/table/query/DataQueryResult.java b/table/src/main/java/tech/ydb/table/query/DataQueryResult.java index a489aeab1..6cdd187bc 100644 --- a/table/src/main/java/tech/ydb/table/query/DataQueryResult.java +++ b/table/src/main/java/tech/ydb/table/query/DataQueryResult.java @@ -35,6 +35,10 @@ public ResultSetReader getResultSet(int index) { return ProtoValueReaders.forResultSet(resultSets.get(index)); } + public ValueProtos.ResultSet getRawResultSet(int index) { + return resultSets.get(index); + } + public boolean isTruncated(int index) { return resultSets.get(index).getTruncated(); } diff --git a/table/src/main/java/tech/ydb/table/query/ReadTablePart.java b/table/src/main/java/tech/ydb/table/query/ReadTablePart.java index b4006b8ec..07cecde2e 100644 --- a/table/src/main/java/tech/ydb/table/query/ReadTablePart.java +++ b/table/src/main/java/tech/ydb/table/query/ReadTablePart.java @@ -27,19 +27,30 @@ public long getTxId() { } } - private final ResultSetReader resultSetReader; - private final VirtualTimestamp timestamp; + private final YdbTable.ReadTableResponse part; + private ResultSetReader resultSetReader = null; + private VirtualTimestamp timestamp = null; - public ReadTablePart(YdbTable.ReadTableResult result, CommonProtos.VirtualTimestamp snapshot) { - this.resultSetReader = ProtoValueReaders.forResultSet(result.getResultSet()); - this.timestamp = new VirtualTimestamp(snapshot.getPlanStep(), snapshot.getTxId()); + public ReadTablePart(YdbTable.ReadTableResponse part) { + this.part = part; + } + + public YdbTable.ReadTableResponse getReadTableResponse() { + return part; } public ResultSetReader getResultSetReader() { + if (resultSetReader == null) { + resultSetReader = ProtoValueReaders.forResultSet(part.getResult().getResultSet()); + } return resultSetReader; } public VirtualTimestamp getVirtualTimestamp() { + if (timestamp == null) { + CommonProtos.VirtualTimestamp vt = part.getSnapshot(); + timestamp = new VirtualTimestamp(vt.getPlanStep(), vt.getTxId()); + } return timestamp; } } diff --git a/table/src/main/java/tech/ydb/table/result/impl/ProtoValueReaders.java b/table/src/main/java/tech/ydb/table/result/impl/ProtoValueReaders.java index 026bccfbb..a27833aa5 100644 --- a/table/src/main/java/tech/ydb/table/result/impl/ProtoValueReaders.java +++ b/table/src/main/java/tech/ydb/table/result/impl/ProtoValueReaders.java @@ -40,10 +40,17 @@ public static ResultSetReader forResultSets(Collection resultSe return new ProtoResultSetReader(builder.build()); } + @Deprecated public static ValueReader forType(ValueProtos.Type type) { return forTypeImpl(type); } + public static ValueReader forTypedValue(ValueProtos.TypedValue tv) { + AbstractValueReader vr = forTypeImpl(tv.getType()); + vr.setProtoValue(tv.getValue()); + return vr; + } + static AbstractValueReader forTypeImpl(ValueProtos.Type type) { switch (type.getTypeCase()) { case TYPE_ID: diff --git a/table/src/main/java/tech/ydb/table/settings/ReadTableSettings.java b/table/src/main/java/tech/ydb/table/settings/ReadTableSettings.java index 61695f362..f29a03d51 100644 --- a/table/src/main/java/tech/ydb/table/settings/ReadTableSettings.java +++ b/table/src/main/java/tech/ydb/table/settings/ReadTableSettings.java @@ -11,8 +11,11 @@ import tech.ydb.core.grpc.GrpcFlowControl; import tech.ydb.core.settings.BaseRequestSettings; +import tech.ydb.proto.ValueProtos; +import tech.ydb.table.result.impl.ProtoValueReaders; import tech.ydb.table.values.PrimitiveValue; import tech.ydb.table.values.TupleValue; +import tech.ydb.table.values.proto.ProtoValue; import static com.google.common.base.Preconditions.checkArgument; @@ -24,9 +27,9 @@ public class ReadTableSettings extends BaseRequestSettings { private final boolean ordered; - private final TupleValue fromKey; + private final ValueProtos.TypedValue fromKey; private final boolean fromInclusive; - private final TupleValue toKey; + private final ValueProtos.TypedValue toKey; private final boolean toInclusive; private final int rowLimit; private final ImmutableList columns; @@ -58,6 +61,11 @@ public boolean isOrdered() { @Nullable public TupleValue getFromKey() { + return fromKey == null ? null : ProtoValueReaders.forTypedValue(fromKey).getValue().asTuple(); + } + + @Nullable + public ValueProtos.TypedValue getFromKeyRaw() { return fromKey; } @@ -67,6 +75,11 @@ public boolean isFromInclusive() { @Nullable public TupleValue getToKey() { + return toKey == null ? null : ProtoValueReaders.forTypedValue(toKey).getValue().asTuple(); + } + + @Nullable + public ValueProtos.TypedValue getToKeyRaw() { return toKey; } @@ -99,9 +112,9 @@ public GrpcFlowControl getGrpcFlowControl() { */ public static final class Builder extends BaseBuilder { private boolean ordered = false; - private TupleValue fromKey = null; + private ValueProtos.TypedValue fromKey = null; private boolean fromInclusive = false; - private TupleValue toKey = null; + private ValueProtos.TypedValue toKey = null; private boolean toInclusive = false; private int rowLimit = 0; private List columns = Collections.emptyList(); @@ -115,12 +128,24 @@ public Builder orderedRead(boolean ordered) { } public Builder fromKey(TupleValue value, boolean inclusive) { + this.fromKey = ProtoValue.toTypedValue(value); + this.fromInclusive = inclusive; + return this; + } + + public Builder fromKey(ValueProtos.TypedValue value, boolean inclusive) { this.fromKey = value; this.fromInclusive = inclusive; return this; } public Builder toKey(TupleValue value, boolean inclusive) { + this.toKey = ProtoValue.toTypedValue(value); + this.toInclusive = inclusive; + return this; + } + + public Builder toKey(ValueProtos.TypedValue value, boolean inclusive) { this.toKey = value; this.toInclusive = inclusive; return this; diff --git a/table/src/main/java/tech/ydb/table/values/Value.java b/table/src/main/java/tech/ydb/table/values/Value.java index 91ed8df7f..cd3cecb34 100644 --- a/table/src/main/java/tech/ydb/table/values/Value.java +++ b/table/src/main/java/tech/ydb/table/values/Value.java @@ -44,6 +44,10 @@ default StructValue asStruct() { return (StructValue) this; } + default TupleValue asTuple() { + return (TupleValue) this; + } + default VariantValue asVariant() { return (VariantValue) this; } diff --git a/table/src/main/java/tech/ydb/table/values/proto/ProtoType.java b/table/src/main/java/tech/ydb/table/values/proto/ProtoType.java index fb2b39274..05aac187a 100644 --- a/table/src/main/java/tech/ydb/table/values/proto/ProtoType.java +++ b/table/src/main/java/tech/ydb/table/values/proto/ProtoType.java @@ -2,7 +2,9 @@ import java.util.Arrays; +import java.util.List; +import com.google.common.base.Preconditions; import com.google.protobuf.NullValue; import tech.ydb.proto.ValueProtos; @@ -219,6 +221,13 @@ public static ValueProtos.Type getOptional(ValueProtos.Type itemType) { return builder.build(); } + public static ValueProtos.StructMember getStructMember(String memberName, ValueProtos.Type memberType) { + return ValueProtos.StructMember.newBuilder() + .setName(memberName) + .setType(memberType) + .build(); + } + public static ValueProtos.Type getStruct(String memberName, ValueProtos.Type memberType) { ValueProtos.Type.Builder builder = ValueProtos.Type.newBuilder(); ValueProtos.StructType.Builder structType = builder.getStructTypeBuilder(); @@ -321,6 +330,13 @@ public static ValueProtos.Type getStruct(ValueProtos.StructMember firstMember, return builder.build(); } + public static ValueProtos.Type getStruct(List members) { + Preconditions.checkArgument(!members.isEmpty(), "Struct members cannot be empty"); + ValueProtos.Type.Builder builder = ValueProtos.Type.newBuilder(); + builder.getStructTypeBuilder().addAllMembers(members); + return builder.build(); + } + public static ValueProtos.Type getTuple() { return EMPTY_TUPLE; } @@ -334,6 +350,13 @@ public static ValueProtos.Type getTuple(ValueProtos.Type... elementTypes) { return builder.build(); } + public static ValueProtos.Type getTuple(List elements) { + Preconditions.checkArgument(!elements.isEmpty(), "Tuple elements cannot be empty"); + ValueProtos.Type.Builder builder = ValueProtos.Type.newBuilder(); + builder.getTupleTypeBuilder().addAllElements(elements); + return builder.build(); + } + public static ValueProtos.Type getVariant(ValueProtos.StructType structType) { ValueProtos.Type.Builder builder = ValueProtos.Type.newBuilder(); builder.getVariantTypeBuilder().setStructItems(structType); diff --git a/table/src/main/java/tech/ydb/table/values/proto/ProtoValue.java b/table/src/main/java/tech/ydb/table/values/proto/ProtoValue.java index 7eef37bb5..af2883c54 100644 --- a/table/src/main/java/tech/ydb/table/values/proto/ProtoValue.java +++ b/table/src/main/java/tech/ydb/table/values/proto/ProtoValue.java @@ -884,6 +884,20 @@ public static ValueProtos.TypedValue toTypedValue(Value p) { .build(); } + public static ValueProtos.TypedValue toTypedValue(ValueProtos.Type type, ValueProtos.Value value) { + return ValueProtos.TypedValue.newBuilder() + .setType(type) + .setValue(value) + .build(); + } + + public static ValueProtos.TypedValue toTypedValue(ValueProtos.Type type, ValueProtos.Value.Builder value) { + return ValueProtos.TypedValue.newBuilder() + .setType(type) + .setValue(value) + .build(); + } + public static PrimitiveValue newUuid(long high, long low) { return new Uuid(high, low); } diff --git a/table/src/test/java/tech/ydb/table/SessionStub.java b/table/src/test/java/tech/ydb/table/SessionStub.java index 2d9eb0da0..b94d1752b 100644 --- a/table/src/test/java/tech/ydb/table/SessionStub.java +++ b/table/src/test/java/tech/ydb/table/SessionStub.java @@ -8,15 +8,16 @@ import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.utils.FutureTools; +import tech.ydb.proto.ValueProtos; import tech.ydb.table.description.TableDescription; import tech.ydb.table.description.TableOptionDescription; +import tech.ydb.table.query.BulkUpsertData; import tech.ydb.table.query.DataQuery; import tech.ydb.table.query.DataQueryResult; import tech.ydb.table.query.ExplainDataQueryResult; import tech.ydb.table.query.Params; import tech.ydb.table.query.ReadRowsResult; import tech.ydb.table.query.ReadTablePart; -import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.settings.AlterTableSettings; import tech.ydb.table.settings.BeginTxSettings; import tech.ydb.table.settings.BulkUpsertSettings; @@ -40,7 +41,6 @@ import tech.ydb.table.transaction.TableTransaction; import tech.ydb.table.transaction.Transaction; import tech.ydb.table.transaction.TxControl; -import tech.ydb.table.values.ListValue; /** @@ -158,7 +158,7 @@ public GrpcReadStream executeReadTable(String tablePath, ReadTabl } @Override - public GrpcReadStream executeScanQuery(String query, Params params, ExecuteScanQuerySettings settings) { + public GrpcReadStream executeScanQueryRaw(String query, Params params, ExecuteScanQuerySettings settings) { throw new UnsupportedOperationException("executeScanQuery not implemented"); } @@ -178,7 +178,7 @@ public CompletableFuture> keepAlive(KeepAliveSessionSettin } @Override - public CompletableFuture executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings) { + public CompletableFuture executeBulkUpsert(String tablePath, BulkUpsertData rows, BulkUpsertSettings settings) { return notImplemented("bulkUpsert()"); } diff --git a/table/src/test/java/tech/ydb/table/integration/ReadTableTest.java b/table/src/test/java/tech/ydb/table/integration/ReadTableTest.java index e661947ac..1d736c3af 100644 --- a/table/src/test/java/tech/ydb/table/integration/ReadTableTest.java +++ b/table/src/test/java/tech/ydb/table/integration/ReadTableTest.java @@ -23,19 +23,23 @@ import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcFlowControl; import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.proto.table.YdbTable; import tech.ydb.table.Session; import tech.ydb.table.SessionRetryContext; import tech.ydb.table.description.TableDescription; import tech.ydb.table.impl.SimpleTableClient; +import tech.ydb.table.query.BulkUpsertData; import tech.ydb.table.query.ReadTablePart; +import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.rpc.grpc.GrpcTableRpc; -import tech.ydb.table.settings.BulkUpsertSettings; import tech.ydb.table.settings.ReadTableSettings; import tech.ydb.table.values.ListType; import tech.ydb.table.values.PrimitiveType; import tech.ydb.table.values.PrimitiveValue; import tech.ydb.table.values.StructType; import tech.ydb.table.values.StructValue; +import tech.ydb.table.values.TupleValue; +import tech.ydb.table.values.proto.ProtoValue; import tech.ydb.test.junit4.GrpcTransportRule; /** @@ -94,9 +98,9 @@ public static void prepareTable() { ); }).collect(Collectors.toList()); - retryCtx.supplyStatus(session -> session.executeBulkUpsert( - tablePath, ListType.of(batchType).newValue(batchData), new BulkUpsertSettings()) - ).join().expectSuccess("bulk upsert problem in table " + tablePath); + retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, + new BulkUpsertData(ProtoValue.toTypedValue(ListType.of(batchType).newValue(batchData))) + )).join().expectSuccess("bulk upsert problem in table " + tablePath); } @AfterClass @@ -114,6 +118,20 @@ public void readTableTest() { retryCtx.supplyStatus(session -> { rowsRead.set(0); return session.executeReadTable(tablePath, rts).start(part -> { + YdbTable.ReadTableResponse proto = part.getReadTableResponse(); + ResultSetReader rsr = part.getResultSetReader(); + ReadTablePart.VirtualTimestamp vt = part.getVirtualTimestamp(); + + Assert.assertNotNull(proto); + Assert.assertNotNull(rsr); + Assert.assertNotNull(vt); + + Assert.assertSame(rsr, part.getResultSetReader()); + Assert.assertSame(vt, part.getVirtualTimestamp()); + + Assert.assertEquals(proto.getSnapshot().getPlanStep(), vt.getPlanStep()); + Assert.assertEquals(proto.getSnapshot().getTxId(), vt.getTxId()); + rowsRead.addAndGet(part.getResultSetReader().getRowCount()); }); }).join().expectSuccess("Cannot read table " + tablePath); @@ -127,6 +145,13 @@ public void limitedReadTableTest() { AtomicLong rewsRead = new AtomicLong(0); ReadTableSettings rts = ReadTableSettings.newBuilder().column("id").batchLimitRows(100).build(); + Assert.assertNull(rts.getFromKey()); + Assert.assertNull(rts.getToKey()); + Assert.assertNull(rts.getFromKeyRaw()); + Assert.assertNull(rts.getToKeyRaw()); + Assert.assertEquals(0, rts.batchLimitBytes()); + Assert.assertEquals(100, rts.batchLimitRows()); + retryCtx.supplyStatus(session -> { rewsRead.set(0); return session.executeReadTable(tablePath, rts).start(part -> { @@ -143,10 +168,20 @@ public void partialReadTableTest() { String tablePath = tablePath(TEST_TABLE); AtomicLong rowsRead = new AtomicLong(0); + PrimitiveValue from = PrimitiveValue.newInt64(1); + PrimitiveValue to = PrimitiveValue.newInt64(TEST_TABLE_SIZE); ReadTableSettings rts = ReadTableSettings.newBuilder().column("id") - .fromKeyExclusive(PrimitiveValue.newInt64(1)) - .toKeyExclusive(PrimitiveValue.newInt64(TEST_TABLE_SIZE)) + .fromKeyExclusive(from) // always coverted to optional type + .toKeyExclusive(to) .build(); + + Assert.assertEquals(TupleValue.of(from.makeOptional()), rts.getFromKey()); + Assert.assertEquals(TupleValue.of(to.makeOptional()), rts.getToKey()); + Assert.assertEquals(ProtoValue.toTypedValue(TupleValue.of(from.makeOptional())), rts.getFromKeyRaw()); + Assert.assertEquals(ProtoValue.toTypedValue(TupleValue.of(to.makeOptional())), rts.getToKeyRaw()); + Assert.assertEquals(0, rts.batchLimitBytes()); + Assert.assertEquals(0, rts.batchLimitRows()); + retryCtx.supplyStatus(session -> { rowsRead.set(0); return session.executeReadTable(tablePath, rts).start(part -> { diff --git a/table/src/test/java/tech/ydb/table/values/proto/ProtoTypeTest.java b/table/src/test/java/tech/ydb/table/values/proto/ProtoTypeTest.java new file mode 100644 index 000000000..7d9bc7ebb --- /dev/null +++ b/table/src/test/java/tech/ydb/table/values/proto/ProtoTypeTest.java @@ -0,0 +1,55 @@ +package tech.ydb.table.values.proto; + +import java.util.ArrayList; +import java.util.Arrays; + +import org.junit.Assert; +import org.junit.Test; + +import tech.ydb.proto.ValueProtos; +import tech.ydb.table.values.PrimitiveType; +import tech.ydb.table.values.StructType; +import tech.ydb.table.values.TupleType; + +/** + * + * @author Aleksandr Gorshenin + */ +public class ProtoTypeTest { + + @Test + public void getStructTest() { + StructType s1 = StructType.of("a", PrimitiveType.Text, "b", PrimitiveType.Bytes); + + ValueProtos.StructMember sm1 = ProtoType.getStructMember("a", ProtoType.getText()); + ValueProtos.StructMember sm2 = ProtoType.getStructMember("b", ProtoType.getBytes()); + + Assert.assertEquals(s1.toPb(), ProtoType.getStruct(sm1, sm2)); + Assert.assertEquals(s1.toPb(), ProtoType.getStruct(Arrays.asList(sm1, sm2))); + + Assert.assertNotEquals(s1.toPb(), ProtoType.getStruct(sm2, sm1)); + Assert.assertNotEquals(s1.toPb(), ProtoType.getStruct(Arrays.asList(sm2, sm1))); + + IllegalArgumentException ex = Assert.assertThrows(IllegalArgumentException.class, + () -> ProtoType.getStruct(new ArrayList<>()) + ); + Assert.assertEquals("Struct members cannot be empty", ex.getMessage()); + } + + @Test + public void getTupleTest() { + TupleType t1 = TupleType.ofOwn(PrimitiveType.Text, PrimitiveType.Bytes); + + Assert.assertEquals(t1.toPb(), ProtoType.getTuple(ProtoType.getText(), ProtoType.getBytes())); + Assert.assertEquals(t1.toPb(), ProtoType.getTuple(Arrays.asList(ProtoType.getText(), ProtoType.getBytes()))); + + Assert.assertNotEquals(t1.toPb(), ProtoType.getTuple(ProtoType.getBytes(), ProtoType.getText())); + Assert.assertNotEquals(t1.toPb(), ProtoType.getTuple(Arrays.asList(ProtoType.getBytes(), ProtoType.getText()))); + + + IllegalArgumentException ex = Assert.assertThrows(IllegalArgumentException.class, + () -> ProtoType.getTuple(new ArrayList<>()) + ); + Assert.assertEquals("Tuple elements cannot be empty", ex.getMessage()); + } +} diff --git a/table/src/test/java/tech/ydb/table/values/proto/ProtoValueTest.java b/table/src/test/java/tech/ydb/table/values/proto/ProtoValueTest.java index ae60bc6b9..6dcafb15f 100644 --- a/table/src/test/java/tech/ydb/table/values/proto/ProtoValueTest.java +++ b/table/src/test/java/tech/ydb/table/values/proto/ProtoValueTest.java @@ -9,6 +9,7 @@ import org.junit.Assert; import org.junit.Test; +import tech.ydb.proto.ValueProtos; import tech.ydb.table.values.PrimitiveValue; @@ -81,4 +82,19 @@ public void uuid() { Assert.assertEquals(low, u1.getUuidLow()); Assert.assertEquals(high, u1.getUuidHigh()); } + + @Test + public void toTypedValueTest() { + PrimitiveValue v1 = PrimitiveValue.newDouble(1.5432f); + + ValueProtos.TypedValue tv1 = ProtoValue.toTypedValue(v1); + ValueProtos.TypedValue tv2 = ProtoValue.toTypedValue(v1.getType().toPb(), v1.toPb()); + ValueProtos.TypedValue tv3 = ProtoValue.toTypedValue( + ValueProtos.Type.newBuilder().setTypeId(ValueProtos.Type.PrimitiveTypeId.DOUBLE).build(), + ValueProtos.Value.newBuilder().setDoubleValue(1.5432f) + ); + + Assert.assertEquals(tv1, tv2); + Assert.assertEquals(tv1, tv3); + } }