Skip to content

Commit c620cac

Browse files
committed
* Added receiving first result set on construct internal/table/scanner.NewStream()
1 parent 5a96158 commit c620cac

File tree

6 files changed

+113
-22
lines changed

6 files changed

+113
-22
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Added receiving first result set on construct `internal/table/scanner.NewStream()`
12
* Added experimental package `metrics` with SDK metrics
23
* Fixed redundant trace call for finished `database/sql` transactions
34
* Added repeater event type to wake-up func context

internal/scripting/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func (c *Client) streamExecute(
222222
return nil, xerrors.WithStackTrace(err)
223223
}
224224

225-
return scanner.NewStream(
225+
return scanner.NewStream(ctx,
226226
func(ctx context.Context) (
227227
set *Ydb.ResultSet,
228228
stats *Ydb_TableStats.QueryStats,
@@ -249,7 +249,7 @@ func (c *Client) streamExecute(
249249
onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err))
250250
return err
251251
},
252-
), nil
252+
)
253253
}
254254

255255
func (c *Client) Close(ctx context.Context) (err error) {

internal/table/scanner/result.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ var errAlreadyClosed = xerrors.Wrap(errors.New("result closed early"))
2020
type baseResult struct {
2121
scanner
2222

23-
statsMtx xsync.RWMutex
24-
stats *Ydb_TableStats.QueryStats
23+
nextResultSetCounter xatomic.Uint64
24+
statsMtx xsync.RWMutex
25+
stats *Ydb_TableStats.QueryStats
2526

2627
closed xatomic.Bool
2728
}
@@ -103,10 +104,11 @@ func WithMarkTruncatedAsRetryable() option {
103104
}
104105

105106
func NewStream(
107+
ctx context.Context,
106108
recv func(ctx context.Context) (*Ydb.ResultSet, *Ydb_TableStats.QueryStats, error),
107109
onClose func(error) error,
108110
opts ...option,
109-
) StreamResult {
111+
) (StreamResult, error) {
110112
r := &streamResult{
111113
recv: recv,
112114
close: onClose,
@@ -116,7 +118,10 @@ func NewStream(
116118
o(&r.baseResult)
117119
}
118120
}
119-
return r
121+
if err := r.nextResultSetErr(ctx); err != nil {
122+
return nil, xerrors.WithStackTrace(err)
123+
}
124+
return r, nil
120125
}
121126

122127
func NewUnary(sets []*Ydb.ResultSet, stats *Ydb_TableStats.QueryStats, opts ...option) UnaryResult {
@@ -157,20 +162,18 @@ func (r *unaryResult) NextResultSet(ctx context.Context, columns ...string) bool
157162
return r.NextResultSetErr(ctx, columns...) == nil
158163
}
159164

160-
func (r *streamResult) NextResultSetErr(ctx context.Context, columns ...string) (err error) {
161-
if r.isClosed() {
162-
return xerrors.WithStackTrace(errAlreadyClosed)
163-
}
164-
if err = r.Err(); err != nil {
165-
return xerrors.WithStackTrace(err)
165+
func (r *streamResult) nextResultSetErr(ctx context.Context, columns ...string) (err error) {
166+
if r.nextResultSetCounter.Add(1) == 2 {
167+
r.setColumnIndexes(columns)
168+
return ctx.Err()
166169
}
167170
s, stats, err := r.recv(ctx)
168171
if err != nil {
169172
r.Reset(nil)
170173
if xerrors.Is(err, io.EOF) {
171174
return err
172175
}
173-
return r.errorf(0, "streamResult.NextResultSetErr(): %w", err)
176+
return r.errorf(1, "streamResult.NextResultSetErr(): %w", err)
174177
}
175178
r.Reset(s, columns...)
176179
if stats != nil {
@@ -181,6 +184,22 @@ func (r *streamResult) NextResultSetErr(ctx context.Context, columns ...string)
181184
return ctx.Err()
182185
}
183186

187+
func (r *streamResult) NextResultSetErr(ctx context.Context, columns ...string) (err error) {
188+
if r.isClosed() {
189+
return xerrors.WithStackTrace(errAlreadyClosed)
190+
}
191+
if err = r.Err(); err != nil {
192+
return xerrors.WithStackTrace(err)
193+
}
194+
if err := r.nextResultSetErr(ctx, columns...); err != nil {
195+
if xerrors.Is(err, io.EOF) {
196+
return io.EOF
197+
}
198+
return xerrors.WithStackTrace(err)
199+
}
200+
return nil
201+
}
202+
184203
func (r *streamResult) NextResultSet(ctx context.Context, columns ...string) bool {
185204
return r.NextResultSetErr(ctx, columns...) == nil
186205
}

internal/table/scanner/result_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@ package scanner
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"reflect"
78
"testing"
9+
"time"
810

11+
"github.com/stretchr/testify/require"
912
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
13+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats"
1014

1115
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
1216
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
@@ -199,3 +203,75 @@ func NewResultSet(a *allocator.Allocator, opts ...ResultSetOption) *Ydb.ResultSe
199203
}
200204
return (*Ydb.ResultSet)(&d)
201205
}
206+
207+
func TestNewStreamWithRecvFirstResultSet(t *testing.T) {
208+
for _, tt := range []struct {
209+
ctx context.Context
210+
recvCounter int
211+
err error
212+
}{
213+
{
214+
ctx: context.Background(),
215+
err: nil,
216+
},
217+
{
218+
ctx: func() context.Context {
219+
ctx, cancel := context.WithCancel(context.Background())
220+
cancel()
221+
return ctx
222+
}(),
223+
err: context.Canceled,
224+
},
225+
{
226+
ctx: func() context.Context {
227+
ctx, cancel := context.WithTimeout(context.Background(), 0)
228+
cancel()
229+
return ctx
230+
}(),
231+
err: context.DeadlineExceeded,
232+
},
233+
{
234+
ctx: func() context.Context {
235+
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
236+
cancel()
237+
return ctx
238+
}(),
239+
err: context.Canceled,
240+
},
241+
} {
242+
t.Run("", func(t *testing.T) {
243+
result, err := NewStream(tt.ctx,
244+
func(ctx context.Context) (*Ydb.ResultSet, *Ydb_TableStats.QueryStats, error) {
245+
tt.recvCounter++
246+
if tt.recvCounter > 1000 {
247+
return nil, nil, io.EOF
248+
}
249+
return &Ydb.ResultSet{}, nil, ctx.Err()
250+
},
251+
func(err error) error {
252+
return err
253+
},
254+
)
255+
if tt.err != nil {
256+
require.ErrorIs(t, err, tt.err)
257+
require.Nil(t, result)
258+
} else {
259+
require.NoError(t, err)
260+
require.NotNil(t, result)
261+
require.EqualValues(t, 1, tt.recvCounter)
262+
require.EqualValues(t, 1, result.(*streamResult).nextResultSetCounter.Load())
263+
for i := range make([]struct{}, 1000) {
264+
err = result.NextResultSetErr(tt.ctx)
265+
require.NoError(t, err)
266+
require.Equal(t, i+1, tt.recvCounter)
267+
require.Equal(t, i+2, int(result.(*streamResult).nextResultSetCounter.Load()))
268+
}
269+
err = result.NextResultSetErr(tt.ctx)
270+
require.ErrorIs(t, err, io.EOF)
271+
require.True(t, err == io.EOF) //nolint:errorlint
272+
require.Equal(t, 1001, tt.recvCounter)
273+
require.Equal(t, 1002, int(result.(*streamResult).nextResultSetCounter.Load()))
274+
}
275+
})
276+
}
277+
}

internal/table/session.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -996,7 +996,7 @@ func (s *session) StreamReadTable(
996996
return nil, xerrors.WithStackTrace(err)
997997
}
998998

999-
return scanner.NewStream(
999+
return scanner.NewStream(ctx,
10001000
func(ctx context.Context) (
10011001
set *Ydb.ResultSet,
10021002
stats *Ydb_TableStats.QueryStats,
@@ -1024,7 +1024,7 @@ func (s *session) StreamReadTable(
10241024
return err
10251025
},
10261026
scanner.WithIgnoreTruncated(true), // stream read table always returns truncated flag on last result set
1027-
), nil
1027+
)
10281028
}
10291029

10301030
func (s *session) ReadRows(
@@ -1123,7 +1123,7 @@ func (s *session) StreamExecuteScanQuery(
11231123
return nil, xerrors.WithStackTrace(err)
11241124
}
11251125

1126-
return scanner.NewStream(
1126+
return scanner.NewStream(ctx,
11271127
func(ctx context.Context) (
11281128
set *Ydb.ResultSet,
11291129
stats *Ydb_TableStats.QueryStats,
@@ -1152,7 +1152,7 @@ func (s *session) StreamExecuteScanQuery(
11521152
},
11531153
scanner.WithIgnoreTruncated(s.config.IgnoreTruncated()),
11541154
scanner.WithMarkTruncatedAsRetryable(),
1155-
), nil
1155+
)
11561156
}
11571157

11581158
// BulkUpsert uploads given list of ydb struct values to the table.

tests/integration/table_scan_error_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,8 @@ func TestIssue847ScanError(t *testing.T) {
7171
if err != nil {
7272
return err
7373
}
74-
err = res.NextResultSetErr(ctx)
75-
if err != nil {
76-
return err
77-
}
7874
return res.Err()
7975
}, table.WithTxSettings(table.TxSettings(table.WithSnapshotReadOnly())))
8076
require.Error(t, err)
81-
t.Log(err)
8277
require.ErrorContains(t, err, "Unexpected token 'SELICT'")
8378
}

0 commit comments

Comments
 (0)