Skip to content

Commit 0c18ee0

Browse files
committed
add config option for disabling truncated flag
1 parent eb1221e commit 0c18ee0

File tree

7 files changed

+53
-21
lines changed

7 files changed

+53
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
* Changed behavior on `result.Err()` on truncated unary result (returns non-retryable error now)
2+
* Added `ydb.WithIgnoreTruncated` option for disabling errors on truncated flag
23
* Added simple transaction control constructors `table.OnlineReadOnlyTxControl()` and `table.StaleReadOnlyTxControl()`
34
* Added transaction control specifier with context `ydb.WithTxControl`
45
* Added value constructors `types.BytesValue`, `types.BytesValueFromString`, `types.TextValue`

internal/table/config/config.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,13 @@ func WithTrace(trace trace.Table, opts ...trace.TableComposeOption) Option {
124124
}
125125
}
126126

127+
// WithIgnoreTruncated disables errors on truncated flag
128+
func WithIgnoreTruncated() Option {
129+
return func(c *Config) {
130+
c.ignoreTruncated = true
131+
}
132+
}
133+
127134
// Config is a configuration of table client
128135
type Config struct {
129136
config.Common
@@ -134,6 +141,8 @@ type Config struct {
134141
deleteTimeout time.Duration
135142
idleThreshold time.Duration
136143

144+
ignoreTruncated bool
145+
137146
trace trace.Table
138147
}
139148

@@ -159,6 +168,11 @@ func (c Config) KeepAliveMinSize() int {
159168
return DefaultKeepAliveMinSize
160169
}
161170

171+
// IgnoreTruncated specifies behavior on truncated flag
172+
func (c Config) IgnoreTruncated() bool {
173+
return c.ignoreTruncated
174+
}
175+
162176
// IdleKeepAliveThreshold is a number of keepAlive messages to call before the
163177
// session is removed if it is an excess session (see KeepAliveMinSize)
164178
// This means that session will be deleted after the expiration of lifetime = IdleThreshold * IdleKeepAliveThreshold

internal/table/scanner/result.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,24 +70,39 @@ type StreamResult interface {
7070
resultWithError
7171
}
7272

73+
type option func(r *baseResult)
74+
75+
func WithIgnoreTruncated(ignoreTruncated bool) option {
76+
return func(r *baseResult) {
77+
r.scanner.ignoreTruncated = ignoreTruncated
78+
}
79+
}
80+
7381
func NewStream(
7482
recv func(ctx context.Context) (*Ydb.ResultSet, *Ydb_TableStats.QueryStats, error),
7583
onClose func(error) error,
84+
opts ...option,
7685
) StreamResult {
7786
r := &streamResult{
7887
recv: recv,
7988
close: onClose,
8089
}
90+
for _, o := range opts {
91+
o(&r.baseResult)
92+
}
8193
return r
8294
}
8395

84-
func NewUnary(sets []*Ydb.ResultSet, stats *Ydb_TableStats.QueryStats) UnaryResult {
96+
func NewUnary(sets []*Ydb.ResultSet, stats *Ydb_TableStats.QueryStats, opts ...option) UnaryResult {
8597
r := &unaryResult{
8698
baseResult: baseResult{
8799
stats: stats,
88100
},
89101
sets: sets,
90102
}
103+
for _, o := range opts {
104+
o(&r.baseResult)
105+
}
91106
return r
92107
}
93108

@@ -98,16 +113,6 @@ func (r *baseResult) Reset(set *Ydb.ResultSet, columnNames ...string) {
98113
}
99114
}
100115

101-
func (r *unaryResult) Err() (err error) {
102-
if err = r.baseResult.scanner.Err(); err != nil {
103-
return err
104-
}
105-
if r.truncated() {
106-
return xerrors.WithStackTrace(errTruncated)
107-
}
108-
return nil
109-
}
110-
111116
func (r *unaryResult) NextResultSetErr(ctx context.Context, columns ...string) (err error) {
112117
if r.isClosed() {
113118
return xerrors.WithStackTrace(errAlreadyClosed)
@@ -124,10 +129,6 @@ func (r *unaryResult) NextResultSet(ctx context.Context, columns ...string) bool
124129
return r.NextResultSetErr(ctx, columns...) == nil
125130
}
126131

127-
func (r *streamResult) Err() (err error) {
128-
return r.baseResult.scanner.Err()
129-
}
130-
131132
func (r *streamResult) NextResultSetErr(ctx context.Context, columns ...string) (err error) {
132133
if r.isClosed() {
133134
return xerrors.WithStackTrace(errAlreadyClosed)

internal/table/scanner/scanner.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ import (
2626
var errTruncated = xerrors.Wrap(errors.New("truncated result"))
2727

2828
type scanner struct {
29-
set *Ydb.ResultSet
30-
row *Ydb.Value
31-
converter *rawConverter
32-
stack scanStack
33-
nextRow int
34-
nextItem int
29+
set *Ydb.ResultSet
30+
row *Ydb.Value
31+
converter *rawConverter
32+
stack scanStack
33+
nextRow int
34+
nextItem int
35+
ignoreTruncated bool
3536

3637
columnIndexes []int
3738

@@ -224,6 +225,9 @@ func (s *scanner) Err() error {
224225
if s.err != nil {
225226
return s.err
226227
}
228+
if !s.ignoreTruncated && s.truncated() {
229+
return xerrors.WithStackTrace(errTruncated)
230+
}
227231
return nil
228232
}
229233

internal/table/session.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,7 @@ func (s *session) executeQueryResult(res *Ydb_Table.ExecuteQueryResult) (
694694
r := scanner.NewUnary(
695695
res.GetResultSets(),
696696
res.GetQueryStats(),
697+
scanner.WithIgnoreTruncated(s.config.IgnoreTruncated()),
697698
)
698699
return t, r, nil
699700
}
@@ -993,6 +994,7 @@ func (s *session) StreamReadTable(
993994
onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err))
994995
return err
995996
},
997+
scanner.WithIgnoreTruncated(true), // stream read table always returns truncated flag on last result set
996998
), nil
997999
}
9981000

@@ -1075,6 +1077,7 @@ func (s *session) StreamExecuteScanQuery(
10751077
onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err))
10761078
return err
10771079
},
1080+
scanner.WithIgnoreTruncated(s.config.IgnoreTruncated()),
10781081
), nil
10791082
}
10801083

internal/table/transaction.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ func (tx *transaction) CommitTx(
161161
return scanner.NewUnary(
162162
nil,
163163
result.GetQueryStats(),
164+
scanner.WithIgnoreTruncated(tx.s.config.IgnoreTruncated()),
164165
), nil
165166
}
166167

options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,14 @@ func WithSessionPoolDeleteTimeout(deleteTimeout time.Duration) Option {
386386
}
387387
}
388388

389+
// WithIgnoreTruncated disables errors on truncated flag
390+
func WithIgnoreTruncated() Option {
391+
return func(ctx context.Context, c *connection) error {
392+
c.tableOptions = append(c.tableOptions, tableConfig.WithIgnoreTruncated())
393+
return nil
394+
}
395+
}
396+
389397
// WithPanicCallback specified behavior on panic
390398
// Warning: WithPanicCallback must be defined on start of all options
391399
// (before `WithTrace{Driver,Table,Scheme,Scripting,Coordination,Ratelimiter}` and other options)

0 commit comments

Comments
 (0)