diff --git a/assets/pyper.png b/assets/pyper.png
deleted file mode 100644
index d08abbe..0000000
Binary files a/assets/pyper.png and /dev/null differ
diff --git a/docs/Tutorial/Pipeline.md b/docs/Tutorial/Pipeline.md
deleted file mode 100644
index 9a70065..0000000
--- a/docs/Tutorial/Pipeline.md
+++ /dev/null
@@ -1 +0,0 @@
-# Pipeline
\ No newline at end of file
diff --git a/docs/Tutorial/task.md b/docs/Tutorial/task.md
deleted file mode 100644
index 6f59a15..0000000
--- a/docs/Tutorial/task.md
+++ /dev/null
@@ -1 +0,0 @@
-# task
\ No newline at end of file
diff --git a/docs/index.html b/docs/index.html
deleted file mode 100644
index d97b1ba..0000000
--- a/docs/index.html
+++ /dev/null
@@ -1,21 +0,0 @@
-
-
-
-
-
- Pyper
-
-
-
-
- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nullam accumsan sapien eu odio malesuada, vel posuere nunc facilisis. Morbi auctor ipsum nec eros dictum, quis pulvinar lorem pharetra.
- Sed varius, lacus sit amet luctus tincidunt, sapien lacus vehicula risus, nec congue erat neque nec nunc. Integer in sagittis lectus. Aliquam erat volutpat. Aenean malesuada purus non ipsum faucibus, quis scelerisque augue ultrices.
- Curabitur efficitur magna id metus ultricies, nec auctor nulla bibendum. Ut vitae justo nec arcu tincidunt facilisis.
-
-
-
-
\ No newline at end of file
diff --git a/src/pyper/_core/pipeline.py b/src/pyper/_core/pipeline.py
index 2a79359..341d528 100644
--- a/src/pyper/_core/pipeline.py
+++ b/src/pyper/_core/pipeline.py
@@ -11,11 +11,11 @@
class Pipeline:
- """A sequence of at least 1 connected Tasks.
+ """A sequence of at least 1 Tasks.
Two pipelines can be piped into another via:
```python
- new_pipeline = p1 >> p2
+ new_pipeline = p1 | p2
# OR
new_pipeline = p1.pipe(p2)
```
@@ -43,8 +43,8 @@ def pipe(self, other) -> Pipeline:
raise TypeError(f"{other} of type {type(other)} cannot be piped into a Pipeline")
return Pipeline(self.tasks + other.tasks)
- def __rshift__(self, other: Pipeline) -> Pipeline:
- """Allow the syntax `pipeline1 >> pipeline2`."""
+ def __or__(self, other: Pipeline) -> Pipeline:
+ """Allow the syntax `pipeline1 | pipeline2`."""
return self.pipe(other)
def consume(self, other: Callable) -> Callable:
@@ -55,8 +55,8 @@ def consumer(*args, **kwargs):
return consumer
raise TypeError(f"{other} must be a callable that takes a generator")
- def __and__(self, other: Callable) -> Callable:
- """Allow the syntax `pipeline & consumer`."""
+ def __gt__(self, other: Callable) -> Callable:
+ """Allow the syntax `pipeline > consumer`."""
return self.consume(other)
def __repr__(self):
diff --git a/src/pyper/_core/sync_helper/output.py b/src/pyper/_core/sync_helper/output.py
index 59eeaca..b473bdf 100644
--- a/src/pyper/_core/sync_helper/output.py
+++ b/src/pyper/_core/sync_helper/output.py
@@ -38,7 +38,7 @@ def __call__(self, *args, **kwargs):
tp.raise_error_if_exists()
try:
# Use the timeout strategy for unblocking main thread without busy waiting
- if (data := q_out.get(timeout=1)) is StopSentinel:
+ if (data := q_out.get(timeout=0.1)) is StopSentinel:
tp.raise_error_if_exists()
break
yield data
diff --git a/src/pyper/_core/task.py b/src/pyper/_core/task.py
index fcab208..1707be2 100644
--- a/src/pyper/_core/task.py
+++ b/src/pyper/_core/task.py
@@ -7,6 +7,9 @@
class Task:
"""The representation of a function within a Pipeline."""
+
+ __slots__ = "is_gen", "is_async", "func", "join", "concurrency", "throttle", "daemon"
+
def __init__(
self,
func: Callable,
diff --git a/tests/test_async.py b/tests/test_async.py
index d02fba5..c26c670 100644
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -32,23 +32,23 @@ async def consumer(data):
@pytest.mark.asyncio
async def test_pipeline():
- p = task(f1) >> task(f2)
+ p = task(f1) | task(f2)
assert p(1).__next__() == 1
@pytest.mark.asyncio
async def test_joined_pipeline():
- p = task(af1) >> task(af2) >> task(af4, join=True)
+ p = task(af1) | task(af2) | task(af4, join=True)
assert await p(1).__anext__() == 1
@pytest.mark.asyncio
async def test_consumer():
- p = task(af1) >> task(af2) & consumer
+ p = task(af1) | task(af2) > consumer
assert await p(1) == 1
@pytest.mark.asyncio
async def test_invalid_first_stage_concurrency():
try:
- p = task(af1, concurrency=2) >> task(af2) & consumer
+ p = task(af1, concurrency=2) | task(af2) > consumer
await p(1)
except Exception as e:
assert isinstance(e, RuntimeError)
@@ -58,7 +58,7 @@ async def test_invalid_first_stage_concurrency():
@pytest.mark.asyncio
async def test_invalid_first_stage_join():
try:
- p = task(af1, join=True) >> task(af2) & consumer
+ p = task(af1, join=True) | task(af2) > consumer
await p(1)
except Exception as e:
assert isinstance(e, RuntimeError)
@@ -68,7 +68,7 @@ async def test_invalid_first_stage_join():
@pytest.mark.asyncio
async def test_error_handling():
try:
- p = task(af1) >> task(af2) >> task(af3) & consumer
+ p = task(af1) | task(af2) | task(af3) > consumer
await p(1)
except Exception as e:
assert isinstance(e, RuntimeError)
@@ -77,13 +77,13 @@ async def test_error_handling():
@pytest.mark.asyncio
async def test_unified_pipeline():
- p = task(af1) >> task(f1) >> task(af2) >> task(f2) & consumer
+ p = task(af1) | task(f1) | task(af2) | task(f2) > consumer
assert await p(1) == 1
@pytest.mark.asyncio
async def test_error_handling_in_daemon():
try:
- p = task(af1) >> task(af2) >> task(f3, daemon=True) & consumer
+ p = task(af1) | task(af2) | task(f3, daemon=True) > consumer
await p(1)
except Exception as e:
assert isinstance(e, RuntimeError)
diff --git a/tests/test_sync.py b/tests/test_sync.py
index 6026ee6..621ebc1 100644
--- a/tests/test_sync.py
+++ b/tests/test_sync.py
@@ -17,7 +17,7 @@ def f4(a1, a2, a3, data, k1, k2):
def f5(data):
# Make queue monitor timeout on main thread
- time.sleep(1.2)
+ time.sleep(0.2)
raise RuntimeError
def consumer(data):
@@ -27,28 +27,28 @@ def consumer(data):
return total
def test_pipeline():
- p = task(f1) >> task(f2)
+ p = task(f1) | task(f2)
assert p(1).__next__() == 1
def test_joined_pipeline():
- p = task(f1) >> task(f2) >> task(f3, join=True)
+ p = task(f1) | task(f2) | task(f3, join=True)
assert p(1).__next__() == 1
def test_bind():
- p = task(f1) >> task(f4, bind=task.bind(1, 1, 1, k1=1, k2=2))
+ p = task(f1) | task(f4, bind=task.bind(1, 1, 1, k1=1, k2=2))
assert p(1).__next__() == 1
def test_redundant_bind_ok():
- p = task(f1) >> task(f2, bind=task.bind())
+ p = task(f1) | task(f2, bind=task.bind())
assert p(1).__next__() == 1
def test_consumer():
- p = task(f1) >> task(f2) & consumer
+ p = task(f1) | task(f2) > consumer
assert p(1) == 1
def test_invalid_first_stage_concurrency():
try:
- p = task(f1, concurrency=2) >> task(f2) & consumer
+ p = task(f1, concurrency=2) | task(f2) > consumer
p(1)
except Exception as e:
assert isinstance(e, RuntimeError)
@@ -57,7 +57,7 @@ def test_invalid_first_stage_concurrency():
def test_invalid_first_stage_join():
try:
- p = task(f1, join=True) >> task(f2) & consumer
+ p = task(f1, join=True) | task(f2) > consumer
p(1)
except Exception as e:
assert isinstance(e, RuntimeError)
@@ -66,7 +66,7 @@ def test_invalid_first_stage_join():
def test_error_handling():
try:
- p = task(f1) >> task(f2) >> task(f5) & consumer
+ p = task(f1) | task(f2) | task(f5) > consumer
p(1)
except Exception as e:
print(e)
@@ -76,7 +76,7 @@ def test_error_handling():
def test_error_handling_in_daemon():
try:
- p = task(f5, daemon=True) >> task(f2) & consumer
+ p = task(f5, daemon=True) | task(f2) > consumer
p(1)
except Exception as e:
print(e)
diff --git a/tests/test_task.py b/tests/test_task.py
index 1515c21..0579e1b 100644
--- a/tests/test_task.py
+++ b/tests/test_task.py
@@ -87,12 +87,12 @@ def test_async_task():
assert isinstance(p, AsyncPipeline)
def test_piped_async_task():
- p = task(afunc) >> task(func)
+ p = task(afunc) | task(func)
assert isinstance(p, AsyncPipeline)
def test_invalid_pipe():
try:
- task(func) >> 1
+ task(func) | 1
except Exception as e:
assert isinstance(e, TypeError)
else:
@@ -100,7 +100,7 @@ def test_invalid_pipe():
def test_invalid_async_pipe():
try:
- task(afunc) >> 1
+ task(afunc) | 1
except Exception as e:
assert isinstance(e, TypeError)
else:
@@ -108,7 +108,7 @@ def test_invalid_async_pipe():
def test_invalid_consumer():
try:
- task(func) & 1
+ task(func) > 1
except Exception as e:
assert isinstance(e, TypeError)
else:
@@ -116,7 +116,7 @@ def test_invalid_consumer():
def test_invalid_async_consumer():
try:
- task(afunc) & func
+ task(afunc) > func
except Exception as e:
assert isinstance(e, TypeError)
else: