ccccccmd/DelayQ

DelayQ


DelayQ is a robust, Redis-backed delay queue library for .NET, designed for high performance and reliability. It leverages Redis Sorted Sets (ZSET) for efficient scheduling and Redis Streams for reliable message delivery, consumer groups, and horizontal scalability.

System Architecture

DelayQ decouples message scheduling from message consumption, so that each stage can be made atomic and scaled independently:

  1. Producer: Enqueues messages into a Redis Sorted Set (ZSET), using the scheduled execution time (ExecuteAt) as the score. Lua scripts keep multi-step operations atomic, for example when a custom MessageId is supplied so that a message can later be cancelled or upserted.
  2. ScanWorker: A background polling process (IHostedService) that periodically checks the ZSET for messages that are due. It atomically reads and removes them from the ZSET and pushes them into a Redis Stream.
  3. ConsumerWorker: A background consumer that reads from the Redis Stream using a Consumer Group. It ensures reliable at-least-once delivery, acknowledges (XACK) successful messages, and handles retries or Dead-Letter Queue (DLQ) routing for failed messages.
graph TD
    classDef redis fill:#b32d2d,stroke:#333,stroke-width:2px,color:#fff;
    classDef worker fill:#2d6b8c,stroke:#333,stroke-width:2px,color:#fff;
    
    P([Producer]) -. "Enqueue (Time)" .-> Z[("Redis ZSET <br/> rdq:queue:zset")]:::redis
    Z -. "Lua Fetch Due" .-> SW[ScanWorker]:::worker
    
    subgraph IHostedService
        SW
        CW[ConsumerWorker]:::worker
    end
    
    SW -. "XADD" .-> S[("Redis Stream <br/> rdq:queue:stream")]:::redis
    S -. "XREADGROUP" .-> CW
    
    CW -- "Invoke" --> H{{IDelayMessageHandler}}
    
    H -- "Success" --> XACK([Redis XACK])
    H -- "Failure or Retry" --> P
    H -- "Max Retries" --> DLQ[("Redis DLQ")]:::redis
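
The ScanWorker step can be pictured as one Lua script that moves due entries from the ZSET to the stream. The sketch below is an assumption about how such a script might look, not DelayQ's actual implementation: the `DueMessageMover` class, the `data` field name, the millisecond score unit, and the batch size parameter are all hypothetical. The only library call it relies on is `IDatabase.ScriptEvaluateAsync` from StackExchange.Redis.

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

public static class DueMessageMover
{
    // Hypothetical script. Redis runs a Lua script as a single unit, so no other
    // client command interleaves between ZRANGEBYSCORE, XADD and ZREM.
    // KEYS[1] = ZSET, KEYS[2] = stream, ARGV[1] = now (ms), ARGV[2] = batch size.
    public const string MoveDueScript = @"
local due = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2])
for _, member in ipairs(due) do
    redis.call('XADD', KEYS[2], '*', 'data', member)
    redis.call('ZREM', KEYS[1], member)
end
return #due";

    // Moves up to batchSize due entries and returns how many were moved.
    // Assumes scores are Unix timestamps in milliseconds.
    public static async Task<long> MoveDueAsync(
        IDatabase db, string zsetKey, string streamKey, int batchSize)
    {
        long nowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        RedisResult result = await db.ScriptEvaluateAsync(
            MoveDueScript,
            new RedisKey[] { zsetKey, streamKey },
            new RedisValue[] { nowMs, batchSize });
        return (long)result;
    }
}
```

With this shape, no other client can observe a message that sits in both structures or in neither. On Redis Cluster, both keys would have to map to the same hash slot, for example by sharing a hash tag.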

Flowchart

sequenceDiagram
    participant App as Producer App
    participant Q as IDelayQueue
    participant R_Zset as Redis ZSET
    participant Scan as ScanWorker
    participant R_Stream as Redis Stream
    participant Consumer as ConsumerWorker
    participant Handler as IDelayMessageHandler

    App->>Q: EnqueueAsync(msg, delay)
    Q->>R_Zset: ZADD (Score = Now + Delay)
    
    loop Every PollingInterval
        Scan->>R_Zset: Fetch due messages (score <= Now)
        alt Has due messages
            Scan->>R_Stream: XADD (Push to Stream)
            Scan->>R_Zset: ZREM (Remove from ZSET)
        end
    end
    
    loop Stream Reading (Consumer Group)
        Consumer->>R_Stream: XREADGROUP
        R_Stream-->>Consumer: Unacknowledged messages
        Consumer->>Handler: HandleAsync(msg)
        
        alt Success
            Handler-->>Consumer: Task Completed successfully
            Consumer->>R_Stream: XACK
        else Failure
            Handler--xConsumer: Exception Thrown
            alt Retry count < MaxRetries
                Consumer->>Q: EnqueueAsync(msg, NextRetryDelay)
            else MaxRetries Exceeded
                Consumer->>Redis DLQ: Move to DLQ
            end
            Consumer->>R_Stream: XACK
        end
    end
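
The `NextRetryDelay` in the diagram is derived from the retry count and the configured base delay. The source does not state the formula, so the sketch below assumes plain exponential backoff (base delay doubled per retry) with an upper bound. `RetryBackoff`, its method signature, and the `maxDelay` parameter are hypothetical names used only for illustration.

```csharp
using System;

public static class RetryBackoff
{
    // Assumed policy: delay = baseDelay * 2^retryCount, clamped to maxDelay.
    public static TimeSpan NextRetryDelay(TimeSpan baseDelay, int retryCount, TimeSpan maxDelay)
    {
        if (retryCount < 0)
            throw new ArgumentOutOfRangeException(nameof(retryCount));

        // Cap the exponent so the tick count stays well inside the range of long.
        int exponent = Math.Min(retryCount, 20);
        double ticks = baseDelay.Ticks * Math.Pow(2, exponent);

        return ticks >= maxDelay.Ticks ? maxDelay : TimeSpan.FromTicks((long)ticks);
    }
}
```

Under this assumed policy, a 5-second base delay gives 5 s, 10 s, and 20 s for retry counts 0, 1, and 2.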

Usage

1. Service Registration

In your Program.cs or Startup.cs, register the Redis connection and the Delay Queue services. Provide options for polling intervals, concurrency, and retries:

using DelayQ;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using StackExchange.Redis;

var builder = Host.CreateApplicationBuilder(args);

// 1. Register a singleton Redis ConnectionMultiplexer
var redis = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
builder.Services.AddSingleton<IConnectionMultiplexer>(redis);

// 2. Register Delay Queue for a specific message type and its handler
builder.Services.AddDelayQueue<OrderMessage, OrderTimeoutHandler>(options =>
{
    options.QueueName = "order-timeout";               // Unique queue identifier in Redis
    options.ConsumerGroup = "order-service";           // Redis Stream consumer group name
    options.PollingInterval = TimeSpan.FromSeconds(1); // How often to check for due messages
    options.MaxConcurrency = 4;                        // Max parallel message processing
    options.MaxRetries = 3;                            // Max retries on failure
    options.RetryBaseDelay = TimeSpan.FromSeconds(5);  // Base backoff for retries
});

var host = builder.Build();
await host.RunAsync();

2. Define Message and Handler

Create a class or record for your message payload, and implement the IDelayMessageHandler<T> interface. The handler is resolved automatically from a scoped DI container, so dependencies such as a DbContext can be injected through its constructor.

using Microsoft.Extensions.Logging;

// Message Payload
public record OrderMessage(string OrderId, decimal Amount, string Currency);

// Message Handler
public class OrderTimeoutHandler : IDelayMessageHandler<OrderMessage>
{
    private readonly ILogger<OrderTimeoutHandler> _logger;

    public OrderTimeoutHandler(ILogger<OrderTimeoutHandler> logger)
    {
        _logger = logger;
    }

    public async Task HandleAsync(DelayMessage<OrderMessage> message, CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Checking timeout for order: {OrderId}. Attempt: {RetryCount}", 
            message.Payload.OrderId, 
            message.RetryCount);
            
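        // Add your business logic here,
        // e.g., cancel the order in the database if it has not been paid.
        await Task.CompletedTask; // Placeholder: avoids compiler warning CS1998 until real async work is added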
    }
}
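
Because delivery is at-least-once, a handler may see the same message more than once and should be idempotent. One common approach, shown here as a sketch rather than a DelayQ feature, is to claim a Redis key with `SET ... NX` before doing the work. The `IdempotencyGuard` class and the `idem:` key prefix are hypothetical; `StringSetAsync` with `When.NotExists` and `KeyDeleteAsync` are StackExchange.Redis calls.

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

public static class IdempotencyGuard
{
    // Hypothetical key layout; pick a prefix that cannot collide with the queue's own keys.
    public static string KeyFor(string queueName, string operationId) =>
        $"idem:{queueName}:{operationId}";

    // SET key value NX with an expiry: true only for the first caller within the TTL.
    public static Task<bool> TryClaimAsync(IDatabase db, string key, TimeSpan ttl) =>
        db.StringSetAsync(key, "1", ttl, When.NotExists);

    // Release the claim when processing fails, so that a later retry can run.
    public static Task<bool> ReleaseAsync(IDatabase db, string key) =>
        db.KeyDeleteAsync(key);
}
```

Inside `HandleAsync`, the key could be built from `message.Payload.OrderId`; if `TryClaimAsync` returns false, the handler returns without repeating the work, and if the work throws, the handler calls `ReleaseAsync` before rethrowing.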

3. Publish / Cancel Delayed Messages

Inject IDelayQueue<T> anywhere in your application where you need to schedule a message. You can optionally provide a custom messageId if you need the ability to cancel it later before it executes.

public class CheckoutService
{
    private readonly IDelayQueue<OrderMessage> _delayQueue;

    public CheckoutService(IDelayQueue<OrderMessage> delayQueue)
    {
        _delayQueue = delayQueue;
    }

    public async Task ProcessCheckoutAsync(string orderId)
    {
        // ... initial payment logic ...

        // Schedule a message to be processed in 30 minutes
        string messageId = await _delayQueue.EnqueueAsync(
            new OrderMessage(orderId, 100m, "USD"),
            delay: TimeSpan.FromMinutes(30),
            messageId: $"timeout-{orderId}" // Optional: custom ID allows cancellation
        );
    }

    public async Task<bool> CancelOrderTimeoutAsync(string orderId)
    {
        // Cancel a previously scheduled message using its ID.
        // Returns true if it was cancelled, false if it had already executed or wasn't found.
        return await _delayQueue.CancelAsync($"timeout-{orderId}");
    }
}
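
To illustrate why a custom message ID makes upserting and cancellation possible, the sketch below assumes one possible storage layout: the ZSET member is the message ID and the payload lives in a separate hash. This layout, the `ScheduleScripts` class, and both scripts are assumptions for illustration only; the library's real scripts may differ.

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

public static class ScheduleScripts
{
    // KEYS[1] = ZSET of message IDs scored by due time, KEYS[2] = hash of ID -> payload.
    // ZADD on an existing member only updates its score, which gives upsert semantics.
    public const string Upsert = @"
redis.call('HSET', KEYS[2], ARGV[1], ARGV[2])
redis.call('ZADD', KEYS[1], ARGV[3], ARGV[1])
return 1";

    // ZREM returns 1 when the ID was still scheduled, 0 when it was already gone.
    public const string Cancel = @"
local removed = redis.call('ZREM', KEYS[1], ARGV[1])
redis.call('HDEL', KEYS[2], ARGV[1])
return removed";

    public static async Task UpsertAsync(IDatabase db, RedisKey zset, RedisKey payloads,
        string messageId, string payloadJson, DateTimeOffset executeAt)
    {
        await db.ScriptEvaluateAsync(Upsert,
            new[] { zset, payloads },
            new RedisValue[] { messageId, payloadJson, executeAt.ToUnixTimeMilliseconds() });
    }

    public static async Task<bool> CancelAsync(IDatabase db, RedisKey zset, RedisKey payloads,
        string messageId)
    {
        RedisResult removed = await db.ScriptEvaluateAsync(Cancel,
            new[] { zset, payloads },
            new RedisValue[] { messageId });
        return (long)removed == 1;
    }
}
```

Under this layout, enqueueing twice with the same ID reschedules the message instead of duplicating it, and a cancel that arrives after the message has left the ZSET returns false.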

4. Batch Enqueueing

If you need to produce multiple delayed messages at once, use the batching API to minimize network overhead with Redis:

await _delayQueue.EnqueueBatchAsync([
    (new OrderMessage("O-001", 10m, "USD"), TimeSpan.FromSeconds(10)),
    (new OrderMessage("O-002", 20m, "USD"), TimeSpan.FromSeconds(20)),
    (new OrderMessage("O-003", 30m, "USD"), TimeSpan.FromSeconds(30))
]);
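
For very large inputs it can help to split the work into bounded batches so that no single Redis round trip grows without limit. The helper below is a sketch, not part of DelayQ: `BatchChunker` is a hypothetical name, and the `enqueueBatch` delegate stands in for a call such as `EnqueueBatchAsync`, whose exact parameter type is not shown in this document.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchChunker
{
    // Splits items into chunks of at most chunkSize and forwards each chunk
    // to enqueueBatch. Returns the number of batch calls that were made.
    public static async Task<int> EnqueueInChunksAsync<T>(
        IEnumerable<(T Message, TimeSpan Delay)> items,
        int chunkSize,
        Func<(T Message, TimeSpan Delay)[], Task> enqueueBatch)
    {
        int calls = 0;
        foreach (var chunk in items.Chunk(chunkSize)) // Enumerable.Chunk requires .NET 6 or later
        {
            await enqueueBatch(chunk);
            calls++;
        }
        return calls;
    }
}
```

A call might look like `await BatchChunker.EnqueueInChunksAsync(items, 500, chunk => _delayQueue.EnqueueBatchAsync(chunk));`, assuming the batching API accepts an array of `(message, delay)` tuples.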
