@@ -25,7 +25,9 @@ import (
2525)
2626
2727var Sink = wire .NewSet (
28- NewFlushHandler ,
28+ NewIngestNotificationHandler ,
29+ NewFlushHandlers ,
30+ NewFlushHandlerManager ,
2931 NewSinkWorkerPublisher ,
3032 NewSinkKafkaConsumer ,
3133 NewSinkDeduplicator ,
@@ -44,12 +46,35 @@ func NewSinkWorkerPublisher(
4446 return publisher , func () {}, err
4547}
4648
47- func NewFlushHandler (
49+ type IngestNotificationHandler flushhandler.FlushEventHandler
50+
51+ func NewIngestNotificationHandler (
4852 sinkConfig config.SinkConfiguration ,
49- messagePublisher message.Publisher ,
5053 eventPublisher eventbus.Publisher ,
51- logger * slog.Logger ,
5254 meter metric.Meter ,
55+ logger * slog.Logger ,
56+ ) (IngestNotificationHandler , error ) {
57+ handler , err := ingestnotification .NewHandler (logger , meter , eventPublisher , ingestnotification.HandlerConfig {
58+ MaxEventsInBatch : sinkConfig .IngestNotifications .MaxEventsInBatch ,
59+ })
60+ if err != nil {
61+ return nil , fmt .Errorf ("failed to initialize ingest notification handler: %w" , err )
62+ }
63+
64+ return handler , nil
65+ }
66+
67+ func NewFlushHandlers (
68+ ingest IngestNotificationHandler ,
69+ ) []flushhandler.FlushEventHandler {
70+ return []flushhandler.FlushEventHandler {ingest }
71+ }
72+
73+ func NewFlushHandlerManager (
74+ sinkConfig config.SinkConfiguration ,
75+ messagePublisher message.Publisher ,
76+ logger * slog.Logger ,
77+ handlers []flushhandler.FlushEventHandler ,
5378) (flushhandler.FlushEventHandler , func (), error ) {
5479 flushHandlerMux := flushhandler .NewFlushEventHandlers ()
5580
@@ -61,26 +86,23 @@ func NewFlushHandler(
6186 }
6287 })
6388
64- ingestNotificationHandler , err := ingestnotification .NewHandler (logger , meter , eventPublisher , ingestnotification.HandlerConfig {
65- MaxEventsInBatch : sinkConfig .IngestNotifications .MaxEventsInBatch ,
66- })
67- if err != nil {
68- return nil , nil , err
89+ for _ , handler := range handlers {
90+ if handler != nil {
91+ flushHandlerMux .AddHandler (handler )
92+ }
6993 }
7094
71- flushHandlerMux .AddHandler (ingestNotificationHandler )
72-
7395 closer := func () {
7496 logger .Info ("shutting down flush success handlers" )
7597
76- if err = flushHandlerMux .Close (); err != nil {
98+ if err : = flushHandlerMux .Close (); err != nil {
7799 logger .Error ("failed to close flush success handler" , slog .String ("err" , err .Error ()))
78100 }
79101
80102 drainCtx , cancel := context .WithTimeout (context .Background (), sinkConfig .DrainTimeout )
81103 defer cancel ()
82104
83- if err = flushHandlerMux .WaitForDrain (drainCtx ); err != nil {
105+ if err : = flushHandlerMux .WaitForDrain (drainCtx ); err != nil {
84106 logger .Error ("failed to drain flush success handlers" , slog .String ("err" , err .Error ()))
85107 }
86108 }
@@ -138,7 +160,7 @@ func NewSinkKafkaConsumer(conf config.SinkConfiguration, logger *slog.Logger) (*
138160 consumerConfig .EnableAutoOffsetStore = false
139161 // Used when offset retention resets the offset. In this case we want to consume from the latest offset
140162 // as everything before should be already processed.
141- consumerConfig .AutoOffsetReset = "latest"
163+ consumerConfig .AutoOffsetReset = pkgkafka . AutoOffsetResetLatest
142164
143165 return NewKafkaConsumer (consumerConfig , logger )
144166}
0 commit comments