diff --git a/src/AzureFunctionExtensions/AzureFunctionExtensions.csproj b/src/AzureFunctionExtensions/AzureFunctionExtensions.csproj
index 2551848..d8e2f7a 100644
--- a/src/AzureFunctionExtensions/AzureFunctionExtensions.csproj
+++ b/src/AzureFunctionExtensions/AzureFunctionExtensions.csproj
@@ -26,7 +26,7 @@
-
+
diff --git a/src/AzureFunctionExtensions/Redis/RedisConfiguration.cs b/src/AzureFunctionExtensions/Redis/RedisConfiguration.cs
index 2022649..c0e7c04 100644
--- a/src/AzureFunctionExtensions/Redis/RedisConfiguration.cs
+++ b/src/AzureFunctionExtensions/Redis/RedisConfiguration.cs
@@ -37,6 +37,11 @@ public RedisConfiguration(IRedisDatabaseManager redisDatabaseManager)
///
public bool SendInBatch { get; set; } = true;
+ ///
+ /// Send items in contiguous transaction to Redis
+ ///
+ public bool SendInTransaction { get; set; } = true;
+
///
/// Sets the operation to performed in Redis
@@ -48,6 +53,7 @@ public RedisConfiguration(IRedisDatabaseManager redisDatabaseManager)
/// Time to live in Redis
///
public TimeSpan? TimeToLive { get; set; }
+
///
/// Initializes attributes, configuration and async collector
diff --git a/src/AzureFunctionExtensions/Redis/RedisOutputAsyncCollector.cs b/src/AzureFunctionExtensions/Redis/RedisOutputAsyncCollector.cs
index ce08736..6bdae52 100644
--- a/src/AzureFunctionExtensions/Redis/RedisOutputAsyncCollector.cs
+++ b/src/AzureFunctionExtensions/Redis/RedisOutputAsyncCollector.cs
@@ -46,7 +46,7 @@ public RedisOutputAsyncCollector(RedisConfiguration config, RedisOutputAttribute
{
BinaryValue = item.BinaryValue,
ObjectValue = item.ObjectValue,
- TextValue = item.TextValue,
+ TextValue = item.TextValue,
Key = Utils.MergeValueForProperty(item.Key, attr.Key, config.Key),
TimeToLive = Utils.MergeValueForNullableProperty(item.TimeToLive, attr.TimeToLive, config.TimeToLive),
IncrementValue = item.IncrementValue,
@@ -70,7 +70,7 @@ public RedisOutputAsyncCollector(RedisConfiguration config, RedisOutputAttribute
else
{
await SendToRedis(finalItem);
- }
+ }
}
///
@@ -80,13 +80,7 @@ public RedisOutputAsyncCollector(RedisConfiguration config, RedisOutputAttribute
///
public async Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
- foreach (var item in this.redisOutputCollection)
- {
- await SendToRedis(item);
-
- if (cancellationToken != null && cancellationToken.IsCancellationRequested)
- break;
- }
+ await SendToRedis(this.redisOutputCollection.ToArray());
}
///
@@ -94,39 +88,62 @@ public RedisOutputAsyncCollector(RedisConfiguration config, RedisOutputAttribute
///
///
///
- async Task SendToRedis(RedisOutput item)
+ async Task SendToRedis(params RedisOutput[] items)
{
- var connectionString = Utils.MergeValueForProperty(attr.Connection, config.Connection);
+ var connectionString = Utils.MergeValueForProperty(attr.Connection, config.Connection);
var db = redisDatabaseManager.GetDatabase(connectionString); // TODO: add support for multiple databases
-
- RedisValue value = CreateRedisValue(item);
-
- switch (item.Operation)
+ IDatabaseAsync dbAsync = db;
+ IBatch batch = null;
+ ITransaction transaction = null;
+ if (items.Length > 1)
{
- case RedisOutputOperation.SetKeyValue:
- {
- await db.StringSetAsync(item.Key, value, item.TimeToLive, When.Always, CommandFlags.FireAndForget);
- break;
- }
-
- case RedisOutputOperation.IncrementValue:
- {
- await db.StringIncrementAsync(item.Key, item.IncrementValue);
- break;
- }
-
- case RedisOutputOperation.ListRightPush:
- {
- await db.ListRightPushAsync(item.Key, value, When.Always, CommandFlags.FireAndForget);
- break;
- }
-
- case RedisOutputOperation.ListLeftPush:
- {
- await db.ListLeftPushAsync(item.Key, value, When.Always, CommandFlags.FireAndForget);
- break;
- }
+ if (this.config.SendInTransaction)
+ {
+ dbAsync = transaction = db.CreateTransaction();
+ }
+ else
+ {
+ dbAsync = batch = db.CreateBatch();
+ }
+ }
+ var tasks = new List();
+ foreach (var item in items)
+ {
+ RedisValue value = CreateRedisValue(item);
+
+ switch (item.Operation)
+ {
+ case RedisOutputOperation.SetKeyValue:
+ {
+ tasks.Add(dbAsync.StringSetAsync(item.Key, value, item.TimeToLive, When.Always, CommandFlags.FireAndForget));
+ break;
+ }
+
+ case RedisOutputOperation.IncrementValue:
+ {
+ tasks.Add(dbAsync.StringIncrementAsync(item.Key, item.IncrementValue));
+ break;
+ }
+
+ case RedisOutputOperation.ListRightPush:
+ {
+ tasks.Add(dbAsync.ListRightPushAsync(item.Key, value, When.Always, CommandFlags.FireAndForget));
+ break;
+ }
+
+ case RedisOutputOperation.ListLeftPush:
+ {
+ tasks.Add(dbAsync.ListLeftPushAsync(item.Key, value, When.Always, CommandFlags.FireAndForget));
+ break;
+ }
+ }
}
+
+ transaction?.Execute();
+ batch?.Execute();
+
+ await Task.WhenAll(tasks).ConfigureAwait(false);
+
}
///
diff --git a/src/AzureFunctionExtensions/Redis/RedisOutputAttribute.cs b/src/AzureFunctionExtensions/Redis/RedisOutputAttribute.cs
index bb8c7e6..d7f4806 100644
--- a/src/AzureFunctionExtensions/Redis/RedisOutputAttribute.cs
+++ b/src/AzureFunctionExtensions/Redis/RedisOutputAttribute.cs
@@ -28,6 +28,11 @@ public sealed class RedisOutputAttribute : Attribute, IConnectionProvider
///
public bool SendInBatch { get; set; } = true;
+ ///
+ /// Send items in contiguous transaction to Redis
+ ///
+ public bool SendInTransaction { get; set; } = true;
+
///
/// Sets the operation to performed in Redis
/// Default is
diff --git a/test/AzureFunctionExtensions.Test/RedisBatchMock.cs b/test/AzureFunctionExtensions.Test/RedisBatchMock.cs
new file mode 100644
index 0000000..d1e9e32
--- /dev/null
+++ b/test/AzureFunctionExtensions.Test/RedisBatchMock.cs
@@ -0,0 +1,932 @@
+using StackExchange.Redis;
+using System;
+using System.Collections.Generic;
+using System.Net;
+using System.Threading.Tasks;
+
+namespace Fbeltrao.AzureFunctionExtensions.Test
+{
+ public class RedisBatchMock : IBatch
+ {
+ private readonly RedisDatabaseMock databaseMock;
+ Queue> actionQueue = new Queue>();
+
+ public RedisBatchMock(RedisDatabaseMock databaseMock)
+ {
+ this.databaseMock = databaseMock;
+ }
+
+ public IConnectionMultiplexer Multiplexer => throw new NotImplementedException();
+
+ public Task DebugObjectAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Execute()
+ {
+ if (actionQueue.TryDequeue(out var action))
+ {
+ Task.Run(()=>action()).ContinueWith(x=>Execute());
+ }
+ }
+
+ public Task ExecuteAsync(string command, params object[] args)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task ExecuteAsync(string command, ICollection