Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/wf_build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,16 @@ jobs:
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
- name: Test
timeout-minutes: 25
# --diag ${{ github.workspace}}\diag
run: dotnet test ${{ github.workspace }}\Build.csproj --no-restore --no-build --logger 'console;verbosity=detailed'
- name: Check for errors in RabbitMQ logs
run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1
# - name: Upload dotnet test diag logs (on failure)
# if: failure()
# uses: actions/upload-artifact@v4
# with:
# name: dotnet-test-diag-win32
# path: ${{ github.workspace }}/diag/
- name: Maybe upload RabbitMQ logs
if: failure()
uses: actions/upload-artifact@v4
Expand All @@ -57,12 +64,19 @@ jobs:
run: ${{ github.workspace }}/.ci/ubuntu/cluster/gha-setup.sh
- name: Test
timeout-minutes: 25
# --diag ${{ github.workspace}}/diag
run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed"
- name: Check for errors in RabbitMQ logs
run: ${{ github.workspace}}/.ci/ubuntu/cluster/gha-logs.sh check
- name: Collect RabbitMQ logs (on failure)
if: failure()
run: ${{ github.workspace}}/.ci/ubuntu/cluster/gha-logs.sh
# - name: Upload dotnet test diag logs (on failure)
# if: failure()
# uses: actions/upload-artifact@v4
# with:
# name: dotnet-test-diag-ubuntu
# path: ${{ github.workspace }}/diag/
- name: Upload RabbitMQ logs (on failure)
if: failure()
uses: actions/upload-artifact@v4
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ InternalTrace*
*.lock.json
nunit-agent*
*.pyc
test-diag.log
test-output.log
TestResults.xml
TestResult.xml
Expand All @@ -24,6 +25,7 @@ gensrc/
.ionide/
NuGet/
tmp/
diag/
.vscode/

#################
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected void ThrowIfClosed()
case State.Closing:
throw new AmqpNotOpenException("Resource is closing");
case State.Reconnecting:
throw new AmqpNotOpenException("Resource is Reconnecting");
throw new AmqpNotOpenException("Resource is reconnecting");
case State.Open:
break;
default:
Expand Down
27 changes: 11 additions & 16 deletions RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public override async Task OpenAsync()
Attach attach = Utils.CreateAttach(_configuration.Address, DeliveryMode.AtLeastOnce, _id,
_configuration.Filters);

void onAttached(ILink argLink, Attach argAttach)
void OnAttached(ILink argLink, Attach argAttach)
{
if (argLink is ReceiverLink link)
{
Expand All @@ -69,30 +69,25 @@ void onAttached(ILink argLink, Attach argAttach)
{
// TODO create "internal bug" exception type?
var ex = new InvalidOperationException(
"invalid link in onAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
"invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
attachCompletedTcs.SetException(ex);
}
}

ReceiverLink? tmpReceiverLink = null;
Task receiverLinkTask = Task.Run(async () =>
{
Session session = await _amqpConnection._nativePubSubSessions.GetOrCreateSessionAsync()
.ConfigureAwait(false);
tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, onAttached);
});
Session session = await _amqpConnection._nativePubSubSessions.GetOrCreateSessionAsync()
.ConfigureAwait(false);

// TODO configurable timeout
TimeSpan waitSpan = TimeSpan.FromSeconds(5);
var tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, OnAttached);

// TODO configurable timeout
var waitSpan = TimeSpan.FromSeconds(5);
_receiverLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
.ConfigureAwait(false);

await receiverLinkTask.WaitAsync(waitSpan)
.ConfigureAwait(false);

System.Diagnostics.Debug.Assert(tmpReceiverLink != null);
System.Diagnostics.Debug.Assert(object.ReferenceEquals(_receiverLink, tmpReceiverLink));
if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink))
{
// TODO log this case?
}

