Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/Orleans.Streaming.NATS/NatsOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Text.Json;
using System.Text.Json;
using Orleans.Runtime;
using NATS.Client.Core;

Expand Down Expand Up @@ -51,6 +51,15 @@ public class NatsOptions
/// System.Text.Json serializer options to be used by the NATS provider.
/// </summary>
public JsonSerializerOptions? JsonSerializerOptions { get; set; }

/// <summary>
/// The number of stream replicas in the NATS JetStream cluster.
/// Higher values improve availability during node restarts (R3 survives
/// single-node failures in a 3-node cluster). An odd number is recommended,
/// and the value cannot exceed the number of NATS nodes.
/// Defaults to 1. Set to 3 for production clusters with ≥ 3 nodes.
/// </summary>
/// <remarks>
/// Configuration validation only enforces a minimum of 1; parity and the
/// upper bound are not checked by the provider.
/// </remarks>
public int NumReplicas { get; set; } = 1;
Comment on lines +55 to +62
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The XML docs state NumReplicas "must be an odd number" and "cannot exceed the number of NATS nodes", but the validator only enforces >= 1. Either relax the docs to match what is actually enforced, or add validation for the odd-number requirement (and consider how to handle/enforce the upper bound, if possible).

Copilot uses AI. Check for mistakes.
}

public class NatsStreamOptionsValidator(NatsOptions options, string? name = null) : IConfigurationValidator
Expand All @@ -62,5 +71,11 @@ public void ValidateConfiguration()
throw new OrleansConfigurationException(
$"The {nameof(NatsOptions.StreamName)} is required for the NATS stream provider '{name}'.");
}

