Skip to content

Commit 2c68349

Browse files
committed
pipeline: remove old tests, integrate with out test suite
These have probably not been run by anyone in ages, better to move the code to our test suite where it is regularly exercised. In fact, the latter covers most of the cases already. The only missing tests seem to be those were exceptions are raised in the first or last stage. Thus, this adds such tests.
1 parent 9756a5d commit 2c68349

File tree

2 files changed

+36
-69
lines changed

2 files changed

+36
-69
lines changed

beets/util/pipeline.py

Lines changed: 0 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -492,64 +492,3 @@ def pull(self):
492492
msgs = next_msgs
493493
for msg in msgs:
494494
yield msg
495-
496-
497-
# Smoke test.
498-
if __name__ == "__main__":
499-
import time
500-
501-
# Test a normally-terminating pipeline both in sequence and
502-
# in parallel.
503-
def produce():
504-
for i in range(5):
505-
print("generating %i" % i)
506-
time.sleep(1)
507-
yield i
508-
509-
def work():
510-
num = yield
511-
while True:
512-
print("processing %i" % num)
513-
time.sleep(2)
514-
num = yield num * 2
515-
516-
def consume():
517-
while True:
518-
num = yield
519-
time.sleep(1)
520-
print("received %i" % num)
521-
522-
ts_start = time.time()
523-
Pipeline([produce(), work(), consume()]).run_sequential()
524-
ts_seq = time.time()
525-
Pipeline([produce(), work(), consume()]).run_parallel()
526-
ts_par = time.time()
527-
Pipeline([produce(), (work(), work()), consume()]).run_parallel()
528-
ts_end = time.time()
529-
print("Sequential time:", ts_seq - ts_start)
530-
print("Parallel time:", ts_par - ts_seq)
531-
print("Multiply-parallel time:", ts_end - ts_par)
532-
print()
533-
534-
# Test a pipeline that raises an exception.
535-
def exc_produce():
536-
for i in range(10):
537-
print("generating %i" % i)
538-
time.sleep(1)
539-
yield i
540-
541-
def exc_work():
542-
num = yield
543-
while True:
544-
print("processing %i" % num)
545-
time.sleep(3)
546-
if num == 3:
547-
raise Exception()
548-
num = yield num * 2
549-
550-
def exc_consume():
551-
while True:
552-
num = yield
553-
print("received %i" % num)
554-
555-
Pipeline([exc_produce(), exc_work(), exc_consume()]).run_parallel(1)

test/test_pipeline.py

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,16 @@ def _consume(result):
3939
result.append(i)
4040

4141

42-
# A worker that raises an exception.
42+
# Pipeline stages that raise an exception.
4343
class PipelineError(Exception):
4444
pass
4545

4646

47+
def _exc_produce(num=5):
48+
yield from range(num)
49+
raise PipelineError()
50+
51+
4752
def _exc_work(num=3):
4853
i = None
4954
while True:
@@ -53,6 +58,14 @@ def _exc_work(num=3):
5358
i *= 2
5459

5560

61+
def _exc_consume(result, num=4):
62+
while True:
63+
i = yield
64+
if i == num:
65+
raise PipelineError()
66+
result.append(i)
67+
68+
5669
# A worker that yields a bubble.
5770
def _bub_work(num=3):
5871
i = None
@@ -121,17 +134,32 @@ def test_pull(self):
121134
class ExceptionTest(unittest.TestCase):
122135
def setUp(self):
123136
self.result = []
124-
self.pl = pipeline.Pipeline(
125-
(_produce(), _exc_work(), _consume(self.result))
126-
)
127137

128-
def test_run_sequential(self):
138+
def run_sequential(self, *stages):
139+
pl = pipeline.Pipeline(stages)
129140
with pytest.raises(PipelineError):
130-
self.pl.run_sequential()
141+
pl.run_sequential()
131142

132-
def test_run_parallel(self):
143+
def run_parallel(self, *stages):
144+
pl = pipeline.Pipeline(stages)
133145
with pytest.raises(PipelineError):
134-
self.pl.run_parallel()
146+
pl.run_parallel()
147+
148+
def test_run_sequential(self):
149+
"""Test that exceptions from various stages of the pipeline are
150+
properly propagated when running sequentially.
151+
"""
152+
self.run_sequential(_exc_produce(), _work(), _consume(self.result))
153+
self.run_sequential(_produce(), _exc_work(), _consume(self.result))
154+
self.run_sequential(_produce(), _work(), _exc_consume(self.result))
155+
156+
def test_run_parallel(self):
157+
"""Test that exceptions from various stages of the pipeline are
158+
properly propagated when running in parallel.
159+
"""
160+
self.run_parallel(_exc_produce(), _work(), _consume(self.result))
161+
self.run_parallel(_produce(), _exc_work(), _consume(self.result))
162+
self.run_parallel(_produce(), _work(), _exc_consume(self.result))
135163

136164
def test_pull(self):
137165
pl = pipeline.Pipeline((_produce(), _exc_work()))

0 commit comments

Comments
 (0)