Skip to content

Commit adf986d

Browse files
authored
set session status when closeOnce() is called instead of Close() (#1824)
* Set session status when `closeOnce()` is called instead of `Close()` * Add test for closed session status
1 parent f47246a commit adf986d

File tree

5 files changed

+248
-6
lines changed

5 files changed

+248
-6
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed session closing in `ydb.WithExecuteDataQueryOverQueryClient(true)` scenario
2+
13
## v3.111.2
24
* Changed discovery and dns resolving log level to DEBUG
35

internal/query/session_core.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ func (core *sessionCore) attach(ctx context.Context) (finalErr error) {
198198
}
199199

200200
core.closeOnce = sync.OnceFunc(func() {
201+
core.SetStatus(StatusClosed)
201202
defer close(core.done)
202203
defer cancelAttach()
203204
})
@@ -273,7 +274,6 @@ func (core *sessionCore) Close(ctx context.Context) (err error) {
273274
return nil
274275
default:
275276
core.SetStatus(StatusClosing)
276-
defer core.SetStatus(StatusClosed)
277277

278278
if err = core.deleteSession(ctx); err != nil {
279279
return xerrors.WithStackTrace(err)

internal/query/session_core_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package query
22

33
import (
4+
"context"
5+
"errors"
6+
"runtime"
7+
"runtime/debug"
8+
"sync"
49
"sync/atomic"
510
"testing"
611
"time"
@@ -9,6 +14,7 @@ import (
914
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1015
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
1116
"go.uber.org/mock/gomock"
17+
"google.golang.org/grpc"
1218

1319
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
1420
)
@@ -58,5 +64,126 @@ func TestSessionCoreCancelAttachOnDone(t *testing.T) {
5864
core.closeOnce()
5965
require.GreaterOrEqual(t, recvMsgCounter.Load(), uint32(2))
6066
require.LessOrEqual(t, recvMsgCounter.Load(), uint32(3))
67+
require.Equal(t, core.Status(), StatusClosed.String())
68+
}, xtest.StopAfter(time.Second))
69+
}
70+
71+
func TestSessionCoreAttachError(t *testing.T) {
72+
xtest.TestManyTimes(t, func(t testing.TB) {
73+
ctx := xtest.Context(t)
74+
ctrl := gomock.NewController(t)
75+
client := NewMockQueryServiceClient(ctrl)
76+
client.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{
77+
Status: Ydb.StatusIds_SUCCESS,
78+
SessionId: "123",
79+
}, nil)
80+
var sessionDeletes atomic.Int32
81+
done := make(chan struct{})
82+
sessionDeletes.Store(0)
83+
client.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).DoAndReturn(
84+
func(
85+
context.Context,
86+
*Ydb_Query.DeleteSessionRequest,
87+
...grpc.CallOption,
88+
) (*Ydb_Query.DeleteSessionResponse, error) {
89+
sessionDeletes.Add(1)
90+
91+
return &Ydb_Query.DeleteSessionResponse{}, nil
92+
})
93+
attachStream := NewMockQueryService_AttachSessionClient(ctrl)
94+
attachStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.SessionState, error) {
95+
return nil, errSessionClosed
96+
}).AnyTimes()
97+
client.EXPECT().AttachSession(gomock.Any(), &Ydb_Query.AttachSessionRequest{
98+
SessionId: "123",
99+
}).Return(attachStream, nil)
100+
core, err := Open(ctx, client, func(core *sessionCore) {
101+
core.done = done
102+
})
103+
require.Error(t, err, errSessionClosed)
104+
require.Nil(t, core)
105+
}, xtest.StopAfter(time.Second))
106+
}
107+
108+
func TestSessionCoreClose(t *testing.T) {
109+
debug.SetTraceback("all")
110+
xtest.TestManyTimes(t, func(t testing.TB) {
111+
ctx := xtest.Context(t)
112+
ctrl := gomock.NewController(t)
113+
client := NewMockQueryServiceClient(ctrl)
114+
client.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{
115+
Status: Ydb.StatusIds_SUCCESS,
116+
SessionId: "123",
117+
}, nil)
118+
attachStream := NewMockQueryService_AttachSessionClient(ctrl)
119+
var (
120+
done chan struct{}
121+
startRecv = make(chan struct{}, 1)
122+
stopRecv = make(chan struct{}, 1)
123+
unblock atomic.Bool
124+
sessionDeletes atomic.Uint32
125+
)
126+
unblock.Store(false)
127+
sessionDeletes.Store(0)
128+
attachStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.SessionState, error) {
129+
startRecv <- struct{}{}
130+
select {
131+
case <-done:
132+
return nil, errSessionClosed
133+
case stopRecv <- struct{}{}:
134+
return &Ydb_Query.SessionState{
135+
Status: Ydb.StatusIds_SUCCESS,
136+
}, nil
137+
}
138+
}).AnyTimes()
139+
client.EXPECT().AttachSession(gomock.Any(), &Ydb_Query.AttachSessionRequest{
140+
SessionId: "123",
141+
}).Return(attachStream, nil)
142+
client.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).DoAndReturn(
143+
func(
144+
context.Context,
145+
*Ydb_Query.DeleteSessionRequest,
146+
...grpc.CallOption,
147+
) (*Ydb_Query.DeleteSessionResponse, error) {
148+
if sessionDeletes.CompareAndSwap(0, 1) {
149+
return &Ydb_Query.DeleteSessionResponse{
150+
Status: Ydb.StatusIds_SUCCESS,
151+
}, nil
152+
}
153+
sessionDeletes.Add(1)
154+
155+
return nil, errors.New("session not found")
156+
}).AnyTimes()
157+
core, err := Open(ctx, client, func(core *sessionCore) {
158+
done = core.done
159+
})
160+
require.NoError(t, err)
161+
require.NotNil(t, core)
162+
<-stopRecv
163+
164+
var wg sync.WaitGroup
165+
parallel := runtime.GOMAXPROCS(0)
166+
if parallel > 10 {
167+
parallel = 10
168+
}
169+
for i := 0; i < parallel; i++ {
170+
wg.Add(1)
171+
go func(i int) {
172+
defer wg.Done()
173+
for {
174+
if unblock.Load() {
175+
_ = core.Close(ctx)
176+
177+
break
178+
}
179+
}
180+
}(i)
181+
}
182+
unblock.Store(true)
183+
wg.Wait()
184+
_, ok := <-done
185+
require.False(t, ok)
186+
require.GreaterOrEqual(t, sessionDeletes.Load(), uint32(1))
187+
require.LessOrEqual(t, sessionDeletes.Load(), uint32(10))
61188
}, xtest.StopAfter(time.Second))
62189
}

