Skip to content

Commit a992994

Browse files
authored
Make copy of respons message body (#32)
1 parent f16423b commit a992994

File tree

1 file changed

+12
-10
lines changed

1 file changed

+12
-10
lines changed

src/Insurello.RabbitMqClient/RabbitMqClient.fs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -440,15 +440,15 @@ module RPC =
440440
type private CorrelationId = string
441441

442442
type private PendingRequests =
443-
ConcurrentDictionary<CorrelationId, TaskCompletionSource<Result<BasicDeliverEventArgs, string>>>
443+
ConcurrentDictionary<CorrelationId, TaskCompletionSource<Result<IReadOnlyBasicProperties * byte[], string>>>
444444

445445
[<Literal>]
446446
let private queueDirectReplyTo = "amq.rabbitmq.reply-to"
447447

448448
let private requestRawAsync<'response>
449449
(pendingRequests: PendingRequests)
450450
(consumer: AsyncEventingBasicConsumer)
451-
(mapResponse: ResponseHeaders -> System.ReadOnlyMemory<byte> -> 'response)
451+
(mapResponse: IReadOnlyBasicProperties -> byte[] -> 'response)
452452
: RequestMessage -> Async<Result<'response, string>> =
453453
fun message ->
454454
task {
@@ -508,7 +508,7 @@ module RPC =
508508

509509
match! completionSource.Task with
510510
| Error error -> return Error error
511-
| Ok response -> return Ok (mapResponse response.BasicProperties.Headers response.Body)
511+
| Ok (basicProperties, body) -> return Ok (mapResponse basicProperties body)
512512

513513
else
514514
return Error $"RabbitMqClient.RPC: Duplicate message id %s{messageId}"
@@ -554,7 +554,9 @@ module RPC =
554554

555555
match pendingRequests.TryRemove correlationId with
556556
| true, tcs ->
557-
if not (tcs.TrySetResult (Ok eventArgs)) then
557+
let result = Ok (eventArgs.BasicProperties, eventArgs.Body.ToArray ())
558+
559+
if not (tcs.TrySetResult result) then
558560
logger.LogWarning (
559561
"Consumer {clientName} received reply but unable to set task completion source with correlation id {correlationId}",
560562
clientName,
@@ -630,18 +632,18 @@ module RPC =
630632
requestRawAsync
631633
pendingRequests
632634
consumer
633-
(fun headers body -> {
634-
headers = headers
635-
body = body.ToArray ()
635+
(fun basicProperties body -> {
636+
headers = basicProperties.Headers
637+
body = body
636638
})
637639

638640
requestAsync =
639641
requestRawAsync
640642
pendingRequests
641643
consumer
642-
(fun headers body -> {
643-
headers = headers
644-
body = System.Text.Encoding.UTF8.GetString body.Span
644+
(fun basicProperties body -> {
645+
headers = basicProperties.Headers
646+
body = System.Text.Encoding.UTF8.GetString body
645647
})
646648
}
647649

0 commit comments

Comments
 (0)