Skip to content

Commit 789008a

Browse files
committed
Update Producer.cs
1 parent 006169a commit 789008a

File tree

1 file changed

+26
-5
lines changed

1 file changed

+26
-5
lines changed

src/dajet-runtime/adapters/rabbitmq/Producer.cs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using DaJet.Data;
22
using DaJet.Scripting.Model;
3-
using Npgsql.Replication;
43
using RabbitMQ.Client;
54
using RabbitMQ.Client.Events;
65
using System.Buffers;
@@ -25,7 +24,7 @@ public sealed class Producer : IProcessor
2524
private const string ERROR_CHANNEL_SHUTDOWN = "Channel shutdown: [{0}] {1}";
2625
private const string ERROR_CONNECTION_SHUTDOWN = "Connection shutdown: [{0}] {1}";
2726
private const string ERROR_CONNECTION_IS_BLOCKED = "Connection blocked: {0}";
28-
private const string ERROR_WAIT_FOR_CONFIRMS = "Wait for confirms interrupted";
27+
private const string ERROR_WAIT_FOR_CONFIRMS = "Wait for confirms timed out";
2928
private const string ERROR_PUBLISHER_CONFIRMS = "Publisher confirms nacked";
3029

3130
private const string HEADER_CC = "CC";
@@ -62,6 +61,19 @@ public Producer(in ScriptScope scope)
6261
private string VirtualHost { get; set; } = "/";
6362
private string UserName { get; set; } = "guest";
6463
private string Password { get; set; } = "guest";
64+
private TimeSpan PublisherConfirmsTimeout { get; set; } = TimeSpan.FromSeconds(60);
65+
private TimeSpan GetPublisherConfirmsTimeout()
66+
{
67+
if (StreamFactory.TryGetOption(in _scope, "PublisherConfirmsTimeout", out object value))
68+
{
69+
if (value is int seconds)
70+
{
71+
return TimeSpan.FromSeconds(seconds);
72+
}
73+
}
74+
75+
return TimeSpan.FromSeconds(60);
76+
}
6577
private TimeSpan GetRequestedHeartbeat()
6678
{
6779
if (StreamFactory.TryGetOption(in _scope, "RequestedHeartbeat", out object value))
@@ -343,7 +355,6 @@ private void InitializeUri()
343355
VirtualHost = HttpUtility.UrlDecode(uri.Segments[1].TrimEnd('/'), Encoding.UTF8);
344356
}
345357
}
346-
347358
private void InitializeConnection()
348359
{
349360
IConnectionFactory factory = new ConnectionFactory()
@@ -421,6 +432,8 @@ private void BeginSessionOrThrow()
421432
{
422433
InitializeUri();
423434

435+
PublisherConfirmsTimeout = GetPublisherConfirmsTimeout();
436+
424437
InitializeConnection();
425438

426439
InitializeChannel();
@@ -590,9 +603,13 @@ public void Synchronize()
590603
{
591604
try
592605
{
606+
FileLogger.Default.Write("RabbitMQ Publisher synchronizing ...");
607+
593608
ThrowIfSessionIsBroken(); // STATE_BROKEN
594609

595610
ConfirmSessionOrThrow(); // STATE_ACTIVE
611+
612+
FileLogger.Default.Write("RabbitMQ Publisher synchronized.");
596613
}
597614
catch (NullReferenceException)
598615
{
@@ -611,10 +628,14 @@ private void ConfirmSessionOrThrow()
611628
{
612629
if (Interlocked.CompareExchange(ref _state, STATE_ACTIVE, STATE_ACTIVE) == STATE_ACTIVE)
613630
{
614-
if (_channel.WaitForConfirms())
631+
if (_channel.WaitForConfirms(PublisherConfirmsTimeout, out bool timedout))
615632
{
616633
ThrowIfSessionIsBroken(); // STATE_BROKEN
617634
}
635+
else if (timedout)
636+
{
637+
throw new OperationCanceledException(ERROR_WAIT_FOR_CONFIRMS);
638+
}
618639
else
619640
{
620641
throw new OperationCanceledException(ERROR_PUBLISHER_CONFIRMS);
@@ -625,7 +646,7 @@ private void ConfirmSessionOrThrow()
625646
// STATE_IDLE | STATE_DISPOSING
626647
}
627648
}
628-
649+
629650
public void Dispose()
630651
{
631652
if (Interlocked.CompareExchange(ref _state, STATE_DISPOSING, STATE_DISPOSING) == STATE_DISPOSING)

0 commit comments

Comments
 (0)