Skip to content

Commit 2439d67

Browse files
authored
Merge pull request #1675 from robdrynkin/read_rows_from_table_client
Add ReadRows to the table.Client
2 parents feae7e6 + 8fecc48 commit 2439d67

File tree

6 files changed

+124
-35
lines changed

6 files changed

+124
-35
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added `table.Client.ReadRows` method with internal retries
2+
13
## v3.100.3
24
* Fixed bug with concurrent rewrites source slice of `grpc.DialOption` on dial step
35

examples/read_table/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import (
44
"context"
55
"flag"
66
"fmt"
7-
"github.com/ydb-platform/ydb-go-sdk/v3/table"
87
"log"
98
"os"
109
"path"
1110
"sync"
1211

1312
environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
1413
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/table"
1515
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
1717
)

internal/table/client.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,22 @@ import (
55

66
"github.com/jonboulle/clockwork"
77
"github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1"
8+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
9+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
810
"google.golang.org/grpc"
911

1012
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
1113
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
1214
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1315
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
16+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/scanner"
17+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
1418
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1519
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1620
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
1721
"github.com/ydb-platform/ydb-go-sdk/v3/table"
22+
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
23+
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
1824
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1925
)
2026

@@ -332,6 +338,87 @@ func (c *Client) BulkUpsert(
332338
return nil
333339
}
334340

