Skip to content

Commit 017c2c0

Browse files
authored
Add timeouts for disconnects. (#17)
1 parent 1d4636b commit 017c2c0

File tree

3 files changed

+36
-17
lines changed

3 files changed

+36
-17
lines changed

examples/http/http_source.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import ray
1818
import rayvens
19-
import time
2019
import sys
2120

2221
# Receive message from stock price source and print it to console using the
@@ -27,7 +26,7 @@
2726
print(f'usage: {sys.argv[0]} <run_mode>')
2827
sys.exit(1)
2928
run_mode = sys.argv[1]
30-
if run_mode not in ['operator']:
29+
if run_mode not in ['local', 'operator']:
3130
raise RuntimeError(f'Invalid run mode provided: {run_mode}')
3231

3332
# Initialize ray either on the cluster or locally otherwise.
@@ -67,7 +66,5 @@
6766
# Log all events from stream-attached sources.
6867
stream >> (lambda event: print('LOG:', event))
6968

70-
# Wait before ending program.
71-
time.sleep(20)
72-
73-
stream.disconnect_all()
69+
# Disconnect all sources after 20 seconds.
70+
stream.disconnect_all(after=20)

examples/slack/slack_operator_mode.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import ray
1818
import rayvens
1919
import sys
20-
import time
2120

2221
# Send message to Slack sink using the kamel anywhere operator implementation.
2322

@@ -50,14 +49,11 @@
5049
webhookUrl=slack_webhook)
5150

5251
# Add sink to stream.
53-
sink = stream.actor.add_sink.remote(stream, sink_config)
54-
55-
# Wait for sink to reach running state.
56-
ray.get(sink)
52+
sink = stream.add_sink(sink_config)
5753

5854
# Sends message to all sinks attached to this stream.
5955
stream << f'Sending message to Slack sink in run mode {run_mode}.'
6056

61-
time.sleep(5)
62-
63-
ray.get(stream.actor.disconnect_all.remote())
57+
# Disconnect any sources or sinks attached to the stream 2 seconds after
58+
# the stream is idle (i.e. no events were propagated by the stream).
59+
stream.disconnect_all(after_idle_for=2)

rayvens/api.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import os
1818
import ray
19+
import time
1920

2021
from rayvens.core.local import start as start_http
2122
from rayvens.core.kafka import start as start_kafka
@@ -63,18 +64,38 @@ def add_sink(self, sink_config):
6364
def unsubscribe(self, subscriber_name):
6465
return ray.get(self.actor.unsubscribe.remote(subscriber_name))
6566

66-
def disconnect_source(self, source_name):
67+
def disconnect_source(self, source_name, after_idle_for=None, after=None):
68+
self._wait_for_timeout(after_idle_for)
6769
return ray.get(self.actor.disconnect_source.remote(source_name))
6870

69-
def disconnect_sink(self, sink_name):
71+
def disconnect_sink(self, sink_name, after_idle_for=None, after=None):
72+
self._wait_for_timeout(after_idle_for)
7073
return ray.get(self.actor.disconnect_sink.remote(sink_name))
7174

72-
def disconnect_all(self):
75+
def disconnect_all(self, after_idle_for=None, after=None):
76+
self._wait_for_timeout(after_idle_for, after)
7377
return ray.get(self.actor.disconnect_all.remote())
7478

7579
def _meta(self, action, *args, **kwargs):
7680
return ray.get(self.actor._meta.remote(action, *args, **kwargs))
7781

82+
def _wait_for_timeout(self, after_idle_for, after):
83+
if after_idle_for is not None and after_idle_for > 0:
84+
while True:
85+
time_elapsed_since_last_event = self._idle_time()
86+
87+
# Idle timeout exceeds the user-specified time limit:
88+
if time_elapsed_since_last_event > after_idle_for:
89+
break
90+
91+
# Check again after waiting for the rest of the timeout time:
92+
time.sleep(after_idle_for - time_elapsed_since_last_event + 1)
93+
if after is not None and after > 0:
94+
time.sleep(after)
95+
96+
def _idle_time(self):
97+
return time.time() - ray.get(self.actor._get_latest_timestamp.remote())
98+
7899

79100
@ray.remote(num_cpus=0)
80101
class StreamActor:
@@ -84,6 +105,7 @@ def __init__(self, name, operator=None):
84105
self._operator = operator
85106
self._sources = {}
86107
self._sinks = {}
108+
self._latest_sent_event_timestamp = None
87109

88110
def send_to(self, subscriber, name=None):
89111
if name in self._subscribers:
@@ -104,6 +126,7 @@ def append(self, data):
104126
if not integration.accepts_data_type(data):
105127
continue
106128
_eval(subscriber, data)
129+
self._latest_sent_event_timestamp = time.time()
107130

108131
def add_operator(self, operator):
109132
self._operator = operator
@@ -159,6 +182,9 @@ def disconnect_all(self):
159182
def _meta(self, action, *args, **kwargs):
160183
return verify_do(self, _global_camel, action, *args, **kwargs)
161184

185+
def _get_latest_timestamp(self):
186+
return self._latest_sent_event_timestamp
187+
162188

163189
def _eval(f, data):
164190
if isinstance(f, Stream):

0 commit comments

Comments
 (0)