@@ -11,6 +11,7 @@ import (
1111 "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1212 "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
1313 "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
14+ "github.com/ydb-platform/ydb-go-sdk/v3/trace"
1415)
1516
1617var ErrUnexpectedMessageType = errors .New ("unexpected message type" )
@@ -22,16 +23,36 @@ type GrpcStream interface {
2223}
2324
2425type StreamReader struct {
25- Stream GrpcStream
26+ Stream GrpcStream
27+ ReaderID int64
28+
29+ Tracer * trace.Topic
30+ sessionID string
31+ sentMessageCount int
32+ receiveMessageCount int
2633}
2734
2835func (s StreamReader ) CloseSend () error {
2936 return s .Stream .CloseSend ()
3037}
3138
3239//nolint:funlen
33- func (s StreamReader ) Recv () (ServerMessage , error ) {
40+ func (s StreamReader ) Recv () (_ ServerMessage , resErr error ) {
3441 grpcMess , err := s .Stream .Recv ()
42+
43+ defer func () {
44+ s .receiveMessageCount ++
45+
46+ trace .TopicOnReaderReceiveGRPCMessage (
47+ s .Tracer ,
48+ s .ReaderID ,
49+ s .sessionID ,
50+ s .receiveMessageCount ,
51+ grpcMess ,
52+ resErr ,
53+ )
54+ }()
55+
3556 if xerrors .Is (err , io .EOF ) {
3657 return nil , err
3758 }
@@ -53,6 +74,8 @@ func (s StreamReader) Recv() (ServerMessage, error) {
5374
5475 switch m := grpcMess .GetServerMessage ().(type ) {
5576 case * Ydb_Topic.StreamReadMessage_FromServer_InitResponse :
77+ s .sessionID = m .InitResponse .GetSessionId ()
78+
5679 resp := & InitResponse {}
5780 resp .ServerMessageMetadata = meta
5881 resp .fromProto (m .InitResponse )
@@ -82,6 +105,16 @@ func (s StreamReader) Recv() (ServerMessage, error) {
82105 }
83106
84107 return req , nil
108+
109+ case * Ydb_Topic.StreamReadMessage_FromServer_EndPartitionSession :
110+ req := & EndPartitionSession {}
111+ req .ServerMessageMetadata = meta
112+ if err = req .fromProto (m .EndPartitionSession ); err != nil {
113+ return nil , err
114+ }
115+
116+ return req , nil
117+
85118 case * Ydb_Topic.StreamReadMessage_FromServer_CommitOffsetResponse :
86119 resp := & CommitOffsetResponse {}
87120 resp .ServerMessageMetadata = meta
@@ -113,66 +146,75 @@ func (s StreamReader) Recv() (ServerMessage, error) {
113146 }
114147}
115148
116- func (s StreamReader ) Send (msg ClientMessage ) (err error ) {
149+ //nolint:funlen
150+ func (s StreamReader ) Send (msg ClientMessage ) (resErr error ) {
117151 defer func () {
118- err = xerrors .Transport (err )
152+ resErr = xerrors .Transport (resErr )
119153 }()
154+
155+ var grpcMess * Ydb_Topic.StreamReadMessage_FromClient
120156 switch m := msg .(type ) {
121157 case * InitRequest :
122- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
158+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
123159 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_InitRequest {InitRequest : m .toProto ()},
124160 }
125161
126- return s .Stream .Send (grpcMess )
127162 case * ReadRequest :
128- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
163+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
129164 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_ReadRequest {ReadRequest : m .toProto ()},
130165 }
131-
132- return s .Stream .Send (grpcMess )
133166 case * StartPartitionSessionResponse :
134- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
167+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
135168 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_StartPartitionSessionResponse {
136169 StartPartitionSessionResponse : m .toProto (),
137170 },
138171 }
139-
140- return s .Stream .Send (grpcMess )
141172 case * StopPartitionSessionResponse :
142- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
173+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
143174 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_StopPartitionSessionResponse {
144175 StopPartitionSessionResponse : m .toProto (),
145176 },
146177 }
147178
148- return s .Stream .Send (grpcMess )
149179 case * CommitOffsetRequest :
150- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
180+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
151181 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_CommitOffsetRequest {
152182 CommitOffsetRequest : m .toProto (),
153183 },
154184 }
155185
156- return s .Stream .Send (grpcMess )
157186 case * PartitionSessionStatusRequest :
158- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
187+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
159188 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_PartitionSessionStatusRequest {
160189 PartitionSessionStatusRequest : m .toProto (),
161190 },
162191 }
163192
164- return s .Stream .Send (grpcMess )
165193 case * UpdateTokenRequest :
166- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
194+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
167195 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_UpdateTokenRequest {
168196 UpdateTokenRequest : m .ToProto (),
169197 },
170198 }
199+ }
171200
172- return s .Stream .Send (grpcMess )
173- default :
174- return xerrors .WithStackTrace (fmt .Errorf ("ydb: send unexpected message type: %v" , reflect .TypeOf (msg )))
201+ if grpcMess == nil {
202+ resErr = xerrors .WithStackTrace (fmt .Errorf ("ydb: send unexpected message type: %v" , reflect .TypeOf (msg )))
203+ } else {
204+ resErr = s .Stream .Send (grpcMess )
175205 }
206+
207+ s .sentMessageCount ++
208+ trace .TopicOnReaderSentGRPCMessage (
209+ s .Tracer ,
210+ s .ReaderID ,
211+ s .sessionID ,
212+ s .sentMessageCount ,
213+ grpcMess ,
214+ resErr ,
215+ )
216+
217+ return resErr
176218}
177219
178220type ClientMessage interface {
0 commit comments