341+
func makeReadRowsRequest(
342+
a *allocator.Allocator,
343+
sessionID string,
344+
path string,
345+
keys value.Value,
346+
readRowOpts []options.ReadRowsOption,
347+
) *Ydb_Table.ReadRowsRequest {
348+
request := Ydb_Table.ReadRowsRequest{
349+
SessionId: sessionID,
350+
Path: path,
351+
Keys: value.ToYDB(keys, a),
352+
}
353+
for _, opt := range readRowOpts {
354+
if opt != nil {
355+
opt.ApplyReadRowsOption((*options.ReadRowsDesc)(&request), a)
356+
}
357+
}
358+
359+
return &request
360+
}
361+
362+
func makeReadRowsResponse(response *Ydb_Table.ReadRowsResponse, err error, isTruncated bool) (result.Result, error) {
363+
if err != nil {
364+
return nil, xerrors.WithStackTrace(err)
365+
}
366+
367+
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
368+
return nil, xerrors.WithStackTrace(
369+
xerrors.FromOperation(response),
370+
)
371+
}
372+
373+
return scanner.NewUnary(
374+
[]*Ydb.ResultSet{response.GetResultSet()},
375+
nil,
376+
scanner.WithIgnoreTruncated(isTruncated),
377+
), nil
378+
}
379+
380+
func (c *Client) ReadRows(
381+
ctx context.Context,
382+
path string,
383+
keys value.Value,
384+
readRowOpts []options.ReadRowsOption,
385+
retryOptions ...table.Option,
386+
) (_ result.Result, err error) {
387+
var (
388+
a = allocator.New()
389+
request = makeReadRowsRequest(a, "", path, keys, readRowOpts)
390+
response *Ydb_Table.ReadRowsResponse
391+
)
392+
defer func() {
393+
a.Free()
394+
}()
395+
396+
client := Ydb_Table_V1.NewTableServiceClient(c.cc)
397+
398+
attempts, config := 0, c.retryOptions(retryOptions...)
399+
config.RetryOptions = append(config.RetryOptions,
400+
retry.WithIdempotent(true),
401+
retry.WithTrace(&trace.Retry{
402+
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
403+
return func(info trace.RetryLoopDoneInfo) {
404+
attempts = info.Attempts
405+
}
406+
},
407+
}),
408+
)
409+
err = retry.Retry(ctx,
410+
func(ctx context.Context) (err error) {
411+
attempts++
412+
response, err = client.ReadRows(ctx, request)
413+
414+
return err
415+
},
416+
config.RetryOptions...,
417+
)
418+
419+
return makeReadRowsResponse(response, err, c.config.IgnoreTruncated())
420+
}
421+
335422
func executeTxOperation(ctx context.Context, c *Client, op table.TxOperation, tx table.Transaction) (err error) {
336423
if panicCallback := c.config.PanicCallback(); panicCallback != nil {
337424
defer func() {

internal/table/session.go

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,40 +1374,17 @@ func (s *Session) ReadRows(
13741374
opts ...options.ReadRowsOption,
13751375
) (_ result.Result, err error) {
13761376
var (
1377-
a = allocator.New()
1378-
request = Ydb_Table.ReadRowsRequest{
1379-
SessionId: s.id,
1380-
Path: path,
1381-
Keys: value.ToYDB(keys, a),
1382-
}
1377+
a = allocator.New()
1378+
request = makeReadRowsRequest(a, s.id, path, keys, opts)
13831379
response *Ydb_Table.ReadRowsResponse
13841380
)
13851381
defer func() {
13861382
a.Free()
13871383
}()
13881384

1389-
for _, opt := range opts {
1390-
if opt != nil {
1391-
opt.ApplyReadRowsOption((*options.ReadRowsDesc)(&request), a)
1392-
}
1393-
}
1385+
response, err = s.client.ReadRows(ctx, request)
13941386

1395-
response, err = s.client.ReadRows(ctx, &request)
1396-
if err != nil {
1397-
return nil, xerrors.WithStackTrace(err)
1398-
}
1399-
1400-
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
1401-
return nil, xerrors.WithStackTrace(
1402-
xerrors.FromOperation(response),
1403-
)
1404-
}
1405-
1406-
return scanner.NewUnary(
1407-
[]*Ydb.ResultSet{response.GetResultSet()},
1408-
nil,
1409-
scanner.WithIgnoreTruncated(s.config.IgnoreTruncated()),
1410-
), nil
1387+
return makeReadRowsResponse(response, err, s.config.IgnoreTruncated())
14111388
}
14121389

14131390
// StreamExecuteScanQuery scan-reads table at given path with given options.

table/table.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ type Client interface {
7676
// Returns success only when all rows were successfully upserted. In case of an error some rows might
7777
// be upserted and some might not.
7878
BulkUpsert(ctx context.Context, table string, data BulkUpsertData, opts ...Option) error
79+
80+
// ReadRows reads a batch of rows non-transactionally.
81+
ReadRows(
82+
ctx context.Context, path string, keys value.Value,
83+
readRowOpts []options.ReadRowsOption, retryOptions ...Option,
84+
) (_ result.Result, err error)
7985
}
8086

8187
type SessionStatus = string
@@ -161,6 +167,7 @@ type Session interface {
161167
opts ...options.BulkUpsertOption,
162168
) (err error)
163169

170+
// Deprecated: use Client instance instead.
164171
ReadRows(ctx context.Context, path string, keys value.Value,
165172
opts ...options.ReadRowsOption,
166173
) (_ result.Result, err error)

tests/integration/kv_test.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/ydb-platform/ydb-go-sdk/v3/internal/version"
1313
"github.com/ydb-platform/ydb-go-sdk/v3/table"
1414
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
1718
)
@@ -39,13 +40,7 @@ func TestKeyValue(t *testing.T) {
3940
scope.Require.NoError(err)
4041

4142
// get
42-
err = driver.Table().Do(scope.Ctx, func(ctx context.Context, s table.Session) error {
43-
rows, err := s.ReadRows(ctx, tablePath,
44-
types.ListValue(types.StructValue(
45-
types.StructFieldValue("id", types.Int64Value(id)),
46-
)),
47-
options.ReadColumn("val"),
48-
)
43+
checkResult := func(ctx context.Context, rows result.Result, err error) error {
4944
if err != nil {
5045
return err
5146
}
@@ -70,6 +65,27 @@ func TestKeyValue(t *testing.T) {
7065
}
7166
t.Logf("%s[%d] = %q", tablePath, id, actualValue)
7267
return rows.Err()
68+
}
69+
70+
// session read rows
71+
err = driver.Table().Do(scope.Ctx, func(ctx context.Context, s table.Session) error {
72+
rows, err := s.ReadRows(ctx, tablePath,
73+
types.ListValue(types.StructValue(
74+
types.StructFieldValue("id", types.Int64Value(id)),
75+
)),
76+
options.ReadColumn("val"),
77+
)
78+
return checkResult(ctx, rows, err)
7379
})
7480
scope.Require.NoError(err)
81+
82+
// table client read rows
83+
rows, err := driver.Table().ReadRows(scope.Ctx, tablePath,
84+
types.ListValue(types.StructValue(
85+
types.StructFieldValue("id", types.Int64Value(id)),
86+
)),
87+
[]options.ReadRowsOption{options.ReadColumn("val")},
88+
)
89+
err = checkResult(scope.Ctx, rows, err)
90+
scope.Require.NoError(err)
7591
}

0 commit comments

Comments
 (0)