if (options.NumReplicas < 1)
{
throw new OrleansConfigurationException(
$"The {nameof(NatsOptions.NumReplicas)} must be at least 1 for the NATS stream provider '{name}'.");
}
}
}
23 changes: 23 additions & 0 deletions src/Orleans.Streaming.NATS/Providers/NatsConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public async Task Initialize(CancellationToken cancellationToken = default)
var streamConfig = new StreamConfig(this._options.StreamName, [$"{this._providerName}.>"])
{
Retention = StreamConfigRetention.Workqueue,
NumReplicas = this._options.NumReplicas,
SubjectTransform = new SubjectTransform
Comment on lines 111 to 115
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamConfig is constructed twice (CreateStreamAsync + UpdateStreamAsync) with the same fields. Consider extracting a private helper/local function to build the config once to avoid future drift (e.g., if Retention/SubjectTransform changes later but only one path is updated).

Copilot uses AI. Check for mistakes.
{
Src = $"{this._providerName}.*.*",
Expand All @@ -125,6 +126,28 @@ public async Task Initialize(CancellationToken cancellationToken = default)
{
// ignore, stream already exists
}
catch (NatsJSApiException e) when (e.Error.ErrCode == 10058)
{
// Stream exists with different config — attempt in-place update
// (safe for NumReplicas changes; NATS allows replica count upgrades)
this._logger.LogInformation(
"Stream {Stream} exists with different config — updating.",
this._options.StreamName);

var streamConfig = new StreamConfig(this._options.StreamName, [$"{this._providerName}.>"])
{
Retention = StreamConfigRetention.Workqueue,
NumReplicas = this._options.NumReplicas,
SubjectTransform = new SubjectTransform
{
Src = $"{this._providerName}.*.*",
Dest =
@$"{this._providerName}.{{{{partition({this._options.PartitionCount},1,2)}}}}.{{{{wildcard(1)}}}}.{{{{wildcard(2)}}}}"
}
};

await this._natsContext.UpdateStreamAsync(streamConfig, cancellationToken);
}

this._logger.LogTrace(
"Initialized to NATS JetStream stream {Stream} on server {NatsServer}",
Expand Down
28 changes: 24 additions & 4 deletions src/Orleans.Streaming.NATS/Providers/NatsStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,30 @@ internal sealed class NatsStreamConsumer(
{
if (this._consumer is null)
{
this._logger.LogError(
"Internal NATS Consumer is not initialized. Provider: {Provider} | Stream: {Stream} | Partition: {Partition}.",
provider, stream, partition);
return ([], 0);
// Lazy retry: attempt re-initialization on each poll cycle.
// This handles transient failures during initial Initialize()
// (leader election, timeout, network blip).
try
{
this._logger.LogWarning(
"NATS Consumer not initialized — attempting re-initialization. Provider: {Provider} | Stream: {Stream} | Partition: {Partition}.",
provider, stream, partition);

await Initialize(cancellationToken);
Comment on lines +46 to +50
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetMessages logs a warning on every poll cycle while the consumer is uninitialized. During a longer outage/misconfiguration this can still produce very high log volume (poll cadence is ~100ms). Consider adding a backoff/rate-limit (e.g., log once then periodically, or only log after N consecutive failures), while still retrying initialization each cycle.

Copilot uses AI. Check for mistakes.
}
catch (Exception ex)
{
this._logger.LogWarning(ex,
"NATS Consumer re-initialization failed. Provider: {Provider} | Stream: {Stream} | Partition: {Partition}. Will retry on next poll.",
provider, stream, partition);
return ([], 0);
}

// If still null after retry, bail (next poll will retry again)
if (this._consumer is null)
{
return ([], 0);
}
}

var batchCount = messageCount > 0 && messageCount < batchSize ? messageCount : batchSize;
Expand Down
129 changes: 129 additions & 0 deletions test/Extensions/Orleans.Streaming.NATS.Tests/NatsOptionsTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using Orleans.Runtime;
using Orleans.Streaming.NATS;
using TestExtensions;
using Xunit;

namespace NATS.Tests;

[TestCategory("NATS")]
public sealed class NatsOptionsTests
{
[Fact]
public void DefaultNumReplicas_ShouldBeOne()
{
    // A freshly constructed options object must report a single replica.
    var defaultReplicas = new NatsOptions().NumReplicas;

    Assert.Equal(1, defaultReplicas);
}

[Theory]
[InlineData(0)]
[InlineData(-1)]
[InlineData(-100)]
public void Validator_InvalidNumReplicas_ShouldThrow(int numReplicas)
{
    // Zero and negative replica counts must be rejected during validation.
    var sut = new NatsStreamOptionsValidator(
        new NatsOptions { StreamName = "test-stream", NumReplicas = numReplicas },
        "test-provider");

    Assert.Throws<OrleansConfigurationException>(() => sut.ValidateConfiguration());
}

[Theory]
[InlineData(1)]
[InlineData(3)]
[InlineData(5)]
public void Validator_ValidNumReplicas_ShouldNotThrow(int numReplicas)
{
    // Positive replica counts pass validation; the test succeeds simply by
    // completing without an exception.
    var sut = new NatsStreamOptionsValidator(
        new NatsOptions { StreamName = "test-stream", NumReplicas = numReplicas },
        "test-provider");

    sut.ValidateConfiguration();
}

[Fact]
public void Validator_MissingStreamName_ShouldThrow()
{
    // A null stream name must fail validation even when NumReplicas is valid.
    var sut = new NatsStreamOptionsValidator(
        new NatsOptions { StreamName = null!, NumReplicas = 1 },
        "test-provider");

    Assert.Throws<OrleansConfigurationException>(() => sut.ValidateConfiguration());
}

[Fact]
public void Validator_EmptyStreamName_ShouldThrow()
{
    // A whitespace-only stream name must fail validation even when NumReplicas is valid.
    var sut = new NatsStreamOptionsValidator(
        new NatsOptions { StreamName = " ", NumReplicas = 1 },
        "test-provider");

    Assert.Throws<OrleansConfigurationException>(() => sut.ValidateConfiguration());
}

[SkippableFact]
public async Task NumReplicas_IsAppliedToJetStreamConfig()
{
if (!NatsTestConstants.IsNatsAvailable)
{
throw new SkipException("Nats Server is not available");
}

var streamName = $"test-replicas-{Guid.NewGuid()}";
await using var natsConnection = new NatsConnection();
var natsContext = new NatsJSContext(natsConnection);

await natsConnection.ConnectAsync();

try
{
var streamConfig = new StreamConfig(streamName, [$"test-replicas-provider.>"])
{
Retention = StreamConfigRetention.Workqueue,
NumReplicas = 1
};
Comment on lines +100 to +104
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NumReplicas_IsAppliedToJetStreamConfig doesn't exercise the provider path that was changed (NatsOptions -> NatsConnectionManager -> StreamConfig). The test currently hard-codes NumReplicas = 1 directly on StreamConfig, so it will pass even if _options.NumReplicas is never applied by the provider. Consider updating this to initialize a NatsConnectionManager with options.NumReplicas and asserting the resulting stream info, or rename the test so it reflects what is actually being validated.

Copilot uses AI. Check for mistakes.

var stream = await natsContext.CreateStreamAsync(streamConfig);
var info = stream.Info;

Assert.Equal(1, info.Config.NumReplicas);
}
finally
{
try
{
var stream = await natsContext.GetStreamAsync(streamName);
await stream.DeleteAsync();
}
catch (NatsJSApiException)
{
// Ignore cleanup errors
}
}
}

// NOTE: Testing NumReplicas > 1 (e.g. R3) requires a multi-node NATS JetStream
// cluster. A single NATS node only supports NumReplicas = 1. R3 integration
// testing should be done in a CI environment with a 3-node cluster configured
// via docker-compose or similar infrastructure.
}
Loading