Skip to content

Commit bcdee01

Browse files
authored
Merge pull request #88 from rabbitmq/rabbitmq-amqp-dotnet-client-87
Enable verbose logging of when a test starts and stops
2 parents 88dc07a + 472f7ef commit bcdee01

15 files changed

+238
-126
lines changed

.github/workflows/wf_build-and-test.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,16 @@ jobs:
3535
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
3636
- name: Test
3737
timeout-minutes: 25
38+
# --diag ${{ github.workspace}}\diag
3839
run: dotnet test ${{ github.workspace }}\Build.csproj --no-restore --no-build --logger 'console;verbosity=detailed'
3940
- name: Check for errors in RabbitMQ logs
4041
run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1
42+
# - name: Upload dotnet test diag logs (on failure)
43+
# if: failure()
44+
# uses: actions/upload-artifact@v4
45+
# with:
46+
# name: dotnet-test-diag-win32
47+
# path: ${{ github.workspace }}/diag/
4148
- name: Maybe upload RabbitMQ logs
4249
if: failure()
4350
uses: actions/upload-artifact@v4
@@ -57,12 +64,19 @@ jobs:
5764
run: ${{ github.workspace }}/.ci/ubuntu/cluster/gha-setup.sh
5865
- name: Test
5966
timeout-minutes: 25
67+
# --diag ${{ github.workspace}}/diag
6068
run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed"
6169
- name: Check for errors in RabbitMQ logs
6270
run: ${{ github.workspace}}/.ci/ubuntu/cluster/gha-logs.sh check
6371
- name: Collect RabbitMQ logs (on failure)
6472
if: failure()
6573
run: ${{ github.workspace}}/.ci/ubuntu/cluster/gha-logs.sh
74+
# - name: Upload dotnet test diag logs (on failure)
75+
# if: failure()
76+
# uses: actions/upload-artifact@v4
77+
# with:
78+
# name: dotnet-test-diag-ubuntu
79+
# path: ${{ github.workspace }}/diag/
6680
- name: Upload RabbitMQ logs (on failure)
6781
if: failure()
6882
uses: actions/upload-artifact@v4

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ InternalTrace*
1111
*.lock.json
1212
nunit-agent*
1313
*.pyc
14+
test-diag.log
1415
test-output.log
1516
TestResults.xml
1617
TestResult.xml
@@ -24,6 +25,7 @@ gensrc/
2425
.ionide/
2526
NuGet/
2627
tmp/
28+
diag/
2729
.vscode/
2830

2931
#################

RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ protected void ThrowIfClosed()
4747
case State.Closing:
4848
throw new AmqpNotOpenException("Resource is closing");
4949
case State.Reconnecting:
50-
throw new AmqpNotOpenException("Resource is Reconnecting");
50+
throw new AmqpNotOpenException("Resource is reconnecting");
5151
case State.Open:
5252
break;
5353
default:

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public override async Task OpenAsync()
5959
Attach attach = Utils.CreateAttach(_configuration.Address, DeliveryMode.AtLeastOnce, _id,
6060
_configuration.Filters);
6161

62-
void onAttached(ILink argLink, Attach argAttach)
62+
void OnAttached(ILink argLink, Attach argAttach)
6363
{
6464
if (argLink is ReceiverLink link)
6565
{
@@ -69,30 +69,25 @@ void onAttached(ILink argLink, Attach argAttach)
6969
{
7070
// TODO create "internal bug" exception type?
7171
var ex = new InvalidOperationException(
72-
"invalid link in onAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
72+
"invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
7373
attachCompletedTcs.SetException(ex);
7474
}
7575
}
7676

77-
ReceiverLink? tmpReceiverLink = null;
78-
Task receiverLinkTask = Task.Run(async () =>
79-
{
80-
Session session = await _amqpConnection._nativePubSubSessions.GetOrCreateSessionAsync()
81-
.ConfigureAwait(false);
82-
tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, onAttached);
83-
});
77+
Session session = await _amqpConnection._nativePubSubSessions.GetOrCreateSessionAsync()
78+
.ConfigureAwait(false);
8479

85-
// TODO configurable timeout
86-
TimeSpan waitSpan = TimeSpan.FromSeconds(5);
80+
var tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, OnAttached);
8781

82+
// TODO configurable timeout
83+
var waitSpan = TimeSpan.FromSeconds(5);
8884
_receiverLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
8985
.ConfigureAwait(false);
9086

91-
await receiverLinkTask.WaitAsync(waitSpan)
92-
.ConfigureAwait(false);
93-
94-
System.Diagnostics.Debug.Assert(tmpReceiverLink != null);
95-
System.Diagnostics.Debug.Assert(object.ReferenceEquals(_receiverLink, tmpReceiverLink));
87+
if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink))
88+
{
89+
// TODO log this case?
90+
}
9691

9792
if (_receiverLink is null)
9893
{

RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ public async Task<IConnection> CreateConnectionAsync(IConnectionSettings connect
4646
return c;
4747
}
4848

49-
public async Task<IConnection> CreateConnectionAsync()
49+
public Task<IConnection> CreateConnectionAsync()
5050
{
51-
if (ConnectionSettings != null)
51+
if (ConnectionSettings is null)
5252
{
53-
return await CreateConnectionAsync(ConnectionSettings).ConfigureAwait(false);
53+
throw new ConnectionException("Connection settings are not set");
5454
}
5555

56-
throw new ConnectionException("Connection settings are not set");
56+
return CreateConnectionAsync(ConnectionSettings);
5757
}
5858

5959
public ReadOnlyCollection<IConnection> GetConnections() =>

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
using System;
66
using System.Collections.Concurrent;
7-
using System.Diagnostics;
87
using System.Linq;
98
using System.Threading;
109
using System.Threading.Tasks;
@@ -270,24 +269,38 @@ private async Task EnsureReceiverLinkAsync()
270269
new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), },
271270
};
272271

