Skip to content

Commit 06feb1b

Browse files
authored
Fix multi-tread creation session (#139)
* Fix multi-tread creation session When the session is created in multi-thread way. The fix adds _semaphoreSlim. closes: #138 Signed-off-by: Gabriele Santomaggio <[email protected]> * Add tests Signed-off-by: Gabriele Santomaggio <[email protected]> --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent c2a9a70 commit 06feb1b

File tree

2 files changed

+110
-18
lines changed

2 files changed

+110
-18
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System;
66
using System.Collections.Concurrent;
77
using System.Linq;
8+
using System.Threading;
89
using System.Threading.Tasks;
910
using Amqp;
1011
using Amqp.Framing;
@@ -15,8 +16,12 @@ internal class AmqpSessionManagement
1516
{
1617
private readonly AmqpConnection _amqpConnection;
1718
private readonly int _maxSessionsPerItem;
19+
1820
private readonly ConcurrentBag<Session> _sessions = new();
1921

22+
// lock during session creation
23+
private readonly SemaphoreSlim _semaphoreSlim = new(1);
24+
2025
internal AmqpSessionManagement(AmqpConnection amqpConnection, int maxSessionsPerItem)
2126
{
2227
_amqpConnection = amqpConnection;
@@ -26,33 +31,65 @@ internal AmqpSessionManagement(AmqpConnection amqpConnection, int maxSessionsPer
2631
// TODO cancellation token
2732
internal async Task<Session> GetOrCreateSessionAsync()
2833
{
29-
Session rv;
30-
31-
if (_sessions.Count >= _maxSessionsPerItem)
34+
await _semaphoreSlim.WaitAsync(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
35+
try
3236
{
33-
rv = _sessions.First();
34-
}
35-
else
36-
{
37-
TaskCompletionSource<ISession> sessionBeginTcs = Utils.CreateTaskCompletionSource<ISession>();
38-
void OnBegin(ISession session, Begin peerBegin)
37+
Session rv;
38+
39+
if (_sessions.Count >= _maxSessionsPerItem)
3940
{
40-
sessionBeginTcs.SetResult(session);
41+
rv = _sessions.First();
4142
}
43+
else
44+
{
45+
TaskCompletionSource<ISession> sessionBeginTcs = Utils.CreateTaskCompletionSource<ISession>();
4246

43-
rv = new Session(_amqpConnection.NativeConnection, GetDefaultBegin(), OnBegin);
44-
// TODO cancellation token
45-
ISession awaitedSession = await sessionBeginTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
46-
_sessions.Add(rv);
47-
}
47+
void OnBegin(ISession session, Begin peerBegin)
48+
{
49+
sessionBeginTcs.SetResult(session);
50+
}
51+
52+
rv = new Session(_amqpConnection.NativeConnection, GetDefaultBegin(), OnBegin);
53+
// TODO cancellation token
54+
ISession awaitedSession =
55+
await sessionBeginTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
56+
_sessions.Add(rv);
57+
}
4858

49-
return rv;
59+
return rv;
60+
}
61+
finally
62+
{
63+
_semaphoreSlim.Release();
64+
}
5065
}
5166

5267
internal void ClearSessions()
5368
{
54-
// TODO close open sessions?
55-
_sessions.Clear();
69+
_semaphoreSlim.Wait();
70+
try
71+
{
72+
// TODO close open sessions?
73+
_sessions.Clear();
74+
}
75+
finally
76+
{
77+
_semaphoreSlim.Release();
78+
}
79+
}
80+
81+
// sessions count
82+
internal int GetSessionsCount()
83+
{
84+
_semaphoreSlim.Wait();
85+
try
86+
{
87+
return _sessions.Count;
88+
}
89+
finally
90+
{
91+
_semaphoreSlim.Release();
92+
}
5693
}
5794

5895
// Note: these values come from Amqp.NET

Tests/Sessions/SessionsTests.cs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// This source code is dual-licensed under the Apache License, version 2.0,
2+
// and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Threading.Tasks;
8+
using RabbitMQ.AMQP.Client.Impl;
9+
using Xunit;
10+
using Xunit.Abstractions;
11+
12+
namespace Tests.Sessions
13+
{
14+
public class SessionsTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
15+
{
16+
[Theory]
17+
[InlineData(1)]
18+
[InlineData(10)]
19+
[InlineData(37)]
20+
public async Task CreateSessionSequentiallyShouldHaveTheMaxSessionsPerItem(int maxSessions)
21+
{
22+
Assert.NotNull(_connection);
23+
AmqpConnection? amqpConnection = _connection as AmqpConnection;
24+
Assert.NotNull(amqpConnection);
25+
AmqpSessionManagement s = new(amqpConnection, maxSessions);
26+
for (int i = 0; i < 100; i++)
27+
{
28+
await s.GetOrCreateSessionAsync();
29+
}
30+
31+
Assert.Equal(maxSessions, s.GetSessionsCount());
32+
}
33+
34+
[Theory]
35+
[InlineData(1)]
36+
[InlineData(10)]
37+
[InlineData(37)]
38+
public async Task CreateSessionInMultiThreadingShouldHaveTheMaxSessionsPerItem(int maxSessions)
39+
{
40+
Assert.NotNull(_connection);
41+
AmqpConnection? amqpConnection = _connection as AmqpConnection;
42+
Assert.NotNull(amqpConnection);
43+
AmqpSessionManagement s = new(amqpConnection, maxSessions);
44+
Task[] tasks = new Task[100];
45+
for (int i = 0; i < 100; i++)
46+
{
47+
tasks[i] = (Task.Run(async () => { await s.GetOrCreateSessionAsync(); }));
48+
}
49+
50+
await Task.WhenAll(tasks);
51+
52+
Assert.Equal(maxSessions, s.GetSessionsCount());
53+
}
54+
}
55+
}

0 commit comments

Comments
 (0)