Skip to content

Commit 639eaec

Browse files
authored
Merge pull request #356 from ydb-platform/truncated
* Changed behavior on `result.Err()` on truncated result (return non-…
2 parents 7976cb5 + 0c18ee0 commit 639eaec

File tree

9 files changed

+132
-62
lines changed

9 files changed

+132
-62
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Changed behavior on `result.Err()` on truncated unary result (returns non-retryable error now)
2+
* Added `ydb.WithIgnoreTruncated` option for disabling errors on truncated flag
13
* Added simple transaction control constructors `table.OnlineReadOnlyTxControl()` and `table.StaleReadOnlyTxControl()`
24
* Added transaction control specifier with context `ydb.WithTxControl`
35
* Added value constructors `types.BytesValue`, `types.BytesValueFromString`, `types.TextValue`

internal/table/client.go

Lines changed: 61 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -111,87 +111,96 @@ func (c *Client) createSession(ctx context.Context, opts ...createSessionOption)
111111
if c.isClosed() {
112112
return nil, errClosedClient
113113
}
114+
115+
options := createSessionOptions{}
116+
for _, o := range opts {
117+
o(&options)
118+
}
119+
114120
defer func() {
115121
if s == nil {
116122
return
117123
}
118-
options := createSessionOptions{}
119-
for _, o := range opts {
120-
o(&options)
121-
}
122124
for _, onCreate := range options.onCreate {
123125
onCreate(s)
124126
}
125127
s.onClose = append(s.onClose, options.onClose...)
126128
}()
127-
type result struct {
128-
s *session
129-
err error
130-
}
131-
132-
ch := make(chan result)
133129

134-
c.spawnedGoroutines.Add(1)
135-
go func() {
136-
defer c.spawnedGoroutines.Done()
130+
select {
131+
case <-c.done:
132+
return nil, xerrors.WithStackTrace(errClosedClient)
137133

138-
var (
134+
default:
135+
type result struct {
139136
s *session
140137
err error
141-
)
142-
143-
createSessionCtx := xcontext.WithoutDeadline(ctx)
144-
145-
if timeout := c.config.CreateSessionTimeout(); timeout > 0 {
146-
var cancel context.CancelFunc
147-
createSessionCtx, cancel = context.WithTimeout(createSessionCtx, timeout)
148-
defer cancel()
149138
}
150139

151-
s, err = c.build(createSessionCtx)
140+
ch := make(chan result)
152141

153-
closeSession := func(s *session) {
154-
if s == nil {
155-
return
156-
}
142+
c.spawnedGoroutines.Add(1)
143+
go func() {
144+
defer c.spawnedGoroutines.Done()
145+
146+
var (
147+
s *session
148+
err error
149+
)
157150

158-
closeSessionCtx := xcontext.WithoutDeadline(ctx)
151+
createSessionCtx := xcontext.WithoutDeadline(ctx)
159152

160-
if timeout := c.config.DeleteTimeout(); timeout > 0 {
153+
if timeout := c.config.CreateSessionTimeout(); timeout > 0 {
161154
var cancel context.CancelFunc
162-
createSessionCtx, cancel = context.WithTimeout(closeSessionCtx, timeout)
155+
createSessionCtx, cancel = context.WithTimeout(createSessionCtx, timeout)
163156
defer cancel()
164157
}
165158

166-
_ = s.Close(ctx)
167-
}
159+
s, err = c.build(createSessionCtx)
168160

169-
select {
170-
case ch <- result{
171-
s: s,
172-
err: err,
173-
}: // nop
161+
closeSession := func(s *session) {
162+
if s == nil {
163+
return
164+
}
165+
166+
closeSessionCtx := xcontext.WithoutDeadline(ctx)
167+
168+
if timeout := c.config.DeleteTimeout(); timeout > 0 {
169+
var cancel context.CancelFunc
170+
createSessionCtx, cancel = context.WithTimeout(closeSessionCtx, timeout)
171+
defer cancel()
172+
}
173+
174+
_ = s.Close(ctx)
175+
}
174176

177+
select {
178+
case ch <- result{
179+
s: s,
180+
err: err,
181+
}: // nop
182+
183+
case <-c.done:
184+
closeSession(s)
185+
186+
case <-ctx.Done():
187+
closeSession(s)
188+
}
189+
}()
190+
191+
select {
175192
case <-c.done:
176-
closeSession(s)
193+
return nil, xerrors.WithStackTrace(errClosedClient)
177194

178195
case <-ctx.Done():
179-
closeSession(s)
180-
}
181-
}()
196+
return nil, xerrors.WithStackTrace(ctx.Err())
182197

183-
select {
184-
case r := <-ch:
185-
if r.err != nil {
186-
return nil, xerrors.WithStackTrace(r.err)
198+
case r := <-ch:
199+
if r.err != nil {
200+
return nil, xerrors.WithStackTrace(r.err)
201+
}
202+
return r.s, nil
187203
}
188-
return r.s, nil
189-
190-
case <-c.done:
191-
return nil, xerrors.WithStackTrace(errClosedClient)
192-
193-
case <-ctx.Done():
194-
return nil, xerrors.WithStackTrace(ctx.Err())
195204
}
196205
}
197206

internal/table/config/config.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,13 @@ func WithTrace(trace trace.Table, opts ...trace.TableComposeOption) Option {
124124
}
125125
}
126126

127+
// WithIgnoreTruncated disables errors on truncated flag
128+
func WithIgnoreTruncated() Option {
129+
return func(c *Config) {
130+
c.ignoreTruncated = true
131+
}
132+
}
133+
127134
// Config is a configuration of table client
128135
type Config struct {
129136
config.Common
@@ -134,6 +141,8 @@ type Config struct {
134141
deleteTimeout time.Duration
135142
idleThreshold time.Duration
136143

144+
ignoreTruncated bool
145+
137146
trace trace.Table
138147
}
139148

@@ -159,6 +168,11 @@ func (c Config) KeepAliveMinSize() int {
159168
return DefaultKeepAliveMinSize
160169
}
161170

171+
// IgnoreTruncated specifies behavior on truncated flag
172+
func (c Config) IgnoreTruncated() bool {
173+
return c.ignoreTruncated
174+
}
175+
162176
// IdleKeepAliveThreshold is a number of keepAlive messages to call before the
163177
// session is removed if it is an excess session (see KeepAliveMinSize)
164178
// This means that session will be deleted after the expiration of lifetime = IdleThreshold * IdleKeepAliveThreshold

internal/table/scanner/result.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,24 +70,39 @@ type StreamResult interface {
7070
resultWithError
7171
}
7272

73+
type option func(r *baseResult)
74+
75+
func WithIgnoreTruncated(ignoreTruncated bool) option {
76+
return func(r *baseResult) {
77+
r.scanner.ignoreTruncated = ignoreTruncated
78+
}
79+
}
80+
7381
func NewStream(
7482
recv func(ctx context.Context) (*Ydb.ResultSet, *Ydb_TableStats.QueryStats, error),
7583
onClose func(error) error,
84+
opts ...option,
7685
) StreamResult {
7786
r := &streamResult{
7887
recv: recv,
7988
close: onClose,
8089
}
90+
for _, o := range opts {
91+
o(&r.baseResult)
92+
}
8193
return r
8294
}
8395

84-
func NewUnary(sets []*Ydb.ResultSet, stats *Ydb_TableStats.QueryStats) UnaryResult {
96+
func NewUnary(sets []*Ydb.ResultSet, stats *Ydb_TableStats.QueryStats, opts ...option) UnaryResult {
8597
r := &unaryResult{
8698
baseResult: baseResult{
8799
stats: stats,
88100
},
89101
sets: sets,
90102
}
103+
for _, o := range opts {
104+
o(&r.baseResult)
105+
}
91106
return r
92107
}
93108

internal/table/scanner/scanner.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"database/sql"
66
"encoding/json"
7+
"errors"
78
"fmt"
89
"io"
910
"math"
@@ -22,13 +23,16 @@ import (
2223
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
2324
)
2425

26+
var errTruncated = xerrors.Wrap(errors.New("truncated result"))
27+
2528
type scanner struct {
26-
set *Ydb.ResultSet
27-
row *Ydb.Value
28-
converter *rawConverter
29-
stack scanStack
30-
nextRow int
31-
nextItem int
29+
set *Ydb.ResultSet
30+
row *Ydb.Value
31+
converter *rawConverter
32+
stack scanStack
33+
nextRow int
34+
nextItem int
35+
ignoreTruncated bool
3236

3337
columnIndexes []int
3438

@@ -206,11 +210,25 @@ func (s *scanner) Truncated() bool {
206210
return s.set.Truncated
207211
}
208212

213+
// Truncated returns true if current result set has been truncated by server
214+
func (s *scanner) truncated() bool {
215+
if s.set == nil {
216+
return false
217+
}
218+
return s.set.Truncated
219+
}
220+
209221
// Err returns error caused Scanner to be broken.
210222
func (s *scanner) Err() error {
211223
s.errMtx.RLock()
212224
defer s.errMtx.RUnlock()
213-
return s.err
225+
if s.err != nil {
226+
return s.err
227+
}
228+
if !s.ignoreTruncated && s.truncated() {
229+
return xerrors.WithStackTrace(errTruncated)
230+
}
231+
return nil
214232
}
215233

216234
// Must not be exported.

internal/table/session.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,7 @@ func (s *session) executeQueryResult(res *Ydb_Table.ExecuteQueryResult) (
694694
r := scanner.NewUnary(
695695
res.GetResultSets(),
696696
res.GetQueryStats(),
697+
scanner.WithIgnoreTruncated(s.config.IgnoreTruncated()),
697698
)
698699
return t, r, nil
699700
}
@@ -993,6 +994,7 @@ func (s *session) StreamReadTable(
993994
onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err))
994995
return err
995996
},
997+
scanner.WithIgnoreTruncated(true), // stream read table always returns truncated flag on last result set
996998
), nil
997999
}
9981000

@@ -1075,6 +1077,7 @@ func (s *session) StreamExecuteScanQuery(
10751077
onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err))
10761078
return err
10771079
},
1080+
scanner.WithIgnoreTruncated(s.config.IgnoreTruncated()),
10781081
), nil
10791082
}
10801083

