Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ PUBLISHER_KAFKA_CLIENT_RETRY_BACKOFF_MS="100"
PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS=5000
PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES=100000
PUBLISHER_KAFKA_FLUSH_INTERVAL_MS=1000
PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS=60000
PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME="clickstream-total-event-log"

METRIC_STATSD_ADDRESS=":8125"
METRIC_STATSD_FLUSH_PERIOD_MS=100
Expand Down
2 changes: 2 additions & 0 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ PUBLISHER_KAFKA_CLIENT_RETRY_BACKOFF_MS="100"
PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS=5000
PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES=100000
PUBLISHER_KAFKA_FLUSH_INTERVAL_MS=1000
PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS=60000
PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME="clickstream-total-event-log"

METRIC_STATSD_ADDRESS=":8125"
METRIC_STATSD_FLUSH_PERIOD_MS=100
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2.1.3
with:
go-version: '1.17'
go-version: '1.23'
- name: Checkout repo
uses: actions/checkout@v2
- name: Setup Project
Expand All @@ -28,7 +28,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2.1.3
with:
go-version: '1.17'
go-version: '1.23'
- uses: actions/checkout@v2
- name: Build
run: make all
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
FROM golang:1.17
FROM golang:1.23

WORKDIR /app
RUN apt-get update && apt-get install unzip --no-install-recommends --assume-yes
COPY . .
RUN make update-deps && make compile

