Skip to content

Commit 6c45f77

Browse files
committed
added newSessionStream
1 parent 3a7620d commit 6c45f77

File tree

2 files changed

+151
-17
lines changed

2 files changed

+151
-17
lines changed

internal/coordination/session.go

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package coordination
33
import (
44
"context"
55
"encoding/binary"
6+
"fmt"
67
"math"
78
"math/rand"
89
"sync"
@@ -110,28 +111,55 @@ func (s *session) updateCancelStream(cancel context.CancelFunc) {
110111
s.cancelStream = cancel
111112
}
112113

113-
func newSessionClient(
114-
ctx context.Context, client Ydb_Coordination_V1.CoordinationServiceClient, t *trace.Coordination,
114+
type sessionStream struct {
115+
sessionID uint64
116+
stream Ydb_Coordination_V1.CoordinationService_SessionClient
117+
cancelStream context.CancelFunc
118+
}
119+
120+
func newSessionStream(
121+
ctx context.Context,
122+
client Ydb_Coordination_V1.CoordinationServiceClient,
123+
t *trace.Coordination,
115124
) (
116-
_ Ydb_Coordination_V1.CoordinationService_SessionClient, cancel context.CancelFunc, finalErr error,
125+
_ *sessionStream, finalErr error,
117126
) {
118127
streamCtx, streamCancel := xcontext.WithCancel(xcontext.ValueOnly(ctx))
128+
defer func() {
129+
if finalErr != nil {
130+
streamCancel()
131+
}
132+
}()
119133

120134
onDone := trace.CoordinationOnNewSessionClient(t, &ctx,
121-
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/coordination.newSessionClient"),
135+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/coordination.newSessionStream"),
122136
)
123137
defer func() {
124138
onDone(finalErr)
125139
}()
126140

127-
sessionClient, err := client.Session(streamCtx)
141+
stream, err := client.Session(streamCtx)
128142
if err != nil {
129-
streamCancel()
143+
return nil, xerrors.WithStackTrace(err)
144+
}
130145

131-
return nil, nil, xerrors.WithStackTrace(err)
146+
msg, err := stream.Recv()
147+
if err != nil {
148+
return nil, xerrors.WithStackTrace(err)
132149
}
133150

134-
return sessionClient, streamCancel, nil
151+
switch t := msg.GetResponse().(type) {
152+
case *Ydb_Coordination.SessionResponse_SessionStarted_:
153+
return &sessionStream{
154+
sessionID: t.SessionStarted.GetSessionId(),
155+
stream: stream,
156+
cancelStream: streamCancel,
157+
}, nil
158+
case *Ydb_Coordination.SessionResponse_Failure_:
159+
return nil, xerrors.WithStackTrace(xerrors.FromOperation(t.Failure))
160+
default:
161+
return nil, xerrors.WithStackTrace(fmt.Errorf("unexpected first message: %+v", msg))
162+
}
135163
}
136164

137165
// Create a new gRPC stream using an independent context.
@@ -155,14 +183,19 @@ func (s *session) newStream(
155183
for {
156184
result := make(chan Ydb_Coordination_V1.CoordinationService_SessionClient, 1)
157185
go func() {
158-
client, cancel, err := newSessionClient(streamCtx, s.client.client, s.client.config.Trace())
159-
if err == nil {
160-
select {
161-
case result <- client:
162-
default:
163-
cancel()
164-
}
165-
}
186+
var (
187+
sessionClient Ydb_Coordination_V1.CoordinationService_SessionClient
188+
err error
189+
)
190+
onDone := trace.CoordinationOnNewSessionClient(s.client.config.Trace(), &streamCtx,
191+
stack.FunctionID(""),
192+
)
193+
defer func() {
194+
onDone(err)
195+
}()
196+
197+
sessionClient, err = s.client.client.Session(streamCtx)
198+
result <- sessionClient
166199
}()
167200

168201
var client Ydb_Coordination_V1.CoordinationService_SessionClient
@@ -246,7 +279,7 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
246279
// We intentionally place a stream context outside the scope of any existing contexts to make an attempt to
247280
// close the session gracefully at the end of the main loop.
248281

249-
streamCtx, cancelStream := context.WithCancel(ctx)
282+
streamCtx, cancelStream := xcontext.WithCancel(ctx)
250283
sessionClient, err := s.newStream(streamCtx, cancelStream)
251284
if err != nil {
252285
// Giving up, we can do nothing without a stream.
@@ -416,6 +449,8 @@ func (s *session) receiveLoop(
416449
}
417450
onDone(message, nil)
418451

452+
fmt.Printf("---RECV: %+v (%T)\n", message.GetResponse(), message.GetResponse())
453+
419454
switch message.GetResponse().(type) {
420455
case *Ydb_Coordination.SessionResponse_Failure_:
421456
if message.GetFailure().GetStatus() == Ydb.StatusIds_SESSION_EXPIRED ||
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package coordination
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
8+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Coordination"
9+
"go.uber.org/mock/gomock"
10+
grpcCodes "google.golang.org/grpc/codes"
11+
grpcStatus "google.golang.org/grpc/status"
12+
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
16+
)
17+
18+
func TestNewSessionStream(t *testing.T) {
19+
t.Run("HappyWay", func(t *testing.T) {
20+
ctx := xtest.Context(t)
21+
ctrl := gomock.NewController(t)
22+
client := NewMockCoordinationServiceClient(ctrl)
23+
sessionStream := NewMockCoordinationService_SessionClient(ctrl)
24+
sessionStream.EXPECT().Recv().Return(&Ydb_Coordination.SessionResponse{
25+
Response: &Ydb_Coordination.SessionResponse_SessionStarted_{
26+
SessionStarted: &Ydb_Coordination.SessionResponse_SessionStarted{
27+
SessionId: 123456789,
28+
TimeoutMillis: 987654321,
29+
},
30+
},
31+
}, nil)
32+
client.EXPECT().Session(gomock.Any()).Return(sessionStream, nil)
33+
s, err := newSessionStream(ctx, client, &trace.Coordination{})
34+
require.NoError(t, err)
35+
require.NotNil(t, s)
36+
require.EqualValues(t, 123456789, s.sessionID)
37+
})
38+
t.Run("TransportError", func(t *testing.T) {
39+
t.Run("On", func(t *testing.T) {
40+
t.Run("NewStream", func(t *testing.T) {
41+
ctx := xtest.Context(t)
42+
ctrl := gomock.NewController(t)
43+
client := NewMockCoordinationServiceClient(ctrl)
44+
sessionStream := NewMockCoordinationService_SessionClient(ctrl)
45+
sessionStream.EXPECT().Recv().Return(nil,
46+
xerrors.Transport(grpcStatus.Error(grpcCodes.Unavailable, "")),
47+
)
48+
client.EXPECT().Session(gomock.Any()).Return(sessionStream, nil)
49+
s, err := newSessionStream(ctx, client, &trace.Coordination{})
50+
require.Error(t, err)
51+
require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable))
52+
require.Nil(t, s)
53+
})
54+
t.Run("Recv", func(t *testing.T) {
55+
ctx := xtest.Context(t)
56+
ctrl := gomock.NewController(t)
57+
client := NewMockCoordinationServiceClient(ctrl)
58+
client.EXPECT().Session(gomock.Any()).Return(nil,
59+
xerrors.Transport(grpcStatus.Error(grpcCodes.ResourceExhausted, "")),
60+
)
61+
s, err := newSessionStream(ctx, client, &trace.Coordination{})
62+
require.True(t, xerrors.IsTransportError(err, grpcCodes.ResourceExhausted))
63+
require.Nil(t, s)
64+
})
65+
})
66+
})
67+
t.Run("OperationError", func(t *testing.T) {
68+
t.Run("On", func(t *testing.T) {
69+
t.Run("NewStream", func(t *testing.T) {
70+
ctx := xtest.Context(t)
71+
ctrl := gomock.NewController(t)
72+
client := NewMockCoordinationServiceClient(ctrl)
73+
client.EXPECT().Session(gomock.Any()).Return(
74+
nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_ABORTED)),
75+
)
76+
s, err := newSessionStream(ctx, client, &trace.Coordination{})
77+
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_ABORTED))
78+
require.Nil(t, s)
79+
})
80+
t.Run("Recv", func(t *testing.T) {
81+
ctx := xtest.Context(t)
82+
ctrl := gomock.NewController(t)
83+
client := NewMockCoordinationServiceClient(ctrl)
84+
sessionStream := NewMockCoordinationService_SessionClient(ctrl)
85+
sessionStream.EXPECT().Recv().Return(&Ydb_Coordination.SessionResponse{
86+
Response: &Ydb_Coordination.SessionResponse_Failure_{
87+
Failure: &Ydb_Coordination.SessionResponse_Failure{
88+
Status: Ydb.StatusIds_ABORTED,
89+
},
90+
},
91+
}, nil)
92+
client.EXPECT().Session(gomock.Any()).Return(sessionStream, nil)
93+
s, err := newSessionStream(ctx, client, &trace.Coordination{})
94+
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_ABORTED))
95+
require.Nil(t, s)
96+
})
97+
})
98+
})
99+
}

0 commit comments

Comments
 (0)