Skip to content

Commit 5e9d660

Browse files
authored
Merge branch 'master' into fix-pool-item-close
2 parents 211365a + 7374388 commit 5e9d660

File tree

10 files changed

+300
-33
lines changed

10 files changed

+300
-33
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
* Fixed connections pool leak on closing
22

3+
## v3.92.3
4+
* Fixed error with incompleted data returen from transaction.ReadQueryResult method
5+
* Added option `query/WithResponsePartLimitSizeBytes(...)` for queries with query service
6+
7+
38
## v3.92.2
49
* Added `table/options.WithShardNodesInfo()` experimental option to get shard nodeId for describe table call
510

internal/query/execute_query.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type executeSettings interface {
3131
CallOptions() []grpc.CallOption
3232
RetryOpts() []retry.Option
3333
ResourcePool() string
34+
ResponsePartLimitSizeBytes() int64
3435
}
3536

3637
type executeScriptConfig interface {
@@ -77,6 +78,7 @@ func executeQueryRequest(a *allocator.Allocator, sessionID, q string, cfg execut
7778
request.StatsMode = Ydb_Query.StatsMode(cfg.StatsMode())
7879
request.ConcurrentResultSets = false
7980
request.PoolId = cfg.ResourcePool()
81+
request.ResponsePartLimitBytes = cfg.ResponsePartLimitSizeBytes()
8082

8183
return request, cfg.CallOptions()
8284
}
@@ -146,14 +148,7 @@ func readResultSet(ctx context.Context, r *streamResult) (_ *resultSetWithClose,
146148
if err != nil {
147149
return nil, xerrors.WithStackTrace(err)
148150
}
149-
150-
_, err = r.nextResultSet(ctx)
151-
if err == nil {
152-
return nil, xerrors.WithStackTrace(errMoreThanOneResultSet)
153-
}
154-
if !xerrors.Is(err, io.EOF) {
155-
return nil, xerrors.WithStackTrace(err)
156-
}
151+
rs.mustBeLastResultSet = true
157152

158153
return &resultSetWithClose{
159154
resultSet: rs,

internal/query/options/execute.go

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,16 @@ type (
2727

2828
// executeSettings is a holder for execute settings
2929
executeSettings struct {
30-
syntax Syntax
31-
params params.Parameters
32-
execMode ExecMode
33-
statsMode StatsMode
34-
resourcePool string
35-
statsCallback func(queryStats stats.QueryStats)
36-
callOptions []grpc.CallOption
37-
txControl *tx.Control
38-
retryOptions []retry.Option
30+
syntax Syntax
31+
params params.Parameters
32+
execMode ExecMode
33+
statsMode StatsMode
34+
resourcePool string
35+
statsCallback func(queryStats stats.QueryStats)
36+
callOptions []grpc.CallOption
37+
txControl *tx.Control
38+
retryOptions []retry.Option
39+
responsePartLimitBytes int64
3940
}
4041

4142
// Execute is an interface for execute method options
@@ -58,7 +59,8 @@ type (
5859
mode StatsMode
5960
callback func(stats.QueryStats)
6061
}
61-
execModeOption = ExecMode
62+
execModeOption = ExecMode
63+
responsePartLimitBytes int64
6264
)
6365

6466
func (poolID resourcePool) applyExecuteOption(s *executeSettings) {
@@ -175,6 +177,10 @@ func (s *executeSettings) Params() *params.Parameters {
175177
return &s.params
176178
}
177179

180+
func (s *executeSettings) ResponsePartLimitSizeBytes() int64 {
181+
return s.responsePartLimitBytes
182+
}
183+
178184
func WithParameters(parameters *params.Parameters) parametersOption {
179185
return parametersOption(*parameters)
180186
}
@@ -201,6 +207,14 @@ func WithExecMode(mode ExecMode) execModeOption {
201207
return mode
202208
}
203209

210+
func WithResponsePartLimitSizeBytes(size int64) responsePartLimitBytes {
211+
return responsePartLimitBytes(size)
212+
}
213+
214+
func (size responsePartLimitBytes) applyExecuteOption(s *executeSettings) {
215+
s.responsePartLimitBytes = int64(size)
216+
}
217+
204218
func WithSyntax(syntax Syntax) syntaxOption {
205219
return syntax
206220
}

internal/query/result.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2020
)
2121

22+
var errReadNextResultSet = xerrors.Wrap(errors.New("ydb: stop read the result set because see part of next result set"))
23+
2224
var (
2325
_ result.Result = (*streamResult)(nil)
2426
_ result.Result = (*materializedResult)(nil)
@@ -294,8 +296,8 @@ func (r *streamResult) nextPartFunc(
294296
}
295297
if part.GetResultSetIndex() > nextResultSetIndex {
296298
return nil, xerrors.WithStackTrace(fmt.Errorf(
297-
"result set (index=%d) receive part (index=%d) for next result set: %w",
298-
nextResultSetIndex, part.GetResultSetIndex(), io.EOF,
299+
"result set (index=%d) receive part (index=%d) for next result set: %w (%w)",
300+
nextResultSetIndex, part.GetResultSetIndex(), io.EOF, errReadNextResultSet,
299301
))
300302
}
301303

internal/query/result_set.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package query
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78

@@ -29,12 +30,13 @@ type (
2930
rowIndex int
3031
}
3132
resultSet struct {
32-
index int64
33-
recv func() (*Ydb_Query.ExecuteQueryResponsePart, error)
34-
columns []*Ydb.Column
35-
currentPart *Ydb_Query.ExecuteQueryResponsePart
36-
rowIndex int
37-
done chan struct{}
33+
index int64
34+
recv func() (*Ydb_Query.ExecuteQueryResponsePart, error)
35+
columns []*Ydb.Column
36+
currentPart *Ydb_Query.ExecuteQueryResponsePart
37+
rowIndex int
38+
done chan struct{}
39+
mustBeLastResultSet bool
3840
}
3941
resultSetWithClose struct {
4042
*resultSet
@@ -158,13 +160,19 @@ func (rs *resultSet) nextRow(ctx context.Context) (*Row, error) {
158160
case <-ctx.Done():
159161
return nil, xerrors.WithStackTrace(ctx.Err())
160162
default:
163+
//nolint:nestif
161164
if rs.rowIndex == len(rs.currentPart.GetResultSet().GetRows()) {
162165
part, err := rs.recv()
163166
if err != nil {
164167
if xerrors.Is(err, io.EOF) {
165168
close(rs.done)
166169
}
167170

171+
if rs.mustBeLastResultSet && errors.Is(err, errReadNextResultSet) {
172+
// prevent detect io.EOF in the error
173+
return nil, xerrors.WithStackTrace(xerrors.Wrap(errors.New(err.Error())))
174+
}
175+
168176
return nil, xerrors.WithStackTrace(err)
169177
}
170178
rs.rowIndex = 0

internal/query/result_test.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1520,22 +1520,22 @@ func TestCloseResultOnCloseClosableResultSet(t *testing.T) {
15201520
{
15211521
Items: []*Ydb.Value{{
15221522
Value: &Ydb.Value_Uint64Value{
1523-
Uint64Value: 1,
1523+
Uint64Value: 3,
15241524
},
15251525
}, {
15261526
Value: &Ydb.Value_TextValue{
1527-
TextValue: "1",
1527+
TextValue: "3",
15281528
},
15291529
}},
15301530
},
15311531
{
15321532
Items: []*Ydb.Value{{
15331533
Value: &Ydb.Value_Uint64Value{
1534-
Uint64Value: 2,
1534+
Uint64Value: 4,
15351535
},
15361536
}, {
15371537
Value: &Ydb.Value_TextValue{
1538-
TextValue: "2",
1538+
TextValue: "4",
15391539
},
15401540
}},
15411541
},
@@ -1576,8 +1576,20 @@ func TestCloseResultOnCloseClosableResultSet(t *testing.T) {
15761576
require.EqualValues(t, 2, a)
15771577
require.EqualValues(t, "2", b)
15781578
r3, err3 := rs.NextRow(ctx)
1579-
require.ErrorIs(t, err3, io.EOF)
1580-
require.Nil(t, r3)
1579+
require.NoError(t, err3)
1580+
scanErr3 := r3.Scan(&a, &b)
1581+
require.EqualValues(t, 3, a)
1582+
require.EqualValues(t, "3", b)
1583+
require.NoError(t, scanErr3)
1584+
r4, err4 := rs.NextRow(ctx)
1585+
require.NoError(t, err4)
1586+
scanErr4 := r4.Scan(&a, &b)
1587+
require.EqualValues(t, 4, a)
1588+
require.EqualValues(t, "4", b)
1589+
require.NoError(t, scanErr4)
1590+
r5, err5 := rs.NextRow(ctx)
1591+
require.ErrorIs(t, err5, io.EOF)
1592+
require.Nil(t, r5)
15811593
err = rs.Close(ctx)
15821594
require.NoError(t, err)
15831595
require.True(t, closed)

internal/version/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package version
33
const (
44
Major = "3"
55
Minor = "92"
6-
Patch = "2"
6+
Patch = "3"
77

88
Package = "ydb-go-sdk"
99
)

query/execute_options.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ func WithStatsMode(mode options.StatsMode, callback func(Stats)) options.Execute
5555
return options.WithStatsMode(mode, callback)
5656
}
5757

58+
// WithResponsePartLimitSizeBytes limit size of each part (data portion) in stream for query service resoponse
59+
// it isn't limit total size of answer
60+
func WithResponsePartLimitSizeBytes(size int64) options.Execute {
61+
return options.WithResponsePartLimitSizeBytes(size)
62+
}
63+
5864
func WithCallOptions(opts ...grpc.CallOption) options.Execute {
5965
return options.WithCallOptions(opts...)
6066
}

tests/integration/query_execute_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,3 +409,144 @@ func TestIssue1456TooManyUnknownTransactions(t *testing.T) {
409409
wg.Wait()
410410
})
411411
}
412+
413+
func TestQueryResultSet(t *testing.T) {
414+
if version.Lt(os.Getenv("YDB_VERSION"), "24.1") {
415+
t.Skip("query service not allowed in YDB version '" + os.Getenv("YDB_VERSION") + "'")
416+
}
417+
418+
t.Run("OK", func(t *testing.T) {
419+
scope := newScope(t)
420+
421+
partSizeBytes := 1000
422+
targetCount := partSizeBytes * 10 // for guarantee size of response will contain many parts
423+
items := make([]types.Value, 0, targetCount)
424+
for i := 0; i < targetCount; i++ {
425+
item := types.StructValue(
426+
types.StructFieldValue("val", types.Int64Value(int64(i))),
427+
)
428+
items = append(items, item)
429+
}
430+
431+
err := scope.Driver().Query().DoTx(scope.Ctx, func(ctx context.Context, tx query.TxActor) error {
432+
rs, err := tx.QueryResultSet(ctx, `
433+
DECLARE $arg AS List<Struct<val: Int64>>;
434+
435+
SELECT * FROM AS_TABLE($arg);
436+
`,
437+
query.WithParameters(ydb.ParamsBuilder().Param("$arg").Any(types.ListValue(items...)).Build()),
438+
query.WithResponsePartLimitSizeBytes(int64(partSizeBytes)),
439+
)
440+
if err != nil {
441+
return err
442+
}
443+
444+
for i := 0; i < targetCount; i++ {
445+
row, err := rs.NextRow(ctx)
446+
if err != nil {
447+
return err
448+
}
449+
450+
var val int64
451+
err = row.Scan(&val)
452+
require.NoError(t, err)
453+
require.Equal(t, int64(i), val)
454+
}
455+
456+
return nil
457+
})
458+
require.NoError(t, err)
459+
})
460+
t.Run("FailOnSecondResultSet", func(t *testing.T) {
461+
scope := newScope(t)
462+
463+
var secondRowError error
464+
err := scope.Driver().Query().DoTx(scope.Ctx, func(ctx context.Context, tx query.TxActor) error {
465+
rs, err := tx.QueryResultSet(ctx, "SELECT 1; SELECT 2")
466+
if err != nil {
467+
return err
468+
}
469+
470+
_, err = rs.NextRow(ctx)
471+
if err != nil {
472+
return err
473+
}
474+
475+
_, secondRowError = rs.NextRow(ctx)
476+
477+
return nil
478+
})
479+
require.NoError(t, err)
480+
require.Error(t, secondRowError)
481+
require.NotErrorIs(t, secondRowError, io.EOF)
482+
})
483+
}
484+
485+
func TestQueryPartLimiter(t *testing.T) {
486+
if os.Getenv("YDB_VERSION") != "nightly" && version.Lt(os.Getenv("YDB_VERSION"), "25.0") {
487+
t.Skip("require enables transactions for topics")
488+
}
489+
490+
scope := newScope(t)
491+
492+
var readPartCount int
493+
scope.Driver(ydb.WithTraceQuery(trace.Query{
494+
OnResultNextPart: func(info trace.QueryResultNextPartStartInfo) func(info trace.QueryResultNextPartDoneInfo) {
495+
return func(info trace.QueryResultNextPartDoneInfo) {
496+
if info.Error == nil {
497+
readPartCount++
498+
}
499+
}
500+
},
501+
}))
502+
503+
targetCount := 1000
504+
items := make([]types.Value, 0, targetCount)
505+
for i := 0; i < targetCount; i++ {
506+
item := types.StructValue(
507+
types.StructFieldValue("val", types.Int64Value(int64(i))),
508+
)
509+
items = append(items, item)
510+
}
511+
512+
getPartCount := func(partSize int64) int {
513+
partCount := 0
514+
err := scope.Driver().Query().DoTx(scope.Ctx, func(ctx context.Context, tx query.TxActor) error {
515+
oldParts := readPartCount
516+
rs, err := tx.QueryResultSet(ctx, `
517+
DECLARE $arg AS List<Struct<val: Int64>>;
518+
519+
SELECT * FROM AS_TABLE($arg);
520+
`,
521+
query.WithParameters(ydb.ParamsBuilder().Param("$arg").Any(types.ListValue(items...)).Build()),
522+
query.WithResponsePartLimitSizeBytes(partSize),
523+
)
524+
if err != nil {
525+
return err
526+
}
527+
528+
rowCount := 0
529+
for {
530+
_, err = rs.NextRow(scope.Ctx)
531+
if errors.Is(err, io.EOF) {
532+
break
533+
}
534+
require.NoError(t, err)
535+
rowCount++
536+
}
537+
require.Equal(t, targetCount, rowCount)
538+
539+
partCount = readPartCount - oldParts
540+
return nil
541+
})
542+
543+
require.NoError(t, err)
544+
return partCount
545+
}
546+
547+
partsWithBigSize := getPartCount(1000000)
548+
partsWithLittleSize := getPartCount(100)
549+
550+
require.Equal(t, 1, partsWithBigSize)
551+
require.Greater(t, partsWithLittleSize, 1)
552+
}

0 commit comments

Comments
 (0)