diff --git a/src/DotNetCore.CAP/Internal/ISubscribeExector.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeExector.Default.cs index cf1f6156..4a19c6a3 100644 --- a/src/DotNetCore.CAP/Internal/ISubscribeExector.Default.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeExector.Default.cs @@ -60,7 +60,7 @@ public async Task ExecuteAsync(MediumMessage message, ConsumerExe TracingError(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), message.Origin, null, new Exception(error)); var ex = new SubscriberNotFoundException(error); - await SetFailedState(message, ex); + await SetFailedState(message, ex, descriptor.Attribute.FailedMessageExpiredAfter); return OperateResult.Failed(ex); } } @@ -100,7 +100,7 @@ public async Task ExecuteAsync(MediumMessage message, ConsumerExe sp.Stop(); - await SetSuccessfulState(message).ConfigureAwait(false); + await SetSuccessfulState(message, descriptor.Attribute.SucceedMessageExpiredAfter).ConfigureAwait(false); CapEventCounterSource.Log.WriteInvokeTimeMetrics(sp.Elapsed.TotalMilliseconds); _logger.ConsumerExecuted(descriptor.ImplTypeInfo.Name, descriptor.MethodInfo.Name, @@ -113,18 +113,19 @@ public async Task ExecuteAsync(MediumMessage message, ConsumerExe _logger.ConsumerExecuteFailed(message.Origin.GetName(), message.DbId, message.Origin.GetExecutionInstanceId(), ex); - return (await SetFailedState(message, ex).ConfigureAwait(false), OperateResult.Failed(ex)); + return (await SetFailedState(message, ex, descriptor.Attribute.FailedMessageExpiredAfter).ConfigureAwait(false), OperateResult.Failed(ex)); } } - private Task SetSuccessfulState(MediumMessage message) + private Task SetSuccessfulState(MediumMessage message, int expiredAfter) { - message.ExpiresAt = DateTime.Now.AddSeconds(_options.SucceedMessageExpiredAfter); + expiredAfter = expiredAfter > 0 ? expiredAfter : _options.SucceedMessageExpiredAfter; + message.ExpiresAt = DateTime.Now.AddSeconds(expiredAfter); return _dataStorage.ChangeReceiveStateAsync(message, StatusName.Succeeded); } - private async Task SetFailedState(MediumMessage message, Exception ex) + private async Task SetFailedState(MediumMessage message, Exception ex, int expiredAfter) { if (ex is SubscriberNotFoundException) message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException @@ -132,7 +133,8 @@ private async Task SetFailedState(MediumMessage message, Exception ex) var needRetry = UpdateMessageForRetry(message); message.Origin.AddOrUpdateException(ex); - message.ExpiresAt = message.Added.AddSeconds(_options.FailedMessageExpiredAfter); + expiredAfter = expiredAfter > 0 ? expiredAfter : _options.FailedMessageExpiredAfter; + message.ExpiresAt = message.Added.AddSeconds(expiredAfter); await _dataStorage.ChangeReceiveStateAsync(message, StatusName.Failed).ConfigureAwait(false); diff --git a/src/DotNetCore.CAP/Internal/TopicAttribute.cs b/src/DotNetCore.CAP/Internal/TopicAttribute.cs index 0b3e03be..b34961f3 100644 --- a/src/DotNetCore.CAP/Internal/TopicAttribute.cs +++ b/src/DotNetCore.CAP/Internal/TopicAttribute.cs @@ -42,4 +42,14 @@ protected TopicAttribute(string name, bool isPartial = false) /// If you set this value but don't specify the Group, we will automatically create a Group using the Name. /// public byte GroupConcurrent { get; set; } + + /// + /// Sent or received succeed message after time span of due, then the message will be deleted at due time. + /// + public int SucceedMessageExpiredAfter { get; set; } + + /// + /// Sent or received failed message after time span of due, then the message will be deleted at due time. + /// + public int FailedMessageExpiredAfter { get; set; } } \ No newline at end of file