@@ -42,21 +42,19 @@ pub async fn consumer(cfg: &RedisInputOpts) -> Result<DynConsumer> {
42
42
. unwrap_or_else ( || format ! ( "{}_delays" , cfg. queue_key) ) ;
43
43
let delayed_lock_key = format ! ( "{delayed_queue_key}_lock" ) ;
44
44
45
- backends:: RedisBackend :: < backends:: redis:: RedisMultiplexedConnectionManager > :: builder (
46
- backends:: RedisConfig {
47
- dsn : cfg. dsn . clone ( ) ,
48
- max_connections : cfg. max_connections ,
49
- reinsert_on_nack : cfg. reinsert_on_nack ,
50
- queue_key : cfg. queue_key . clone ( ) ,
51
- delayed_queue_key,
52
- delayed_lock_key,
53
- consumer_group : cfg. consumer_group . clone ( ) ,
54
- consumer_name : cfg. consumer_name . clone ( ) ,
55
- // FIXME: expose in config?
56
- payload_key : "payload" . to_string ( ) ,
57
- ack_deadline_ms : cfg. ack_deadline_ms ,
58
- } ,
59
- )
45
+ backends:: RedisBackend :: builder ( backends:: RedisConfig {
46
+ dsn : cfg. dsn . clone ( ) ,
47
+ max_connections : cfg. max_connections ,
48
+ reinsert_on_nack : cfg. reinsert_on_nack ,
49
+ queue_key : cfg. queue_key . clone ( ) ,
50
+ delayed_queue_key,
51
+ delayed_lock_key,
52
+ consumer_group : cfg. consumer_group . clone ( ) ,
53
+ consumer_name : cfg. consumer_name . clone ( ) ,
54
+ // FIXME: expose in config?
55
+ payload_key : "payload" . to_string ( ) ,
56
+ ack_deadline_ms : cfg. ack_deadline_ms ,
57
+ } )
60
58
. make_dynamic ( )
61
59
. build_consumer ( )
62
60
. await
@@ -69,22 +67,20 @@ pub async fn producer(cfg: &RedisOutputOpts) -> Result<DynProducer> {
69
67
. unwrap_or_else ( || format ! ( "{}_delays" , cfg. queue_key) ) ;
70
68
let delayed_lock_key = format ! ( "{delayed_queue_key}_lock" ) ;
71
69
72
- backends:: RedisBackend :: < backends:: redis:: RedisMultiplexedConnectionManager > :: builder (
73
- backends:: RedisConfig {
74
- dsn : cfg. dsn . clone ( ) ,
75
- max_connections : cfg. max_connections ,
76
- queue_key : cfg. queue_key . clone ( ) ,
77
- delayed_queue_key,
78
- delayed_lock_key,
79
- // FIXME: expose in config?
80
- payload_key : "payload" . to_string ( ) ,
81
- // consumer stuff we don't care about.
82
- reinsert_on_nack : false ,
83
- consumer_group : String :: new ( ) ,
84
- consumer_name : String :: new ( ) ,
85
- ack_deadline_ms : cfg. ack_deadline_ms ,
86
- } ,
87
- )
70
+ backends:: RedisBackend :: builder ( backends:: RedisConfig {
71
+ dsn : cfg. dsn . clone ( ) ,
72
+ max_connections : cfg. max_connections ,
73
+ queue_key : cfg. queue_key . clone ( ) ,
74
+ delayed_queue_key,
75
+ delayed_lock_key,
76
+ // FIXME: expose in config?
77
+ payload_key : "payload" . to_string ( ) ,
78
+ // consumer stuff we don't care about.
79
+ reinsert_on_nack : false ,
80
+ consumer_group : String :: new ( ) ,
81
+ consumer_name : String :: new ( ) ,
82
+ ack_deadline_ms : cfg. ack_deadline_ms ,
83
+ } )
88
84
. make_dynamic ( )
89
85
. build_producer ( )
90
86
. await
0 commit comments