Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit b511433

Browse files
authored
Merge pull request #254 from mgravell/async-squashed
Implement async support for ServiceStack.Redis
2 parents ec93f90 + 910fde8 commit b511433

File tree

152 files changed

+19281
-525
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

152 files changed

+19281
-525
lines changed

src/ServiceStack.Redis.sln

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11

22
Microsoft Visual Studio Solution File, Format Version 12.00
3-
# Visual Studio 15
4-
VisualStudioVersion = 15.0.27130.2010
3+
# Visual Studio Version 16
4+
VisualStudioVersion = 16.0.29721.120
55
MinimumVisualStudioVersion = 10.0.40219.1
66
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{38F69F8F-9303-4BAF-B081-D28339163E07}"
77
ProjectSection(SolutionItems) = preProject
8+
..\build\build-core.proj = ..\build\build-core.proj
89
..\build\build.bat = ..\build\build.bat
910
..\build\build.proj = ..\build\build.proj
1011
..\build\build.tasks = ..\build\build.tasks
11-
..\README.md = ..\README.md
1212
Directory.Build.props = Directory.Build.props
13-
ServiceStack.Redis\ServiceStack.Redis.Core.csproj = ServiceStack.Redis\ServiceStack.Redis.Core.csproj
14-
..\build\build-core.proj = ..\build\build-core.proj
1513
..\tests\Directory.Build.props = ..\tests\Directory.Build.props
14+
..\README.md = ..\README.md
15+
ServiceStack.Redis\ServiceStack.Redis.Core.csproj = ServiceStack.Redis\ServiceStack.Redis.Core.csproj
1616
EndProjectSection
1717
EndProject
1818
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceStack.Redis", "ServiceStack.Redis\ServiceStack.Redis.csproj", "{AF99F19B-4C04-4F58-81EF-B092F1FCC540}"
@@ -23,6 +23,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Console.Tests", "..\tests\C
2323
EndProject
2424
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceStack.Redis.Tests.Sentinel", "..\tests\ServiceStack.Redis.Tests.Sentinel\ServiceStack.Redis.Tests.Sentinel.csproj", "{91C55091-A946-49B5-9517-8794EBCC5784}"
2525
EndProject
26+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceStack.Redis.Benchmark", "..\tests\ServiceStack.Redis.Benchmark\ServiceStack.Redis.Benchmark.csproj", "{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}"
27+
EndProject
2628
Global
2729
GlobalSection(SolutionConfigurationPlatforms) = preSolution
2830
Debug|Any CPU = Debug|Any CPU
@@ -77,6 +79,18 @@ Global
7779
{91C55091-A946-49B5-9517-8794EBCC5784}.Release|x64.ActiveCfg = Release|Any CPU
7880
{91C55091-A946-49B5-9517-8794EBCC5784}.Release|x64.Build.0 = Release|Any CPU
7981
{91C55091-A946-49B5-9517-8794EBCC5784}.Release|x86.ActiveCfg = Release|Any CPU
82+
{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
83+
{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}.Debug|Any CPU.Build.0 = Debug|Any CPU
84+
{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}.Debug|x64.ActiveCfg = Debug|Any CPU
85+
{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}.Debug|x64.Build.0 = Debug|Any CPU
86+
{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}.Debug|x86.ActiveCfg = Debug|Any CPU
87+
{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}.Debug|x86.Build.0 = Debug|Any CPU
88+
{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}.Release|Any CPU.ActiveCfg = Release|Any CPU
89+
{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}.Release|Any CPU.Build.0 = Release|Any CPU
90+
{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}.Release|x64.ActiveCfg = Release|Any CPU
91+
{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}.Release|x64.Build.0 = Release|Any CPU
92+
{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}.Release|x86.ActiveCfg = Release|Any CPU
93+
{959CA5FE-6525-4EEF-86CA-F4978BEFF14F}.Release|x86.Build.0 = Release|Any CPU
8094
EndGlobalSection
8195
GlobalSection(SolutionProperties) = preSolution
8296
HideSolutionNode = FALSE
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
//
2+
// https://github.com/ServiceStack/ServiceStack.Redis
3+
// ServiceStack.Redis: ECMA CLI Binding to the Redis key-value storage system
4+
//
5+
// Authors:
6+
// Demis Bellot ([email protected])
7+
//
8+
// Copyright 2013 Service Stack LLC. All Rights Reserved.
9+
//
10+
// Licensed under the same terms of ServiceStack.
11+
//
12+
13+
using ServiceStack.Caching;
14+
using ServiceStack.Redis.Internal;
15+
using System;
16+
using System.Collections.Generic;
17+
using System.Runtime.CompilerServices;
18+
using System.Threading;
19+
using System.Threading.Tasks;
20+
21+
namespace ServiceStack.Redis
22+
{
23+
/// <summary>
24+
/// Provides thread-safe retrieval of redis clients since each client is a new one.
25+
/// Allows the configuration of different ReadWrite and ReadOnly hosts
26+
/// </summary>
27+
public partial class BasicRedisClientManager
28+
: IRedisClientsManagerAsync, ICacheClientAsync
29+
{
30+
private ValueTask<ICacheClientAsync> GetCacheClientAsync(in CancellationToken _)
31+
=> new RedisClientManagerCacheClient(this).AsValueTaskResult<ICacheClientAsync>();
32+
33+
private ValueTask<ICacheClientAsync> GetReadOnlyCacheClientAsync(in CancellationToken _)
34+
=> ConfigureRedisClientAsync(this.GetReadOnlyClientImpl()).AsValueTaskResult<ICacheClientAsync>();
35+
36+
private IRedisClientAsync ConfigureRedisClientAsync(IRedisClientAsync client)
37+
=> client;
38+
39+
ValueTask<ICacheClientAsync> IRedisClientsManagerAsync.GetCacheClientAsync(CancellationToken token)
40+
=> GetCacheClientAsync(token);
41+
42+
ValueTask<IRedisClientAsync> IRedisClientsManagerAsync.GetClientAsync(CancellationToken token)
43+
=> GetClientImpl().AsValueTaskResult<IRedisClientAsync>();
44+
45+
ValueTask<ICacheClientAsync> IRedisClientsManagerAsync.GetReadOnlyCacheClientAsync(CancellationToken token)
46+
=> GetReadOnlyCacheClientAsync(token);
47+
48+
ValueTask<IRedisClientAsync> IRedisClientsManagerAsync.GetReadOnlyClientAsync(CancellationToken token)
49+
=> GetReadOnlyClientImpl().AsValueTaskResult<IRedisClientAsync>();
50+
51+
ValueTask IAsyncDisposable.DisposeAsync()
52+
{
53+
Dispose();
54+
return default;
55+
}
56+
57+
async Task<T> ICacheClientAsync.GetAsync<T>(string key, CancellationToken token)
58+
{
59+
await using var client = await GetReadOnlyCacheClientAsync(token).ConfigureAwait(false);
60+
return await client.GetAsync<T>(key).ConfigureAwait(false);
61+
}
62+
63+
async Task<bool> ICacheClientAsync.SetAsync<T>(string key, T value, CancellationToken token)
64+
{
65+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
66+
return await client.SetAsync<T>(key, value, token).ConfigureAwait(false);
67+
}
68+
69+
async Task<bool> ICacheClientAsync.SetAsync<T>(string key, T value, DateTime expiresAt, CancellationToken token)
70+
{
71+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
72+
return await client.SetAsync<T>(key, value, expiresAt, token).ConfigureAwait(false);
73+
}
74+
75+
async Task<bool> ICacheClientAsync.SetAsync<T>(string key, T value, TimeSpan expiresIn, CancellationToken token)
76+
{
77+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
78+
return await client.SetAsync<T>(key, value, expiresIn, token).ConfigureAwait(false);
79+
}
80+
81+
async Task ICacheClientAsync.FlushAllAsync(CancellationToken token)
82+
{
83+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
84+
await client.FlushAllAsync(token).ConfigureAwait(false);
85+
}
86+
87+
async Task<IDictionary<string, T>> ICacheClientAsync.GetAllAsync<T>(IEnumerable<string> keys, CancellationToken token)
88+
{
89+
await using var client = await GetReadOnlyCacheClientAsync(token).ConfigureAwait(false);
90+
return await client.GetAllAsync<T>(keys, token).ConfigureAwait(false);
91+
}
92+
93+
async Task ICacheClientAsync.SetAllAsync<T>(IDictionary<string, T> values, CancellationToken token)
94+
{
95+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
96+
await client.SetAllAsync<T>(values, token).ConfigureAwait(false);
97+
}
98+
99+
async Task<bool> ICacheClientAsync.RemoveAsync(string key, CancellationToken token)
100+
{
101+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
102+
return await client.RemoveAsync(key, token).ConfigureAwait(false);
103+
}
104+
105+
async Task ICacheClientAsync.RemoveAllAsync(IEnumerable<string> keys, CancellationToken token)
106+
{
107+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
108+
await client.RemoveAllAsync(keys, token).ConfigureAwait(false);
109+
}
110+
111+
async Task<long> ICacheClientAsync.IncrementAsync(string key, uint amount, CancellationToken token)
112+
{
113+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
114+
return await client.IncrementAsync(key, amount, token).ConfigureAwait(false);
115+
}
116+
117+
async Task<long> ICacheClientAsync.DecrementAsync(string key, uint amount, CancellationToken token)
118+
{
119+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
120+
return await client.DecrementAsync(key, amount, token).ConfigureAwait(false);
121+
}
122+
123+
async Task<bool> ICacheClientAsync.AddAsync<T>(string key, T value, CancellationToken token)
124+
{
125+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
126+
return await client.AddAsync<T>(key, value, token).ConfigureAwait(false);
127+
}
128+
129+
async Task<bool> ICacheClientAsync.ReplaceAsync<T>(string key, T value, CancellationToken token)
130+
{
131+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
132+
return await client.ReplaceAsync<T>(key, value, token).ConfigureAwait(false);
133+
}
134+
135+
async Task<bool> ICacheClientAsync.AddAsync<T>(string key, T value, DateTime expiresAt, CancellationToken token)
136+
{
137+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
138+
return await client.AddAsync<T>(key, value, expiresAt, token).ConfigureAwait(false);
139+
}
140+
141+
async Task<bool> ICacheClientAsync.ReplaceAsync<T>(string key, T value, DateTime expiresAt, CancellationToken token)
142+
{
143+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
144+
return await client.ReplaceAsync<T>(key, value, expiresAt, token).ConfigureAwait(false);
145+
}
146+
147+
async Task<bool> ICacheClientAsync.AddAsync<T>(string key, T value, TimeSpan expiresIn, CancellationToken token)
148+
{
149+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
150+
return await client.AddAsync<T>(key, value, expiresIn, token).ConfigureAwait(false);
151+
}
152+
153+
async Task<bool> ICacheClientAsync.ReplaceAsync<T>(string key, T value, TimeSpan expiresIn, CancellationToken token)
154+
{
155+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
156+
return await client.ReplaceAsync<T>(key, value, expiresIn, token).ConfigureAwait(false);
157+
}
158+
159+
async Task<TimeSpan?> ICacheClientAsync.GetTimeToLiveAsync(string key, CancellationToken token)
160+
{
161+
await using var client = await GetReadOnlyCacheClientAsync(token).ConfigureAwait(false);
162+
return await client.GetTimeToLiveAsync(key, token).ConfigureAwait(false);
163+
}
164+
165+
async IAsyncEnumerable<string> ICacheClientAsync.GetKeysByPatternAsync(string pattern, [EnumeratorCancellation] CancellationToken token)
166+
{
167+
await using var client = await GetReadOnlyCacheClientAsync(token).ConfigureAwait(false);
168+
await foreach (var key in client.GetKeysByPatternAsync(pattern, token).ConfigureAwait(false).WithCancellation(token))
169+
{
170+
yield return key;
171+
}
172+
}
173+
174+
async Task ICacheClientAsync.RemoveExpiredEntriesAsync(CancellationToken token)
175+
{
176+
await using var client = await GetCacheClientAsync(token).ConfigureAwait(false);
177+
await client.RemoveExpiredEntriesAsync(token).ConfigureAwait(false);
178+
}
179+
}
180+
}

src/ServiceStack.Redis/BasicRedisClientManager.ICacheClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public ICacheClient GetCacheClient()
3737

3838
public ICacheClient GetReadOnlyCacheClient()
3939
{
40-
return ConfigureRedisClient(this.GetReadOnlyClient());
40+
return ConfigureRedisClient(this.GetReadOnlyClientImpl());
4141
}
4242

4343
private ICacheClient ConfigureRedisClient(IRedisClient client)

src/ServiceStack.Redis/BasicRedisClientManager.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ protected virtual void OnStart()
9999
/// Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts
100100
/// </summary>
101101
/// <returns></returns>
102-
public IRedisClient GetClient()
102+
public IRedisClient GetClient() => GetClientImpl();
103+
private RedisClient GetClientImpl()
103104
{
104105
var client = InitNewClient(RedisResolver.CreateMasterClient(readWriteHostsIndex++));
105106
return client;
@@ -109,7 +110,8 @@ public IRedisClient GetClient()
109110
/// Returns a ReadOnly client using the hosts defined in ReadOnlyHosts.
110111
/// </summary>
111112
/// <returns></returns>
112-
public virtual IRedisClient GetReadOnlyClient()
113+
public virtual IRedisClient GetReadOnlyClient() => GetReadOnlyClientImpl();
114+
private RedisClient GetReadOnlyClientImpl()
113115
{
114116
var client = InitNewClient(RedisResolver.CreateSlaveClient(readOnlyHostsIndex++));
115117
return client;
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
using ServiceStack.Redis.Internal;
2+
using System;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace ServiceStack.Redis
7+
{
8+
internal sealed partial class BufferedReader
9+
{
10+
internal ValueTask<int> ReadByteAsync(in CancellationToken token = default)
11+
=> _available > 0 ? ReadByteFromBuffer().AsValueTaskResult() : ReadByteSlowAsync(token);
12+
13+
private ValueTask<int> ReadByteSlowAsync(in CancellationToken token)
14+
{
15+
token.ThrowIfCancellationRequested();
16+
_offset = 0;
17+
#if ASYNC_MEMORY
18+
var pending = _source.ReadAsync(new Memory<byte>(_buffer), token);
19+
if (!pending.IsCompletedSuccessfully)
20+
return Awaited(this, pending);
21+
#else
22+
var pending = _source.ReadAsync(_buffer, 0, _buffer.Length, token);
23+
if (pending.Status != TaskStatus.RanToCompletion)
24+
return Awaited(this, pending);
25+
#endif
26+
27+
_available = pending.Result;
28+
return (_available > 0 ? ReadByteFromBuffer() : -1).AsValueTaskResult();
29+
30+
#if ASYNC_MEMORY
31+
static async ValueTask<int> Awaited(BufferedReader @this, ValueTask<int> pending)
32+
{
33+
@this._available = await pending.ConfigureAwait(false);
34+
return @this._available > 0 ? @this.ReadByteFromBuffer() : -1;
35+
}
36+
#else
37+
static async ValueTask<int> Awaited(BufferedReader @this, Task<int> pending)
38+
{
39+
@this._available = await pending.ConfigureAwait(false);
40+
return @this._available > 0 ? @this.ReadByteFromBuffer() : -1;
41+
}
42+
#endif
43+
}
44+
45+
internal ValueTask<int> ReadAsync(byte[] buffer, int offset, int count, in CancellationToken token = default)
46+
=> _available > 0
47+
? ReadFromBuffer(buffer, offset, count).AsValueTaskResult()
48+
: ReadSlowAsync(buffer, offset, count, token);
49+
50+
private ValueTask<int> ReadSlowAsync(byte[] buffer, int offset, int count, in CancellationToken token)
51+
{
52+
// if they're asking for more than we deal in, just step out of the way
53+
if (count >= buffer.Length)
54+
{
55+
#if ASYNC_MEMORY
56+
return _source.ReadAsync(new Memory<byte>(buffer, offset, count), token);
57+
#else
58+
return new ValueTask<int>(_source.ReadAsync(buffer, offset, count, token));
59+
#endif
60+
}
61+
62+
// they're asking for less, so we could still have some left
63+
_offset = 0;
64+
#if ASYNC_MEMORY
65+
var pending = _source.ReadAsync(new Memory<byte>(_buffer), token);
66+
if (!pending.IsCompletedSuccessfully)
67+
return Awaited(this, pending, buffer, offset, count);
68+
69+
_available = pending.Result; // already checked status, this is fine
70+
return (_available > 0 ? ReadFromBuffer(buffer, offset, count) : 0).AsValueTaskResult();
71+
72+
static async ValueTask<int> Awaited(BufferedReader @this, ValueTask<int> pending, byte[] buffer, int offset, int count)
73+
{
74+
@this._available = await pending.ConfigureAwait(false);
75+
return @this._available > 0 ? @this.ReadFromBuffer(buffer, offset, count) : 0;
76+
}
77+
#else
78+
var pending = _source.ReadAsync(_buffer, 0, _buffer.Length, token);
79+
if (pending.Status != TaskStatus.RanToCompletion)
80+
return Awaited(this, pending, buffer, offset, count);
81+
82+
_available = pending.Result; // already checked status, this is fine
83+
return (_available > 0 ? ReadFromBuffer(buffer, offset, count) : 0).AsValueTaskResult();
84+
85+
static async ValueTask<int> Awaited(BufferedReader @this, Task<int> pending, byte[] buffer, int offset, int count)
86+
{
87+
@this._available = await pending.ConfigureAwait(false);
88+
return @this._available > 0 ? @this.ReadFromBuffer(buffer, offset, count) : 0;
89+
}
90+
#endif
91+
}
92+
}
93+
}

0 commit comments

Comments
 (0)