Skip to content

Commit bddaeba

Browse files
authored
fix: SendAsync doesn't respect context and can't timeout during reconnection (#1422)
1 parent 8fb4613 commit bddaeba

File tree

5 files changed

+373
-44
lines changed

5 files changed

+373
-44
lines changed

pulsar/internal/blocking_queue.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ import (
2424
// BlockingQueue is a interface of block queue
2525
type BlockingQueue interface {
2626
// Put enqueue one item, block if the queue is full
27+
// This is currently used for the internal testing
2728
Put(item interface{})
2829

30+
// PutUnsafe enqueue one item without locking the queue, block if the queue is full
31+
PutUnsafe(item interface{})
32+
2933
// Take dequeue one item, block until it's available
3034
Take() interface{}
3135

@@ -46,6 +50,17 @@ type BlockingQueue interface {
4650

4751
// ReadableSlice returns a new view of the readable items in the queue
4852
ReadableSlice() []interface{}
53+
54+
// IterateUnsafe iterates the items in the queue without blocking the queue
55+
IterateUnsafe(func(item interface{}))
56+
57+
// Lock locks the queue for manual control
58+
// Users must call Unlock() after finishing their operations
59+
Lock()
60+
61+
// Unlock unlocks the queue
62+
// Must be called after Lock() to release the lock
63+
Unlock()
4964
}
5065

5166
type blockingQueue struct {
@@ -60,6 +75,17 @@ type blockingQueue struct {
6075
isNotFull *sync.Cond
6176
}
6277

78+
func (bq *blockingQueue) IterateUnsafe(f func(item interface{})) {
79+
readIdx := bq.headIdx
80+
for i := 0; i < bq.size; i++ {
81+
f(bq.items[readIdx])
82+
readIdx++
83+
if readIdx == bq.maxSize {
84+
readIdx = 0
85+
}
86+
}
87+
}
88+
6389
// NewBlockingQueue init block queue and returns a BlockingQueue
6490
func NewBlockingQueue(maxSize int) BlockingQueue {
6591
bq := &blockingQueue{
@@ -76,9 +102,12 @@ func NewBlockingQueue(maxSize int) BlockingQueue {
76102
}
77103

78104
func (bq *blockingQueue) Put(item interface{}) {
79-
bq.mutex.Lock()
80-
defer bq.mutex.Unlock()
105+
bq.Lock()
106+
defer bq.Unlock()
107+
bq.PutUnsafe(item)
108+
}
81109

110+
func (bq *blockingQueue) PutUnsafe(item interface{}) {
82111
for bq.size == bq.maxSize {
83112
bq.isNotFull.Wait()
84113
}
@@ -192,3 +221,11 @@ func (bq *blockingQueue) ReadableSlice() []interface{} {
192221

193222
return res
194223
}
224+
225+
func (bq *blockingQueue) Lock() {
226+
bq.mutex.Lock()
227+
}
228+
229+
func (bq *blockingQueue) Unlock() {
230+
bq.mutex.Unlock()
231+
}

pulsar/internal/blocking_queue_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,3 +148,101 @@ func TestBlockingQueue_ReadableSlice(t *testing.T) {
148148
assert.Equal(t, items[1], 3)
149149
assert.Equal(t, items[2], 4)
150150
}
151+
152+
func TestBlockingQueueIterate(t *testing.T) {
153+
bq := NewBlockingQueue(5)
154+
155+
// Add some items
156+
bq.PutUnsafe("item1")
157+
bq.PutUnsafe("item2")
158+
bq.PutUnsafe("item3")
159+
160+
// Test iteration
161+
items := make([]interface{}, 0)
162+
bq.IterateUnsafe(func(item interface{}) {
163+
items = append(items, item)
164+
})
165+
166+
assert.Equal(t, 3, len(items))
167+
assert.Equal(t, "item1", items[0])
168+
assert.Equal(t, "item2", items[1])
169+
assert.Equal(t, "item3", items[2])
170+
}
171+
172+
func TestBlockingQueueIteratePartial(t *testing.T) {
173+
bq := NewBlockingQueue(5)
174+
175+
// Add some items
176+
bq.PutUnsafe("item1")
177+
bq.PutUnsafe("item2")
178+
bq.PutUnsafe("item3")
179+
180+
// Test partial iteration (first 2 items only)
181+
items := make([]interface{}, 0)
182+
bq.IterateUnsafe(func(item interface{}) {
183+
if len(items) < 2 {
184+
items = append(items, item)
185+
}
186+
})
187+
188+
assert.Equal(t, 2, len(items))
189+
assert.Equal(t, "item1", items[0])
190+
assert.Equal(t, "item2", items[1])
191+
}
192+
193+
func TestBlockingQueueIterateCircularBuffer(t *testing.T) {
194+
bq := NewBlockingQueue(3)
195+
196+
// Fill the queue to test circular buffer behavior
197+
bq.PutUnsafe("item1")
198+
bq.PutUnsafe("item2")
199+
bq.PutUnsafe("item3")
200+
201+
// Remove one item to create space
202+
bq.Poll()
203+
204+
// Add another item to test wrapping
205+
bq.PutUnsafe("item4")
206+
207+
// Test iteration with circular buffer
208+
items := make([]interface{}, 0)
209+
bq.IterateUnsafe(func(item interface{}) {
210+
items = append(items, item)
211+
})
212+
213+
assert.Equal(t, 3, len(items))
214+
assert.Equal(t, "item2", items[0])
215+
assert.Equal(t, "item3", items[1])
216+
assert.Equal(t, "item4", items[2])
217+
}
218+
219+
func TestBlockingQueueIterateEmpty(t *testing.T) {
220+
bq := NewBlockingQueue(5)
221+
222+
// Test iteration on empty queue
223+
items := make([]interface{}, 0)
224+
bq.IterateUnsafe(func(item interface{}) {
225+
items = append(items, item)
226+
})
227+
228+
assert.Equal(t, 0, len(items))
229+
}
230+
231+
func TestBlockingQueueManualLock(t *testing.T) {
232+
bq := NewBlockingQueue(5)
233+
234+
// Test manual locking for batch PutUnsafe operations
235+
bq.Lock()
236+
237+
bq.PutUnsafe("item1")
238+
bq.PutUnsafe("item2")
239+
bq.PutUnsafe("item3")
240+
241+
// Unlock
242+
bq.Unlock()
243+
244+
// Verify all items were added
245+
assert.Equal(t, 3, bq.Size())
246+
assert.Equal(t, "item1", bq.Peek())
247+
assert.Equal(t, "item3", bq.PeekLast())
248+
}

pulsar/producer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,15 @@ type Producer interface {
235235
// This call is blocked when the `maxPendingMessages` becomes full (default: 1000)
236236
// The callback will report back the message being published and
237237
// the eventual error in publishing
238+
// The context passed in the call is only used for the duration of the SendAsync call itself
239+
// (i.e., to control blocking when the queue is full), and not for the entire message lifetime.
240+
// Once SendAsync returns, the message lifetime is controlled by the SendTimeout configuration.
241+
// Example:
242+
// producer.SendAsync(ctx, &pulsar.ProducerMessage{
243+
// Payload: myPayload,
244+
// }, func(msgID pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
245+
// // handle publish result
246+
// })
238247
SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))
239248

240249
// LastSequenceID get the last sequence id that was published by this producer.

0 commit comments

Comments
 (0)