Skip to content

Commit a93f4d7

Browse files
authored
[2.x] Fix wrong behavior of hasMessageAvailable after seeking by timestamp (#342)
* Fix wrong results of hasMessageAvailable after seeking by timestamp (#333) * Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp * Remove HasSoughtByTimestamp and improve test (cherry picked from commit e894e66) * Fix Reader HasMessageAvailable behavior after seeking by timestamp (#344) * Fix Reader HasMessageAvailable behavior after seeking by timestamp * Improve logic (cherry picked from commit 170cb2f)
1 parent 612d40e commit a93f4d7

File tree

2 files changed

+152
-8
lines changed

2 files changed

+152
-8
lines changed

src/Pulsar.Client/Internal/ConsumerImpl.fs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
9696
let mutable lastMessageIdInBroker = MessageId.Earliest
9797
let mutable lastDequeuedMessageId = MessageId.Earliest
9898
let mutable duringSeek = None
99+
let mutable hasSoughtByTimestamp = false
99100
let initialStartMessageId = startMessageId
100101
let mutable incomingMessagesSize = 0L
101102
let receiverQueueRefillThreshold = consumerConfig.ReceiverQueueSize / 2
@@ -1082,8 +1083,11 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
10821083
Log.Logger.LogInformation("{0} Seek subscription to {1}", prefix, seekData)
10831084
let payload, seekMessageId =
10841085
match seekData with
1085-
| SeekType.Timestamp timestamp -> Commands.newSeekByTimestamp consumerId requestId timestamp, MessageId.Earliest
1086+
| SeekType.Timestamp timestamp ->
1087+
hasSoughtByTimestamp <- true
1088+
Commands.newSeekByTimestamp consumerId requestId timestamp, MessageId.Earliest
10861089
| SeekType.MessageId messageId ->
1090+
hasSoughtByTimestamp <- false
10871091
match messageId.ChunkMessageIds with
10881092
| Some chunkMessageIds when chunkMessageIds.Length >0 ->
10891093
Commands.newSeekByMsgId consumerId requestId chunkMessageIds[0], chunkMessageIds[0]
@@ -1122,16 +1126,19 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
11221126

11231127
// we haven't read yet. use startMessageId for comparison
11241128
if lastDequeuedMessageId = MessageId.Earliest then
1129+
// If the last seek is called with timestamp, startMessageId cannot represent the position to start, so we
1130+
// have to get the mark-delete position from the GetLastMessageId response to compare as well.
11251131
// if we are starting from latest, we should seek to the actual last message first.
11261132
// allow the last one to be read when read head inclusively.
1127-
if startMessageId = MessageId.Latest then
1133+
if startMessageId = MessageId.Latest || hasSoughtByTimestamp then
11281134
backgroundTask {
11291135
try
11301136
let! lastMessageIdResult = getLastMessageIdAsync()
11311137
let lastMessageId = lastMessageIdResult.LastMessageId
11321138
// if the consumer is configured to read inclusive then we need to seek to the last message
1133-
do! postAndAsyncReply this.Mb (fun channel ->
1134-
SeekAsync (SeekType.MessageId lastMessageId, channel))
1139+
if consumerConfig.ResetIncludeHead && not hasSoughtByTimestamp then
1140+
do! postAndAsyncReply this.Mb (fun channel ->
1141+
SeekAsync (SeekType.MessageId lastMessageId, channel))
11351142
match lastMessageIdResult.MarkDeletePosition with
11361143
| Some markDeletePosition ->
11371144
if lastMessageId.EntryId < %0L then
@@ -1416,6 +1423,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
14161423
with get() = Volatile.Read(&lastMessageIdInBroker)
14171424
and private set value = Volatile.Write(&lastMessageIdInBroker, value)
14181425

1426+
14191427
override this.Equals consumer =
14201428
consumerId = (consumer :?> IConsumer<'T>).ConsumerId
14211429

tests/IntegrationTests/Reader.fs

Lines changed: 140 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,132 @@ let tests =
272272

273273

274274

275+
let checkHasMessageAvailableAfterSeekTimestamp (initializeLastMessageIdInBroker: bool) =
276+
task {
277+
Log.Debug("Started HasMessageAvailableAfterSeekTimestamp initializeLastMessageIdInBroker: {0}", initializeLastMessageIdInBroker)
278+
let client = getClient()
279+
let topicName = "public/default/test-has-message-available-after-seek-timestamp-" + Guid.NewGuid().ToString("N")
280+
281+
let! (producer : IProducer<string>) =
282+
client.NewProducer(Schema.STRING())
283+
.Topic(topicName)
284+
.CreateAsync()
285+
286+
let timestampBeforeSend = %(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
287+
let! sentMsgId = producer.SendAsync("msg")
288+
do! producer.DisposeAsync()
289+
290+
let messageIds = [
291+
MessageId.Earliest
292+
sentMsgId
293+
MessageId.Latest
294+
]
295+
296+
// Test 1: Seek to future timestamp
297+
for messageId in messageIds do
298+
let! (reader : IReader<string>) =
299+
client.NewReader(Schema.STRING())
300+
.Topic(topicName)
301+
.ReceiverQueueSize(1)
302+
.StartMessageId(messageId)
303+
.CreateAsync()
304+
305+
if initializeLastMessageIdInBroker then
306+
if messageId = MessageId.Earliest then
307+
let! hasMessage = reader.HasMessageAvailableAsync()
308+
Expect.isTrue "should have message" hasMessage
309+
else
310+
let! hasMessage = reader.HasMessageAvailableAsync()
311+
Expect.isFalse "should not have message" hasMessage
312+
313+
let futureTimestamp = %(DateTimeOffset.UtcNow.AddMinutes(1.0).ToUnixTimeMilliseconds())
314+
315+
// The Seek operation does not implement a backoff mechanism. It will fail if the connection is not
316+
// ready, so wait for a short period until the connection becomes available.
317+
do! Task.Delay(1000)
318+
do! reader.SeekAsync(futureTimestamp)
319+
// HasMessageAvailableAsync does not implement a backoff mechanism. The operation may fail due to the
320+
// broker not finding the consumer during the seek operation. So wait for a short period here.
321+
do! Task.Delay(1000)
322+
let! hasMessage = reader.HasMessageAvailableAsync()
323+
Expect.isFalse "after seek to future should not have message" hasMessage
324+
do! reader.DisposeAsync()
325+
326+
// Test 2: Seek to timestamp before send
327+
for messageId in messageIds do
328+
let! (reader : IReader<string>) =
329+
client.NewReader(Schema.STRING())
330+
.Topic(topicName)
331+
.ReceiverQueueSize(1)
332+
.StartMessageId(messageId)
333+
.CreateAsync()
334+
335+
336+
if initializeLastMessageIdInBroker then
337+
if messageId = MessageId.Earliest then
338+
let! hasMessage = reader.HasMessageAvailableAsync()
339+
Expect.isTrue "should have message" hasMessage
340+
else
341+
let! hasMessage = reader.HasMessageAvailableAsync()
342+
Expect.isFalse "should not have message" hasMessage
343+
344+
do! Task.Delay(1000)
345+
do! reader.SeekAsync(timestampBeforeSend)
346+
do! Task.Delay(1000)
347+
let! hasMessage = reader.HasMessageAvailableAsync()
348+
Expect.isTrue "after seek to before send should have message" hasMessage
349+
do! reader.DisposeAsync()
350+
351+
Log.Debug("Finished HasMessageAvailableAfterSeekTimestamp initializeLastMessageIdInBroker: {0}", initializeLastMessageIdInBroker)
352+
}
353+
354+
let checkReaderLoopWhileHasMessageAvailableAfterSeekTimestamp () =
355+
task {
356+
Log.Debug("Started Reader loop while HasMessageAvailable after SeekTimestamp")
357+
let client = getClient()
358+
let topicName = "public/default/test-reader-loop-has-message-available-after-seek-timestamp-" + Guid.NewGuid().ToString("N")
359+
360+
let! (producer : IProducer<string>) =
361+
client.NewProducer(Schema.STRING())
362+
.Topic(topicName)
363+
.EnableBatching(false)
364+
.CreateAsync()
365+
366+
let timestampBeforeSend = %(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
367+
let! _ = producer.SendAsync("m1")
368+
let! _ = producer.SendAsync("m2")
369+
let! _ = producer.SendAsync("m3")
370+
do! producer.DisposeAsync()
371+
372+
let! (reader : IReader<string>) =
373+
client.NewReader(Schema.STRING())
374+
.Topic(topicName)
375+
.ReceiverQueueSize(1)
376+
.StartMessageId(MessageId.Latest)
377+
.CreateAsync()
378+
379+
do! Task.Delay(1000)
380+
do! reader.SeekAsync(timestampBeforeSend)
381+
do! Task.Delay(1000)
382+
383+
use cts = new System.Threading.CancellationTokenSource(TimeSpan.FromSeconds(10.0))
384+
let received = ResizeArray<string>()
385+
386+
let mutable continueLooping = true
387+
while continueLooping do
388+
let! hasMessage = reader.HasMessageAvailableAsync()
389+
if hasMessage then
390+
let! msg = reader.ReadNextAsync(cts.Token)
391+
received.Add(msg.GetValue())
392+
else
393+
continueLooping <- false
394+
395+
Expect.sequenceEqual "" [ "m1"; "m2"; "m3" ] received
396+
do! reader.DisposeAsync()
397+
398+
Log.Debug("Finished Reader loop while HasMessageAvailable after SeekTimestamp")
399+
}
400+
275401
testList "Reader" [
276402

277403
testTask "Reader non-batching configuration works fine" {
@@ -314,13 +440,23 @@ let tests =
314440
do! checkReadingFromRollback true
315441
}
316442

317-
// uncomment when https://github.com/apache/pulsar/issues/10515 is ready
318-
ptestTask "Check StartMessageFromFuturePoint without batching" {
443+
testTask "Check StartMessageFromFuturePoint without batching" {
319444
do! checkReadingFromFuture false
320445
}
321446

322-
// uncomment when https://github.com/apache/pulsar/issues/10515 is ready
323-
ptestTask "Check StartMessageFromFuturePoint with batching" {
447+
testTask "Check StartMessageFromFuturePoint with batching" {
324448
do! checkReadingFromFuture true
325449
}
450+
451+
testTask "HasMessageAvailable after SeekTimestamp without initializeLastMessageIdInBroker" {
452+
do! checkHasMessageAvailableAfterSeekTimestamp false
453+
}
454+
455+
testTask "HasMessageAvailable after SeekTimestamp with initializeLastMessageIdInBroker" {
456+
do! checkHasMessageAvailableAfterSeekTimestamp true
457+
}
458+
459+
testTask "Check HasMessageAvailable works after SeekTimestamp" {
460+
do! checkReaderLoopWhileHasMessageAvailableAfterSeekTimestamp()
461+
}
326462
]

0 commit comments

Comments
 (0)