Skip to content

Commit 9b912d9

Browse files
author
jason
committed
Merge remote-tracking branch 'origin/master' into jason_dev
2 parents c0d6a5b + 5f31ffe commit 9b912d9

File tree

6 files changed

+43
-5
lines changed

6 files changed

+43
-5
lines changed

src/Infrastructure/BotSharp.Abstraction/Infrastructures/Events/IEventPublisher.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,6 @@ public interface IEventPublisher
1515
Task ReDispatchAsync(string channel, int count = 10, string order = "asc");
1616

1717
Task ReDispatchPendingAsync(string channel, string group, int count = 10);
18+
19+
Task RemoveAsync(string channel, int count = 10);
1820
}
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
using System.Threading;
2+
13
namespace BotSharp.Abstraction.Infrastructures.Events;
24

35
public interface IEventSubscriber
46
{
57
Task SubscribeAsync(string channel, Func<string, string, Task> received);
68

7-
Task SubscribeAsync(string channel, string group, bool priorityEnabled, Func<string, string, Task> received);
9+
Task SubscribeAsync(string channel, string group, bool priorityEnabled,
10+
Func<string, string, Task> received,
11+
CancellationToken? stoppingToken = null);
812
}

src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,14 @@ public async Task<Conversation> GetConversationRecordOrCreateNew(string agentId)
173173
var state = _services.GetRequiredService<IConversationStateService>();
174174
var channel = state.GetState("channel");
175175
var channelId = state.GetState("channel_id");
176+
var userId = state.GetState("current_user_id");
176177
var sess = new Conversation
177178
{
178179
Id = _conversationId,
179180
Channel = channel,
180181
ChannelId = channelId,
181-
AgentId = agentId
182+
AgentId = agentId,
183+
UserId = userId,
182184
};
183185
converation = await NewConversation(sess);
184186
}

src/Infrastructure/BotSharp.Core/Infrastructures/Events/RedisPublisher.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,4 +158,26 @@ public async Task ReDispatchPendingAsync(string channel, string group, int count
158158
Console.WriteLine($"Redis error: {ex.Message}");
159159
}
160160
}
161+
162+
public async Task RemoveAsync(string channel, int count = 10)
163+
{
164+
var db = _redis.GetDatabase();
165+
166+
var entries = await db.StreamRangeAsync(channel, "-", "+", count: count, messageOrder: Order.Ascending);
167+
foreach (var entry in entries)
168+
{
169+
_logger.LogInformation($"Fetched message: {channel} {entry.Values[0].Value} ({entry.Id})");
170+
171+
try
172+
{
173+
await db.StreamDeleteAsync(channel, [entry.Id]);
174+
175+
_logger.LogWarning($"Deleted message: {channel} {entry.Values[0].Value} ({entry.Id})");
176+
}
177+
catch (Exception ex)
178+
{
179+
_logger.LogError($"Error processing message: {ex.Message}, event id: {channel} {entry.Id}\r\n{ex}");
180+
}
181+
}
182+
}
161183
}

src/Infrastructure/BotSharp.Core/Infrastructures/Events/RedisSubscriber.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ await _subscriber.SubscribeAsync(channel, async (ch, message) =>
2525
});
2626
}
2727

28-
public async Task SubscribeAsync(string channel, string group, bool priorityEnabled, Func<string, string, Task> received)
28+
public async Task SubscribeAsync(string channel, string group, bool priorityEnabled,
29+
Func<string, string, Task> received,
30+
CancellationToken? stoppingToken = null)
2931
{
3032
var db = _redis.GetDatabase();
3133

@@ -44,6 +46,12 @@ public async Task SubscribeAsync(string channel, string group, bool priorityEnab
4446
{
4547
await Task.Delay(100);
4648

49+
if (stoppingToken.HasValue && stoppingToken.Value.IsCancellationRequested)
50+
{
51+
_logger.LogInformation($"Stopping consumer channel & group: [{channel}, {group}]");
52+
break;
53+
}
54+
4755
if (priorityEnabled)
4856
{
4957
if (await HandleGroupMessage(db, $"{channel}-{EventPriority.High}", group, received) > 0)

src/Infrastructure/BotSharp.Core/Users/Services/UserService.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ public async Task<bool> SendVerificationCodeResetPasswordNoLogin(User user)
570570

571571
if (!string.IsNullOrEmpty(user.Phone))
572572
{
573-
record = db.GetUserByPhone(user.Phone);
573+
record = db.GetUserByPhone(user.Phone, regionCode: user.RegionCode);
574574
}
575575

576576
if (!string.IsNullOrEmpty(user.Email))
@@ -745,7 +745,7 @@ public async Task<bool> AddDashboardConversation(string userId, string conversat
745745
await Task.CompletedTask;
746746
return true;
747747
}
748-
748+
749749
public async Task<bool> RemoveDashboardConversation(string userId, string conversationId)
750750
{
751751
var db = _services.GetRequiredService<IBotSharpRepository>();

0 commit comments

Comments
 (0)