273-
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
274-
_receiverLink = new ReceiverLink(
272+
var tcs = new TaskCompletionSource<ReceiverLink>(TaskCreationOptions.RunContinuationsAsynchronously);
273+
var tmpReceiverLink = new ReceiverLink(
275274
_managementSession, LinkPairName, receiveAttach, (ILink link, Attach attach) =>
276275
{
277-
Debug.Assert(Object.ReferenceEquals(_receiverLink, link));
278-
tcs.SetResult(true);
276+
if (link is ReceiverLink receiverLink)
277+
{
278+
tcs.SetResult(receiverLink);
279+
}
280+
else
281+
{
282+
// TODO create "internal bug" exception type?
283+
var ex = new InvalidOperationException(
284+
"invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
285+
tcs.SetException(ex);
286+
}
279287
});
280288

281-
await tcs.Task
289+
_receiverLink = await tcs.Task
282290
.ConfigureAwait(false);
283291

292+
if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink))
293+
{
294+
// TODO log this case?
295+
}
296+
284297
// TODO
285298
// using a credit of 1 can result in AmqpExceptions in ProcessResponses
286299
_receiverLink.SetCredit(100);
287300
}
288301
}
289302

290-
private Task EnsureSenderLinkAsync()
303+
private async Task EnsureSenderLinkAsync()
291304
{
292305
if (_senderLink == null || _senderLink.IsClosed)
293306
{
@@ -315,18 +328,30 @@ private Task EnsureSenderLinkAsync()
315328
},
316329
};
317330

318-
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
319-
_senderLink = new SenderLink(
331+
var tcs = new TaskCompletionSource<SenderLink>(TaskCreationOptions.RunContinuationsAsynchronously);
332+
var tmpSenderLink = new SenderLink(
320333
_managementSession, LinkPairName, senderAttach, (ILink link, Attach attach) =>
321334
{
322-
Debug.Assert(Object.ReferenceEquals(_senderLink, link));
323-
tcs.SetResult(true);
335+
if (link is SenderLink senderLink)
336+
{
337+
tcs.SetResult(senderLink);
338+
}
339+
else
340+
{
341+
// TODO create "internal bug" exception type?
342+
var ex = new InvalidOperationException(
343+
"invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
344+
tcs.SetException(ex);
345+
}
324346
});
325-
return tcs.Task;
326-
}
327-
else
328-
{
329-
return Task.CompletedTask;
347+
348+
_senderLink = await tcs.Task
349+
.ConfigureAwait(false);
350+
351+
if (false == Object.ReferenceEquals(_senderLink, tmpSenderLink))
352+
{
353+
// TODO log this case?
354+
}
330355
}
331356
}
332357

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public override async Task OpenAsync()
3838

3939
Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id);
4040

41-
void onAttached(ILink argLink, Attach argAttach)
41+
void OnAttached(ILink argLink, Attach argAttach)
4242
{
4343
if (argLink is SenderLink link)
4444
{
@@ -53,25 +53,19 @@ void onAttached(ILink argLink, Attach argAttach)
5353
}
5454
}
5555

56-
SenderLink? tmpSenderLink = null;
57-
Task senderLinkTask = Task.Run(async () =>
58-
{
59-
Session session = await _connection._nativePubSubSessions.GetOrCreateSessionAsync()
60-
.ConfigureAwait(false);
61-
tmpSenderLink = new SenderLink(session, _id.ToString(), attach, onAttached);
62-
});
56+
Session session = await _connection._nativePubSubSessions.GetOrCreateSessionAsync()
57+
.ConfigureAwait(false);
58+
var tmpSenderLink = new SenderLink(session, _id.ToString(), attach, OnAttached);
6359

6460
// TODO configurable timeout
65-
TimeSpan waitSpan = TimeSpan.FromSeconds(5);
66-
61+
var waitSpan = TimeSpan.FromSeconds(5);
6762
_senderLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
6863
.ConfigureAwait(false);
6964

70-
await senderLinkTask.WaitAsync(waitSpan)
71-
.ConfigureAwait(false);
72-
73-
System.Diagnostics.Debug.Assert(tmpSenderLink != null);
74-
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(_senderLink, tmpSenderLink));
65+
if (false == Object.ReferenceEquals(_senderLink, tmpSenderLink))
66+
{
67+
// TODO log this case?
68+
}
7569

7670
if (_senderLink is null)
7771
{
@@ -125,10 +119,13 @@ public async Task<PublishResult> PublishAsync(IMessage message, CancellationToke
125119

126120
void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)
127121
{
128-
System.Diagnostics.Debug.Assert(object.ReferenceEquals(this, state));
129-
System.Diagnostics.Debug.Assert(object.ReferenceEquals(_senderLink, sender));
130122
// Note: sometimes `message` is null 🤔
131-
// System.Diagnostics.Debug.Assert(Object.ReferenceEquals(nativeMessage, message));
123+
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(this, state));
124+
125+
if (false == Object.ReferenceEquals(_senderLink, sender))
126+
{
127+
// TODO log this case?
128+
}
132129

133130
PublishOutcome publishOutcome;
134131
switch (outcome)

RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ void OnBegin(ISession session, Begin peerBegin)
4141
}
4242

4343
rv = new Session(_amqpConnection.NativeConnection, GetDefaultBegin(), OnBegin);
44+
// TODO cancellation token
4445
ISession awaitedSession = await sessionBeginTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
45-
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(rv, awaitedSession));
4646
_sessions.Add(rv);
4747
}
4848

0 commit comments

Comments
 (0)