Skip to content

Commit 8a1bbcc

Browse files
committed
Improve speed of idempotency checks by using HybridCache
Users can configure in-memory only or the optional IDistributedCache backing store, but either way the new idempotency speeds up checks by caching the processed status of recent items so when retries happen, we can more quickly avoid dupes.
1 parent 45ec803 commit 8a1bbcc

File tree

11 files changed

+351
-24
lines changed

11 files changed

+351
-24
lines changed

.netconfig

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,3 +158,13 @@
158158

159159
weak
160160
etag = 371342087cec0269473a91f7fc295fd1049e21f016a1b7113614f2c4e3eefe5f
161+
[file "src/WhatsApp/Extensions/AsyncLazy.cs"]
162+
url = https://github.com/devlooped/catbag/blob/main/System/Threading/Tasks/AsyncLazy.cs
163+
sha = 9f3330f09713aa5f746047e3a50ee839147a5797
164+
etag = 73320600b7a18e0eb25cadc3d687c69dc79181b0458facf526666e150c634782
165+
weak
166+
[file "src/WhatsApp/Extensions/Base62.cs"]
167+
url = https://github.com/devlooped/catbag/blob/main/System/Base62.cs
168+
sha = cf76df0d6a218c26ebe117339fe3445050b0532a
169+
etag = aed711a45e051edfddfcb76d9f8021d30f9817c342cfe8d1cc38f2af37b47aa8
170+
weak

src/SampleApp/Sample/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using Devlooped;
77
using Devlooped.WhatsApp;
88
using Microsoft.Azure.Functions.Worker.Builder;
9+
using Microsoft.Extensions.Caching.Hybrid;
910
using Microsoft.Extensions.Configuration;
1011
using Microsoft.Extensions.DependencyInjection;
1112
using Microsoft.Extensions.Hosting;

src/SampleApp/Sample/Sample.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.12.0" />
2525
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.12.0" />
2626
<PackageReference Include="System.Linq.Async" Version="6.0.3" />
27+
<PackageReference Include="Microsoft.Extensions.Caching.Hybrid" Version="9.9.0" />
2728
</ItemGroup>
2829
<ItemGroup>
2930
<ProjectReference Include="..\..\WhatsApp\WhatsApp.csproj" />

src/Tests/IdempotencyTests.cs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
using Azure.Data.Tables;
2+
using Microsoft.Extensions.Caching.Hybrid;
3+
using Microsoft.Extensions.DependencyInjection;
4+
5+
namespace Devlooped.WhatsApp;
6+
7+
public class IdempotencyTests
8+
{
9+
[Fact]
10+
public async Task CanAddProcessedItem()
11+
{
12+
var client = CloudStorageAccount.DevelopmentStorageAccount.CreateTableServiceClient();
13+
var table = client.GetTableClient("WhatsAppWebhook");
14+
var collection = new ServiceCollection();
15+
collection.AddHybridCache();
16+
var cache = collection.BuildServiceProvider().GetRequiredService<HybridCache>();
17+
18+
var idempotent = new Idempotency(client, cache);
19+
var pk = nameof(CanAddProcessedItem);
20+
var rk = Ulid.NewUlid().ToString();
21+
22+
// Initially unprocessed
23+
Assert.False(await idempotent.IsProcessedAsync(pk, rk));
24+
25+
// The etag is used for optimistic concurrency on resetting
26+
var etag = await idempotent.TrySetProcessedAsync(pk, rk);
27+
28+
Assert.NotNull(etag);
29+
Assert.True(await idempotent.IsProcessedAsync(pk, rk));
30+
31+
// Can't set again, the 409 conflict will mark it as true if it isn't already.
32+
Assert.Null(await idempotent.TrySetProcessedAsync(pk, rk));
33+
34+
// Simulates a failure in processing so we're returning the item to the processing pool
35+
await idempotent.ResetProcessedAsync(pk, rk, etag.Value);
36+
37+
Assert.False((await table.GetEntityIfExistsAsync<TableEntity>(pk, rk)).HasValue);
38+
39+
// Simulate another process picking the item up at this point and writing back to storage
40+
await table.AddEntityAsync(new TableEntity(pk, rk));
41+
42+
// The check would now re-read from storage and see it since we restored.
43+
Assert.True(await idempotent.IsProcessedAsync(pk, rk));
44+
}
45+
}

