Skip to content

Commit e2bab09

Browse files
authored
Fix receive getting stuck after seek in MultiTopicsConsumer (#336)
* Fix MultiTopicsConsumer seek stuck
* Add test "Seek wont stuck the receive"
* Improve codes
* Address comments
1 parent 7e6df56 commit e2bab09

File tree

3 files changed

+111
-11
lines changed

3 files changed

+111
-11
lines changed

src/Pulsar.Client/Internal/MultiTopicsConsumerImpl.fs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -375,15 +375,24 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration
375375
| Ok msg -> incomingMessagesSize <- incomingMessagesSize + msg.Data.LongLength
376376
| _ -> ()
377377
incomingMessages.Enqueue(m)
378+
379+
let tryResumePoller() =
380+
if isPollingAllowed() && (waitingPoller <> defaultWaitingPoller) then
381+
Log.Logger.LogDebug("{0} resume poller", prefix)
382+
waitingPoller.SetResult()
383+
waitingPoller <- defaultWaitingPoller
384+
385+
let clearIncomingMessages() =
386+
incomingMessages.Clear()
387+
incomingMessagesSize <- 0L
388+
tryResumePoller()
378389

379390
let dequeueMessage() =
380391
let m = incomingMessages.Dequeue()
381392
match m with
382393
| Ok msg -> incomingMessagesSize <- incomingMessagesSize - msg.Data.LongLength
383394
| _ -> ()
384-
if isPollingAllowed() && (waitingPoller <> defaultWaitingPoller) then
385-
waitingPoller.SetResult()
386-
waitingPoller <- defaultWaitingPoller
395+
tryResumePoller()
387396
m
388397

389398
let hasEnoughMessagesForBatchReceive() =
@@ -678,6 +687,7 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration
678687
replyWithBatch ch
679688
// check if should reply to poller immediately
680689
if isPollingAllowed() |> not then
690+
Log.Logger.LogDebug("{0} paused poller, incomingMessages={1}, sharedQueueResumeThreshold={2}", prefix, incomingMessages.Count, sharedQueueResumeThreshold)
681691
waitingPoller <- pollerChannel
682692
else
683693
pollerChannel.SetResult()
@@ -755,9 +765,8 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration
755765
|> Seq.map(fun (KeyValue(_, (consumer, _))) -> consumer.RedeliverUnacknowledgedMessagesAsync())
756766
|> Task.WhenAll
757767
unAckedMessageTracker.Clear()
758-
incomingMessages.Clear()
768+
clearIncomingMessages()
759769
currentStream.RestartCompletedTasks()
760-
incomingMessagesSize <- 0L
761770
channel |> Option.map _.SetResult() |> ignore
762771
with ex ->
763772
Log.Logger.LogError(ex, "{0} RedeliverUnacknowledgedMessages failed", prefix)
@@ -838,16 +847,16 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration
838847
Log.Logger.LogDebug("{0} Seek {1}", prefix, seekData)
839848
backgroundTask {
840849
try
850+
unAckedMessageTracker.Clear()
851+
clearIncomingMessages()
841852
let! _ =
842853
consumers
843854
|> Seq.map (fun (KeyValue(_, (consumer, _))) ->
844855
match seekData with
845856
| SeekType.Timestamp ts -> consumer.SeekAsync(ts)
846857
| SeekType.MessageId msgId -> consumer.SeekAsync(msgId))
847858
|> Task.WhenAll
848-
unAckedMessageTracker.Clear()
849-
incomingMessages.Clear()
850-
incomingMessagesSize <- 0L
859+
currentStream.RestartCompletedTasks()
851860
channel.SetResult()
852861
with Flatten ex ->
853862
channel.SetException ex
@@ -856,13 +865,13 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration
856865
| SeekWithResolver (resolver, channel) ->
857866
backgroundTask {
858867
try
868+
unAckedMessageTracker.Clear()
869+
clearIncomingMessages()
859870
let! _ =
860871
consumers
861872
|> Seq.map (fun (KeyValue(_, (consumer, _))) -> consumer.SeekAsync(resolver))
862873
|> Task.WhenAll
863-
unAckedMessageTracker.Clear()
864-
incomingMessages.Clear()
865-
incomingMessagesSize <- 0L
874+
currentStream.RestartCompletedTasks()
866875
channel.SetResult()
867876
with Flatten ex ->
868877
channel.SetException ex

tests/IntegrationTests/Seek.fs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module Pulsar.Client.IntegrationTests.Seek
33
open System
44
open System.Threading
55
open System.Diagnostics
6+
open System.Collections.Generic
67

78
open Expecto
89
open Expecto.Flip
@@ -240,5 +241,94 @@ let tests =
240241
testTask "Seek randomly works without batching " {
241242
do! testRandomSeek true
242243
}
244+
245+
246+
testTask "Seek won't get stuck at the receive in MultiTopicsConsumer" {
247+
Log.Debug("Started Seek won't get stuck at the receive in MultiTopicsConsumer")
248+
let client = getClient()
249+
let topicName = "persistent://public/default/multi-topic-seek"
250+
let producerName = "seekStuckProducer"
251+
let consumerName = "seekStuckConsumer"
252+
let numberOfMessages = 30
253+
let subscriptionName = "test-seek-stuck-" + Guid.NewGuid().ToString("N")
254+
255+
let seekWithRetry (consumer: IConsumer<byte[]>) (targetTimestamp: TimeStamp) (maxRetries: int) =
256+
task {
257+
let mutable retryCount = 0
258+
let mutable success = false
259+
while retryCount < maxRetries && not success do
260+
try
261+
do! consumer.SeekAsync(targetTimestamp)
262+
success <- true
263+
with Flatten ex ->
264+
match ex with
265+
| :? NotConnectedException as notConnectedEx ->
266+
retryCount <- retryCount + 1
267+
if retryCount >= maxRetries then
268+
Log.Error("SeekAsync failed after {0} retries: {1}", maxRetries, notConnectedEx.Message)
269+
raise notConnectedEx
270+
else
271+
Log.Debug("SeekAsync failed (attempt {0}/{1}): {2}. Retrying in 1 second...", retryCount, maxRetries, notConnectedEx.Message)
272+
do! Task.Delay(1000)
273+
| _ ->
274+
raise ex
275+
}
276+
277+
let! consumer =
278+
client.NewConsumer()
279+
.Topic(topicName)
280+
.ConsumerName(consumerName)
281+
.SubscriptionName(subscriptionName)
282+
.ReceiverQueueSize(10)
283+
.SubscribeAsync()
284+
let! (producer : IProducer<byte[]>) =
285+
client.NewProducer()
286+
.Topic(topicName)
287+
.ProducerName(producerName)
288+
.EnableBatching(false)
289+
.CreateAsync()
290+
291+
let expectedMessages = HashSet<string>()
292+
for i in 1..numberOfMessages do
293+
let messageContent = sprintf "Message-%i-%s" i (Guid.NewGuid().ToString("N"))
294+
expectedMessages.Add(messageContent) |> ignore
295+
let messageBytes = Encoding.UTF8.GetBytes(messageContent)
296+
let! (_ : MessageId) = producer.SendAsync(messageBytes)
297+
()
298+
do! Task.Delay(1000)
299+
300+
let targetTimestamp = %(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - 1000L * 3600L * 24L)
301+
Log.Debug("Seeking to timestamp: {0}", targetTimestamp)
302+
do! seekWithRetry consumer targetTimestamp 10
303+
304+
let receivedMessages = HashSet<string>()
305+
let cts = new CancellationTokenSource(TimeSpan.FromSeconds(30.0))
306+
307+
try
308+
for _ in 1..numberOfMessages do
309+
let! (message : Message<byte[]>) = consumer.ReceiveAsync(cts.Token)
310+
let received = Encoding.UTF8.GetString(message.Data)
311+
Log.Debug("{0} received {1}", consumerName, received)
312+
receivedMessages.Add(received) |> ignore
313+
do! consumer.AcknowledgeAsync(message.MessageId)
314+
315+
Expect.equal "" numberOfMessages receivedMessages.Count
316+
for expectedMsg in expectedMessages do
317+
Expect.isTrue "" (receivedMessages.Contains(expectedMsg))
318+
319+
cts.Dispose()
320+
with
321+
| :? OperationCanceledException
322+
| :? TaskCanceledException ->
323+
cts.Dispose()
324+
let errorMsg = $"Test timeout: Only received {receivedMessages.Count} out of {numberOfMessages} messages within 30 seconds"
325+
Log.Error(errorMsg)
326+
failwith errorMsg
327+
| ex ->
328+
cts.Dispose()
329+
raise ex
330+
331+
Log.Debug("Finished Seek won't get stuck at the receive in MultiTopicsConsumer")
332+
}
243333

244334
]

tests/compose/standalone/scripts/init-standalone.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ bin/pulsar-admin topics create-partitioned-topic persistent://public/default/par
2222
bin/pulsar-admin topics create-partitioned-topic persistent://public/default/partitioned6 --partitions 2
2323
bin/pulsar-admin topics create-partitioned-topic persistent://public/default/partitioned-dl-test --partitions 2
2424
bin/pulsar-admin topics create-partitioned-topic persistent://public/deduplication/partitioned --partitions 3
25+
bin/pulsar-admin topics create-partitioned-topic persistent://public/default/multi-topic-seek --partitions 3
2526

2627
echo "Initializing transaction coordinator metadata..."
2728
bin/pulsar initialize-transaction-coordinator-metadata -cs standalone:2181 -c standalone --initial-num-transaction-coordinators 2

0 commit comments

Comments
 (0)