Skip to content

Commit 5afd272

Browse files
committed
feat: Add go-kit Kafka transport
1 parent eba9a3f commit 5afd272

File tree

6 files changed

+983
-0
lines changed

6 files changed

+983
-0
lines changed

pkg/transport/kafka/encode_decode.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
6+
"github.com/twmb/franz-go/pkg/kgo"
7+
)
8+
9+
// DecodeRequestFunc extracts a user-domain request object from
10+
// an Kafka message. It is designed to be used in Kafka Subscribers.
11+
type DecodeRequestFunc func(ctx context.Context, msg *kgo.Record) (request interface{}, err error)
12+
13+
// EncodeRequestFunc encodes the passed request object into
14+
// an Kafka message object. It is designed to be used in Kafka Publishers.
15+
type EncodeRequestFunc func(context.Context, *kgo.Record, interface{}) error
16+
17+
// EncodeResponseFunc encodes the passed response object into
18+
// a Kafka message object. It is designed to be used in Kafka Subscribers.
19+
type EncodeResponseFunc func(context.Context, *kgo.Record, interface{}) error
20+
21+
// DecodeResponseFunc extracts a user-domain response object from kafka
22+
// response object. It's designed to be used in kafka publisher, for publisher-side
23+
// endpoints. One straightforward DecodeResponseFunc could be something that
24+
// JSON decodes from the response payload to the concrete response type.
25+
type DecodeResponseFunc func(context.Context, *kgo.Record) (response interface{}, err error)

