Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed assets/pyper.png
Binary file not shown.
1 change: 0 additions & 1 deletion docs/Tutorial/Pipeline.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/Tutorial/task.md

This file was deleted.

21 changes: 0 additions & 21 deletions docs/index.html

This file was deleted.

12 changes: 6 additions & 6 deletions src/pyper/_core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@


class Pipeline:
"""A sequence of at least 1 connected Tasks.
"""A sequence of at least 1 Tasks.

Two pipelines can be piped into another via:
```python
new_pipeline = p1 >> p2
new_pipeline = p1 | p2
# OR
new_pipeline = p1.pipe(p2)
```
Expand Down Expand Up @@ -43,8 +43,8 @@ def pipe(self, other) -> Pipeline:
raise TypeError(f"{other} of type {type(other)} cannot be piped into a Pipeline")
return Pipeline(self.tasks + other.tasks)

def __rshift__(self, other: Pipeline) -> Pipeline:
"""Allow the syntax `pipeline1 >> pipeline2`."""
def __or__(self, other: Pipeline) -> Pipeline:
"""Allow the syntax `pipeline1 | pipeline2`."""
return self.pipe(other)

def consume(self, other: Callable) -> Callable:
Expand All @@ -55,8 +55,8 @@ def consumer(*args, **kwargs):
return consumer
raise TypeError(f"{other} must be a callable that takes a generator")

def __and__(self, other: Callable) -> Callable:
"""Allow the syntax `pipeline & consumer`."""
def __gt__(self, other: Callable) -> Callable:
"""Allow the syntax `pipeline > consumer`."""
return self.consume(other)

def __repr__(self):
Expand Down
2 changes: 1 addition & 1 deletion src/pyper/_core/sync_helper/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __call__(self, *args, **kwargs):
tp.raise_error_if_exists()
try:
# Use the timeout strategy for unblocking main thread without busy waiting
if (data := q_out.get(timeout=1)) is StopSentinel:
if (data := q_out.get(timeout=0.1)) is StopSentinel:
tp.raise_error_if_exists()
break
yield data
Expand Down
3 changes: 3 additions & 0 deletions src/pyper/_core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

class Task:
"""The representation of a function within a Pipeline."""

__slots__ = "is_gen", "is_async", "func", "join", "concurrency", "throttle", "daemon"

