Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions src/Pulsar.Client/Internal/ConsumerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
let mutable lastMessageIdInBroker = MessageId.Earliest
let mutable lastDequeuedMessageId = MessageId.Earliest
let mutable duringSeek = None
let mutable hasSoughtByTimestamp = false
let initialStartMessageId = startMessageId
let mutable incomingMessagesSize = 0L
let receiverQueueRefillThreshold = consumerConfig.ReceiverQueueSize / 2
Expand Down Expand Up @@ -1082,8 +1083,11 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
Log.Logger.LogInformation("{0} Seek subscription to {1}", prefix, seekData)
let payload, seekMessageId =
match seekData with
| SeekType.Timestamp timestamp -> Commands.newSeekByTimestamp consumerId requestId timestamp, MessageId.Earliest
| SeekType.Timestamp timestamp ->
hasSoughtByTimestamp <- true
Commands.newSeekByTimestamp consumerId requestId timestamp, MessageId.Earliest
| SeekType.MessageId messageId ->
hasSoughtByTimestamp <- false
match messageId.ChunkMessageIds with
| Some chunkMessageIds when chunkMessageIds.Length >0 ->
Commands.newSeekByMsgId consumerId requestId chunkMessageIds[0], chunkMessageIds[0]
Expand Down Expand Up @@ -1122,16 +1126,19 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien

