Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 171cbd2

Browse files
author
Pete Ness
committed
Switch from AsyncMonitor to AsyncManualResetEvent to support releasing all waiting "locks" better.
1 parent 3811e09 commit 171cbd2

File tree

2 files changed

+49
-53
lines changed

2 files changed

+49
-53
lines changed

src/ServiceStack.Redis/PooledRedisClientManager.cs

Lines changed: 48 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
using System.Linq;
1717
using System.Threading;
1818
using System.Threading.Tasks;
19-
using Cleary.AsyncExtensions;
19+
using Nito.AsyncEx;
2020
using ServiceStack.Caching;
2121
using ServiceStack.Logging;
2222
using ServiceStack.Text;
@@ -36,8 +36,8 @@ public partial class PooledRedisClientManager
3636
private const string PoolTimeoutError =
3737
"Redis Timeout expired. The timeout period elapsed prior to obtaining a connection from the pool. This may have occurred because all pooled connections were in use.";
3838

39-
private AsyncMonitor readMonitor;
40-
private AsyncMonitor writeMonitor;
39+
private AsyncManualResetEvent readAsyncEvent;
40+
private AsyncManualResetEvent writeAsyncEvent;
4141

4242
protected readonly int PoolSizeMultiplier = 20;
4343
public int RecheckPoolAfterMs = 100;
@@ -217,30 +217,44 @@ protected virtual void OnStart()
217217
this.Start();
218218
}
219219

220+
private void pulseAllRead()
221+
{
222+
readAsyncEvent?.Set();
223+
readAsyncEvent?.Reset();
224+
Monitor.PulseAll(readClients);
225+
}
226+
227+
private void pulseAllWrite()
228+
{
229+
writeAsyncEvent?.Set();
230+
writeAsyncEvent?.Reset();
231+
Monitor.PulseAll(writeClients);
232+
}
233+
220234
private async Task<bool> waitForWriter(int msTimeout)
221235
{
222-
if (writeMonitor == null)
223-
writeMonitor = new AsyncMonitor();
224-
var delayTask = Task.Delay(msTimeout);
225-
var result = await Task.WhenAny(writeMonitor.WaitAsync(), delayTask);
226-
if (result == delayTask) return false;
236+
if (writeAsyncEvent == null) // If we're not doing async, no need to create this till we need it.
237+
writeAsyncEvent = new AsyncManualResetEvent(false);
238+
var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(msTimeout));
239+
try
240+
{
241+
await writeAsyncEvent.WaitAsync(cts.Token);
242+
}
243+
catch (OperationCanceledException) { return false; }
227244
return true;
228245
}
229246

230247
/// <summary>
231248
/// Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts
232249
/// </summary>
233250
/// <returns></returns>
234-
public IRedisClient GetClient() => GetClient(false);
251+
public IRedisClient GetClient() => GetClientBlocking();
235252

236253
private async ValueTask<IRedisClientAsync> GetClientAsync()
237254
{
238255
try
239256
{
240-
var poolTimedOut = false;
241257
var inactivePoolIndex = -1;
242-
243-
var timeoutsTests = 300;
244258
do
245259
{
246260
RedisClient inActiveClient;
@@ -269,33 +283,22 @@ private async ValueTask<IRedisClientAsync> GetClientAsync()
269283
}
270284
}
271285

272-
timeoutsTests--;
273-
if (timeoutsTests <= 0)
286+
if (PoolTimeout.HasValue)
287+
{
288+
// We have a timeout value set - so try to not wait longer than this.
289+
if (!await waitForWriter(PoolTimeout.Value))
290+
{
291+
throw new TimeoutException(PoolTimeoutError);
292+
}
293+
}
294+
else
274295
{
275-
poolTimedOut = true;
276-
break;
296+
// Wait forever, so just retry till we get one.
297+
await waitForWriter(RecheckPoolAfterMs);
277298
}
278-
await Task.Delay(10);
279-
280-
// // Didn't get one, so let's wait a bit for a new one.
281-
// if (PoolTimeout.HasValue)
282-
// {
283-
// if (!await waitForWriter(PoolTimeout.Value))
284-
// {
285-
// poolTimedOut = true;
286-
// break;
287-
// }
288-
// }
289-
// else
290-
// {
291-
// await waitForWriter(RecheckPoolAfterMs);
292-
// }
293-
} while (true); // Just keep repeating until we get a
299+
} while (true); // Just keep repeating until we get a slot.
294300