def __init__(
self,
func: Callable,
Expand Down
16 changes: 8 additions & 8 deletions tests/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,23 @@ async def consumer(data):

@pytest.mark.asyncio
async def test_pipeline():
p = task(f1) >> task(f2)
p = task(f1) | task(f2)
assert p(1).__next__() == 1

@pytest.mark.asyncio
async def test_joined_pipeline():
p = task(af1) >> task(af2) >> task(af4, join=True)
p = task(af1) | task(af2) | task(af4, join=True)
assert await p(1).__anext__() == 1

@pytest.mark.asyncio
async def test_consumer():
p = task(af1) >> task(af2) & consumer
p = task(af1) | task(af2) > consumer
assert await p(1) == 1

@pytest.mark.asyncio
async def test_invalid_first_stage_concurrency():
try:
p = task(af1, concurrency=2) >> task(af2) & consumer
p = task(af1, concurrency=2) | task(af2) > consumer
await p(1)
except Exception as e:
assert isinstance(e, RuntimeError)
Expand All @@ -58,7 +58,7 @@ async def test_invalid_first_stage_concurrency():
@pytest.mark.asyncio
async def test_invalid_first_stage_join():
try:
p = task(af1, join=True) >> task(af2) & consumer
p = task(af1, join=True) | task(af2) > consumer
await p(1)
except Exception as e:
assert isinstance(e, RuntimeError)
Expand All @@ -68,7 +68,7 @@ async def test_invalid_first_stage_join():
@pytest.mark.asyncio
async def test_error_handling():
try:
p = task(af1) >> task(af2) >> task(af3) & consumer
p = task(af1) | task(af2) | task(af3) > consumer
await p(1)
except Exception as e:
assert isinstance(e, RuntimeError)
Expand All @@ -77,13 +77,13 @@ async def test_error_handling():

@pytest.mark.asyncio
async def test_unified_pipeline():
p = task(af1) >> task(f1) >> task(af2) >> task(f2) & consumer
p = task(af1) | task(f1) | task(af2) | task(f2) > consumer
assert await p(1) == 1

@pytest.mark.asyncio
async def test_error_handling_in_daemon():
try:
p = task(af1) >> task(af2) >> task(f3, daemon=True) & consumer
p = task(af1) | task(af2) | task(f3, daemon=True) > consumer
await p(1)
except Exception as e:
assert isinstance(e, RuntimeError)
Expand Down
20 changes: 10 additions & 10 deletions tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def f4(a1, a2, a3, data, k1, k2):

def f5(data):
# Make queue monitor timeout on main thread
time.sleep(1.2)
time.sleep(0.2)
raise RuntimeError

def consumer(data):
Expand All @@ -27,28 +27,28 @@ def consumer(data):
return total

def test_pipeline():
p = task(f1) >> task(f2)
p = task(f1) | task(f2)
assert p(1).__next__() == 1

def test_joined_pipeline():
p = task(f1) >> task(f2) >> task(f3, join=True)
p = task(f1) | task(f2) | task(f3, join=True)
assert p(1).__next__() == 1

def test_bind():
p = task(f1) >> task(f4, bind=task.bind(1, 1, 1, k1=1, k2=2))
p = task(f1) | task(f4, bind=task.bind(1, 1, 1, k1=1, k2=2))
assert p(1).__next__() == 1

def test_redundant_bind_ok():
p = task(f1) >> task(f2, bind=task.bind())
p = task(f1) | task(f2, bind=task.bind())
assert p(1).__next__() == 1

def test_consumer():
p = task(f1) >> task(f2) & consumer
p = task(f1) | task(f2) > consumer
assert p(1) == 1

def test_invalid_first_stage_concurrency():
try:
p = task(f1, concurrency=2) >> task(f2) & consumer
p = task(f1, concurrency=2) | task(f2) > consumer
p(1)
except Exception as e:
assert isinstance(e, RuntimeError)
Expand All @@ -57,7 +57,7 @@ def test_invalid_first_stage_concurrency():

def test_invalid_first_stage_join():
try:
p = task(f1, join=True) >> task(f2) & consumer
p = task(f1, join=True) | task(f2) > consumer
p(1)
except Exception as e:
assert isinstance(e, RuntimeError)
Expand All @@ -66,7 +66,7 @@ def test_invalid_first_stage_join():

def test_error_handling():
try:
p = task(f1) >> task(f2) >> task(f5) & consumer
p = task(f1) | task(f2) | task(f5) > consumer
p(1)
except Exception as e:
print(e)
Expand All @@ -76,7 +76,7 @@ def test_error_handling():

def test_error_handling_in_daemon():
try:
p = task(f5, daemon=True) >> task(f2) & consumer
p = task(f5, daemon=True) | task(f2) > consumer
p(1)
except Exception as e:
print(e)
Expand Down
10 changes: 5 additions & 5 deletions tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,36 +87,36 @@ def test_async_task():
assert isinstance(p, AsyncPipeline)

def test_piped_async_task():
p = task(afunc) >> task(func)
p = task(afunc) | task(func)
assert isinstance(p, AsyncPipeline)

def test_invalid_pipe():
try:
task(func) >> 1
task(func) | 1
except Exception as e:
assert isinstance(e, TypeError)
else:
raise AssertionError

def test_invalid_async_pipe():
try:
task(afunc) >> 1
task(afunc) | 1
except Exception as e:
assert isinstance(e, TypeError)
else:
raise AssertionError

def test_invalid_consumer():
try:
task(func) & 1
task(func) > 1
except Exception as e:
assert isinstance(e, TypeError)
else:
raise AssertionError

def test_invalid_async_consumer():
try:
task(afunc) & func
task(afunc) > func
except Exception as e:
assert isinstance(e, TypeError)
else:
Expand Down