src/WhatsApp/AzureFunctionsWebhook.cs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
using System;
2-
using System.Diagnostics;
1+
using System.Diagnostics;
32
using System.Text;
43
using System.Text.Json;
54
using System.Text.Json.Nodes;
6-
using Azure.Data.Tables;
75
using Devlooped.WhatsApp.Flows;
86
using Microsoft.AspNetCore.Http;
97
using Microsoft.AspNetCore.Mvc;
@@ -24,7 +22,7 @@ namespace Devlooped.WhatsApp;
2422
/// <param name="handler">The message handler that will process incoming messages.</param>
2523
/// <param name="logger">The logger.</param>
2624
class AzureFunctionsWebhook(
27-
TableServiceClient tableClient,
25+
Idempotency idempotency,
2826
IMessageProcessor messageProcessor,
2927
PipelineRunner runner,
3028
IWhatsAppClient whatsapp,
@@ -65,19 +63,13 @@ public async Task<IActionResult> Message([HttpTrigger(AuthorizationLevel.Anonymo
6563
if (message is UserMessage user)
6664
await user.SendProgress(whatsapp, functionOptions.ReadOnMessage is true, functionOptions.TypingOnMessage is true).Ignore();
6765

68-
// Ensure idempotent processing
69-
var table = tableClient.GetTableClient("WhatsAppWebhook");
70-
await table.CreateIfNotExistsAsync();
71-
if (await table.GetEntityIfExistsAsync<TableEntity>(message.User.Number, message.NotificationId) is { HasValue: true } existing)
72-
{
73-
logger.LogInformation("Skipping already handled message {Id}", message.Id);
74-
return new OkResult();
75-
}
76-
7766
if (functionOptions.ReactOnMessage != null && message.Type == MessageType.Content)
7867
await message.React(functionOptions.ReactOnMessage).SendAsync(whatsapp).Ignore();
7968

80-
if (hosting.IsDevelopment())
69+
// NOTE: development speed-up does check for idempotency so we avoid re-entering the pipeline while we're
70+
// debugging and WhatsApp may interpret that as a failing callback and invoke us again. In production, though,
71+
// we don't need to incur that cost here since the pipeline will do it before running.
72+
if (hosting.IsDevelopment() && await idempotency.IsProcessedAsync(message, json) != true)
8173
{
8274
// Avoid enqueing to speed up local devloop
8375
_ = Task.Run(async () =>
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// <auto-generated />
2+
#region License
3+
// MIT License
4+
//
5+
// Copyright (c) Daniel Cazzulino
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in all
15+
// copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23+
// SOFTWARE.
24+
#endregion
25+
26+
using System.Runtime.CompilerServices;
27+
28+
namespace System.Threading.Tasks
29+
{
30+
/// <summary>
31+
/// Provides factory methods to create <see cref="AsyncLazy{T}"/> so that
32+
/// the <c>T</c> can be inferred from the received value factory return type.
33+
/// </summary>
34+
/// <remarks>
35+
/// Usage:
36+
/// <code>
37+
/// var lazy = AsyncLazy.Create(() => ...);
38+
///
39+
/// var value = await lazy.Value;
40+
/// </code>
41+
/// </remarks>
42+
static partial class AsyncLazy
43+
{
44+
/// <summary>
45+
/// Creates an <see cref="AsyncLazy{T}"/> using the given <paramref name="valueFactory"/>.
46+
/// </summary>
47+
public static AsyncLazy<T> Create<T>(Func<T> valueFactory) => new AsyncLazy<T>(valueFactory);
48+
49+
/// <summary>
50+
/// Creates an <see cref="AsyncLazy{T}"/> using the given <paramref name="asyncValueFactory"/>.
51+
/// </summary>
52+
public static AsyncLazy<T> Create<T>(Func<Task<T>> asyncValueFactory) => new AsyncLazy<T>(asyncValueFactory);
53+
}
54+
55+
/// <summary>
56+
/// A <see cref="Lazy{T}"/> that initializes asynchronously and whose
57+
/// <see cref="Lazy{T}.Value"/> can be awaited for initialization completion.
58+
/// </summary>
59+
/// <remarks>
60+
/// Basically taken from https://devblogs.microsoft.com/pfxteam/asynclazyt/.
61+
/// Usage:
62+
/// <code>
63+
/// var lazy = new AsyncLazy&lt;T&gt;(() => ...);
64+
///
65+
/// var value = await lazy.Value;
66+
/// </code>
67+
/// </remarks>
68+
/// <typeparam name="T">The type of async lazily-initialized value.</typeparam>
69+
partial class AsyncLazy<T> : Lazy<Task<T>>
70+
{
71+
/// <summary>
72+
/// Initializes the lazy, using <see cref="Task.Run(Func{T})"/> to asynchronously
73+
/// schedule the value factory execution.
74+
/// </summary>
75+
public AsyncLazy(Func<T> valueFactory) : base(() => Task.Run(valueFactory))
76+
{ }
77+
78+
/// <summary>
79+
/// Initializes the lazy, using <see cref="Task.Run(Func{Task{T}})"/> to asynchronously
80+
/// schedule the value factory execution.
81+
/// </summary>
82+
public AsyncLazy(Func<Task<T>> asyncValueFactory) : base(() => Task.Run(() => asyncValueFactory()))
83+
{ }
84+
85+
/// <summary>
86+
/// Allows awaiting the async lazy directly.
87+
/// </summary>
88+
public TaskAwaiter<T> GetAwaiter() => Value.GetAwaiter();
89+
90+
/// <summary>
91+
/// Gets a value indicating whether the value factory has been invoked and has run to completion.
92+
/// </summary>
93+
public bool IsValueFactoryCompleted => base.Value.IsCompleted;
94+
}
95+
}

src/WhatsApp/Extensions/Base62.cs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// <auto-generated />
2+
#region License
3+
// MIT License
4+
//
5+
// Copyright (c) Daniel Cazzulino
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in all
15+
// copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23+
// SOFTWARE.
24+
#endregion
25+
26+
#nullable enable
27+
using System.Linq;
28+
using System.Numerics;
29+
using System.Text;
30+
31+
namespace System
32+
{
33+
/// <summary>
34+
/// Inspired in a bunch of searches, samples and snippets on various languages
35+
/// and blogs and what-not on doing URL shortering :), heavily tweaked to make
36+
/// it fully idiomatic in C#.
37+
/// </summary>
38+
static partial class Base62
39+
{
40+
/// <summary>
41+
/// Encodes a Guid into a base62 string.
42+
/// </summary>
43+
public static string ToBase62(this Guid guid)
44+
=> Encode(new BigInteger(guid.ToByteArray()));
45+
46+
/// <summary>
47+
/// Encodes the absolute numeric value into a base62 string.
48+
/// </summary>
49+
public static string Encode(BigInteger value)
50+
{
51+
if (value == 0)
52+
return "0";
53+
54+
// normalize sign in case we got negative.
55+
value = BigInteger.Abs(value);
56+
57+
// TODO: I'm almost sure there's a more succint way
58+
// of doing this with LINQ and Aggregate, but just
59+
// can't figure it out...
60+
var sb = new StringBuilder();
61+
62+
while (value != 0)
63+
{
64+
sb = sb.Append(ToBase62(value % 62));
65+
value /= 62;
66+
}
67+
68+
// Reverse the string, since we're building it backwards,
69+
var chars = sb.ToString().ToCharArray();
70+
Array.Reverse(chars);
71+
72+
return new string(chars);
73+
}
74+
75+
/// <summary>
76+
/// Decodes a base62 string into its original numeric value.
77+
/// </summary>
78+
public static BigInteger Decode(string value)
79+
=> value.Aggregate(new BigInteger(0), (current, c) => current * 62 + FromBase62(c));
80+
81+
static char ToBase62(BigInteger d) => d switch
82+
{
83+
BigInteger v when v < 10 => (char)('0' + d),
84+
BigInteger v when v < 36 => (char)('A' + d - 10),
85+
BigInteger v when v < 62 => (char)('a' + d - 36),
86+
_ => throw new ArgumentException($"Cannot encode digit {d} to base 62.", nameof(d)),
87+
};
88+
89+
static BigInteger FromBase62(char c) => c switch
90+
{
91+
char v when c >= 'a' && v <= 'z' => 36 + c - 'a',
92+
char v when c >= 'A' && v <= 'Z' => 10 + c - 'A',
93+
char v when c >= '0' && v <= '9' => c - '0',
94+
_ => throw new ArgumentException($"Cannot decode char '{c}' from base 62.", nameof(c)),
95+
};
96+
}
97+
}

src/WhatsApp/Idempotency.cs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
using System.Numerics;
2+
using System.Security.Cryptography;
3+
using System.Text;
4+
using Azure;
5+
using Azure.Data.Tables;
6+
using Microsoft.Extensions.Caching.Hybrid;
7+
8+
namespace Devlooped.WhatsApp;
9+
10+
static class IdempotencyExtensions
11+
{
12+
public static ValueTask<bool> IsProcessedAsync(this Idempotency idempotency, Message message, string json, CancellationToken token = default)
13+
=> idempotency.IsProcessedAsync(message.User.Number, RowKey(message, json), token);
14+
15+
public static async ValueTask<ETag?> TrySetProcessedAsync(this Idempotency idempotency, Message message, string json, CancellationToken token = default)
16+
=> await idempotency.TrySetProcessedAsync(message.User.Number, RowKey(message, json), token);
17+
18+
public static async ValueTask ResetProcessedAsync(this Idempotency idempotency, Message message, string json, ETag etag, CancellationToken token = default)
19+
=> await idempotency.ResetProcessedAsync(message.User.Number, RowKey(message, json), etag, token);
20+
21+
static string RowKey(Message message, string payload)
22+
=> message.Id.StartsWith("wamid.", StringComparison.Ordinal) ? message.Id : Base62.Encode(new BigInteger(MD5.HashData(Encoding.UTF8.GetBytes(payload)), isUnsigned: true, isBigEndian: true));
23+
}
24+
25+
class Idempotency(TableServiceClient client, HybridCache cache)
26+
{
27+
static readonly HybridCacheEntryOptions options = new()
28+
{
29+
LocalCacheExpiration = TimeSpan.FromDays(3),
30+
Expiration = TimeSpan.FromDays(30)
31+
};
32+
33+
readonly AsyncLazy<TableClient> table = new(async () =>
34+
{
35+
var table = client.GetTableClient("WhatsAppWebhook");
36+
await table.CreateIfNotExistsAsync();
37+
return table;
38+
});
39+
40+
public ValueTask<bool> IsProcessedAsync(string partitionKey, string rowKey, CancellationToken token = default)
41+
=> cache.GetOrCreateAsync(Key(partitionKey, rowKey),
42+
async key => await (await table).GetEntityIfExistsAsync<TableEntity>(partitionKey, rowKey, cancellationToken: token) is { HasValue: true },
43+
options, cancellationToken: token);
44+
45+
public async ValueTask<ETag?> TrySetProcessedAsync(string partitionKey, string rowKey, CancellationToken token = default)
46+
{
47+
var key = Key(partitionKey, rowKey);
48+
try
49+
{
50+
var entity = await (await table).AddEntityAsync(new TableEntity(partitionKey, rowKey), token);
51+
await cache.SetAsync(key, true, options, cancellationToken: token);
52+
return entity.Headers.ETag ?? ETag.All;
53+
}
54+
catch (RequestFailedException ex) when (ex.Status == 409)
55+
{
56+
await cache.SetAsync(key, true, options, cancellationToken: token);
57+
return null;
58+
}
59+
}
60+
61+
public async ValueTask ResetProcessedAsync(string partitionKey, string rowKey, ETag etag, CancellationToken token = default)
62+
{
63+
// If actual processing of a previously marked item failed, we want to return its unprocessed state
64+
var key = Key(partitionKey, rowKey);
65+
await (await table).DeleteEntityAsync(partitionKey, rowKey, etag, token);
66+
await cache.RemoveAsync(key, token);
67+
}
68+
69+
static string Key(string partitionKey, string rowKey) => $"wa:dup:{partitionKey}/{rowKey}";
70+
}

0 commit comments

Comments
 (0)