295-
if (poolTimedOut)
296-
throw new TimeoutException(PoolTimeoutError);
297-
298-
//Reaches here when there's no Valid InActive Clients
301+
//Reaches here when there's no Valid InActive Clients, but we have a slot for one!
299302
try
300303
{
301304
//inactivePoolIndex = index of reservedSlot || index of invalid client
@@ -343,9 +346,8 @@ private async ValueTask<IRedisClientAsync> GetClientAsync()
343346
}
344347
}
345348

346-
private RedisClient GetClient(bool forAsync)
349+
private RedisClient GetClientBlocking()
347350
{
348-
if (forAsync) throw new Exception("Call GetClientAsync instead");
349351
try
350352
{
351353
var poolTimedOut = false;
@@ -380,7 +382,7 @@ private RedisClient GetClient(bool forAsync)
380382

381383
InitClient(inActiveClient);
382384

383-
return (!AssertAccessOnlyOnSameThread || forAsync)
385+
return (!AssertAccessOnlyOnSameThread)
384386
? inActiveClient
385387
: inActiveClient.LimitAccessToThread(Thread.CurrentThread.ManagedThreadId, Environment.StackTrace);
386388
}
@@ -416,7 +418,7 @@ private RedisClient GetClient(bool forAsync)
416418
WritePoolIndex++;
417419
writeClients[inactivePoolIndex] = newClient;
418420

419-
return (!AssertAccessOnlyOnSameThread || forAsync)
421+
return (!AssertAccessOnlyOnSameThread)
420422
? newClient
421423
: newClient.LimitAccessToThread(Thread.CurrentThread.ManagedThreadId, Environment.StackTrace);
422424
}
@@ -672,8 +674,7 @@ public void DisposeClient(RedisNativeClient client)
672674
client.Deactivate();
673675
}
674676

675-
Monitor.PulseAll(readClients);
676-
readMonitor?.PulseAll();
677+
pulseAllRead();
677678
return;
678679
}
679680
}
@@ -694,23 +695,20 @@ public void DisposeClient(RedisNativeClient client)
694695
client.Deactivate();
695696
}
696697

697-
Monitor.PulseAll(writeClients);
698-
writeMonitor?.PulseAll();
698+
pulseAllWrite();
699699
return;
700700
}
701701
}
702702

703703
//Client not found in any pool, pulse both pools.
704704
lock (readClients)
705705
{
706-
Monitor.PulseAll(readClients);
707-
readMonitor?.PulseAll();
706+
pulseAllRead();
708707
}
709708

710709
lock (writeClients)
711710
{
712-
Monitor.PulseAll(writeClients);
713-
writeMonitor?.PulseAll();
711+
pulseAllWrite();
714712
}
715713
}
716714

@@ -723,8 +721,7 @@ public void DisposeReadOnlyClient(RedisNativeClient client)
723721
lock (readClients)
724722
{
725723
client.Deactivate();
726-
Monitor.PulseAll(readClients);
727-
readMonitor?.PulseAll();
724+
pulseAllRead();
728725
}
729726
}
730727

@@ -737,8 +734,7 @@ public void DisposeWriteClient(RedisNativeClient client)
737734
lock (writeClients)
738735
{
739736
client.Deactivate();
740-
Monitor.PulseAll(writeClients);
741-
writeMonitor?.PulseAll();
737+
pulseAllWrite();
742738
}
743739
}
744740

src/ServiceStack.Redis/ServiceStack.Redis.Core.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<PackageTags>Redis;NoSQL;Client;Distributed;Cache;PubSub;Messaging;Transactions</PackageTags>
1212
</PropertyGroup>
1313
<ItemGroup>
14-
<PackageReference Include="Cleary.AsyncExtensions" Version="4.9.3" />
14+
<PackageReference Include="Nito.AsyncEx" Version="5.0.0" />
1515
<PackageReference Include="ServiceStack.Common.Core" Version="$(Version)" />
1616
</ItemGroup>
1717
<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">

0 commit comments

Comments
 (0)