Skip to content

Commit 3dc76fc

Browse files
committed
Support port for event
1 parent 4cf3531 commit 3dc76fc

File tree

2 files changed

+14
-8
lines changed

2 files changed

+14
-8
lines changed

src/Infrastructure/BotSharp.Abstraction/Infrastructures/Events/IEventSubscriber.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ public interface IEventSubscriber
66
{
77
Task SubscribeAsync(string channel, Func<string, string, Task> received);
88

9-
Task SubscribeAsync(string channel, string group, bool priorityEnabled,
9+
Task SubscribeAsync(string channel, string group, int? port, bool priorityEnabled,
1010
Func<string, string, Task> received,
1111
CancellationToken? stoppingToken = null);
1212
}

src/Infrastructure/BotSharp.Core/Infrastructures/Events/RedisSubscriber.cs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ await _subscriber.SubscribeAsync(channel, async (ch, message) =>
2525
});
2626
}
2727

28-
public async Task SubscribeAsync(string channel, string group, bool priorityEnabled,
28+
public async Task SubscribeAsync(string channel, string group, int? port, bool priorityEnabled,
2929
Func<string, string, Task> received,
3030
CancellationToken? stoppingToken = null)
3131
{
@@ -42,6 +42,12 @@ public async Task SubscribeAsync(string channel, string group, bool priorityEnab
4242
await CreateConsumerGroup(db, channel, group);
4343
}
4444

45+
var consumer = Environment.MachineName;
46+
if (port.HasValue)
47+
{
48+
consumer += $"-{port}";
49+
}
50+
4551
while (true)
4652
{
4753
await Task.Delay(100);
@@ -54,28 +60,28 @@ public async Task SubscribeAsync(string channel, string group, bool priorityEnab
5460

5561
if (priorityEnabled)
5662
{
57-
if (await HandleGroupMessage(db, $"{channel}-{EventPriority.High}", group, received) > 0)
63+
if (await HandleGroupMessage(db, $"{channel}-{EventPriority.High}", group, consumer, received) > 0)
5864
{
5965
continue;
6066
}
6167

62-
if (await HandleGroupMessage(db, $"{channel}-{EventPriority.Medium}", group, received) > 0)
68+
if (await HandleGroupMessage(db, $"{channel}-{EventPriority.Medium}", group, consumer, received) > 0)
6369
{
6470
continue;
6571
}
6672

67-
await HandleGroupMessage(db, $"{channel}-{EventPriority.Low}", group, received);
73+
await HandleGroupMessage(db, $"{channel}-{EventPriority.Low}", group, consumer, received);
6874
}
6975
else
7076
{
71-
await HandleGroupMessage(db, channel, group, received);
77+
await HandleGroupMessage(db, channel, group, consumer, received);
7278
}
7379
}
7480
}
7581

76-
private async Task<int> HandleGroupMessage(IDatabase db, string channel, string group, Func<string, string, Task> received)
82+
private async Task<int> HandleGroupMessage(IDatabase db, string channel, string group, string consumer, Func<string, string, Task> received)
7783
{
78-
var entries = await db.StreamReadGroupAsync(channel, group, Environment.MachineName, count: 1);
84+
var entries = await db.StreamReadGroupAsync(channel, group, consumer, count: 1);
7985
foreach (var entry in entries)
8086
{
8187
_logger.LogInformation($"Consumer {Environment.MachineName} received: {channel} {entry.Values[0].Value}");

0 commit comments

Comments
 (0)