Skip to content

Commit eb1221e

Browse files
committed
* Changed behavior on result.Err() on truncated result (return non-retryable error now)
1 parent 7976cb5 commit eb1221e

File tree

5 files changed

+93
-55
lines changed

5 files changed

+93
-55
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Changed behavior on `result.Err()` on truncated unary result (returns non-retryable error now)
12
* Added simple transaction control constructors `table.OnlineReadOnlyTxControl()` and `table.StaleReadOnlyTxControl()`
23
* Added transaction control specifier with context `ydb.WithTxControl`
34
* Added value constructors `types.BytesValue`, `types.BytesValueFromString`, `types.TextValue`

internal/table/client.go

Lines changed: 61 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -111,87 +111,96 @@ func (c *Client) createSession(ctx context.Context, opts ...createSessionOption)
111111
if c.isClosed() {
112112
return nil, errClosedClient
113113
}
114+
115+
options := createSessionOptions{}
116+
for _, o := range opts {
117+
o(&options)
118+
}
119+
114120
defer func() {
115121
if s == nil {
116122
return
117123
}
118-
options := createSessionOptions{}
119-
for _, o := range opts {
120-
o(&options)
121-
}
122124
for _, onCreate := range options.onCreate {
123125
onCreate(s)
124126
}
125127
s.onClose = append(s.onClose, options.onClose...)
126128
}()
127-
type result struct {
128-
s *session
129-
err error
130-
}
131-
132-
ch := make(chan result)
133129

