Skip to content

Commit 448ae95

Browse files
C#: batch compatibility (valkey-io#4371)
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com> Co-authored-by: Edward Liang <76571219+edlng@users.noreply.github.com>
1 parent 2a27f52 commit 448ae95

21 files changed

+273
-49
lines changed

csharp/sources/Valkey.Glide/Abstract/ConnectionMultiplexer.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
using System.Net;
44

5-
using Valkey.Glide.Commands;
65
using Valkey.Glide.Internals;
76

87
using static Valkey.Glide.Commands.Options.InfoOptions;

csharp/sources/Valkey.Glide/Abstract/Database.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
22

3-
using Valkey.Glide.Commands;
43
using Valkey.Glide.Commands.Options;
54
using Valkey.Glide.Internals;
65

@@ -17,9 +16,21 @@ internal class DatabaseImpl : GlideClient, IDatabase
1716
? await Command(Request.Info(sections), Route.Random)
1817
: await base.Info(sections);
1918

19+
public IBatch CreateBatch(object? asyncState = null)
20+
{
21+
Utils.Requires<ArgumentException>(asyncState is null, "Async state is not supported by GLIDE");
22+
return new ValkeyBatch(this);
23+
}
24+
25+
public ITransaction CreateTransaction(object? asyncState = null)
26+
{
27+
Utils.Requires<ArgumentException>(asyncState is null, "Async state is not supported by GLIDE");
28+
return new ValkeyTransaction(this);
29+
}
30+
2031
internal readonly bool IsCluster;
2132

22-
private DatabaseImpl(bool isCluster) { IsCluster = isCluster; }
33+
protected DatabaseImpl(bool isCluster) { IsCluster = isCluster; }
2334

2435
public static async Task<DatabaseImpl> Create(BaseClientConfiguration config)
2536
=> await CreateClient(config, () => new DatabaseImpl(config is ClusterClientConfiguration));
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
2+
3+
namespace Valkey.Glide;
4+
5+
/// <summary>
6+
/// Represents a block of operations that will be sent to the server together.
7+
/// This can be useful to reduce packet fragmentation on slow connections - it
8+
/// can improve the time to get *all* the operations processed, with the trade-off
9+
/// of a slower time to get the *first* operation processed; this is usually
10+
/// a good thing. Unless this batch is a <b>transaction</b>, there is no guarantee
11+
/// that these operations will be processed either contiguously or atomically by the server.<br />
12+
/// See also <see cref="Pipeline.Batch" /> and <see cref="Pipeline.ClusterBatch" />.
13+
/// </summary>
14+
public interface IBatch : IDatabaseAsync
15+
{
16+
/// <summary>
17+
/// Execute the batch operation, sending all queued commands to the server.
18+
/// Note that this operation is neither synchronous nor truly asynchronous - it simply enqueues the buffered messages.
19+
/// To check on completion, you should check the individual responses.
20+
/// </summary>
21+
void Execute();
22+
}

csharp/sources/Valkey.Glide/Abstract/IConnectionMultiplexer.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
using System.Net;
44

5-
using Valkey.Glide.Commands;
6-
75
namespace Valkey.Glide;
86

