Skip to content

Commit 8b9ba18

Browse files
authored
Merge pull request #1686 from ydb-platform/close-stream-result
Close stream result
2 parents deaca20 + 4d3efa8 commit 8b9ba18

File tree

3 files changed

+22
-6
lines changed

3 files changed

+22
-6
lines changed

internal/query/result.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,9 @@ func newResult(
143143
}
144144

145145
r.closeOnce = sync.OnceFunc(func() {
146-
for _, onClose := range r.onClose {
147-
onClose()
146+
for i := range r.onClose { // descending calls for LIFO
147+
r.onClose[len(r.onClose)-i-1]()
148148
}
149-
r.stream = nil
150149
})
151150

152151
if r.trace != nil {
@@ -239,6 +238,8 @@ func (r *streamResult) Close(ctx context.Context) (finalErr error) {
239238

240239
for {
241240
select {
241+
case <-ctx.Done():
242+
return xerrors.WithStackTrace(ctx.Err())
242243
case <-r.closed:
243244
return nil
244245
default:
@@ -280,6 +281,7 @@ func (r *streamResult) nextResultSet(ctx context.Context) (_ *resultSet, err err
280281
}
281282
if part.GetResultSetIndex() < r.resultSetIndex {
282283
r.closeOnce()
284+
283285
if part.GetResultSetIndex() <= 0 && r.resultSetIndex > 0 {
284286
return nil, xerrors.WithStackTrace(io.EOF)
285287
}

internal/query/transaction_test.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,10 @@ func TestTxOnCompleted(t *testing.T) {
274274

275275
res, err := tx.Query(sf.Context(e), "")
276276
require.NoError(t, err)
277+
_, err = res.NextResultSet(sf.Context(e))
278+
require.NoError(t, err)
279+
_, err = res.NextResultSet(sf.Context(e))
280+
require.ErrorIs(t, err, io.EOF)
277281
_ = res.Close(sf.Context(e))
278282
time.Sleep(time.Millisecond) // time for reaction for closing channel
279283
require.Empty(t, completed)
@@ -332,7 +336,12 @@ func TestTxOnCompleted(t *testing.T) {
332336
})
333337

334338
res, err := tx.Query(sf.Context(e), "", options.WithCommit())
335-
_ = res.Close(sf.Context(e))
339+
require.NoError(t, err)
340+
_, err = res.NextResultSet(sf.Context(e))
341+
require.NoError(t, err)
342+
_, err = res.NextResultSet(sf.Context(e))
343+
require.ErrorIs(t, err, io.EOF)
344+
err = res.Close(sf.Context(e))
336345
require.NoError(t, err)
337346
xtest.SpinWaitCondition(t, &completedMutex, func() bool {
338347
return len(completed) != 0
@@ -384,7 +393,10 @@ func TestTxOnCompleted(t *testing.T) {
384393
_, err = res.NextResultSet(sf.Context(e))
385394
require.NoError(t, err)
386395

387-
_ = res.Close(sf.Context(e))
396+
_, err = res.NextResultSet(sf.Context(e))
397+
require.ErrorIs(t, err, io.EOF)
398+
399+
err = res.Close(sf.Context(e))
388400
require.NoError(t, err)
389401
xtest.SpinWaitCondition(t, &completedMutex, func() bool {
390402
return len(completed) != 0

internal/xtest/leak.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ func findGoroutinesLeak() error {
6262
unexpectedGoroutines = append(unexpectedGoroutines, g)
6363
}
6464
if l := len(unexpectedGoroutines); l > 0 {
65-
return fmt.Errorf("found %d unexpected goroutines:\n%s", len(goroutines), strings.Join(goroutines, "\n"))
65+
return fmt.Errorf("found %d unexpected goroutines:\n\n%s",
66+
len(goroutines), strings.Join(goroutines, "\n\n"),
67+
)
6668
}
6769

6870
return nil

0 commit comments

Comments
 (0)