134-
c.spawnedGoroutines.Add(1)
135-
go func() {
136-
defer c.spawnedGoroutines.Done()
130+
select {
131+
case <-c.done:
132+
return nil, xerrors.WithStackTrace(errClosedClient)
137133

138-
var (
134+
default:
135+
type result struct {
139136
s *session
140137
err error
141-
)
142-
143-
createSessionCtx := xcontext.WithoutDeadline(ctx)
144-
145-
if timeout := c.config.CreateSessionTimeout(); timeout > 0 {
146-
var cancel context.CancelFunc
147-
createSessionCtx, cancel = context.WithTimeout(createSessionCtx, timeout)
148-
defer cancel()
149138
}
150139

151-
s, err = c.build(createSessionCtx)
140+
ch := make(chan result)
152141

153-
closeSession := func(s *session) {
154-
if s == nil {
155-
return
156-
}
142+
c.spawnedGoroutines.Add(1)
143+
go func() {
144+
defer c.spawnedGoroutines.Done()
145+
146+
var (
147+
s *session
148+
err error
149+
)
157150

158-
closeSessionCtx := xcontext.WithoutDeadline(ctx)
151+
createSessionCtx := xcontext.WithoutDeadline(ctx)
159152

160-
if timeout := c.config.DeleteTimeout(); timeout > 0 {
153+
if timeout := c.config.CreateSessionTimeout(); timeout > 0 {
161154
var cancel context.CancelFunc
162-
createSessionCtx, cancel = context.WithTimeout(closeSessionCtx, timeout)
155+
createSessionCtx, cancel = context.WithTimeout(createSessionCtx, timeout)
163156
defer cancel()
164157
}
165158

166-
_ = s.Close(ctx)
167-
}
159+
s, err = c.build(createSessionCtx)
168160

169-
select {
170-
case ch <- result{
171-
s: s,
172-
err: err,
173-
}: // nop
161+
closeSession := func(s *session) {
162+
if s == nil {
163+
return
164+
}
165+
166+
closeSessionCtx := xcontext.WithoutDeadline(ctx)
167+
168+
if timeout := c.config.DeleteTimeout(); timeout > 0 {
169+
var cancel context.CancelFunc
170+
createSessionCtx, cancel = context.WithTimeout(closeSessionCtx, timeout)
171+
defer cancel()
172+
}
173+
174+
_ = s.Close(ctx)
175+
}
174176

177+
select {
178+
case ch <- result{
179+
s: s,
180+
err: err,
181+
}: // nop
182+
183+
case <-c.done:
184+
closeSession(s)
185+
186+
case <-ctx.Done():
187+
closeSession(s)
188+
}
189+
}()
190+
191+
select {
175192
case <-c.done:
176-
closeSession(s)
193+
return nil, xerrors.WithStackTrace(errClosedClient)
177194

178195
case <-ctx.Done():
179-
closeSession(s)
180-
}
181-
}()
196+
return nil, xerrors.WithStackTrace(ctx.Err())
182197

183-
select {
184-
case r := <-ch:
185-
if r.err != nil {
186-
return nil, xerrors.WithStackTrace(r.err)
198+
case r := <-ch:
199+
if r.err != nil {
200+
return nil, xerrors.WithStackTrace(r.err)
201+
}
202+
return r.s, nil
187203
}
188-
return r.s, nil
189-
190-
case <-c.done:
191-
return nil, xerrors.WithStackTrace(errClosedClient)
192-
193-
case <-ctx.Done():
194-
return nil, xerrors.WithStackTrace(ctx.Err())
195204
}
196205
}
197206

internal/table/scanner/result.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,16 @@ func (r *baseResult) Reset(set *Ydb.ResultSet, columnNames ...string) {
9898
}
9999
}
100100

101+
func (r *unaryResult) Err() (err error) {
102+
if err = r.baseResult.scanner.Err(); err != nil {
103+
return err
104+
}
105+
if r.truncated() {
106+
return xerrors.WithStackTrace(errTruncated)
107+
}
108+
return nil
109+
}
110+
101111
func (r *unaryResult) NextResultSetErr(ctx context.Context, columns ...string) (err error) {
102112
if r.isClosed() {
103113
return xerrors.WithStackTrace(errAlreadyClosed)
@@ -114,6 +124,10 @@ func (r *unaryResult) NextResultSet(ctx context.Context, columns ...string) bool
114124
return r.NextResultSetErr(ctx, columns...) == nil
115125
}
116126

127+
func (r *streamResult) Err() (err error) {
128+
return r.baseResult.scanner.Err()
129+
}
130+
117131
func (r *streamResult) NextResultSetErr(ctx context.Context, columns ...string) (err error) {
118132
if r.isClosed() {
119133
return xerrors.WithStackTrace(errAlreadyClosed)

internal/table/scanner/scanner.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"database/sql"
66
"encoding/json"
7+
"errors"
78
"fmt"
89
"io"
910
"math"
@@ -22,6 +23,8 @@ import (
2223
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
2324
)
2425

26+
var errTruncated = xerrors.Wrap(errors.New("truncated result"))
27+
2528
type scanner struct {
2629
set *Ydb.ResultSet
2730
row *Ydb.Value
@@ -206,11 +209,22 @@ func (s *scanner) Truncated() bool {
206209
return s.set.Truncated
207210
}
208211

212+
// Truncated returns true if current result set has been truncated by server
213+
func (s *scanner) truncated() bool {
214+
if s.set == nil {
215+
return false
216+
}
217+
return s.set.Truncated
218+
}
219+
209220
// Err returns error caused Scanner to be broken.
210221
func (s *scanner) Err() error {
211222
s.errMtx.RLock()
212223
defer s.errMtx.RUnlock()
213-
return s.err
224+
if s.err != nil {
225+
return s.err
226+
}
227+
return nil
214228
}
215229

216230
// Must not be exported.

table/table_e2e_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,7 +1510,7 @@ func TestLongStream(t *testing.T) {
15101510
time.Sleep(discoveryInterval)
15111511
}
15121512
if err = res.Err(); err != nil {
1513-
return fmt.Errorf("received error: %w (duration: %v)", err, time.Since(start))
1513+
return fmt.Errorf("received error (duration: %v): %w", time.Since(start), err)
15141514
}
15151515
if rowsCount != upsertRowsCount {
15161516
return fmt.Errorf("wrong rows count: %v, expected: %d (duration: %v)",
@@ -1551,7 +1551,7 @@ func TestLongStream(t *testing.T) {
15511551
time.Sleep(discoveryInterval)
15521552
}
15531553
if err = res.Err(); err != nil {
1554-
return fmt.Errorf("received error: %w (duration: %v)", err, time.Since(start))
1554+
return fmt.Errorf("received error (duration: %v): %w", time.Since(start), err)
15551555
}
15561556
if rowsCount != upsertRowsCount {
15571557
return fmt.Errorf("wrong rows count: %v, expected: %d (duration: %v)",

0 commit comments

Comments
 (0)