Skip to content

Commit f246a71

Browse files
committed
Add context to ProducerInterceptor
This change adds a context.Context argument to the ProducerInterceptor interface, and passes it between the pre- and post-Send interceptor methods. Having this makes it much easier to write useful interceptors that can integrate with common tracing SDKs like OpenTelemetry, as the context is the conventional method for propagating metadata vertically through a call stack. For an example of another library using a similar convention, see: https://github.com/jackc/pgx/blob/9ab9e3c40bbb33c6f37359c87508cbc6a9830ed6/tracer.go#L10 Fixes #443
1 parent 4e13822 commit f246a71

File tree

4 files changed

+80
-37
lines changed

4 files changed

+80
-37
lines changed

pulsar/internal/pulsartracing/producer_interceptor.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,20 @@ const toPrefix = "To__"
2929
type ProducerInterceptor struct {
3030
}
3131

32-
func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message *pulsar.ProducerMessage) {
32+
func (t *ProducerInterceptor) BeforeSend(
33+
ctx context.Context,
34+
producer pulsar.Producer,
35+
message *pulsar.ProducerMessage,
36+
) context.Context {
3337
buildAndInjectSpan(message, producer).Finish()
38+
return ctx
3439
}
3540

36-
func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer,
37-
message *pulsar.ProducerMessage,
38-
msgID pulsar.MessageID) {
41+
func (t *ProducerInterceptor) OnSendAcknowledgement(
42+
_ context.Context,
43+
_ pulsar.Producer,
44+
_ *pulsar.ProducerMessage,
45+
_ pulsar.MessageID) {
3946
}
4047

4148
func buildAndInjectSpan(message *pulsar.ProducerMessage, producer pulsar.Producer) opentracing.Span {

pulsar/producer_interceptor.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,39 @@
1717

1818
package pulsar
1919

20+
import "context"
21+
2022
type ProducerInterceptor interface {
2123
// BeforeSend This is called before send the message to the brokers. This method is allowed to modify the
2224
// message.
23-
BeforeSend(producer Producer, message *ProducerMessage)
25+
BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) context.Context
2426

2527
// OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged,
2628
// or when sending the message fails.
27-
OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID)
29+
OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID)
2830
}
2931

3032
type ProducerInterceptors []ProducerInterceptor
3133

