@@ -8,27 +8,21 @@ use futures::{
88 StreamExt ,
99 future:: { self , LocalBoxFuture } ,
1010} ;
11- use smol:: { LocalExecutor , future:: yield_now} ;
12- use tracing:: info;
11+ use tracing:: { info, warn} ;
1312
1413use crate :: { InputProvider , OutputStream , Value , VarName } ;
1514
16- pub struct VarData {
17- pub variable : VarName ,
18- pub channel_name : String ,
19- stream : Option < OutputStream < Value > > ,
20- }
21-
2215pub struct RedisInputProvider {
23- pub host : String ,
24- pub var_data : BTreeMap < VarName , VarData > ,
25- pub result : Rc < AsyncCell < anyhow:: Result < ( ) > > > ,
2616 pub started : Rc < AsyncCell < bool > > ,
17+ client : redis:: Client ,
18+ redis_stream : Option < redis:: aio:: PubSubStream > ,
19+ var_topics : BTreeMap < VarName , String > ,
20+ var_streams : BTreeMap < VarName , OutputStream < Value > > ,
21+ senders : BTreeMap < VarName , Sender < Value > > ,
2722}
2823
2924impl RedisInputProvider {
3025 pub fn new (
31- ex : Rc < LocalExecutor < ' static > > ,
3226 hostname : & str ,
3327 port : Option < u16 > ,
3428 var_topics : BTreeMap < VarName , String > ,
@@ -46,90 +40,54 @@ impl RedisInputProvider {
4640 ( ( v. clone ( ) , tx) , ( v. clone ( ) , rx) )
4741 } )
4842 . unzip ( ) ;
49- let topic_vars = var_topics
50- . iter ( )
51- . map ( |( k, v) | ( v. clone ( ) , k. clone ( ) ) )
52- . collect :: < BTreeMap < _ , _ > > ( ) ;
5343
54- let started = AsyncCell :: shared ( ) ;
55-
56- let client = redis:: Client :: open ( url. clone ( ) ) ?;
57-
58- let result = AsyncCell :: shared ( ) ;
59-
60- // TODO: Should not be spawning a task inside new. Should fix the potential race conditions
61- // instead of circumventing them like this.
62- info ! ( "Spawning RedisInputProvider on url: {:?}" , url) ;
63- ex. spawn ( RedisInputProvider :: input_monitor (
64- result. clone ( ) ,
65- client,
66- var_topics. clone ( ) ,
67- topic_vars,
68- senders,
69- started. clone ( ) ,
70- ) )
71- . detach ( ) ;
72-
73- let var_data = var_topics
44+ let var_streams: BTreeMap < VarName , OutputStream < Value > > = receivers
7445 . into_iter ( )
75- . zip ( receivers. into_values ( ) )
76- . map ( |( ( v, topic) , mut rx) | {
77- let stream = stream ! {
46+ . map ( |( v, mut rx) | {
47+ let stream: OutputStream < Value > = Box :: pin ( stream ! {
7848 while let Some ( x) = rx. recv( ) . await {
7949 yield x
8050 }
81- } ;
82- (
83- v. clone ( ) ,
84- VarData {
85- variable : v,
86- channel_name : topic,
87- stream : Some ( Box :: pin ( stream) ) ,
88- } ,
89- )
51+ } ) ;
52+ ( v. clone ( ) , stream)
9053 } )
9154 . collect ( ) ;
9255
56+ let started = AsyncCell :: shared ( ) ;
57+
58+ let client = redis:: Client :: open ( url. clone ( ) ) ?;
59+
9360 Ok ( RedisInputProvider {
94- host : url,
95- result,
96- var_data,
9761 started,
62+ client : client,
63+ redis_stream : None ,
64+ var_topics : var_topics,
65+ senders : senders,
66+ var_streams : var_streams,
9867 } )
9968 }
10069
101- async fn input_monitor (
102- result : Rc < AsyncCell < anyhow:: Result < ( ) > > > ,
103- client : redis:: Client ,
104- var_topics : BTreeMap < VarName , String > ,
105- topic_vars : BTreeMap < String , VarName > ,
106- senders : BTreeMap < VarName , Sender < Value > > ,
107- started : Rc < AsyncCell < bool > > ,
108- ) {
109- let result = result. guard_shared ( Err ( anyhow ! ( "RedisInputProvider crashed" ) ) ) ;
110- result. set (
111- RedisInputProvider :: input_monitor_with_result (
112- client, var_topics, topic_vars, senders, started,
113- )
114- . await ,
115- ) ;
70+ pub async fn connect ( & mut self ) -> anyhow:: Result < ( ) > {
71+ let mut pubsub = self . client . get_async_pubsub ( ) . await ?;
72+ let channel_names = self . var_topics . values ( ) . collect :: < Vec < _ > > ( ) ;
73+ info ! ( "Subscribing to Redis channel_names: {:?}" , channel_names) ;
74+ pubsub. subscribe ( channel_names) . await ?;
75+ let stream = pubsub. into_on_message ( ) ;
76+ self . redis_stream = Some ( stream) ;
77+ self . started . set ( true ) ;
78+ Ok ( ( ) )
11679 }
11780
118- async fn input_monitor_with_result (
119- client : redis:: Client ,
81+ async fn run_logic (
12082 var_topics : BTreeMap < VarName , String > ,
121- topic_vars : BTreeMap < String , VarName > ,
12283 mut senders : BTreeMap < VarName , Sender < Value > > ,
123- started : Rc < AsyncCell < bool > > ,
84+ mut redis_stream : redis :: aio :: PubSubStream ,
12485 ) -> anyhow:: Result < ( ) > {
125- let mut pubsub = client. get_async_pubsub ( ) . await ?;
126- let channel_names = var_topics. values ( ) . collect :: < Vec < _ > > ( ) ;
127- info ! ( "Subscribing to Redis channel_names: {:?}" , channel_names) ;
128- pubsub. subscribe ( channel_names) . await ?;
129- started. set ( true ) ;
130- let mut stream = pubsub. on_message ( ) ;
131-
132- while let Some ( msg) = stream. next ( ) . await {
86+ let topic_vars = var_topics
87+ . iter ( )
88+ . map ( |( k, v) | ( v. clone ( ) , k. clone ( ) ) )
89+ . collect :: < BTreeMap < _ , _ > > ( ) ;
90+ while let Some ( msg) = redis_stream. next ( ) . await {
13391 let var_name = topic_vars
13492 . get ( msg. get_channel_name ( ) )
13593 . ok_or_else ( || anyhow ! ( "Unknown channel name" ) ) ?;
@@ -163,26 +121,48 @@ impl InputProvider for RedisInputProvider {
163121 type Val = Value ;
164122
165123 fn input_stream ( & mut self , var : & VarName ) -> Option < OutputStream < Value > > {
166- let var_data = self . var_data . get_mut ( var) ?;
167- let stream = var_data. stream . take ( ) ?;
168- Some ( stream)
124+ self . var_streams . remove ( var)
169125 }
170126
171127 fn run ( & mut self ) -> LocalBoxFuture < ' static , anyhow:: Result < ( ) > > {
172- Box :: pin ( self . result . take_shared ( ) )
128+ let stream = self . redis_stream . take ( ) ;
129+
130+ match stream {
131+ Some ( stream) => Box :: pin ( Self :: run_logic (
132+ self . var_topics . clone ( ) ,
133+ std:: mem:: take ( & mut self . senders ) ,
134+ stream,
135+ ) ) ,
136+ None => Box :: pin ( future:: ready ( Err ( anyhow ! ( "Not connected to Redis yet" ) ) ) ) ,
137+ }
173138 }
174139
175140 fn ready ( & self ) -> LocalBoxFuture < ' static , Result < ( ) , anyhow:: Error > > {
176141 let started = self . started . clone ( ) ;
177142 Box :: pin ( async move {
143+ info ! ( "Checking if Redis input provider is ready" ) ;
144+ let mut attempts = 0 ;
178145 while !started. get ( ) . await {
179- yield_now ( ) . await ;
146+ attempts += 1 ;
147+ info ! (
148+ "Redis input provider not ready yet, checking again (attempt #{})" ,
149+ attempts
150+ ) ;
151+ smol:: Timer :: after ( std:: time:: Duration :: from_millis ( 100 ) ) . await ;
152+
153+ if attempts > 50 {
154+ warn ! (
155+ "Redis input provider still not ready after 5 seconds, continuing to wait"
156+ ) ;
157+ attempts = 0 ;
158+ }
180159 }
160+ info ! ( "Redis input provider is ready" ) ;
181161 Ok ( ( ) )
182162 } )
183163 }
184164
185165 fn vars ( & self ) -> Vec < VarName > {
186- self . var_data . keys ( ) . cloned ( ) . collect ( )
166+ self . var_topics . keys ( ) . cloned ( ) . collect ( )
187167 }
188168}
0 commit comments