Skip to content

Commit c3a4046

Browse files
committed
fixes
1 parent 4c27e84 commit c3a4046

File tree

7 files changed

+63
-294
lines changed

7 files changed

+63
-294
lines changed
File renamed without changes.

internal/coordination/session.go

Lines changed: 8 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package coordination
33
import (
44
"context"
55
"encoding/binary"
6-
"fmt"
76
"math"
87
"math/rand"
98
"sync"
@@ -35,7 +34,7 @@ type (
3534
sessionReconnectDelay time.Duration
3635
trace *trace.Coordination
3736

38-
ctx context.Context
37+
ctx context.Context //nolint:containedctx
3938
cancel context.CancelFunc
4039
sessionClosedChan chan struct{}
4140
controller *conversation.Controller
@@ -54,7 +53,7 @@ type (
5453
type lease struct {
5554
session *session
5655
name string
57-
ctx context.Context
56+
ctx context.Context //nolint:containedctx
5857
cancel context.CancelFunc
5958
}
6059

@@ -131,84 +130,8 @@ func (s *session) updateCancelStream(cancel context.CancelFunc) {
131130
s.cancelStream = cancel
132131
}
133132

134-
type sessionStream struct {
135-
sessionID uint64
136-
stream Ydb_Coordination_V1.CoordinationService_SessionClient
137-
cancelStream context.CancelFunc
138-
}
139-
140-
func newSessionStream(
141-
ctx context.Context,
142-
client Ydb_Coordination_V1.CoordinationServiceClient,
143-
t *trace.Coordination,
144-
) (
145-
_ *sessionStream, finalErr error,
146-
) {
147-
streamCtx, streamCancel := xcontext.WithCancel(xcontext.ValueOnly(ctx))
148-
defer func() {
149-
if finalErr != nil {
150-
streamCancel()
151-
}
152-
}()
153-
154-
onDone := trace.CoordinationOnNewSessionClient(t, &ctx,
155-
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/coordination.newSessionStream"),
156-
)
157-
defer func() {
158-
onDone(finalErr)
159-
}()
160-
161-
stream, err := client.Session(streamCtx)
162-
if err != nil {
163-
return nil, xerrors.WithStackTrace(err)
164-
}
165-
166-
err = stream.Send(&Ydb_Coordination.SessionRequest{
167-
Request: &Ydb_Coordination.SessionRequest_SessionStart_{
168-
SessionStart: &Ydb_Coordination.SessionRequest_SessionStart{
169-
Path: "/a/b/c",
170-
ProtectionKey: protectionKey,
171-
},
172-
},
173-
})
174-
if err != nil {
175-
return nil, xerrors.WithStackTrace(err)
176-
}
177-
178-
for {
179-
msg, err := stream.Recv()
180-
if err != nil {
181-
return nil, xerrors.WithStackTrace(err)
182-
}
183-
184-
switch t := msg.GetResponse().(type) {
185-
case *Ydb_Coordination.SessionResponse_SessionStarted_:
186-
return &sessionStream{
187-
sessionID: t.SessionStarted.GetSessionId(),
188-
stream: stream,
189-
cancelStream: streamCancel,
190-
}, nil
191-
case *Ydb_Coordination.SessionResponse_Failure_:
192-
return nil, xerrors.WithStackTrace(xerrors.FromOperation(t.Failure))
193-
case *Ydb_Coordination.SessionResponse_Ping:
194-
err := stream.Send(&Ydb_Coordination.SessionRequest{
195-
Request: &Ydb_Coordination.SessionRequest_Pong{
196-
Pong: &Ydb_Coordination.SessionRequest_PingPong{
197-
Opaque: t.Ping.GetOpaque(),
198-
},
199-
},
200-
})
201-
if err != nil {
202-
return nil, xerrors.WithStackTrace(err)
203-
}
204-
default:
205-
return nil, xerrors.WithStackTrace(fmt.Errorf("unexpected first message: %+v", msg))
206-
}
207-
}
208-
}
209-
210133
// Create a new gRPC stream using an independent context.
211-
func (s *session) newStream(
134+
func (s *session) newStream( //nolint:funlen
212135
streamCtx context.Context,
213136
cancelStream context.CancelFunc,
214137
) (Ydb_Coordination_V1.CoordinationService_SessionClient, error) {
@@ -230,8 +153,8 @@ func (s *session) newStream(
230153
sessionClient Ydb_Coordination_V1.CoordinationService_SessionClient
231154
err error
232155
)
233-
onDone := trace.CoordinationOnNewSessionClient(s.trace, &streamCtx,
234-
stack.FunctionID(""),
156+
onDone := trace.CoordinationOnSessionNewStream(s.trace, &streamCtx,
157+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination.(*session).newStream"),
235158
)
236159
sessionClient, err = s.client.Session(streamCtx)
237160
onDone(err)
@@ -294,7 +217,7 @@ func (s *session) newStream(
294217
}
295218
}
296219

297-
func (s *session) mainLoop(ctx context.Context, path string, sessionStartedChan chan struct{}) {
220+
func (s *session) mainLoop(ctx context.Context, path string, sessionStartedChan chan struct{}) { //nolint:funlen
298221
defer func() {
299222
for _, f := range s.onClose {
300223
f(s)
@@ -351,8 +274,6 @@ func (s *session) mainLoop(ctx context.Context, path string, sessionStartedChan
351274
},
352275
}
353276

354-
fmt.Printf("[SEND]: %+v (%T)\n", startSession.GetRequest(), startSession.GetRequest())
355-
356277
err = sessionClient.Send(&startSession)
357278
if err != nil {
358279
// Reconnect if a session cannot be started in this stream.
@@ -482,7 +403,7 @@ func (s *session) mainLoop(ctx context.Context, path string, sessionStartedChan
482403
}
483404
}
484405

485-
func (s *session) receiveLoop(
406+
func (s *session) receiveLoop( //nolint:funlen
486407
wg *sync.WaitGroup,
487408
sessionClient Ydb_Coordination_V1.CoordinationService_SessionClient,
488409
cancelStream context.CancelFunc,
@@ -505,8 +426,6 @@ func (s *session) receiveLoop(
505426
}
506427
onDone(message, nil)
507428

508-
fmt.Printf("[RECV]: %+v (%T)\n", message.GetResponse(), message.GetResponse())
509-
510429
switch message.GetResponse().(type) {
511430
case *Ydb_Coordination.SessionResponse_Failure_:
512431
if message.GetFailure().GetStatus() == Ydb.StatusIds_SESSION_EXPIRED ||
@@ -589,8 +508,6 @@ func (s *session) sendLoop(
589508
return
590509
}
591510

592-
fmt.Printf("[SEND]: %+v (%T)\n", message.GetRequest(), message.GetRequest())
593-
594511
onSendDone := trace.CoordinationOnSessionSend(s.trace, message)
595512
err = sessionClient.Send(message)
596513
if err != nil {
@@ -861,7 +778,7 @@ func convertSemaphoreSession(
861778
return &result
862779
}
863780

864-
func (s *session) AcquireSemaphore(
781+
func (s *session) AcquireSemaphore( //nolint:funlen
865782
ctx context.Context,
866783
name string,
867784
count uint64,

internal/coordination/session_test.go

Lines changed: 0 additions & 182 deletions
This file was deleted.

log/coordination.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,10 @@ func internalCoordination(
169169
}
170170
}
171171
},
172-
OnNewSessionClient: func(
173-
info trace.CoordinationNewSessionClientStartInfo,
172+
OnSessionNewStream: func(
173+
info trace.CoordinationSessionNewStreamStartInfo,
174174
) func(
175-
info trace.CoordinationNewSessionClientDoneInfo,
175+
info trace.CoordinationSessionNewStreamDoneInfo,
176176
) {
177177
if d.Details()&trace.CoordinationEvents == 0 {
178178
return nil
@@ -181,7 +181,7 @@ func internalCoordination(
181181
l.Log(ctx, "stream")
182182
start := time.Now()
183183

184-
return func(info trace.CoordinationNewSessionClientDoneInfo) {
184+
return func(info trace.CoordinationSessionNewStreamDoneInfo) {
185185
l.Log(ctx, "done",
186186
kv.Latency(start),
187187
kv.Error(info.Error),

tests/integration/coordination_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func TestCoordinationSemaphore(t *testing.T) {
3030
if err != nil {
3131
t.Fatalf("failed to connect: %v", err)
3232
}
33-
defer db.Close(ctx) // cleanup resources
33+
defer db.Close(ctx) // cleanup resourcess
3434

3535
const nodePath = "/local/coordination/node/test"
3636

0 commit comments

Comments
 (0)