Skip to content

Commit 5a238a1

Browse files
authored
Merge branch 'master' into master
2 parents 36c3ba4 + 7842cf9 commit 5a238a1

File tree

13 files changed

+187
-117
lines changed

13 files changed

+187
-117
lines changed

CHANGELOG.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
1-
21
* Added type assertion checks to enhance type safety and prevent unexpected panics in critical sections of the codebase
32

3+
## v3.66.3
4+
* Fixed the OAuth2 test
5+
6+
## v3.66.2
7+
* Added `trace.DriverConnStreamEvents` details bit
8+
* Added `trace.Driver.OnConnStreamFinish` event
9+
410
## v3.66.1
511
* Added flush messages from buffer before close topic writer
612
* Added Flush method for topic writer

internal/conn/conn.go

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type Conn interface {
5151
type conn struct {
5252
mtx sync.RWMutex
5353
config Config // ro access
54-
cc *grpc.ClientConn
54+
grpcConn *grpc.ClientConn
5555
done chan struct{}
5656
endpoint endpoint.Endpoint // ro access
5757
closed bool
@@ -121,7 +121,7 @@ func (c *conn) park(ctx context.Context) (err error) {
121121
return nil
122122
}
123123

124-
if c.cc == nil {
124+
if c.grpcConn == nil {
125125
return nil
126126
}
127127

@@ -161,7 +161,7 @@ func (c *conn) setState(ctx context.Context, s State) State {
161161
func (c *conn) Unban(ctx context.Context) State {
162162
var newState State
163163
c.mtx.RLock()
164-
cc := c.cc
164+
cc := c.grpcConn
165165
c.mtx.RUnlock()
166166
if isAvailable(cc) {
167167
newState = Online
@@ -186,8 +186,8 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
186186
c.mtx.Lock()
187187
defer c.mtx.Unlock()
188188

189-
if c.cc != nil {
190-
return c.cc, nil
189+
if c.grpcConn != nil {
190+
return c.grpcConn, nil
191191
}
192192

193193
if dialTimeout := c.config.DialTimeout(); dialTimeout > 0 {
@@ -234,10 +234,10 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
234234
)
235235
}
236236

237-
c.cc = cc
237+
c.grpcConn = cc
238238
c.setState(ctx, Online)
239239

240-
return c.cc, nil
240+
return c.grpcConn, nil
241241
}
242242

243243
func (c *conn) onTransportError(ctx context.Context, cause error) {
@@ -252,11 +252,11 @@ func isAvailable(raw *grpc.ClientConn) bool {
252252

253253
// conn must be locked
254254
func (c *conn) close(ctx context.Context) (err error) {
255-
if c.cc == nil {
255+
if c.grpcConn == nil {
256256
return nil
257257
}
258-
err = c.cc.Close()
259-
c.cc = nil
258+
err = c.grpcConn.Close()
259+
c.grpcConn = nil
260260
c.setState(ctx, Offline)
261261

262262
return c.wrapError(err)
@@ -423,19 +423,23 @@ func (c *conn) NewStream(
423423

424424
ctx, sentMark := markContext(meta.WithTraceID(ctx, traceID))
425425

426-
ctx, cancel := xcontext.WithCancel(ctx)
426+
ctx, cancel := c.childStreams.WithCancel(ctx)
427427
defer func() {
428428
if finalErr != nil {
429429
cancel()
430-
} else {
431-
c.childStreams.Remember(&cancel)
432430
}
433431
}()
434432

435-
s, err := cc.NewStream(ctx, desc, method, append(opts, grpc.OnFinish(func(err error) {
436-
cancel()
437-
c.childStreams.Forget(&cancel)
438-
}))...)
433+
s := &grpcClientStream{
434+
parentConn: c,
435+
streamCtx: ctx,
436+
streamCancel: cancel,
437+
wrapping: useWrapping,
438+
traceID: traceID,
439+
sentMark: sentMark,
440+
}
441+
442+
s.stream, err = cc.NewStream(ctx, desc, method, append(opts, grpc.OnFinish(s.finish))...)
439443
if err != nil {
440444
if xerrors.IsContextError(err) {
441445
return nil, xerrors.WithStackTrace(err)
@@ -451,25 +455,16 @@ func (c *conn) NewStream(
451455
xerrors.WithTraceID(traceID),
452456
)
453457
if sentMark.canRetry() {
454-
return s, c.wrapError(xerrors.Retryable(err, xerrors.WithName("NewStream")))
458+
return nil, c.wrapError(xerrors.Retryable(err, xerrors.WithName("NewStream")))
455459
}
456460

457-
return s, c.wrapError(err)
461+
return nil, c.wrapError(err)
458462
}
459463

460-
return s, err
464+
return nil, err
461465
}
462466

463-
return &grpcClientStream{
464-
ClientStream: s,
465-
c: c,
466-
wrapping: useWrapping,
467-
traceID: traceID,
468-
sentMark: sentMark,
469-
onDone: func(ctx context.Context, md metadata.MD) {
470-
meta.CallTrailerCallback(ctx, md)
471-
},
472-
}, nil
467+
return s, nil
473468
}
474469

475470
func (c *conn) wrapError(err error) error {

internal/conn/grpc_client_stream.go

Lines changed: 49 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,36 +8,50 @@ import (
88
"google.golang.org/grpc"
99
"google.golang.org/grpc/metadata"
1010

11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/wrap"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1516
)
1617

1718
type grpcClientStream struct {
18-
grpc.ClientStream
19-
c *conn
20-
wrapping bool
21-
traceID string
22-
sentMark *modificationMark
23-
onDone func(ctx context.Context, md metadata.MD)
19+
parentConn *conn
20+
stream grpc.ClientStream
21+
streamCtx context.Context //nolint:containedctx
22+
streamCancel context.CancelFunc
23+
wrapping bool
24+
traceID string
25+
sentMark *modificationMark
26+
}
27+
28+
func (s *grpcClientStream) Header() (metadata.MD, error) {
29+
return s.stream.Header()
30+
}
31+
32+
func (s *grpcClientStream) Trailer() metadata.MD {
33+
return s.stream.Trailer()
34+
}
35+
36+
func (s *grpcClientStream) Context() context.Context {
37+
return s.stream.Context()
2438
}
2539

2640
func (s *grpcClientStream) CloseSend() (err error) {
2741
var (
28-
ctx = s.Context()
29-
onDone = trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &ctx,
42+
ctx = s.streamCtx
43+
onDone = trace.DriverOnConnStreamCloseSend(s.parentConn.config.Trace(), &ctx,
3044
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"),
3145
)
3246
)
3347
defer func() {
3448
onDone(err)
3549
}()
3650

37-
stop := s.c.lastUsage.Start()
51+
stop := s.parentConn.lastUsage.Start()
3852
defer stop()
3953

40-
err = s.ClientStream.CloseSend()
54+
err = s.stream.CloseSend()
4155

4256
if err != nil {
4357
if xerrors.IsContextError(err) {
@@ -48,7 +62,7 @@ func (s *grpcClientStream) CloseSend() (err error) {
4862
return s.wrapError(
4963
xerrors.Transport(
5064
err,
51-
xerrors.WithAddress(s.c.Address()),
65+
xerrors.WithAddress(s.parentConn.Address()),
5266
xerrors.WithTraceID(s.traceID),
5367
),
5468
)
@@ -62,32 +76,32 @@ func (s *grpcClientStream) CloseSend() (err error) {
6276

6377
func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
6478
var (
65-
ctx = s.Context()
66-
onDone = trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &ctx,
79+
ctx = s.streamCtx
80+
onDone = trace.DriverOnConnStreamSendMsg(s.parentConn.config.Trace(), &ctx,
6781
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"),
6882
)
6983
)
7084
defer func() {
7185
onDone(err)
7286
}()
7387

74-
stop := s.c.lastUsage.Start()
88+
stop := s.parentConn.lastUsage.Start()
7589
defer stop()
7690

77-
err = s.ClientStream.SendMsg(m)
91+
err = s.stream.SendMsg(m)
7892

7993
if err != nil {
8094
if xerrors.IsContextError(err) {
8195
return xerrors.WithStackTrace(err)
8296
}
8397

8498
defer func() {
85-
s.c.onTransportError(ctx, err)
99+
s.parentConn.onTransportError(ctx, err)
86100
}()
87101

88102
if s.wrapping {
89103
err = xerrors.Transport(err,
90-
xerrors.WithAddress(s.c.Address()),
104+
xerrors.WithAddress(s.parentConn.Address()),
91105
xerrors.WithTraceID(s.traceID),
92106
)
93107
if s.sentMark.canRetry() {
@@ -105,28 +119,31 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
105119
return nil
106120
}
107121

122+
func (s *grpcClientStream) finish(err error) {
123+
s.streamCancel()
124+
trace.DriverOnConnStreamFinish(s.parentConn.config.Trace(), s.streamCtx,
125+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).finish"), err,
126+
)
127+
}
128+
108129
func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
109130
var (
110-
ctx = s.Context()
111-
onDone = trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &ctx,
131+
ctx = s.streamCtx
132+
onDone = trace.DriverOnConnStreamRecvMsg(s.parentConn.config.Trace(), &ctx,
112133
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"),
113134
)
114135
)
115136
defer func() {
116137
onDone(err)
117-
}()
118-
119-
stop := s.c.lastUsage.Start()
120-
defer stop()
121-
122-
defer func() {
123138
if err != nil {
124-
md := s.ClientStream.Trailer()
125-
s.onDone(ctx, md)
139+
meta.CallTrailerCallback(s.streamCtx, s.stream.Trailer())
126140
}
127141
}()
128142

129-
err = s.ClientStream.RecvMsg(m)
143+
stop := s.parentConn.lastUsage.Start()
144+
defer stop()
145+
146+
err = s.stream.RecvMsg(m)
130147

131148
if err != nil { //nolint:nestif
132149
if xerrors.IsContextError(err) {
@@ -135,13 +152,13 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
135152

136153
defer func() {
137154
if !xerrors.Is(err, io.EOF) {
138-
s.c.onTransportError(ctx, err)
155+
s.parentConn.onTransportError(ctx, err)
139156
}
140157
}()
141158

142159
if s.wrapping {
143160
err = xerrors.Transport(err,
144-
xerrors.WithAddress(s.c.Address()),
161+
xerrors.WithAddress(s.parentConn.Address()),
145162
)
146163
if s.sentMark.canRetry() {
147164
return s.wrapError(xerrors.Retryable(err,
@@ -161,7 +178,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
161178
return s.wrapError(
162179
xerrors.Operation(
163180
xerrors.FromOperation(operation),
164-
xerrors.WithAddress(s.c.Address()),
181+
xerrors.WithAddress(s.parentConn.Address()),
165182
),
166183
)
167184
}
@@ -177,7 +194,7 @@ func (s *grpcClientStream) wrapError(err error) error {
177194
}
178195

179196
return xerrors.WithStackTrace(
180-
newConnError(s.c.endpoint.NodeID(), s.c.endpoint.Address(), err),
197+
newConnError(s.parentConn.endpoint.NodeID(), s.parentConn.endpoint.Address(), err),
181198
xerrors.WithSkipDepth(1),
182199
)
183200
}

internal/credentials/oauth2_test.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"net/http"
1010
"net/url"
1111
"os"
12-
"os/user"
13-
"path/filepath"
1412
"reflect"
1513
"strconv"
1614
"testing"
@@ -430,23 +428,16 @@ func TestJWTTokenBadParams(t *testing.T) {
430428
}
431429

432430
func TestJWTTokenSourceReadPrivateKeyFromFile(t *testing.T) {
433-
const perm = 0o600
434-
usr, err := user.Current()
431+
f, err := os.CreateTemp("", "tmpfile-")
435432
require.NoError(t, err)
436-
fileName := strconv.Itoa(time.Now().Second())
437-
filePath := filepath.Join(usr.HomeDir, fileName)
438-
beautifulFilePath := filepath.Join("~", fileName)
439-
err = os.WriteFile(
440-
filePath,
441-
[]byte(testPrivateKeyContent),
442-
perm,
443-
)
433+
defer os.Remove(f.Name())
434+
_, err = f.WriteString(testPrivateKeyContent)
444435
require.NoError(t, err)
445-
defer os.Remove(filePath)
436+
f.Close()
446437

447438
var src TokenSource
448439
src, err = NewJWTTokenSource(
449-
WithRSAPrivateKeyPEMFile(beautifulFilePath),
440+
WithRSAPrivateKeyPEMFile(f.Name()),
450441
WithKeyID("key_id"),
451442
WithSigningMethod(jwt.SigningMethodRS256),
452443
WithIssuer("test_issuer"),

internal/version/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package version
33
const (
44
Major = "3"
55
Minor = "66"
6-
Patch = "1"
6+
Patch = "3"
77

88
Prefix = "ydb-go-sdk"
99
)

0 commit comments

Comments
 (0)