diff --git a/src/Orleans.Streaming.NATS/NatsOptions.cs b/src/Orleans.Streaming.NATS/NatsOptions.cs
index ae84d882d3d..0086e908cc0 100644
--- a/src/Orleans.Streaming.NATS/NatsOptions.cs
+++ b/src/Orleans.Streaming.NATS/NatsOptions.cs
@@ -1,4 +1,4 @@
-using System.Text.Json;
+using System.Text.Json;
 using Orleans.Runtime;
 using NATS.Client.Core;
 
@@ -51,6 +51,15 @@ public class NatsOptions
     /// System.Text.Json serializer options to be used by the NATS provider.
     /// </summary>
     public JsonSerializerOptions? JsonSerializerOptions { get; set; }
+
+    /// <summary>
+    /// The number of stream replicas in the NATS JetStream cluster.
+    /// Higher values improve availability during node restarts (R3 survives
+    /// single-node failures in a 3-node cluster). Must be between 1 and 5 and
+    /// cannot exceed the number of NATS nodes; odd values are recommended.
+    /// Defaults to 1. Set to 3 for production clusters with ≥ 3 nodes.
+    /// </summary>
+    public int NumReplicas { get; set; } = 1;
 }
 
 public class NatsStreamOptionsValidator(NatsOptions options, string? name = null) : IConfigurationValidator
@@ -62,5 +71,11 @@ public void ValidateConfiguration()
             throw new OrleansConfigurationException(
                 $"The {nameof(NatsOptions.StreamName)} is required for the NATS stream provider '{name}'.");
         }
+
+        if (options.NumReplicas is < 1 or > 5)
+        {
+            throw new OrleansConfigurationException(
+                $"The {nameof(NatsOptions.NumReplicas)} must be between 1 and 5 for the NATS stream provider '{name}'.");
+        }
     }
 }
