Skip to content

Commit 671c9a9

Browse files
committed
feat: Key prefix redis options
1 parent f391104 commit 671c9a9

File tree

5 files changed

+60
-11
lines changed

5 files changed

+60
-11
lines changed

src/Redis/Orleans.DurableJobs.Redis/Hosting/RedisJobShardOptions.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@ public class RedisJobShardOptions
2121
/// </remarks>
2222
public Func<RedisJobShardOptions, Task<IConnectionMultiplexer>> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer;
2323

24+
/// <summary>
25+
/// Gets or sets the prefix for Redis keys used by the durable jobs provider.
26+
/// </summary>
27+
/// <remarks>
28+
/// This prefix is combined with the service ID to create the final key prefix.
29+
/// If not set, defaults to null and the standard pattern is used: "{ServiceId}/durablejobs".
30+
/// </remarks>
31+
public string? KeyPrefix { get; set; }
32+
2433
/// <summary>
2534
/// Gets or sets the prefix for shard identifiers.
2635
/// </summary>

src/Redis/Orleans.DurableJobs.Redis/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,28 @@ siloBuilder.UseRedisDurableJobs(options =>
2424
});
2525
```
2626

27+
### Advanced Configuration - Custom Key Prefix
28+
29+
By default, Redis keys are prefixed with `{ServiceId}/durablejobs` (e.g., `my-service/durablejobs:shards:shard`). You can customize this prefix:
30+
31+
```csharp
32+
siloBuilder.UseRedisDurableJobs(options =>
33+
{
34+
options.ConfigurationOptions = ConfigurationOptions.Parse("localhost:6379");
35+
options.KeyPrefix = "custom-prefix"; // Custom Redis key prefix
36+
options.ShardPrefix = "my-app";
37+
});
38+
```
39+
40+
This will result in Redis keys like `custom-prefix:shards:my-app` instead of the default pattern.
41+
2742
### Configuration Options
2843

2944
| Option | Description | Default |
3045
|--------|-------------|---------|
3146
| `ConfigurationOptions` | Redis client configuration options (from StackExchange.Redis) | Required |
3247
| `CreateMultiplexer` | Optional delegate for custom connection logic (advanced scenarios) | Uses `ConfigurationOptions` |
48+
| `KeyPrefix` | Custom prefix for all Redis keys. If not set, defaults to `{ServiceId}/durablejobs` | `null` |
3349
| `ShardPrefix` | Prefix for shard identifiers in Redis | `"shard"` |
3450
| `MaxShardCreationRetries` | Maximum retries when creating a shard | `5` |
3551
| `MaxBatchSize` | Maximum operations per batch write | `128` |

src/Redis/Orleans.DurableJobs.Redis/RedisJobShard.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System.Diagnostics;
22
using System.Threading.Channels;
33
using Microsoft.Extensions.Logging;
4+
using Microsoft.Extensions.Options;
5+
using Orleans.Configuration;
46
using StackExchange.Redis;
57

68
namespace Orleans.DurableJobs.Redis;
@@ -28,13 +30,15 @@ internal sealed partial class RedisJobShard : JobShard
2830
/// <param name="endTime">The end time of the shard's time range.</param>
2931
/// <param name="redis">The Redis connection multiplexer.</param>
3032
/// <param name="metadata">The shard metadata.</param>
33+
/// <param name="keyPrefix">The Redis key prefix to use.</param>
3134
/// <param name="options">The Redis job shard options.</param>
3235
/// <param name="logger">The logger.</param>
3336
public RedisJobShard(string shardId,
3437
DateTimeOffset startTime,
3538
DateTimeOffset endTime,
3639
IConnectionMultiplexer redis,
3740
IDictionary<string, string> metadata,
41+
string keyPrefix,
3842
RedisJobShardOptions options,
3943
ILogger<RedisJobShard> logger)
4044
: base(shardId, startTime, endTime)
@@ -52,8 +56,8 @@ public RedisJobShard(string shardId,
5256
MetadataVersion = version;
5357
}
5458

55-
_streamKey = $"durablejobs:shard:{Id}:stream";
56-
_metaKey = $"durablejobs:shard:{Id}:meta";
59+
_streamKey = $"{keyPrefix}:shard:{Id}:stream";
60+
_metaKey = $"{keyPrefix}:shard:{Id}:meta";
5761

5862
_storageOperationChannel = Channel.CreateUnbounded<StorageOperation>(new UnboundedChannelOptions
5963
{

src/Redis/Orleans.DurableJobs.Redis/RedisJobShardManager.cs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using System.Collections.Concurrent;
22
using System.Globalization;
3+
using System.Text;
34
using Microsoft.Extensions.Logging;
45
using Microsoft.Extensions.Options;
6+
using Orleans.Configuration;
57
using StackExchange.Redis;
68

79
namespace Orleans.DurableJobs.Redis;
@@ -14,8 +16,10 @@ public sealed partial class RedisJobShardManager : JobShardManager
1416
private readonly ILocalSiloDetails _localSiloDetails;
1517
private readonly IClusterMembershipService _clusterMembership;
1618
private readonly RedisJobShardOptions _options;
19+
private readonly ClusterOptions _clusterOptions;
1720
private readonly ILogger<RedisJobShardManager> _logger;
1821
private readonly ILoggerFactory _loggerFactory;
22+
private readonly RedisKey _keyPrefix;
1923

2024
private IConnectionMultiplexer? _multiplexer;
2125
private RedisOperationsManager? _redisOps;
@@ -30,20 +34,26 @@ public sealed partial class RedisJobShardManager : JobShardManager
3034
/// </summary>
3135
/// <param name="localSiloDetails">The local silo details.</param>
3236
/// <param name="options">The Redis job shard options.</param>
37+
/// <param name="clusterOptions">The cluster options.</param>
3338
/// <param name="clusterMembership">The cluster membership service.</param>
3439
/// <param name="loggerFactory">The logger factory.</param>
3540
public RedisJobShardManager(
3641
ILocalSiloDetails localSiloDetails,
3742
IOptions<RedisJobShardOptions> options,
43+
IOptions<ClusterOptions> clusterOptions,
3844
IClusterMembershipService clusterMembership,
3945
ILoggerFactory loggerFactory)
4046
: base(localSiloDetails.SiloAddress)
4147
{
4248
_localSiloDetails = localSiloDetails ?? throw new ArgumentNullException(nameof(localSiloDetails));
4349
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
50+
_clusterOptions = clusterOptions?.Value ?? throw new ArgumentNullException(nameof(clusterOptions));
4451
_clusterMembership = clusterMembership ?? throw new ArgumentNullException(nameof(clusterMembership));
4552
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
4653
_logger = loggerFactory.CreateLogger<RedisJobShardManager>();
54+
55+
_keyPrefix = Encoding.UTF8.GetBytes(
56+
_options.KeyPrefix ?? $"{_clusterOptions.ServiceId}/durablejobs");
4757
}
4858

4959
private async ValueTask InitializeIfNeeded()
@@ -60,8 +70,8 @@ private async ValueTask InitializeIfNeeded()
6070
LogInitialized(_logger);
6171
}
6272

63-
private string ShardSetKey => $"durablejobs:shards:{_options.ShardPrefix}";
64-
private static string MetaKeyForShard(string shardId) => $"durablejobs:shard:{shardId}:meta";
73+
private string ShardSetKey => $"{_keyPrefix}:shards:{_options.ShardPrefix}";
74+
private string MetaKeyForShard(string shardId) => $"{_keyPrefix}:shard:{shardId}:meta";
6575

6676
public override async Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset maxDueTime, CancellationToken cancellationToken)
6777
{
@@ -165,7 +175,7 @@ public override async Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset
165175
var minDue = ParseDateTimeOffset(metadata, "MinDueTime", DateTimeOffset.MinValue);
166176
var maxDue = ParseDateTimeOffset(metadata, "MaxDueTime", DateTimeOffset.MaxValue);
167177

168-
var shard = new RedisJobShard(shardId, minDue, maxDue, _multiplexer!, metadata, _options, _loggerFactory.CreateLogger<RedisJobShard>());
178+
var shard = new RedisJobShard(shardId, minDue, maxDue, _multiplexer!, metadata, _keyPrefix.ToString(), _options, _loggerFactory.CreateLogger<RedisJobShard>());
169179
try
170180
{
171181
await shard.InitializeAsync(cancellationToken).ConfigureAwait(false);
@@ -233,7 +243,7 @@ public override async Task<IJobShard> CreateShardAsync(DateTimeOffset minDueTime
233243
continue;
234244
}
235245

236-
var shard = new RedisJobShard(shardId, minDueTime, maxDueTime, _multiplexer!, metadataInfo, _options, _loggerFactory.CreateLogger<RedisJobShard>());
246+
var shard = new RedisJobShard(shardId, minDueTime, maxDueTime, _multiplexer!, metadataInfo, _keyPrefix.ToString(), _options, _loggerFactory.CreateLogger<RedisJobShard>());
237247
await shard.InitializeAsync(cancellationToken).ConfigureAwait(false);
238248
_jobShardCache[shardId] = shard;
239249
LogShardRegistered(_logger, shardId);
@@ -313,8 +323,8 @@ private async Task DeleteShardAsync(string shardId)
313323
{
314324
await InitializeIfNeeded().ConfigureAwait(false);
315325

316-
var streamKey = $"durablejobs:shard:{shardId}:stream";
317-
var metaKey = $"durablejobs:shard:{shardId}:meta";
326+
var streamKey = $"{_keyPrefix}:shard:{shardId}:stream";
327+
var metaKey = $"{_keyPrefix}:shard:{shardId}:meta";
318328

319329
// Delete all shard-related keys
320330
await _redisOps!.DeleteKeysAsync([streamKey, metaKey]).ConfigureAwait(false);

test/Extensions/Tester.Redis/DurableJobs/RedisJobShardManagerTestFixture.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Microsoft.Extensions.Logging.Abstractions;
22
using Microsoft.Extensions.Options;
3+
using Orleans.Configuration;
34
using Orleans.DurableJobs;
45
using Orleans.DurableJobs.Redis;
56
using StackExchange.Redis;
@@ -15,6 +16,7 @@ namespace Tester.Redis.DurableJobs;
1516
internal sealed class RedisJobShardManagerTestFixture : IJobShardManagerTestFixture
1617
{
1718
private readonly IOptions<RedisJobShardOptions> _options;
19+
private readonly IOptions<ClusterOptions> _clusterOptions;
1820
private ConnectionMultiplexer _multiplexer;
1921
private readonly string _shardPrefix;
2022

@@ -33,6 +35,12 @@ public RedisJobShardManagerTestFixture()
3335
MinBatchSize = 1,
3436
BatchFlushInterval = TimeSpan.FromMilliseconds(100)
3537
});
38+
39+
_clusterOptions = Options.Create(new ClusterOptions
40+
{
41+
ServiceId = "test-service",
42+
ClusterId = "test-cluster"
43+
});
3644
}
3745

3846
private async Task<IConnectionMultiplexer> CreateMultiplexerAsync(RedisJobShardOptions options)
@@ -46,6 +54,7 @@ public JobShardManager CreateManager(ILocalSiloDetails localSiloDetails, ICluste
4654
return new RedisJobShardManager(
4755
localSiloDetails,
4856
_options,
57+
_clusterOptions,
4958
membershipService,
5059
NullLoggerFactory.Instance);
5160
}
@@ -58,14 +67,15 @@ public async ValueTask DisposeAsync()
5867
var db = _multiplexer.GetDatabase();
5968
var server = _multiplexer.GetServer(_multiplexer.GetEndPoints()[0]);
6069

61-
// Delete all keys with our test prefix
62-
await foreach (var key in server.KeysAsync(pattern: $"durablejobs:shard:{_shardPrefix}*"))
70+
// Delete all keys with our test prefix (using the default key prefix pattern)
71+
var keyPrefix = _options.Value.KeyPrefix ?? $"{_clusterOptions.Value.ServiceId}/durablejobs";
72+
await foreach (var key in server.KeysAsync(pattern: $"{keyPrefix}:shard:{_shardPrefix}*"))
6373
{
6474
await db.KeyDeleteAsync(key);
6575
}
6676

6777
// Delete the shard set key
68-
await db.KeyDeleteAsync($"durablejobs:shards:{_shardPrefix}");
78+
await db.KeyDeleteAsync($"{keyPrefix}:shards:{_shardPrefix}");
6979

7080
await _multiplexer.CloseAsync();
7181
_multiplexer.Dispose();

0 commit comments

Comments
 (0)