Skip to content

Commit f887b4d

Browse files
committed
* Added trace.Driver.OnConnStreamFinish event
1 parent f400f65 commit f887b4d

File tree

7 files changed

+167
-80
lines changed

7 files changed

+167
-80
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
* Added `trace.DriverConnStreamEvents` details bit
2+
* Added `trace.Driver.OnConnStreamFinish` event
23

34
## v3.66.1
45
* Added flush messages from buffer before close topic writer

internal/conn/conn.go

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type Conn interface {
5151
type conn struct {
5252
mtx sync.RWMutex
5353
config Config // ro access
54-
cc *grpc.ClientConn
54+
grpcConn *grpc.ClientConn
5555
done chan struct{}
5656
endpoint endpoint.Endpoint // ro access
5757
closed bool
@@ -121,7 +121,7 @@ func (c *conn) park(ctx context.Context) (err error) {
121121
return nil
122122
}
123123

124-
if c.cc == nil {
124+
if c.grpcConn == nil {
125125
return nil
126126
}
127127

@@ -161,7 +161,7 @@ func (c *conn) setState(ctx context.Context, s State) State {
161161
func (c *conn) Unban(ctx context.Context) State {
162162
var newState State
163163
c.mtx.RLock()
164-
cc := c.cc
164+
cc := c.grpcConn
165165
c.mtx.RUnlock()
166166
if isAvailable(cc) {
167167
newState = Online
@@ -186,8 +186,8 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
186186
c.mtx.Lock()
187187
defer c.mtx.Unlock()
188188

189-
if c.cc != nil {
190-
return c.cc, nil
189+
if c.grpcConn != nil {
190+
return c.grpcConn, nil
191191
}
192192

193193
if dialTimeout := c.config.DialTimeout(); dialTimeout > 0 {
@@ -234,10 +234,10 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
234234
)
235235
}
236236

237-
c.cc = cc
237+
c.grpcConn = cc
238238
c.setState(ctx, Online)
239239

240-
return c.cc, nil
240+
return c.grpcConn, nil
241241
}
242242

243243
func (c *conn) onTransportError(ctx context.Context, cause error) {
@@ -252,11 +252,11 @@ func isAvailable(raw *grpc.ClientConn) bool {
252252

253253
// conn must be locked
254254
func (c *conn) close(ctx context.Context) (err error) {
255-
if c.cc == nil {
255+
if c.grpcConn == nil {
256256
return nil
257257
}
258-
err = c.cc.Close()
259-
c.cc = nil
258+
err = c.grpcConn.Close()
259+
c.grpcConn = nil
260260
c.setState(ctx, Offline)
261261

262262
return c.wrapError(err)
@@ -423,19 +423,23 @@ func (c *conn) NewStream(
423423

424424
ctx, sentMark := markContext(meta.WithTraceID(ctx, traceID))
425425

426-
ctx, cancel := xcontext.WithCancel(ctx)
426+
ctx, cancel := c.childStreams.WithCancel(ctx)
427427
defer func() {
428428
if finalErr != nil {
429429
cancel()
430-
} else {
431-
c.childStreams.Remember(&cancel)
432430
}
433431
}()
434432

435-
s, err := cc.NewStream(ctx, desc, method, append(opts, grpc.OnFinish(func(err error) {
436-
cancel()
437-
c.childStreams.Forget(&cancel)
438-
}))...)
433+
s := &grpcClientStream{
434+
parentConn: c,
435+
streamCtx: ctx,
436+
streamCancel: cancel,
437+
wrapping: useWrapping,
438+
traceID: traceID,
439+
sentMark: sentMark,
440+
}
441+
442+
s.stream, err = cc.NewStream(ctx, desc, method, append(opts, grpc.OnFinish(s.finish))...)
439443
if err != nil {
440444
if xerrors.IsContextError(err) {
441445
return nil, xerrors.WithStackTrace(err)
@@ -451,25 +455,16 @@ func (c *conn) NewStream(
451455
xerrors.WithTraceID(traceID),
452456
)
453457
if sentMark.canRetry() {
454-
return s, c.wrapError(xerrors.Retryable(err, xerrors.WithName("NewStream")))
458+
return nil, c.wrapError(xerrors.Retryable(err, xerrors.WithName("NewStream")))
455459
}
456460

457-
return s, c.wrapError(err)
461+
return nil, c.wrapError(err)
458462
}
459463

460-
return s, err
464+
return nil, err
461465
}
462466

463-
return &grpcClientStream{
464-
ClientStream: s,
465-
c: c,
466-
wrapping: useWrapping,
467-
traceID: traceID,
468-
sentMark: sentMark,
469-
onDone: func(ctx context.Context, md metadata.MD) {
470-
meta.CallTrailerCallback(ctx, md)
471-
},
472-
}, nil
467+
return s, nil
473468
}
474469

475470
func (c *conn) wrapError(err error) error {

internal/conn/grpc_client_stream.go

Lines changed: 49 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,36 +8,50 @@ import (
88
"google.golang.org/grpc"
99
"google.golang.org/grpc/metadata"
1010

11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/wrap"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1516
)
1617

1718
type grpcClientStream struct {
18-
grpc.ClientStream
19-
c *conn
20-
wrapping bool
21-
traceID string
22-
sentMark *modificationMark
23-
onDone func(ctx context.Context, md metadata.MD)
19+
parentConn *conn
20+
stream grpc.ClientStream
21+
streamCtx context.Context //nolint:containedctx
22+
streamCancel context.CancelFunc
23+
wrapping bool
24+
traceID string
25+
sentMark *modificationMark
26+
}
27+
28+
func (s *grpcClientStream) Header() (metadata.MD, error) {
29+
return s.stream.Header()
30+
}
31+
32+
func (s *grpcClientStream) Trailer() metadata.MD {
33+
return s.stream.Trailer()
34+
}
35+
36+
func (s *grpcClientStream) Context() context.Context {
37+
return s.stream.Context()
2438
}
2539

2640
func (s *grpcClientStream) CloseSend() (err error) {
2741
var (
28-
ctx = s.Context()
29-
onDone = trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &ctx,
42+
ctx = s.streamCtx
43+
onDone = trace.DriverOnConnStreamCloseSend(s.parentConn.config.Trace(), &ctx,
3044
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"),
3145
)
3246
)
3347
defer func() {
3448
onDone(err)
3549
}()
3650

37-
stop := s.c.lastUsage.Start()
51+
stop := s.parentConn.lastUsage.Start()
3852
defer stop()
3953

40-
err = s.ClientStream.CloseSend()
54+
err = s.stream.CloseSend()
4155

4256
if err != nil {
4357
if xerrors.IsContextError(err) {
@@ -48,7 +62,7 @@ func (s *grpcClientStream) CloseSend() (err error) {
4862
return s.wrapError(
4963
xerrors.Transport(
5064
err,
51-
xerrors.WithAddress(s.c.Address()),
65+
xerrors.WithAddress(s.parentConn.Address()),
5266
xerrors.WithTraceID(s.traceID),
5367
),
5468
)
@@ -62,32 +76,32 @@ func (s *grpcClientStream) CloseSend() (err error) {
6276

6377
func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
6478
var (
65-
ctx = s.Context()
66-
onDone = trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &ctx,
79+
ctx = s.streamCtx
80+
onDone = trace.DriverOnConnStreamSendMsg(s.parentConn.config.Trace(), &ctx,
6781
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"),
6882
)
6983
)
7084
defer func() {
7185
onDone(err)
7286
}()
7387

74-
stop := s.c.lastUsage.Start()
88+
stop := s.parentConn.lastUsage.Start()
7589
defer stop()
7690

77-
err = s.ClientStream.SendMsg(m)
91+
err = s.stream.SendMsg(m)
7892

7993
if err != nil {
8094
if xerrors.IsContextError(err) {
8195
return xerrors.WithStackTrace(err)
8296
}
8397

8498
defer func() {
85-
s.c.onTransportError(ctx, err)
99+
s.parentConn.onTransportError(ctx, err)
86100
}()
87101

88102
if s.wrapping {
89103
err = xerrors.Transport(err,
90-
xerrors.WithAddress(s.c.Address()),
104+
xerrors.WithAddress(s.parentConn.Address()),
91105
xerrors.WithTraceID(s.traceID),
92106
)
93107
if s.sentMark.canRetry() {
@@ -105,28 +119,31 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
105119
return nil
106120
}
107121

122+
func (s *grpcClientStream) finish(err error) {
123+
s.streamCancel()
124+
trace.DriverOnConnStreamFinish(s.parentConn.config.Trace(), s.streamCtx,
125+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).finish"), err,
126+
)
127+
}
128+
108129
func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
109130
var (
110-
ctx = s.Context()
111-
onDone = trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &ctx,
131+
ctx = s.streamCtx
132+
onDone = trace.DriverOnConnStreamRecvMsg(s.parentConn.config.Trace(), &ctx,
112133
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"),
113134
)
114135
)
115136
defer func() {
116137
onDone(err)
117-
}()
118-
119-
stop := s.c.lastUsage.Start()
120-
defer stop()
121-
122-
defer func() {
123138
if err != nil {
124-
md := s.ClientStream.Trailer()
125-
s.onDone(ctx, md)
139+
meta.CallTrailerCallback(s.streamCtx, s.stream.Trailer())
126140
}
127141
}()
128142

129-
err = s.ClientStream.RecvMsg(m)
143+
stop := s.parentConn.lastUsage.Start()
144+
defer stop()
145+
146+
err = s.stream.RecvMsg(m)
130147

131148
if err != nil { //nolint:nestif
132149
if xerrors.IsContextError(err) {
@@ -135,13 +152,13 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
135152

136153
defer func() {
137154
if !xerrors.Is(err, io.EOF) {
138-
s.c.onTransportError(ctx, err)
155+
s.parentConn.onTransportError(ctx, err)
139156
}
140157
}()
141158

142159
if s.wrapping {
143160
err = xerrors.Transport(err,
144-
xerrors.WithAddress(s.c.Address()),
161+
xerrors.WithAddress(s.parentConn.Address()),
145162
)
146163
if s.sentMark.canRetry() {
147164
return s.wrapError(xerrors.Retryable(err,
@@ -161,7 +178,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
161178
return s.wrapError(
162179
xerrors.Operation(
163180
xerrors.FromOperation(operation),
164-
xerrors.WithAddress(s.c.Address()),
181+
xerrors.WithAddress(s.parentConn.Address()),
165182
),
166183
)
167184
}
@@ -177,7 +194,7 @@ func (s *grpcClientStream) wrapError(err error) error {
177194
}
178195

179196
return xerrors.WithStackTrace(
180-
newConnError(s.c.endpoint.NodeID(), s.c.endpoint.Address(), err),
197+
newConnError(s.parentConn.endpoint.NodeID(), s.parentConn.endpoint.Address(), err),
181198
xerrors.WithSkipDepth(1),
182199
)
183200
}

internal/xcontext/cancels_quard.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,31 @@ import (
55
"sync"
66
)
77

8-
type CancelsGuard struct {
9-
mu sync.Mutex
10-
cancels map[*context.CancelFunc]struct{}
11-
}
8+
type (
9+
CancelsGuard struct {
10+
mu sync.Mutex
11+
cancels map[*context.CancelFunc]struct{}
12+
}
13+
)
1214

1315
func NewCancelsGuard() *CancelsGuard {
1416
return &CancelsGuard{
1517
cancels: make(map[*context.CancelFunc]struct{}),
1618
}
1719
}
1820

19-
func (g *CancelsGuard) Remember(cancel *context.CancelFunc) {
21+
func (g *CancelsGuard) WithCancel(ctx context.Context) (context.Context, context.CancelFunc) {
2022
g.mu.Lock()
2123
defer g.mu.Unlock()
22-
g.cancels[cancel] = struct{}{}
23-
}
24+
ctx, cancel := WithCancel(ctx)
25+
g.cancels[&cancel] = struct{}{}
2426

25-
func (g *CancelsGuard) Forget(cancel *context.CancelFunc) {
26-
g.mu.Lock()
27-
defer g.mu.Unlock()
28-
delete(g.cancels, cancel)
27+
return ctx, func() {
28+
cancel()
29+
g.mu.Lock()
30+
defer g.mu.Unlock()
31+
delete(g.cancels, &cancel)
32+
}
2933
}
3034

3135
func (g *CancelsGuard) Cancel() {

internal/xcontext/cancels_quard_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,15 @@ import (
99

1010
func TestCancelsGuard(t *testing.T) {
1111
g := NewCancelsGuard()
12-
ctx, cancel1 := context.WithCancel(context.Background())
13-
g.Remember(&cancel1)
12+
ctx, cancel1 := g.WithCancel(context.Background())
1413
require.Len(t, g.cancels, 1)
15-
g.Forget(&cancel1)
14+
cancel1()
15+
require.Error(t, ctx.Err())
1616
require.Empty(t, g.cancels, 0)
17-
cancel2 := context.CancelFunc(func() {
18-
cancel1()
19-
})
20-
g.Remember(&cancel2)
17+
ctx, _ = g.WithCancel(context.Background())
2118
require.Len(t, g.cancels, 1)
19+
ctx, _ = g.WithCancel(ctx)
20+
require.Len(t, g.cancels, 2)
2221
g.Cancel()
2322
require.Error(t, ctx.Err())
2423
}

0 commit comments

Comments
 (0)