diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go index cb2cc152f5..e545cf32f6 100644 --- a/pulsar/producer_interceptor.go +++ b/pulsar/producer_interceptor.go @@ -17,9 +17,11 @@ package pulsar +import "context" + type ProducerInterceptor interface { // BeforeSend This is called before send the message to the brokers. This method is allowed to modify the - BeforeSend(producer Producer, message *ProducerMessage) + BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) // OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged, // or when sending the message fails. @@ -28,9 +30,9 @@ type ProducerInterceptor interface { type ProducerInterceptors []ProducerInterceptor -func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) { +func (x ProducerInterceptors) BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) { for i := range x { - x[i].BeforeSend(producer, message) + x[i].BeforeSend(ctx, producer, message) } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index e273021e8f..580c12e40d 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -748,7 +748,8 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer flushImmediately: flushImmediately, publishTime: time.Now(), } - p.options.Interceptors.BeforeSend(p, msg) + // call interceptor with context parameter + p.options.Interceptors.BeforeSend(ctx, p, msg) if p.options.DisableBlockIfQueueFull { if !p.publishSemaphore.TryAcquire() { diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index f914017316..d329bf3bd0 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1107,7 +1107,8 @@ func TestProducuerSendFailOnInvalidKey(t *testing.T) { type noopProduceInterceptor struct{} -func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {} +func (noopProduceInterceptor) BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) { +} func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { } @@ -1118,7 +1119,7 @@ type metricProduceInterceptor struct { ackn int } -func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) { +func (x *metricProduceInterceptor) BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) { x.sendn++ }