Skip to content

Commit 9f343d1

Browse files
committed
Added initialize_and_wait
1 parent bf9f085 commit 9f343d1

File tree

4 files changed

+127
-4
lines changed

4 files changed

+127
-4
lines changed

docs/index.html

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,33 @@ <h1 class="title">Module <code>state_signals</code></h1>
439439
self.redis.publish(channel=&#34;event-signal-pubsub&#34;, message=sig.to_json_str())
440440
self.logger.debug(&#34;Initialization successful!&#34;)
441441

442+
def initialize_and_wait(
443+
self,
444+
await_sub_count: int,
445+
legal_events: List[str],
446+
tag: str = None,
447+
expected_resps: List[str] = None,
448+
timeout: int = 60,
449+
) -&gt; int:
450+
&#34;&#34;&#34;
451+
Calls the SignalExporter&#39;s initialize() method, and awaits a specified
452+
number of subscribers. Also includes an optional timeout. Returns 0 if
453+
sub(s) received, 1 if timed-out (or hangs if no timeout).
454+
&#34;&#34;&#34;
455+
self.initialize(
456+
legal_events=legal_events, tag=tag, expected_resps=expected_resps
457+
)
458+
counter = 0
459+
while len(self.subs) &lt; await_sub_count:
460+
time.sleep(0.1)
461+
counter += 1
462+
if counter &gt;= timeout * 10:
463+
self.logger.error(
464+
f&#34;Timeout after waiting {timeout} seconds for {await_sub_count }subs, got {len(self.subs)}&#34;
465+
)
466+
return 1
467+
return 0
468+
442469
def shutdown(self, tag: str = None) -&gt; None:
443470
&#34;&#34;&#34;
444471
Shuts down initialization response listener (stops accepting subscribers).
@@ -1135,6 +1162,33 @@ <h3>Methods</h3>
11351162
self.redis.publish(channel=&#34;event-signal-pubsub&#34;, message=sig.to_json_str())
11361163
self.logger.debug(&#34;Initialization successful!&#34;)
11371164

1165+
def initialize_and_wait(
1166+
self,
1167+
await_sub_count: int,
1168+
legal_events: List[str],
1169+
tag: str = None,
1170+
expected_resps: List[str] = None,
1171+
timeout: int = 60,
1172+
) -&gt; int:
1173+
&#34;&#34;&#34;
1174+
Calls the SignalExporter&#39;s initialize() method, and awaits a specified
1175+
number of subscribers. Also includes an optional timeout. Returns 0 if
1176+
sub(s) received, 1 if timed-out (or hangs if no timeout).
1177+
&#34;&#34;&#34;
1178+
self.initialize(
1179+
legal_events=legal_events, tag=tag, expected_resps=expected_resps
1180+
)
1181+
counter = 0
1182+
while len(self.subs) &lt; await_sub_count:
1183+
time.sleep(0.1)
1184+
counter += 1
1185+
if counter &gt;= timeout * 10:
1186+
self.logger.error(
1187+
f&#34;Timeout after waiting {timeout} seconds for {await_sub_count }subs, got {len(self.subs)}&#34;
1188+
)
1189+
return 1
1190+
return 0
1191+
11381192
def shutdown(self, tag: str = None) -&gt; None:
11391193
&#34;&#34;&#34;
11401194
Shuts down initialization response listener (stops accepting subscribers).
@@ -1187,6 +1241,45 @@ <h3>Methods</h3>
11871241
self.logger.debug(&#34;Initialization successful!&#34;)</code></pre>
11881242
</details>
11891243
</dd>
1244+
<dt id="state_signals.SignalExporter.initialize_and_wait"><code class="name flex">
1245+
<span>def <span class="ident">initialize_and_wait</span></span>(<span>self, await_sub_count: int, legal_events: List[str], tag: str = None, expected_resps: List[str] = None, timeout: int = 60) ‑> int</span>
1246+
</code></dt>
1247+
<dd>
1248+
<div class="desc"><p>Calls the SignalExporter's initialize() method, and awaits a specified
1249+
number of subscribers. Also includes an optional timeout. Returns 0 if
1250+
sub(s) received, 1 if timed-out (or hangs if no timeout).</p></div>
1251+
<details class="source">
1252+
<summary>
1253+
<span>Expand source code</span>
1254+
</summary>
1255+
<pre><code class="python">def initialize_and_wait(
1256+
self,
1257+
await_sub_count: int,
1258+
legal_events: List[str],
1259+
tag: str = None,
1260+
expected_resps: List[str] = None,
1261+
timeout: int = 60,
1262+
) -&gt; int:
1263+
&#34;&#34;&#34;
1264+
Calls the SignalExporter&#39;s initialize() method, and awaits a specified
1265+
number of subscribers. Also includes an optional timeout. Returns 0 if
1266+
sub(s) received, 1 if timed-out (or hangs if no timeout).
1267+
&#34;&#34;&#34;
1268+
self.initialize(
1269+
legal_events=legal_events, tag=tag, expected_resps=expected_resps
1270+
)
1271+
counter = 0
1272+
while len(self.subs) &lt; await_sub_count:
1273+
time.sleep(0.1)
1274+
counter += 1
1275+
if counter &gt;= timeout * 10:
1276+
self.logger.error(
1277+
f&#34;Timeout after waiting {timeout} seconds for {await_sub_count }subs, got {len(self.subs)}&#34;
1278+
)
1279+
return 1
1280+
return 0</code></pre>
1281+
</details>
1282+
</dd>
11901283
<dt id="state_signals.SignalExporter.publish_signal"><code class="name flex">
11911284
<span>def <span class="ident">publish_signal</span></span>(<span>self, event: str, sample: int = -1, tag: str = None, metadata: Dict = None, timeout: int = 20) ‑> int</span>
11921285
</code></dt>
@@ -1622,6 +1715,7 @@ <h4><code><a title="state_signals.Signal" href="#state_signals.Signal">Signal</a
16221715
<h4><code><a title="state_signals.SignalExporter" href="#state_signals.SignalExporter">SignalExporter</a></code></h4>
16231716
<ul class="">
16241717
<li><code><a title="state_signals.SignalExporter.initialize" href="#state_signals.SignalExporter.initialize">initialize</a></code></li>
1718+
<li><code><a title="state_signals.SignalExporter.initialize_and_wait" href="#state_signals.SignalExporter.initialize_and_wait">initialize_and_wait</a></code></li>
16251719
<li><code><a title="state_signals.SignalExporter.publish_signal" href="#state_signals.SignalExporter.publish_signal">publish_signal</a></code></li>
16261720
<li><code><a title="state_signals.SignalExporter.shutdown" href="#state_signals.SignalExporter.shutdown">shutdown</a></code></li>
16271721
</ul>

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "state-signals"
3-
version = "0.2.2"
3+
version = "0.3.0"
44
description = "Package for easy management of state/event signal publishing, subscribing, and responding"
55

66
license = "GPL-3.0-only"

state_signals.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,33 @@ def initialize(
391391
self.redis.publish(channel="event-signal-pubsub", message=sig.to_json_str())
392392
self.logger.debug("Initialization successful!")
393393

394+
def initialize_and_wait(
395+
self,
396+
await_sub_count: int,
397+
legal_events: List[str],
398+
tag: str = None,
399+
expected_resps: List[str] = None,
400+
timeout: int = 60,
401+
) -> int:
402+
"""
403+
Calls the SignalExporter's initialize() method, and awaits a specified
404+
number of subscribers. Also includes an optional timeout. Returns 0 if
405+
sub(s) received, 1 if timed-out (or hangs if no timeout).
406+
"""
407+
self.initialize(
408+
legal_events=legal_events, tag=tag, expected_resps=expected_resps
409+
)
410+
counter = 0
411+
while len(self.subs) < await_sub_count:
412+
time.sleep(0.1)
413+
counter += 1
414+
if counter >= timeout * 10:
415+
self.logger.error(
416+
f"Timeout after waiting {timeout} seconds for {await_sub_count }subs, got {len(self.subs)}"
417+
)
418+
return 1
419+
return 0
420+
394421
def shutdown(self, tag: str = None) -> None:
395422
"""
396423
Shuts down initialization response listener (stops accepting subscribers).

tests/basic_test.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ def _listener(responder):
3131

3232
def _init():
3333
resp_proc.start()
34-
sig_ex.initialize(legal_events=["benchmark-start", "benchmark-stop"])
34+
return sig_ex.initialize_and_wait(
35+
1, legal_events=["benchmark-start", "benchmark-stop"]
36+
)
3537

3638

3739
def _shutdown():
@@ -44,9 +46,9 @@ def _cleanup():
4446

4547
@pytest.mark.dependency()
4648
def test_init():
47-
_init()
49+
sub_check = _init()
4850
assert sig_ex.init_listener.is_alive()
49-
time.sleep(1)
51+
assert sub_check == 0
5052
assert sig_ex.subs
5153

5254

0 commit comments

Comments
 (0)