Skip to content

Commit 813a1de

Browse files
authored
Merge pull request #367 from ydb-platform/grpc-logger
Grpc logger
2 parents d4303bc + ef6923e commit 813a1de

File tree

1 file changed

+96
-0
lines changed

1 file changed

+96
-0
lines changed

internal/xtest/grpclogger.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package xtest
2+
3+
import (
4+
"context"
5+
"sync/atomic"
6+
"testing"
7+
8+
"google.golang.org/grpc"
9+
)
10+
11+
var globalLastStreamID int64 = 0
12+
13+
// GrpcLogger use for log raw grpc messages
14+
//
15+
// Usage:
16+
//
17+
// db, err := ydb.Open(context.Background(), connectionString,
18+
// ...
19+
// ydb.With(config.WithGrpcOptions(grpc.WithChainUnaryInterceptor(xtest.NewGrpcLogger(t).UnaryClientInterceptor))),
20+
// )
21+
type GrpcLogger struct {
22+
t testing.TB
23+
}
24+
25+
func NewGrpcLogger(t testing.TB) GrpcLogger {
26+
return GrpcLogger{t: t}
27+
}
28+
29+
func (l GrpcLogger) UnaryClientInterceptor(
30+
ctx context.Context,
31+
method string,
32+
req, reply interface{},
33+
cc *grpc.ClientConn,
34+
invoker grpc.UnaryInvoker,
35+
opts ...grpc.CallOption,
36+
) error {
37+
err := invoker(ctx, method, req, reply, cc, opts...)
38+
l.t.Logf(
39+
"UnaryClientInterceptor: %s - err: %v\n\nreq:\n%v\n\nresp:\n%v",
40+
method,
41+
err,
42+
req,
43+
reply,
44+
)
45+
return err
46+
}
47+
48+
func (l GrpcLogger) StreamClientInterceptor(
49+
ctx context.Context,
50+
desc *grpc.StreamDesc,
51+
cc *grpc.ClientConn,
52+
method string,
53+
streamer grpc.Streamer,
54+
opts ...grpc.CallOption,
55+
) (grpc.ClientStream, error) {
56+
stream, err := streamer(ctx, desc, cc, method, opts...)
57+
streamWrapper := newGrpcLoggerStream(stream, l.t)
58+
if stream != nil {
59+
stream = streamWrapper
60+
}
61+
l.t.Logf(
62+
"StreamStart: %v with err '%v' (streamID: %v)",
63+
method,
64+
err,
65+
streamWrapper.streamID,
66+
)
67+
return stream, err
68+
}
69+
70+
type grpcLoggerStream struct {
71+
grpc.ClientStream
72+
streamID int64
73+
t testing.TB
74+
}
75+
76+
func newGrpcLoggerStream(stream grpc.ClientStream, t testing.TB) grpcLoggerStream {
77+
return grpcLoggerStream{stream, atomic.AddInt64(&globalLastStreamID, 1), t}
78+
}
79+
80+
func (g grpcLoggerStream) CloseSend() error {
81+
err := g.ClientStream.CloseSend()
82+
g.t.Logf("CloseSend: %v (streamID: %v)", err, g.streamID)
83+
return err
84+
}
85+
86+
func (g grpcLoggerStream) SendMsg(m interface{}) error {
87+
err := g.ClientStream.SendMsg(m)
88+
g.t.Logf("SendMsg (streamID: %v) with err '%v':\n%v ", g.streamID, err, m)
89+
return err
90+
}
91+
92+
func (g grpcLoggerStream) RecvMsg(m interface{}) error {
93+
err := g.ClientStream.RecvMsg(m)
94+
g.t.Logf("RecvMsg (streamID: %v) with err '%v':\n%v ", g.streamID, err, m)
95+
return err
96+
}

0 commit comments

Comments
 (0)