Skip to content

Commit b663c94

Browse files
RobertIndieLanayx
andauthored
Support FlushAsync in Producer (#334)
* Support producer flush * Cleanup codes * Fix flush won't be failed when the callback failed and improve the code and logs * Review comments --------- Co-authored-by: Vladimir Shchur <[email protected]>
1 parent 5b562ca commit b663c94

File tree

5 files changed

+175
-0
lines changed

5 files changed

+175
-0
lines changed

src/Pulsar.Client/Api/IProducer.fs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,5 @@ type IProducer<'T> =
7575
abstract member LastDisconnectedTimestamp: unit -> Task<TimeStamp>
7676
/// Return true if the consumer is connected to the broker
7777
abstract member IsConnected: unit -> Task<bool>
78+
/// Flush all pending messages and wait for their acknowledgements
79+
abstract member FlushAsync: unit -> Task<unit>

src/Pulsar.Client/Internal/PartitionedProducerImpl.fs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,18 @@ type internal PartitionedProducerImpl<'T> private (producerConfig: ProducerConfi
385385

386386
member this.IsConnected() = postAndAsyncReply mb IsConnected
387387

388+
member this.FlushAsync() =
389+
backgroundTask {
390+
// Flush all partition producers
391+
let flushTasks =
392+
producers
393+
|> Seq.map (fun producer -> producer.FlushAsync())
394+
|> Seq.toArray
395+
if flushTasks.Length > 0 then
396+
let! _ = Task.WhenAll(flushTasks)
397+
()
398+
}
399+
388400

389401
interface IAsyncDisposable with
390402
member this.DisposeAsync() =

src/Pulsar.Client/Internal/ProducerImpl.fs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type internal ProducerMessage<'T> =
4040
| Close of TaskCompletionSource<ResultOrException<unit>>
4141
| Tick of ProducerTickType
4242
| GetStats of TaskCompletionSource<ProducerStats>
43+
| Flush of TaskCompletionSource<unit>
4344

4445
type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, clientConfig: PulsarClientConfiguration, connectionPool: ConnectionPool,
4546
partitionIndex: int, lookup: ILookupService, schema: ISchema<'T>,
@@ -754,6 +755,45 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
754755
| ProducerMessage.GetStats channel ->
755756
channel.SetResult <| stats.GetStats()
756757

758+
| ProducerMessage.Flush channel ->
759+
Log.Logger.LogDebug("{0} Flush requested, pendingMessages count: {1}", prefix, pendingMessages.Count)
760+
// First, send all batched messages
761+
batchMessageAndSend()
762+
763+
// If pendingMessages queue is empty, return immediately
764+
if pendingMessages.Count = 0 then
765+
Log.Logger.LogDebug("{0} Flush completed immediately, no pending messages", prefix)
766+
channel.SetResult()
767+
else
768+
// Get the last message from the queue
769+
let lastMessage = pendingMessages |> Seq.last
770+
771+
Log.Logger.LogDebug("{0} Flush waiting for last message callback, last sequenceId: {1}", prefix, %lastMessage.SequenceId)
772+
// Wait for the last message's callback to complete asynchronously
773+
backgroundTask {
774+
try
775+
match lastMessage.Callback with
776+
| SingleCallback (_, _, tcsOption) ->
777+
match tcsOption with
778+
| Some tcs ->
779+
let! _ = tcs.Task
780+
()
781+
| None -> ()
782+
| BatchCallbacks batchCallbacks ->
783+
// Wait for all TaskCompletionSource in the batch
784+
let tasks =
785+
batchCallbacks
786+
|> Array.choose (fun struct(_, _, tcsOption) ->
787+
tcsOption |> Option.map (fun tcs -> tcs.Task :> Task))
788+
if tasks.Length > 0 then
789+
do! Task.WhenAll(tasks)
790+
Log.Logger.LogDebug("{0} Flush completed, last sequenceId: {1}", prefix, %lastMessage.SequenceId)
791+
channel.SetResult()
792+
with Flatten ex ->
793+
Log.Logger.LogError(ex, "{0} Flush failed, last sequenceId: {1}", prefix, %lastMessage.SequenceId)
794+
channel.SetException ex
795+
} |> ignore
796+
757797
| ProducerMessage.Close channel ->
758798

759799
match connectionHandler.ConnectionState with
@@ -941,6 +981,10 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
941981
| Ready _ -> trueTask
942982
| _ -> falseTask
943983

984+
member this.FlushAsync() =
985+
connectionHandler.CheckIfActive() |> throwIfNotNull
986+
postAndAsyncReply mb ProducerMessage.Flush
987+
944988
interface IAsyncDisposable with
945989

946990
member this.DisposeAsync() =

tests/IntegrationTests/Flush.fs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
module Pulsar.Client.IntegrationTests.Flush
2+
3+
open System
4+
open System.Collections.Generic
5+
open System.Text
6+
open System.Threading
7+
open Expecto
8+
open Pulsar.Client.Api
9+
open Pulsar.Client.Common
10+
open Serilog
11+
open Pulsar.Client.IntegrationTests.Common
12+
13+
14+
let testMessageOrderAndDuplicates (messageSet: HashSet<string>) (receivedMessage: string) (expectedMessage: string) =
15+
if messageSet.Contains(receivedMessage) then
16+
failwith $"Duplicate message received: {receivedMessage}"
17+
messageSet.Add(receivedMessage) |> ignore
18+
if receivedMessage <> expectedMessage then
19+
failwith $"Incorrect message order. Expected: {expectedMessage}, Received: {receivedMessage}"
20+
21+
[<Tests>]
22+
let tests =
23+
24+
testList "Flush" [
25+
26+
testTask "Flush with batch enabled" {
27+
Log.Debug("Started Flush with batch enabled")
28+
let client = getClient()
29+
let topicName = "persistent://public/default/test-flush-batch-enabled-" + Guid.NewGuid().ToString("N")
30+
31+
let! (consumer : IConsumer<byte[]>) =
32+
client.NewConsumer()
33+
.Topic(topicName)
34+
.SubscriptionName("my-subscriber-name")
35+
.SubscribeAsync()
36+
37+
let! (producer : IProducer<byte[]>) =
38+
client.NewProducer()
39+
.Topic(topicName)
40+
.EnableBatching(true)
41+
.BatchingMaxPublishDelay(TimeSpan.FromHours(1.0))
42+
.BatchingMaxMessages(10000)
43+
.CreateAsync()
44+
45+
// Send 10 messages asynchronously without waiting
46+
for i in 0..9 do
47+
let message = $"my-message-{i}"
48+
producer.SendAsync(Encoding.UTF8.GetBytes(message)) |> ignore
49+
50+
// Flush to ensure all messages are sent and acknowledged
51+
do! producer.FlushAsync()
52+
53+
// Dispose producer
54+
do! (producer :> IAsyncDisposable).DisposeAsync().AsTask()
55+
56+
// Receive and verify messages
57+
let messageSet = HashSet<string>()
58+
let cts = new CancellationTokenSource(TimeSpan.FromSeconds(5.0))
59+
60+
for i in 0..9 do
61+
let! (msg : Message<byte[]>) = consumer.ReceiveAsync(cts.Token)
62+
let receivedMessage = Encoding.UTF8.GetString(msg.GetValue())
63+
Log.Debug("Received message: [{0}]", receivedMessage)
64+
let expectedMessage = $"my-message-{i}"
65+
testMessageOrderAndDuplicates messageSet receivedMessage expectedMessage
66+
67+
do! (consumer :> IAsyncDisposable).DisposeAsync().AsTask()
68+
69+
Log.Debug("Finished Started Flush with batch enabled")
70+
}
71+
72+
testTask "Flush with batch disabled" {
73+
Log.Debug("Started Flush with batch disabled")
74+
let client = getClient()
75+
let topicName = "persistent://public/default/test-flush-batch-disabled-" + Guid.NewGuid().ToString("N")
76+
77+
let! (consumer : IConsumer<byte[]>) =
78+
client.NewConsumer()
79+
.Topic(topicName)
80+
.SubscriptionName("my-subscriber-name")
81+
.SubscribeAsync()
82+
83+
let! (producer : IProducer<byte[]>) =
84+
client.NewProducer()
85+
.Topic(topicName)
86+
.EnableBatching(false)
87+
.CreateAsync()
88+
89+
// Send 10 messages asynchronously without waiting
90+
for i in 0..9 do
91+
let message = $"my-message-{i}"
92+
producer.SendAsync(Encoding.UTF8.GetBytes(message)) |> ignore
93+
94+
// Flush to ensure all messages are sent and acknowledged
95+
do! producer.FlushAsync()
96+
97+
// Dispose producer
98+
do! (producer :> IAsyncDisposable).DisposeAsync().AsTask()
99+
100+
// Receive and verify messages
101+
let messageSet = HashSet<string>()
102+
let cts = new CancellationTokenSource(TimeSpan.FromSeconds(5.0))
103+
104+
for i in 0..9 do
105+
let! (msg : Message<byte[]>) = consumer.ReceiveAsync(cts.Token)
106+
let receivedMessage = Encoding.UTF8.GetString(msg.GetValue())
107+
Log.Debug("Received message: [{0}]", receivedMessage)
108+
let expectedMessage = $"my-message-{i}"
109+
testMessageOrderAndDuplicates messageSet receivedMessage expectedMessage
110+
111+
do! (consumer :> IAsyncDisposable).DisposeAsync().AsTask()
112+
113+
Log.Debug("Finished Flush with batch disabled")
114+
}
115+
]
116+

tests/IntegrationTests/IntegrationTests.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
<Compile Include="Failover.fs" />
3737
<Compile Include="Transaction.fs" />
3838
<Compile Include="MessageCrypto.fs" />
39+
<Compile Include="Flush.fs" />
3940
<Compile Include="HttpLookupService.fs" />
4041
<Compile Include="Main.fs" />
4142
</ItemGroup>

0 commit comments

Comments
 (0)