diff --git a/Orleans.slnx b/Orleans.slnx index dd0bac06746..d05fb4e7945 100644 --- a/Orleans.slnx +++ b/Orleans.slnx @@ -98,6 +98,7 @@ + diff --git a/src/Redis/Orleans.DurableJobs.Redis/Hosting/RedisDurableJobsExtensions.cs b/src/Redis/Orleans.DurableJobs.Redis/Hosting/RedisDurableJobsExtensions.cs new file mode 100644 index 00000000000..3d3afa213c9 --- /dev/null +++ b/src/Redis/Orleans.DurableJobs.Redis/Hosting/RedisDurableJobsExtensions.cs @@ -0,0 +1,64 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Configuration.Internal; +using Orleans.DurableJobs; +using Orleans.DurableJobs.Redis; + +namespace Orleans.Hosting; + +/// +/// Extensions for configuring Redis durable jobs. +/// +public static class RedisDurableJobsExtensions +{ + /// + /// Adds durable jobs storage backed by Redis. + /// + /// The builder. + /// The delegate used to configure the durable jobs storage. + /// The provided , for chaining. + public static ISiloBuilder UseRedisDurableJobs(this ISiloBuilder builder, Action configure) + { + builder.ConfigureServices(services => services.UseRedisDurableJobs(configure)); + return builder; + } + + /// + /// Adds durable jobs storage backed by Redis. + /// + /// The builder. + /// The configuration delegate. + /// The provided , for chaining. + public static ISiloBuilder UseRedisDurableJobs(this ISiloBuilder builder, Action> configureOptions) + { + builder.ConfigureServices(services => services.UseRedisDurableJobs(configureOptions)); + return builder; + } + + /// + /// Adds durable jobs storage backed by Redis. + /// + /// The service collection. + /// The delegate used to configure the durable jobs storage. + /// The provided , for chaining. + public static IServiceCollection UseRedisDurableJobs(this IServiceCollection services, Action configure) + => services.UseRedisDurableJobs(builder => builder.Configure(configure)); + + /// + /// Adds durable jobs storage backed by Redis. + /// + /// The service collection. + /// The configuration delegate. + /// The provided , for chaining. + public static IServiceCollection UseRedisDurableJobs(this IServiceCollection services, Action>? configureOptions) + { + services.AddDurableJobs(); + services.AddSingleton(); + services.AddFromExisting(); + configureOptions?.Invoke(services.AddOptions()); + services.ConfigureFormatter(); + services.AddTransient(sp => new RedisJobShardOptionsValidator(sp.GetRequiredService>().Get(Options.DefaultName), Options.DefaultName)); + return services; + } +} diff --git a/src/Redis/Orleans.DurableJobs.Redis/Hosting/RedisJobShardOptions.cs b/src/Redis/Orleans.DurableJobs.Redis/Hosting/RedisJobShardOptions.cs new file mode 100644 index 00000000000..f3929633f65 --- /dev/null +++ b/src/Redis/Orleans.DurableJobs.Redis/Hosting/RedisJobShardOptions.cs @@ -0,0 +1,75 @@ +using StackExchange.Redis; + +namespace Orleans.Hosting; + +/// +/// Options for configuring the Redis durable jobs provider. +/// +public class RedisJobShardOptions +{ + /// + /// Gets or sets the Redis client configuration. + /// + [RedactRedisConfigurationOptions] + public ConfigurationOptions? ConfigurationOptions { get; set; } + + /// + /// Gets or sets a delegate to create the Redis connection multiplexer. + /// + /// + /// This delegate is called once during initialization to create the connection. + /// + public Func> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer; + + /// + /// Gets or sets the prefix for Redis keys used by the durable jobs provider. + /// + /// + /// This prefix is combined with the service ID to create the final key prefix. + /// If not set, defaults to null and the standard pattern is used: "{ServiceId}/durablejobs". + /// + public string? KeyPrefix { get; set; } + + /// + /// Gets or sets the prefix for shard identifiers. + /// + /// + /// This prefix is used to namespace shards in Redis, allowing multiple applications to share the same Redis instance. + /// + public string ShardPrefix { get; set; } = "shard"; + + /// + /// Gets or sets the maximum number of retries when creating a shard in case of ID collisions. + /// + public int MaxShardCreationRetries { get; set; } = 5; + + /// + /// Gets or sets the maximum number of job operations to batch together in a single write. + /// Default is 128 operations. + /// + public int MaxBatchSize { get; set; } = 128; + + /// + /// Gets or sets the minimum number of job operations to batch together before flushing. + /// Default is 1 operation (immediate flush, optimized for latency). + /// + public int MinBatchSize { get; set; } = 1; + + /// + /// Gets or sets the maximum time to wait for additional operations if the minimum batch size isn't reached + /// before flushing a batch. + /// Default is 100 milliseconds. + /// + public TimeSpan BatchFlushInterval { get; set; } = TimeSpan.FromMilliseconds(100); + + /// + /// The default multiplexer creation delegate. + /// + public static async Task DefaultCreateMultiplexer(RedisJobShardOptions options) + => await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions!); +} + +internal class RedactRedisConfigurationOptions : RedactAttribute +{ + public override string Redact(object value) => value is ConfigurationOptions cfg ? cfg.ToString(includePassword: false) : base.Redact(value); +} diff --git a/src/Redis/Orleans.DurableJobs.Redis/Hosting/RedisJobShardOptionsValidator.cs b/src/Redis/Orleans.DurableJobs.Redis/Hosting/RedisJobShardOptionsValidator.cs new file mode 100644 index 00000000000..5aeabc46822 --- /dev/null +++ b/src/Redis/Orleans.DurableJobs.Redis/Hosting/RedisJobShardOptionsValidator.cs @@ -0,0 +1,64 @@ +namespace Orleans.Hosting; + +/// +/// Validates . +/// +public class RedisJobShardOptionsValidator : IConfigurationValidator +{ + private readonly RedisJobShardOptions _options; + private readonly string _name; + + /// + /// Initializes a new instance of the class. + /// + /// The options. + /// The name. + public RedisJobShardOptionsValidator(RedisJobShardOptions options, string name) + { + _options = options; + _name = name; + } + + /// + public void ValidateConfiguration() + { + if (_options.ConfigurationOptions is null) + { + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisJobShardOptions)} with name '{_name}'. {nameof(_options.ConfigurationOptions)} is required."); + } + + if (_options.CreateMultiplexer is null) + { + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisJobShardOptions)} with name '{_name}'. {nameof(_options.CreateMultiplexer)} is required."); + } + if (string.IsNullOrWhiteSpace(_options.ShardPrefix)) + { + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisJobShardOptions)} with name '{_name}'. {nameof(_options.ShardPrefix)} is required."); + } + + if (_options.MaxShardCreationRetries < 1) + { + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisJobShardOptions)} with name '{_name}'. {nameof(_options.MaxShardCreationRetries)} must be at least 1."); + } + + if (_options.MaxBatchSize < 1) + { + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisJobShardOptions)} with name '{_name}'. {nameof(_options.MaxBatchSize)} must be at least 1."); + } + + if (_options.MinBatchSize < 1) + { + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisJobShardOptions)} with name '{_name}'. {nameof(_options.MinBatchSize)} must be at least 1."); + } + + if (_options.MinBatchSize > _options.MaxBatchSize) + { + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisJobShardOptions)} with name '{_name}'. {nameof(_options.MinBatchSize)} must not exceed {nameof(_options.MaxBatchSize)}."); + } + + if (_options.BatchFlushInterval < TimeSpan.Zero) + { + throw new OrleansConfigurationException($"Invalid configuration for {nameof(RedisJobShardOptions)} with name '{_name}'. {nameof(_options.BatchFlushInterval)} must not be negative."); + } + } +} diff --git a/src/Redis/Orleans.DurableJobs.Redis/JobOperation.cs b/src/Redis/Orleans.DurableJobs.Redis/JobOperation.cs new file mode 100644 index 00000000000..170eb16481d --- /dev/null +++ b/src/Redis/Orleans.DurableJobs.Redis/JobOperation.cs @@ -0,0 +1,106 @@ +using System.Text.Json.Serialization; + +namespace Orleans.DurableJobs.Redis; + +/// +/// Represents an operation to be performed on a durable job. +/// +internal readonly struct JobOperation +{ + /// + /// The type of operation to perform. + /// + public enum OperationType + { + Add, + Remove, + Retry, + } + + /// + /// Gets or sets the type of operation. + /// + public OperationType Type { get; init; } + + /// + /// Gets or sets the job identifier. + /// + public string Id { get; init; } + + /// + /// Gets or sets the job name (only used for Add operations). + /// + public string? Name { get; init; } + + /// + /// Gets or sets the due time (used for Add and Retry operations). + /// + public DateTimeOffset? DueTime { get; init; } + + /// + /// Gets or sets the target grain ID (only used for Add operations). + /// + public GrainId? TargetGrainId { get; init; } + + /// + /// Gets or sets the job metadata (only used for Add operations). + /// + public IReadOnlyDictionary? Metadata { get; init; } + + /// + /// Creates an Add operation for scheduling a new job. + /// + /// The job identifier. + /// The job name. + /// The job due time. + /// The target grain ID. + /// The job metadata. + /// A new JobOperation for adding a job. + /// Thrown when or is null or empty. + public static JobOperation CreateAddOperation(string id, string name, DateTimeOffset dueTime, GrainId targetGrainId, IReadOnlyDictionary? metadata) + { + ArgumentException.ThrowIfNullOrEmpty(id); + ArgumentException.ThrowIfNullOrEmpty(name); + + return new() { Type = OperationType.Add, Id = id, Name = name, DueTime = dueTime, TargetGrainId = targetGrainId, Metadata = metadata }; + } + + /// + /// Creates a Remove operation for canceling a job. + /// + /// The job identifier. + /// A new JobOperation for removing a job. + /// Thrown when is null or empty. + public static JobOperation CreateRemoveOperation(string id) + { + ArgumentException.ThrowIfNullOrEmpty(id); + + return new() { Type = OperationType.Remove, Id = id }; + } + + /// + /// Creates a Retry operation for rescheduling a job. + /// + /// The job identifier. + /// The new due time. + /// A new JobOperation for retrying a job. + /// Thrown when is null or empty. + public static JobOperation CreateRetryOperation(string id, DateTimeOffset dueTime) + { + ArgumentException.ThrowIfNullOrEmpty(id); + + return new() { Type = OperationType.Retry, Id = id, DueTime = dueTime }; + } +} + +/// +/// JSON serialization context for JobOperation with compile-time source generation. +/// +[JsonSerializable(typeof(JobOperation))] +[JsonSourceGenerationOptions( + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingDefault, + PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase, + WriteIndented = false)] +internal partial class JobOperationJsonContext : JsonSerializerContext +{ +} diff --git a/src/Redis/Orleans.DurableJobs.Redis/Orleans.DurableJobs.Redis.csproj b/src/Redis/Orleans.DurableJobs.Redis/Orleans.DurableJobs.Redis.csproj new file mode 100644 index 00000000000..616e697bccf --- /dev/null +++ b/src/Redis/Orleans.DurableJobs.Redis/Orleans.DurableJobs.Redis.csproj @@ -0,0 +1,32 @@ + + + + README.md + Microsoft.Orleans.DurableJobs.Redis + Microsoft Orleans Redis Durable Jobs Provider + Microsoft Orleans durable jobs provider backed by Redis + $(PackageTags) Redis + $(DefaultTargetFrameworks) + Orleans.DurableJobs.Redis + Orleans.DurableJobs.Redis + true + $(DefineConstants) + enable + $(VersionSuffix).alpha.1 + alpha.1 + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Redis/Orleans.DurableJobs.Redis/README.md b/src/Redis/Orleans.DurableJobs.Redis/README.md new file mode 100644 index 00000000000..8647a88221a --- /dev/null +++ b/src/Redis/Orleans.DurableJobs.Redis/README.md @@ -0,0 +1,65 @@ +# Microsoft Orleans Durable Jobs for Redis + +## Introduction +Microsoft Orleans Durable Jobs for Redis provides persistent storage for Orleans Durable Jobs using Redis Streams. This allows your Orleans applications to schedule jobs that survive silo restarts, grain deactivation, and cluster reconfigurations. Jobs are stored in Redis Streams, providing efficient storage and retrieval for time-based job scheduling. + +## Getting Started + +### Installation +To use this package, install it via NuGet along with the core package: + +```shell +dotnet add package Microsoft.Orleans.DurableJobs +dotnet add package Microsoft.Orleans.DurableJobs.Redis +``` + +### Configuration +Configure the Redis durable jobs provider in your silo configuration: + +```csharp +siloBuilder.UseRedisDurableJobs(options => +{ + options.ConfigurationOptions = ConfigurationOptions.Parse("localhost:6379"); + options.ShardPrefix = "my-app"; // Optional: prefix for shard keys +}); +``` + +### Advanced Configuration - Custom Key Prefix + +By default, Redis keys are prefixed with `{ServiceId}/durablejobs` (e.g., `my-service/durablejobs:shards:shard`). You can customize this prefix: + +```csharp +siloBuilder.UseRedisDurableJobs(options => +{ + options.ConfigurationOptions = ConfigurationOptions.Parse("localhost:6379"); + options.KeyPrefix = "custom-prefix"; // Custom Redis key prefix + options.ShardPrefix = "my-app"; +}); +``` + +This will result in Redis keys like `custom-prefix:shards:my-app` instead of the default pattern. + +### Configuration Options + +| Option | Description | Default | +|--------|-------------|---------| +| `ConfigurationOptions` | Redis client configuration options (from StackExchange.Redis) | Required | +| `CreateMultiplexer` | Optional delegate for custom connection logic (advanced scenarios) | Uses `ConfigurationOptions` | +| `KeyPrefix` | Custom prefix for all Redis keys. If not set, defaults to `{ServiceId}/durablejobs` | `null` | +| `ShardPrefix` | Prefix for shard identifiers in Redis | `"shard"` | +| `MaxShardCreationRetries` | Maximum retries when creating a shard | `5` | +| `MaxBatchSize` | Maximum operations per batch write | `128` | +| `MinBatchSize` | Minimum operations before flush | `1` | +| `BatchFlushInterval` | Time to wait for more operations | `100ms` | + +## Documentation +For more comprehensive documentation, please refer to: +- [Microsoft Orleans Documentation](https://learn.microsoft.com/dotnet/orleans/) +- [Orleans Durable Jobs Core Package](../../Orleans.DurableJobs/README.md) + +## Feedback & Contributing +- If you have any issues or would like to provide feedback, please [open an issue on GitHub](https://github.com/dotnet/orleans/issues) +- Join our community on [Discord](https://aka.ms/orleans-discord) +- Follow the [@msftorleans](https://twitter.com/msftorleans) Twitter account for Orleans announcements +- Contributions are welcome! Please review our [contribution guidelines](https://github.com/dotnet/orleans/blob/main/CONTRIBUTING.md) +- This project is licensed under the [MIT license](https://github.com/dotnet/orleans/blob/main/LICENSE) diff --git a/src/Redis/Orleans.DurableJobs.Redis/RedisJobShard.Log.cs b/src/Redis/Orleans.DurableJobs.Redis/RedisJobShard.Log.cs new file mode 100644 index 00000000000..014e77c9e80 --- /dev/null +++ b/src/Redis/Orleans.DurableJobs.Redis/RedisJobShard.Log.cs @@ -0,0 +1,96 @@ +using Microsoft.Extensions.Logging; + +namespace Orleans.DurableJobs.Redis; + +internal sealed partial class RedisJobShard +{ + [LoggerMessage( + Level = LogLevel.Information, + Message = "Initializing shard '{ShardId}' from Redis Stream {StreamKey}" + )] + private static partial void LogInitializingShard(ILogger logger, string shardId, string streamKey); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Shard '{ShardId}' initialized from Redis: loaded {JobCount} job(s) in {ElapsedMilliseconds}ms" + )] + private static partial void LogShardInitialized(ILogger logger, string shardId, int jobCount, long elapsedMilliseconds); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Adding job '{JobId}' (Name: '{JobName}') to shard '{ShardId}' with due time {DueTime}" + )] + private static partial void LogAddingJob(ILogger logger, string jobId, string jobName, string shardId, DateTimeOffset dueTime); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Removing job '{JobId}' from shard '{ShardId}'" + )] + private static partial void LogRemovingJob(ILogger logger, string jobId, string shardId); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Retrying job '{JobId}' in shard '{ShardId}' with new due time {NewDueTime}" + )] + private static partial void LogRetryingJob(ILogger logger, string jobId, string shardId, DateTimeOffset newDueTime); + + [LoggerMessage( + Level = LogLevel.Trace, + Message = "Flushing batch of {OperationCount} job operation(s) to shard '{ShardId}'" + )] + private static partial void LogFlushingBatch(ILogger logger, int operationCount, string shardId); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Batch of {OperationCount} job operation(s) written to shard '{ShardId}' in {ElapsedMilliseconds}ms. Total committed blocks: {CommittedBlockCount}" + )] + private static partial void LogBatchWritten(ILogger logger, int operationCount, string shardId, long elapsedMilliseconds, int committedBlockCount); + + [LoggerMessage( + Level = LogLevel.Trace, + Message = "Updating metadata for shard '{ShardId}'" + )] + private static partial void LogUpdatingMetadata(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Metadata updated for shard '{ShardId}'" + )] + private static partial void LogMetadataUpdated(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Error, + Message = "Error writing batch of {OperationCount} operation(s) to shard '{ShardId}'" + )] + private static partial void LogErrorWritingBatch(ILogger logger, Exception exception, int operationCount, string shardId); + + [LoggerMessage( + Level = LogLevel.Error, + Message = "Error updating metadata for shard '{ShardId}'" + )] + private static partial void LogErrorUpdatingMetadata(ILogger logger, Exception exception, string shardId); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Stopping storage processor for shard '{ShardId}'" + )] + private static partial void LogStoppingProcessor(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Storage processor stopped for shard '{ShardId}'" + )] + private static partial void LogProcessorStopped(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Trace, + Message = "Processing storage operation queue for shard '{ShardId}'" + )] + private static partial void LogProcessingStorageQueue(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Waiting for additional operations to batch (current size: {CurrentSize}, min size: {MinSize}) for shard '{ShardId}'" + )] + private static partial void LogWaitingForBatch(ILogger logger, int currentSize, int minSize, string shardId); +} diff --git a/src/Redis/Orleans.DurableJobs.Redis/RedisJobShard.cs b/src/Redis/Orleans.DurableJobs.Redis/RedisJobShard.cs new file mode 100644 index 00000000000..41270f84867 --- /dev/null +++ b/src/Redis/Orleans.DurableJobs.Redis/RedisJobShard.cs @@ -0,0 +1,365 @@ +using System.Diagnostics; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using StackExchange.Redis; + +namespace Orleans.DurableJobs.Redis; + +internal sealed partial class RedisJobShard : JobShard +{ + private readonly IConnectionMultiplexer _redis; + private readonly RedisOperationsManager _redisOps; + private readonly Channel _storageOperationChannel; + private readonly Task _storageProcessorTask; + private readonly CancellationTokenSource _shutdownCts = new(); + private readonly RedisJobShardOptions _options; + private readonly ILogger _logger; + + private readonly string _streamKey; + private readonly string _metaKey; + + internal long MetadataVersion { get; private set; } + + /// + /// Initializes a new instance of the class. + /// + /// The unique identifier for this shard. + /// The start time of the shard's time range. + /// The end time of the shard's time range. + /// The Redis connection multiplexer. + /// The shard metadata. + /// The Redis key prefix to use. + /// The Redis job shard options. + /// The logger. + public RedisJobShard(string shardId, + DateTimeOffset startTime, + DateTimeOffset endTime, + IConnectionMultiplexer redis, + IDictionary metadata, + string keyPrefix, + RedisJobShardOptions options, + ILogger logger) + : base(shardId, startTime, endTime) + { + _redis = redis ?? throw new ArgumentNullException(nameof(redis)); + var db = _redis.GetDatabase(); + _redisOps = new RedisOperationsManager(db); + _options = options ?? throw new ArgumentNullException(nameof(options)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + Metadata = metadata; + + // Initialize metadata version from metadata dictionary + if (metadata.TryGetValue("version", out var versionStr) && long.TryParse(versionStr, out var version)) + { + MetadataVersion = version; + } + + _streamKey = $"{keyPrefix}:shard:{Id}:stream"; + _metaKey = $"{keyPrefix}:shard:{Id}:meta"; + + _storageOperationChannel = Channel.CreateUnbounded(new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = false + }); + + _storageProcessorTask = ProcessStorageOperationsAsync(); + } + + // Initialize: replay the Redis stream and rebuild in-memory queue (same logic as Azure) + public async Task InitializeAsync(CancellationToken cancellationToken) + { + LogInitializingShard(_logger, Id, _streamKey); + var sw = Stopwatch.StartNew(); + + // Replay stream from beginning + var addedJobs = new Dictionary(); + var deletedJobs = new HashSet(); + var jobRetryCounters = new Dictionary(); + + // StreamRange with "-" .. "+" returns all entries. If stream does not exist, returns empty. + var streamEntries = _redisOps.StreamRange(_streamKey, "-", "+"); + + await foreach (var operation in RedisStreamJsonSerializer.DecodeAsync(streamEntries, JobOperationJsonContext.Default.JobOperation, cancellationToken)) + { + switch (operation.Type) + { + case JobOperation.OperationType.Add: + if (!deletedJobs.Contains(operation.Id)) + { + addedJobs[operation.Id] = operation; + } + break; + case JobOperation.OperationType.Remove: + deletedJobs.Add(operation.Id); + addedJobs.Remove(operation.Id); + jobRetryCounters.Remove(operation.Id); + break; + case JobOperation.OperationType.Retry: + if (!deletedJobs.Contains(operation.Id)) + { + if (!jobRetryCounters.ContainsKey(operation.Id)) + { + jobRetryCounters[operation.Id] = (1, operation.DueTime); + } + else + { + var (dequeueCount, _) = jobRetryCounters[operation.Id]; + jobRetryCounters[operation.Id] = (dequeueCount + 1, operation.DueTime); + } + } + break; + } + } + + // Rebuild the priority queue in memory (use EnqueueJob) + foreach (var op in addedJobs.Values) + { + var retryCounter = 0; + var dueTime = op.DueTime!.Value; + if (jobRetryCounters.TryGetValue(op.Id, out var retryEntries)) + { + retryCounter = retryEntries.dequeueCount; + dueTime = retryEntries.newDueTime ?? dueTime; + } + + EnqueueJob(new DurableJob + { + Id = op.Id, + Name = op.Name!, + DueTime = dueTime, + TargetGrainId = op.TargetGrainId!.Value, + ShardId = Id, + Metadata = op.Metadata, + }, retryCounter); + } + + sw.Stop(); + LogShardInitialized(_logger, Id, addedJobs.Count, sw.ElapsedMilliseconds); + } + + protected override async Task PersistAddJobAsync(string jobId, string jobName, DateTimeOffset dueTime, GrainId target, IReadOnlyDictionary? metadata, CancellationToken cancellationToken) + { + LogAddingJob(_logger, jobId, jobName, Id, dueTime); + var operation = JobOperation.CreateAddOperation(jobId, jobName, dueTime, target, metadata); + await EnqueueStorageOperationAsync(StorageOperation.CreateAppendOperation(operation), cancellationToken); + } + + protected override async Task PersistRemoveJobAsync(string jobId, CancellationToken cancellationToken) + { + LogRemovingJob(_logger, jobId, Id); + var operation = JobOperation.CreateRemoveOperation(jobId); + await EnqueueStorageOperationAsync(StorageOperation.CreateAppendOperation(operation), cancellationToken); + } + + protected override async Task PersistRetryJobAsync(string jobId, DateTimeOffset newDueTime, CancellationToken cancellationToken) + { + LogRetryingJob(_logger, jobId, Id, newDueTime); + var operation = JobOperation.CreateRetryOperation(jobId, newDueTime); + await EnqueueStorageOperationAsync(StorageOperation.CreateAppendOperation(operation), cancellationToken); + } + + public async Task UpdateShardMetadataAsync(IDictionary metadata, long expectedVersion, CancellationToken cancellationToken) + { + LogUpdatingMetadata(_logger, Id); + await EnqueueStorageOperationAsync(StorageOperation.CreateMetadataOperation(metadata, expectedVersion), cancellationToken); + } + + private async Task EnqueueStorageOperationAsync(StorageOperation operation, CancellationToken cancellationToken) + { + await _storageOperationChannel.Writer.WriteAsync(operation, cancellationToken); + await operation.CompletionSource.Task; + } + + private async Task ProcessStorageOperationsAsync() + { + await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ContinueOnCapturedContext | ConfigureAwaitOptions.ForceYielding); + var cancellationToken = _shutdownCts.Token; + var batchOperations = new List(_options.MaxBatchSize); + + try + { + while (await _storageOperationChannel.Reader.WaitToReadAsync(cancellationToken)) + { + if (!_storageOperationChannel.Reader.TryRead(out var firstOperation)) + { + continue; + } + + // Metadata ops are handled immediately and atomically via Lua + if (firstOperation.Type == StorageOperationType.UpdateMetadata) + { + try + { + var success = await _redisOps.UpdateMetadataAsync(_metaKey, firstOperation.Metadata!, firstOperation.ExpectedVersion).ConfigureAwait(false); + if (!success) + { + throw new InvalidOperationException("Metadata CAS failed - version mismatch."); + } + + // Update local metadata tracking to keep in-memory state synchronized with Redis. + // This is necessary because RedisJobShard maintains stateful metadata (Metadata, MetadataVersion) + // while RedisOperationsManager is stateless and only performs Redis operations. + Metadata = new Dictionary(firstOperation.Metadata!); + var newVersion = (firstOperation.ExpectedVersion + 1).ToString(); + Metadata["version"] = newVersion; + MetadataVersion = long.Parse(newVersion); + + LogMetadataUpdated(_logger, Id); + firstOperation.CompletionSource.TrySetResult(); + } + catch (Exception ex) + { + LogErrorUpdatingMetadata(_logger, ex, Id); + firstOperation.CompletionSource.TrySetException(ex); + } + continue; + } + + // collect job ops for batch + batchOperations.Add(firstOperation); + + // Try to collect more operations up to the maximum batch size + if (TryCollectJobOperationsForBatch(batchOperations) && batchOperations.Count < _options.MaxBatchSize) + { + // Not enough operations to meet the minimum batch size, wait for more or timeout + if (batchOperations.Count < _options.MinBatchSize) + { + LogWaitingForBatch(_logger, batchOperations.Count, _options.MinBatchSize, Id); + } + await Task.Delay(_options.BatchFlushInterval, cancellationToken); + TryCollectJobOperationsForBatch(batchOperations); + } + + if (batchOperations.Count > 0) + { + try + { + LogFlushingBatch(_logger, batchOperations.Count, Id); + await AppendJobOperationBatchAsync(batchOperations, cancellationToken); + foreach (var op in batchOperations) + { + op.CompletionSource.TrySetResult(); + } + } + catch (Exception ex) + { + LogErrorWritingBatch(_logger, ex, batchOperations.Count, Id); + foreach (var op in batchOperations) + { + op.CompletionSource.TrySetException(ex); + } + } + finally + { + batchOperations.Clear(); + } + } + } + } + catch (OperationCanceledException) + { + } + finally + { + while (_storageOperationChannel.Reader.TryRead(out var op)) + { + op.CompletionSource?.TrySetCanceled(cancellationToken); + } + } + + // Local function to collect job operations for batching. Returns true if more operations can be collected. + bool TryCollectJobOperationsForBatch(List batchOperations) + { + // Collect more jobs, up to a maximum batch size + while (batchOperations.Count < _options.MaxBatchSize && _storageOperationChannel.Reader.TryPeek(out var nextOperation)) + { + if (nextOperation.Type is StorageOperationType.UpdateMetadata) + { + // Stop batching if we encounter a metadata operation + return false; + } + _storageOperationChannel.Reader.TryRead(out var operation); + Debug.Assert(operation != null); + batchOperations.Add(operation!); + } + return batchOperations.Count != _options.MaxBatchSize; + } + } + + private async Task AppendJobOperationBatchAsync(List operations, CancellationToken cancellationToken) + { + var sw = Stopwatch.StartNew(); + + var jobOperations = operations.Select(op => op.JobOperation!.Value); + var payloads = RedisStreamJsonSerializer.Encode(jobOperations, JobOperationJsonContext.Default.JobOperation); + + var result = await _redisOps.AppendJobOperationBatchAsync(_streamKey, payloads).ConfigureAwait(false); + // result is array of ids, but we don't use them for now + sw.Stop(); + LogBatchWritten(_logger, operations.Count, Id, sw.ElapsedMilliseconds, -1); + } + + internal async Task StopProcessorAsync(CancellationToken cancellationToken) + { + LogStoppingProcessor(_logger, Id); + + if (_storageOperationChannel.Writer.TryComplete()) + { + _shutdownCts.Cancel(); + } + + try + { + await _storageProcessorTask.WaitAsync(cancellationToken); + LogProcessorStopped(_logger, Id); + } + catch (OperationCanceledException) + { + LogProcessorStopped(_logger, Id); + } + } + + public override async ValueTask DisposeAsync() + { + await StopProcessorAsync(CancellationToken.None); + _shutdownCts.Dispose(); + await base.DisposeAsync(); + } +} + +internal enum StorageOperationType +{ + AppendJobOperation, + UpdateMetadata +} + +internal sealed class StorageOperation +{ + public required StorageOperationType Type { get; init; } + public JobOperation? JobOperation { get; init; } + public IDictionary? Metadata { get; init; } + public long ExpectedVersion { get; init; } + public TaskCompletionSource CompletionSource { get; init; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public static StorageOperation CreateAppendOperation(JobOperation jobOperation) + { + return new StorageOperation + { + Type = StorageOperationType.AppendJobOperation, + JobOperation = jobOperation + }; + } + + public static StorageOperation CreateMetadataOperation(IDictionary metadata, long expectedVersion) + { + return new StorageOperation + { + Type = StorageOperationType.UpdateMetadata, + Metadata = metadata, + ExpectedVersion = expectedVersion + }; + } +} diff --git a/src/Redis/Orleans.DurableJobs.Redis/RedisJobShardManager.Log.cs b/src/Redis/Orleans.DurableJobs.Redis/RedisJobShardManager.Log.cs new file mode 100644 index 00000000000..dd9cbbd92b1 --- /dev/null +++ b/src/Redis/Orleans.DurableJobs.Redis/RedisJobShardManager.Log.cs @@ -0,0 +1,156 @@ +using Microsoft.Extensions.Logging; + +namespace Orleans.DurableJobs.Redis; + +public sealed partial class RedisJobShardManager +{ + [LoggerMessage( + Level = LogLevel.Information, + Message = "Initializing RedisJobShardManager (shardPrefix={Prefix})" + )] + private static partial void LogInitializing(ILogger logger, string prefix); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "RedisJobShardManager initialized" + )] + private static partial void LogInitialized(ILogger logger); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Assigning shards up to {MaxShardStartTime} (prefix={Prefix})" + )] + private static partial void LogAssigningShards(ILogger logger, DateTimeOffset maxShardStartTime, string prefix); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Shard {ShardId} is too new (start {Start}) > max {Max}, skipping" + )] + private static partial void LogShardTooNew(ILogger logger, string shardId, DateTimeOffset start, DateTimeOffset max); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Shard {ShardId} is owned by this silo and in cache" + )] + private static partial void LogShardOwnedByThisSilo(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Shard {ShardId} metadata says owned by this silo but not in cache; releasing ownership" + )] + private static partial void LogShardOwnedButNotInCache(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Failed to release ownership of shard {ShardId}" + )] + private static partial void LogFailedToReleaseOwnership(ILogger logger, Exception exception, string shardId); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Shard {ShardId} owned by active silo {Owner}, skipping" + )] + private static partial void LogShardOwnedByActiveSilo(ILogger logger, string shardId, string owner); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Claiming orphaned shard {ShardId} (old owner: {Owner})" + )] + private static partial void LogClaimingOrphanedShard(ILogger logger, string shardId, string? owner); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Failed to take ownership: another silo likely took shard {ShardId}" + )] + private static partial void LogFailedToTakeOwnership(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Shard {ShardId} assigned to this silo" + )] + private static partial void LogShardAssigned(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Failed initializing shard {ShardId} after taking ownership; releasing" + )] + private static partial void LogFailedInitializingShard(ILogger logger, Exception exception, string shardId); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Failed to release shard {ShardId} after failed init" + )] + private static partial void LogFailedToReleaseShardAfterFailedInit(ILogger logger, Exception exception, string shardId); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "AssignJobShardsAsync completed, returning {Count} shards" + )] + private static partial void LogAssignmentCompleted(ILogger logger, int count); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Registering new shard (min={Min}, max={Max})" + )] + private static partial void LogRegisteringShard(ILogger logger, DateTimeOffset min, DateTimeOffset max); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Shard id collision for {ShardId}, retrying (attempt {Attempt})" + )] + private static partial void LogShardIdCollision(ILogger logger, string shardId, int attempt); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Shard {ShardId} registered and assigned to this silo" + )] + private static partial void LogShardRegistered(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Error creating shard {ShardId} attempt {Attempt}" + )] + private static partial void LogErrorCreatingShard(ILogger logger, Exception exception, string shardId, int attempt); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "UnregisterShardAsync called with non-RedisJobShard instance; disposing generically" + )] + private static partial void LogUnregisterNonRedisJobShard(ILogger logger); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Unregistering shard {ShardId}" + )] + private static partial void LogUnregisteringShard(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Released ownership of shard {ShardId} with {Count} remaining jobs" + )] + private static partial void LogShardOwnershipReleased(ILogger logger, string shardId, int count); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Failed to release ownership for shard {ShardId}" + )] + private static partial void LogFailedToReleaseOwnershipForShard(ILogger logger, Exception exception, string shardId); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Deleted shard {ShardId} (no remaining jobs)" + )] + private static partial void LogShardDeleted(ILogger logger, string shardId); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Failed to delete shard {ShardId}" + )] + private static partial void LogFailedToDeleteShard(ILogger logger, Exception exception, string shardId); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Error disposing shard {ShardId}" + )] + private static partial void LogErrorDisposingShard(ILogger logger, Exception exception, string shardId); +} diff --git a/src/Redis/Orleans.DurableJobs.Redis/RedisJobShardManager.cs b/src/Redis/Orleans.DurableJobs.Redis/RedisJobShardManager.cs new file mode 100644 index 00000000000..4f4b4a26edb --- /dev/null +++ b/src/Redis/Orleans.DurableJobs.Redis/RedisJobShardManager.cs @@ -0,0 +1,347 @@ +using System.Collections.Concurrent; +using System.Globalization; +using System.Text; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using StackExchange.Redis; + +namespace Orleans.DurableJobs.Redis; + +/// +/// Redis-based implementation of that stores job shards in Redis. +/// +public sealed partial class RedisJobShardManager : JobShardManager +{ + private readonly ILocalSiloDetails _localSiloDetails; + private readonly IClusterMembershipService _clusterMembership; + private readonly RedisJobShardOptions _options; + private readonly ClusterOptions _clusterOptions; + private readonly ILogger _logger; + private readonly ILoggerFactory _loggerFactory; + private readonly RedisKey _keyPrefix; + + private IConnectionMultiplexer? _multiplexer; + private RedisOperationsManager? _redisOps; + + // in-memory cache of owned shards + private readonly ConcurrentDictionary _jobShardCache = new(); + + private long _shardCounter; + + /// + /// Initializes a new instance of the class. + /// + /// The local silo details. + /// The Redis job shard options. + /// The cluster options. + /// The cluster membership service. + /// The logger factory. + public RedisJobShardManager( + ILocalSiloDetails localSiloDetails, + IOptions options, + IOptions clusterOptions, + IClusterMembershipService clusterMembership, + ILoggerFactory loggerFactory) + : base(localSiloDetails.SiloAddress) + { + _localSiloDetails = localSiloDetails ?? throw new ArgumentNullException(nameof(localSiloDetails)); + _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); + _clusterOptions = clusterOptions?.Value ?? throw new ArgumentNullException(nameof(clusterOptions)); + _clusterMembership = clusterMembership ?? throw new ArgumentNullException(nameof(clusterMembership)); + _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); + _logger = loggerFactory.CreateLogger(); + + _keyPrefix = Encoding.UTF8.GetBytes( + _options.KeyPrefix ?? $"{_clusterOptions.ServiceId}/durablejobs"); + } + + private async ValueTask InitializeIfNeeded() + { + if (_redisOps is not null) + { + return; + } + + LogInitializing(_logger, _options.ShardPrefix); + _multiplexer = await _options.CreateMultiplexer(_options).ConfigureAwait(false); + var db = _multiplexer.GetDatabase(); + _redisOps = new RedisOperationsManager(db); + LogInitialized(_logger); + } + + private string ShardSetKey => $"{_keyPrefix}:shards:{_options.ShardPrefix}"; + private string MetaKeyForShard(string shardId) => $"{_keyPrefix}:shard:{shardId}:meta"; + + public override async Task> AssignJobShardsAsync(DateTimeOffset maxDueTime, CancellationToken cancellationToken) + { + await InitializeIfNeeded().ConfigureAwait(false); + LogAssigningShards(_logger, maxDueTime, _options.ShardPrefix); + + var result = new List(); + + // get all shard ids + var shardIds = await _redisOps!.GetSetMembersAsync(ShardSetKey).ConfigureAwait(false); + + foreach (var shardId in shardIds) + { + cancellationToken.ThrowIfCancellationRequested(); + + var metaKey = MetaKeyForShard(shardId); + var metadata = await _redisOps.GetHashAllAsync(metaKey).ConfigureAwait(false); + + // parse values + metadata.TryGetValue("Owner", out var ownerStr); + metadata.TryGetValue("MembershipVersion", out var membershipVersionStr); + metadata.TryGetValue("MinDueTime", out var minDueStr); + + // refresh membership if remote higher + if (!string.IsNullOrEmpty(membershipVersionStr) && long.TryParse(membershipVersionStr, out var memVer)) + { + var memVersion = new MembershipVersion(memVer); + if (memVersion > _clusterMembership.CurrentSnapshot.Version) + { + await _clusterMembership.Refresh(memVersion, cancellationToken).ConfigureAwait(false); + } + } + + if (!string.IsNullOrEmpty(minDueStr) && DateTimeOffset.TryParse(minDueStr, null, DateTimeStyles.RoundtripKind, out var shardStartTime)) + { + if (shardStartTime > maxDueTime) + { + LogShardTooNew(_logger, shardId, shardStartTime, maxDueTime); + continue; + } + } + + // If I am the owner + if (!string.IsNullOrEmpty(ownerStr) && ownerStr == _localSiloDetails.SiloAddress.ToParsableString()) + { + if (_jobShardCache.TryGetValue(shardId, out var cached)) + { + LogShardOwnedByThisSilo(_logger, shardId); + result.Add(cached); + continue; + } + else + { + LogShardOwnedButNotInCache(_logger, shardId); + try + { + await ReleaseOwnershipAsync(shardId).ConfigureAwait(false); + } + catch (Exception ex) + { + LogFailedToReleaseOwnership(_logger, ex, shardId); + } + continue; + } + } + + // If owner exists and is active, skip + if (!string.IsNullOrEmpty(ownerStr)) + { + try + { + var ownerAddr = SiloAddress.FromParsableString(ownerStr); + var ownerStatus = _clusterMembership.CurrentSnapshot.GetSiloStatus(ownerAddr); + if (ownerStatus is not SiloStatus.Dead and not SiloStatus.None) + { + LogShardOwnedByActiveSilo(_logger, shardId, ownerStr); + continue; + } + } + catch + { + // If parsing fails, treat as orphan and try to claim + } + } + + // Try to claim orphaned shard + LogClaimingOrphanedShard(_logger, shardId, ownerStr); + var expectedVersion = metadata.TryGetValue("version", out var v) ? v : "0"; + var took = await _redisOps!.TryTakeOwnershipAsync( + metaKey, + expectedVersion, + _localSiloDetails.SiloAddress.ToParsableString(), + _clusterMembership.CurrentSnapshot.Version.Value.ToString()).ConfigureAwait(false); + if (!took) + { + LogFailedToTakeOwnership(_logger, shardId); + continue; + } + + // instantiate shard and initialize + var minDue = ParseDateTimeOffset(metadata, "MinDueTime", DateTimeOffset.MinValue); + var maxDue = ParseDateTimeOffset(metadata, "MaxDueTime", DateTimeOffset.MaxValue); + + var shard = new RedisJobShard(shardId, minDue, maxDue, _multiplexer!, metadata, _keyPrefix.ToString(), _options, _loggerFactory.CreateLogger()); + try + { + await shard.InitializeAsync(cancellationToken).ConfigureAwait(false); + // same behavior as Azure manager: shards just taken are not used to add new jobs + await shard.MarkAsCompleteAsync(cancellationToken).ConfigureAwait(false); + + _jobShardCache[shardId] = shard; + LogShardAssigned(_logger, shardId); + result.Add(shard); + } + catch (Exception ex) + { + LogFailedInitializingShard(_logger, ex, shardId); + try + { + await ReleaseOwnershipAsync(shardId).ConfigureAwait(false); + } + catch (Exception releaseEx) + { + LogFailedToReleaseShardAfterFailedInit(_logger, releaseEx, shardId); + } + + await shard.DisposeAsync(); + } + } + + LogAssignmentCompleted(_logger, result.Count); + return result; + } + + public override async Task CreateShardAsync(DateTimeOffset minDueTime, DateTimeOffset maxDueTime, IDictionary metadata, CancellationToken cancellationToken) + { + await InitializeIfNeeded().ConfigureAwait(false); + LogRegisteringShard(_logger, minDueTime, maxDueTime); + + var i = 0; + while (true) + { + i++; + var counter = Interlocked.Increment(ref _shardCounter); + var shardId = $"{_options.ShardPrefix}-{minDueTime:yyyyMMddHHmm}-{_localSiloDetails.SiloAddress.ToParsableString()}-{counter}"; + var metaKey = MetaKeyForShard(shardId); + + var metadataInfo = new Dictionary(metadata) + { + ["MinDueTime"] = minDueTime.ToString("o"), + ["MaxDueTime"] = maxDueTime.ToString("o"), + ["MembershipVersion"] = _clusterMembership.CurrentSnapshot.Version.Value.ToString(CultureInfo.InvariantCulture), + ["Owner"] = _localSiloDetails.SiloAddress.ToParsableString(), + ["Creator"] = _localSiloDetails.SiloAddress.ToParsableString(), + ["version"] = "1" + }; + + try + { + var created = await _redisOps!.CreateShardAsync(metaKey, ShardSetKey, shardId, metadataInfo).ConfigureAwait(false); + + if (!created) + { + LogShardIdCollision(_logger, shardId, i); + if (i >= _options.MaxShardCreationRetries) + { + throw new InvalidOperationException($"Failed to create shard '{shardId}' after {i} attempts"); + } + continue; + } + + var shard = new RedisJobShard(shardId, minDueTime, maxDueTime, _multiplexer!, metadataInfo, _keyPrefix.ToString(), _options, _loggerFactory.CreateLogger()); + await shard.InitializeAsync(cancellationToken).ConfigureAwait(false); + _jobShardCache[shardId] = shard; + LogShardRegistered(_logger, shardId); + return shard; + } + catch (Exception ex) + { + LogErrorCreatingShard(_logger, ex, shardId, i); + if (i >= _options.MaxShardCreationRetries) + { + throw new InvalidOperationException($"Failed to create shard '{shardId}' after {i} attempts", ex); + } + } + } + } + + public override async Task UnregisterShardAsync(IJobShard shard, CancellationToken cancellationToken) + { + if (shard is not RedisJobShard redisShard) + { + LogUnregisterNonRedisJobShard(_logger); + await shard.DisposeAsync(); + return; + } + + var shardId = redisShard.Id; + LogUnregisteringShard(_logger, shardId); + + // Stop the background storage processor to ensure no more changes can happen + await redisShard.StopProcessorAsync(cancellationToken).ConfigureAwait(false); + + // Now we can safely get a consistent view of the state + var count = await shard.GetJobCountAsync().ConfigureAwait(false); + + // Remove from cache + _jobShardCache.TryRemove(shardId, out _); + + if (count > 0) + { + // There are still jobs in the shard, just release ownership + try + { + await ReleaseOwnershipAsync(shardId).ConfigureAwait(false); + LogShardOwnershipReleased(_logger, shardId, count); + } + catch (Exception ex) + { + LogFailedToReleaseOwnershipForShard(_logger, ex, shardId); + } + } + else + { + // No jobs left, delete the shard data entirely + try + { + await DeleteShardAsync(shardId).ConfigureAwait(false); + LogShardDeleted(_logger, shardId); + } + catch (Exception ex) + { + LogFailedToDeleteShard(_logger, ex, shardId); + } + } + + // Dispose the shard's resources + try + { + await redisShard.DisposeAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + LogErrorDisposingShard(_logger, ex, shardId); + } + } + + private async Task DeleteShardAsync(string shardId) + { + await InitializeIfNeeded().ConfigureAwait(false); + + var streamKey = $"{_keyPrefix}:shard:{shardId}:stream"; + var metaKey = $"{_keyPrefix}:shard:{shardId}:meta"; + + // Delete all shard-related keys + await _redisOps!.DeleteKeysAsync([streamKey, metaKey]).ConfigureAwait(false); + + // Remove from the shard set + await _redisOps.SetRemoveAsync(ShardSetKey, shardId).ConfigureAwait(false); + } + + private async Task ReleaseOwnershipAsync(string shardId) + { + await InitializeIfNeeded().ConfigureAwait(false); + var metaKey = MetaKeyForShard(shardId); + var metadata = await _redisOps!.GetHashAllAsync(metaKey).ConfigureAwait(false); + var version = metadata.TryGetValue("version", out var v) ? v : "0"; + + return await _redisOps.ReleaseOwnershipAsync(metaKey, version).ConfigureAwait(false); + } + + private static DateTimeOffset ParseDateTimeOffset(IDictionary meta, string key, DateTimeOffset @default) => meta.TryGetValue(key, out var s) && DateTimeOffset.TryParse(s, null, DateTimeStyles.RoundtripKind, out var dt) ? dt : @default; +} diff --git a/src/Redis/Orleans.DurableJobs.Redis/RedisOperationsManager.cs b/src/Redis/Orleans.DurableJobs.Redis/RedisOperationsManager.cs new file mode 100644 index 00000000000..55ee9726576 --- /dev/null +++ b/src/Redis/Orleans.DurableJobs.Redis/RedisOperationsManager.cs @@ -0,0 +1,234 @@ +using System.Text.Json; +using StackExchange.Redis; + +namespace Orleans.DurableJobs.Redis; + +/// +/// Centralizes Redis operations for DurableJobs Redis implementation. +/// +/// +/// Initializes a new instance of the class. +/// +/// The Redis database instance. +internal sealed class RedisOperationsManager(IDatabase db) +{ + private readonly IDatabase _db = db ?? throw new ArgumentNullException(nameof(db)); + + // Lua scripts + private const string CreateShardLua = @" + -- KEYS[1] = metaKey + -- KEYS[2] = shardsSetKey + -- ARGV[1] = shardId + -- ARGV[2] = metadataJson (JSON object with all metadata fields) + if redis.call('EXISTS', KEYS[1]) == 1 then + return 0 + end + local metadata = cjson.decode(ARGV[2]) + for k, v in pairs(metadata) do + redis.call('HSET', KEYS[1], k, v) + end + redis.call('SADD', KEYS[2], ARGV[1]) + return 1 + "; + + private const string TryTakeOwnershipLua = @" + -- KEYS[1] = metaKey + -- ARGV[1] = expectedVersion + -- ARGV[2] = newOwner + -- ARGV[3] = newMembershipVersion + local curr = redis.call('HGET', KEYS[1], 'version') + if curr == false then curr = '0' end + if curr == ARGV[1] then + local next = tostring(tonumber(curr) + 1) + redis.call('HSET', KEYS[1], 'Owner', ARGV[2], 'MembershipVersion', ARGV[3], 'version', next) + return 1 + end + return 0 + "; + + private const string ReleaseOwnershipLua = @" + -- KEYS[1] = metaKey + -- ARGV[1] = expectedVersion + local curr = redis.call('HGET', KEYS[1], 'version') + if curr == false then curr = '0' end + if curr == ARGV[1] then + local next = tostring(tonumber(curr) + 1) + redis.call('HDEL', KEYS[1], 'Owner') + redis.call('HSET', KEYS[1], 'version', next) + return 1 + end + return 0 + "; + + private const string MultiXAddLua = @" + local streamKey = KEYS[1] + local n = tonumber(ARGV[1]) + local ids = {} + local idx = 2 + for i=1,n do + local payload = ARGV[idx] + idx = idx + 1 + local id = redis.call('XADD', streamKey, '*', 'payload', payload) + table.insert(ids, id) + end + return ids + "; + + private const string UpdateMetaLua = @" + local metaKey = KEYS[1] + local expectedVersion = ARGV[1] + local newVersion = ARGV[2] + local fieldsJson = ARGV[3] + local curr = redis.call('HGET', metaKey, 'version') + if curr == false then curr = '' end + if curr == expectedVersion then + local obj = cjson.decode(fieldsJson) + for k,v in pairs(obj) do + redis.call('HSET', metaKey, k, v) + end + redis.call('HSET', metaKey, 'version', newVersion) + return 1 + else + return 0 + end + "; + + /// + /// Gets all member values from a Redis set. + /// + /// The key of the set. + /// An array of string values representing the set members. + public async Task GetSetMembersAsync(RedisKey setKey) + { + var members = await _db.SetMembersAsync(setKey).ConfigureAwait(false); + return [.. members.Select(rv => rv.ToString())]; + } + + /// + /// Gets all hash entries and converts them to a dictionary. + /// + /// The key of the hash. + /// A dictionary containing the hash entries. + public async Task> GetHashAllAsync(RedisKey hashKey) + { + var entries = await _db.HashGetAllAsync(hashKey).ConfigureAwait(false); + return HashEntriesToDictionary(entries); + } + + /// + /// Creates a new shard using the CreateShardLua script. + /// + /// The metadata key for the shard. + /// The key of the set containing all shard IDs. + /// The unique identifier for the shard. + /// The shard metadata. + /// True if the shard was created successfully, false if it already exists. + public async Task CreateShardAsync(RedisKey metaKey, RedisKey shardsSetKey, string shardId, IDictionary metadata) + { + var metadataJson = JsonSerializer.Serialize(metadata); + var res = (int)await _db.ScriptEvaluateAsync(CreateShardLua, + [metaKey, shardsSetKey], + [shardId, metadataJson]).ConfigureAwait(false); + return res == 1; + } + + /// + /// Attempts to take ownership of a shard. + /// + /// The metadata key for the shard. + /// The expected version for optimistic concurrency control. + /// The new owner's address. + /// The new membership version. + /// True if ownership was successfully taken, false otherwise. + public async Task TryTakeOwnershipAsync(RedisKey metaKey, string expectedVersion, string newOwner, string newMembershipVersion) + { + var res = (int)await _db.ScriptEvaluateAsync(TryTakeOwnershipLua, + [metaKey], + [expectedVersion, newOwner, newMembershipVersion]).ConfigureAwait(false); + return res == 1; + } + + /// + /// Releases ownership of a shard. + /// + /// The metadata key for the shard. + /// The expected version for optimistic concurrency control. + /// True if ownership was successfully released, false otherwise. + public async Task ReleaseOwnershipAsync(RedisKey metaKey, string expectedVersion) + { + var res = (int)await _db.ScriptEvaluateAsync(ReleaseOwnershipLua, + [metaKey], + [expectedVersion]).ConfigureAwait(false); + return res == 1; + } + + /// + /// Deletes multiple Redis keys. + /// + /// The keys to delete. + /// The number of keys that were deleted. + public async Task DeleteKeysAsync(RedisKey[] keys) => await _db.KeyDeleteAsync(keys).ConfigureAwait(false); + + /// + /// Removes a value from a Redis set. + /// + /// The key of the set. + /// The value to remove. + /// True if the value was removed, false if it didn't exist. + public async Task SetRemoveAsync(RedisKey setKey, RedisValue value) => await _db.SetRemoveAsync(setKey, value).ConfigureAwait(false); + + /// + /// Reads a range of entries from a Redis stream. + /// Note: This method is synchronous because StackExchange.Redis's StreamRange is synchronous. + /// + /// The key of the stream. + /// The minimum stream entry ID (use "-" for start). + /// The maximum stream entry ID (use "+" for end). + /// An array of stream entries. + public StreamEntry[] StreamRange(RedisKey streamKey, RedisValue minId = default, RedisValue maxId = default) => _db.StreamRange(streamKey, minId, maxId); + + /// + /// Appends multiple job operations to a Redis stream in a single batch. + /// + /// The key of the stream. + /// The payloads to append to the stream. + /// A Redis result containing the stream entry IDs. + public async Task AppendJobOperationBatchAsync(RedisKey streamKey, RedisValue[] payloads) + { + var args = new RedisValue[1 + payloads.Length]; + args[0] = payloads.Length; + for (var i = 0; i < payloads.Length; i++) + { + args[i + 1] = payloads[i]; + } + + return await _db.ScriptEvaluateAsync(MultiXAddLua, [streamKey], args).ConfigureAwait(false); + } + + /// + /// Updates shard metadata using optimistic concurrency control. + /// + /// The metadata key for the shard. + /// The new metadata values. + /// The expected version for optimistic concurrency control. + /// True if the metadata was successfully updated, false if version mismatch occurred. + public async Task UpdateMetadataAsync(RedisKey metaKey, IDictionary metadata, long expectedVersion) + { + var newVersion = (expectedVersion + 1).ToString(); + var fieldsJson = JsonSerializer.Serialize(metadata); + var res = await _db.ScriptEvaluateAsync(UpdateMetaLua, + [metaKey], + [expectedVersion.ToString(), newVersion, fieldsJson]).ConfigureAwait(false); + return (int)res == 1; + } + + private static IDictionary HashEntriesToDictionary(HashEntry[] entries) + { + var dict = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach (var e in entries) + { + dict[e.Name.ToString()] = e.Value.ToString(); + } + return dict; + } +} diff --git a/src/Redis/Orleans.DurableJobs.Redis/RedisStreamJsonSerializer.cs b/src/Redis/Orleans.DurableJobs.Redis/RedisStreamJsonSerializer.cs new file mode 100644 index 00000000000..96508cfd3d0 --- /dev/null +++ b/src/Redis/Orleans.DurableJobs.Redis/RedisStreamJsonSerializer.cs @@ -0,0 +1,58 @@ +using System.Runtime.CompilerServices; +using System.Text.Json; +using System.Text.Json.Serialization.Metadata; +using StackExchange.Redis; + +namespace Orleans.DurableJobs.Redis; + +internal static class RedisStreamJsonSerializer +{ + public static async IAsyncEnumerable DecodeAsync(StreamEntry[] streamEntries, JsonTypeInfo jsonTypeInfo, [EnumeratorCancellation] CancellationToken cancellationToken) + { + if (streamEntries is null) yield break; + + ArgumentNullException.ThrowIfNull(jsonTypeInfo); + + foreach (var streamEntry in streamEntries) + { + cancellationToken.ThrowIfCancellationRequested(); + + // Find the field named "payload" (case-sensitive) - matches the Lua script in RedisJobShard + var dataField = streamEntry.Values.FirstOrDefault(v => v.Name == "payload"); + + if (dataField.Equals(default) || dataField.Value.IsNull) + { + // Skip entries without a data field + continue; + } + + // Read JSON as string (Redis stores stream field values as binary or string) + var json = (string?)dataField.Value; + if (string.IsNullOrEmpty(json)) + { + continue; + } + + // Deserialize using provided JsonTypeInfo + var item = JsonSerializer.Deserialize(json, jsonTypeInfo) ?? throw new JsonException("Deserialized JSON resulted in null value"); + + // Yield asynchronously - allow caller to observe cancellation between items + await Task.Yield(); + yield return item; + } + } + + /// + /// Serializes a collection of items to JSON strings as RedisValue array using the provided JsonTypeInfo for source-generated serialization. + /// + /// The items to serialize. + /// The JSON type info for source-generated serialization. + /// An array of RedisValue containing JSON strings. + public static RedisValue[] Encode(IEnumerable items, JsonTypeInfo jsonTypeInfo) + { + ArgumentNullException.ThrowIfNull(items); + ArgumentNullException.ThrowIfNull(jsonTypeInfo); + + return items.Select(item => (RedisValue)JsonSerializer.Serialize(item, jsonTypeInfo)).ToArray(); + } +} diff --git a/test/Extensions/Orleans.Redis.Tests/DurableJobs/RedisDurableJobsTests.cs b/test/Extensions/Orleans.Redis.Tests/DurableJobs/RedisDurableJobsTests.cs new file mode 100644 index 00000000000..e15e443d9fa --- /dev/null +++ b/test/Extensions/Orleans.Redis.Tests/DurableJobs/RedisDurableJobsTests.cs @@ -0,0 +1,137 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Orleans.TestingHost; +using StackExchange.Redis; +using Tester.DurableJobs; +using TestExtensions; +using Xunit; + +namespace Tester.Redis.DurableJobs; + +/// +/// Integration tests for Redis DurableJobs using a test cluster. +/// +public class RedisDurableJobsTests : TestClusterPerTest +{ + private DurableJobTestsRunner _runner = null!; + + protected override void CheckPreconditionsOrThrow() => TestUtils.CheckForRedis(); + + public override async Task InitializeAsync() + { + await base.InitializeAsync(); + _runner = new DurableJobTestsRunner(this.GrainFactory); + } + + protected override void ConfigureTestCluster(TestClusterBuilder builder) + { + builder.AddSiloBuilderConfigurator(); + } + + public class SiloHostConfigurator : ISiloConfigurator + { + public void Configure(ISiloBuilder hostBuilder) + { + var connectionString = TestDefaultConfiguration.RedisConnectionString; + hostBuilder + .UseRedisDurableJobs(options => + { + options.ConfigurationOptions = ConfigurationOptions.Parse(connectionString); + options.ShardPrefix = "test-shard"; + }) + .AddMemoryGrainStorageAsDefault(); + } + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task DurableJobGrain() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.DurableJobGrain(cts.Token); + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task JobExecutionOrder() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobExecutionOrder(cts.Token); + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task PastDueTime() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.PastDueTime(cts.Token); + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task JobWithMetadata() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobWithMetadata(cts.Token); + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task MultipleGrains() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.MultipleGrains(cts.Token); + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task DuplicateJobNames() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.DuplicateJobNames(cts.Token); + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task CancelNonExistentJob() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.CancelNonExistentJob(cts.Token); + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task CancelAlreadyExecutedJob() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.CancelAlreadyExecutedJob(cts.Token); + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task ConcurrentScheduling() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ConcurrentScheduling(cts.Token); + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task JobPropertiesVerification() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobPropertiesVerification(cts.Token); + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task DequeueCount() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.DequeueCount(cts.Token); + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task ScheduleJobOnAnotherGrain() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ScheduleJobOnAnotherGrain(cts.Token); + } + + [SkippableFact, TestCategory("Redis"), TestCategory("DurableJobs")] + public async Task JobRetry() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobRetry(cts.Token); + } +} diff --git a/test/Extensions/Orleans.Redis.Tests/DurableJobs/RedisJobShardManagerTestFixture.cs b/test/Extensions/Orleans.Redis.Tests/DurableJobs/RedisJobShardManagerTestFixture.cs new file mode 100644 index 00000000000..40de06690dd --- /dev/null +++ b/test/Extensions/Orleans.Redis.Tests/DurableJobs/RedisJobShardManagerTestFixture.cs @@ -0,0 +1,84 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.DurableJobs; +using Orleans.DurableJobs.Redis; +using StackExchange.Redis; +using Tester.DurableJobs; +using TestExtensions; + +namespace Tester.Redis.DurableJobs; + +/// +/// Redis implementation of . +/// Provides the infrastructure needed to run shared job shard manager tests against Redis. +/// +internal sealed class RedisJobShardManagerTestFixture : IJobShardManagerTestFixture +{ + private readonly IOptions _options; + private readonly IOptions _clusterOptions; + private ConnectionMultiplexer _multiplexer; + private readonly string _shardPrefix; + + public RedisJobShardManagerTestFixture() + { + _shardPrefix = $"test-{Guid.NewGuid():N}"; + + // Create a custom CreateMultiplexer that caches the multiplexer for cleanup + _options = Options.Create(new RedisJobShardOptions + { + ConfigurationOptions = ConfigurationOptions.Parse(TestDefaultConfiguration.RedisConnectionString), + CreateMultiplexer = CreateMultiplexerAsync, + ShardPrefix = _shardPrefix, + MaxShardCreationRetries = 5, + MaxBatchSize = 128, + MinBatchSize = 1, + BatchFlushInterval = TimeSpan.FromMilliseconds(100) + }); + + _clusterOptions = Options.Create(new ClusterOptions + { + ServiceId = "test-service", + ClusterId = "test-cluster" + }); + } + + private async Task CreateMultiplexerAsync(RedisJobShardOptions options) + { + _multiplexer ??= await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions!); + return _multiplexer; + } + + public JobShardManager CreateManager(ILocalSiloDetails localSiloDetails, IClusterMembershipService membershipService) + { + return new RedisJobShardManager( + localSiloDetails, + _options, + _clusterOptions, + membershipService, + NullLoggerFactory.Instance); + } + + public async ValueTask DisposeAsync() + { + if (_multiplexer is not null) + { + // Clean up test data from Redis + var db = _multiplexer.GetDatabase(); + var server = _multiplexer.GetServer(_multiplexer.GetEndPoints()[0]); + + // Delete all keys with our test prefix (using the default key prefix pattern) + var keyPrefix = _options.Value.KeyPrefix ?? $"{_clusterOptions.Value.ServiceId}/durablejobs"; + await foreach (var key in server.KeysAsync(pattern: $"{keyPrefix}:shard:{_shardPrefix}*")) + { + await db.KeyDeleteAsync(key); + } + + // Delete the shard set key + await db.KeyDeleteAsync($"{keyPrefix}:shards:{_shardPrefix}"); + + await _multiplexer.CloseAsync(); + _multiplexer.Dispose(); + } + } +} diff --git a/test/Extensions/Orleans.Redis.Tests/DurableJobs/RedisJobShardManagerTests.cs b/test/Extensions/Orleans.Redis.Tests/DurableJobs/RedisJobShardManagerTests.cs new file mode 100644 index 00000000000..d5f22539652 --- /dev/null +++ b/test/Extensions/Orleans.Redis.Tests/DurableJobs/RedisJobShardManagerTests.cs @@ -0,0 +1,147 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Tester.DurableJobs; +using Xunit; + +namespace Tester.Redis.DurableJobs; + +/// +/// Redis-specific tests for job shard manager functionality. +/// Common tests are delegated to for reusability across providers. +/// +[TestCategory("Redis"), TestCategory("DurableJobs"), TestCategory("Functional")] +public class RedisJobShardManagerTests : IAsyncLifetime, IAsyncDisposable +{ + private readonly RedisJobShardManagerTestFixture _fixture; + private readonly JobShardManagerTestsRunner _runner; + + public RedisJobShardManagerTests() + { + TestUtils.CheckForRedis(); + + // Create fixture and runner for common tests + _fixture = new RedisJobShardManagerTestFixture(); + _runner = new JobShardManagerTestsRunner(_fixture); + } + + public Task InitializeAsync() => Task.CompletedTask; + + public async Task DisposeAsync() => await _fixture.DisposeAsync(); + + async ValueTask IAsyncDisposable.DisposeAsync() => await DisposeAsync(); + + #region Common Tests (Delegated to Runner) + + /// + /// Tests basic shard creation and assignment workflow. + /// + [SkippableFact] + public async Task RedisJobShardManager_Creation_Assignment() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ShardCreationAndAssignment(cts.Token); + } + + /// + /// Tests reading and consuming jobs from a frozen shard after ownership transfer. + /// + [SkippableFact] + public async Task RedisJobShardManager_ReadFrozenShard() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ReadFrozenShard(cts.Token); + } + + /// + /// Tests consuming jobs from a live shard. + /// + [SkippableFact] + public async Task RedisJobShardManager_LiveShard() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.LiveShard(cts.Token); + } + + /// + /// Tests job metadata persistence across ownership transfers. + /// + [SkippableFact] + public async Task RedisJobShardManager_JobMetadata() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobMetadata(cts.Token); + } + + /// + /// Tests concurrent shard assignment to verify ownership conflict resolution. + /// + [SkippableFact] + public async Task RedisJobShardManager_ConcurrentShardAssignment_OwnershipConflicts() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ConcurrentShardAssignment_OwnershipConflicts(cts.Token); + } + + /// + /// Tests shard metadata preservation across ownership transfers. + /// + [SkippableFact] + public async Task RedisJobShardManager_ShardMetadataMerge() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ShardMetadataMerge(cts.Token); + } + + /// + /// Tests stopping shard processing and verifying jobs remain for reassignment. + /// + [SkippableFact] + public async Task RedisJobShardManager_StopProcessingShard() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.StopProcessingShard(cts.Token); + } + + /// + /// Tests retrying a job with a new due time. + /// + [SkippableFact] + public async Task RedisJobShardManager_RetryJobLater() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.RetryJobLater(cts.Token); + } + + /// + /// Tests job cancellation before and during processing. + /// + [SkippableFact] + public async Task RedisJobShardManager_JobCancellation() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.JobCancellation(cts.Token); + } + + /// + /// Tests that multiple shard registrations with the same time range produce unique IDs. + /// + [SkippableFact] + public async Task RedisJobShardManager_ShardRegistrationRetry_IdCollisions() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.ShardRegistrationRetry_IdCollisions(cts.Token); + } + + /// + /// Tests that unregistering a shard with remaining jobs preserves the shard for reassignment. + /// + [SkippableFact] + public async Task RedisJobShardManager_UnregisterShard_WithJobsRemaining() + { + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await _runner.UnregisterShard_WithJobsRemaining(cts.Token); + } + + #endregion +} diff --git a/test/Extensions/Orleans.Redis.Tests/Orleans.Redis.Tests.csproj b/test/Extensions/Orleans.Redis.Tests/Orleans.Redis.Tests.csproj index 1b7119506c2..75393632aea 100644 --- a/test/Extensions/Orleans.Redis.Tests/Orleans.Redis.Tests.csproj +++ b/test/Extensions/Orleans.Redis.Tests/Orleans.Redis.Tests.csproj @@ -15,8 +15,9 @@ + - +