tests/integration/helpers_test.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,23 @@ func (scope *scopeT) ConnectionString() string {
7979
return "grpc://localhost:2136/local"
8080
}
8181

82+
func (scope *scopeT) Endpoint() string {
83+
conn := "grpc://localhost:2136/local"
84+
if envString := os.Getenv("YDB_CONNECTION_STRING"); envString != "" {
85+
conn = envString
86+
}
87+
arr := strings.Split(conn, "/")
88+
return arr[len(arr)-2]
89+
}
90+
8291
func (scope *scopeT) AuthToken() string {
8392
return os.Getenv("YDB_ACCESS_TOKEN_CREDENTIALS")
8493
}
8594

95+
func (scope *scopeT) CertFile() string {
96+
return os.Getenv("YDB_SSL_ROOT_CERTIFICATES_FILE")
97+
}
98+
8699
func (scope *scopeT) Driver(opts ...ydb.Option) *ydb.Driver {
87100
return scope.driverNamed("default", opts...)
88101
}
@@ -117,18 +130,24 @@ func (scope *scopeT) driverNamed(name string, opts ...ydb.Option) *ydb.Driver {
117130
token := scope.AuthToken()
118131
if token == "" {
119132
scope.Logf("With empty auth token")
133+
opts = append(opts, ydb.WithAnonymousCredentials())
120134
} else {
121135
scope.Logf("With auth token")
136+
opts = append(opts, ydb.WithAccessTokenCredentials(token))
137+
}
138+
cert := scope.CertFile()
139+
if cert == "" {
140+
scope.Logf("Without tls")
141+
opts = append(opts, ydb.WithTLSSInsecureSkipVerify())
142+
} else {
143+
scope.Logf("With tls")
144+
opts = append(opts, ydb.WithCertificatesFromFile(cert))
122145
}
123146

124147
connectionContext, cancel := context.WithTimeout(scope.Ctx, time.Second*10)
125148
defer cancel()
126149

127-
driver, err := ydb.Open(connectionContext, connectionString,
128-
append(opts,
129-
ydb.WithAccessTokenCredentials(token),
130-
)...,
131-
)
150+
driver, err := ydb.Open(connectionContext, connectionString, opts...)
132151
clean := func() {
133152
if driver != nil {
134153
scope.Require.NoError(driver.Close(scope.Ctx))
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
//go:build integration
2+
// +build integration
3+
4+
package integration
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"testing"
10+
"time"
11+
12+
"github.com/stretchr/testify/require"
13+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
14+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
15+
"google.golang.org/grpc"
16+
"google.golang.org/grpc/credentials/insecure"
17+
18+
"github.com/ydb-platform/ydb-go-sdk/v3"
19+
"github.com/ydb-platform/ydb-go-sdk/v3/table"
20+
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
21+
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
22+
)
23+
24+
func TestSessionCoreContextCancel(t *testing.T) {
25+
scope := newScope(t)
26+
tableName := fmt.Sprintf("/local/%s", t.Name())
27+
28+
conn, err := grpc.NewClient(scope.Endpoint(), grpc.WithTransportCredentials(insecure.NewCredentials()))
29+
require.NoError(t, err)
30+
defer func(conn *grpc.ClientConn) {
31+
err := conn.Close()
32+
require.NoError(t, err)
33+
}(conn)
34+
35+
db := scope.Driver(ydb.WithSessionPoolSizeLimit(1), ydb.WithExecuteDataQueryOverQueryClient(true))
36+
queryClient := Ydb_Query_V1.NewQueryServiceClient(conn)
37+
38+
_ = db.Table().Do(scope.Ctx, func(ctx context.Context, session table.Session) error {
39+
return session.DropTable(ctx, tableName)
40+
})
41+
42+
err = db.Table().Do(scope.Ctx, func(ctx context.Context, session table.Session) error {
43+
return session.CreateTable(ctx, tableName,
44+
options.WithColumn("id", types.Optional(types.TypeUint64)),
45+
options.WithColumn("v", types.Optional(types.TypeUint64)),
46+
options.WithPrimaryKeyColumn("id"),
47+
)
48+
})
49+
require.NoError(t, err)
50+
51+
writeTx := table.SerializableReadWriteTxControl(
52+
table.CommitTx(),
53+
)
54+
55+
var sessionID string
56+
_ = db.Table().Do(scope.Ctx, func(ctx context.Context, session table.Session) error {
57+
query := fmt.Sprintf("UPSERT INTO %s (id) VALUES (123)", t.Name())
58+
_, res, err := session.Execute(ctx, writeTx, query, nil)
59+
require.NoError(t, err)
60+
err = res.Close()
61+
require.NoError(t, err)
62+
sessionID = session.ID()
63+
return nil
64+
})
65+
releaseSession := make(chan struct{})
66+
go func() {
67+
_ = db.Table().Do(scope.Ctx, func(ctx context.Context, session table.Session) error {
68+
require.Equal(t, session.ID(), sessionID)
69+
query := fmt.Sprintf("UPSERT INTO %s (id, v) VALUES (123, 123)", t.Name())
70+
_, _, _ = session.Execute(ctx, writeTx, query, nil)
71+
<-releaseSession
72+
return nil
73+
})
74+
}()
75+
76+
time.Sleep(100 * time.Millisecond)
77+
_, err = queryClient.DeleteSession(scope.Ctx, &Ydb_Query.DeleteSessionRequest{SessionId: sessionID})
78+
require.NoError(t, err)
79+
80+
// after that, listenAttachStream should fail. now we return session to the pool
81+
82+
close(releaseSession)
83+
84+
time.Sleep(100 * time.Millisecond)
85+
86+
// next query should not be context canceled because of closed session, but should get a new session
87+
88+
err = db.Table().Do(scope.Ctx, func(ctx context.Context, session table.Session) error {
89+
query := fmt.Sprintf("UPSERT INTO %s (id, v) VALUES (124, 124)", t.Name())
90+
_, _, err = session.Execute(ctx, writeTx, query, nil)
91+
return err
92+
})
93+
require.NoError(t, err)
94+
}

0 commit comments

Comments
 (0)