@@ -56,6 +56,9 @@ def append(self, data):
5656 def add_operator (self , operator ):
5757 return ray .get (self .actor .add_operator .remote (operator ))
5858
59+ def add_multi_operator (self , operator ):
60+ return ray .get (self .actor .add_multi_operator .remote (operator ))
61+
5962 def add_source (self , source_config ):
6063 return ray .get (self .actor .add_source .remote (self , source_config ))
6164
@@ -112,45 +115,72 @@ def _idle_time(self):
112115 return time .time () - latest_timestamp
113116
114117
118+ class StreamContext :
119+ def __init__ (self ):
120+ self .sink_restrictions = {}
121+ self .subscribers = {}
122+ self .latest_sent_event_timestamp = None
123+ self .event_counter = 0
124+ self .limit_subscribers = False
125+ self .is_multi_operator = False
126+
127+ def publish (self , data ):
128+ if data is not None :
129+ for name , subscriber in self .subscribers .items ():
130+ if name in self .sink_restrictions :
131+ type_restrictions = self .sink_restrictions [name ]
132+ if not self ._accepts_data_type (data , type_restrictions ):
133+ continue
134+ _eval (self , subscriber , data )
135+
136+ # Check if the sink we are routing the message to has any restrictions
137+ # in terms of message type. A message will only be routed to a sink
138+ # if the sink accepts its type.
139+ def _accepts_data_type (self , data , type_restrictions ):
140+ # If there are no restrictions return immediately:
141+ if len (type_restrictions ) == 0 :
142+ return True
143+ for restricted_type in type_restrictions :
144+ if isinstance (data , restricted_type ):
145+ return True
146+ return False
147+
148+
115149@ray .remote (num_cpus = 0 )
116150class StreamActor :
117151 def __init__ (self , name , operator = None ):
118152 self .name = name
119- self ._subscribers = {}
120153 self ._operator = operator
121154 self ._sources = {}
122155 self ._sinks = {}
123- self ._latest_sent_event_timestamp = None
124- self ._limit_subscribers = False
125- self ._event_counter = 0
156+ self .context = StreamContext ()
126157
127158 def send_to (self , subscriber , name = None ):
128- if self ._limit_subscribers :
159+ if self .context . limit_subscribers :
129160 return
130- if name in self ._subscribers :
161+ if name in self .context . subscribers :
131162 raise RuntimeError (
132163 f'Stream { self .name } already has a subscriber named { name } .' )
133164 if name is None :
134165 name = object ()
135- self ._subscribers [name ] = subscriber
166+ self .context . subscribers [name ] = subscriber
136167
137168 def append (self , data ):
138169 if data is None :
139170 return
140171 if self ._operator is not None :
141- data = _eval (self ._operator , data )
142- for name , subscriber in self ._subscribers .items ():
143- if name in self ._sinks :
144- integration = self ._sinks [name ]
145- if not integration .accepts_data_type (data ):
146- continue
147- _eval (subscriber , data )
148- self ._latest_sent_event_timestamp = time .time ()
149- self ._event_counter += 1
172+ data = _eval (self .context , self ._operator , data )
173+ self .context .publish (data )
174+ self .context .latest_sent_event_timestamp = time .time ()
175+ self .context .event_counter += 1
150176
151177 def add_operator (self , operator ):
152178 self ._operator = operator
153179
180+ def add_multitask_operator (self , operator ):
181+ self ._operator = operator
182+ self .context .is_multi_operator = True
183+
154184 def add_source (self , stream , source_config ):
155185 source_config ["integration_type" ] = 'source'
156186 source_name = name_source (source_config )
@@ -170,13 +200,15 @@ def add_sink(self, stream, sink_config):
170200 f'Stream { self .name } already has a sink named { sink_name } .' )
171201 self ._sinks [sink_name ] = _global_camel .add_sink (
172202 stream , sink_config , sink_name )
203+ self .context .sink_restrictions [sink_name ] = self ._sinks [
204+ sink_name ].get_restricted_data_type ()
173205 return sink_name
174206
175207 def unsubscribe (self , subscriber_name ):
176208 if subscriber_name not in self ._subscribers :
177209 raise RuntimeError (f'Stream { self .name } has no subscriber named'
178210 f' { subscriber_name } .' )
179- self ._subscribers .pop (subscriber_name )
211+ self .context . subscribers .pop (subscriber_name )
180212
181213 def disconnect_source (self , source_name ):
182214 if source_name not in self ._sources :
@@ -191,7 +223,8 @@ def disconnect_sink(self, sink_name):
191223 f'Stream { self .name } has no sink named { sink_name } .' )
192224 _global_camel .disconnect (self ._sinks [sink_name ])
193225 self ._sinks .pop (sink_name )
194- self ._subscribers .pop (sink_name )
226+ self .context .sink_restrictions .pop (sink_name )
227+ self .context .subscribers .pop (sink_name )
195228
196229 def disconnect_all (self , stream_drain_timeout ):
197230 for source_name in dict (self ._sources ):
@@ -201,29 +234,35 @@ def disconnect_all(self, stream_drain_timeout):
201234 self .disconnect_sink (sink_name )
202235
203236 def event_count (self ):
204- return self .event_count
237+ return self .context . event_counter
205238
206239 def _meta (self , action , * args , ** kwargs ):
207240 return verify_do (self , _global_camel , action , * args , ** kwargs )
208241
209242 def _get_latest_timestamp (self ):
210- return self ._latest_sent_event_timestamp
243+ return self .context . latest_sent_event_timestamp
211244
212245 def _fetch_processors (self ):
213- self ._limit_subscribers = True
214- return self ._subscribers , self ._operator
246+ self .context . limit_subscribers = True
247+ return self .context . subscribers , self ._operator
215248
216249 def _update_timestamp (self , timestamp ):
217- self ._latest_sent_event_timestamp = timestamp
250+ self .context . latest_sent_event_timestamp = timestamp
218251
219252
220- def _eval (f , data ):
253+ def _eval (context , f , data ):
221254 if isinstance (f , Stream ):
222255 return f .append (data )
223256 elif isinstance (f , ray .actor .ActorHandle ):
224257 return f .append .remote (data )
225- elif isinstance (f , ray .actor .ActorMethod ) or isinstance (
226- f , ray .remote_function .RemoteFunction ):
258+ elif isinstance (f , ray .actor .ActorMethod ):
259+ return f .remote (data )
260+ elif isinstance (f , ray .remote_function .RemoteFunction ):
261+ if context .is_multi_operator :
262+ if context .subscribers is None :
263+ raise RuntimeError ('No subscribers or sinks provided.' )
264+ f .remote (context , data )
265+ return None
227266 return f .remote (data )
228267 else :
229268 return f (data )
0 commit comments