Skip to content

Commit 4cf6e0f

Browse files
authored
Merge pull request #848 from ydb-platform/YDBREQUESTS-2691
* Added receiving first result set on construct `internal/table/scanner.NewStream()`
2 parents b9d3521 + 78f2327 commit 4cf6e0f

File tree

8 files changed

+217
-29
lines changed

8 files changed

+217
-29
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Added receiving first result set on construct `internal/table/scanner.NewStream()`
12
* Added experimental package `metrics` with SDK metrics
23
* Fixed redundant trace call for finished `database/sql` transactions
34
* Added repeater event type to wake-up func context

internal/scripting/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func (c *Client) streamExecute(
222222
return nil, xerrors.WithStackTrace(err)
223223
}
224224

225-
return scanner.NewStream(
225+
return scanner.NewStream(ctx,
226226
func(ctx context.Context) (
227227
set *Ydb.ResultSet,
228228
stats *Ydb_TableStats.QueryStats,
@@ -249,7 +249,7 @@ func (c *Client) streamExecute(
249249
onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err))
250250
return err
251251
},
252-
), nil
252+
)
253253
}
254254

255255
func (c *Client) Close(ctx context.Context) (err error) {

internal/table/scanner/result.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ var errAlreadyClosed = xerrors.Wrap(errors.New("result closed early"))
2020
type baseResult struct {
2121
scanner
2222

23-
statsMtx xsync.RWMutex
24-
stats *Ydb_TableStats.QueryStats
23+
nextResultSetCounter xatomic.Uint64
24+
statsMtx xsync.RWMutex
25+
stats *Ydb_TableStats.QueryStats
2526

2627
closed xatomic.Bool
2728
}
@@ -103,10 +104,11 @@ func WithMarkTruncatedAsRetryable() option {
103104
}
104105

105106
func NewStream(
107+
ctx context.Context,
106108
recv func(ctx context.Context) (*Ydb.ResultSet, *Ydb_TableStats.QueryStats, error),
107109
onClose func(error) error,
108110
opts ...option,
109-
) StreamResult {
111+
) (StreamResult, error) {
110112
r := &streamResult{
111113
recv: recv,
112114
close: onClose,
@@ -116,7 +118,10 @@ func NewStream(
116118
o(&r.baseResult)
117119
}
118120
}
119-
return r
121+
if err := r.nextResultSetErr(ctx); err != nil {
122+
return nil, xerrors.WithStackTrace(err)
123+
}
124+
return r, nil
120125
}
121126

122127
func NewUnary(sets []*Ydb.ResultSet, stats *Ydb_TableStats.QueryStats, opts ...option) UnaryResult {
@@ -157,20 +162,18 @@ func (r *unaryResult) NextResultSet(ctx context.Context, columns ...string) bool
157162
return r.NextResultSetErr(ctx, columns...) == nil
158163
}
159164

160-
func (r *streamResult) NextResultSetErr(ctx context.Context, columns ...string) (err error) {
161-
if r.isClosed() {
162-
return xerrors.WithStackTrace(errAlreadyClosed)
163-
}
164-
if err = r.Err(); err != nil {
165-
return xerrors.WithStackTrace(err)
165+
func (r *streamResult) nextResultSetErr(ctx context.Context, columns ...string) (err error) {
166+
if r.nextResultSetCounter.Add(1) == 2 {
167+
r.setColumnIndexes(columns)
168+
return ctx.Err()
166169
}
167170
s, stats, err := r.recv(ctx)
168171
if err != nil {
169172
r.Reset(nil)
170173
if xerrors.Is(err, io.EOF) {
171174
return err
172175
}
173-
return r.errorf(0, "streamResult.NextResultSetErr(): %w", err)
176+
return r.errorf(1, "streamResult.NextResultSetErr(): %w", err)
174177
}
175178
r.Reset(s, columns...)
176179
if stats != nil {
@@ -181,6 +184,22 @@ func (r *streamResult) NextResultSetErr(ctx context.Context, columns ...string)
181184
return ctx.Err()
182185
}
183186

187+
func (r *streamResult) NextResultSetErr(ctx context.Context, columns ...string) (err error) {
188+
if r.isClosed() {
189+
return xerrors.WithStackTrace(errAlreadyClosed)
190+
}
191+
if err = r.Err(); err != nil {
192+
return xerrors.WithStackTrace(err)
193+
}
194+
if err := r.nextResultSetErr(ctx, columns...); err != nil {
195+
if xerrors.Is(err, io.EOF) {
196+
return io.EOF
197+
}
198+
return xerrors.WithStackTrace(err)
199+
}
200+
return nil
201+
}
202+
184203
func (r *streamResult) NextResultSet(ctx context.Context, columns ...string) bool {
185204
return r.NextResultSetErr(ctx, columns...) == nil
186205
}

internal/table/scanner/result_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@ package scanner
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"reflect"
78
"testing"
9+
"time"
810

11+
"github.com/stretchr/testify/require"
912
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
13+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats"
1014

1115
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
1216
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
@@ -199,3 +203,75 @@ func NewResultSet(a *allocator.Allocator, opts ...ResultSetOption) *Ydb.ResultSe
199203
}
200204
return (*Ydb.ResultSet)(&d)
201205
}
206+
207+
func TestNewStreamWithRecvFirstResultSet(t *testing.T) {
208+
for _, tt := range []struct {
209+
ctx context.Context
210+
recvCounter int
211+
err error
212+
}{
213+
{
214+
ctx: context.Background(),
215+
err: nil,
216+
},
217+
{
218+
ctx: func() context.Context {
219+
ctx, cancel := context.WithCancel(context.Background())
220+
cancel()
221+
return ctx
222+
}(),
223+
err: context.Canceled,
224+
},
225+
{
226+
ctx: func() context.Context {
227+
ctx, cancel := context.WithTimeout(context.Background(), 0)
228+
cancel()
229+
return ctx
230+
}(),
231+
err: context.DeadlineExceeded,
232+
},
233+
{
234+
ctx: func() context.Context {
235+
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
236+
cancel()
237+
return ctx
238+
}(),
239+
err: context.Canceled,
240+
},
241+
} {
242+
t.Run("", func(t *testing.T) {
243+
result, err := NewStream(tt.ctx,
244+
func(ctx context.Context) (*Ydb.ResultSet, *Ydb_TableStats.QueryStats, error) {
245+
tt.recvCounter++
246+
if tt.recvCounter > 1000 {
247+
return nil, nil, io.EOF
248+
}
249+
return &Ydb.ResultSet{}, nil, ctx.Err()
250+
},
251+
func(err error) error {
252+
return err
253+
},
254+
)
255+
if tt.err != nil {
256+
require.ErrorIs(t, err, tt.err)
257+
require.Nil(t, result)
258+
} else {
259+
require.NoError(t, err)
260+
require.NotNil(t, result)
261+
require.EqualValues(t, 1, tt.recvCounter)
262+
require.EqualValues(t, 1, result.(*streamResult).nextResultSetCounter.Load())
263+
for i := range make([]struct{}, 1000) {
264+
err = result.NextResultSetErr(tt.ctx)
265+
require.NoError(t, err)
266+
require.Equal(t, i+1, tt.recvCounter)
267+
require.Equal(t, i+2, int(result.(*streamResult).nextResultSetCounter.Load()))
268+
}
269+
err = result.NextResultSetErr(tt.ctx)
270+
require.ErrorIs(t, err, io.EOF)
271+
require.True(t, err == io.EOF) //nolint:errorlint
272+
require.Equal(t, 1001, tt.recvCounter)
273+
require.Equal(t, 1002, int(result.(*streamResult).nextResultSetCounter.Load()))
274+
}
275+
})
276+
}
277+
}

internal/table/session.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -996,7 +996,7 @@ func (s *session) StreamReadTable(
996996
return nil, xerrors.WithStackTrace(err)
997997
}
998998

999-
return scanner.NewStream(
999+
return scanner.NewStream(ctx,
10001000
func(ctx context.Context) (
10011001
set *Ydb.ResultSet,
10021002
stats *Ydb_TableStats.QueryStats,
@@ -1024,7 +1024,7 @@ func (s *session) StreamReadTable(
10241024
return err
10251025
},
10261026
scanner.WithIgnoreTruncated(true), // stream read table always returns truncated flag on last result set
1027-
), nil
1027+
)
10281028
}
10291029

10301030
func (s *session) ReadRows(
@@ -1123,7 +1123,7 @@ func (s *session) StreamExecuteScanQuery(
11231123
return nil, xerrors.WithStackTrace(err)
11241124
}
11251125

1126-
return scanner.NewStream(
1126+
return scanner.NewStream(ctx,
11271127
func(ctx context.Context) (
11281128
set *Ydb.ResultSet,
11291129
stats *Ydb_TableStats.QueryStats,
@@ -1152,7 +1152,7 @@ func (s *session) StreamExecuteScanQuery(
11521152
},
11531153
scanner.WithIgnoreTruncated(s.config.IgnoreTruncated()),
11541154
scanner.WithMarkTruncatedAsRetryable(),
1155-
), nil
1155+
)
11561156
}
11571157

11581158
// BulkUpsert uploads given list of ydb struct values to the table.

internal/xerrors/join.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
)
88

99
func Join(errs ...error) joinError {
10-
return joinError(errs)
10+
return errs
1111
}
1212

1313
type joinError []error

tests/integration/table_long_stream_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@ import (
1111
"testing"
1212
"time"
1313

14+
"github.com/stretchr/testify/require"
15+
1416
"github.com/ydb-platform/ydb-go-sdk/v3"
1517
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
1618
"github.com/ydb-platform/ydb-go-sdk/v3/log"
1719
"github.com/ydb-platform/ydb-go-sdk/v3/table"
1820
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
21+
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/indexed"
1922
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
2023
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2124
)
@@ -29,6 +32,7 @@ func TestLongStream(t *testing.T) {
2932
err error
3033
upsertRowsCount = 100000
3134
batchSize = 10000
35+
expectedCheckSum = uint64(4999950000)
3236
ctx = xtest.Context(t)
3337
)
3438

@@ -146,6 +150,60 @@ func TestLongStream(t *testing.T) {
146150
if upserted != uint32(upsertRowsCount) {
147151
t.Fatalf("wrong rows count: %v, expected: %d", upserted, upsertRowsCount)
148152
}
153+
err := db.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
154+
_, res, err := s.Execute(ctx, table.DefaultTxControl(),
155+
"SELECT CAST(COUNT(*) AS Uint64) FROM `"+path.Join(db.Name(), folder, tableName)+"`;",
156+
nil,
157+
)
158+
if err != nil {
159+
return err
160+
}
161+
if !res.NextResultSet(ctx) {
162+
return fmt.Errorf("no result sets")
163+
}
164+
if !res.NextRow() {
165+
return fmt.Errorf("no rows")
166+
}
167+
var rowsFromDb uint64
168+
if err := res.ScanWithDefaults(indexed.Required(&rowsFromDb)); err != nil {
169+
return err
170+
}
171+
if rowsFromDb != uint64(upsertRowsCount) {
172+
return fmt.Errorf("wrong rows count: %d, expected: %d",
173+
rowsFromDb,
174+
upsertRowsCount,
175+
)
176+
}
177+
return res.Err()
178+
}, table.WithIdempotent())
179+
require.NoError(t, err)
180+
err = db.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
181+
_, res, err := s.Execute(ctx, table.DefaultTxControl(),
182+
"SELECT CAST(SUM(val) AS Uint64) FROM `"+path.Join(db.Name(), folder, tableName)+"`;",
183+
nil,
184+
)
185+
if err != nil {
186+
return err
187+
}
188+
if !res.NextResultSet(ctx) {
189+
return fmt.Errorf("no result sets")
190+
}
191+
if !res.NextRow() {
192+
return fmt.Errorf("no rows")
193+
}
194+
var checkSumFromDb uint64
195+
if err := res.ScanWithDefaults(indexed.Required(&checkSumFromDb)); err != nil {
196+
return err
197+
}
198+
if checkSumFromDb != expectedCheckSum {
199+
return fmt.Errorf("wrong checksum: %d, expected: %d",
200+
checkSumFromDb,
201+
expectedCheckSum,
202+
)
203+
}
204+
return res.Err()
205+
}, table.WithIdempotent())
206+
require.NoError(t, err)
149207
})
150208
})
151209
})
@@ -178,6 +236,7 @@ func TestLongStream(t *testing.T) {
178236
var (
179237
start = time.Now()
180238
rowsCount = 0
239+
checkSum = uint64(0)
181240
)
182241
res, err := s.StreamExecuteScanQuery(ctx,
183242
"SELECT val FROM `"+path.Join(db.Name(), folder, tableName)+"`;", nil,
@@ -192,6 +251,11 @@ func TestLongStream(t *testing.T) {
192251
count := 0
193252
for res.NextRow() {
194253
count++
254+
var val int64
255+
if err = res.ScanWithDefaults(indexed.Required(&val)); err != nil {
256+
return err
257+
}
258+
checkSum += uint64(val)
195259
}
196260
rowsCount += count
197261
time.Sleep(discoveryInterval)
@@ -206,6 +270,12 @@ func TestLongStream(t *testing.T) {
206270
time.Since(start),
207271
)
208272
}
273+
if checkSum != expectedCheckSum {
274+
return fmt.Errorf("wrong checksum: %d, expected: %d",
275+
checkSum,
276+
expectedCheckSum,
277+
)
278+
}
209279
return res.Err()
210280
},
211281
table.WithIdempotent(),
@@ -224,6 +294,7 @@ func TestLongStream(t *testing.T) {
224294
var (
225295
start = time.Now()
226296
rowsCount = 0
297+
checkSum = uint64(0)
227298
)
228299
res, err := s.StreamReadTable(ctx, path.Join(db.Name(), folder, tableName), options.ReadColumn("val"))
229300
if err != nil {
@@ -236,6 +307,11 @@ func TestLongStream(t *testing.T) {
236307
count := 0
237308
for res.NextRow() {
238309
count++
310+
var val int64
311+
if err = res.ScanWithDefaults(indexed.Required(&val)); err != nil {
312+
return err
313+
}
314+
checkSum += uint64(val)
239315
}
240316
rowsCount += count
241317
time.Sleep(discoveryInterval)
@@ -250,6 +326,12 @@ func TestLongStream(t *testing.T) {
250326
time.Since(start),
251327
)
252328
}
329+
if checkSum != expectedCheckSum {
330+
return fmt.Errorf("wrong checksum: %d, expected: %d",
331+
checkSum,
332+
expectedCheckSum,
333+
)
334+
}
253335
return res.Err()
254336
},
255337
table.WithIdempotent(),

0 commit comments

Comments
 (0)