32-
func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) {
34+
func (x ProducerInterceptors) BeforeSend(
35+
ctx context.Context,
36+
producer Producer,
37+
message *ProducerMessage,
38+
) context.Context {
3339
for i := range x {
34-
x[i].BeforeSend(producer, message)
40+
ctx = x[i].BeforeSend(ctx, producer, message)
3541
}
42+
return ctx
3643
}
3744

38-
func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
45+
func (x ProducerInterceptors) OnSendAcknowledgement(
46+
ctx context.Context,
47+
producer Producer,
48+
message *ProducerMessage,
49+
msgID MessageID,
50+
) {
3951
for i := range x {
40-
x[i].OnSendAcknowledgement(producer, message, msgID)
52+
x[i].OnSendAcknowledgement(ctx, producer, message, msgID)
4153
}
4254
}
4355

pulsar/producer_partition.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,7 +1001,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
10011001
isDone := uAtomic.NewBool(false)
10021002
doneCh := make(chan struct{})
10031003

1004-
p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
1004+
ctx = p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
10051005
if isDone.CAS(false, true) {
10061006
err = e
10071007
msgID = ID
@@ -1202,11 +1202,11 @@ func (p *partitionProducer) internalSendAsync(
12021202
msg *ProducerMessage,
12031203
callback func(MessageID, *ProducerMessage, error),
12041204
flushImmediately bool,
1205-
) {
1205+
) context.Context {
12061206
if err := p.validateMsg(msg); err != nil {
12071207
p.log.Error(err)
12081208
runCallback(callback, nil, msg, err)
1209-
return
1209+
return ctx
12101210
}
12111211

12121212
sr := sendRequestPool.Get().(*sendRequest)
@@ -1224,43 +1224,46 @@ func (p *partitionProducer) internalSendAsync(
12241224

12251225
if err := p.prepareTransaction(sr); err != nil {
12261226
sr.done(nil, err)
1227-
return
1227+
return ctx
12281228
}
12291229

12301230
if p.getProducerState() != producerReady {
12311231
sr.done(nil, ErrProducerClosed)
1232-
return
1232+
return ctx
12331233
}
12341234

1235-
p.options.Interceptors.BeforeSend(p, msg)
1235+
ctx = p.options.Interceptors.BeforeSend(ctx, p, msg)
1236+
sr.ctx = ctx
12361237

12371238
if err := p.updateSchema(sr); err != nil {
12381239
p.log.Error(err)
12391240
sr.done(nil, err)
1240-
return
1241+
return ctx
12411242
}
12421243

12431244
if err := p.updateUncompressedPayload(sr); err != nil {
12441245
p.log.Error(err)
12451246
sr.done(nil, err)
1246-
return
1247+
return ctx
12471248
}
12481249

12491250
p.updateMetaData(sr)
12501251

12511252
if err := p.updateChunkInfo(sr); err != nil {
12521253
p.log.Error(err)
12531254
sr.done(nil, err)
1254-
return
1255+
return ctx
12551256
}
12561257

12571258
if err := p.reserveResources(sr); err != nil {
12581259
p.log.Error(err)
12591260
sr.done(nil, err)
1260-
return
1261+
return ctx
12611262
}
12621263

12631264
p.dataChan <- sr
1265+
1266+
return ctx
12641267
}
12651268

12661269
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
@@ -1505,7 +1508,7 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
15051508

15061509
if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
15071510
if sr.producer.options.Interceptors != nil {
1508-
sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
1511+
sr.producer.options.Interceptors.OnSendAcknowledgement(sr.ctx, sr.producer, sr.msg, msgID)
15091512
}
15101513
}
15111514
}

pulsar/producer_test.go

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1479,23 +1479,43 @@ func TestProducuerSendFailOnInvalidKey(t *testing.T) {
14791479

14801480
type noopProduceInterceptor struct{}
14811481

1482-
func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {}
1482+
func (noopProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, _ *ProducerMessage) context.Context {
1483+
return ctx
1484+
}
14831485

1484-
func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
1486+
func (noopProduceInterceptor) OnSendAcknowledgement(_ context.Context, _ Producer, _ *ProducerMessage, _ MessageID) {
14851487
}
14861488

1487-
// copyPropertyIntercepotr copy all keys in message properties map and add a suffix
1488-
type metricProduceInterceptor struct {
1489-
sendn int
1490-
ackn int
1489+
type trackingProduceInterceptor struct {
1490+
sendn int
1491+
ackn int
1492+
maxDuration time.Duration
14911493
}
14921494

1493-
func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {
1494-
x.sendn++
1495+
type beforeSendCtxKey struct{}
1496+
1497+
func (i *trackingProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, msg *ProducerMessage) context.Context {
1498+
i.sendn++
1499+
ctx = context.WithValue(ctx, beforeSendCtxKey{}, time.Now())
1500+
return ctx
14951501
}
14961502

1497-
func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
1498-
x.ackn++
1503+
func (i *trackingProduceInterceptor) OnSendAcknowledgement(
1504+
ctx context.Context,
1505+
_ Producer,
1506+
_ *ProducerMessage,
1507+
_ MessageID,
1508+
) {
1509+
var dur time.Duration
1510+
if v := ctx.Value(beforeSendCtxKey{}); v != nil {
1511+
dur = time.Since(v.(time.Time))
1512+
}
1513+
1514+
if dur > i.maxDuration {
1515+
i.maxDuration = dur
1516+
}
1517+
1518+
i.ackn++
14991519
}
15001520

15011521
func TestProducerWithInterceptors(t *testing.T) {
@@ -1518,14 +1538,14 @@ func TestProducerWithInterceptors(t *testing.T) {
15181538
assert.Nil(t, err)
15191539
defer consumer.Close()
15201540

1521-
metric := &metricProduceInterceptor{}
1541+
interceptor := &trackingProduceInterceptor{}
15221542
// create producer
15231543
producer, err := client.CreateProducer(ProducerOptions{
15241544
Topic: topic,
15251545
DisableBatching: false,
15261546
Interceptors: ProducerInterceptors{
15271547
noopProduceInterceptor{},
1528-
metric,
1548+
interceptor,
15291549
},
15301550
})
15311551
assert.Nil(t, err)
@@ -1575,8 +1595,9 @@ func TestProducerWithInterceptors(t *testing.T) {
15751595
consumer.Ack(msg)
15761596
}
15771597

1578-
assert.Equal(t, 10, metric.sendn)
1579-
assert.Equal(t, 10, metric.ackn)
1598+
assert.Equal(t, 10, interceptor.sendn)
1599+
assert.Equal(t, 10, interceptor.ackn)
1600+
assert.NotZero(t, interceptor.maxDuration)
15801601
}
15811602

15821603
func TestProducerSendAfterClose(t *testing.T) {
@@ -1719,7 +1740,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) {
17191740
}
17201741
producer.Flush()
17211742

1722-
//// create consumer
1743+
// create consumer
17231744
consumer, err := client.Subscribe(ConsumerOptions{
17241745
Topic: topic,
17251746
SubscriptionName: "my-sub2",
@@ -1810,7 +1831,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) {
18101831
}
18111832
producer.Flush()
18121833

1813-
//// create consumer
1834+
// create consumer
18141835
consumer, err := client.Subscribe(ConsumerOptions{
18151836
Topic: topic,
18161837
SubscriptionName: "my-sub2",

0 commit comments

Comments
 (0)