Skip to content

Commit c06f577

Browse files
committed
Add context to ProducerInterceptor
This change adds a context.Context argument to the ProducerInterceptor interface, and passes it between the pre- and post-Send interceptor methods. Having this makes it much easier to write useful interceptors that can integrate with common tracing SDKs like OpenTelemetry, as the context is the conventional method for propagating metadata vertically through a call stack. For an example of another library using a similar convention, see: https://github.com/jackc/pgx/blob/9ab9e3c40bbb33c6f37359c87508cbc6a9830ed6/tracer.go#L10 Fixes #443
1 parent 062fefe commit c06f577

File tree

4 files changed

+66
-37
lines changed

4 files changed

+66
-37
lines changed

pulsar/internal/pulsartracing/producer_interceptor.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,20 @@ const toPrefix = "To__"
2929
type ProducerInterceptor struct {
3030
}
3131

32-
func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message *pulsar.ProducerMessage) {
32+
func (t *ProducerInterceptor) BeforeSend(
33+
ctx context.Context,
34+
producer pulsar.Producer,
35+
message *pulsar.ProducerMessage,
36+
) context.Context {
3337
buildAndInjectSpan(message, producer).Finish()
38+
return ctx
3439
}
3540

36-
func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer,
37-
message *pulsar.ProducerMessage,
38-
msgID pulsar.MessageID) {
41+
func (t *ProducerInterceptor) OnSendAcknowledgement(
42+
_ context.Context,
43+
_ pulsar.Producer,
44+
_ *pulsar.ProducerMessage,
45+
_ pulsar.MessageID) {
3946
}
4047

4148
func buildAndInjectSpan(message *pulsar.ProducerMessage, producer pulsar.Producer) opentracing.Span {

pulsar/producer_interceptor.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,30 @@
1717

1818
package pulsar
1919

20+
import "context"
21+
2022
type ProducerInterceptor interface {
2123
// BeforeSend This is called before send the message to the brokers. This method is allowed to modify the
2224
// message.
23-
BeforeSend(producer Producer, message *ProducerMessage)
25+
BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) context.Context
2426

2527
// OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged,
2628
// or when sending the message fails.
27-
OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID)
29+
OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID)
2830
}
2931

3032
type ProducerInterceptors []ProducerInterceptor
3133

32-
func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) {
34+
func (x ProducerInterceptors) BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) context.Context {
3335
for i := range x {
34-
x[i].BeforeSend(producer, message)
36+
ctx = x[i].BeforeSend(ctx, producer, message)
3537
}
38+
return ctx
3639
}
3740

38-
func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
41+
func (x ProducerInterceptors) OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) {
3942
for i := range x {
40-
x[i].OnSendAcknowledgement(producer, message, msgID)
43+
x[i].OnSendAcknowledgement(ctx, producer, message, msgID)
4144
}
4245
}
4346

pulsar/producer_partition.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -994,7 +994,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
994994
isDone := uAtomic.NewBool(false)
995995
doneCh := make(chan struct{})
996996

997-
p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
997+
ctx = p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
998998
if isDone.CAS(false, true) {
999999
err = e
10001000
msgID = ID
@@ -1194,11 +1194,11 @@ func (p *partitionProducer) internalSendAsync(
11941194
msg *ProducerMessage,
11951195
callback func(MessageID, *ProducerMessage, error),
11961196
flushImmediately bool,
1197-
) {
1197+
) context.Context {
11981198
if err := p.validateMsg(msg); err != nil {
11991199
p.log.Error(err)
12001200
runCallback(callback, nil, msg, err)
1201-
return
1201+
return ctx
12021202
}
12031203

12041204
sr := sendRequestPool.Get().(*sendRequest)
@@ -1216,43 +1216,46 @@ func (p *partitionProducer) internalSendAsync(
12161216

12171217
if err := p.prepareTransaction(sr); err != nil {
12181218
sr.done(nil, err)
1219-
return
1219+
return ctx
12201220
}
12211221

12221222
if p.getProducerState() != producerReady {
12231223
sr.done(nil, errProducerClosed)
1224-
return
1224+
return ctx
12251225
}
12261226

1227-
p.options.Interceptors.BeforeSend(p, msg)
1227+
ctx = p.options.Interceptors.BeforeSend(ctx, p, msg)
1228+
sr.ctx = ctx
12281229

12291230
if err := p.updateSchema(sr); err != nil {
12301231
p.log.Error(err)
12311232
sr.done(nil, err)
1232-
return
1233+
return ctx
12331234
}
12341235

12351236
if err := p.updateUncompressedPayload(sr); err != nil {
12361237
p.log.Error(err)
12371238
sr.done(nil, err)
1238-
return
1239+
return ctx
12391240
}
12401241

12411242
p.updateMetaData(sr)
12421243

12431244
if err := p.updateChunkInfo(sr); err != nil {
12441245
p.log.Error(err)
12451246
sr.done(nil, err)
1246-
return
1247+
return ctx
12471248
}
12481249

12491250
if err := p.reserveResources(sr); err != nil {
12501251
p.log.Error(err)
12511252
sr.done(nil, err)
1252-
return
1253+
return ctx
12531254
}
12541255

12551256
p.dataChan <- sr
1257+
1258+
return ctx
12561259
}
12571260

12581261
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
@@ -1497,7 +1500,7 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
14971500

14981501
if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
14991502
if sr.producer.options.Interceptors != nil {
1500-
sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
1503+
sr.producer.options.Interceptors.OnSendAcknowledgement(sr.ctx, sr.producer, sr.msg, msgID)
15011504
}
15021505
}
15031506
}

