
Commit e7e8edb

Merge pull request #14 from pyper-dev/dev
Fix type hints. Update docs/readme
2 parents: 7ccb863 + 9034e4b

File tree: 8 files changed (+106, -39 lines)


README.md (4 additions, 4 deletions)

````diff
@@ -22,11 +22,11 @@

 ---

-Pyper is a comprehensive framework for concurrent and parallel data-processing, based on functional programming patterns. Used for 🌐 **Data Collection**, 🔀 **ETL Systems**, and general-purpose 🛠️ **Python Scripting**
+Pyper is a flexible framework for concurrent and parallel data-processing, based on functional programming patterns. Used for 🔀 **ETL Systems**, ⚙️ **Data Microservices**, and 🌐 **Data Collection**

 See the [Documentation](https://pyper-dev.github.io/pyper/)

-Key features:
+**Key features:**

 * 💡 **Intuitive API**: Easy to learn, easy to think about. Implements clean abstractions to seamlessly unify threaded, multiprocessed, and asynchronous work.
 * 🚀 **Functional Paradigm**: Python functions are the building blocks of data pipelines. Lets you write clean, reusable code naturally.
@@ -99,7 +99,7 @@ if __name__ == "__main__":
     asyncio.run(main())
 ```

-Pyper provides an elegant abstraction of the execution of each function via `pyper.task`, allowing you to focus on building out the **logical** functions of your program. In the `main` function:
+Pyper provides an elegant abstraction of the execution of each task, allowing you to focus on building out the **logical** functions of your program. In the `main` function:

 * `pipeline` defines a function; this takes the parameters of its first task (`get_data`) and yields each output from its last task (`step3`)
 * Tasks are piped together using the `|` operator (motivated by Unix's pipe operator) as a syntactic representation of passing inputs/outputs between tasks.
@@ -114,7 +114,7 @@ In the pipeline, we are executing three different types of work:

 `task` acts as one intuitive API for unifying the execution of each different type of function.

-Each task submits their outputs to the next task within the pipeline via queue-based data structures, which is the mechanism underpinning how concurrency and parallelism are achieved. See the [docs](https://pyper-dev.github.io/pyper/docs/UserGuide/BasicConcepts) for a breakdown of what a pipeline looks like under the hood.
+Each task has workers that submit outputs to the next task within the pipeline via queue-based data structures; this is the mechanism underpinning how concurrency and parallelism are achieved. See the [docs](https://pyper-dev.github.io/pyper/docs/UserGuide/BasicConcepts) for a breakdown of what a pipeline looks like under the hood.

 ---
````
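For context on the prose edited above (a sketch, not part of the commit): the pattern the README's `main` explanation refers to looks roughly like this, assuming pyper's documented `task` and `|` API. The names `get_data` and `step3` come from the README; their bodies here are illustrative.

```python
# Minimal sketch of the README's pipeline pattern (bodies are assumed).
from pyper import task

def get_data(limit: int):
    # branch=True below makes each yielded value flow downstream individually
    for i in range(limit):
        yield i

def step3(x: int) -> int:
    return x * 2

if __name__ == "__main__":
    pipeline = task(get_data, branch=True) | task(step3, workers=4)
    # The pipeline takes get_data's parameters and yields step3's outputs
    for result in pipeline(10):
        print(result)
```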

docs/src/assets/img/hint1.png (34.4 KB, new image)

docs/src/assets/img/hint2.png (33.8 KB, new image)

docs/src/assets/img/hint3.png (44.7 KB, new image)

docs/src/docs/UserGuide/AdvancedConcepts.md (7 additions, 7 deletions)

````diff
@@ -75,14 +75,14 @@ The correct way to optimize the performance of CPU-bound tasks is through parall
 # Okay
 @task(workers=10, multiprocess=True)
 def long_computation(data: int):
-    for i in range(10_000_000):
+    for i in range(1, 1_000_000):
         data *= i
     return data

 # Bad -- cannot benefit from concurrency
 @task(workers=10)
 def long_computation(data: int):
-    for i in range(10_000_000):
+    for i in range(1, 1_000_000):
         data *= i
     return data
 ```
````
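The hunk above only tightens the loop bounds; the point the surrounding docs make (restated here as a hedged aside, not part of the commit) is that threaded workers cannot parallelize pure-Python computation because of the GIL, while `multiprocess=True` runs workers in separate processes:

```python
# Hedged sketch (not from the commit): the same decorator, two execution
# models, assuming pyper's documented `task` API.
from pyper import task

@task(workers=10, multiprocess=True)  # parallel: one process per worker,
def long_computation(data: int) -> int:  # so CPU cores are actually used
    for i in range(1, 1_000_000):
        data *= i
    return data

@task(workers=10)
def wait_on_io(path: str) -> str:
    # Threads do help here: blocking file I/O releases the GIL
    with open(path) as f:
        return f.read()
```

The remaining hunks of this file follow.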
````diff
@@ -150,7 +150,7 @@ The correct pattern is generally to define functions which take these resources
 from aiohttp import ClientSession
 from pyper import task

-async def list_user_ids(session: ClientSession) -> list:
+async def list_user_ids(session: ClientSession) -> list[int]:
     async with session.get("/users") as r:
         return await r.json()

@@ -163,7 +163,7 @@ When defining a pipeline, these additional arguments are plugged into tasks usin

 ```python
 async def main():
-    async with ClientSession("http://localhost:8000") as session:
+    async with ClientSession("http://localhost:8000/api") as session:
         user_data_pipeline = (
             task(list_user_ids, branch=True, bind=task.bind(session=session))
             | task(fetch_user_data, workers=10, bind=task.bind(session=session))
@@ -183,7 +183,7 @@ from pyper import task, AsyncPipeline

 def user_data_pipeline(session: ClientSession) -> AsyncPipeline:

-    async def list_user_ids() -> list:
+    async def list_user_ids() -> list[int]:
         async with session.get("/users") as r:
             return await r.json()

@@ -197,11 +197,11 @@ def user_data_pipeline(session: ClientSession) -> AsyncPipeline:
     )
 ```

-Now `user_data_pipeline` constructs a self-contained sata-flow, which can be reused without having to define its internal pipeline everytime.
+Now `user_data_pipeline` constructs a self-contained data-flow, which can be reused without having to define its internal pipeline every time.

 ```python
 async def main():
-    async with ClientSession("http://localhost:8000") as session:
+    async with ClientSession("http://localhost:8000/api") as session:
         run = (
             user_data_pipeline(session)
             | task(write_to_file, join=True)
````
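As a self-contained usage sketch of the `task.bind` resource-sharing pattern touched above (assumed, not part of the commit; a plain dict stands in for the `ClientSession`):

```python
# Hypothetical, runnable version of the pattern: `store` plays the role
# of the shared resource plugged into each task via task.bind.
from pyper import task

def list_keys(store: dict) -> list:
    return list(store)

def lookup(key: str, store: dict) -> int:
    return store[key]

if __name__ == "__main__":
    store = {"a": 1, "b": 2, "c": 3}
    pipeline = (
        task(list_keys, branch=True, bind=task.bind(store=store))
        | task(lookup, workers=3, bind=task.bind(store=store))
    )
    for value in pipeline():
        print(value)  # 1, 2, 3 (ordering may vary with multiple workers)
```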

docs/src/docs/UserGuide/ComposingPipelines.md (7 additions, 1 deletion)

````diff
@@ -89,7 +89,13 @@ if __name__ == "__main__":
 ```

 {: .info}
-Pyper comes with fantastic IDE intellisense support which understands these operators, and will always show you which variables are `Pipeline` or `AsyncPipeline` objects; this also preserves type hints from your own functions, showing you the parameter and return type specs for each pipeline or consumer
+Pyper comes with fantastic intellisense support which understands these operators and preserves parameter/return type hints from user-defined functions
+
+<img src="../../assets/img/hint1.png" alt="Type Hint" style="width: 500; height: auto;">
+
+<img src="../../assets/img/hint2.png" alt="Type Hint" style="width: 500; height: auto;">
+
+<img src="../../assets/img/hint3.png" alt="Type Hint" style="width: 500; height: auto;">

 ## Nested Pipelines
````
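What that intellisense note promises, sketched (assumed behavior, not from the commit; `double` is illustrative and the expected inference is approximate):

```python
# Illustrative only: a static checker should carry `double`'s parameter
# and return specs through to the resulting Pipeline object.
from pyper import task

def double(x: int) -> int:
    return x * 2

p = task(double)
# reveal_type(p)  # expected along the lines of: Pipeline[(x: int), int]
```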

docs/src/index.md (6 additions, 3 deletions)

````diff
@@ -41,10 +41,13 @@ The Python package space has great support for achieving

 However, the solutions for _general-purpose_ data processing are less established.

-Pyper aims to offer a comprehensive framework for concurrent and parallel data-processing in Python, designed with the following goals in mind:
+{: .text-green-200}
+**Pyper aims to offer a flexible framework for concurrent and parallel data-processing in Python**
+
+It is designed with the following goals in mind:

 * **Unified API**: Combine threads, processes and async code using one intuitive pattern
-* **Functional Paradigm**: Data pipelines compose together flexibly as functions
+* **Functional Paradigm**: Data pipelines compose together straightforwardly as functions
 * **Lazy Execution**: Built from the ground up to support generators, and provides mechanisms for fine-grained memory control
 * **Error Handling**: Data flows fail fast, even in long-running threads, and propagate their errors cleanly
 * **Complex Data Flows**: Data pipelines support branching/joining data flows, as well as sharing contexts/resources between tasks
@@ -61,6 +64,6 @@ $ pip install python-pyper

 ## Where Next?

-* Check out the 📖 **[User Guide](./docs/UserGuide/)** to get started with Pyper
+* Check out the 📖 **[User Guide](./docs/UserGuide/BasicConcepts)** to get started with Pyper

 * See some 🎯 **[Examples](./docs/Examples/)** of possible use cases
````

src/pyper/_core/decorators.py (82 additions, 24 deletions)

````diff
@@ -15,7 +15,8 @@

 _P = ParamSpec('P')
 _R = t.TypeVar('R')
-_ArgsKwargs: t.TypeAlias = t.Optional[t.Tuple[t.Tuple[t.Any], t.Dict[str, t.Any]]]
+_Default = t.TypeVar('T', bound=t.NoReturn)  # Matches to no type hints
+ArgsKwargs: t.TypeAlias = t.Optional[t.Tuple[t.Tuple[t.Any], t.Dict[str, t.Any]]]


 class task:
@@ -28,19 +29,57 @@ class task:
         workers (int): Defines the number of workers to run the task
         throttle (int): Limits the number of results the task is able to produce when all consumers are busy
         multiprocess (bool): Allows the task to be multiprocessed (cannot be `True` for async tasks)
-        bind (tuple[args, kwargs]): Additional args and kwargs to bind to the task when defining a pipeline
+        bind (tuple[tuple, dict]): Additional args and kwargs to bind to the task when defining a pipeline

     Returns:
-        Pipeline: A `Pipeline` instance consisting of one task.
+        A `Pipeline` instance consisting of one task.

-    Example:
-        ```python
-        def f(x: int):
-            return x + 1
+    Examples:
+        ```python
+        def spam(x: int):
+            return x + 1
+
+        p = task(spam)
+
+        def ham(x: int):
+            return [x, x + 1, x + 2]

-        p = task(f, workers=10, multiprocess=True)
-        ```
+        p = task(ham, branch=True, workers=10)
+
+        async def eggs(x: int):
+            yield x
+            yield x + 1
+            yield x + 2
+
+        p = task(eggs, branch=True, throttle=1)
+        ```
     """
+    @t.overload
+    def __new__(
+        cls,
+        func: t.Callable[_P, _Default],
+        /,
+        *,
+        branch: bool = False,
+        join: bool = False,
+        workers: int = 1,
+        throttle: int = 0,
+        multiprocess: bool = False,
+        bind: ArgsKwargs = None) -> Pipeline[_P, _Default]: ...
+
+    @t.overload
+    def __new__(
+        cls,
+        func: None = None,
+        /,
+        *,
+        branch: t.Literal[True],
+        join: bool = False,
+        workers: int = 1,
+        throttle: int = 0,
+        multiprocess: bool = False,
+        bind: ArgsKwargs = None) -> t.Type[_branched_partial_task]: ...
+
     @t.overload
     def __new__(
         cls,
@@ -52,20 +91,20 @@ def __new__(
         workers: int = 1,
         throttle: int = 0,
         multiprocess: bool = False,
-        bind: _ArgsKwargs = None) -> t.Type[task]: ...
+        bind: ArgsKwargs = None) -> t.Type[task]: ...

     @t.overload
     def __new__(
         cls,
         func: t.Callable[_P, t.Union[t.Awaitable[t.Iterable[_R]], t.AsyncGenerator[_R]]],
         /,
         *,
-        branch: True,
+        branch: t.Literal[True],
         join: bool = False,
         workers: int = 1,
         throttle: int = 0,
         multiprocess: bool = False,
-        bind: _ArgsKwargs = None) -> AsyncPipeline[_P, _R]: ...
+        bind: ArgsKwargs = None) -> AsyncPipeline[_P, _R]: ...

     @t.overload
     def __new__(
@@ -78,20 +117,20 @@ def __new__(
         workers: int = 1,
         throttle: int = 0,
         multiprocess: bool = False,
-        bind: _ArgsKwargs = None) -> AsyncPipeline[_P, _R]: ...
+        bind: ArgsKwargs = None) -> AsyncPipeline[_P, _R]: ...

     @t.overload
     def __new__(
         cls,
         func: t.Callable[_P, t.Iterable[_R]],
         /,
         *,
-        branch: True,
+        branch: t.Literal[True],
         join: bool = False,
         workers: int = 1,
         throttle: int = 0,
         multiprocess: bool = False,
-        bind: _ArgsKwargs = None) -> Pipeline[_P, _R]: ...
+        bind: ArgsKwargs = None) -> Pipeline[_P, _R]: ...

     @t.overload
     def __new__(
@@ -104,7 +143,7 @@ def __new__(
         workers: int = 1,
         throttle: int = 0,
         multiprocess: bool = False,
-        bind: _ArgsKwargs = None) -> Pipeline[_P, _R]: ...
+        bind: ArgsKwargs = None) -> Pipeline[_P, _R]: ...

     def __new__(
         cls,
@@ -116,25 +155,44 @@ def __new__(
         workers: int = 1,
         throttle: int = 0,
         multiprocess: bool = False,
-        bind: _ArgsKwargs = None):
+        bind: ArgsKwargs = None):
         # Classic decorator trick: @task() means func is None, @task without parentheses means func is passed.
         if func is None:
             return functools.partial(cls, branch=branch, join=join, workers=workers, throttle=throttle, multiprocess=multiprocess, bind=bind)
         return Pipeline([Task(func=func, branch=branch, join=join, workers=workers, throttle=throttle, multiprocess=multiprocess, bind=bind)])

     @staticmethod
-    def bind(*args, **kwargs) -> _ArgsKwargs:
+    def bind(*args, **kwargs) -> ArgsKwargs:
         """Bind additional `args` and `kwargs` to a task.

         Example:
-            ```python
-            def f(x: int, y: int):
-                return x + y
+            ```python
+            def f(x: int, y: int):
+                return x + y

-            p = task(f, bind=task.bind(y=1))
-            p(x=1)
-            ```
+            p = task(f, bind=task.bind(y=1))
+            p(x=1)
+            ```
         """
         if not args and not kwargs:
             return None
         return args, kwargs
+
+
+class _branched_partial_task:
+    @t.overload
+    def __new__(cls, func: t.Callable[_P, _Default]) -> Pipeline[_P, _Default]: ...
+
+    @t.overload
+    def __new__(
+        cls,
+        func: t.Callable[_P, t.Union[t.Awaitable[t.Iterable[_R]], t.AsyncGenerator[_R]]]) -> AsyncPipeline[_P, _R]: ...
+
+    @t.overload
+    def __new__(cls, func: t.Callable[_P, t.Iterable[_R]]) -> Pipeline[_P, _R]: ...
+
+    @t.overload
+    def __new__(cls, func: t.Callable[_P, _R]) -> Pipeline[_P, t.Any]: ...

+    def __new__(cls):
+        raise NotImplementedError
````
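To see what the added overloads express for a type checker (a sketch under assumptions, not from the commit; `fan_out` and `untyped` are illustrative):

```python
# Illustrative sketch of how the new overloads resolve statically.
from pyper import task

@task(branch=True)  # func=None with branch=Literal[True]: this now returns
def fan_out(x: int):  # _branched_partial_task, whose overloads type the
    return [x, x + 1, x + 2]  # decorated function as a Pipeline
# Expected inference: fan_out is Pipeline[(x: int), int], since the
# function returns an Iterable[int].

@task
def untyped(x):  # no return hint: per the diff's comment, this is what
    return x     # the _Default TypeVar (bound=NoReturn) is meant to match
```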
