Skip to content

Commit af1dcf7

Browse files
committed
fix closing slo over query service
1 parent 79d3cfc commit af1dcf7

File tree

9 files changed

+95
-92
lines changed

9 files changed

+95
-92
lines changed

internal/query/client.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
1212
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
1313
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/query"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
@@ -177,9 +178,18 @@ func New(ctx context.Context, balancer balancer, cfg *config.Config) *Client {
177178
pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()),
178179
pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()),
179180
pool.WithCreateFunc(func(ctx context.Context) (_ *Session, err error) {
180-
s, err := createSession(ctx,
181-
client.grpcClient,
182-
withSessionTrace(cfg.Trace()),
181+
var (
182+
createCtx context.Context
183+
cancelCreate context.CancelFunc
184+
)
185+
if d := cfg.SessionCreateTimeout(); d > 0 {
186+
createCtx, cancelCreate = xcontext.WithTimeout(ctx, d)
187+
} else {
188+
createCtx, cancelCreate = xcontext.WithCancel(ctx)
189+
}
190+
defer cancelCreate()
191+
192+
s, err := createSession(createCtx, client.grpcClient, cfg,
183193
withSessionCheck(func(s *Session) bool {
184194
return balancer.HasNode(uint32(s.nodeID))
185195
}),

internal/query/client_test.go

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
grpcStatus "google.golang.org/grpc/status"
1616

1717
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1920
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
2021
"github.com/ydb-platform/ydb-go-sdk/v3/query"
@@ -39,9 +40,8 @@ func TestCreateSession(t *testing.T) {
3940
service.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.DeleteSessionResponse{
4041
Status: Ydb.StatusIds_SUCCESS,
4142
}, nil)
42-
t.Log("createSession")
4343
attached := 0
44-
s, err := createSession(ctx, service, withSessionTrace(
44+
s, err := createSession(ctx, service, config.New(config.WithTrace(
4545
&trace.Query{
4646
OnSessionAttach: func(info trace.QuerySessionAttachStartInfo) func(info trace.QuerySessionAttachDoneInfo) {
4747
return func(info trace.QuerySessionAttachDoneInfo) {
@@ -56,7 +56,7 @@ func TestCreateSession(t *testing.T) {
5656
return nil
5757
},
5858
},
59-
))
59+
)))
6060
require.NoError(t, err)
6161
require.EqualValues(t, "test", s.id)
6262
require.EqualValues(t, 1, attached)
@@ -72,8 +72,7 @@ func TestCreateSession(t *testing.T) {
7272
ctrl := gomock.NewController(t)
7373
service := NewMockQueryServiceClient(ctrl)
7474
service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, ""))
75-
t.Log("execute")
76-
_, err := createSession(ctx, service)
75+
_, err := createSession(ctx, service, config.New())
7776
require.Error(t, err)
7877
require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable))
7978
}, xtest.StopAfter(time.Second))
@@ -89,8 +88,7 @@ func TestCreateSession(t *testing.T) {
8988
}, nil)
9089
service.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, ""))
9190
service.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, ""))
92-
t.Log("execute")
93-
_, err := createSession(ctx, service)
91+
_, err := createSession(ctx, service, config.New())
9492
require.Error(t, err)
9593
require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable))
9694
}, xtest.StopAfter(time.Second))
@@ -110,8 +108,7 @@ func TestCreateSession(t *testing.T) {
110108
service.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.DeleteSessionResponse{
111109
Status: Ydb.StatusIds_SUCCESS,
112110
}, nil)
113-
t.Log("execute")
114-
_, err := createSession(ctx, service)
111+
_, err := createSession(ctx, service, config.New())
115112
require.Error(t, err)
116113
require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable))
117114
}, xtest.StopAfter(time.Second))
@@ -126,8 +123,7 @@ func TestCreateSession(t *testing.T) {
126123
service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil,
127124
xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAVAILABLE)),
128125
)
129-
t.Log("execute")
130-
_, err := createSession(ctx, service)
126+
_, err := createSession(ctx, service, config.New())
131127
require.Error(t, err)
132128
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
133129
}, xtest.StopAfter(time.Second))
@@ -137,9 +133,9 @@ func TestCreateSession(t *testing.T) {
137133
ctx := xtest.Context(t)
138134
ctrl := gomock.NewController(t)
139135
attachStream := NewMockQueryService_AttachSessionClient(ctrl)
140-
attachStream.EXPECT().Recv().Return(&Ydb_Query.SessionState{
141-
Status: Ydb.StatusIds_UNAVAILABLE,
142-
}, nil).AnyTimes()
136+
attachStream.EXPECT().Recv().Return(nil,
137+
xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAVAILABLE)),
138+
)
143139
service := NewMockQueryServiceClient(ctrl)
144140
service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{
145141
Status: Ydb.StatusIds_SUCCESS,
@@ -149,28 +145,29 @@ func TestCreateSession(t *testing.T) {
149145
service.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.DeleteSessionResponse{
150146
Status: Ydb.StatusIds_SUCCESS,
151147
}, nil)
152-
t.Log("execute")
153-
_, err := createSession(ctx, service)
148+
_, err := createSession(ctx, service, config.New())
154149
require.Error(t, err)
155150
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
156151
}, xtest.StopAfter(time.Second))
157152
})
158153
})
159154
}
160155

161-
func newTestSession() (*Session, error) {
156+
func newTestSession(id string) *Session {
162157
return &Session{
158+
id: id,
163159
statusCode: statusIdle,
164-
trace: &trace.Query{},
165-
}, nil
160+
cfg: config.New(),
161+
}
166162
}
167163

