@@ -121,7 +121,7 @@ func (p *pubsub) publishMessageHTTP(ctx context.Context, msg *subscribedMessage)
121
121
elapsed := diag .ElapsedSince (start )
122
122
123
123
if err != nil {
124
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), msg .topic , elapsed )
124
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), "" , msg .topic , elapsed )
125
125
return fmt .Errorf ("error returned from app channel while sending pub/sub event to app: %w" , rterrors .NewRetriable (err ))
126
126
}
127
127
defer resp .Close ()
@@ -145,7 +145,7 @@ func (p *pubsub) publishMessageHTTP(ctx context.Context, msg *subscribedMessage)
145
145
} else {
146
146
log .Debugf ("skipping status check due to error parsing result from pub/sub event %v: %s" , cloudEvent [contribpubsub .IDField ], err )
147
147
}
148
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Success )), msg .topic , elapsed )
148
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Success )), "" , msg .topic , elapsed )
149
149
return nil
150
150
}
151
151
@@ -154,19 +154,19 @@ func (p *pubsub) publishMessageHTTP(ctx context.Context, msg *subscribedMessage)
154
154
// Consider empty status field as success
155
155
fallthrough
156
156
case contribpubsub .Success :
157
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Success )), msg .topic , elapsed )
157
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Success )), "" , msg .topic , elapsed )
158
158
return nil
159
159
case contribpubsub .Retry :
160
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), msg .topic , elapsed )
160
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), "" , msg .topic , elapsed )
161
161
// TODO: add retry error info
162
162
return fmt .Errorf ("RETRY status returned from app while processing pub/sub event %v: %w" , cloudEvent [contribpubsub .IDField ], rterrors .NewRetriable (nil ))
163
163
case contribpubsub .Drop :
164
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Drop )), msg .topic , elapsed )
164
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Drop )), strings . ToLower ( string ( contribpubsub . Success )), msg .topic , elapsed )
165
165
log .Warnf ("DROP status returned from app while processing pub/sub event %v" , cloudEvent [contribpubsub .IDField ])
166
166
return rtpubsub .ErrMessageDropped
167
167
}
168
168
// Consider unknown status field as error and retry
169
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), msg .topic , elapsed )
169
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), "" , msg .topic , elapsed )
170
170
return fmt .Errorf ("unknown status returned from app while processing pub/sub event %v, status: %v, err: %w" , cloudEvent [contribpubsub .IDField ], appResponse .Status , rterrors .NewRetriable (nil ))
171
171
}
172
172
@@ -176,14 +176,14 @@ func (p *pubsub) publishMessageHTTP(ctx context.Context, msg *subscribedMessage)
176
176
// When adding/removing an error here, check if that is also applicable to GRPC since there is a mapping between HTTP and GRPC errors:
177
177
// https://cloud.google.com/apis/design/errors#handling_errors
178
178
log .Errorf ("non-retriable error returned from app while processing pub/sub event %v: %s. status code returned: %v" , cloudEvent [contribpubsub .IDField ], body , statusCode )
179
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Drop )), msg .topic , elapsed )
179
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Drop )), "" , msg .topic , elapsed )
180
180
return nil
181
181
}
182
182
183
183
// Every error from now on is a retriable error.
184
184
errMsg := fmt .Sprintf ("retriable error returned from app while processing pub/sub event %v, topic: %v, body: %s. status code returned: %v" , cloudEvent [contribpubsub .IDField ], cloudEvent [contribpubsub .TopicField ], body , statusCode )
185
185
log .Warnf (errMsg )
186
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), msg .topic , elapsed )
186
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), "" , msg .topic , elapsed )
187
187
return rterrors .NewRetriable (errors .New (errMsg ))
188
188
}
189
189
@@ -206,14 +206,14 @@ func (p *pubsub) publishMessageGRPC(ctx context.Context, msg *subscribedMessage)
206
206
decoded , decodeErr := base64 .StdEncoding .DecodeString (dataAsString )
207
207
if decodeErr != nil {
208
208
log .Debugf ("unable to base64 decode cloudEvent field data_base64: %s" , decodeErr )
209
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), msg .topic , 0 )
209
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), "" , msg .topic , 0 )
210
210
211
211
return fmt .Errorf ("error returned from app while processing pub/sub event: %w" , rterrors .NewRetriable (decodeErr ))
212
212
}
213
213
214
214
envelope .Data = decoded
215
215
} else {
216
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), msg .topic , 0 )
216
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), "" , msg .topic , 0 )
217
217
return fmt .Errorf ("error returned from app while processing pub/sub event: %w" , rterrors .NewRetriable (errUnexpectedEnvelopeData ))
218
218
}
219
219
} else if data , ok := cloudEvent [contribpubsub .DataField ]; ok && data != nil {
@@ -226,7 +226,7 @@ func (p *pubsub) publishMessageGRPC(ctx context.Context, msg *subscribedMessage)
226
226
case []byte :
227
227
envelope .Data = v
228
228
default :
229
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), msg .topic , 0 )
229
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), "" , msg .topic , 0 )
230
230
return fmt .Errorf ("error returned from app while processing pub/sub event: %w" , rterrors .NewRetriable (errUnexpectedEnvelopeData ))
231
231
}
232
232
} else if contenttype .IsJSONContentType (envelope .GetDataContentType ()) || contenttype .IsCloudEventContentType (envelope .GetDataContentType ()) {
@@ -257,7 +257,7 @@ func (p *pubsub) publishMessageGRPC(ctx context.Context, msg *subscribedMessage)
257
257
258
258
extensions , extensionsErr := extractCloudEventExtensions (cloudEvent )
259
259
if extensionsErr != nil {
260
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), msg .topic , 0 )
260
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), "" , msg .topic , 0 )
261
261
return extensionsErr
262
262
}
263
263
envelope .Extensions = extensions
@@ -286,14 +286,14 @@ func (p *pubsub) publishMessageGRPC(ctx context.Context, msg *subscribedMessage)
286
286
if hasErrStatus && (errStatus .Code () == codes .Unimplemented ) {
287
287
// DROP
288
288
log .Warnf ("non-retriable error returned from app while processing pub/sub event %v: %s" , cloudEvent [contribpubsub .IDField ], err )
289
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Drop )), msg .topic , elapsed )
289
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Drop )), "" , msg .topic , elapsed )
290
290
291
291
return nil
292
292
}
293
293
294
294
err = fmt .Errorf ("error returned from app while processing pub/sub event %v: %w" , cloudEvent [contribpubsub .IDField ], rterrors .NewRetriable (err ))
295
295
log .Debug (err )
296
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), msg .topic , elapsed )
296
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), "" , msg .topic , elapsed )
297
297
298
298
// on error from application, return error for redelivery of event
299
299
return err
@@ -303,21 +303,21 @@ func (p *pubsub) publishMessageGRPC(ctx context.Context, msg *subscribedMessage)
303
303
case runtimev1 .TopicEventResponse_SUCCESS : //nolint:nosnakecase
304
304
// on uninitialized status, this is the case it defaults to as an uninitialized status defaults to 0 which is
305
305
// success from protobuf definition
306
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Success )), msg .topic , elapsed )
306
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Success )), "" , msg .topic , elapsed )
307
307
return nil
308
308
case runtimev1 .TopicEventResponse_RETRY : //nolint:nosnakecase
309
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), msg .topic , elapsed )
309
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), "" , msg .topic , elapsed )
310
310
// TODO: add retry error info
311
311
return fmt .Errorf ("RETRY status returned from app while processing pub/sub event %v: %w" , cloudEvent [contribpubsub .IDField ], rterrors .NewRetriable (nil ))
312
312
case runtimev1 .TopicEventResponse_DROP : //nolint:nosnakecase
313
313
log .Warnf ("DROP status returned from app while processing pub/sub event %v" , cloudEvent [contribpubsub .IDField ])
314
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Drop )), msg .topic , elapsed )
314
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Drop )), strings . ToLower ( string ( contribpubsub . Success )), msg .topic , elapsed )
315
315
316
316
return rtpubsub .ErrMessageDropped
317
317
}
318
318
319
319
// Consider unknown status field as error and retry
320
- diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), msg .topic , elapsed )
320
+ diag .DefaultComponentMonitoring .PubsubIngressEvent (ctx , msg .pubsub , strings .ToLower (string (contribpubsub .Retry )), "" , msg .topic , elapsed )
321
321
return fmt .Errorf ("unknown status returned from app while processing pub/sub event %v, status: %v, err: %w" , cloudEvent [contribpubsub .IDField ], res .GetStatus (), rterrors .NewRetriable (nil ))
322
322
}
323
323
0 commit comments