diff --git a/pulsar/reader.go b/pulsar/reader.go index 5e1a73b988..4bd64152f0 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -93,6 +93,9 @@ type ReaderOptions struct { // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) BackoffPolicy internal.BackoffPolicy + // ReaderInterceptors call before messages consumed. + ReaderInterceptors []ReaderInterceptor + // MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100) MaxPendingChunkedMessage int diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index ffc92dedde..a514370667 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -114,6 +114,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { decryption: options.Decryption, schema: options.Schema, backoffPolicy: options.BackoffPolicy, + interceptors: transformReaderInterceptors(options.ReaderInterceptors), maxPendingChunkedMessage: options.MaxPendingChunkedMessage, expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, autoAckIncompleteChunk: options.AutoAckIncompleteChunk, diff --git a/pulsar/reader_interceptor.go b/pulsar/reader_interceptor.go new file mode 100644 index 0000000000..58077fff5a --- /dev/null +++ b/pulsar/reader_interceptor.go @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type ReaderInterceptor interface { + // BeforeRead called after messages received + BeforeRead(consumerMessage ConsumerMessage) +} + +func transformReaderInterceptors(readerInterceptors []ReaderInterceptor) ConsumerInterceptors { + if len(readerInterceptors) <= 0 { + return defaultConsumerInterceptors + } + + consumerInterceptors := make(ConsumerInterceptors, len(readerInterceptors)) + for i := range readerInterceptors { + consumerInterceptors[i] = consumerInterceptor{readerInterceptors[i]} + } + return consumerInterceptors +} + +type consumerInterceptor struct { + readerInterceptor ReaderInterceptor +} + +func (c consumerInterceptor) BeforeConsume(message ConsumerMessage) { + c.readerInterceptor.BeforeRead(message) +} + +func (c consumerInterceptor) OnAcknowledge(Consumer, MessageID) { +} + +func (c consumerInterceptor) OnNegativeAcksSend(Consumer, []MessageID) { +} diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index ec10f8f162..507061063f 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -20,6 +20,7 @@ package pulsar import ( "context" "fmt" + "sync/atomic" "testing" "time" @@ -943,3 +944,124 @@ func TestReaderGetLastMessageID(t *testing.T) { assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID()) assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID()) } + +var _ ReaderInterceptor = (*counterReaderInterceptor)(nil) + +type counterReaderInterceptor struct { + v *int32 +} + +func newInterceptor() *counterReaderInterceptor { + var v = int32(0) + return &counterReaderInterceptor{v: &v} +} + +func (c *counterReaderInterceptor) BeforeRead(_ ConsumerMessage) { + atomic.AddInt32(c.v, int32(1)) +} + +func TestSingleReaderInterceptor(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + schema := NewStringSchema(nil) + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + Schema: schema, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + } + + interceptor := newInterceptor() + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + ReaderInterceptors: []ReaderInterceptor{interceptor}, + StartMessageID: EarliestMessageID(), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + defer r.Close() + + for { + if r.HasNext() { + _, err := r.Next(ctx) + assert.NoError(t, err) + } else { + break + } + } + + assert.Equal(t, atomic.LoadInt32(interceptor.v), int32(10)) +} + +func TestMultiReaderInterceptors(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + schema := NewStringSchema(nil) + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + Schema: schema, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + } + + interceptor0 := newInterceptor() + interceptor1 := newInterceptor() + interceptor2 := newInterceptor() + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + ReaderInterceptors: []ReaderInterceptor{interceptor0, interceptor1, interceptor2}, + StartMessageID: EarliestMessageID(), + }) + + assert.NotNil(t, r) + assert.NoError(t, err) + defer r.Close() + + for { + if r.HasNext() { + _, err := r.Next(ctx) + assert.NoError(t, err) + } else { + break + } + } + + assert.Equal(t, atomic.LoadInt32(interceptor0.v), int32(10)) + assert.Equal(t, atomic.LoadInt32(interceptor1.v), int32(10)) + assert.Equal(t, atomic.LoadInt32(interceptor2.v), int32(10)) +}