diff --git a/.env.sample b/.env.sample index f36ab4c8..0054cab0 100644 --- a/.env.sample +++ b/.env.sample @@ -26,6 +26,8 @@ PUBLISHER_KAFKA_CLIENT_RETRY_BACKOFF_MS="100" PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS=5000 PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES=100000 PUBLISHER_KAFKA_FLUSH_INTERVAL_MS=1000 +PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS=60000 +PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME="clickstream-total-event-log" METRIC_STATSD_ADDRESS=":8125" METRIC_STATSD_FLUSH_PERIOD_MS=100 diff --git a/.env.test b/.env.test index 1136269e..a5b3cf9b 100644 --- a/.env.test +++ b/.env.test @@ -27,6 +27,8 @@ PUBLISHER_KAFKA_CLIENT_RETRY_BACKOFF_MS="100" PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS=5000 PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES=100000 PUBLISHER_KAFKA_FLUSH_INTERVAL_MS=1000 +PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS=60000 +PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME="clickstream-total-event-log" METRIC_STATSD_ADDRESS=":8125" METRIC_STATSD_FLUSH_PERIOD_MS=100 diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 9f548e27..3953c2c3 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -12,7 +12,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v2.1.3 with: - go-version: '1.17' + go-version: '1.23' - name: Checkout repo uses: actions/checkout@v2 - name: Setup Project @@ -28,7 +28,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v2.1.3 with: - go-version: '1.17' + go-version: '1.23' - uses: actions/checkout@v2 - name: Build run: make all diff --git a/Dockerfile b/Dockerfile index 03a55aab..3263a6f8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,12 +1,12 @@ -FROM golang:1.17 +FROM golang:1.23 WORKDIR /app RUN apt-get update && apt-get install unzip --no-install-recommends --assume-yes COPY . . 
RUN make update-deps && make compile - -FROM debian:bullseye +#bookworm-slim compatible with golang:1.23 and has glibc 2.34 +FROM debian:bookworm-slim WORKDIR /app COPY --from=0 /app/raccoon ./raccoon COPY . . -CMD ["./raccoon"] +CMD ["./raccoon"] \ No newline at end of file diff --git a/app/server.go b/app/server.go index 525761b6..bc658217 100644 --- a/app/server.go +++ b/app/server.go @@ -37,6 +37,7 @@ func StartServer(ctx context.Context, cancel context.CancelFunc, shutdown chan b workerPool.StartWorkers() go kPublisher.ReportStats() go reportProcMetrics() + go kPublisher.ReportDeliveryEventCount() // create signal channel at startup signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) diff --git a/app/server_test.go b/app/server_test.go index f9f5ad63..dee705c1 100644 --- a/app/server_test.go +++ b/app/server_test.go @@ -19,7 +19,7 @@ func TestShutDownServer(t *testing.T) { defer cancel() mockKafka := &mockKafkaClient{} - kp := publisher.NewKafkaFromClient(mockKafka, 50, "test") + kp := publisher.NewKafkaFromClient(mockKafka, 50, "test", 1*time.Millisecond, "clickstream-test-log") shutdownCh := make(chan bool, 1) bufferCh := make(chan collection.CollectRequest, 1)
publisherKafkaConfigLoader() + expectedDeliveryReportInterval := 60 * time.Millisecond kafkaConfig := PublisherKafka.ToKafkaConfigMap() bootstrapServer, _ := kafkaConfig.Get("bootstrap.servers", "") topic, _ := kafkaConfig.Get("topic", "") @@ -76,6 +79,8 @@ func TestKafkaConfig_ToKafkaConfigMap(t *testing.T) { assert.Equal(t, "", topic) assert.NotEqual(t, something, "anything") assert.Equal(t, 4, len(*kafkaConfig)) + assert.Equal(t, expectedDeliveryReportInterval, PublisherKafka.DeliveryReportInterval) + assert.Equal(t, "clickstream-test-log", PublisherKafka.DeliveryReportTopic) } func TestWorkerConfig(t *testing.T) { diff --git a/config/publisher.go b/config/publisher.go index 24f262aa..9827c901 100644 --- a/config/publisher.go +++ b/config/publisher.go @@ -4,6 +4,7 @@ import ( "bytes" "os" "strings" + "time" "github.com/goto/raccoon/config/util" "github.com/spf13/viper" @@ -14,7 +15,9 @@ var PublisherKafka publisherKafka var dynamicKafkaClientConfigPrefix = "PUBLISHER_KAFKA_CLIENT_" type publisherKafka struct { - FlushInterval int + FlushInterval int + DeliveryReportInterval time.Duration + DeliveryReportTopic string } func (k publisherKafka) ToKafkaConfigMap() *confluent.ConfigMap { @@ -43,9 +46,13 @@ func dynamicKafkaClientConfigLoad() []byte { func publisherKafkaConfigLoader() { viper.SetDefault("PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES", "100000") viper.SetDefault("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS", "1000") + viper.SetDefault("PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS", "60000") + viper.SetDefault("PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME", "clickstream-total-event-log") viper.MergeConfig(bytes.NewBuffer(dynamicKafkaClientConfigLoad())) PublisherKafka = publisherKafka{ - FlushInterval: util.MustGetInt("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS"), + FlushInterval: util.MustGetInt("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS"), + DeliveryReportInterval: util.MustGetDuration("PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS", time.Millisecond), + DeliveryReportTopic: 
util.MustGetString("PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME"), } } diff --git a/go.mod b/go.mod index 2c087e9b..fd6315e3 100644 --- a/go.mod +++ b/go.mod @@ -1,17 +1,17 @@ module github.com/goto/raccoon -go 1.17 +go 1.23 require ( buf.build/gen/go/gotocompany/proton/grpc/go v1.3.0-20230313110213-9a3d240d5293.1 - buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.29.0-20230313110213-9a3d240d5293.1 + buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.36.9-20250923105611-d4292d610761.1 github.com/gorilla/mux v1.7.4 github.com/gorilla/websocket v1.4.2 github.com/sirupsen/logrus v1.6.0 github.com/spf13/viper v1.7.0 github.com/stretchr/testify v1.8.1 google.golang.org/grpc v1.53.0 - google.golang.org/protobuf v1.29.0 + google.golang.org/protobuf v1.36.9 gopkg.in/alexcesaro/statsd.v2 v2.0.0 gopkg.in/confluentinc/confluent-kafka-go.v1 v1.4.2 ) diff --git a/go.sum b/go.sum index 2e17b1ed..97a345a0 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,12 @@ buf.build/gen/go/envoyproxy/protoc-gen-validate/grpc/go v1.3.0-20220812150255-45685e052c7e.1/go.mod h1:MYI7lG0UVkUYOAsBEqZV4teiG+sD8w02v/lUHePAUgU= buf.build/gen/go/envoyproxy/protoc-gen-validate/protocolbuffers/go v1.28.1-20220812150255-45685e052c7e.4/go.mod h1:hY+2q+FbjbfelDKYRqA/RnrnBrAiTYatnPLx6JFklXI= -buf.build/gen/go/envoyproxy/protoc-gen-validate/protocolbuffers/go v1.29.0-20220812150255-45685e052c7e.1/go.mod h1:YOh3xePeWOdqK7OZtDi3IFnKp+oz2Uveoy2+M6jByRI= buf.build/gen/go/gotocompany/proton/grpc/go v1.3.0-20230313110213-9a3d240d5293.1 h1:O5qalwfO6Eydq5CAi4CCMS4SHlUYkwWy62S9jd2pivE= buf.build/gen/go/gotocompany/proton/grpc/go v1.3.0-20230313110213-9a3d240d5293.1/go.mod h1:oaufFdUEC9ZKlBZJmkA/0ycQ8Ntn4vavkViSGC0Ht6o= buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.28.1-20230313110213-9a3d240d5293.4/go.mod h1:SkLAgdMqJ3u9UGFY064fVrnkkfAIaR8JqyOkoa9mz3U= -buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.29.0-20230313110213-9a3d240d5293.1 h1:7qQ1zdLofKy0pc6zvRlwlj6y6Y4aGxYSm9k7jJU59V4= 
-buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.29.0-20230313110213-9a3d240d5293.1/go.mod h1:nY80cfNDSgV/ovE2vdgs0UeosGsIIceHZBQldm1BUKA= +buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.36.9-20250923105611-d4292d610761.1 h1:nvstcoWquAiZNTDmOEndcFYYlHoG0sDxQ3BA5f2ByzA= +buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.36.9-20250923105611-d4292d610761.1/go.mod h1:iu9vyGlLXR80cI2ej7ov5KwJIDki7SsiXDcmklQS1N0= buf.build/gen/go/grpc-ecosystem/grpc-gateway/grpc/go v1.3.0-20220906183531-bc28b723cd77.1/go.mod h1:9Ec7rvBnjfZvU/TnWjtcSGgiLQ4B+U3B+6SnZgVTA7A= buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.28.1-20220906183531-bc28b723cd77.4/go.mod h1:92ejKVTiuvnKoAtRlpJpIxKfloI935DDqhs0NCRx+KM= -buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.29.0-20220906183531-bc28b723cd77.1/go.mod h1:KdKLHnCB+HSTqTEczw09mmFoJtNrCF7urUbAakgSGhg= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -1256,8 +1254,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.29.0 h1:44S3JjaKmLEE4YIkjzexaP+NzZsudE3Zin5Njn/pYX0= -google.golang.org/protobuf v1.29.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw= +google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alexcesaro/statsd.v2 
v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc= gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU= diff --git a/publisher/kafka.go b/publisher/kafka.go index 40b5c947..386cb314 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -3,7 +3,11 @@ package publisher import ( "encoding/json" "fmt" + "github.com/goto/raccoon/serialization" + "google.golang.org/protobuf/types/known/timestamppb" "strings" + "sync/atomic" + "time" "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" // Importing librd to make it work on vendor mode @@ -20,6 +24,8 @@ const ( errLargeMessageSize = "Broker: Message size too large" //error msg while producing a message which is larger than message.max.bytes config ) +var DeliveryEventCount int64 + // KafkaProducer Produce data to kafka synchronously type KafkaProducer interface { // ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed. @@ -32,24 +38,31 @@ func NewKafka() (*Kafka, error) { return &Kafka{}, err } return &Kafka{ - kp: kp, - flushInterval: config.PublisherKafka.FlushInterval, - topicFormat: config.EventDistribution.PublisherPattern, + kp: kp, + flushInterval: config.PublisherKafka.FlushInterval, + topicFormat: config.EventDistribution.PublisherPattern, + deliveryReportInterval: config.PublisherKafka.DeliveryReportInterval, + deliveryReportTopic: config.PublisherKafka.DeliveryReportTopic, }, nil } -func NewKafkaFromClient(client Client, flushInterval int, topicFormat string) *Kafka { +func NewKafkaFromClient(client Client, flushInterval int, topicFormat string, deliveryReportInterval time.Duration, + topicName string) *Kafka { return &Kafka{ - kp: client, - flushInterval: flushInterval, - topicFormat: topicFormat, + kp: client, + flushInterval: flushInterval, + topicFormat: topicFormat, + deliveryReportInterval: deliveryReportInterval, + deliveryReportTopic: topicName, } } type Kafka struct { - kp Client - flushInterval 
int - topicFormat string + kp Client + flushInterval int + topicFormat string + deliveryReportInterval time.Duration + deliveryReportTopic string } // ProduceBulk messages to kafka. Block until all messages are sent. Return array of error. Order of Errors is guaranteed. @@ -89,6 +102,7 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann totalProcessed++ } // Wait for deliveryChannel as many as processed + localSuccesses := int64(0) for i := 0; i < totalProcessed; i++ { d := <-deliveryChannel m := d.(*kafka.Message) @@ -99,15 +113,33 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann metrics.Increment("kafka_error", fmt.Sprintf("type=%s,event_type=%s,conn_group=%s", "delivery_failed", eventType, connGroup)) order := m.Opaque.(int) errors[order] = m.TopicPartition.Error + } else { + localSuccesses++ } } + // Single atomic update per batch + if localSuccesses > 0 { + atomic.AddInt64(&DeliveryEventCount, localSuccesses) + } + if allNil(errors) { return nil } return BulkError{Errors: errors} } +func (pr *Kafka) produceTotalEventMessage(topicName string, event *pb.TotalEventCountMessage) error { + value, err := serialization.SerializeProto(event) + if err != nil { + return fmt.Errorf("failed to serialize proto: %w", err) + } + return pr.kp.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny}, + Value: value, + }, nil) +} + func (pr *Kafka) ReportStats() { for v := range pr.kp.Events() { switch e := v.(type) { @@ -141,6 +173,25 @@ func (pr *Kafka) ReportStats() { } } +func (pr *Kafka) ReportDeliveryEventCount() { + ticker := time.NewTicker(pr.deliveryReportInterval) + defer ticker.Stop() + + for range ticker.C { + // read the value + eventCount := atomic.LoadInt64(&DeliveryEventCount) + //build kafka message + msg := &pb.TotalEventCountMessage{ + EventTimestamp: timestamppb.Now(), + EventCount: int32(eventCount), + } + //produce to kafka + 
pr.produceTotalEventMessage(pr.deliveryReportTopic, msg) + //reset the counter + atomic.StoreInt64(&DeliveryEventCount, 0) + } +} + // Close wait for outstanding messages to be delivered within given flush interval timeout. func (pr *Kafka) Close() int { remaining := pr.kp.Flush(pr.flushInterval) diff --git a/publisher/kafka_test.go b/publisher/kafka_test.go index 6af8b33d..31073315 100644 --- a/publisher/kafka_test.go +++ b/publisher/kafka_test.go @@ -2,8 +2,11 @@ package publisher import ( "fmt" + "github.com/goto/raccoon/deserialization" "os" + "sync/atomic" "testing" + "time" pb "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/raccoon/v1beta1" "github.com/goto/raccoon/logger" @@ -26,12 +29,17 @@ func TestMain(t *testing.M) { os.Exit(t.Run()) } +var ( + deliveryReportInterval = 1 * time.Millisecond + deliveryReportTopic = "clickstream-test-log" +) + func TestProducer_Close(suite *testing.T) { suite.Run("Should flush before closing the client", func(t *testing.T) { client := &mockClient{} client.On("Flush", 10).Return(0) client.On("Close").Return() - kp := NewKafkaFromClient(client, 10, "%s") + kp := NewKafkaFromClient(client, 10, "%s", deliveryReportInterval, deliveryReportTopic) kp.Close() client.AssertExpectations(t) }) @@ -55,10 +63,11 @@ func TestKafka_ProduceBulk(suite *testing.T) { } }() }) - kp := NewKafkaFromClient(client, 10, "%s") - + kp := NewKafkaFromClient(client, 10, "%s", deliveryReportInterval, deliveryReportTopic) + atomic.StoreInt64(&DeliveryEventCount, 0) //reset the value to 0 before call the produce function err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2)) assert.NoError(t, err) + assert.Equal(t, int64(2), atomic.LoadInt64(&DeliveryEventCount)) //two events are sent to produce bulk }) }) @@ -79,31 +88,34 @@ func TestKafka_ProduceBulk(suite *testing.T) { }() }).Once() client.On("Produce", mock.Anything, 
mock.Anything).Return(fmt.Errorf("buffer full")).Once() - kp := NewKafkaFromClient(client, 10, "%s") - + kp := NewKafkaFromClient(client, 10, "%s", deliveryReportInterval, deliveryReportTopic) + atomic.StoreInt64(&DeliveryEventCount, 0) //reset the value to 0 before call the produce function err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2)) assert.Len(t, err.(BulkError).Errors, 3) assert.Error(t, err.(BulkError).Errors[0]) assert.Empty(t, err.(BulkError).Errors[1]) assert.Error(t, err.(BulkError).Errors[2]) + assert.Equal(t, int64(1), atomic.LoadInt64(&DeliveryEventCount)) // only one message has been successfully produced }) t.Run("Should return topic name when unknown topic is returned", func(t *testing.T) { client := &mockClient{} client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf(errUnknownTopic)).Once() - kp := NewKafkaFromClient(client, 10, "%s") - + kp := NewKafkaFromClient(client, 10, "%s", deliveryReportInterval, deliveryReportTopic) + atomic.StoreInt64(&DeliveryEventCount, 0) //reset the value to 0 before call the produce function err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2)) assert.EqualError(t, err.(BulkError).Errors[0], errUnknownTopic+" "+topic) + assert.Equal(t, int64(0), atomic.LoadInt64(&DeliveryEventCount)) // no message has been successfully produced }) t.Run("Should return topic name when message size is too large", func(t *testing.T) { client := &mockClient{} client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf(errLargeMessageSize)).Once() - kp := NewKafkaFromClient(client, 10, "%s") - + kp := NewKafkaFromClient(client, 10, "%s", deliveryReportInterval, deliveryReportTopic) + atomic.StoreInt64(&DeliveryEventCount, 0) //reset the value to 0 before call the produce function err := kp.ProduceBulk([]*pb.Event{{EventBytes: 
[]byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2)) assert.EqualError(t, err.(BulkError).Errors[0], errLargeMessageSize+" "+topic) + assert.Equal(t, int64(0), atomic.LoadInt64(&DeliveryEventCount)) // no message has been successfully produced }) }) @@ -124,13 +136,48 @@ func TestKafka_ProduceBulk(suite *testing.T) { } }() }).Once() - kp := NewKafkaFromClient(client, 10, "%s") - + kp := NewKafkaFromClient(client, 10, "%s", deliveryReportInterval, deliveryReportTopic) + atomic.StoreInt64(&DeliveryEventCount, 0) //reset the value to 0 before call the produce function err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2)) assert.NotEmpty(t, err) assert.Len(t, err.(BulkError).Errors, 2) assert.Equal(t, "buffer full", err.(BulkError).Errors[0].Error()) assert.Equal(t, "timeout", err.(BulkError).Errors[1].Error()) + assert.Equal(t, int64(0), atomic.LoadInt64(&DeliveryEventCount)) // no message has been successfully produced }) }) } + +func TestKafka_ReportDeliveryEventCount(suite *testing.T) { + suite.Run("Should Produce the delivery count and reset it ", func(t *testing.T) { + client := &mockClient{} + producedCh := make(chan *pb.TotalEventCountMessage, 1) + + client.On("Produce", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + msg := args.Get(0).(*kafka.Message) + var produced pb.TotalEventCountMessage + err := deserialization.DeserializeProto(msg.Value, &produced) // use protobuf unmarshal + assert.NoError(t, err) + producedCh <- &produced + }) + + kp := NewKafkaFromClient(client, 10, "%s", 1*time.Millisecond, deliveryReportTopic) + + atomic.StoreInt64(&DeliveryEventCount, 5) + + go kp.ReportDeliveryEventCount() + + // Wait for the message instead of sleeping + select { + case produced := <-producedCh: + assert.Equal(t, int32(5), produced.EventCount) + case <-time.After(10 * time.Millisecond): + t.Fatal("timeout: did not receive produced 
message") + } + + // After one tick, counter should be reset + assert.Equal(t, int64(0), atomic.LoadInt64(&DeliveryEventCount)) + client.AssertExpectations(t) + }) + +}