@@ -2,19 +2,37 @@ package common
22
33import (
44 "context"
5+ "fmt"
56 "log/slog"
67
78 "github.com/ThreeDotsLabs/watermill/message"
9+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
10+ "github.com/google/wire"
811 "go.opentelemetry.io/otel/metric"
12+ "go.opentelemetry.io/otel/trace"
913
1014 "github.com/openmeterio/openmeter/app/config"
15+ "github.com/openmeterio/openmeter/openmeter/dedupe"
16+ "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver"
17+ "github.com/openmeterio/openmeter/openmeter/meter"
18+ "github.com/openmeterio/openmeter/openmeter/sink"
1119 "github.com/openmeterio/openmeter/openmeter/sink/flushhandler"
1220 "github.com/openmeterio/openmeter/openmeter/sink/flushhandler/ingestnotification"
21+ "github.com/openmeterio/openmeter/openmeter/streaming"
1322 watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
1423 "github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
1524 pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
1625)
1726
27+ var Sink = wire .NewSet (
28+ NewFlushHandler ,
29+ NewSinkWorkerPublisher ,
30+ NewSinkKafkaConsumer ,
31+ NewSinkDeduplicator ,
32+ NewSinkStorage ,
33+ NewSink ,
34+ )
35+
1836// the sink-worker requires control over how the publisher is closed
1937func NewSinkWorkerPublisher (
2038 ctx context.Context ,
@@ -27,13 +45,12 @@ func NewSinkWorkerPublisher(
2745}
2846
2947func NewFlushHandler (
30- eventsConfig config.EventsConfiguration ,
3148 sinkConfig config.SinkConfiguration ,
3249 messagePublisher message.Publisher ,
3350 eventPublisher eventbus.Publisher ,
3451 logger * slog.Logger ,
3552 meter metric.Meter ,
36- ) (flushhandler.FlushEventHandler , error ) {
53+ ) (flushhandler.FlushEventHandler , func (), error ) {
3754 flushHandlerMux := flushhandler .NewFlushEventHandlers ()
3855
3956 // We should only close the producer once the ingest events are fully processed
@@ -48,12 +65,27 @@ func NewFlushHandler(
4865 MaxEventsInBatch : sinkConfig .IngestNotifications .MaxEventsInBatch ,
4966 })
5067 if err != nil {
51- return nil , err
68+ return nil , nil , err
5269 }
5370
5471 flushHandlerMux .AddHandler (ingestNotificationHandler )
5572
56- return flushHandlerMux , nil
73+ closer := func () {
74+ logger .Info ("shutting down flush success handlers" )
75+
76+ if err = flushHandlerMux .Close (); err != nil {
77+ logger .Error ("failed to close flush success handler" , slog .String ("err" , err .Error ()))
78+ }
79+
80+ drainCtx , cancel := context .WithTimeout (context .Background (), sinkConfig .DrainTimeout )
81+ defer cancel ()
82+
83+ if err = flushHandlerMux .WaitForDrain (drainCtx ); err != nil {
84+ logger .Error ("failed to drain flush success handlers" , slog .String ("err" , err .Error ()))
85+ }
86+ }
87+
88+ return flushHandlerMux , closer , nil
5789}
5890
5991func SinkWorkerProvisionTopics (conf config.EventsConfiguration ) []pkgkafka.TopicConfig {
@@ -64,3 +96,95 @@ func SinkWorkerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.Topic
6496 },
6597 }
6698}
99+
100+ func NewSinkStorage (
101+ streaming streaming.Connector ,
102+ ) (sink.Storage , error ) {
103+ return sink .NewClickhouseStorage (sink.ClickHouseStorageConfig {
104+ Streaming : streaming ,
105+ })
106+ }
107+
108+ func NewSinkDeduplicator (conf config.SinkConfiguration , logger * slog.Logger ) (dedupe.Deduplicator , func (), error ) {
109+ closer := func () {}
110+
111+ if conf .Dedupe .Enabled {
112+ deduplicator , err := conf .Dedupe .NewDeduplicator ()
113+ if err != nil {
114+ return nil , nil , fmt .Errorf ("failed to initialize deduplicator: %w" , err )
115+ }
116+
117+ closer = func () {
118+ err = deduplicator .Close ()
119+ if err != nil {
120+ logger .Error ("failed to close sink deduplicator" , slog .String ("err" , err .Error ()))
121+ }
122+ }
123+
124+ return deduplicator , closer , nil
125+ }
126+
127+ return nil , closer , nil
128+ }
129+
130+ type SinkKafkaConsumer = kafka.Consumer
131+
132+ func NewSinkKafkaConsumer (conf config.SinkConfiguration , logger * slog.Logger ) (* kafka.Consumer , func (), error ) {
133+ consumerConfig := conf .Kafka .AsConsumerConfig ()
134+
135+ // Override the following Kafka consumer configuration parameters with hardcoded values
136+ // as the Sink implementation relies on these to be set to a specific value.
137+ consumerConfig .EnableAutoCommit = true
138+ consumerConfig .EnableAutoOffsetStore = false
139+ // Used when offset retention resets the offset. In this case we want to consume from the latest offset
140+ // as everything before should be already processed.
141+ consumerConfig .AutoOffsetReset = "latest"
142+
143+ return NewKafkaConsumer (consumerConfig , logger )
144+ }
145+
146+ func NewSink (
147+ conf config.SinkConfiguration ,
148+ logger * slog.Logger ,
149+ metricMeter metric.Meter ,
150+ tracer trace.Tracer ,
151+ kafkaConsumer * SinkKafkaConsumer ,
152+ sinkStorage sink.Storage ,
153+ deduplicator dedupe.Deduplicator ,
154+ meterService meter.Service ,
155+ topicResolver * topicresolver.NamespacedTopicResolver ,
156+ flushHandler flushhandler.FlushEventHandler ,
157+ ) (* sink.Sink , func (), error ) {
158+ s , err := sink .NewSink (sink.SinkConfig {
159+ Logger : logger ,
160+ Tracer : tracer ,
161+ MetricMeter : metricMeter ,
162+ Storage : sinkStorage ,
163+ Deduplicator : deduplicator ,
164+ Consumer : kafkaConsumer ,
165+ MinCommitCount : conf .MinCommitCount ,
166+ MaxCommitWait : conf .MaxCommitWait ,
167+ MaxPollTimeout : conf .MaxPollTimeout ,
168+ NamespaceRefetch : conf .NamespaceRefetch ,
169+ NamespaceRefetchTimeout : conf .NamespaceRefetchTimeout ,
170+ NamespaceTopicRegexp : conf .NamespaceTopicRegexp ,
171+ FlushEventHandler : flushHandler ,
172+ FlushSuccessTimeout : conf .FlushSuccessTimeout ,
173+ DrainTimeout : conf .DrainTimeout ,
174+ TopicResolver : topicResolver ,
175+ MeterRefetchInterval : conf .MeterRefetchInterval ,
176+ MeterService : meterService ,
177+ LogDroppedEvents : conf .LogDroppedEvents ,
178+ })
179+ if err != nil {
180+ return nil , nil , fmt .Errorf ("failed to initialize sink: %w" , err )
181+ }
182+
183+ closer := func () {
184+ if err = s .Close (); err != nil {
185+ logger .Error ("failed to close sink worker" , slog .String ("err" , err .Error ()))
186+ }
187+ }
188+
189+ return s , closer , nil
190+ }
0 commit comments