pulsar/producer_test.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1480,23 +1480,38 @@ func TestProducuerSendFailOnInvalidKey(t *testing.T) {
14801480

14811481
type noopProduceInterceptor struct{}
14821482

1483-
func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {}
1483+
func (noopProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, _ *ProducerMessage) context.Context {
1484+
return ctx
1485+
}
14841486

1485-
func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
1487+
func (noopProduceInterceptor) OnSendAcknowledgement(_ context.Context, _ Producer, _ *ProducerMessage, _ MessageID) {
14861488
}
14871489

1488-
// copyPropertyIntercepotr copy all keys in message properties map and add a suffix
1489-
type metricProduceInterceptor struct {
1490-
sendn int
1491-
ackn int
1490+
type trackingProduceInterceptor struct {
1491+
sendn int
1492+
ackn int
1493+
maxDuration time.Duration
14921494
}
14931495

1494-
func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {
1495-
x.sendn++
1496+
type beforeSendCtxKey struct{}
1497+
1498+
func (i *trackingProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, msg *ProducerMessage) context.Context {
1499+
i.sendn++
1500+
ctx = context.WithValue(ctx, beforeSendCtxKey{}, time.Now())
1501+
return ctx
14961502
}
14971503

1498-
func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
1499-
x.ackn++
1504+
func (i *trackingProduceInterceptor) OnSendAcknowledgement(ctx context.Context, _ Producer, _ *ProducerMessage, _ MessageID) {
1505+
var dur time.Duration
1506+
if v := ctx.Value(beforeSendCtxKey{}); v != nil {
1507+
dur = time.Since(v.(time.Time))
1508+
}
1509+
1510+
if dur > i.maxDuration {
1511+
i.maxDuration = dur
1512+
}
1513+
1514+
i.ackn++
15001515
}
15011516

15021517
func TestProducerWithInterceptors(t *testing.T) {
@@ -1519,14 +1534,14 @@ func TestProducerWithInterceptors(t *testing.T) {
15191534
assert.Nil(t, err)
15201535
defer consumer.Close()
15211536

1522-
metric := &metricProduceInterceptor{}
1537+
interceptor := &trackingProduceInterceptor{}
15231538
// create producer
15241539
producer, err := client.CreateProducer(ProducerOptions{
15251540
Topic: topic,
15261541
DisableBatching: false,
15271542
Interceptors: ProducerInterceptors{
15281543
noopProduceInterceptor{},
1529-
metric,
1544+
interceptor,
15301545
},
15311546
})
15321547
assert.Nil(t, err)
@@ -1576,8 +1591,9 @@ func TestProducerWithInterceptors(t *testing.T) {
15761591
consumer.Ack(msg)
15771592
}
15781593

1579-
assert.Equal(t, 10, metric.sendn)
1580-
assert.Equal(t, 10, metric.ackn)
1594+
assert.Equal(t, 10, interceptor.sendn)
1595+
assert.Equal(t, 10, interceptor.ackn)
1596+
assert.NotZero(t, interceptor.maxDuration)
15811597
}
15821598

15831599
func TestProducerSendAfterClose(t *testing.T) {
@@ -1720,7 +1736,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) {
17201736
}
17211737
producer.Flush()
17221738

1723-
//// create consumer
1739+
// create consumer
17241740
consumer, err := client.Subscribe(ConsumerOptions{
17251741
Topic: topic,
17261742
SubscriptionName: "my-sub2",
@@ -1811,7 +1827,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) {
18111827
}
18121828
producer.Flush()
18131829

1814-
//// create consumer
1830+
// create consumer
18151831
consumer, err := client.Subscribe(ConsumerOptions{
18161832
Topic: topic,
18171833
SubscriptionName: "my-sub2",

0 commit comments

Comments
 (0)