Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions src/DotNetCore.CAP/Internal/ISubscribeExector.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public async Task<OperateResult> ExecuteAsync(MediumMessage message, ConsumerExe
TracingError(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), message.Origin, null, new Exception(error));

var ex = new SubscriberNotFoundException(error);
await SetFailedState(message, ex);
await SetFailedState(message, ex, descriptor.Attribute.FailedMessageExpiredAfter);
return OperateResult.Failed(ex);
}
}
Expand Down Expand Up @@ -100,7 +100,7 @@ public async Task<OperateResult> ExecuteAsync(MediumMessage message, ConsumerExe

sp.Stop();

await SetSuccessfulState(message).ConfigureAwait(false);
await SetSuccessfulState(message, descriptor.Attribute.SucceedMessageExpiredAfter).ConfigureAwait(false);

CapEventCounterSource.Log.WriteInvokeTimeMetrics(sp.Elapsed.TotalMilliseconds);
_logger.ConsumerExecuted(descriptor.ImplTypeInfo.Name, descriptor.MethodInfo.Name,
Expand All @@ -113,26 +113,28 @@ public async Task<OperateResult> ExecuteAsync(MediumMessage message, ConsumerExe
_logger.ConsumerExecuteFailed(message.Origin.GetName(), message.DbId,
message.Origin.GetExecutionInstanceId(), ex);

return (await SetFailedState(message, ex).ConfigureAwait(false), OperateResult.Failed(ex));
return (await SetFailedState(message, ex, descriptor.Attribute.FailedMessageExpiredAfter).ConfigureAwait(false), OperateResult.Failed(ex));
}
}

private Task SetSuccessfulState(MediumMessage message)
private Task SetSuccessfulState(MediumMessage message, int expiredAfter)
{
message.ExpiresAt = DateTime.Now.AddSeconds(_options.SucceedMessageExpiredAfter);
expiredAfter = expiredAfter > 0 ? expiredAfter : _options.SucceedMessageExpiredAfter;
message.ExpiresAt = DateTime.Now.AddSeconds(expiredAfter);

return _dataStorage.ChangeReceiveStateAsync(message, StatusName.Succeeded);
}

private async Task<bool> SetFailedState(MediumMessage message, Exception ex)
private async Task<bool> SetFailedState(MediumMessage message, Exception ex, int expiredAfter)
{
if (ex is SubscriberNotFoundException)
message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException

var needRetry = UpdateMessageForRetry(message);

message.Origin.AddOrUpdateException(ex);
message.ExpiresAt = message.Added.AddSeconds(_options.FailedMessageExpiredAfter);
expiredAfter = expiredAfter > 0 ? expiredAfter : _options.FailedMessageExpiredAfter;
message.ExpiresAt = message.Added.AddSeconds(expiredAfter);

await _dataStorage.ChangeReceiveStateAsync(message, StatusName.Failed).ConfigureAwait(false);

Expand Down
10 changes: 10 additions & 0 deletions src/DotNetCore.CAP/Internal/TopicAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,14 @@ protected TopicAttribute(string name, bool isPartial = false)
/// If you set this value but don't specify the Group, we will automatically create a Group using the Name.
/// </summary>
public byte GroupConcurrent { get; set; }

/// <summary>
/// Sent or received succeed message after time span of due, then the message will be deleted at due time.
/// </summary>
public int SucceedMessageExpiredAfter { get; set; }

/// <summary>
/// Sent or received failed message after time span of due, then the message will be deleted at due time.
/// </summary>
public int FailedMessageExpiredAfter { get; set; }
}