Skip to content

Commit d2be4cd

Browse files
authored
Merge pull request #801 from ydb-platform/truncated
`table.options.WithIgnoreTruncated` + `table.result.ErrTruncated`
2 parents 66a967f + 4ca1e2c commit d2be4cd

File tree

9 files changed

+260
-22
lines changed

9 files changed

+260
-22
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
* Added `table.options.WithIgnoreTruncated` option for `session.Execute` method
2+
* Added `table.result.ErrTruncated` error for check it with `errors.Is()` outside of `ydb-go-sdk`
3+
14
## v3.49.0
25
* Added `table.Session.ReadRows` method for getting rows by keys
36
* Added `table/options.ChangefeedFormatDynamoDBStreamsJSON` format of `DynamoDB` change feeds

internal/table/scanner/scanner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
1818
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
19+
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
1920
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/indexed"
2021
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
2122
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
@@ -224,7 +225,7 @@ func (s *scanner) Err() error {
224225
}
225226
if !s.ignoreTruncated && s.truncated() {
226227
err := xerrors.Wrap(
227-
fmt.Errorf("truncated result (more than %d rows)", len(s.set.GetRows())),
228+
fmt.Errorf("more than %d rows: %w", len(s.set.GetRows()), result.ErrTruncated),
228229
)
229230
if s.markTruncatedAsRetryable {
230231
err = xerrors.Retryable(err)

internal/table/session.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -650,9 +650,12 @@ func (s *session) Execute(
650650
txr table.Transaction, r result.Result, err error,
651651
) {
652652
var (
653-
a = allocator.New()
654-
q = queryFromText(query)
655-
request = a.TableExecuteDataQueryRequest()
653+
a = allocator.New()
654+
q = queryFromText(query)
655+
request = options.ExecuteDataQueryDesc{
656+
ExecuteDataQueryRequest: a.TableExecuteDataQueryRequest(),
657+
IgnoreTruncated: s.config.IgnoreTruncated(),
658+
}
656659
callOptions []grpc.CallOption
657660
)
658661
defer a.Free()
@@ -671,7 +674,7 @@ func (s *session) Execute(
671674

672675
for _, opt := range opts {
673676
if opt != nil {
674-
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption((*options.ExecuteDataQueryDesc)(request), a)...)
677+
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption(&request, a)...)
675678
}
676679
}
677680

@@ -683,18 +686,20 @@ func (s *session) Execute(
683686
onDone(txr, false, r, err)
684687
}()
685688

686-
result, err := s.executeDataQuery(ctx, a, request, callOptions...)
689+
result, err := s.executeDataQuery(ctx, a, request.ExecuteDataQueryRequest, callOptions...)
687690
if err != nil {
688691
return nil, nil, xerrors.WithStackTrace(err)
689692
}
690693

691-
return s.executeQueryResult(result, request.TxControl)
694+
return s.executeQueryResult(result, request.TxControl, request.IgnoreTruncated)
692695
}
693696

694697
// executeQueryResult returns Transaction and result built from received
695698
// result.
696699
func (s *session) executeQueryResult(
697-
res *Ydb_Table.ExecuteQueryResult, txControl *Ydb_Table.TransactionControl,
700+
res *Ydb_Table.ExecuteQueryResult,
701+
txControl *Ydb_Table.TransactionControl,
702+
ignoreTruncated bool,
698703
) (
699704
table.Transaction, result.Result, error,
700705
) {
@@ -711,7 +716,7 @@ func (s *session) executeQueryResult(
711716
return tx, scanner.NewUnary(
712717
res.GetResultSets(),
713718
res.GetQueryStats(),
714-
scanner.WithIgnoreTruncated(s.config.IgnoreTruncated()),
719+
scanner.WithIgnoreTruncated(ignoreTruncated),
715720
), nil
716721
}
717722

internal/table/statement.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@ func (s *statement) Execute(
3131
txr table.Transaction, r result.Result, err error,
3232
) {
3333
var (
34-
a = allocator.New()
35-
request = a.TableExecuteDataQueryRequest()
34+
a = allocator.New()
35+
request = options.ExecuteDataQueryDesc{
36+
ExecuteDataQueryRequest: a.TableExecuteDataQueryRequest(),
37+
IgnoreTruncated: s.session.config.IgnoreTruncated(),
38+
}
3639
callOptions []grpc.CallOption
3740
)
3841
defer a.Free()
@@ -51,7 +54,7 @@ func (s *statement) Execute(
5154

5255
for _, opt := range opts {
5356
if opt != nil {
54-
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption((*options.ExecuteDataQueryDesc)(request), a)...)
57+
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption(&request, a)...)
5558
}
5659
}
5760

@@ -63,22 +66,22 @@ func (s *statement) Execute(
6366
onDone(txr, true, r, err)
6467
}()
6568

66-
return s.execute(ctx, a, request, request.TxControl, callOptions...)
69+
return s.execute(ctx, a, &request, request.TxControl, callOptions...)
6770
}
6871

6972
// execute executes prepared query without any tracing.
7073
func (s *statement) execute(
7174
ctx context.Context, a *allocator.Allocator,
72-
request *Ydb_Table.ExecuteDataQueryRequest, txControl *Ydb_Table.TransactionControl,
75+
request *options.ExecuteDataQueryDesc, txControl *Ydb_Table.TransactionControl,
7376
callOptions ...grpc.CallOption,
7477
) (
7578
txr table.Transaction, r result.Result, err error,
7679
) {
77-
res, err := s.session.executeDataQuery(ctx, a, request, callOptions...)
80+
res, err := s.session.executeDataQuery(ctx, a, request.ExecuteDataQueryRequest, callOptions...)
7881
if err != nil {
7982
return nil, nil, xerrors.WithStackTrace(err)
8083
}
81-
return s.session.executeQueryResult(res, txControl)
84+
return s.session.executeQueryResult(res, txControl, request.IgnoreTruncated)
8285
}
8386

8487
func (s *statement) NumInput() int {

internal/xsql/connector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ type ydbDriver interface {
171171

172172
type nopPathNormalizer struct{}
173173

174-
func (nopPathNormalizer) NormalizePath(folderOrTable string) string {
174+
func (nopPathNormalizer) NormalizePath(_ string) string {
175175
return tablePathPrefixTransformer
176176
}
177177

table/options/options.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -793,7 +793,11 @@ type (
793793
)
794794

795795
type (
796-
ExecuteDataQueryDesc Ydb_Table.ExecuteDataQueryRequest
796+
ExecuteDataQueryDesc struct {
797+
*Ydb_Table.ExecuteDataQueryRequest
798+
799+
IgnoreTruncated bool
800+
}
797801
ExecuteDataQueryOption interface {
798802
ApplyExecuteDataQueryOption(d *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption
799803
}
@@ -856,6 +860,14 @@ func WithCommit() ExecuteDataQueryOption {
856860
})
857861
}
858862

863+
// WithIgnoreTruncated mark truncated result as good (without error)
864+
func WithIgnoreTruncated() ExecuteDataQueryOption {
865+
return executeDataQueryOptionFunc(func(desc *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption {
866+
desc.IgnoreTruncated = true
867+
return nil
868+
})
869+
}
870+
859871
// WithQueryCachePolicyKeepInCache manages keep-in-cache policy
860872
//
861873
// Deprecated: data queries always executes with enabled keep-in-cache policy.

table/result/errors.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package result
2+
3+
import (
4+
"errors"
5+
)
6+
7+
var ErrTruncated = errors.New("truncated result")

tests/integration/helpers_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,11 @@ func (scope *scopeT) Driver(opts ...ydb.Option) *ydb.Driver {
102102
}).(*ydb.Driver)
103103
}
104104

105-
func (scope *scopeT) SQLDriverWithFolder(opts ...ydb.ConnectorOption) *sql.DB {
105+
func (scope *scopeT) SQLDriver(opts ...ydb.ConnectorOption) *sql.DB {
106106
return scope.Cache(nil, nil, func() (res interface{}, err error) {
107107
driver := scope.Driver()
108108
scope.Logf("Create sql db connector")
109-
connector, err := ydb.Connector(driver,
110-
append([]ydb.ConnectorOption{ydb.WithTablePathPrefix(scope.Folder())}, opts...)...,
111-
)
109+
connector, err := ydb.Connector(driver, opts...)
112110
if err != nil {
113111
return nil, err
114112
}
@@ -124,6 +122,12 @@ func (scope *scopeT) SQLDriverWithFolder(opts ...ydb.ConnectorOption) *sql.DB {
124122
}).(*sql.DB)
125123
}
126124

125+
func (scope *scopeT) SQLDriverWithFolder(opts ...ydb.ConnectorOption) *sql.DB {
126+
return scope.SQLDriver(
127+
append([]ydb.ConnectorOption{ydb.WithTablePathPrefix(scope.Folder())}, opts...)...,
128+
)
129+
}
130+
127131
func (scope *scopeT) Folder() string {
128132
return scope.CacheWithCleanup(nil, nil, func() (res interface{}, cleanup fixenv.FixtureCleanupFunc, err error) {
129133
driver := scope.Driver()

0 commit comments

Comments
 (0)