Skip to content

Commit 890f81e

Browse files
authored
Merge branch 'master' into tx-actor
2 parents f78c13f + 53cbd2f commit 890f81e

File tree

5 files changed

+18
-28
lines changed

5 files changed

+18
-28
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
* Added `query.TransactionActor` type alias to `query.TxActor` for compatibility with `table.Client` API's
2+
* Removed comment `experimental` from `ydb.ParamsBuilder` and `ydb.ParamsFromMap`
3+
* Fixed panic on closing `internal/query/sessionCore.done` channel twice
24
* Fixed hangup when try to send batch of messages with size more, then grpc limits from topic writer internals
35

46
## v3.101.2

internal/query/session_core.go

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"os"
66
"runtime/pprof"
77
"strconv"
8+
"sync"
89
"sync/atomic"
910
"time"
1011

@@ -20,7 +21,6 @@ import (
2021
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
2122
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
2223
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
23-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
2424
"github.com/ydb-platform/ydb-go-sdk/v3/query"
2525
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2626
)
@@ -48,7 +48,7 @@ type (
4848
nodeID uint32
4949
status atomic.Uint32
5050
onChangeStatus []func(status Status)
51-
closeOnce func(ctx context.Context) error
51+
closeOnce func()
5252
}
5353
)
5454

@@ -197,17 +197,9 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
197197
return xerrors.WithStackTrace(err)
198198
}
199199

200-
core.closeOnce = xsync.OnceFunc(func(ctx context.Context) error {
200+
core.closeOnce = sync.OnceFunc(func() {
201+
defer close(core.done)
201202
defer cancelAttach()
202-
203-
core.SetStatus(StatusClosing)
204-
defer core.SetStatus(StatusClosed)
205-
206-
if err = core.deleteSession(ctx); err != nil {
207-
return xerrors.WithStackTrace(err)
208-
}
209-
210-
return nil
211203
})
212204

213205
if markGoroutineWithLabelNodeIDForAttachStream {
@@ -224,17 +216,10 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
224216
}
225217

226218
func (core *sessionCore) listenAttachStream(attachStream Ydb_Query_V1.QueryService_AttachSessionClient) {
227-
defer func() {
228-
select {
229-
case <-core.done:
230-
return
231-
default:
232-
close(core.done)
233-
}
234-
}()
235-
236219
for core.IsAlive() {
237220
if _, recvErr := attachStream.Recv(); recvErr != nil {
221+
core.closeOnce()
222+
238223
return
239224
}
240225
}
@@ -281,14 +266,21 @@ func (core *sessionCore) IsAlive() bool {
281266
}
282267

283268
func (core *sessionCore) Close(ctx context.Context) (err error) {
269+
defer core.closeOnce()
270+
284271
select {
285272
case <-core.done:
286273
return nil
287274
default:
288-
close(core.done)
275+
core.SetStatus(StatusClosing)
276+
defer core.SetStatus(StatusClosed)
277+
278+
if err = core.deleteSession(ctx); err != nil {
279+
return xerrors.WithStackTrace(err)
280+
}
289281
}
290282

291-
return core.closeOnce(ctx)
283+
return nil
292284
}
293285

294286
func StatusFromErr(err error) Status {

internal/query/session_core_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func TestSessionCoreCancelAttachOnDone(t *testing.T) {
5555
<-stopRecv
5656
require.Equal(t, uint32(2), recvMsgCounter.Load())
5757
<-startRecv
58-
close(done)
58+
core.closeOnce()
5959
require.GreaterOrEqual(t, recvMsgCounter.Load(), uint32(2))
6060
require.LessOrEqual(t, recvMsgCounter.Load(), uint32(3))
6161
}, xtest.StopAfter(time.Second))

params_builder.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package ydb
33
import "github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
44

55
// ParamsBuilder used for create query arguments instead of tons options.
6-
//
7-
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
86
func ParamsBuilder() params.Builder {
97
return params.Builder{}
108
}

params_map.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ func (p wrongParameters) ToYDB(a *allocator.Allocator) (map[string]*Ydb.TypedVal
2020
}
2121

2222
// ParamsFromMap build parameters from named map
23-
//
24-
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
2523
func ParamsFromMap(m map[string]any) params.Parameters {
2624
namedParameters := make([]any, 0, len(m))
2725
for name, val := range m {

0 commit comments

Comments
 (0)