Skip to content

Commit 1b198c4

Browse files
authored
Merge pull request #102 from scribd/maksimt/SERF-3223/kafka-client
[SERF-3223] Add Kafka PubSub and Kafka transport clients
2 parents 2eb05ae + 5afd272 commit 1b198c4

14 files changed

+1924
-1
lines changed

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/DataDog/datadog-go v4.8.2+incompatible
99
github.com/aws/aws-sdk-go v1.34.28
1010
github.com/getsentry/sentry-go v0.12.0
11+
github.com/go-kit/kit v0.9.0
1112
github.com/google/uuid v1.6.0
1213
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
1314
github.com/magefile/mage v1.15.0
@@ -16,6 +17,7 @@ require (
1617
github.com/spf13/viper v1.10.1
1718
github.com/stretchr/testify v1.8.4
1819
github.com/twmb/franz-go v1.12.1
20+
github.com/twmb/franz-go/pkg/kmsg v1.4.0
1921
google.golang.org/grpc v1.60.1
2022
google.golang.org/protobuf v1.32.0
2123
gopkg.in/DataDog/dd-trace-go.v1 v1.47.0
@@ -38,6 +40,7 @@ require (
3840
github.com/dgraph-io/ristretto v0.1.0 // indirect
3941
github.com/dustin/go-humanize v1.0.0 // indirect
4042
github.com/fsnotify/fsnotify v1.5.1 // indirect
43+
github.com/go-logfmt/logfmt v0.6.0 // indirect
4144
github.com/go-sql-driver/mysql v1.7.0 // indirect
4245
github.com/golang/glog v1.1.2 // indirect
4346
github.com/golang/protobuf v1.5.3 // indirect
@@ -67,7 +70,6 @@ require (
6770
github.com/spf13/pflag v1.0.5 // indirect
6871
github.com/subosito/gotenv v1.2.0 // indirect
6972
github.com/tinylib/msgp v1.1.6 // indirect
70-
github.com/twmb/franz-go/pkg/kmsg v1.4.0 // indirect
7173
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
7274
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect
7375
golang.org/x/net v0.20.0 // indirect

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,16 @@ github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/
7070
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
7171
github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w=
7272
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
73+
github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk=
74+
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
75+
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
76+
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
7377
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8=
7478
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
7579
github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
7680
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
81+
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
82+
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
7783
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
7884
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
7985
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=

pkg/pubsub/kafka/config.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"crypto/x509"
7+
"net"
8+
"time"
9+
10+
"github.com/aws/aws-sdk-go/aws/session"
11+
"github.com/aws/aws-sdk-go/service/sts"
12+
"github.com/twmb/franz-go/pkg/kgo"
13+
awssasl "github.com/twmb/franz-go/pkg/sasl/aws"
14+
"github.com/twmb/franz-go/pkg/sasl/plain"
15+
16+
sdklogger "github.com/scribd/go-sdk/pkg/logger"
17+
"github.com/scribd/go-sdk/pkg/pubsub"
18+
)
19+
20+
// Config provides a common configuration for Kafka PubSub clients.
type Config struct {
	// Application name that will be used in a serviceName provided to tracer spans
	ApplicationName string
	// Kafka configuration provided by go-sdk
	KafkaConfig pubsub.Kafka
	// AWS session reference, it will be used in case AWS MSK IAM authentication mechanism is used
	AwsSession *session.Session
	// MsgHandler is a function that will be called when a message is received
	MsgHandler MsgHandler
	// Logger used by the clients to report internal errors.
	Logger sdklogger.Logger
}
32+
33+
// tlsConnectionTimeout bounds the TCP dial when connecting to brokers over TLS.
const tlsConnectionTimeout = 10 * time.Second
34+
35+
// newConfig builds the base franz-go option list shared by all Kafka
// clients in this package: seed brokers and client ID, plus SASL and TLS
// options when enabled in the go-sdk Kafka configuration. Caller-supplied
// opts are appended last, so they can override the defaults set here.
func newConfig(c Config, opts ...kgo.Opt) ([]kgo.Opt, error) {
	options := []kgo.Opt{
		kgo.SeedBrokers(c.KafkaConfig.BrokerUrls...),
		kgo.ClientID(c.KafkaConfig.ClientId),
	}

	if c.KafkaConfig.SASL.Enabled {
		switch c.KafkaConfig.SASLMechanism() {
		case pubsub.Plain:
			options = append(options, getPlainSaslOption(c.KafkaConfig.SASL))
		case pubsub.AWSMskIam:
			options = append(options, getAwsMskIamSaslOption(c.KafkaConfig.SASL.AWSMskIam, c.AwsSession))
		}
		// NOTE(review): any other configured mechanism is silently ignored
		// and the client connects without SASL — confirm this is intended.
	}

	if c.KafkaConfig.TLS.Enabled || c.KafkaConfig.SecurityProtocol == "ssl" {
		// Optional custom CA; a nil pool makes crypto/tls fall back to the
		// system roots.
		var caCertPool *x509.CertPool

		if c.KafkaConfig.TLS.Ca != "" {
			caCertPool = x509.NewCertPool()
			caCertPool.AppendCertsFromPEM([]byte(c.KafkaConfig.TLS.Ca))
		}

		// Client certificate for mutual TLS, taken from either of the two
		// config locations checked below.
		var certificates []tls.Certificate
		if c.KafkaConfig.TLS.Cert != "" && c.KafkaConfig.TLS.CertKey != "" {
			cert, err := tls.X509KeyPair([]byte(c.KafkaConfig.TLS.Cert), []byte(c.KafkaConfig.TLS.CertKey))
			if err != nil {
				return nil, err
			}
			certificates = []tls.Certificate{cert}
		}

		// NOTE(review): when both KafkaConfig.Cert and KafkaConfig.TLS.Cert
		// are set, this pair silently replaces the one loaded above —
		// confirm that precedence is intended.
		if c.KafkaConfig.Cert != "" && c.KafkaConfig.CertKey != "" {
			cert, err := tls.X509KeyPair([]byte(c.KafkaConfig.Cert), []byte(c.KafkaConfig.CertKey))
			if err != nil {
				return nil, err
			}
			certificates = []tls.Certificate{cert}
		}

		// Verification is skipped when explicitly requested or when SSL
		// verification is disabled in the config.
		var skipTLSVerify bool
		if c.KafkaConfig.TLS.InsecureSkipVerify || !c.KafkaConfig.SSLVerificationEnabled {
			skipTLSVerify = true
		}

		tlsDialer := &tls.Dialer{
			NetDialer: &net.Dialer{Timeout: tlsConnectionTimeout},
			Config: &tls.Config{
				InsecureSkipVerify: skipTLSVerify,
				Certificates:       certificates,
				RootCAs:            caCertPool,
			},
		}

		options = append(options, kgo.Dialer(tlsDialer.DialContext))
	}

	options = append(options, opts...)

	return options, nil
}
96+
97+
func getPlainSaslOption(saslConf pubsub.SASL) kgo.Opt {
98+
return kgo.SASL(plain.Auth{
99+
User: saslConf.Username,
100+
Pass: saslConf.Password,
101+
}.AsMechanism())
102+
}
103+
104+
// getAwsMskIamSaslOption returns a kgo option implementing the AWS MSK
// IAM SASL mechanism.
//
// Credential resolution:
//  1. No AWS session: use the static credentials from the SASL config.
//  2. Session without AssumableRole: fetch credentials from the session
//     each time the mechanism is invoked.
//  3. Session with AssumableRole set: call STS AssumeRole and use the
//     temporary credentials it returns.
func getAwsMskIamSaslOption(iamConf pubsub.SASLAwsMskIam, s *session.Session) kgo.Opt {
	var opt kgo.Opt

	// no AWS session provided
	if s == nil {
		opt = kgo.SASL(awssasl.Auth{
			AccessKey:    iamConf.AccessKey,
			SecretKey:    iamConf.SecretKey,
			SessionToken: iamConf.SessionToken,
			UserAgent:    iamConf.UserAgent,
		}.AsManagedStreamingIAMMechanism())
	} else {
		opt = kgo.SASL(
			awssasl.ManagedStreamingIAM(func(ctx context.Context) (awssasl.Auth, error) {
				// If assumable role is not provided, we try to get credentials from the provided AWS session
				if iamConf.AssumableRole == "" {
					val, err := s.Config.Credentials.Get()
					if err != nil {
						return awssasl.Auth{}, err
					}

					return awssasl.Auth{
						AccessKey:    val.AccessKeyID,
						SecretKey:    val.SecretAccessKey,
						SessionToken: val.SessionToken,
						UserAgent:    iamConf.UserAgent,
					}, nil
				}

				// NOTE(review): AssumeRole is called on every invocation with
				// no credential caching — consider stscreds if this proves hot.
				svc := sts.New(s)

				res, stsErr := svc.AssumeRole(&sts.AssumeRoleInput{
					RoleArn:         &iamConf.AssumableRole,
					RoleSessionName: &iamConf.SessionName,
				})
				if stsErr != nil {
					return awssasl.Auth{}, stsErr
				}

				return awssasl.Auth{
					AccessKey:    *res.Credentials.AccessKeyId,
					SecretKey:    *res.Credentials.SecretAccessKey,
					SessionToken: *res.Credentials.SessionToken,
					UserAgent:    iamConf.UserAgent,
				}, nil
			}),
		)
	}

	return opt
}

pkg/pubsub/kafka/partitionconsumer.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
6+
"github.com/twmb/franz-go/pkg/kgo"
7+
8+
sdkkafka "github.com/scribd/go-sdk/pkg/instrumentation/kafka"
9+
sdklogger "github.com/scribd/go-sdk/pkg/logger"
10+
)
11+
12+
// pconsumer consumes a single assigned topic partition. Fetched
// partitions are delivered on recs; quit signals the consume loop to
// stop, and done is closed once the loop has exited.
type pconsumer struct {
	// pool is the worker pool the per-record handlers are scheduled on.
	pool *pool

	quit chan struct{}
	done chan struct{}
	recs chan *sdkkafka.FetchPartition
}
19+
20+
// consume is the per-partition worker loop. It receives fetched
// partitions on pc.recs and dispatches each record to the shared
// goroutine pool, returning when pc.quit fires. pc.done is closed on
// exit so the owner can wait for a clean shutdown.
func (pc *pconsumer) consume(cl *kgo.Client, logger sdklogger.Logger, shouldCommit bool, handler func(*kgo.Record)) {
	defer close(pc.done)

	for {
		select {
		case <-pc.quit:
			return
		case p := <-pc.recs:
			p.EachRecord(func(rec *kgo.Record) {
				pc.pool.Schedule(func() {
					// Mark the record consumed once the handler returns.
					defer p.ConsumeRecord(rec)

					handler(rec)
				})
			})
			// NOTE(review): records are committed right after being
			// scheduled, not after the pooled handlers finish, so a crash
			// between commit and handling can lose messages (at-most-once
			// semantics) — confirm this is intended.
			if shouldCommit {
				if err := cl.CommitRecords(context.Background(), p.Records...); err != nil {
					logger.WithError(err).Errorf("Partition consumer failed to commit records")
				}
			}
		}
	}
}

pkg/pubsub/kafka/pool.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package kafka
2+
3+
// pool is a lazily-grown worker pool bounded by a semaphore: at most
// cap(sem) goroutines run at once, each reusing itself for queued work.
type pool struct {
	sem  chan struct{}
	work chan func()
}

// newPool returns a pool that runs at most size tasks concurrently.
func newPool(size int) *pool {
	return &pool{
		sem:  make(chan struct{}, size),
		work: make(chan func()),
	}
}

// Schedule runs task on the pool: it hands the task to an idle worker
// when one is waiting, spawns a new worker while the semaphore still has
// capacity, and otherwise blocks until a worker becomes free.
func (p *pool) Schedule(task func()) {
	select {
	case p.work <- task:
	case p.sem <- struct{}{}:
		go p.worker(task)
	}
}

// worker executes its initial task and then keeps serving tasks from the
// work queue. Workers never return on their own; the deferred semaphore
// release fires only if the goroutine unwinds (e.g. a panicking task).
func (p *pool) worker(task func()) {
	defer func() { <-p.sem }()

	for next := task; ; next = <-p.work {
		next()
	}
}

pkg/pubsub/kafka/publisher.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/twmb/franz-go/pkg/kgo"
9+
10+
sdkkafka "github.com/scribd/go-sdk/pkg/instrumentation/kafka"
11+
)
12+
13+
type (
	// Publisher wraps the instrumented go-sdk Kafka client and exposes a
	// minimal message-publishing API.
	Publisher struct {
		producer *sdkkafka.Client
	}
)
18+
19+
const (
	// defaultFlushTimeout bounds Stop's flush when the caller's context
	// carries no deadline of its own.
	defaultFlushTimeout = time.Second * 10

	// publisherServiceNameSuffix is appended to the application name to
	// form the tracer service name.
	publisherServiceNameSuffix = "pubsub-publisher"
)
24+
25+
// NewPublisher is a tiny wrapper around the go-sdk kafka.Client and provides API to Publish kafka messages.
26+
func NewPublisher(c Config, opts ...kgo.Opt) (*Publisher, error) {
27+
serviceName := fmt.Sprintf("%s-%s", c.ApplicationName, publisherServiceNameSuffix)
28+
29+
cfg, err := newConfig(c, opts...)
30+
if err != nil {
31+
return nil, err
32+
}
33+
34+
cfg = append(cfg, []kgo.Opt{
35+
kgo.ProduceRequestTimeout(c.KafkaConfig.Publisher.WriteTimeout),
36+
kgo.RecordRetries(c.KafkaConfig.Publisher.MaxAttempts),
37+
}...)
38+
39+
producer, err := sdkkafka.NewClient(cfg, sdkkafka.WithServiceName(serviceName))
40+
if err != nil {
41+
return nil, err
42+
}
43+
44+
return &Publisher{producer: producer}, nil
45+
}
46+
47+
// Publish publishes kgo.Record message.
48+
func (p *Publisher) Publish(ctx context.Context, rec *kgo.Record, fn func(record *kgo.Record, err error)) {
49+
p.producer.Produce(ctx, rec, fn)
50+
}
51+
52+
// Produce is an alias to Publish to satisfy kafka go-kit transport.
53+
func (p *Publisher) Produce(ctx context.Context, rec *kgo.Record, fn func(record *kgo.Record, err error)) {
54+
p.Publish(ctx, rec, fn)
55+
}
56+
57+
// ProduceSync publishes kgo.Record messages synchronously.
58+
func (p *Publisher) ProduceSync(ctx context.Context, rs ...*kgo.Record) kgo.ProduceResults {
59+
return p.producer.ProduceSync(ctx, rs...)
60+
}
61+
62+
// GetKafkaProducer returns underlying kafka.Producer for fine-grained tuning purposes.
// NOTE(review): idiomatic Go would drop the Get prefix (KafkaProducer),
// but the exported name is kept for compatibility with existing callers.
func (p *Publisher) GetKafkaProducer() *sdkkafka.Client {
	return p.producer
}
66+
67+
// Stop flushes and waits for outstanding messages and requests to complete delivery.
68+
// It also closes a Producer instance.
69+
func (p *Publisher) Stop(ctx context.Context) error {
70+
if _, deadlineSet := ctx.Deadline(); !deadlineSet {
71+
timeoutCtx, cancel := context.WithTimeout(ctx, defaultFlushTimeout)
72+
defer cancel()
73+
74+
ctx = timeoutCtx
75+
}
76+
77+
err := p.producer.Flush(ctx)
78+
if err != nil {
79+
return err
80+
}
81+
82+
p.producer.Close()
83+
84+
return nil
85+
}

0 commit comments

Comments
 (0)