Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS=5000
PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES=100000
PUBLISHER_KAFKA_FLUSH_INTERVAL_MS=1000

STATS_TOPIC_NAME="clickstream-total-event-log"
STATS_FLUSH_INTERVAL_IN_SEC=10
STATS_CHANNEL_SIZE=1000

METRIC_STATSD_ADDRESS=":8125"
METRIC_STATSD_FLUSH_PERIOD_MS=100

Expand Down
4 changes: 4 additions & 0 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS=5000
PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES=100000
PUBLISHER_KAFKA_FLUSH_INTERVAL_MS=1000

STATS_TOPIC_NAME="clickstream-total-event-log"
STATS_FLUSH_INTERVAL_IN_SEC=10
STATS_CHANNEL_SIZE=1000

METRIC_STATSD_ADDRESS=":8125"
METRIC_STATSD_FLUSH_PERIOD_MS=100

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2.1.3
with:
go-version: '1.17'
go-version: '1.23'
- name: Checkout repo
uses: actions/checkout@v2
- name: Setup Project
Expand All @@ -28,7 +28,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2.1.3
with:
go-version: '1.17'
go-version: '1.23'
- uses: actions/checkout@v2
- name: Build
run: make all
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
FROM golang:1.17
FROM golang:1.23

WORKDIR /app
RUN apt-get update && apt-get install unzip --no-install-recommends --assume-yes
COPY . .
RUN make update-deps && make compile

