Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 8d38227

Browse files
committed
Drain any buffers before returning pooled client
1 parent 1320326 commit 8d38227

File tree

6 files changed

+62
-23
lines changed

6 files changed

+62
-23
lines changed

src/ServiceStack.Redis/BufferedReader.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ internal BufferedReader(Stream source, int bufferSize)
2828
{
2929
_source = source;
3030
_buffer = new byte[bufferSize];
31+
Reset();
32+
}
33+
34+
internal void Reset()
35+
{
3136
_offset = _available = 0;
3237
}
3338

src/ServiceStack.Redis/PooledRedisClientManager.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ private RedisClient GetClient(bool forAsync)
250250
if (inActiveClient != null)
251251
{
252252
WritePoolIndex++;
253-
inActiveClient.Active = true;
253+
inActiveClient.Activate();
254254

255255
InitClient(inActiveClient);
256256

@@ -400,7 +400,7 @@ private RedisClient GetReadOnlyClient(bool forAsync)
400400
if (inActiveClient != null)
401401
{
402402
ReadPoolIndex++;
403-
inActiveClient.Active = true;
403+
inActiveClient.Activate();
404404

405405
InitClient(inActiveClient);
406406

@@ -468,7 +468,7 @@ private int GetInActiveReadClient(out RedisClient inactiveClient)
468468
{
469469
var desiredIndex = ReadPoolIndex % readClients.Length;
470470
//this will loop through all hosts in readClients once even though there are 2 for loops
471-
//both loops are used to try to get the prefered host according to the round robin algorithm
471+
//both loops are used to try to get the preferred host according to the round robin algorithm
472472
var readOnlyTotal = RedisResolver.ReadOnlyHostsCount;
473473
for (int x = 0; x < readOnlyTotal; x++)
474474
{
@@ -502,7 +502,7 @@ private int GetInActiveReadClient(out RedisClient inactiveClient)
502502
private RedisClient InitNewClient(RedisClient client)
503503
{
504504
client.Id = Interlocked.Increment(ref RedisClientCounter);
505-
client.Active = true;
505+
client.Activate(newClient:true);
506506
client.ClientManager = this;
507507
client.ConnectionFilter = ConnectionFilter;
508508
if (NamespacePrefix != null)
@@ -543,7 +543,7 @@ public void DisposeClient(RedisNativeClient client)
543543
else
544544
{
545545
client.TrackThread = null;
546-
client.Active = false;
546+
client.Deactivate();
547547
}
548548

549549
Monitor.PulseAll(readClients);
@@ -564,7 +564,7 @@ public void DisposeClient(RedisNativeClient client)
564564
else
565565
{
566566
client.TrackThread = null;
567-
client.Active = false;
567+
client.Deactivate();
568568
}
569569

570570
Monitor.PulseAll(writeClients);
@@ -587,7 +587,7 @@ public void DisposeReadOnlyClient(RedisNativeClient client)
587587
{
588588
lock (readClients)
589589
{
590-
client.Active = false;
590+
client.Deactivate();
591591
Monitor.PulseAll(readClients);
592592
}
593593
}
@@ -600,7 +600,7 @@ public void DisposeWriteClient(RedisNativeClient client)
600600
{
601601
lock (writeClients)
602602
{
603-
client.Active = false;
603+
client.Deactivate();
604604
Monitor.PulseAll(writeClients);
605605
}
606606
}

src/ServiceStack.Redis/RedisClient.Async.cs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ private async ValueTask<T> ExecAsync<T>(Func<IRedisClientAsync, ValueTask<T>> ac
105105
{
106106
using (JsConfig.With(new Text.Config { ExcludeTypeInfo = false }))
107107
{
108-
return await action(this).ConfigureAwait(false);
108+
var ret = await action(this).ConfigureAwait(false);
109+
return ret;
109110
}
110111
}
111112

@@ -120,11 +121,19 @@ ValueTask<string> IRedisClientAsync.GetValueAsync(string key, CancellationToken
120121

121122
Task<T> ICacheClientAsync.GetAsync<T>(string key, CancellationToken token)
122123
{
123-
return ExecAsync(r =>
124-
typeof(T) == typeof(byte[])
125-
? ((IRedisNativeClientAsync)r).GetAsync(key, token).Await(val => (T)(object)val)
126-
: r.GetValueAsync(key, token).Await(val => JsonSerializer.DeserializeFromString<T>(val))
127-
).AsTask();
124+
return ExecAsync(async r => {
125+
if (typeof(T) == typeof(byte[]))
126+
{
127+
var ret = await ((IRedisNativeClientAsync) r).GetAsync(key, token).ConfigureAwait(false);
128+
return (T) (object) ret;
129+
}
130+
else
131+
{
132+
var val = await r.GetValueAsync(key, token).ConfigureAwait(false);
133+
var ret = JsonSerializer.DeserializeFromString<T>(val);
134+
return ret;
135+
}
136+
}).AsTask();
128137
}
129138

130139
async ValueTask<List<string>> IRedisClientAsync.SearchKeysAsync(string pattern, CancellationToken token)
@@ -169,10 +178,10 @@ ValueTask<bool> IRedisClientAsync.AddItemToSortedSetAsync(string setId, string v
169178
=> ((IRedisClientAsync)this).AddItemToSortedSetAsync(setId, value, GetLexicalScore(value), token);
170179

171180
ValueTask<bool> IRedisClientAsync.AddItemToSortedSetAsync(string setId, string value, double score, CancellationToken token)
172-
=> NativeAsync.ZAddAsync(setId, score, value.ToUtf8Bytes()).IsSuccessAsync();
181+
=> NativeAsync.ZAddAsync(setId, score, value.ToUtf8Bytes(), token).IsSuccessAsync();
173182

174183
ValueTask<bool> IRedisClientAsync.SetEntryInHashAsync(string hashId, string key, string value, CancellationToken token)
175-
=> NativeAsync.HSetAsync(hashId, key.ToUtf8Bytes(), value.ToUtf8Bytes()).IsSuccessAsync();
184+
=> NativeAsync.HSetAsync(hashId, key.ToUtf8Bytes(), value.ToUtf8Bytes(), token).IsSuccessAsync();
176185

177186
ValueTask IRedisClientAsync.SetAllAsync(IDictionary<string, string> map, CancellationToken token)
178187
=> GetSetAllBytes(map, out var keyBytes, out var valBytes) ? NativeAsync.MSetAsync(keyBytes, valBytes, token) : default;
@@ -221,7 +230,7 @@ ValueTask<bool> IRedisClientAsync.ExpireEntryAtAsync(string key, DateTime expire
221230
: NativeAsync.ExpireAtAsync(key, ConvertToServerDate(expireAt).ToUnixTime(), token);
222231

223232
Task<TimeSpan?> ICacheClientAsync.GetTimeToLiveAsync(string key, CancellationToken token)
224-
=> NativeAsync.TtlAsync(key, token).Await(ttlSecs => ParseTimeToLiveResult(ttlSecs)).AsTask();
233+
=> NativeAsync.TtlAsync(key, token).Await(ParseTimeToLiveResult).AsTask();
225234

226235
ValueTask<bool> IRedisClientAsync.PingAsync(CancellationToken token)
227236
=> NativeAsync.PingAsync(token);
@@ -365,7 +374,7 @@ ValueTask IRedisClientAsync.SlowlogResetAsync(CancellationToken token)
365374
=> NativeAsync.SlowlogResetAsync(token);
366375

367376
ValueTask<SlowlogItem[]> IRedisClientAsync.GetSlowlogAsync(int? numberOfRecords, CancellationToken token)
368-
=> NativeAsync.SlowlogGetAsync(numberOfRecords, token).Await(data => ParseSlowlog(data));
377+
=> NativeAsync.SlowlogGetAsync(numberOfRecords, token).Await(ParseSlowlog);
369378

370379

371380
Task<bool> ICacheClientAsync.SetAsync<T>(string key, T value, CancellationToken token)

src/ServiceStack.Redis/RedisManagerPool.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ private RedisClient GetClient(bool forAsync)
151151
if (inActiveClient != null)
152152
{
153153
poolIndex++;
154-
inActiveClient.Active = true;
154+
inActiveClient.Activate();
155155

156156
return !AssertAccessOnlyOnSameThread
157157
? inActiveClient
@@ -281,7 +281,7 @@ private int GetInActiveClient(out RedisClient inactiveClient)
281281
private RedisClient InitNewClient(RedisClient client)
282282
{
283283
client.Id = Interlocked.Increment(ref RedisClientCounter);
284-
client.Active = true;
284+
client.Activate(newClient:true);
285285
client.ClientManager = this;
286286
client.ConnectionFilter = ConnectionFilter;
287287

@@ -303,7 +303,7 @@ public void DisposeClient(RedisNativeClient client)
303303
else
304304
{
305305
client.TrackThread = null;
306-
client.Active = false;
306+
client.Deactivate();
307307
}
308308

309309
Monitor.PulseAll(clients);
@@ -320,7 +320,7 @@ public void DisposeWriteClient(RedisNativeClient client)
320320
{
321321
lock (clients)
322322
{
323-
client.Active = false;
323+
client.Deactivate();
324324
}
325325
}
326326

src/ServiceStack.Redis/RedisNativeClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public DateTime? DeactivatedAt
8383
internal bool Active
8484
{
8585
get => Interlocked.CompareExchange(ref active, 0, 0) == YES;
86-
set => Interlocked.Exchange(ref active, value ? YES : NO);
86+
private set => Interlocked.Exchange(ref active, value ? YES : NO);
8787
}
8888

8989
internal IHandleClientDispose ClientManager { get; set; }

src/ServiceStack.Redis/RedisNativeClient_Utils.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,31 @@ private void SendDirectToSocket(ArraySegment<byte> segment)
557557
}
558558
}
559559

560+
/// <summary>
561+
/// Called before returning pooled client/socket
562+
/// </summary>
563+
internal void Activate(bool newClient=false)
564+
{
565+
if (!newClient)
566+
{
567+
//Drain any existing buffers
568+
ResetSendBuffer();
569+
bufferedReader.Reset();
570+
if (socket?.Available > 0)
571+
{
572+
logDebug($"Draining existing socket of {socket.Available} bytes");
573+
var buff = new byte[socket.Available];
574+
socket.Receive(buff, SocketFlags.None);
575+
}
576+
}
577+
Active = true;
578+
}
579+
580+
internal void Deactivate()
581+
{
582+
Active = false;
583+
}
584+
560585
/// <summary>
561586
/// reset buffer index in send buffer
562587
/// </summary>

0 commit comments

Comments
 (0)