@@ -64,21 +64,21 @@ public async Task SubscribeAsync(string channel, string group, int? port, bool p
6464 {
6565 if ( priorityEnabled )
6666 {
67- if ( await HandleGroupMessage ( db , $ "{ channel } -{ EventPriority . High } ", group , consumer , received ) > 0 )
67+ if ( await HandleGroupMessage ( db , $ "{ channel } -{ EventPriority . High } ", group , consumer , received , $ " { channel } -Error" ) > 0 )
6868 {
6969 continue ;
7070 }
7171
72- if ( await HandleGroupMessage ( db , $ "{ channel } -{ EventPriority . Medium } ", group , consumer , received ) > 0 )
72+ if ( await HandleGroupMessage ( db , $ "{ channel } -{ EventPriority . Medium } ", group , consumer , received , $ " { channel } -Error" ) > 0 )
7373 {
7474 continue ;
7575 }
7676
77- await HandleGroupMessage ( db , $ "{ channel } -{ EventPriority . Low } ", group , consumer , received ) ;
77+ await HandleGroupMessage ( db , $ "{ channel } -{ EventPriority . Low } ", group , consumer , received , $ " { channel } -Error" ) ;
7878 }
7979 else
8080 {
81- await HandleGroupMessage ( db , channel , group , consumer , received ) ;
81+ await HandleGroupMessage ( db , channel , group , consumer , received , $ " { channel } -Error" ) ;
8282 }
8383 }
8484 catch ( Exception ex )
@@ -89,7 +89,7 @@ public async Task SubscribeAsync(string channel, string group, int? port, bool p
8989 }
9090 }
9191
92- private async Task < int > HandleGroupMessage ( IDatabase db , string channel , string group , string consumer , Func < string , string , Task > received )
92+ private async Task < int > HandleGroupMessage ( IDatabase db , string channel , string group , string consumer , Func < string , string , Task > received , string errorChannel )
9393 {
9494 var entries = await db . StreamReadGroupAsync ( channel , group , consumer , count : 1 ) ;
9595 foreach ( var entry in entries )
@@ -106,7 +106,7 @@ private async Task<int> HandleGroupMessage(IDatabase db, string channel, string
106106 _logger . LogError ( $ "Error processing message: { ex . Message } , event id: { channel } { entry . Id } { entry . Values [ 0 ] . Value } ") ;
107107
108108 // Add a message to the Error stream, keeping only the latest 1 million messages
109- await db . StreamAddAsync ( $ " { channel } -Error" ,
109+ await db . StreamAddAsync ( errorChannel ,
110110 RedisPublisher . AssembleErrorMessage ( entry . Values [ 0 ] . Value , ex . Message ) ,
111111 messageId : entry . Id ,
112112 maxLength : 1000 * 10000 ) ;
0 commit comments