Skip to content

Commit 3d4624f

Browse files
authored
Merge pull request #691 from ydb-platform/call-options
* Added `table/options.WithCallOptions` options for append custom grp…
2 parents 52ea1bb + 5d14280 commit 3d4624f

13 files changed

+767
-70
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Added `table/options.WithCallOptions` options for append custom grpc call options into `session.{BulkUpsert,Execute,StreamExecuteScanQuery}`
12
* Supported fake transactions in `database/sql` driver over connector option `ydb.WithFakeTx(queryMode)` and connection string param `go_fake_tx`
23
* Removed `testutil/timeutil` package (all usages replaced with `clockwork` package)
34
* Changed behaviour of retryer on transport errors `cancelled` and `deadline exceeded` - will retry idempotent operation if context is not done

example_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ func Example_discovery() {
381381
}
382382
}
383383

384-
func Example_enableGzipCompression() {
384+
func Example_enableGzipCompressionForAllRequests() {
385385
ctx := context.TODO()
386386
db, err := ydb.Open(
387387
ctx,

internal/table/session.go

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -656,9 +656,10 @@ func (s *session) Execute(
656656
txr table.Transaction, r result.Result, err error,
657657
) {
658658
var (
659-
a = allocator.New()
660-
q = queryFromText(query)
661-
request = a.TableExecuteDataQueryRequest()
659+
a = allocator.New()
660+
q = queryFromText(query)
661+
request = a.TableExecuteDataQueryRequest()
662+
callOptions []grpc.CallOption
662663
)
663664
defer a.Free()
664665

@@ -676,23 +677,19 @@ func (s *session) Execute(
676677

677678
for _, opt := range opts {
678679
if opt != nil {
679-
opt((*options.ExecuteDataQueryDesc)(request), a)
680+
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption((*options.ExecuteDataQueryDesc)(request), a)...)
680681
}
681682
}
682683

683684
onDone := trace.TableOnSessionQueryExecute(
684-
s.config.Trace(),
685-
&ctx,
686-
s,
687-
q,
688-
params,
685+
s.config.Trace(), &ctx, s, q, params,
689686
request.QueryCachePolicy.GetKeepInCache(),
690687
)
691688
defer func() {
692689
onDone(txr, false, r, err)
693690
}()
694691

695-
result, err := s.executeDataQuery(ctx, a, request)
692+
result, err := s.executeDataQuery(ctx, a, request, callOptions...)
696693
if err != nil {
697694
return nil, nil, xerrors.WithStackTrace(err)
698695
}
@@ -727,6 +724,7 @@ func (s *session) executeQueryResult(
727724
// executeDataQuery executes data query.
728725
func (s *session) executeDataQuery(
729726
ctx context.Context, a *allocator.Allocator, request *Ydb_Table.ExecuteDataQueryRequest,
727+
callOptions ...grpc.CallOption,
730728
) (
731729
_ *Ydb_Table.ExecuteQueryResult,
732730
err error,
@@ -736,7 +734,7 @@ func (s *session) executeDataQuery(
736734
response *Ydb_Table.ExecuteDataQueryResponse
737735
)
738736

739-
response, err = s.tableService.ExecuteDataQuery(ctx, request)
737+
response, err = s.tableService.ExecuteDataQuery(ctx, request, callOptions...)
740738
if err != nil {
741739
return nil, xerrors.WithStackTrace(err)
742740
}
@@ -1008,7 +1006,8 @@ func (s *session) StreamExecuteScanQuery(
10081006
Parameters: params.Params().ToYDB(a),
10091007
Mode: Ydb_Table.ExecuteScanQueryRequest_MODE_EXEC, // set default
10101008
}
1011-
stream Ydb_Table_V1.TableService_StreamExecuteScanQueryClient
1009+
stream Ydb_Table_V1.TableService_StreamExecuteScanQueryClient
1010+
callOptions []grpc.CallOption
10121011
)
10131012
defer func() {
10141013
a.Free()
@@ -1019,13 +1018,13 @@ func (s *session) StreamExecuteScanQuery(
10191018

10201019
for _, opt := range opts {
10211020
if opt != nil {
1022-
opt((*options.ExecuteScanQueryDesc)(&request))
1021+
callOptions = append(callOptions, opt.ApplyExecuteScanQueryOption((*options.ExecuteScanQueryDesc)(&request))...)
10231022
}
10241023
}
10251024

10261025
ctx, cancel := context.WithCancel(ctx)
10271026

1028-
stream, err = s.tableService.StreamExecuteScanQuery(ctx, &request)
1027+
stream, err = s.tableService.StreamExecuteScanQuery(ctx, &request, callOptions...)
10291028

10301029
if err != nil {
10311030
cancel()
@@ -1065,9 +1064,19 @@ func (s *session) StreamExecuteScanQuery(
10651064
}
10661065

10671066
// BulkUpsert uploads given list of ydb struct values to the table.
1068-
func (s *session) BulkUpsert(ctx context.Context, table string, rows types.Value) (err error) {
1069-
a := allocator.New()
1067+
func (s *session) BulkUpsert(ctx context.Context, table string, rows types.Value,
1068+
opts ...options.BulkUpsertOption,
1069+
) (err error) {
1070+
var (
1071+
a = allocator.New()
1072+
callOptions []grpc.CallOption
1073+
)
10701074
defer a.Free()
1075+
1076+
for _, opt := range opts {
1077+
callOptions = append(callOptions, opt.ApplyBulkUpsertOption()...)
1078+
}
1079+
10711080
_, err = s.tableService.BulkUpsert(ctx,
10721081
&Ydb_Table.BulkUpsertRequest{
10731082
Table: table,
@@ -1079,6 +1088,7 @@ func (s *session) BulkUpsert(ctx context.Context, table string, rows types.Value
10791088
operation.ModeSync,
10801089
),
10811090
},
1091+
callOptions...,
10821092
)
10831093
return xerrors.WithStackTrace(err)
10841094
}

internal/table/statement.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
77
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
8+
"google.golang.org/grpc"
89

910
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
@@ -30,8 +31,9 @@ func (s *statement) Execute(
3031
txr table.Transaction, r result.Result, err error,
3132
) {
3233
var (
33-
a = allocator.New()
34-
request = a.TableExecuteDataQueryRequest()
34+
a = allocator.New()
35+
request = a.TableExecuteDataQueryRequest()
36+
callOptions []grpc.CallOption
3537
)
3638
defer a.Free()
3739

@@ -49,33 +51,30 @@ func (s *statement) Execute(
4951

5052
for _, opt := range opts {
5153
if opt != nil {
52-
opt((*options.ExecuteDataQueryDesc)(request), a)
54+
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption((*options.ExecuteDataQueryDesc)(request), a)...)
5355
}
5456
}
5557

5658
onDone := trace.TableOnSessionQueryExecute(
57-
s.session.config.Trace(),
58-
&ctx,
59-
s.session,
60-
s.query,
61-
params,
59+
s.session.config.Trace(), &ctx, s.session, s.query, params,
6260
request.QueryCachePolicy.GetKeepInCache(),
6361
)
6462
defer func() {
6563
onDone(txr, true, r, err)
6664
}()
6765

68-
return s.execute(ctx, a, request, request.TxControl)
66+
return s.execute(ctx, a, request, request.TxControl, callOptions...)
6967
}
7068

7169
// execute executes prepared query without any tracing.
7270
func (s *statement) execute(
7371
ctx context.Context, a *allocator.Allocator,
7472
request *Ydb_Table.ExecuteDataQueryRequest, txControl *Ydb_Table.TransactionControl,
73+
callOptions ...grpc.CallOption,
7574
) (
7675
txr table.Transaction, r result.Result, err error,
7776
) {
78-
res, err := s.session.executeDataQuery(ctx, a, request)
77+
res, err := s.session.executeDataQuery(ctx, a, request, callOptions...)
7978
if err != nil {
8079
return nil, nil, xerrors.WithStackTrace(err)
8180
}

internal/table/transaction.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,9 @@ func (tx *transaction) ExecuteStatement(
8080
if params == nil {
8181
params = table.NewQueryParameters()
8282
}
83-
var (
84-
a = allocator.New()
85-
optsResult options.ExecuteDataQueryDesc
86-
)
83+
a := allocator.New()
8784
defer a.Free()
8885

89-
for _, f := range opts {
90-
if f != nil {
91-
f(&optsResult, a)
92-
}
93-
}
94-
9586
onDone := trace.TableOnSessionTransactionExecuteStatement(
9687
tx.s.config.Trace(),
9788
&ctx,

table/example_test.go

Lines changed: 152 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import (
66
"path"
77
"time"
88

9+
"google.golang.org/grpc"
10+
"google.golang.org/grpc/encoding/gzip"
11+
912
"github.com/ydb-platform/ydb-go-sdk/v3"
1013
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
1114
"github.com/ydb-platform/ydb-go-sdk/v3/table"
@@ -110,18 +113,13 @@ func Example_bulkUpsert() {
110113
const batchSize = 10000
111114
logs := make([]logMessage, 0, batchSize)
112115
for i := 0; i < batchSize; i++ {
113-
message := logMessage{
116+
logs = append(logs, logMessage{
114117
App: fmt.Sprintf("App_%d", i/256),
115118
Host: fmt.Sprintf("192.168.0.%d", i%256),
116119
Timestamp: time.Now().Add(time.Millisecond * time.Duration(i%1000)),
117120
HTTPCode: 200,
118-
}
119-
if i%2 == 0 {
120-
message.Message = "GET / HTTP/1.1"
121-
} else {
122-
message.Message = "GET /images/logo.png HTTP/1.1"
123-
}
124-
logs = append(logs, message)
121+
Message: "GET / HTTP/1.1",
122+
})
125123
}
126124
// execute bulk upsert with native ydb data
127125
err = db.Table().Do( // Do retry operation on errors with best effort
@@ -250,3 +248,149 @@ func Example_lazyTransaction() {
250248
fmt.Printf("unexpected error: %v", err)
251249
}
252250
}
251+
252+
func Example_bulkUpsertWithCompression() {
253+
ctx := context.TODO()
254+
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
255+
if err != nil {
256+
fmt.Printf("failed connect: %v", err)
257+
return
258+
}
259+
defer db.Close(ctx) // cleanup resources
260+
type logMessage struct {
261+
App string
262+
Host string
263+
Timestamp time.Time
264+
HTTPCode uint32
265+
Message string
266+
}
267+
// prepare native go data
268+
const batchSize = 10000
269+
logs := make([]logMessage, 0, batchSize)
270+
for i := 0; i < batchSize; i++ {
271+
logs = append(logs, logMessage{
272+
App: fmt.Sprintf("App_%d", i/256),
273+
Host: fmt.Sprintf("192.168.0.%d", i%256),
274+
Timestamp: time.Now().Add(time.Millisecond * time.Duration(i%1000)),
275+
HTTPCode: 200,
276+
Message: "GET /images/logo.png HTTP/1.1",
277+
})
278+
}
279+
// execute bulk upsert with native ydb data
280+
err = db.Table().Do( // Do retry operation on errors with best effort
281+
ctx, // context manage exiting from Do
282+
func(ctx context.Context, s table.Session) (err error) { // retry operation
283+
rows := make([]types.Value, 0, len(logs))
284+
for _, msg := range logs {
285+
rows = append(rows, types.StructValue(
286+
types.StructFieldValue("App", types.TextValue(msg.App)),
287+
types.StructFieldValue("Host", types.TextValue(msg.Host)),
288+
types.StructFieldValue("Timestamp", types.TimestampValueFromTime(msg.Timestamp)),
289+
types.StructFieldValue("HTTPCode", types.Uint32Value(msg.HTTPCode)),
290+
types.StructFieldValue("Message", types.TextValue(msg.Message)),
291+
))
292+
}
293+
return s.BulkUpsert(ctx, "/local/bulk_upsert_example", types.ListValue(rows...),
294+
options.WithCallOptions(grpc.UseCompressor(gzip.Name)),
295+
)
296+
},
297+
table.WithIdempotent(),
298+
)
299+
if err != nil {
300+
fmt.Printf("unexpected error: %v", err)
301+
}
302+
}
303+
304+
func Example_dataQueryWithCompression() {
305+
ctx := context.TODO()
306+
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
307+
if err != nil {
308+
fmt.Printf("failed connect: %v", err)
309+
return
310+
}
311+
defer db.Close(ctx) // cleanup resources
312+
var (
313+
query = `SELECT 42 as id, "my string" as myStr`
314+
id int32 // required value
315+
myStr string // optional value
316+
)
317+
err = db.Table().Do( // Do retry operation on errors with best effort
318+
ctx, // context manage exiting from Do
319+
func(ctx context.Context, s table.Session) (err error) { // retry operation
320+
_, res, err := s.Execute(ctx, table.DefaultTxControl(), query, nil,
321+
options.WithCallOptions(
322+
grpc.UseCompressor(gzip.Name),
323+
),
324+
)
325+
if err != nil {
326+
return err // for auto-retry with driver
327+
}
328+
defer res.Close() // cleanup resources
329+
if err = res.NextResultSetErr(ctx); err != nil { // check single result set and switch to it
330+
return err // for auto-retry with driver
331+
}
332+
for res.NextRow() { // iterate over rows
333+
err = res.ScanNamed(
334+
named.Required("id", &id),
335+
named.OptionalWithDefault("myStr", &myStr),
336+
)
337+
if err != nil {
338+
return err // generally scan error not retryable, return it for driver check error
339+
}
340+
fmt.Printf("id=%v, myStr='%s'\n", id, myStr)
341+
}
342+
return res.Err() // return finally result error for auto-retry with driver
343+
},
344+
table.WithIdempotent(),
345+
)
346+
if err != nil {
347+
fmt.Printf("unexpected error: %v", err)
348+
}
349+
}
350+
351+
func Example_scanQueryWithCompression() {
352+
ctx := context.TODO()
353+
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
354+
if err != nil {
355+
fmt.Printf("failed connect: %v", err)
356+
return
357+
}
358+
defer db.Close(ctx) // cleanup resources
359+
var (
360+
query = `SELECT 42 as id, "my string" as myStr`
361+
id int32 // required value
362+
myStr string // optional value
363+
)
364+
err = db.Table().Do( // Do retry operation on errors with best effort
365+
ctx, // context manage exiting from Do
366+
func(ctx context.Context, s table.Session) (err error) { // retry operation
367+
res, err := s.StreamExecuteScanQuery(ctx, query, nil,
368+
options.WithCallOptions(
369+
grpc.UseCompressor(gzip.Name),
370+
),
371+
)
372+
if err != nil {
373+
return err // for auto-retry with driver
374+
}
375+
defer res.Close() // cleanup resources
376+
if err = res.NextResultSetErr(ctx); err != nil { // check single result set and switch to it
377+
return err // for auto-retry with driver
378+
}
379+
for res.NextRow() { // iterate over rows
380+
err = res.ScanNamed(
381+
named.Required("id", &id),
382+
named.OptionalWithDefault("myStr", &myStr),
383+
)
384+
if err != nil {
385+
return err // generally scan error not retryable, return it for driver check error
386+
}
387+
fmt.Printf("id=%v, myStr='%s'\n", id, myStr)
388+
}
389+
return res.Err() // return finally result error for auto-retry with driver
390+
},
391+
table.WithIdempotent(),
392+
)
393+
if err != nil {
394+
fmt.Printf("unexpected error: %v", err)
395+
}
396+
}

0 commit comments

Comments
 (0)