97
/// <summary>
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
2+
3+
namespace Valkey.Glide;
4+
5+
/// <summary>
6+
/// Describes functionality that is common to both standalone and cluster servers.<br />
7+
/// See also <see cref="GlideClient" /> and <see cref="GlideClusterClient" />.
8+
/// </summary>
9+
public interface IDatabase : IDatabaseAsync
10+
{
11+
/// <summary>
12+
/// Allows creation of a group of operations that will be sent to the server as a single unit,
13+
/// but which may or may not be processed on the server contiguously.
14+
/// </summary>
15+
/// <param name="asyncState">The async state is not supported by GLIDE.</param>
16+
/// <returns>The created batch.</returns>
17+
IBatch CreateBatch(object? asyncState = null);
18+
19+
/// <summary>
20+
/// Allows creation of a group of operations that will be sent to the server as a single unit,
21+
/// and processed on the server as a single unit.
22+
/// </summary>
23+
/// <param name="asyncState">The async state is not supported by GLIDE.</param>
24+
/// <returns>The created transaction.</returns>
25+
ITransaction CreateTransaction(object? asyncState = null);
26+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
2+
3+
using Valkey.Glide.Commands;
4+
5+
namespace Valkey.Glide;
6+
7+
public interface IDatabaseAsync : IStringBaseCommands, IGenericCommands, IServerManagementCommands
8+
{ }
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
2+
3+
namespace Valkey.Glide;
4+
5+
/// <summary>
6+
/// Represents a group of operations that will be sent to the server as a single unit,
7+
/// and processed on the server as a single unit. Transactions can also include constraints
8+
/// (implemented via <c>WATCH</c>), but note that constraint checking involves will (very briefly)
9+
/// block the connection, since the transaction cannot be correctly committed (<c>EXEC</c>),
10+
/// aborted (<c>DISCARD</c>) or not applied in the first place (<c>UNWATCH</c>) until the responses from
11+
/// the constraint checks have arrived.
12+
/// See also <see cref="Pipeline.Batch" /> and <see cref="Pipeline.ClusterBatch" />.
13+
/// </summary>
14+
/// <remarks>
15+
/// <para>Note that on a cluster, it may be required that all keys involved in the transaction (including constraints) are in the same hash-slot.</para>
16+
/// <para><seealso href="https://valkey.io/topics/transactions/"/></para>
17+
/// </remarks>
18+
public interface ITransaction : IBatch
19+
{
20+
/// <summary>
21+
/// Adds a precondition for this transaction.
22+
/// </summary>
23+
/// <param name="condition">The condition to add to the transaction.</param>
24+
//ConditionResult AddCondition(Condition condition);
25+
26+
/// <summary>
27+
/// Execute the batch operation, sending all queued commands to the server.
28+
/// </summary>
29+
/// <param name="flags">Command flags are not supported by GLIDE.</param>
30+
/// <returns>
31+
/// <see langword="true" /> if a transaction was applied or
32+
/// <see langword="false" /> if a transaction failed due to a <c>WATCH</c> command.
33+
/// </returns>
34+
bool Execute(CommandFlags flags = CommandFlags.None);
35+
36+
/// <summary>
37+
/// Execute the batch operation, sending all queued commands to the server.
38+
/// </summary>
39+
/// <param name="flags">Command flags are not supported by GLIDE.</param>
40+
/// <returns>
41+
/// <see langword="true" /> if a transaction was applied or
42+
/// <see langword="false" /> if a transaction failed due to a <c>WATCH</c> command.
43+
/// </returns>
44+
Task<bool> ExecuteAsync(CommandFlags flags = CommandFlags.None);
45+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
2+
3+
using Valkey.Glide.Internals;
4+
using Valkey.Glide.Pipeline;
5+
6+
namespace Valkey.Glide;
7+
8+
internal class ValkeyBatch(BaseClient client) : DatabaseImpl(false), IBatch
9+
{
10+
private readonly List<ICmd> _commands = [];
11+
protected TaskCompletionSource<object?[]?> _tcs = new();
12+
private readonly BaseClient _client = client;
13+
protected bool _isAtomic = false;
14+
15+
internal override async Task<T> Command<R, T>(Cmd<R, T> command, Route? route = null)
16+
{
17+
int idx = _commands.Count;
18+
_commands.Add(command);
19+
// wait for the batch to be executed and then pick command's result
20+
return await _tcs.Task.ContinueWith(task => (T)task.Result![idx]!);
21+
// TODO what if transactoin failed?
22+
}
23+
24+
protected async Task ExecuteImpl()
25+
{
26+
Batch b = new(_isAtomic);
27+
b.Commands.AddRange(_commands);
28+
29+
object?[]? res = await _client.Batch(b, true);
30+
_tcs.SetResult(res);
31+
}
32+
33+
public void Execute() => ExecuteImpl().GetAwaiter().GetResult();
34+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
2+
3+
namespace Valkey.Glide;
4+
5+
internal class ValkeyTransaction : ValkeyBatch, ITransaction
6+
{
7+
public ValkeyTransaction(BaseClient client) : base(client)
8+
{
9+
_isAtomic = true;
10+
}
11+
12+
public bool Execute(CommandFlags flags = CommandFlags.None)
13+
=> ExecuteAsync(flags).GetAwaiter().GetResult();
14+
15+
public async Task<bool> ExecuteAsync(CommandFlags flags = CommandFlags.None)
16+
{
17+
Utils.Requires<NotImplementedException>(flags == CommandFlags.None, "Command flags are not supported by GLIDE");
18+
await ExecuteImpl();
19+
return _tcs.Task.Result is not null;
20+
}
21+
}

csharp/sources/Valkey.Glide/BaseClient.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,11 @@ protected BaseClient()
6666

6767
protected internal delegate T ResponseHandler<T>(IntPtr response);
6868

69-
/// <summary>
70-
/// </summary>
7169
/// <typeparam name="R">Type received from server.</typeparam>
7270
/// <typeparam name="T">Type we return to the user.</typeparam>
7371
/// <param name="command"></param>
7472
/// <param name="route"></param>
75-
/// <returns></returns>
76-
internal async Task<T> Command<R, T>(Cmd<R, T> command, Route? route = null)
73+
internal virtual async Task<T> Command<R, T>(Cmd<R, T> command, Route? route = null)
7774
{
7875
// 1. Create Cmd which wraps CmdInfo and manages all memory allocations
7976
using Cmd cmd = command.ToFfi();
@@ -99,7 +96,7 @@ internal async Task<T> Command<R, T>(Cmd<R, T> command, Route? route = null)
9996
// All memory allocated is auto-freed by `using` operator
10097
}
10198

102-
protected async Task<object?[]?> Batch<T>(BaseBatch<T> batch, bool raiseOnError, BaseBatchOptions? options = null) where T : BaseBatch<T>
99+
internal async Task<object?[]?> Batch<T>(BaseBatch<T> batch, bool raiseOnError, BaseBatchOptions? options = null) where T : BaseBatch<T>
103100
{
104101
// 1. Allocate memory for batch, which allocates all nested Cmds
105102
using FFI.Batch ffiBatch = batch.ToFFI();

0 commit comments

Comments
 (0)