11use std:: { collections:: HashSet , sync:: RwLock } ;
22
33use redis:: AsyncCommands ;
4+ use tokio:: sync:: broadcast;
45use tracing:: { debug, error, info, instrument, warn} ;
56
6- use crate :: domain:: { QueueEntry , QueueRepository } ;
7+ use crate :: domain:: { QueueEntry , QueueEvent , QueueRepository } ;
78
89fn queue_key ( guild_id : & str ) -> String {
910 format ! ( "queue:{guild_id}" )
@@ -12,13 +13,16 @@ fn queue_key(guild_id: &str) -> String {
1213pub struct RedisQueueRepository {
1314 redis : redis:: Client ,
1415 open_guilds : RwLock < HashSet < String > > ,
16+ event_tx : broadcast:: Sender < QueueEvent > ,
1517}
1618
1719impl RedisQueueRepository {
1820 pub fn new ( redis : redis:: Client ) -> Self {
21+ let ( event_tx, _) = broadcast:: channel ( 64 ) ;
1922 Self {
2023 redis,
2124 open_guilds : RwLock :: new ( HashSet :: new ( ) ) ,
25+ event_tx,
2226 }
2327 }
2428}
@@ -39,6 +43,7 @@ impl QueueRepository for RedisQueueRepository {
3943 info ! ( guild_id, "Closing queue for guild" ) ;
4044 self . open_guilds . write ( ) . unwrap ( ) . remove ( guild_id) ;
4145 self . clear ( guild_id) . await ;
46+ self . broadcast_update ( guild_id) ;
4247 }
4348
4449 #[ instrument( skip( self ) , fields( guild_id) ) ]
@@ -111,6 +116,7 @@ impl QueueRepository for RedisQueueRepository {
111116 0
112117 } ) ;
113118 info ! ( guild_id, user_id = %entry. user_id, queue_size = new_size, "Added user to queue" ) ;
119+ self . broadcast_update ( guild_id) ;
114120 new_size
115121 }
116122
@@ -134,6 +140,9 @@ impl QueueRepository for RedisQueueRepository {
134140 Some ( e) => info ! ( guild_id, user_id = %e. user_id, "Popped user from queue" ) ,
135141 None => debug ! ( guild_id, "No entry to pop from queue" ) ,
136142 }
143+ if entry. is_some ( ) {
144+ self . broadcast_update ( guild_id) ;
145+ }
137146 entry
138147 }
139148
@@ -159,6 +168,9 @@ impl QueueRepository for RedisQueueRepository {
159168 . filter_map ( |json_str| serde_json:: from_str ( & json_str) . ok ( ) )
160169 . collect ( ) ;
161170 info ! ( guild_id, count = entries. len( ) , "Popped entries from queue" ) ;
171+ if !entries. is_empty ( ) {
172+ self . broadcast_update ( guild_id) ;
173+ }
162174 entries
163175 }
164176
@@ -201,6 +213,26 @@ impl QueueRepository for RedisQueueRepository {
201213 } else {
202214 error ! ( guild_id, "Failed to get Redis connection for clear" ) ;
203215 }
216+ self . broadcast_update ( guild_id) ;
217+ }
218+
219+ fn subscribe ( & self ) -> tokio:: sync:: broadcast:: Receiver < QueueEvent > {
220+ self . event_tx . subscribe ( )
221+ }
222+ }
223+
224+ impl RedisQueueRepository {
225+ fn broadcast_update ( & self , guild_id : & str ) {
226+ if let Err ( err) = self . event_tx . send ( QueueEvent :: Updated {
227+ guild_id : guild_id. to_string ( ) ,
228+ } ) {
229+ error ! (
230+ guild_id,
231+ "error" = ?err,
232+ "guild_id" = guild_id,
233+ "Failed to broadcast queue update event"
234+ ) ;
235+ }
204236 }
205237}
206238
0 commit comments