// we haven't read yet. use startMessageId for comparison
if lastDequeuedMessageId = MessageId.Earliest then
// If the last seek is called with timestamp, startMessageId cannot represent the position to start, so we
// have to get the mark-delete position from the GetLastMessageId response to compare as well.
// if we are starting from latest, we should seek to the actual last message first.
// allow the last one to be read when read head inclusively.
if startMessageId = MessageId.Latest then
if startMessageId = MessageId.Latest || hasSoughtByTimestamp then
backgroundTask {
try
let! lastMessageIdResult = getLastMessageIdAsync()
let lastMessageId = lastMessageIdResult.LastMessageId
// if the consumer is configured to read inclusive then we need to seek to the last message
do! postAndAsyncReply this.Mb (fun channel ->
SeekAsync (SeekType.MessageId lastMessageId, channel))
if consumerConfig.ResetIncludeHead && not hasSoughtByTimestamp then
do! postAndAsyncReply this.Mb (fun channel ->
SeekAsync (SeekType.MessageId lastMessageId, channel))
match lastMessageIdResult.MarkDeletePosition with
| Some markDeletePosition ->
if lastMessageId.EntryId < %0L then
Expand Down Expand Up @@ -1416,6 +1423,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
with get() = Volatile.Read(&lastMessageIdInBroker)
and private set value = Volatile.Write(&lastMessageIdInBroker, value)


override this.Equals consumer =
consumerId = (consumer :?> IConsumer<'T>).ConsumerId

Expand Down
144 changes: 140 additions & 4 deletions tests/IntegrationTests/Reader.fs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,132 @@ let tests =



let checkHasMessageAvailableAfterSeekTimestamp (initializeLastMessageIdInBroker: bool) =
task {
Log.Debug("Started HasMessageAvailableAfterSeekTimestamp initializeLastMessageIdInBroker: {0}", initializeLastMessageIdInBroker)
let client = getClient()
let topicName = "public/default/test-has-message-available-after-seek-timestamp-" + Guid.NewGuid().ToString("N")

let! (producer : IProducer<string>) =
client.NewProducer(Schema.STRING())
.Topic(topicName)
.CreateAsync()

let timestampBeforeSend = %(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
let! sentMsgId = producer.SendAsync("msg")
do! producer.DisposeAsync()

let messageIds = [
MessageId.Earliest
sentMsgId
MessageId.Latest
]

// Test 1: Seek to future timestamp
for messageId in messageIds do
let! (reader : IReader<string>) =
client.NewReader(Schema.STRING())
.Topic(topicName)
.ReceiverQueueSize(1)
.StartMessageId(messageId)
.CreateAsync()

if initializeLastMessageIdInBroker then
if messageId = MessageId.Earliest then
let! hasMessage = reader.HasMessageAvailableAsync()
Expect.isTrue "should have message" hasMessage
else
let! hasMessage = reader.HasMessageAvailableAsync()
Expect.isFalse "should not have message" hasMessage

let futureTimestamp = %(DateTimeOffset.UtcNow.AddMinutes(1.0).ToUnixTimeMilliseconds())

// The Seek operation does not implement a backoff mechanism. It will fail if the connection is not
// ready, so wait for a short period until the connection becomes available.
do! Task.Delay(1000)
do! reader.SeekAsync(futureTimestamp)
// HasMessageAvailableAsync does not implement a backoff mechanism. The operation may fail due to the
// broker not finding the consumer during the seek operation. So wait for a short period here.
do! Task.Delay(1000)
let! hasMessage = reader.HasMessageAvailableAsync()
Expect.isFalse "after seek to future should not have message" hasMessage
do! reader.DisposeAsync()

// Test 2: Seek to timestamp before send
for messageId in messageIds do
let! (reader : IReader<string>) =
client.NewReader(Schema.STRING())
.Topic(topicName)
.ReceiverQueueSize(1)
.StartMessageId(messageId)
.CreateAsync()


if initializeLastMessageIdInBroker then
if messageId = MessageId.Earliest then
let! hasMessage = reader.HasMessageAvailableAsync()
Expect.isTrue "should have message" hasMessage
else
let! hasMessage = reader.HasMessageAvailableAsync()
Expect.isFalse "should not have message" hasMessage

do! Task.Delay(1000)
do! reader.SeekAsync(timestampBeforeSend)
do! Task.Delay(1000)
let! hasMessage = reader.HasMessageAvailableAsync()
Expect.isTrue "after seek to before send should have message" hasMessage
do! reader.DisposeAsync()

Log.Debug("Finished HasMessageAvailableAfterSeekTimestamp initializeLastMessageIdInBroker: {0}", initializeLastMessageIdInBroker)
}

let checkReaderLoopWhileHasMessageAvailableAfterSeekTimestamp () =
task {
Log.Debug("Started Reader loop while HasMessageAvailable after SeekTimestamp")
let client = getClient()
let topicName = "public/default/test-reader-loop-has-message-available-after-seek-timestamp-" + Guid.NewGuid().ToString("N")

let! (producer : IProducer<string>) =
client.NewProducer(Schema.STRING())
.Topic(topicName)
.EnableBatching(false)
.CreateAsync()

let timestampBeforeSend = %(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
let! _ = producer.SendAsync("m1")
let! _ = producer.SendAsync("m2")
let! _ = producer.SendAsync("m3")
do! producer.DisposeAsync()

let! (reader : IReader<string>) =
client.NewReader(Schema.STRING())
.Topic(topicName)
.ReceiverQueueSize(1)
.StartMessageId(MessageId.Latest)
.CreateAsync()

do! Task.Delay(1000)
do! reader.SeekAsync(timestampBeforeSend)
do! Task.Delay(1000)

use cts = new System.Threading.CancellationTokenSource(TimeSpan.FromSeconds(10.0))
let received = ResizeArray<string>()

let mutable continueLooping = true
while continueLooping do
let! hasMessage = reader.HasMessageAvailableAsync()
if hasMessage then
let! msg = reader.ReadNextAsync(cts.Token)
received.Add(msg.GetValue())
else
continueLooping <- false

Expect.sequenceEqual "" [ "m1"; "m2"; "m3" ] received
do! reader.DisposeAsync()

Log.Debug("Finished Reader loop while HasMessageAvailable after SeekTimestamp")
}

testList "Reader" [

testTask "Reader non-batching configuration works fine" {
Expand Down Expand Up @@ -314,13 +440,23 @@ let tests =
do! checkReadingFromRollback true
}

// uncomment when https://github.com/apache/pulsar/issues/10515 is ready
ptestTask "Check StartMessageFromFuturePoint without batching" {
testTask "Check StartMessageFromFuturePoint without batching" {
do! checkReadingFromFuture false
}

// uncomment when https://github.com/apache/pulsar/issues/10515 is ready
ptestTask "Check StartMessageFromFuturePoint with batching" {
testTask "Check StartMessageFromFuturePoint with batching" {
do! checkReadingFromFuture true
}

testTask "HasMessageAvailable after SeekTimestamp without initializeLastMessageIdInBroker" {
do! checkHasMessageAvailableAfterSeekTimestamp false
}

testTask "HasMessageAvailable after SeekTimestamp with initializeLastMessageIdInBroker" {
do! checkHasMessageAvailableAfterSeekTimestamp true
}

testTask "Check HasMessageAvailable works after SeekTimestamp" {
do! checkReaderLoopWhileHasMessageAvailableAfterSeekTimestamp()
}
]
Loading