55 "os"
66 "runtime/pprof"
77 "strconv"
8+ "sync"
89 "sync/atomic"
910 "time"
1011
@@ -20,7 +21,6 @@ import (
2021 "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
2122 "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
2223 "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
23- "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
2424 "github.com/ydb-platform/ydb-go-sdk/v3/query"
2525 "github.com/ydb-platform/ydb-go-sdk/v3/trace"
2626)
4848 nodeID uint32
4949 status atomic.Uint32
5050 onChangeStatus []func (status Status )
51- closeOnce func (ctx context. Context ) error
51+ closeOnce func ()
5252 }
5353)
5454
@@ -197,17 +197,9 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
197197 return xerrors .WithStackTrace (err )
198198 }
199199
200- core .closeOnce = xsync .OnceFunc (func (ctx context.Context ) error {
200+ core .closeOnce = sync .OnceFunc (func () {
201+ defer close (core .done )
201202 defer cancelAttach ()
202-
203- core .SetStatus (StatusClosing )
204- defer core .SetStatus (StatusClosed )
205-
206- if err = core .deleteSession (ctx ); err != nil {
207- return xerrors .WithStackTrace (err )
208- }
209-
210- return nil
211203 })
212204
213205 if markGoroutineWithLabelNodeIDForAttachStream {
@@ -224,17 +216,10 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
224216}
225217
226218func (core * sessionCore ) listenAttachStream (attachStream Ydb_Query_V1.QueryService_AttachSessionClient ) {
227- defer func () {
228- select {
229- case <- core .done :
230- return
231- default :
232- close (core .done )
233- }
234- }()
235-
236219 for core .IsAlive () {
237220 if _ , recvErr := attachStream .Recv (); recvErr != nil {
221+ core .closeOnce ()
222+
238223 return
239224 }
240225 }
@@ -281,14 +266,21 @@ func (core *sessionCore) IsAlive() bool {
281266}
282267
283268func (core * sessionCore ) Close (ctx context.Context ) (err error ) {
269+ defer core .closeOnce ()
270+
284271 select {
285272 case <- core .done :
286273 return nil
287274 default :
288- close (core .done )
275+ core .SetStatus (StatusClosing )
276+ defer core .SetStatus (StatusClosed )
277+
278+ if err = core .deleteSession (ctx ); err != nil {
279+ return xerrors .WithStackTrace (err )
280+ }
289281 }
290282
291- return core . closeOnce ( ctx )
283+ return nil
292284}
293285
294286func StatusFromErr (err error ) Status {
0 commit comments