Skip to content

Commit 3a7620d

Browse files
committed
* Refactored coordination traces
1 parent 703927a commit 3a7620d

File tree

4 files changed

+58
-55
lines changed

4 files changed

+58
-55
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Refactored coordination traces
2+
13
## v3.91.0
24
* Added `ydb.WithPreferredNodeID(ctx, nodeID)` context modifier for trying to execute queries on given nodeID
35

internal/coordination/session.go

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ import (
1515
"github.com/ydb-platform/ydb-go-sdk/v3/coordination"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/coordination/options"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination/conversation"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
20+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1921
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2022
)
2123

@@ -59,13 +61,13 @@ func createSession(
5961
client.sessionCreated(&s)
6062

6163
sessionStartedChan := make(chan struct{})
62-
go s.mainLoop(path, sessionStartedChan)
64+
go s.mainLoop(xcontext.ValueOnly(ctx), path, sessionStartedChan)
6365

6466
select {
6567
case <-ctx.Done():
6668
cancel()
6769

68-
return nil, ctx.Err()
70+
return nil, xerrors.WithStackTrace(ctx.Err())
6971
case <-sessionStartedChan:
7072
}
7173

@@ -108,6 +110,30 @@ func (s *session) updateCancelStream(cancel context.CancelFunc) {
108110
s.cancelStream = cancel
109111
}
110112

113+
func newSessionClient(
114+
ctx context.Context, client Ydb_Coordination_V1.CoordinationServiceClient, t *trace.Coordination,
115+
) (
116+
_ Ydb_Coordination_V1.CoordinationService_SessionClient, cancel context.CancelFunc, finalErr error,
117+
) {
118+
streamCtx, streamCancel := xcontext.WithCancel(xcontext.ValueOnly(ctx))
119+
120+
onDone := trace.CoordinationOnNewSessionClient(t, &ctx,
121+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/coordination.newSessionClient"),
122+
)
123+
defer func() {
124+
onDone(finalErr)
125+
}()
126+
127+
sessionClient, err := client.Session(streamCtx)
128+
if err != nil {
129+
streamCancel()
130+
131+
return nil, nil, xerrors.WithStackTrace(err)
132+
}
133+
134+
return sessionClient, streamCancel, nil
135+
}
136+
111137
// Create a new gRPC stream using an independent context.
112138
//
113139
//nolint:funlen
@@ -129,14 +155,14 @@ func (s *session) newStream(
129155
for {
130156
result := make(chan Ydb_Coordination_V1.CoordinationService_SessionClient, 1)
131157
go func() {
132-
var err error
133-
onDone := trace.CoordinationOnStreamNew(s.client.config.Trace())
134-
defer func() {
135-
onDone(err)
136-
}()
137-
138-
client, err := s.client.client.Session(streamCtx)
139-
result <- client
158+
client, cancel, err := newSessionClient(streamCtx, s.client.client, s.client.config.Trace())
159+
if err == nil {
160+
select {
161+
case result <- client:
162+
default:
163+
cancel()
164+
}
165+
}
140166
}()
141167

142168
var client Ydb_Coordination_V1.CoordinationService_SessionClient
@@ -220,7 +246,7 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
220246
// We intentionally place a stream context outside the scope of any existing contexts to make an attempt to
221247
// close the session gracefully at the end of the main loop.
222248

223-
streamCtx, cancelStream := context.WithCancel(context.Background())
249+
streamCtx, cancelStream := context.WithCancel(ctx)
224250
sessionClient, err := s.newStream(streamCtx, cancelStream)
225251
if err != nil {
226252
// Giving up, we can do nothing without a stream.

log/coordination.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,10 @@ func internalCoordination(
169169
}
170170
}
171171
},
172-
OnStreamNew: func(
173-
info trace.CoordinationStreamNewStartInfo,
172+
OnNewSessionClient: func(
173+
info trace.CoordinationNewSessionClientStartInfo,
174174
) func(
175-
info trace.CoordinationStreamNewDoneInfo,
175+
info trace.CoordinationNewSessionClientDoneInfo,
176176
) {
177177
if d.Details()&trace.CoordinationEvents == 0 {
178178
return nil
@@ -181,7 +181,7 @@ func internalCoordination(
181181
l.Log(ctx, "stream")
182182
start := time.Now()
183183

184-
return func(info trace.CoordinationStreamNewDoneInfo) {
184+
return func(info trace.CoordinationNewSessionClientDoneInfo) {
185185
l.Log(ctx, "done",
186186
kv.Latency(start),
187187
kv.Error(info.Error),

trace/coordination_gtrace.go

Lines changed: 15 additions & 40 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)