FROM debian:bullseye
#bookworm-slim is compatible with golang:1.23 and has glibc 2.34
FROM debian:bookworm-slim
WORKDIR /app
COPY --from=0 /app/raccoon ./raccoon
COPY . .
CMD ["./raccoon"]
CMD ["./raccoon"]
1 change: 1 addition & 0 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func StartServer(ctx context.Context, cancel context.CancelFunc, shutdown chan b
workerPool.StartWorkers()
go kPublisher.ReportStats()
go reportProcMetrics()
go kPublisher.ReportDeliveryEventCount()
// create signal channel at startup
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
Expand Down
2 changes: 1 addition & 1 deletion app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestShutDownServer(t *testing.T) {
defer cancel()
mockKafka := &mockKafkaClient{}

kp := publisher.NewKafkaFromClient(mockKafka, 50, "test")
kp := publisher.NewKafkaFromClient(mockKafka, 50, "test", 1*time.Millisecond, "clickstream-test-log")

shutdownCh := make(chan bool, 1)
bufferCh := make(chan collection.CollectRequest, 1)
Expand Down
5 changes: 5 additions & 0 deletions config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ func TestDynamicConfigLoad(t *testing.T) {

func TestKafkaConfig_ToKafkaConfigMap(t *testing.T) {
os.Setenv("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS", "1000")
os.Setenv("PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME", "clickstream-test-log")
os.Setenv("PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS", "60")
os.Setenv("PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SERVERS", "kafka:9092")
os.Setenv("PUBLISHER_KAFKA_CLIENT_ACKS", "1")
os.Setenv("PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES", "10000")
os.Setenv("SOMETHING_PUBLISHER_KAFKA_CLIENT_SOMETHING", "anything")
publisherKafkaConfigLoader()
expectedDeliveryReportInterval := 60 * time.Millisecond
kafkaConfig := PublisherKafka.ToKafkaConfigMap()
bootstrapServer, _ := kafkaConfig.Get("bootstrap.servers", "")
topic, _ := kafkaConfig.Get("topic", "")
Expand All @@ -76,6 +79,8 @@ func TestKafkaConfig_ToKafkaConfigMap(t *testing.T) {
assert.Equal(t, "", topic)
assert.NotEqual(t, something, "anything")
assert.Equal(t, 4, len(*kafkaConfig))
assert.Equal(t, expectedDeliveryReportInterval, PublisherKafka.DeliveryReportInterval)
assert.Equal(t, "clickstream-test-log", PublisherKafka.DeliveryReportTopic)
}

func TestWorkerConfig(t *testing.T) {
Expand Down
11 changes: 9 additions & 2 deletions config/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"os"
"strings"
"time"

"github.com/goto/raccoon/config/util"
"github.com/spf13/viper"
Expand All @@ -14,7 +15,9 @@ var PublisherKafka publisherKafka
var dynamicKafkaClientConfigPrefix = "PUBLISHER_KAFKA_CLIENT_"

type publisherKafka struct {
FlushInterval int
FlushInterval int
DeliveryReportInterval time.Duration
DeliveryReportTopic string
}

func (k publisherKafka) ToKafkaConfigMap() *confluent.ConfigMap {
Expand Down Expand Up @@ -43,9 +46,13 @@ func dynamicKafkaClientConfigLoad() []byte {
func publisherKafkaConfigLoader() {
viper.SetDefault("PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES", "100000")
viper.SetDefault("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS", "1000")
viper.SetDefault("PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS", "60000")
viper.SetDefault("PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME", "clickstream-total-event-log")
viper.MergeConfig(bytes.NewBuffer(dynamicKafkaClientConfigLoad()))

PublisherKafka = publisherKafka{
FlushInterval: util.MustGetInt("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS"),
FlushInterval: util.MustGetInt("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS"),
DeliveryReportInterval: util.MustGetDuration("PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS", time.Millisecond),
DeliveryReportTopic: util.MustGetString("PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME"),
}
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
module github.com/goto/raccoon

go 1.17
go 1.23

require (
buf.build/gen/go/gotocompany/proton/grpc/go v1.3.0-20230313110213-9a3d240d5293.1
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.29.0-20230313110213-9a3d240d5293.1
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.36.9-20250923105611-d4292d610761.1
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.2
github.com/sirupsen/logrus v1.6.0
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.8.1
google.golang.org/grpc v1.53.0
google.golang.org/protobuf v1.29.0
google.golang.org/protobuf v1.36.9
gopkg.in/alexcesaro/statsd.v2 v2.0.0
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.4.2
)
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
buf.build/gen/go/envoyproxy/protoc-gen-validate/grpc/go v1.3.0-20220812150255-45685e052c7e.1/go.mod h1:MYI7lG0UVkUYOAsBEqZV4teiG+sD8w02v/lUHePAUgU=
buf.build/gen/go/envoyproxy/protoc-gen-validate/protocolbuffers/go v1.28.1-20220812150255-45685e052c7e.4/go.mod h1:hY+2q+FbjbfelDKYRqA/RnrnBrAiTYatnPLx6JFklXI=
buf.build/gen/go/envoyproxy/protoc-gen-validate/protocolbuffers/go v1.29.0-20220812150255-45685e052c7e.1/go.mod h1:YOh3xePeWOdqK7OZtDi3IFnKp+oz2Uveoy2+M6jByRI=
buf.build/gen/go/gotocompany/proton/grpc/go v1.3.0-20230313110213-9a3d240d5293.1 h1:O5qalwfO6Eydq5CAi4CCMS4SHlUYkwWy62S9jd2pivE=
buf.build/gen/go/gotocompany/proton/grpc/go v1.3.0-20230313110213-9a3d240d5293.1/go.mod h1:oaufFdUEC9ZKlBZJmkA/0ycQ8Ntn4vavkViSGC0Ht6o=
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.28.1-20230313110213-9a3d240d5293.4/go.mod h1:SkLAgdMqJ3u9UGFY064fVrnkkfAIaR8JqyOkoa9mz3U=
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.29.0-20230313110213-9a3d240d5293.1 h1:7qQ1zdLofKy0pc6zvRlwlj6y6Y4aGxYSm9k7jJU59V4=
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.29.0-20230313110213-9a3d240d5293.1/go.mod h1:nY80cfNDSgV/ovE2vdgs0UeosGsIIceHZBQldm1BUKA=
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.36.9-20250923105611-d4292d610761.1 h1:nvstcoWquAiZNTDmOEndcFYYlHoG0sDxQ3BA5f2ByzA=
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.36.9-20250923105611-d4292d610761.1/go.mod h1:iu9vyGlLXR80cI2ej7ov5KwJIDki7SsiXDcmklQS1N0=
buf.build/gen/go/grpc-ecosystem/grpc-gateway/grpc/go v1.3.0-20220906183531-bc28b723cd77.1/go.mod h1:9Ec7rvBnjfZvU/TnWjtcSGgiLQ4B+U3B+6SnZgVTA7A=
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.28.1-20220906183531-bc28b723cd77.4/go.mod h1:92ejKVTiuvnKoAtRlpJpIxKfloI935DDqhs0NCRx+KM=
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.29.0-20220906183531-bc28b723cd77.1/go.mod h1:KdKLHnCB+HSTqTEczw09mmFoJtNrCF7urUbAakgSGhg=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -1256,8 +1254,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.29.0 h1:44S3JjaKmLEE4YIkjzexaP+NzZsudE3Zin5Njn/pYX0=
google.golang.org/protobuf v1.29.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc=
gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU=
Expand Down
71 changes: 61 additions & 10 deletions publisher/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package publisher
import (
"encoding/json"
"fmt"
"github.com/goto/raccoon/serialization"
"google.golang.org/protobuf/types/known/timestamppb"
"strings"
"sync/atomic"
"time"

"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
// Importing librd to make it work on vendor mode
Expand All @@ -20,6 +24,8 @@ const (
errLargeMessageSize = "Broker: Message size too large" //error msg while producing a message which is larger than message.max.bytes config
)

var DeliveryEventCount int64

// KafkaProducer Produce data to kafka synchronously
type KafkaProducer interface {
// ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed.
Expand All @@ -32,24 +38,31 @@ func NewKafka() (*Kafka, error) {
return &Kafka{}, err
}
return &Kafka{
kp: kp,
flushInterval: config.PublisherKafka.FlushInterval,
topicFormat: config.EventDistribution.PublisherPattern,
kp: kp,
flushInterval: config.PublisherKafka.FlushInterval,
topicFormat: config.EventDistribution.PublisherPattern,
deliveryReportInterval: config.PublisherKafka.DeliveryReportInterval,
deliveryReportTopic: config.PublisherKafka.DeliveryReportTopic,
}, nil
}

func NewKafkaFromClient(client Client, flushInterval int, topicFormat string) *Kafka {
func NewKafkaFromClient(client Client, flushInterval int, topicFormat string, deliveryReportInterval time.Duration,
topicName string) *Kafka {
return &Kafka{
kp: client,
flushInterval: flushInterval,
topicFormat: topicFormat,
kp: client,
flushInterval: flushInterval,
topicFormat: topicFormat,
deliveryReportInterval: deliveryReportInterval,
deliveryReportTopic: topicName,
}
}

type Kafka struct {
kp Client
flushInterval int
topicFormat string
kp Client
flushInterval int
topicFormat string
deliveryReportInterval time.Duration
deliveryReportTopic string
}

// ProduceBulk messages to kafka. Block until all messages are sent. Return array of error. Order of Errors is guaranteed.
Expand Down Expand Up @@ -89,6 +102,7 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
totalProcessed++
}
// Wait for deliveryChannel as many as processed
localSuccesses := int64(0)
for i := 0; i < totalProcessed; i++ {
d := <-deliveryChannel
m := d.(*kafka.Message)
Expand All @@ -99,15 +113,33 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
metrics.Increment("kafka_error", fmt.Sprintf("type=%s,event_type=%s,conn_group=%s", "delivery_failed", eventType, connGroup))
order := m.Opaque.(int)
errors[order] = m.TopicPartition.Error
} else {
localSuccesses++
}
}

// Single atomic update per batch
if localSuccesses > 0 {
atomic.AddInt64(&DeliveryEventCount, localSuccesses)
}

if allNil(errors) {
return nil
}
return BulkError{Errors: errors}
}

// produceTotalEventMessage serializes the given TotalEventCountMessage with
// serialization.SerializeProto and produces it to topicName on any partition.
// The nil delivery channel means delivery reports for this message are routed
// to the producer's Events() channel rather than a dedicated channel.
// Returns a wrapped error if serialization fails, or the error from Produce.
func (pr *Kafka) produceTotalEventMessage(topicName string, event *pb.TotalEventCountMessage) error {
	value, err := serialization.SerializeProto(event)
	if err != nil {
		return fmt.Errorf("failed to serialize proto: %w", err)
	}
	return pr.kp.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
		Value:          value,
	}, nil)
}

func (pr *Kafka) ReportStats() {
for v := range pr.kp.Events() {
switch e := v.(type) {
Expand Down Expand Up @@ -141,6 +173,25 @@ func (pr *Kafka) ReportStats() {
}
}

// ReportDeliveryEventCount periodically publishes the number of successfully
// delivered events (accumulated in DeliveryEventCount by ProduceBulk) to the
// configured delivery-report topic, resetting the counter each interval.
// It blocks forever and is intended to be run in its own goroutine.
func (pr *Kafka) ReportDeliveryEventCount() {
	ticker := time.NewTicker(pr.deliveryReportInterval)
	defer ticker.Stop()

	for range ticker.C {
		// Atomically read-and-reset the counter. A separate Load followed by
		// Store would drop any successes recorded between the two calls.
		eventCount := atomic.SwapInt64(&DeliveryEventCount, 0)

		msg := &pb.TotalEventCountMessage{
			EventTimestamp: timestamppb.Now(),
			// EventCount is int32 in the proto; counts above math.MaxInt32
			// would truncate — assumed unreachable within one interval.
			EventCount: int32(eventCount),
		}

		if err := pr.produceTotalEventMessage(pr.deliveryReportTopic, msg); err != nil {
			// Don't silently drop the count on a produce failure; add it back
			// so it is included in the next interval's report instead.
			atomic.AddInt64(&DeliveryEventCount, eventCount)
		}
	}
}

// Close wait for outstanding messages to be delivered within given flush interval timeout.
func (pr *Kafka) Close() int {
remaining := pr.kp.Flush(pr.flushInterval)
Expand Down
Loading