168-
func newTestSessionWithClient(client Ydb_Query_V1.QueryServiceClient) (*Session, error) {
164+
func newTestSessionWithClient(id string, client Ydb_Query_V1.QueryServiceClient) *Session {
169165
return &Session{
166+
id: id,
170167
grpcClient: client,
171168
statusCode: statusIdle,
172-
trace: &trace.Query{},
173-
}, nil
169+
cfg: config.New(),
170+
}
174171
}
175172

176173
func testPool(
@@ -187,7 +184,7 @@ func TestDo(t *testing.T) {
187184
ctx := xtest.Context(t)
188185
t.Run("HappyWay", func(t *testing.T) {
189186
attempts, err := do(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
190-
return newTestSession()
187+
return newTestSession("123"), nil
191188
}), func(ctx context.Context, s query.Session) error {
192189
return nil
193190
}, &trace.Query{})
@@ -197,7 +194,7 @@ func TestDo(t *testing.T) {
197194
t.Run("RetryableError", func(t *testing.T) {
198195
counter := 0
199196
attempts, err := do(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
200-
return newTestSession()
197+
return newTestSession("123"), nil
201198
}), func(ctx context.Context, s query.Session) error {
202199
counter++
203200
if counter < 10 {
@@ -224,7 +221,7 @@ func TestDoTx(t *testing.T) {
224221
Status: Ydb.StatusIds_SUCCESS,
225222
}, nil)
226223
attempts, err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
227-
return newTestSessionWithClient(client)
224+
return newTestSessionWithClient("123", client), nil
228225
}), func(ctx context.Context, tx query.TxActor) error {
229226
return nil
230227
}, &trace.Query{})
@@ -245,7 +242,7 @@ func TestDoTx(t *testing.T) {
245242
Status: Ydb.StatusIds_SUCCESS,
246243
}, nil).AnyTimes()
247244
attempts, err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
248-
return newTestSessionWithClient(client)
245+
return newTestSessionWithClient("123", client), nil
249246
}), func(ctx context.Context, tx query.TxActor) error {
250247
counter++
251248
if counter < 10 {

internal/query/execute_query.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func execute(ctx context.Context, s *Session, c Ydb_Query_V1.QueryServiceClient,
6767
return nil, nil, xerrors.WithStackTrace(err)
6868
}
6969

70-
r, txID, err := newResult(ctx, stream, s.trace, cancelExecute)
70+
r, txID, err := newResult(ctx, stream, s.cfg.Trace(), cancelExecute)
7171
if err != nil {
7272
cancelExecute()
7373

@@ -78,5 +78,5 @@ func execute(ctx context.Context, s *Session, c Ydb_Query_V1.QueryServiceClient,
7878
return nil, r, nil
7979
}
8080

81-
return newTransaction(txID, s, s.trace), r, nil
81+
return newTransaction(txID, s), r, nil
8282
}

internal/query/execute_query_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,7 @@ func TestExecute(t *testing.T) {
355355
stream.EXPECT().Recv().Return(nil, io.EOF)
356356
service := NewMockQueryServiceClient(ctrl)
357357
service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
358-
t.Log("execute")
359-
tx, r, err := execute(ctx, &Session{id: "123"}, service, "", options.ExecuteSettings())
358+
tx, r, err := execute(ctx, newTestSession("123"), service, "", options.ExecuteSettings())
360359
require.NoError(t, err)
361360
defer r.Close(ctx)
362361
require.EqualValues(t, "456", tx.id)
@@ -469,7 +468,7 @@ func TestExecute(t *testing.T) {
469468
service := NewMockQueryServiceClient(ctrl)
470469
service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, ""))
471470
t.Log("execute")
472-
_, _, err := execute(ctx, &Session{id: "123"}, service, "", options.ExecuteSettings())
471+
_, _, err := execute(ctx, newTestSession("123"), service, "", options.ExecuteSettings())
473472
require.Error(t, err)
474473
require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable))
475474
})
@@ -573,7 +572,7 @@ func TestExecute(t *testing.T) {
573572
service := NewMockQueryServiceClient(ctrl)
574573
service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
575574
t.Log("execute")
576-
tx, r, err := execute(ctx, &Session{id: "123"}, service, "", options.ExecuteSettings())
575+
tx, r, err := execute(ctx, newTestSession("123"), service, "", options.ExecuteSettings())
577576
require.NoError(t, err)
578577
defer r.Close(ctx)
579578
require.EqualValues(t, "456", tx.id)
@@ -637,7 +636,7 @@ func TestExecute(t *testing.T) {
637636
service := NewMockQueryServiceClient(ctrl)
638637
service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
639638
t.Log("execute")
640-
_, _, err := execute(ctx, &Session{id: "123"}, service, "", options.ExecuteSettings())
639+
_, _, err := execute(ctx, newTestSession("123"), service, "", options.ExecuteSettings())
641640
require.Error(t, err)
642641
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
643642
})
@@ -713,7 +712,7 @@ func TestExecute(t *testing.T) {
713712
service := NewMockQueryServiceClient(ctrl)
714713
service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
715714
t.Log("execute")
716-
tx, r, err := execute(ctx, &Session{id: "123"}, service, "", options.ExecuteSettings())
715+
tx, r, err := execute(ctx, newTestSession("123"), service, "", options.ExecuteSettings())
717716
require.NoError(t, err)
718717
defer r.Close(ctx)
719718
require.EqualValues(t, "456", tx.id)

0 commit comments

Comments
 (0)