
Commit d2d4f40

Add sending USER_END events to Kafka (#1388)
1 parent e60920a commit d2d4f40

File tree

8 files changed: +243 -112 lines changed


api/http.go

Lines changed: 8 additions & 12 deletions
@@ -5,7 +5,6 @@ import (
 	"net/http"
 	"time"
 
-	"github.com/golang/glog"
 	"github.com/julienschmidt/httprouter"
 	"github.com/livepeer/catalyst-api/balancer"
 	"github.com/livepeer/catalyst-api/config"
@@ -64,17 +63,14 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
 	router.GET("/healthcheck", withLogging(catalystApiHandlers.Healthcheck()))
 
 	if cli.EnableAnalytics == "true" || cli.EnableAnalytics == "enabled" {
-		logProcessor, err := analytics.NewLogProcessor(cli.KafkaBootstrapServers, cli.KafkaUser, cli.KafkaPassword, cli.AnalyticsKafkaTopic)
-		if err != nil {
-			glog.Fatalf("failed to configure analytics log processor, err=%v", err)
-		} else {
-			analyticsApiHandlers := handlers.NewAnalyticsHandlersCollection(mapic, lapi, logProcessor)
-			router.POST("/analytics/log", withCORS(analyticsApiHandlers.Log()))
-			// Redirect GET /analytics/log to the specific catalyst node, e.g. "mdw-staging-staging-catalyst-0.livepeer.monster"
-			// This is useful for the player, because then it can stick to one node while sending analytics logs
-			router.GET("/analytics/log", withLogging(withCORS(geoHandlers.RedirectConstPathHandler())))
-			router.HEAD("/analytics/log", withLogging(withCORS(geoHandlers.RedirectConstPathHandler())))
-		}
+		logProcessor := analytics.NewLogProcessor(cli.KafkaBootstrapServers, cli.KafkaUser, cli.KafkaPassword, cli.AnalyticsKafkaTopic)
+
+		analyticsApiHandlers := handlers.NewAnalyticsHandlersCollection(mapic, lapi, logProcessor)
+		router.POST("/analytics/log", withCORS(analyticsApiHandlers.Log()))
+		// Redirect GET /analytics/log to the specific catalyst node, e.g. "mdw-staging-staging-catalyst-0.livepeer.monster"
+		// This is useful for the player, because then it can stick to one node while sending analytics logs
+		router.GET("/analytics/log", withLogging(withCORS(geoHandlers.RedirectConstPathHandler())))
+		router.HEAD("/analytics/log", withLogging(withCORS(geoHandlers.RedirectConstPathHandler())))
 	}
 
 	// Playback endpoint

api/http_internal.go

Lines changed: 1 addition & 1 deletion
@@ -77,7 +77,7 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
 	eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal, eventsEndpoint)
 	ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine}
 	accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic)
-	analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB)
+	analyticsHandlers := analytics.NewAnalyticsHandler(cli, metricsDB)
 	encryptionHandlers := accesscontrol.NewEncryptionHandlersCollection(cli, spkiPublicKey)
 	adminHandlers := &admin.AdminHandlersCollection{Cluster: c}
 	mistCallbackHandlers := misttriggers.NewMistCallbackHandlersCollection(cli, broker)
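
The only change here is that analytics.NewAnalyticsHandler now receives the CLI config as well as metricsDB. The handler itself is defined in a file outside this excerpt; the following is only a rough sketch of what the new signature implies, with assumed names and an assumed *sql.DB type for metricsDB, not the actual implementation:

package analytics

import (
	"database/sql"

	"github.com/livepeer/catalyst-api/config"
)

// Sketch only; the real type lives in a file not shown in this commit excerpt.
// Carrying the full CLI config lets the handler read the Kafka settings added in
// this commit (e.g. cli.UserEndKafkaTopic) when it emits USER_END events.
type AnalyticsHandler struct {
	cli       config.Cli
	metricsDB *sql.DB // assumed type for metricsDB
}

func NewAnalyticsHandler(cli config.Cli, metricsDB *sql.DB) AnalyticsHandler {
	return AnalyticsHandler{cli: cli, metricsDB: metricsDB}
}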

config/cli.go

Lines changed: 1 addition & 0 deletions
@@ -73,6 +73,7 @@ type Cli struct {
 	KafkaUser           string
 	KafkaPassword       string
 	AnalyticsKafkaTopic string
+	UserEndKafkaTopic   string
 	SerfMembersEndpoint string
 	EventsEndpoint      string
 	CatalystApiURL      string
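
The new UserEndKafkaTopic field follows the pattern of the existing Kafka settings. How it gets populated is not part of this excerpt; a hedged sketch, assuming the standard flag package and hypothetical flag names, might look like:

package main

import (
	"flag"
	"os"

	"github.com/livepeer/catalyst-api/config"
)

func parseFlags() config.Cli {
	cli := config.Cli{}
	fs := flag.NewFlagSet("catalyst-api", flag.ExitOnError)
	// Hypothetical flag names and defaults; the real wiring lives outside this diff.
	fs.StringVar(&cli.AnalyticsKafkaTopic, "analytics-kafka-topic", "", "Kafka topic for analytics log events")
	fs.StringVar(&cli.UserEndKafkaTopic, "user-end-kafka-topic", "", "Kafka topic for USER_END events")
	_ = fs.Parse(os.Args[1:])
	return cli
}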

handlers/analytics/kafka.go

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+package analytics
+
+import (
+	"context"
+	"crypto/tls"
+
+	"github.com/golang/glog"
+	"github.com/livepeer/catalyst-api/metrics"
+	"github.com/segmentio/kafka-go"
+	"github.com/segmentio/kafka-go/sasl/plain"
+)
+
+func sendWithRetries(writer *kafka.Writer, msgs []kafka.Message) {
+	// We retry sending messages to Kafka in case of a failure.
+	// We don't use any backoff, because events accumulate very quickly, so in case of a failure
+	// it's better to lose events than to fill up the memory and crash the whole catalyst-api.
+	kafkaWriteRetries := 3
+	var err error
+	for i := 0; i < kafkaWriteRetries; i++ {
+		err = writer.WriteMessages(context.Background(), msgs...)
+		if err == nil {
+			return
+		} else {
+			glog.Warningf("error while sending analytics log to Kafka, retrying, try=%d, err=%v", i, err)
+		}
+	}
+	metrics.Metrics.AnalyticsMetrics.LogProcessorWriteErrors.Inc()
+	glog.Errorf("error while sending events to Kafka, the events are lost, err=%v", err)
+}
+
+func logWriteMetrics(writer *kafka.Writer) {
+	stats := writer.Stats()
+	metrics.Metrics.AnalyticsMetrics.KafkaWriteErrors.Add(float64(stats.Errors))
+	metrics.Metrics.AnalyticsMetrics.KafkaWriteMessages.Add(float64(stats.Messages))
+	metrics.Metrics.AnalyticsMetrics.KafkaWriteAvgTime.Observe(stats.WriteTime.Avg.Seconds())
+	metrics.Metrics.AnalyticsMetrics.KafkaWriteRetries.Add(float64(stats.Retries))
+}
+
+func newWriter(bootstrapServers, user, password, topic string) *kafka.Writer {
+	dialer := &kafka.Dialer{
+		Timeout: kafkaRequestTimeout,
+		SASLMechanism: plain.Mechanism{
+			Username: user,
+			Password: password,
+		},
+		DualStack: true,
+		TLS: &tls.Config{
+			MinVersion: tls.VersionTLS12,
+		},
+	}
+
+	// Create a new Kafka writer
+	return kafka.NewWriter(kafka.WriterConfig{
+		Brokers:  []string{bootstrapServers},
+		Topic:    topic,
+		Balancer: kafka.CRC32Balancer{},
+		Dialer:   dialer,
+	})
+}
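
These helpers are shared between the existing analytics log path and the new USER_END path; the USER_END producer itself sits in one of the files not shown in this excerpt. A rough sketch of how a caller inside this package might publish a USER_END event with them, using a hypothetical function name and payload shape:

package analytics

import (
	"encoding/json"

	"github.com/segmentio/kafka-go"
)

// sendUserEndEvent is a hypothetical example, not part of this commit's diff.
func sendUserEndEvent(writer *kafka.Writer, sessionID string, event any) {
	value, err := json.Marshal(event)
	if err != nil {
		return // assumption: drop events that cannot be serialized
	}
	msg := kafka.Message{
		Key:   []byte(sessionID), // CRC32Balancer partitions by this key
		Value: value,
	}
	// Reuses the retry and metrics behaviour defined above in kafka.go.
	sendWithRetries(writer, []kafka.Message{msg})
}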

handlers/analytics/log_processor.go

Lines changed: 8 additions & 52 deletions
@@ -1,20 +1,17 @@
 package analytics
 
 import (
-	"context"
-	"crypto/tls"
 	"encoding/json"
 	"time"
 
 	"github.com/golang/glog"
 	"github.com/livepeer/catalyst-api/metrics"
 	"github.com/segmentio/kafka-go"
-	"github.com/segmentio/kafka-go/sasl/plain"
 )
 
 const (
-	KafkaBatchInterval = 1 * time.Second
-	KafkaRequestTimeout = 60 * time.Second
+	kafkaBatchInterval = 1 * time.Second
+	kafkaRequestTimeout = 60 * time.Second
 )
 
 type ILogProcessor interface {
@@ -95,37 +92,18 @@ type KafkaKey struct {
 	EventType string `json:"event_type"`
 }
 
-func NewLogProcessor(bootstrapServers, user, password, topic string) (*LogProcessor, error) {
-	dialer := &kafka.Dialer{
-		Timeout: KafkaRequestTimeout,
-		SASLMechanism: plain.Mechanism{
-			Username: user,
-			Password: password,
-		},
-		DualStack: true,
-		TLS: &tls.Config{
-			MinVersion: tls.VersionTLS12,
-		},
-	}
-
-	// Create a new Kafka writer
-	writer := kafka.NewWriter(kafka.WriterConfig{
-		Brokers:  []string{bootstrapServers},
-		Topic:    topic,
-		Balancer: kafka.CRC32Balancer{},
-		Dialer:   dialer,
-	})
-
+func NewLogProcessor(bootstrapServers, user, password, topic string) *LogProcessor {
+	writer := newWriter(bootstrapServers, user, password, topic)
 	return &LogProcessor{
 		logs:   []LogData{},
 		writer: writer,
 		topic:  topic,
-	}, nil
+	}
 }
 
 // Start starts LogProcessor which sends events to Kafka in batches.
 func (lp *LogProcessor) Start(ch chan LogData) {
-	t := time.NewTicker(KafkaBatchInterval)
+	t := time.NewTicker(kafkaBatchInterval)
 	go func() {
 		for {
 			select {
@@ -156,7 +134,7 @@ func updateMetrics(d LogData) {
 }
 
 func (p *LogProcessor) sendEvents() {
-	defer p.logWriteMetrics()
+	defer logWriteMetrics(p.writer)
 
 	if len(p.logs) > 0 {
 		glog.Infof("sending analytics logs, count=%d", len(p.logs))
@@ -184,27 +162,5 @@ func (p *LogProcessor) sendEvents() {
 	}
 	p.logs = []LogData{}
 
-	// We retry sending messages to Kafka in case of a failure
-	// We don't use any backoff, because the number of events are filling up very quickly, so in case of a failure
-	// it's better to lose analytics logs than fill up the memory and crash the whole catalyst-api
-	kafkaWriteRetries := 3
-	var err error
-	for i := 0; i < kafkaWriteRetries; i++ {
-		err = p.writer.WriteMessages(context.Background(), msgs...)
-		if err == nil {
-			return
-		} else {
-			glog.Warningf("error while sending analytics log to Kafka, retrying, try=%d, err=%v", i, err)
-		}
-	}
-	metrics.Metrics.AnalyticsMetrics.LogProcessorWriteErrors.Inc()
-	glog.Errorf("error while sending analytics log to Kafka, the analytics logs are lost, err=%d", err)
-}
-
-func (p *LogProcessor) logWriteMetrics() {
-	stats := p.writer.Stats()
-	metrics.Metrics.AnalyticsMetrics.KafkaWriteErrors.Add(float64(stats.Errors))
-	metrics.Metrics.AnalyticsMetrics.KafkaWriteMessages.Add(float64(stats.Messages))
-	metrics.Metrics.AnalyticsMetrics.KafkaWriteAvgTime.Observe(stats.WriteTime.Avg.Seconds())
-	metrics.Metrics.AnalyticsMetrics.KafkaWriteRetries.Add(float64(stats.Retries))
+	sendWithRetries(p.writer, msgs)
 }
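
With the Kafka setup moved into newWriter, NewLogProcessor no longer returns an error, which is what let the call site in api/http.go drop its glog.Fatalf branch. A minimal usage sketch of the updated API (the caller function and the channel buffer size are assumptions):

package handlers

import (
	"github.com/livepeer/catalyst-api/config"
	"github.com/livepeer/catalyst-api/handlers/analytics"
)

// startLogProcessor is a hypothetical caller, shown only to illustrate the new API.
func startLogProcessor(cli config.Cli) chan analytics.LogData {
	lp := analytics.NewLogProcessor(cli.KafkaBootstrapServers, cli.KafkaUser, cli.KafkaPassword, cli.AnalyticsKafkaTopic)

	ch := make(chan analytics.LogData, 100) // buffer size is an assumption
	lp.Start(ch)                            // flushes batches to Kafka every kafkaBatchInterval

	// The /analytics/log handler pushes events into ch; LogData's fields are
	// defined elsewhere in the package and not shown in this diff.
	return ch
}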
