Skip to content

Commit 7b14c83

Browse files
committed
Add Error channel
1 parent d041d42 commit 7b14c83

File tree

2 files changed

+25
-5
lines changed

2 files changed

+25
-5
lines changed

src/Infrastructure/BotSharp.Core/Infrastructures/Events/RedisPublisher.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ private bool CheckMessageExists(IDatabase db, string channel, string fieldName,
7979
return exists;
8080
}
8181

82-
private NameValueEntry[] AssembleMessage(RedisValue message)
82+
public static NameValueEntry[] AssembleMessage(RedisValue message)
8383
{
8484
return
8585
[
@@ -88,6 +88,16 @@ private NameValueEntry[] AssembleMessage(RedisValue message)
8888
];
8989
}
9090

91+
public static NameValueEntry[] AssembleErrorMessage(RedisValue message, string error)
92+
{
93+
return
94+
[
95+
new NameValueEntry("message", message),
96+
new NameValueEntry("timestamp", DateTime.UtcNow.ToString("o")),
97+
new NameValueEntry("error", error)
98+
];
99+
}
100+
91101
public async Task ReDispatchAsync(string channel, int count = 10, string order = "asc")
92102
{
93103
var db = _redis.GetDatabase();

src/Infrastructure/BotSharp.Core/Infrastructures/Events/RedisSubscriber.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public async Task SubscribeAsync(string channel, string group, int? port, bool p
4242
await CreateConsumerGroup(db, channel, group);
4343
}
4444

45+
await CreateConsumerGroup(db, $"{channel}-Error", group);
46+
4547
var consumer = Environment.MachineName;
4648
if (port.HasValue)
4749
{
@@ -98,16 +100,24 @@ private async Task<int> HandleGroupMessage(IDatabase db, string channel, string
98100
try
99101
{
100102
await received(channel, entry.Values[0].Value);
101-
102-
// Optionally delete the message to save space
103-
await db.StreamDeleteAsync(channel, [entry.Id]);
104103
}
105104
catch (Exception ex)
106105
{
107-
_logger.LogError($"Error processing message: {ex.Message}, event id: {channel} {entry.Id}\r\n{ex}");
106+
_logger.LogError($"Error processing message: {ex.Message}, event id: {channel} {entry.Id} {entry.Values[0].Value}");
107+
108+
// Add a message to the Error stream, keeping only the latest 1 million messages
109+
await db.StreamAddAsync($"{channel}-Error",
110+
RedisPublisher.AssembleErrorMessage(entry.Values[0].Value, ex.Message),
111+
messageId: entry.Id,
112+
maxLength: 1000 * 10000);
113+
108114
// Slow down the consumer if there are errors
109115
await Task.Delay(1000 * 10);
110116
}
117+
finally
118+
{
119+
await db.StreamDeleteAsync(channel, [entry.Id]);
120+
}
111121
}
112122

113123
return entries.Length;

0 commit comments

Comments
 (0)