
Commit fa2b263

Fix partition update failure causing existing producers to close (#1437)
1 parent 4ba6850 commit fa2b263

3 files changed: +96, -44 lines

pulsar/message_chunking_test.go

Lines changed: 1 addition & 1 deletion
@@ -544,7 +544,7 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
 		Payload: []byte(fmt.Sprintf("chunk-%s-%d|", uuid, chunkID)),
 	}
 	wholePayload := msg.Payload
-	producerImpl := p.(*producer).producers[0].(*partitionProducer)
+	producerImpl := p.(*producer).getProducer(0).(*partitionProducer)
 	mm := producerImpl.genMetadata(msg, len(wholePayload), time.Now())
 	mm.Uuid = proto.String(uuid)
 	mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))

pulsar/producer_impl.go

Lines changed: 36 additions & 38 deletions
@@ -23,7 +23,6 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
-	"unsafe"
 
 	"github.com/apache/pulsar-client-go/pulsar/crypto"
 	"github.com/apache/pulsar-client-go/pulsar/internal"
@@ -48,13 +47,10 @@ const (
 )
 
 type producer struct {
-	sync.RWMutex
 	client        *client
 	options       *ProducerOptions
 	topic         string
-	producers     []Producer
-	producersPtr  unsafe.Pointer
-	numPartitions uint32
+	producers     atomic.Value
 	messageRouter func(*ProducerMessage, TopicMetadata) int
 	closeOnce     sync.Once
 	stopDiscovery func()
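
This struct change is the core of the fix: the mutex, the raw producers slice, the unsafe.Pointer shadow copy, and the separate partition counter are collapsed into a single atomic.Value that always holds a complete, immutable snapshot of the partition producers. As a standalone illustration of that copy-on-write pattern (the names below are illustrative, not the client's API):

package sketch

import "sync/atomic"

// holder keeps an immutable []string snapshot in an atomic.Value.
// Readers never block and never observe a half-updated slice;
// writers publish a whole new slice with a single Store.
type holder struct {
	items atomic.Value // always holds a []string
}

func (h *holder) load() []string {
	if v := h.items.Load(); v != nil {
		return v.([]string)
	}
	return nil
}

// publish swaps in a new snapshot atomically; goroutines that already
// loaded the previous slice keep using it safely.
func (h *holder) publish(next []string) {
	h.items.Store(next)
}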
@@ -195,10 +191,7 @@ func (p *producer) internalCreatePartitionsProducers() error {
 	oldNumPartitions := 0
 	newNumPartitions := len(partitions)
 
-	p.Lock()
-	defer p.Unlock()
-
-	oldProducers := p.producers
+	oldProducers := p.getProducers()
 	oldNumPartitions = len(oldProducers)
 
 	if oldProducers != nil {
@@ -213,14 +206,14 @@ func (p *producer) internalCreatePartitionsProducers() error {
 
 	}
 
-	p.producers = make([]Producer, newNumPartitions)
+	producers := make([]Producer, newNumPartitions)
 
 	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
 	// we need to rebuild the cache of new producers, otherwise the array will be out of bounds.
 	if oldProducers != nil && oldNumPartitions < newNumPartitions {
 		// Copy over the existing consumer instances
 		for i := 0; i < oldNumPartitions; i++ {
-			p.producers[i] = oldProducers[i]
+			producers[i] = oldProducers[i]
 		}
 	}
 
@@ -251,20 +244,23 @@ func (p *producer) internalCreatePartitionsProducers() error {
 		}(partitionIdx, partition)
 	}
 
+	var newProducers []Producer
+
 	for i := 0; i < partitionsToAdd; i++ {
 		pe, ok := <-c
 		if ok {
 			if pe.err != nil {
 				err = pe.err
 			} else {
-				p.producers[pe.partition] = pe.prod
+				producers[pe.partition] = pe.prod
+				newProducers = append(newProducers, pe.prod)
 			}
 		}
 	}
 
 	if err != nil {
 		// Since there were some failures, cleanup all the partitions that succeeded in creating the producers
-		for _, producer := range p.producers {
+		for _, producer := range newProducers {
 			if producer != nil {
 				producer.Close()
 			}
@@ -277,8 +273,7 @@ func (p *producer) internalCreatePartitionsProducers() error {
 	} else {
 		p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
 	}
-	atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
-	atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
+	p.producers.Store(producers)
 	return nil
 }
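
The three hunks above rework the update path in internalCreatePartitionsProducers: the next snapshot is assembled in a local producers slice, existing producers are copied into it, producers created during this update are additionally tracked in newProducers, and p.producers.Store(producers) runs only once every creation succeeded. If any creation fails, only the entries in newProducers are closed, so the snapshot that live senders are using is neither replaced nor torn down, which is exactly the failure mode this commit fixes. A simplified sketch of that copy-then-swap flow (placeholder types, not the client's code):

package sketch

import "io"

// updatePartitions sketches the copy-then-swap update: build the next
// snapshot off to the side, reuse the existing producers, and only hand
// the new slice back (for the caller to Store) once every new entry was
// created. On failure, close only what this update created.
func updatePartitions(current []io.Closer, newCount int,
	create func(i int) (io.Closer, error)) ([]io.Closer, error) {

	next := make([]io.Closer, newCount)
	copy(next, current) // existing producers are reused, never recreated

	var created []io.Closer
	for i := len(current); i < newCount; i++ {
		p, err := create(i)
		if err != nil {
			for _, c := range created {
				c.Close() // roll back only this update's producers
			}
			return nil, err // the old snapshot stays published and usable
		}
		next[i] = p
		created = append(created, p)
	}
	return next, nil
}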

@@ -287,14 +282,11 @@ func (p *producer) Topic() string {
 }
 
 func (p *producer) Name() string {
-	p.RLock()
-	defer p.RUnlock()
-
-	return p.producers[0].Name()
+	return p.getProducer(0).Name()
 }
 
 func (p *producer) NumPartitions() uint32 {
-	return atomic.LoadUint32(&p.numPartitions)
+	return uint32(len(p.getProducers()))
 }
 
 func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
@@ -306,11 +298,11 @@ func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
 	p.getPartition(msg).SendAsync(ctx, msg, callback)
 }
 
-func (p *producer) getPartition(msg *ProducerMessage) Producer {
-	// Since partitions can only increase, it's ok if the producers list
-	// is updated in between. The numPartition is updated only after the list.
-	partition := p.messageRouter(msg, p)
-	producers := *(*[]Producer)(atomic.LoadPointer(&p.producersPtr))
+func (p *producer) getProducer(partition int) Producer {
+	producers := p.getProducers()
+	if len(producers) == 0 {
+		panic("producer has not been initialized properly")
+	}
 	if partition >= len(producers) {
 		// We read the old producers list while the count was already
 		// updated
@@ -319,12 +311,23 @@ func (p *producer) getPartition(msg *ProducerMessage) Producer {
 	return producers[partition]
 }
 
-func (p *producer) LastSequenceID() int64 {
-	p.RLock()
-	defer p.RUnlock()
+func (p *producer) getProducers() []Producer {
+	if producers := p.producers.Load(); producers != nil {
+		return producers.([]Producer)
+	}
+	return []Producer{}
+}
+
+func (p *producer) getPartition(msg *ProducerMessage) Producer {
+	// Since partitions can only increase, it's ok if the producers list
+	// is updated in between. The numPartition is updated only after the list.
+	partition := p.messageRouter(msg, p)
+	return p.getProducer(partition)
+}
 
+func (p *producer) LastSequenceID() int64 {
 	var maxSeq int64 = -1
-	for _, pp := range p.producers {
+	for _, pp := range p.getProducers() {
 		s := pp.LastSequenceID()
 		if s > maxSeq {
 			maxSeq = s
@@ -338,10 +341,7 @@ func (p *producer) Flush() error {
 }
 
 func (p *producer) FlushWithCtx(ctx context.Context) error {
-	p.RLock()
-	defer p.RUnlock()
-
-	for _, pp := range p.producers {
+	for _, pp := range p.getProducers() {
 		if err := pp.FlushWithCtx(ctx); err != nil {
 			return err
 		}
@@ -354,14 +354,12 @@ func (p *producer) Close() {
 	p.closeOnce.Do(func() {
 		p.stopDiscovery()
 
-		p.Lock()
-		defer p.Unlock()
-
-		for _, pp := range p.producers {
+		producers := p.getProducers()
+		for _, pp := range producers {
 			pp.Close()
 		}
 		p.client.handlers.Del(p)
-		p.metrics.ProducersPartitions.Sub(float64(len(p.producers)))
+		p.metrics.ProducersPartitions.Sub(float64(len(producers)))
 		p.metrics.ProducersClosed.Inc()
 	})
 }

pulsar/producer_test.go

Lines changed: 59 additions & 5 deletions
@@ -30,6 +30,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/apache/pulsar-client-go/pulsaradmin"
+	"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
+	"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
 	"github.com/stretchr/testify/require"
 	"github.com/testcontainers/testcontainers-go"
 	"github.com/testcontainers/testcontainers-go/wait"
@@ -1382,7 +1385,7 @@ func TestProducerWithBackoffPolicy(t *testing.T) {
 	assert.Nil(t, err)
 	defer _producer.Close()
 
-	partitionProducerImp := _producer.(*producer).producers[0].(*partitionProducer)
+	partitionProducerImp := _producer.(*producer).getProducer(0).(*partitionProducer)
 	// 1 s
 	startTime := time.Now()
 	partitionProducerImp.reconnectToBroker(nil)
@@ -2477,7 +2480,7 @@ func TestFailPendingMessageWithClose(t *testing.T) {
 			}
 		})
 	}
-	partitionProducerImp := testProducer.(*producer).producers[0].(*partitionProducer)
+	partitionProducerImp := testProducer.(*producer).getProducer(0).(*partitionProducer)
 	partitionProducerImp.pendingQueue.Put(&pendingItem{
 		buffer: buffersPool.GetBuffer(0),
 	})
@@ -2597,7 +2600,7 @@ func TestDisableReplication(t *testing.T) {
 		writtenBuffers: &writtenBuffers,
 	}
 
-	partitionProducerImp := testProducer.(*producer).producers[0].(*partitionProducer)
+	partitionProducerImp := testProducer.(*producer).getProducer(0).(*partitionProducer)
 	partitionProducerImp.pendingQueue = pqw
 
 	ID, err := testProducer.Send(context.Background(), &ProducerMessage{
@@ -2718,7 +2721,7 @@ func TestSelectConnectionForSameProducer(t *testing.T) {
 	assert.NoError(t, err)
 	defer _producer.Close()
 
-	partitionProducerImp := _producer.(*producer).producers[0].(*partitionProducer)
+	partitionProducerImp := _producer.(*producer).getProducer(0).(*partitionProducer)
 	conn := partitionProducerImp._getConn()
 
 	for i := 0; i < 5; i++ {
@@ -2762,7 +2765,7 @@ func TestSendBufferRetainWhenConnectionStuck(t *testing.T) {
 		Topic: topicName,
 	})
 	assert.NoError(t, err)
-	pp := p.(*producer).producers[0].(*partitionProducer)
+	pp := p.(*producer).getProducer(0).(*partitionProducer)
 
 	// Create a mock connection that tracks written buffers
 	conn := &mockConn{
@@ -2898,3 +2901,54 @@ func testSendAsyncCouldTimeoutWhileReconnecting(t *testing.T, isDisableBatching
 	}
 	close(finalErr)
 }
+
+type mockRPCClient struct {
+	internal.RPCClient
+}
+
+func (m *mockRPCClient) RequestOnCnx(_ internal.Connection, _ uint64, _ pb.BaseCommand_Type,
+	_ proto.Message) (*internal.RPCResult, error) {
+	return nil, fmt.Errorf("expected error")
+}
+
+func TestPartitionUpdateFailed(t *testing.T) {
+	topicName := newTopicName()
+
+	admin, err := pulsaradmin.NewClient(&config.Config{
+		WebServiceURL: adminURL,
+	})
+	require.NoError(t, err)
+
+	tn, err := utils.GetTopicName(topicName)
+	require.NoError(t, err)
+	require.NoError(t, admin.Topics().Create(*tn, 1))
+
+	c, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	require.NoError(t, err)
+	p, err := c.CreateProducer(ProducerOptions{
+		Topic:                           topicName,
+		PartitionsAutoDiscoveryInterval: time.Second * 1,
+	})
+	require.NoError(t, err)
+	_, err = p.Send(context.Background(), &ProducerMessage{
+		Payload: []byte("test"),
+	})
+	require.NoError(t, err)
+	c.(*client).rpcClient = &mockRPCClient{
+		RPCClient: c.(*client).rpcClient,
+	}
+
+	require.NoError(t, admin.Topics().Update(*tn, 2))
+
+	// Assert that partition update failed won't affect the existing producers
+	for i := 0; i < 5; i++ {
+		_, err = p.Send(context.Background(), &ProducerMessage{
+			Payload: []byte("test"),
+		})
+		require.NoError(t, err)
+
+		time.Sleep(time.Second * 1)
+	}
+}
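
The new TestPartitionUpdateFailed exercises the regression end to end: the topic starts with a single partition, PartitionsAutoDiscoveryInterval is set to one second, the admin API grows the topic to two partitions, and the wrapped mockRPCClient makes RequestOnCnx fail so a producer for the new partition cannot be registered; sends through the already existing partition must keep succeeding. For reference, this is roughly how an application opts into partition auto-discovery (a minimal sketch; the broker URL and topic name are placeholders):

package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // placeholder broker URL
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		// Placeholder partitioned topic.
		Topic: "persistent://public/default/my-partitioned-topic",
		// How often the client re-checks the topic's partition count.
		PartitionsAutoDiscoveryInterval: time.Minute,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
		Payload: []byte("hello"),
	}); err != nil {
		log.Fatal(err)
	}
}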
