Skip to content

Commit 553583a

Browse files
authored
Merge pull request #1135 from ydb-platform/remove-sessions-from-discovery
added node check when session check alive
2 parents 25b9a06 + 5dc1baf commit 553583a

File tree

6 files changed

+54
-82
lines changed

6 files changed

+54
-82
lines changed

internal/query/client.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,13 @@ import (
1919

2020
//go:generate mockgen -destination grpc_client_mock_test.go -package query -write_package_comment=false github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1 QueryServiceClient,QueryService_AttachSessionClient,QueryService_ExecuteQueryClient
2121

22+
type nodeChecker interface {
23+
HasNode(id uint32) bool
24+
}
25+
2226
type balancer interface {
2327
grpc.ClientConnInterface
28+
nodeChecker
2429
}
2530

2631
var _ query.Client = (*Client)(nil)
@@ -157,7 +162,7 @@ func New(ctx context.Context, balancer balancer, cfg *config.Config) (_ *Client,
157162
pool.WithMaxSize[*Session, Session](cfg.PoolMaxSize()),
158163
pool.WithProducersCount[*Session, Session](cfg.PoolProducersCount()),
159164
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
160-
pool.WithCreateFunc(func(ctx context.Context) (s *Session, err error) {
165+
pool.WithCreateFunc(func(ctx context.Context) (_ *Session, err error) {
161166
var cancel context.CancelFunc
162167
if d := cfg.SessionCreateTimeout(); d > 0 {
163168
ctx, cancel = xcontext.WithTimeout(ctx, d)
@@ -166,7 +171,13 @@ func New(ctx context.Context, balancer balancer, cfg *config.Config) (_ *Client,
166171
}
167172
defer cancel()
168173

169-
s, err = createSession(ctx, client.grpcClient, withSessionTrace(t))
174+
s, err := createSession(ctx,
175+
client.grpcClient,
176+
withSessionTrace(t),
177+
withSessionCheck(func(s *Session) bool {
178+
return balancer.HasNode(uint32(s.nodeID))
179+
}),
180+
)
170181
if err != nil {
171182
return nil, xerrors.WithStackTrace(err)
172183
}

internal/query/client_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ func TestCreateSession(t *testing.T) {
4444
attached := 0
4545
s, err := createSession(ctx, service, withSessionTrace(
4646
&traceSession{
47+
onCreate: func(ctx context.Context, functionID stack.Caller) func(*Session, error) {
48+
return func(session *Session, err error) {
49+
}
50+
},
4751
onAttach: func(ctx context.Context, functionID stack.Caller, s *Session) func(err error) {
4852
return func(err error) {
4953
if err == nil {

internal/query/result_set.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ func (rs *resultSet) next(ctx context.Context) (*row, error) {
5858
}
5959
rs.rowIndex = 0
6060
rs.currentPart = part
61+
if part == nil {
62+
close(rs.done)
63+
64+
return nil, xerrors.WithStackTrace(io.EOF)
65+
}
6166
}
6267
if rs.index != rs.currentPart.GetResultSetIndex() {
6368
close(rs.done)

internal/query/session.go

Lines changed: 28 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type (
2727
statusCode statusCode
2828
trace *traceSession
2929
closeOnce func(ctx context.Context) error
30+
checks []func(s *Session) bool
3031
}
3132
traceSession struct {
3233
onCreate func(ctx context.Context, functionID stack.Caller) func(*Session, error)
@@ -36,74 +37,15 @@ type (
3637
sessionOption func(session *Session)
3738
)
3839

39-
func withSessionTrace(t *traceSession) sessionOption {
40+
func withSessionCheck(f func(*Session) bool) sessionOption {
4041
return func(s *Session) {
41-
if t.onCreate != nil {
42-
h1 := s.trace.onCreate
43-
h2 := t.onCreate
44-
s.trace.onCreate = func(ctx context.Context, functionID stack.Caller) func(*Session, error) {
45-
var r, r1 func(*Session, error)
46-
if h1 != nil {
47-
r = h1(ctx, functionID)
48-
}
49-
if h2 != nil {
50-
r1 = h2(ctx, functionID)
51-
}
52-
53-
return func(session *Session, err error) {
54-
if r != nil {
55-
r(session, err)
56-
}
57-
if r1 != nil {
58-
r1(session, err)
59-
}
60-
}
61-
}
62-
}
63-
if t.onAttach != nil {
64-
h1 := s.trace.onAttach
65-
h2 := t.onAttach
66-
s.trace.onAttach = func(ctx context.Context, functionID stack.Caller, session *Session) func(error) {
67-
var r, r1 func(error)
68-
if h1 != nil {
69-
r = h1(ctx, functionID, session)
70-
}
71-
if h2 != nil {
72-
r1 = h2(ctx, functionID, session)
73-
}
74-
75-
return func(err error) {
76-
if r != nil {
77-
r(err)
78-
}
79-
if r1 != nil {
80-
r1(err)
81-
}
82-
}
83-
}
84-
}
85-
if t.onClose != nil {
86-
h1 := s.trace.onClose
87-
h2 := t.onClose
88-
s.trace.onClose = func(ctx context.Context, functionID stack.Caller, session *Session) func(error) {
89-
var r, r1 func(error)
90-
if h1 != nil {
91-
r = h1(ctx, functionID, session)
92-
}
93-
if h2 != nil {
94-
r1 = h2(ctx, functionID, session)
95-
}
42+
s.checks = append(s.checks, f)
43+
}
44+
}
9645

97-
return func(err error) {
98-
if r != nil {
99-
r(err)
100-
}
101-
if r1 != nil {
102-
r1(err)
103-
}
104-
}
105-
}
106-
}
46+
func withSessionTrace(t *traceSession) sessionOption {
47+
return func(s *Session) {
48+
s.trace = t
10749
}
10850
}
10951

@@ -114,6 +56,16 @@ func createSession(
11456
grpcClient: client,
11557
statusCode: statusUnknown,
11658
trace: defaultSessionTrace,
59+
checks: []func(*Session) bool{
60+
func(s *Session) bool {
61+
switch s.status() {
62+
case statusIdle, statusInUse:
63+
return true
64+
default:
65+
return false
66+
}
67+
},
68+
},
11769
}
11870
defer func() {
11971
if finalErr != nil {
@@ -214,12 +166,10 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
214166
s.setStatus(statusClosing)
215167
defer s.setStatus(statusClosed)
216168

217-
if s.trace.onClose != nil {
218-
onClose := s.trace.onClose(ctx, stack.FunctionID(""), s)
219-
defer func() {
220-
onClose(err)
221-
}()
222-
}
169+
onClose := s.trace.onClose(ctx, stack.FunctionID(""), s)
170+
defer func() {
171+
onClose(err)
172+
}()
223173

224174
if err = deleteSession(ctx, s.grpcClient, s.id); err != nil {
225175
return xerrors.WithStackTrace(err)
@@ -248,12 +198,13 @@ func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
248198
}
249199

250200
func (s *Session) IsAlive() bool {
251-
switch s.status() {
252-
case statusIdle, statusInUse:
253-
return true
254-
default:
255-
return false
201+
for _, check := range s.checks {
202+
if !check(s) {
203+
return false
204+
}
256205
}
206+
207+
return true
257208
}
258209

259210
func (s *Session) Close(ctx context.Context) error {

tests/integration/query_execute_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/ydb-platform/ydb-go-sdk/v3"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/version"
17+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
1718
"github.com/ydb-platform/ydb-go-sdk/v3/log"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/query"
1920
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -24,7 +25,7 @@ func TestQueryExecute(t *testing.T) {
2425
t.Skip("query service not allowed in YDB version '" + os.Getenv("YDB_VERSION") + "'")
2526
}
2627

27-
ctx, cancel := context.WithCancel(context.Background())
28+
ctx, cancel := context.WithCancel(xtest.Context(t))
2829
defer cancel()
2930

3031
db, err := ydb.Open(ctx,
@@ -35,7 +36,7 @@ func TestQueryExecute(t *testing.T) {
3536
log.Default(os.Stdout,
3637
log.WithLogQuery(),
3738
log.WithColoring(),
38-
log.WithMinLevel(log.TRACE),
39+
log.WithMinLevel(log.INFO),
3940
),
4041
trace.QueryEvents,
4142
),

tests/integration/query_tx_execute_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestQueryTxExecute(t *testing.T) {
3333
log.Default(os.Stdout,
3434
log.WithLogQuery(),
3535
log.WithColoring(),
36-
log.WithMinLevel(log.TRACE),
36+
log.WithMinLevel(log.INFO),
3737
),
3838
trace.QueryEvents,
3939
),

0 commit comments

Comments
 (0)