Skip to content

Commit b3a728c

Browse files
authored
Merge pull request #1687 from ydb-platform/session-core-panic
* Fixed panic on closing `internal/query/sessionCore.done` channel twice
2 parents 8b9ba18 + 20644d6 commit b3a728c

File tree

3 files changed

+17
-24
lines changed

3 files changed

+17
-24
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed panic on closing `internal/query/sessionCore.done` channel twice
12
* Fixed hangup when try to send batch of messages with size more, then grpc limits from topic writer internals
23

34
## v3.101.2

internal/query/session_core.go

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"os"
66
"runtime/pprof"
77
"strconv"
8+
"sync"
89
"sync/atomic"
910
"time"
1011

@@ -20,7 +21,6 @@ import (
2021
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
2122
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
2223
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
23-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
2424
"github.com/ydb-platform/ydb-go-sdk/v3/query"
2525
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2626
)
@@ -48,7 +48,7 @@ type (
4848
nodeID uint32
4949
status atomic.Uint32
5050
onChangeStatus []func(status Status)
51-
closeOnce func(ctx context.Context) error
51+
closeOnce func()
5252
}
5353
)
5454

@@ -197,17 +197,9 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
197197
return xerrors.WithStackTrace(err)
198198
}
199199

200-
core.closeOnce = xsync.OnceFunc(func(ctx context.Context) error {
200+
core.closeOnce = sync.OnceFunc(func() {
201+
defer close(core.done)
201202
defer cancelAttach()
202-
203-
core.SetStatus(StatusClosing)
204-
defer core.SetStatus(StatusClosed)
205-
206-
if err = core.deleteSession(ctx); err != nil {
207-
return xerrors.WithStackTrace(err)
208-
}
209-
210-
return nil
211203
})
212204

213205
if markGoroutineWithLabelNodeIDForAttachStream {
@@ -224,17 +216,10 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
224216
}
225217

226218
func (core *sessionCore) listenAttachStream(attachStream Ydb_Query_V1.QueryService_AttachSessionClient) {
227-
defer func() {
228-
select {
229-
case <-core.done:
230-
return
231-
default:
232-
close(core.done)
233-
}
234-
}()
235-
236219
for core.IsAlive() {
237220
if _, recvErr := attachStream.Recv(); recvErr != nil {
221+
core.closeOnce()
222+
238223
return
239224
}
240225
}
@@ -281,14 +266,21 @@ func (core *sessionCore) IsAlive() bool {
281266
}
282267

283268
func (core *sessionCore) Close(ctx context.Context) (err error) {
269+
defer core.closeOnce()
270+
284271
select {
285272
case <-core.done:
286273
return nil
287274
default:
288-
close(core.done)
275+
core.SetStatus(StatusClosing)
276+
defer core.SetStatus(StatusClosed)
277+
278+
if err = core.deleteSession(ctx); err != nil {
279+
return xerrors.WithStackTrace(err)
280+
}
289281
}
290282

291-
return core.closeOnce(ctx)
283+
return nil
292284
}
293285

294286
func StatusFromErr(err error) Status {

internal/query/session_core_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func TestSessionCoreCancelAttachOnDone(t *testing.T) {
5555
<-stopRecv
5656
require.Equal(t, uint32(2), recvMsgCounter.Load())
5757
<-startRecv
58-
close(done)
58+
core.closeOnce()
5959
require.GreaterOrEqual(t, recvMsgCounter.Load(), uint32(2))
6060
require.LessOrEqual(t, recvMsgCounter.Load(), uint32(3))
6161
}, xtest.StopAfter(time.Second))

0 commit comments

Comments
 (0)