pkg/transport/kafka/publisher.go

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"time"
7+
8+
"github.com/go-kit/kit/endpoint"
9+
"github.com/twmb/franz-go/pkg/kgo"
10+
)
11+
12+
const (
13+
defaultPublisherTimeout = 10 * time.Second
14+
)
15+
16+
// Publisher wraps single Kafka topic for message publishing
17+
// and implements endpoint.Endpoint.
18+
type Publisher struct {
19+
handler Handler
20+
topic string
21+
enc EncodeRequestFunc
22+
dec DecodeResponseFunc
23+
before []RequestFunc
24+
after []PublisherResponseFunc
25+
deliverer Deliverer
26+
timeout time.Duration
27+
}
28+
29+
// NewPublisher constructs a new publisher for a single Kafka topic,
30+
// which implements endpoint.Endpoint.
31+
func NewPublisher(
32+
handler Handler,
33+
topic string,
34+
enc EncodeRequestFunc,
35+
dec DecodeResponseFunc,
36+
options ...PublisherOption,
37+
) *Publisher {
38+
p := &Publisher{
39+
handler: handler,
40+
topic: topic,
41+
deliverer: SyncDeliverer,
42+
enc: enc,
43+
dec: dec,
44+
timeout: defaultPublisherTimeout,
45+
}
46+
for _, opt := range options {
47+
opt(p)
48+
}
49+
50+
return p
51+
}
52+
53+
// PublisherOption sets an optional parameter for publishers.
54+
type PublisherOption func(publisher *Publisher)
55+
56+
// PublisherBefore sets the RequestFuncs that are applied to the outgoing publisher
57+
// request before it's invoked.
58+
func PublisherBefore(before ...RequestFunc) PublisherOption {
59+
return func(p *Publisher) {
60+
p.before = append(p.before, before...)
61+
}
62+
}
63+
64+
// PublisherAfter adds one or more PublisherResponseFuncs, which are applied to the
65+
// context after successful message publishing.
66+
// This is useful for context-manipulation operations.
67+
func PublisherAfter(after ...PublisherResponseFunc) PublisherOption {
68+
return func(p *Publisher) {
69+
p.after = append(p.after, after...)
70+
}
71+
}
72+
73+
// PublisherDeliverer sets the deliverer function that the Publisher invokes.
74+
func PublisherDeliverer(deliverer Deliverer) PublisherOption {
75+
return func(p *Publisher) { p.deliverer = deliverer }
76+
}
77+
78+
// PublisherTimeout sets the available timeout for a kafka request.
79+
func PublisherTimeout(timeout time.Duration) PublisherOption {
80+
return func(p *Publisher) { p.timeout = timeout }
81+
}
82+
83+
// Endpoint returns a usable endpoint that invokes message publishing.
84+
func (p Publisher) Endpoint() endpoint.Endpoint {
85+
return func(ctx context.Context, request interface{}) (interface{}, error) {
86+
ctx, cancel := context.WithTimeout(ctx, p.timeout)
87+
defer cancel()
88+
89+
msg := &kgo.Record{
90+
Topic: p.topic,
91+
}
92+
93+
if err := p.enc(ctx, msg, request); err != nil {
94+
return nil, err
95+
}
96+
97+
for _, f := range p.before {
98+
ctx = f(ctx, msg)
99+
}
100+
101+
event, err := p.deliverer(ctx, p, msg)
102+
if err != nil {
103+
return nil, err
104+
}
105+
106+
for _, f := range p.after {
107+
ctx = f(ctx, event)
108+
}
109+
110+
response, err := p.dec(ctx, event)
111+
if err != nil {
112+
return nil, err
113+
}
114+
115+
return response, nil
116+
}
117+
}
118+
119+
// Deliverer is invoked by the Publisher to publish the specified Message, and to
120+
// retrieve the appropriate response Event object.
121+
type Deliverer func(
122+
context.Context,
123+
Publisher,
124+
*kgo.Record,
125+
) (*kgo.Record, error)
126+
127+
// SyncDeliverer is a deliverer that publishes the specified message
128+
// and returns the first object.
129+
// If the context times out while waiting for a reply, an error will be returned.
130+
func SyncDeliverer(ctx context.Context, pub Publisher, msg *kgo.Record) (*kgo.Record, error) {
131+
results := pub.handler.ProduceSync(ctx, msg)
132+
133+
if len(results) > 0 && results[0].Err != nil {
134+
return nil, results[0].Err
135+
}
136+
137+
return results[0].Record, nil
138+
}
139+
140+
// AsyncDeliverer delivers the supplied message and
141+
// returns a nil response.
142+
//
143+
// When using this deliverer please ensure that the supplied DecodeResponseFunc and
144+
// PublisherResponseFunc are able to handle nil-type responses.
145+
//
146+
// AsyncDeliverer will produce the message with the context detached due to the fact that actual
147+
// message producing is called asynchronously (another goroutine) and at that time original context might be
148+
// already canceled causing the producer to fail. The detached context will include values attached to the original
149+
// context, but deadline and cancel will be reset. To provide a context for asynchronous deliverer please
150+
// use AsyncDelivererCtx function instead.
151+
func AsyncDeliverer(ctx context.Context, pub Publisher, msg *kgo.Record) (*kgo.Record, error) {
152+
pub.handler.Produce(detach{ctx: ctx}, msg, nil)
153+
154+
return nil, nil
155+
}
156+
157+
// AsyncDelivererCtx delivers the supplied message and
158+
// returns a nil response.
159+
//
160+
// When using this deliverer please ensure that the supplied DecodeResponseFunc and
161+
// PublisherResponseFunc are able to handle nil-type responses.
162+
func AsyncDelivererCtx(ctx context.Context, pub Publisher, msg *kgo.Record) (*kgo.Record, error) {
163+
pub.handler.Produce(ctx, msg, nil)
164+
165+
return nil, nil
166+
}
167+
168+
// EncodeJSONRequest is an EncodeRequestFunc that serializes the request as a
169+
// JSON object to the Message value.
170+
// Many services can use it as a sensible default.
171+
func EncodeJSONRequest(_ context.Context, msg *kgo.Record, request interface{}) error {
172+
rawJSON, err := json.Marshal(request)
173+
if err != nil {
174+
return err
175+
}
176+
177+
msg.Value = rawJSON
178+
179+
return nil
180+
}
181+
182+
// Handler is a handler interface to make testing possible.
183+
// It is highly recommended to use *kafka.Producer as the interface implementation.
184+
type Handler interface {
185+
Produce(ctx context.Context, rec *kgo.Record, fn func(record *kgo.Record, err error))
186+
ProduceSync(ctx context.Context, rs ...*kgo.Record) kgo.ProduceResults
187+
}
188+
189+
type detach struct {
190+
ctx context.Context
191+
}
192+
193+
func (d detach) Deadline() (time.Time, bool) {
194+
return time.Time{}, false
195+
}
196+
func (d detach) Done() <-chan struct{} {
197+
return nil
198+
}
199+
func (d detach) Err() error {
200+
return nil
201+
}
202+
203+
func (d detach) Value(key interface{}) interface{} {
204+
return d.ctx.Value(key)
205+
}

0 commit comments

Comments
 (0)