@@ -22,7 +22,8 @@ type Listener struct {
2222 minReconnectInterval time.Duration
2323 maxReconnectInterval time.Duration
2424
25- logger goengine.Logger
25+ logger goengine.Logger
26+ metrics goengine.Metrics
2627}
2728
2829// NewListener returns a new notification listener
@@ -32,6 +33,7 @@ func NewListener(
3233 minReconnectInterval time.Duration ,
3334 maxReconnectInterval time.Duration ,
3435 logger goengine.Logger ,
36+ metrics goengine.Metrics ,
3537) (* Listener , error ) {
3638 switch {
3739 case strings .TrimSpace (dbDSN ) == "" :
@@ -48,12 +50,17 @@ func NewListener(
4850 logger = goengine .NopLogger
4951 }
5052
53+ if metrics == nil {
54+ metrics = goengine .NopMetrics
55+ }
56+
5157 return & Listener {
5258 dbDSN : dbDSN ,
5359 dbChannel : dbChannel ,
5460 minReconnectInterval : minReconnectInterval ,
5561 maxReconnectInterval : maxReconnectInterval ,
5662 logger : logger ,
63+ metrics : metrics ,
5764 }, nil
5865}
5966
@@ -83,6 +90,7 @@ func (s *Listener) Listen(ctx context.Context, exec sql.ProjectionTrigger) error
8390
8491 // Execute an initial run of the projection.
8592 // This is done after db listen is started to avoid losing a set of messages while the Listener creates a db connection.
93+ s .metrics .ReceivedNotification (false )
8694 if err := exec (ctx , nil ); err != nil {
8795 return err
8896 }
@@ -92,6 +100,7 @@ func (s *Listener) Listen(ctx context.Context, exec sql.ProjectionTrigger) error
92100 case n := <- listener .Notify :
93101 // Unmarshal the notification
94102 notification := s .unmarshalNotification (n )
103+ s .metrics .ReceivedNotification (notification != nil )
95104
96105 // Execute the notification to be projected
97106 if err := exec (ctx , notification ); err != nil {
0 commit comments