@@ -57,6 +57,13 @@ type filterResponse interface {
57
57
* ingestv1.MergeSpanProfileResponse
58
58
}
59
59
60
+ func rewriteEOFError (err error ) error {
61
+ if errors .Is (err , io .EOF ) {
62
+ return connect .NewError (connect .CodeCanceled , errors .New ("client closed stream" ))
63
+ }
64
+ return err
65
+ }
66
+
60
67
// filterProfiles merges and dedupe profiles from different iterators and allow filtering via a bidi stream.
61
68
func filterProfiles [B BidiServerMerge [Res , Req ], Res filterResponse , Req filterRequest ](
62
69
ctx context.Context , profiles []iter.Iterator [Profile ], batchProfileSize int , stream B ,
@@ -130,10 +137,7 @@ func filterProfiles[B BidiServerMerge[Res, Req], Res filterResponse, Req filterR
130
137
// read a batch of profiles and sends it.
131
138
132
139
if err != nil {
133
- if errors .Is (err , io .EOF ) {
134
- return connect .NewError (connect .CodeCanceled , errors .New ("client closed stream" ))
135
- }
136
- return err
140
+ return rewriteEOFError (err )
137
141
}
138
142
sp .LogFields (otlog .String ("msg" , "batch sent to client" ))
139
143
@@ -144,30 +148,28 @@ func filterProfiles[B BidiServerMerge[Res, Req], Res filterResponse, Req filterR
144
148
switch s := BidiServerMerge [Res , Req ](stream ).(type ) {
145
149
case BidiServerMerge [* ingestv1.MergeProfilesStacktracesResponse , * ingestv1.MergeProfilesStacktracesRequest ]:
146
150
selectionResponse , err := s .Receive ()
147
- if err = = nil {
148
- selected = selectionResponse . Profiles
151
+ if err ! = nil {
152
+ return rewriteEOFError ( err )
149
153
}
154
+ selected = selectionResponse .Profiles
150
155
case BidiServerMerge [* ingestv1.MergeProfilesLabelsResponse , * ingestv1.MergeProfilesLabelsRequest ]:
151
156
selectionResponse , err := s .Receive ()
152
- if err = = nil {
153
- selected = selectionResponse . Profiles
157
+ if err ! = nil {
158
+ return rewriteEOFError ( err )
154
159
}
160
+ selected = selectionResponse .Profiles
155
161
case BidiServerMerge [* ingestv1.MergeProfilesPprofResponse , * ingestv1.MergeProfilesPprofRequest ]:
156
162
selectionResponse , err := s .Receive ()
157
- if err = = nil {
158
- selected = selectionResponse . Profiles
163
+ if err ! = nil {
164
+ return rewriteEOFError ( err )
159
165
}
166
+ selected = selectionResponse .Profiles
160
167
case BidiServerMerge [* ingestv1.MergeSpanProfileResponse , * ingestv1.MergeSpanProfileRequest ]:
161
168
selectionResponse , err := s .Receive ()
162
- if err == nil {
163
- selected = selectionResponse .Profiles
164
- }
165
- }
166
- if err != nil {
167
- if errors .Is (err , io .EOF ) {
168
- return connect .NewError (connect .CodeCanceled , errors .New ("client closed stream" ))
169
+ if err != nil {
170
+ return rewriteEOFError (err )
169
171
}
170
- return err
172
+ selected = selectionResponse . Profiles
171
173
}
172
174
sp .LogFields (otlog .String ("msg" , "selection received" ))
173
175
for i , k := range selected {
0 commit comments