diff --git a/src/Orleans.Streaming.NATS/Providers/NatsConnectionManager.cs b/src/Orleans.Streaming.NATS/Providers/NatsConnectionManager.cs
index 76a19ed7cb0..a936656e58e 100644
--- a/src/Orleans.Streaming.NATS/Providers/NatsConnectionManager.cs
+++ b/src/Orleans.Streaming.NATS/Providers/NatsConnectionManager.cs
@@ -111,6 +111,7 @@ public async Task Initialize(CancellationToken cancellationToken = default)
             var streamConfig = new StreamConfig(this._options.StreamName, [$"{this._providerName}.>"])
             {
                 Retention = StreamConfigRetention.Workqueue,
+                NumReplicas = this._options.NumReplicas,
                 SubjectTransform = new SubjectTransform
                 {
                     Src = $"{this._providerName}.*.*",
@@ -125,6 +126,28 @@ public async Task Initialize(CancellationToken cancellationToken = default)
         {
             // ignore, stream already exists
         }
+        catch (NatsJSApiException e) when (e.Error.ErrCode == 10058)
+        {
+            // Stream exists with different config — attempt in-place update
+            // (safe for NumReplicas changes; NATS allows replica count upgrades)
+            this._logger.LogInformation(
+                "Stream {Stream} exists with different config — updating.",
+                this._options.StreamName);
+
+            var streamConfig = new StreamConfig(this._options.StreamName, [$"{this._providerName}.>"])
+            {
+                Retention = StreamConfigRetention.Workqueue,
+                NumReplicas = this._options.NumReplicas,
+                SubjectTransform = new SubjectTransform
+                {
+                    Src = $"{this._providerName}.*.*",
+                    Dest =
+                        @$"{this._providerName}.{{{{partition({this._options.PartitionCount},1,2)}}}}.{{{{wildcard(1)}}}}.{{{{wildcard(2)}}}}"
+                }
+            };
+
+            await this._natsContext.UpdateStreamAsync(streamConfig, cancellationToken);
+        }
 
         this._logger.LogTrace(
             "Initialized to NATS JetStream stream {Stream} on server {NatsServer}",
diff --git a/src/Orleans.Streaming.NATS/Providers/NatsStreamConsumer.cs b/src/Orleans.Streaming.NATS/Providers/NatsStreamConsumer.cs
index c998cdaa339..818d49ba8f9 100644
--- a/src/Orleans.Streaming.NATS/Providers/NatsStreamConsumer.cs
+++ b/src/Orleans.Streaming.NATS/Providers/NatsStreamConsumer.cs
@@ -38,10 +38,30 @@ internal sealed class NatsStreamConsumer(
     {
         if (this._consumer is null)
         {
-            this._logger.LogError(
-                "Internal NATS Consumer is not initialized. Provider: {Provider} | Stream: {Stream} | Partition: {Partition}.",
-                provider, stream, partition);
-            return ([], 0);
+            // Lazy retry: attempt re-initialization on each poll cycle.
+            // This handles transient failures during initial Initialize()
+            // (leader election, timeout, network blip).
+            try
+            {
+                this._logger.LogWarning(
+                    "NATS Consumer not initialized — attempting re-initialization. Provider: {Provider} | Stream: {Stream} | Partition: {Partition}.",
+                    provider, stream, partition);
+
+                await Initialize(cancellationToken);
+            }
+            catch (Exception ex)
+            {
+                this._logger.LogWarning(ex,
+                    "NATS Consumer re-initialization failed. Provider: {Provider} | Stream: {Stream} | Partition: {Partition}. Will retry on next poll.",
+                    provider, stream, partition);
+                return ([], 0);
+            }
+
+            // If still null after retry, bail (next poll will retry again)
+            if (this._consumer is null)
+            {
+                return ([], 0);
+            }
         }
 
         var batchCount = messageCount > 0 && messageCount < batchSize ? messageCount : batchSize;
diff --git a/test/Extensions/Orleans.Streaming.NATS.Tests/NatsOptionsTests.cs b/test/Extensions/Orleans.Streaming.NATS.Tests/NatsOptionsTests.cs
new file mode 100644
index 00000000000..1e930f57055
--- /dev/null
+++ b/test/Extensions/Orleans.Streaming.NATS.Tests/NatsOptionsTests.cs
@@ -0,0 +1,129 @@
+using NATS.Client.Core;
+using NATS.Client.JetStream;
+using NATS.Client.JetStream.Models;
+using Orleans.Runtime;
+using Orleans.Streaming.NATS;
+using TestExtensions;
+using Xunit;
+
+namespace NATS.Tests;
+
+[TestCategory("NATS")]
+public sealed class NatsOptionsTests
+{
+    [Fact]
+    public void DefaultNumReplicas_ShouldBeOne()
+    {
+        var options = new NatsOptions();
+
+        Assert.Equal(1, options.NumReplicas);
+    }
+
+    [Theory]
+    [InlineData(0)]
+    [InlineData(-1)]
+    [InlineData(6)]
+    public void Validator_InvalidNumReplicas_ShouldThrow(int numReplicas)
+    {
+        var options = new NatsOptions
+        {
+            StreamName = "test-stream",
+            NumReplicas = numReplicas
+        };
+
+        var validator = new NatsStreamOptionsValidator(options, "test-provider");
+
+        Assert.Throws<OrleansConfigurationException>(validator.ValidateConfiguration);
+    }
+
+    [Theory]
+    [InlineData(1)]
+    [InlineData(3)]
+    [InlineData(5)]
+    public void Validator_ValidNumReplicas_ShouldNotThrow(int numReplicas)
+    {
+        var options = new NatsOptions
+        {
+            StreamName = "test-stream",
+            NumReplicas = numReplicas
+        };
+
+        var validator = new NatsStreamOptionsValidator(options, "test-provider");
+
+        validator.ValidateConfiguration();
+    }
+
+    [Fact]
+    public void Validator_MissingStreamName_ShouldThrow()
+    {
+        var options = new NatsOptions
+        {
+            StreamName = null!,
+            NumReplicas = 1
+        };
+
+        var validator = new NatsStreamOptionsValidator(options, "test-provider");
+
+        Assert.Throws<OrleansConfigurationException>(validator.ValidateConfiguration);
+    }
+
+    [Fact]
+    public void Validator_EmptyStreamName_ShouldThrow()
+    {
+        var options = new NatsOptions
+        {
+            StreamName = " ",
+            NumReplicas = 1
+        };
+
+        var validator = new NatsStreamOptionsValidator(options, "test-provider");
+
+        Assert.Throws<OrleansConfigurationException>(validator.ValidateConfiguration);
+    }
+
+    [SkippableFact]
+    public async Task NumReplicas_IsAppliedToJetStreamConfig()
+    {
+        if (!NatsTestConstants.IsNatsAvailable)
+        {
+            throw new SkipException("Nats Server is not available");
+        }
+
+        var streamName = $"test-replicas-{Guid.NewGuid()}";
+        await using var natsConnection = new NatsConnection();
+        var natsContext = new NatsJSContext(natsConnection);
+
+        await natsConnection.ConnectAsync();
+
+        try
+        {
+            var streamConfig = new StreamConfig(streamName, [$"test-replicas-provider.>"])
+            {
+                Retention = StreamConfigRetention.Workqueue,
+                NumReplicas = 1
+            };
+
+            var stream = await natsContext.CreateStreamAsync(streamConfig);
+            var info = stream.Info;
+
+            Assert.Equal(1, info.Config.NumReplicas);
+        }
+        finally
+        {
+            try
+            {
+                var stream = await natsContext.GetStreamAsync(streamName);
+                await stream.DeleteAsync();
+            }
+            catch (NatsJSApiException)
+            {
+                // Ignore cleanup errors
+            }
+        }
+    }
+
+    // NOTE: Testing NumReplicas > 1 (e.g. R3) requires a multi-node NATS JetStream
+    // cluster. A single NATS node only supports NumReplicas = 1. R3 integration
+    // testing should be done in a CI environment with a 3-node cluster configured
+    // via docker-compose or similar infrastructure.
+}