if (_receiverLink is null)
{
Expand Down
8 changes: 4 additions & 4 deletions RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ public async Task<IConnection> CreateConnectionAsync(IConnectionSettings connect
return c;
}

public async Task<IConnection> CreateConnectionAsync()
public Task<IConnection> CreateConnectionAsync()
{
if (ConnectionSettings != null)
if (ConnectionSettings is null)
{
return await CreateConnectionAsync(ConnectionSettings).ConfigureAwait(false);
throw new ConnectionException("Connection settings are not set");
}

throw new ConnectionException("Connection settings are not set");
return CreateConnectionAsync(ConnectionSettings);
}

public ReadOnlyCollection<IConnection> GetConnections() =>
Expand Down
57 changes: 41 additions & 16 deletions RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -270,24 +269,38 @@ private async Task EnsureReceiverLinkAsync()
new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), },
};

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_receiverLink = new ReceiverLink(
var tcs = new TaskCompletionSource<ReceiverLink>(TaskCreationOptions.RunContinuationsAsynchronously);
var tmpReceiverLink = new ReceiverLink(
_managementSession, LinkPairName, receiveAttach, (ILink link, Attach attach) =>
{
Debug.Assert(Object.ReferenceEquals(_receiverLink, link));
tcs.SetResult(true);
if (link is ReceiverLink receiverLink)
{
tcs.SetResult(receiverLink);
}
else
{
// TODO create "internal bug" exception type?
var ex = new InvalidOperationException(
"invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
tcs.SetException(ex);
}
});

await tcs.Task
_receiverLink = await tcs.Task
.ConfigureAwait(false);

if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink))
{
// TODO log this case?
}

// TODO
// using a credit of 1 can result in AmqpExceptions in ProcessResponses
_receiverLink.SetCredit(100);
}
}

private Task EnsureSenderLinkAsync()
private async Task EnsureSenderLinkAsync()
{
if (_senderLink == null || _senderLink.IsClosed)
{
Expand Down Expand Up @@ -315,18 +328,30 @@ private Task EnsureSenderLinkAsync()
},
};

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_senderLink = new SenderLink(
var tcs = new TaskCompletionSource<SenderLink>(TaskCreationOptions.RunContinuationsAsynchronously);
var tmpSenderLink = new SenderLink(
_managementSession, LinkPairName, senderAttach, (ILink link, Attach attach) =>
{
Debug.Assert(Object.ReferenceEquals(_senderLink, link));
tcs.SetResult(true);
if (link is SenderLink senderLink)
{
tcs.SetResult(senderLink);
}
else
{
// TODO create "internal bug" exception type?
var ex = new InvalidOperationException(
"invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
tcs.SetException(ex);
}
});
return tcs.Task;
}
else
{
return Task.CompletedTask;

_senderLink = await tcs.Task
.ConfigureAwait(false);

if (false == Object.ReferenceEquals(_senderLink, tmpSenderLink))
{
// TODO log this case?
}
}
}

Expand Down
33 changes: 15 additions & 18 deletions RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public override async Task OpenAsync()

Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id);

void onAttached(ILink argLink, Attach argAttach)
void OnAttached(ILink argLink, Attach argAttach)
{
if (argLink is SenderLink link)
{
Expand All @@ -53,25 +53,19 @@ void onAttached(ILink argLink, Attach argAttach)
}
}

SenderLink? tmpSenderLink = null;
Task senderLinkTask = Task.Run(async () =>
{
Session session = await _connection._nativePubSubSessions.GetOrCreateSessionAsync()
.ConfigureAwait(false);
tmpSenderLink = new SenderLink(session, _id.ToString(), attach, onAttached);
});
Session session = await _connection._nativePubSubSessions.GetOrCreateSessionAsync()
.ConfigureAwait(false);
var tmpSenderLink = new SenderLink(session, _id.ToString(), attach, OnAttached);

// TODO configurable timeout
TimeSpan waitSpan = TimeSpan.FromSeconds(5);

var waitSpan = TimeSpan.FromSeconds(5);
_senderLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
.ConfigureAwait(false);

await senderLinkTask.WaitAsync(waitSpan)
.ConfigureAwait(false);

System.Diagnostics.Debug.Assert(tmpSenderLink != null);
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(_senderLink, tmpSenderLink));
if (false == Object.ReferenceEquals(_senderLink, tmpSenderLink))
{
// TODO log this case?
}

if (_senderLink is null)
{
Expand Down Expand Up @@ -125,10 +119,13 @@ public async Task<PublishResult> PublishAsync(IMessage message, CancellationToke

void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)
{
System.Diagnostics.Debug.Assert(object.ReferenceEquals(this, state));
System.Diagnostics.Debug.Assert(object.ReferenceEquals(_senderLink, sender));
// Note: sometimes `message` is null 🤔
// System.Diagnostics.Debug.Assert(Object.ReferenceEquals(nativeMessage, message));
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(this, state));

if (false == Object.ReferenceEquals(_senderLink, sender))
{
// TODO log this case?
}

PublishOutcome publishOutcome;
switch (outcome)
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ void OnBegin(ISession session, Begin peerBegin)
}

rv = new Session(_amqpConnection.NativeConnection, GetDefaultBegin(), OnBegin);
// TODO cancellation token
ISession awaitedSession = await sessionBeginTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(rv, awaitedSession));
_sessions.Add(rv);
}

Expand Down
Loading