Skip to content

Commit 411f51c

Browse files
committed
feat: streaming supports more detailed tracing
1 parent ca63109 commit 411f51c

File tree

11 files changed

+605
-85
lines changed

11 files changed

+605
-85
lines changed

internal/client/option.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ func (o *Options) initRemoteOpt() {
274274
// grpc unary short connection
275275
o.GRPCConnectOpts.ShortConn = true
276276
}
277+
o.GRPCConnectOpts.StreamEventHandler = o.TracerCtl.GetStreamEventHandler()
277278
o.RemoteOpt.ConnPool = nphttp2.NewConnPool(o.Svr.ServiceName, o.GRPCConnPoolSize, *o.GRPCConnectOpts)
278279
o.RemoteOpt.CliHandlerFactory = nphttp2.NewCliTransHandlerFactory()
279280
}
@@ -283,6 +284,7 @@ func (o *Options) initRemoteOpt() {
283284
// grpc unary short connection
284285
o.GRPCConnectOpts.ShortConn = true
285286
}
287+
o.GRPCConnectOpts.StreamEventHandler = o.TracerCtl.GetStreamEventHandler()
286288
o.RemoteOpt.GRPCStreamingConnPool = nphttp2.NewConnPool(o.Svr.ServiceName, o.GRPCConnPoolSize, *o.GRPCConnectOpts)
287289
o.RemoteOpt.GRPCStreamingCliHandlerFactory = nphttp2.NewCliTransHandlerFactory()
288290
}
@@ -291,6 +293,7 @@ func (o *Options) initRemoteOpt() {
291293
if o.PoolCfg != nil && *o.PoolCfg == zero {
292294
// configure short conn pool
293295
o.TTHeaderStreamingOptions.TransportOptions = append(o.TTHeaderStreamingOptions.TransportOptions, ttstream.WithClientShortConnPool())
296+
// todo: ttstream supports inject StreamEventHandler
294297
}
295298
o.RemoteOpt.TTHeaderStreamingCliHandlerFactory = ttstream.NewCliTransHandlerFactory(o.TTHeaderStreamingOptions.TransportOptions...)
296299
}

internal/server/option.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ func NewOptions(opts []Option) *Options {
213213
}
214214
o.StatsLevel = &level
215215
}
216+
ropt.GRPCCfg.StreamEventHandler = o.TracerCtl.GetStreamEventHandler()
216217
return o
217218
}
218219

pkg/remote/trans/nphttp2/grpc/http2_client.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"golang.org/x/net/http2"
4040
"golang.org/x/net/http2/hpack"
4141

42+
"github.com/cloudwego/kitex/internal/stream"
4243
"github.com/cloudwego/kitex/pkg/gofunc"
4344
"github.com/cloudwego/kitex/pkg/klog"
4445
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
@@ -47,6 +48,7 @@ import (
4748
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
4849
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/peer"
4950
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
51+
"github.com/cloudwego/kitex/pkg/stats"
5052
"github.com/cloudwego/kitex/pkg/utils"
5153
)
5254

@@ -119,6 +121,8 @@ type http2Client struct {
119121
onClose func()
120122

121123
bufferPool *bufferPool
124+
125+
eventHandler stream.StreamEventHandler
122126
}
123127

124128
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
@@ -196,6 +200,7 @@ func newHTTP2Client(ctx context.Context, conn net.Conn, opts ConnectOptions,
196200
onGoAway: onGoAway,
197201
onClose: onClose,
198202
bufferPool: newBufferPool(),
203+
eventHandler: opts.StreamEventHandler,
199204
}
200205
t.controlBuf = newControlBuffer(t.ctx.Done())
201206
if opts.InitialWindowSize >= defaultWindowSize {
@@ -584,6 +589,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
584589
return nil, ErrConnClosing
585590
}
586591
}
592+
t.reportEvent(s, stats.StreamSendHeader)
587593
return s, nil
588594
}
589595

@@ -624,6 +630,11 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
624630
s.trailer = mdata
625631
}
626632
if err != nil {
633+
if rst {
634+
// report event before writing recvMsg to avoid event lost
635+
// because tracer would finish when Recv/Send failed
636+
t.reportEvent(s, stats.StreamSendRst)
637+
}
627638
// This will unblock reads eventually.
628639
s.write(recvMsg{err: err})
629640
}
@@ -753,7 +764,13 @@ func (t *http2Client) Write(s *Stream, hdr, data []byte, opts *Options) error {
753764
return s.getCloseStreamErr()
754765
}
755766
}
756-
return t.controlBuf.put(df)
767+
if err := t.controlBuf.put(df); err != nil {
768+
return err
769+
}
770+
if opts.Last {
771+
t.reportEvent(s, stats.StreamSendTrailer)
772+
}
773+
return nil
757774
}
758775

759776
func (t *http2Client) getStream(f http2.Frame) *Stream {
@@ -899,6 +916,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
899916
} else {
900917
msg = fmt.Sprintf("stream terminated by RST_STREAM with error code: %v", f.ErrCode)
901918
}
919+
t.reportEvent(s, stats.StreamRecvRst)
902920
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(statusCode, msg), nil, false)
903921
}
904922

