@@ -21,6 +21,12 @@ fn decode_form_encoded_url(input: &str) -> Result<Arc<str>> {
2121 Ok ( urlencoding:: decode ( & with_spaces) ?. into ( ) )
2222}
2323
24+ #[ derive( Debug , Deserialize ) ]
25+ pub struct RedisConfig {
26+ redis_url : String ,
27+ redis_channel : String ,
28+ }
29+
2430#[ derive( Debug , Deserialize ) ]
2531pub struct Spec {
2632 bucket_name : String ,
@@ -29,8 +35,7 @@ pub struct Spec {
2935 included_patterns : Option < Vec < String > > ,
3036 excluded_patterns : Option < Vec < String > > ,
3137 sqs_queue_url : Option < String > ,
32- redis_url : Option < String > ,
33- redis_channel : Option < String > ,
38+ redis : Option < RedisConfig > ,
3439}
3540
3641struct SqsContext {
@@ -339,65 +344,51 @@ impl Executor {
339344 let mut pubsub = redis_context. subscribe ( ) . await ?;
340345 let mut change_messages = vec ! [ ] ;
341346
342- // Try to get a message with a timeout
343- let message_result = tokio:: time:: timeout (
344- std:: time:: Duration :: from_secs ( 20 ) ,
345- pubsub. on_message ( ) . next ( ) ,
346- )
347- . await ;
348-
349- match message_result {
350- Ok ( Some ( message) ) => {
351- let payload: String = message. get_payload ( ) ?;
352- // Parse the Redis message - MinIO sends S3 event notifications in JSON format
353- let notification: S3EventNotification = utils:: deser:: from_json_str ( & payload) ?;
354- let mut changes = vec ! [ ] ;
347+ // Wait for a message without timeout - long waiting is expected for event notifications
348+ let message = pubsub. on_message ( ) . next ( ) . await ;
355349
356- for record in notification. records {
357- let s3 = if let Some ( s3) = record. s3 {
358- s3
359- } else {
360- continue ;
361- } ;
350+ if let Some ( message) = message {
351+ let payload: String = message. get_payload ( ) ?;
352+ // Parse the Redis message - MinIO sends S3 event notifications in JSON format
353+ let notification: S3EventNotification = utils:: deser:: from_json_str ( & payload) ?;
354+ let mut changes = vec ! [ ] ;
362355
363- if s3. bucket . name != self . bucket_name {
364- continue ;
365- }
356+ for record in notification. records {
357+ let s3 = if let Some ( s3) = record. s3 {
358+ s3
359+ } else {
360+ continue ;
361+ } ;
366362
367- if !self
368- . prefix
369- . as_ref ( )
370- . is_none_or ( |prefix| s3. object . key . starts_with ( prefix) )
371- {
372- continue ;
373- }
363+ if s3. bucket . name != self . bucket_name {
364+ continue ;
365+ }
374366
375- if record. event_name . starts_with ( "ObjectCreated:" )
376- || record. event_name . starts_with ( "ObjectRemoved:" )
377- {
378- let decoded_key = decode_form_encoded_url ( & s3. object . key ) ?;
379- changes. push ( SourceChange {
380- key : KeyValue :: from_single_part ( decoded_key) ,
381- key_aux_info : serde_json:: Value :: Null ,
382- data : PartialSourceRowData :: default ( ) ,
383- } ) ;
384- }
367+ if !self
368+ . prefix
369+ . as_ref ( )
370+ . is_none_or ( |prefix| s3. object . key . starts_with ( prefix) )
371+ {
372+ continue ;
385373 }
386374
387- if !changes. is_empty ( ) {
388- change_messages. push ( SourceChangeMessage {
389- changes,
390- ack_fn : None , // Redis pub/sub doesn't require acknowledgment
375+ if record. event_name . starts_with ( "ObjectCreated:" )
376+ || record. event_name . starts_with ( "ObjectRemoved:" )
377+ {
378+ let decoded_key = decode_form_encoded_url ( & s3. object . key ) ?;
379+ changes. push ( SourceChange {
380+ key : KeyValue :: from_single_part ( decoded_key) ,
381+ key_aux_info : serde_json:: Value :: Null ,
382+ data : PartialSourceRowData :: default ( ) ,
391383 } ) ;
392384 }
393385 }
394- Ok ( None ) => {
395- // No message received
396- return Ok ( Vec :: new ( ) ) ;
397- }
398- Err ( _) => {
399- // Timeout - no message received, return empty vec
400- return Ok ( Vec :: new ( ) ) ;
386+
387+ if !changes. is_empty ( ) {
388+ change_messages. push ( SourceChangeMessage {
389+ changes,
390+ ack_fn : None , // Redis pub/sub doesn't require acknowledgment
391+ } ) ;
401392 }
402393 }
403394
@@ -452,14 +443,13 @@ impl SourceFactoryBase for Factory {
452443 ) -> Result < Box < dyn SourceExecutor > > {
453444 let config = aws_config:: load_defaults ( BehaviorVersion :: latest ( ) ) . await ;
454445
455- let redis_context =
456- if let ( Some ( redis_url) , Some ( redis_channel) ) = ( spec. redis_url , spec. redis_channel ) {
457- Some ( Arc :: new (
458- RedisContext :: new ( & redis_url, & redis_channel) . await ?,
459- ) )
460- } else {
461- None
462- } ;
446+ let redis_context = if let Some ( redis_config) = & spec. redis {
447+ Some ( Arc :: new (
448+ RedisContext :: new ( & redis_config. redis_url , & redis_config. redis_channel ) . await ?,
449+ ) )
450+ } else {
451+ None
452+ } ;
463453
464454 Ok ( Box :: new ( Executor {
465455 client : Client :: new ( & config) ,
0 commit comments