Skip to content

Commit 5f31ffe

Browse files
committed
Allow Event to be removed.
1 parent 4813c5b commit 5f31ffe

File tree

2 files changed

+24
-0
lines changed

2 files changed

+24
-0
lines changed

src/Infrastructure/BotSharp.Abstraction/Infrastructures/Events/IEventPublisher.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,6 @@ public interface IEventPublisher
1515
Task ReDispatchAsync(string channel, int count = 10, string order = "asc");
1616

1717
Task ReDispatchPendingAsync(string channel, string group, int count = 10);
18+
19+
Task RemoveAsync(string channel, int count = 10);
1820
}

src/Infrastructure/BotSharp.Core/Infrastructures/Events/RedisPublisher.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,4 +158,26 @@ public async Task ReDispatchPendingAsync(string channel, string group, int count
158158
Console.WriteLine($"Redis error: {ex.Message}");
159159
}
160160
}
161+
162+
public async Task RemoveAsync(string channel, int count = 10)
163+
{
164+
var db = _redis.GetDatabase();
165+
166+
var entries = await db.StreamRangeAsync(channel, "-", "+", count: count, messageOrder: Order.Ascending);
167+
foreach (var entry in entries)
168+
{
169+
_logger.LogInformation($"Fetched message: {channel} {entry.Values[0].Value} ({entry.Id})");
170+
171+
try
172+
{
173+
await db.StreamDeleteAsync(channel, [entry.Id]);
174+
175+
_logger.LogWarning($"Deleted message: {channel} {entry.Values[0].Value} ({entry.Id})");
176+
}
177+
catch (Exception ex)
178+
{
179+
_logger.LogError($"Error processing message: {ex.Message}, event id: {channel} {entry.Id}\r\n{ex}");
180+
}
181+
}
182+
}
161183
}

0 commit comments

Comments
 (0)