@@ -1115,9 +1133,13 @@ func (t *http2Client) operateHeaders(frame *grpcframe.MetaHeadersFrame) {
11151133
close(s.headerChan)
11161134
}
11171135

1136+
// Header
11181137
if !endStream {
1138+
t.reportEvent(s, stats.StreamRecvHeader)
11191139
return
11201140
}
1141+
// Trailer
1142+
t.reportEvent(s, stats.StreamRecvTrailer)
11211143

11221144
// if client received END_STREAM from server while stream was still active, send RST_STREAM
11231145
rst := s.getState() == streamActive

pkg/remote/trans/nphttp2/grpc/http2_server.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ import (
3434
"sync/atomic"
3535
"time"
3636

37+
"github.com/cloudwego/kitex/internal/stream"
3738
"github.com/cloudwego/kitex/pkg/remote/codec/protobuf/encoding"
3839
"github.com/cloudwego/kitex/pkg/remote/transmeta"
40+
"github.com/cloudwego/kitex/pkg/stats"
3941

4042
"golang.org/x/net/http2"
4143
"golang.org/x/net/http2/hpack"
@@ -137,6 +139,8 @@ type http2Server struct {
137139
idle time.Time
138140

139141
bufferPool *bufferPool
142+
143+
eventHandler stream.StreamEventHandler
140144
}
141145

142146
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -240,6 +244,7 @@ func newHTTP2Server(ctx context.Context, conn net.Conn, config *ServerConfig) (_
240244
idle: time.Now(),
241245
initialWindowSize: int32(iwz),
242246
bufferPool: newBufferPool(),
247+
eventHandler: config.StreamEventHandler,
243248
}
244249
t.controlBuf = newControlBuffer(t.done)
245250
if dynamicWindow {
@@ -407,6 +412,7 @@ func (t *http2Server) operateHeaders(frame *grpcframe.MetaHeadersFrame, handle f
407412
wq: s.wq,
408413
})
409414
handle(s)
415+
t.reportEvent(s, stats.StreamRecvHeader)
410416
return nil
411417
}
412418

@@ -436,6 +442,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
436442
rstCode: se.Code,
437443
onWrite: func() {},
438444
})
445+
t.reportEvent(s, stats.StreamSendRst)
439446
}
440447
continue
441448
}
@@ -594,6 +601,7 @@ func (t *http2Server) handleData(f *grpcframe.DataFrame) {
594601
// Received the end of stream from the client.
595602
s.compareAndSwapState(streamActive, streamReadDone)
596603
s.write(recvMsg{err: io.EOF})
604+
t.reportEvent(s, stats.StreamRecvTrailer)
597605
}
598606
}
599607

@@ -605,6 +613,7 @@ func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
605613
} else {
606614
t.closeStream(s, status.Errorf(codes.Canceled, "transport: RSTStream Frame received with error code: %v [triggered by %s]", f.ErrCode, s.sourceService), false, 0, false)
607615
}
616+
t.reportEvent(s, stats.StreamRecvRst)
608617
return
609618
}
610619
// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
@@ -792,6 +801,7 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
792801
t.closeStream(s, errStatusHeaderListSizeLimitViolation, true, http2.ErrCodeInternal, false)
793802
return errStatusHeaderListSizeLimitViolation
794803
}
804+
t.reportEvent(s, stats.StreamSendHeader)
795805
return nil
796806
}
797807

@@ -1048,6 +1058,7 @@ func (t *http2Server) rstActiveStreams(streams map[uint32]*Stream, cancelErr err
10481058
finishCh <- struct{}{}
10491059
},
10501060
})
1061+
t.reportEvent(s, stats.StreamSendRst)
10511062
}
10521063
return activeStreams
10531064
}
@@ -1104,6 +1115,10 @@ func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, h
11041115
},
11051116
}
11061117
t.controlBuf.put(hdr)
1118+
if rst {
1119+
t.reportEvent(s, stats.StreamSendRst)
1120+
}
1121+
t.reportEvent(s, stats.StreamSendTrailer)
11071122
}
11081123

11091124
// closeStream clears the footprint of a stream when the stream is not needed any more.
@@ -1121,6 +1136,9 @@ func (t *http2Server) closeStream(s *Stream, err error, rst bool, rstCode http2.
11211136
rstCode: rstCode,
11221137
onWrite: func() {},
11231138
})
1139+
if rst {
1140+
t.reportEvent(s, stats.StreamSendRst)
1141+
}
11241142
}
11251143

11261144
func (t *http2Server) RemoteAddr() net.Addr {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2025 CloudWeGo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package grpc
18+
19+
import "github.com/cloudwego/kitex/pkg/stats"
20+
21+
func (t *http2Client) reportEvent(st *Stream, event stats.Event) {
22+
if t.eventHandler == nil {
23+
return
24+
}
25+
t.eventHandler(st.ctx, event, nil)
26+
}
27+
28+
func (t *http2Server) reportEvent(st *Stream, event stats.Event) {
29+
if t.eventHandler == nil {
30+
return
31+
}
32+
t.eventHandler(st.ctx, event, nil)
33+
}

0 commit comments

Comments
 (0)