diff --git a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs index 1049989e..7804a5ca 100644 --- a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs +++ b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs @@ -30,40 +30,47 @@ public NATSTransport(ILogger logger, IConnectionPool connectionPo public async Task SendAsync(TransportMessage message) { - var connection = _connectionPool.RentConnection(); - try { - var msg = new Msg(message.GetName(), message.Body.ToArray()); - foreach (var header in message.Headers) + var connection = _connectionPool.RentConnection(); + try { - msg.Header[header.Key] = header.Value; - } + var msg = new Msg(message.GetName(), message.Body.ToArray()); + foreach (var header in message.Headers) + { + msg.Header[header.Key] = header.Value; + } - var js = connection.CreateJetStreamContext(_jetStreamOptions); + var js = connection.CreateJetStreamContext(_jetStreamOptions); - var builder = PublishOptions.Builder().WithMessageId(message.GetId()); + var builder = PublishOptions.Builder().WithMessageId(message.GetId()); - var resp = await js.PublishAsync(msg, builder.Build()); + var resp = await js.PublishAsync(msg, builder.Build()); - if (resp.Seq > 0) - { - _logger.LogDebug($"NATS stream message [{message.GetName()}] has been published."); + if (resp.Seq > 0) + { + _logger.LogDebug($"NATS stream message [{message.GetName()}] has been published."); - return OperateResult.Success; + return OperateResult.Success; + } + + throw new PublisherSentFailedException("NATS message send failed, no consumer reply!"); } + catch (Exception ex) + { + var warpEx = new PublisherSentFailedException(ex.Message, ex); - throw new PublisherSentFailedException("NATS message send failed, no consumer reply!"); + return OperateResult.Failed(warpEx); + } + finally + { + _connectionPool.Return(connection); + } } - catch (Exception ex) + catch (Exception e) { - var warpEx = new PublisherSentFailedException(ex.Message, ex); - + var warpEx = new PublisherSentFailedException(e.Message, e); return OperateResult.Failed(warpEx); } - finally - { - _connectionPool.Return(connection); - } } -} \ No newline at end of file +}