internal/table/transaction.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ func (tx *transaction) CommitTx(
161161
return scanner.NewUnary(
162162
nil,
163163
result.GetQueryStats(),
164+
scanner.WithIgnoreTruncated(tx.s.config.IgnoreTruncated()),
164165
), nil
165166
}
166167

options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,14 @@ func WithSessionPoolDeleteTimeout(deleteTimeout time.Duration) Option {
386386
}
387387
}
388388

389+
// WithIgnoreTruncated disables errors on truncated flag
390+
func WithIgnoreTruncated() Option {
391+
return func(ctx context.Context, c *connection) error {
392+
c.tableOptions = append(c.tableOptions, tableConfig.WithIgnoreTruncated())
393+
return nil
394+
}
395+
}
396+
389397
// WithPanicCallback specified behavior on panic
390398
// Warning: WithPanicCallback must be defined on start of all options
391399
// (before `WithTrace{Driver,Table,Scheme,Scripting,Coordination,Ratelimiter}` and other options)

table/table_e2e_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,7 +1510,7 @@ func TestLongStream(t *testing.T) {
15101510
time.Sleep(discoveryInterval)
15111511
}
15121512
if err = res.Err(); err != nil {
1513-
return fmt.Errorf("received error: %w (duration: %v)", err, time.Since(start))
1513+
return fmt.Errorf("received error (duration: %v): %w", time.Since(start), err)
15141514
}
15151515
if rowsCount != upsertRowsCount {
15161516
return fmt.Errorf("wrong rows count: %v, expected: %d (duration: %v)",
@@ -1551,7 +1551,7 @@ func TestLongStream(t *testing.T) {
15511551
time.Sleep(discoveryInterval)
15521552
}
15531553
if err = res.Err(); err != nil {
1554-
return fmt.Errorf("received error: %w (duration: %v)", err, time.Since(start))
1554+
return fmt.Errorf("received error (duration: %v): %w", time.Since(start), err)
15551555
}
15561556
if rowsCount != upsertRowsCount {
15571557
return fmt.Errorf("wrong rows count: %v, expected: %d (duration: %v)",

0 commit comments

Comments
 (0)