FROM debian:bullseye
#bookworm-slim comparible with golang:1.23 and has glibc 2.34
FROM debian:bookworm-slim
WORKDIR /app
COPY --from=0 /app/raccoon ./raccoon
COPY . .
Expand Down
6 changes: 5 additions & 1 deletion app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"fmt"
"github.com/goto/raccoon/stats"
"os"
"os/signal"
"runtime"
Expand All @@ -21,11 +22,12 @@ import (
// StartServer starts the server
func StartServer(ctx context.Context, cancel context.CancelFunc, shutdown chan bool) {
bufferChannel := make(chan collection.CollectRequest, config.Worker.ChannelSize)
eventCountChannel := make(chan int32, config.Stats.ChannelSize)
httpServices := services.Create(bufferChannel, ctx)
logger.Info("Start Server -->")
httpServices.Start(ctx, cancel)
logger.Info("Start publisher -->")
kPublisher, err := publisher.NewKafka()
kPublisher, err := publisher.NewKafka(eventCountChannel)
if err != nil {
logger.Error("Error creating kafka producer", err)
logger.Info("Exiting server")
Expand All @@ -37,6 +39,8 @@ func StartServer(ctx context.Context, cancel context.CancelFunc, shutdown chan b
workerPool.StartWorkers()
go kPublisher.ReportStats()
go reportProcMetrics()
eventStat := stats.CreateTotalEventStat(kPublisher, config.Stats.FlushInterval, config.Stats.TopicName, eventCountChannel)
go eventStat.FlushTotalEventStat()
// create signal channel at startup
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
Expand Down
2 changes: 1 addition & 1 deletion app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestShutDownServer(t *testing.T) {
defer cancel()
mockKafka := &mockKafkaClient{}

kp := publisher.NewKafkaFromClient(mockKafka, 50, "test")
kp := publisher.NewKafkaFromClient(mockKafka, 50, "test", make(chan int32, 1))

shutdownCh := make(chan bool, 1)
bufferCh := make(chan collection.CollectRequest, 1)
Expand Down
1 change: 1 addition & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ func Load() {
metricStatsdConfigLoader()
eventDistributionConfigLoader()
eventConfigLoader()
statsConfigLoader()
}
10 changes: 10 additions & 0 deletions config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,13 @@ func TestWorkerConfig(t *testing.T) {
assert.Equal(t, 5, Worker.ChannelSize)
assert.Equal(t, 2, Worker.WorkersPoolSize)
}

func TestStatsConfig(t *testing.T) {
os.Setenv("STATS_TOPIC_NAME", "clickstream-total-event-log")
os.Setenv("STATS_FLUSH_INTERVAL_IN_SEC", "10")
os.Setenv("STATS_CHANNEL_SIZE", "1000")
statsConfigLoader()
assert.Equal(t, time.Duration(10)*time.Second, Stats.FlushInterval)
assert.Equal(t, 1000, Stats.ChannelSize)
assert.Equal(t, "clickstream-total-event-log", Stats.TopicName)
}
5 changes: 2 additions & 3 deletions config/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package config

import (
"bytes"
"os"
"strings"

"github.com/goto/raccoon/config/util"
"github.com/spf13/viper"
confluent "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"os"
"strings"
)

var PublisherKafka publisherKafka
Expand Down
28 changes: 28 additions & 0 deletions config/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package config

import (
"time"

"github.com/goto/raccoon/config/util"

"github.com/spf13/viper"
)

var Stats statsConfig

type statsConfig struct {
FlushInterval time.Duration
TopicName string
ChannelSize int
}

func statsConfigLoader() {
viper.SetDefault("STATS_TOPIC_NAME", "clickstream-total-event-log")
viper.SetDefault("STATS_FLUSH_INTERVAL_IN_SEC", 10)
viper.SetDefault("STATS_CHANNEL_SIZE", "1000")
Stats = statsConfig{
TopicName: util.MustGetString("STATS_TOPIC_NAME"),
FlushInterval: util.MustGetDuration("STATS_FLUSH_INTERVAL_IN_SEC", time.Second),
ChannelSize: util.MustGetInt("STATS_CHANNEL_SIZE"),
}
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
module github.com/goto/raccoon

go 1.17
go 1.23

require (
buf.build/gen/go/gotocompany/proton/grpc/go v1.3.0-20230313110213-9a3d240d5293.1
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.29.0-20230313110213-9a3d240d5293.1
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.36.9-20250923105611-d4292d610761.1
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.2
github.com/sirupsen/logrus v1.6.0
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.8.1
google.golang.org/grpc v1.53.0
google.golang.org/protobuf v1.29.0
google.golang.org/protobuf v1.36.9
gopkg.in/alexcesaro/statsd.v2 v2.0.0
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.4.2
)
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
buf.build/gen/go/envoyproxy/protoc-gen-validate/grpc/go v1.3.0-20220812150255-45685e052c7e.1/go.mod h1:MYI7lG0UVkUYOAsBEqZV4teiG+sD8w02v/lUHePAUgU=
buf.build/gen/go/envoyproxy/protoc-gen-validate/protocolbuffers/go v1.28.1-20220812150255-45685e052c7e.4/go.mod h1:hY+2q+FbjbfelDKYRqA/RnrnBrAiTYatnPLx6JFklXI=
buf.build/gen/go/envoyproxy/protoc-gen-validate/protocolbuffers/go v1.29.0-20220812150255-45685e052c7e.1/go.mod h1:YOh3xePeWOdqK7OZtDi3IFnKp+oz2Uveoy2+M6jByRI=
buf.build/gen/go/gotocompany/proton/grpc/go v1.3.0-20230313110213-9a3d240d5293.1 h1:O5qalwfO6Eydq5CAi4CCMS4SHlUYkwWy62S9jd2pivE=
buf.build/gen/go/gotocompany/proton/grpc/go v1.3.0-20230313110213-9a3d240d5293.1/go.mod h1:oaufFdUEC9ZKlBZJmkA/0ycQ8Ntn4vavkViSGC0Ht6o=
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.28.1-20230313110213-9a3d240d5293.4/go.mod h1:SkLAgdMqJ3u9UGFY064fVrnkkfAIaR8JqyOkoa9mz3U=
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.29.0-20230313110213-9a3d240d5293.1 h1:7qQ1zdLofKy0pc6zvRlwlj6y6Y4aGxYSm9k7jJU59V4=
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.29.0-20230313110213-9a3d240d5293.1/go.mod h1:nY80cfNDSgV/ovE2vdgs0UeosGsIIceHZBQldm1BUKA=
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.36.9-20250923105611-d4292d610761.1 h1:nvstcoWquAiZNTDmOEndcFYYlHoG0sDxQ3BA5f2ByzA=
buf.build/gen/go/gotocompany/proton/protocolbuffers/go v1.36.9-20250923105611-d4292d610761.1/go.mod h1:iu9vyGlLXR80cI2ej7ov5KwJIDki7SsiXDcmklQS1N0=
buf.build/gen/go/grpc-ecosystem/grpc-gateway/grpc/go v1.3.0-20220906183531-bc28b723cd77.1/go.mod h1:9Ec7rvBnjfZvU/TnWjtcSGgiLQ4B+U3B+6SnZgVTA7A=
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.28.1-20220906183531-bc28b723cd77.4/go.mod h1:92ejKVTiuvnKoAtRlpJpIxKfloI935DDqhs0NCRx+KM=
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.29.0-20220906183531-bc28b723cd77.1/go.mod h1:KdKLHnCB+HSTqTEczw09mmFoJtNrCF7urUbAakgSGhg=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -1256,8 +1254,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.29.0 h1:44S3JjaKmLEE4YIkjzexaP+NzZsudE3Zin5Njn/pYX0=
google.golang.org/protobuf v1.29.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc=
gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU=
Expand Down
49 changes: 36 additions & 13 deletions publisher/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package publisher
import (
"encoding/json"
"fmt"
"strings"

"github.com/goto/raccoon/serialization"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"strings"
// Importing librd to make it work on vendor mode
_ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka/librdkafka"

Expand All @@ -24,32 +24,37 @@ const (
type KafkaProducer interface {
// ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed.
ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error
// ProduceEventStat event stat message to kafka
ProduceEventStat(topicName string, event *pb.TotalEventCountMessage) error
}

func NewKafka() (*Kafka, error) {
func NewKafka(eventCountChannel chan int32) (*Kafka, error) {
kp, err := newKafkaClient(config.PublisherKafka.ToKafkaConfigMap())
if err != nil {
return &Kafka{}, err
}
return &Kafka{
kp: kp,
flushInterval: config.PublisherKafka.FlushInterval,
topicFormat: config.EventDistribution.PublisherPattern,
kp: kp,
flushInterval: config.PublisherKafka.FlushInterval,
topicFormat: config.EventDistribution.PublisherPattern,
eventCountChannel: eventCountChannel,
}, nil
}

func NewKafkaFromClient(client Client, flushInterval int, topicFormat string) *Kafka {
func NewKafkaFromClient(client Client, flushInterval int, topicFormat string, eventCountChannel chan int32) *Kafka {
return &Kafka{
kp: client,
flushInterval: flushInterval,
topicFormat: topicFormat,
kp: client,
flushInterval: flushInterval,
topicFormat: topicFormat,
eventCountChannel: eventCountChannel,
}
}

type Kafka struct {
kp Client
flushInterval int
topicFormat string
kp Client
flushInterval int
topicFormat string
eventCountChannel chan int32 // channel to track successful deliveries
}

// ProduceBulk messages to kafka. Block until all messages are sent. Return array of error. Order of Errors is guaranteed.
Expand Down Expand Up @@ -89,6 +94,7 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
totalProcessed++
}
// Wait for deliveryChannel as many as processed
successEventCount := int32(0)
for i := 0; i < totalProcessed; i++ {
d := <-deliveryChannel
m := d.(*kafka.Message)
Expand All @@ -99,8 +105,14 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
metrics.Increment("kafka_error", fmt.Sprintf("type=%s,event_type=%s,conn_group=%s", "delivery_failed", eventType, connGroup))
order := m.Opaque.(int)
errors[order] = m.TopicPartition.Error
} else {
successEventCount++
}
}
// send count to channel instead of atomic
if successEventCount > 0 {
pr.eventCountChannel <- successEventCount
}

if allNil(errors) {
return nil
Expand Down Expand Up @@ -141,6 +153,17 @@ func (pr *Kafka) ReportStats() {
}
}

func (pr *Kafka) ProduceEventStat(topicName string, event *pb.TotalEventCountMessage) error {
value, err := serialization.SerializeProto(event)
if err != nil {
return fmt.Errorf("failed to serialize proto: %w", err)
}
return pr.kp.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
Value: value,
}, nil)
}

// Close wait for outstanding messages to be delivered within given flush interval timeout.
func (pr *Kafka) Close() int {
remaining := pr.kp.Flush(pr.flushInterval)
Expand Down
Loading