Skip to content

Commit 750aab7

Browse files
Add support for async apply
Add support for async apply
2 parents 2ed55ba + 2a7eb5b commit 750aab7

File tree

12 files changed

+200
-23
lines changed

12 files changed

+200
-23
lines changed

dev-requirements.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
-r requirements.in
12
bump2version
23
coverage
34
hypothesis

dev-requirements.txt

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@
77
aiohttp==3.5.4
88
alabaster==0.7.12 # via sphinx
99
ansicolors==1.0.2 # via cuvner
10-
appdirs==1.4.3 # via black
10+
anyio==1.0.0 # via asks
11+
appdirs==1.4.3
1112
argh==0.26.2 # via watchdog
13+
asks==2.3.3
1214
asn1crypto==0.24.0 # via cryptography
1315
astroid==2.2.5 # via pylint
16+
async-exit-stack==1.0.1
17+
async-generator==1.10
1418
async-timeout==3.0.1 # via aiohttp
1519
atomicwrites==1.3.0 # via pytest
16-
attrs==19.1.0 # via aiohttp, automat, black, hypothesis, pytest, service-identity, treq, twisted
20+
attrs==19.1.0
1721
automat==0.7.0 # via twisted
1822
babel==2.6.0 # via sphinx
1923
black==19.3b0
@@ -22,17 +26,18 @@ bump2version==0.5.10
2226
certifi==2019.3.9 # via requests
2327
cffi==1.12.2 # via cryptography
2428
chardet==3.0.4 # via aiohttp, requests
25-
click==7.0 # via black, cuvner
29+
click==7.0
2630
constantly==15.1.0 # via twisted
2731
coverage==4.5.3
2832
cryptography==2.6.1 # via pyopenssl, service-identity
2933
cuvner==18.0.1
3034
docshtest==0.0.2
3135
docutils==0.14 # via readme-renderer, sphinx
3236
filelock==3.0.10 # via tox
37+
h11==0.9.0 # via asks
3338
hyperlink==19.0.0 # via twisted
3439
hypothesis==4.17.1
35-
idna==2.8 # via hyperlink, requests, twisted, yarl
40+
idna==2.8 # via hyperlink, requests, trio, twisted, yarl
3641
imagesize==1.1.0 # via sphinx
3742
incremental==17.5.0 # via cuvner, treq, twisted
3843
isort==4.3.17 # via pylint
@@ -42,9 +47,13 @@ lazy-object-proxy==1.3.1 # via astroid
4247
lockfile==0.12.2
4348
lxml==4.3.4
4449
markupsafe==1.1.1 # via jinja2
50+
marshmallow==3.0.0rc7
4551
mccabe==0.6.1 # via pylint
4652
more-itertools==7.0.0 # via pytest
4753
multidict==4.5.2 # via aiohttp, yarl
54+
mypy-extensions==0.4.1 # via mypy, trio-typing
55+
mypy==0.711 # via trio-typing
56+
outcome==1.0.0 # via trio
4857
packaging==19.0 # via pytest-sugar, sphinx
4958
parso==0.4.0
5059
pathtools==0.1.2 # via watchdog
@@ -59,6 +68,7 @@ pyhamcrest==1.9.0 # via twisted
5968
pylint==2.3.1
6069
pyopenssl==19.0.0 # via twisted
6170
pyparsing==2.4.0 # via packaging
71+
pyrsistent==0.15.2
6272
pytest-clarity==0.1.0a1
6373
pytest-cov==2.6.1
6474
pytest-sugar==0.9.2
@@ -69,8 +79,10 @@ readme-renderer==24.0 # via twine
6979
requests-toolbelt==0.8.0 # via twine
7080
requests==2.21.0
7181
service-identity==18.1.0 # via twisted
72-
six==1.12.0 # via astroid, automat, bleach, cryptography, cuvner, packaging, pyhamcrest, pyopenssl, pytest, readme-renderer, tox, treq
82+
six==1.12.0 # via astroid, automat, bleach, cryptography, cuvner, packaging, pyhamcrest, pyopenssl, pyrsistent, pytest, readme-renderer, tox, treq
83+
sniffio==1.1.0 # via anyio, trio
7384
snowballstemmer==1.2.1 # via sphinx
85+
sortedcontainers==2.1.0 # via trio
7486
sphinx-rtd-theme==0.4.3
7587
sphinx==2.0.1
7688
sphinxcontrib-applehelp==1.0.1 # via sphinx
@@ -80,13 +92,17 @@ sphinxcontrib-jsmath==1.0.1 # via sphinx
8092
sphinxcontrib-qthelp==1.0.2 # via sphinx
8193
sphinxcontrib-serializinghtml==1.1.3 # via sphinx
8294
termcolor==1.1.0 # via pytest-clarity, pytest-sugar
83-
toml==0.10.0 # via black, tox
95+
toml==0.10.0
96+
toolz==0.9.0
8497
tox==3.9.0
8598
tqdm==4.31.1 # via twine
8699
treq==18.6.0
100+
trio-typing==0.2.0
101+
trio==0.11.0
87102
twine==1.13.0
88103
twisted[tls]==19.2.0
89-
typed-ast==1.3.1 # via astroid
104+
typed-ast==1.4.0 # via astroid, mypy
105+
typing-extensions==3.7.4 # via trio-typing
90106
unidiff==0.5.5 # via cuvner
91107
urllib3==1.24.2
92108
virtualenv==16.4.3 # via tox

requirements.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ toml
1111
asks
1212
marshmallow>=3.0rc1
1313
pyrsistent
14+
trio_typing

requirements.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ click==7.0
1515
h11==0.9.0 # via asks
1616
idna==2.8 # via trio
1717
marshmallow==3.0.0rc7
18+
mypy-extensions==0.4.1 # via mypy, trio-typing
19+
mypy==0.711 # via trio-typing
1820
outcome==1.0.0 # via trio
1921
parso==0.4.0
2022
pyrsistent==0.15.2
@@ -23,4 +25,7 @@ sniffio==1.1.0 # via anyio, trio
2325
sortedcontainers==2.1.0 # via trio
2426
toml==0.10.0
2527
toolz==0.9.0
28+
trio-typing==0.2.0
2629
trio==0.11.0
30+
typed-ast==1.4.0 # via mypy
31+
typing-extensions==3.7.4 # via trio-typing

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.0.109
2+
current_version = 0.0.110
33
commit = True
44
tag = True
55

setup.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
with open(PROJECT_ROOT / "requirements.in") as f:
99
INSTALL_REQUIRES = f.read().splitlines()
1010

11+
with open(PROJECT_ROOT / "dev-requirements.in") as f:
12+
DEV_INSTALL_REQUIRES = f.read().splitlines()
1113

1214
setuptools.setup(
1315
name="mario",
14-
version="0.0.109",
16+
version="0.0.110",
1517
description="Shell pipes for Python.",
1618
long_description=open(PROJECT_ROOT / "README.rst").read(),
1719
long_description_content_type="text/x-rst",
@@ -44,4 +46,5 @@
4446
"console_scripts": ["mario = mario.cli:cli"],
4547
"mario_plugins": ["basic = mario.plugins"],
4648
},
49+
extras={"dev": DEV_INSTALL_REQUIRES},
4750
)

src/mario/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.0.109"
1+
__version__ = "0.0.110"

src/mario/plugins/basic.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,13 @@ async def async_filter(function, items, exit_stack, max_concurrent):
8686

8787
@registry.add_traversal("apply", calculate_more_params=calculate_function)
8888
async def apply(function, items):
89+
"""When ``function`` takes an iterable."""
8990
return traversals.AsyncIterableWrapper([await function([x async for x in items])])
9091

9192

9293
@registry.add_traversal("async_apply", calculate_more_params=calculate_function)
9394
async def async_apply(function, items):
95+
"""When ``function`` takes an async iterable."""
9496
return await traversals.async_apply(function, items)
9597

9698

src/mario/testing/atools.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
async def async_drop_falsy(items):
2+
async for x in items:
3+
if x:
4+
yield x
5+
6+
7+
def drop_falsy(items):
8+
for x in items:
9+
if x:
10+
yield x
11+
12+
13+
def my_max(items):
14+
print("hello")
15+
16+
it = iter(items)
17+
18+
try:
19+
current_max = next(it)
20+
except StopIteration as e:
21+
raise ValueError("Iterable was empty") from e
22+
for x in it:
23+
24+
if x > current_max:
25+
current_max = x
26+
return x
27+
28+
29+
# def gen_max(items):
30+
# it = iter(items)
31+
# try:
32+
# current_max = next(it)
33+
# except StopIteration as e:
34+
# raise ValueError("Iterable was empty") from e
35+
# for x in it:
36+
37+
# if x > current_max:
38+
# current_max = x
39+
# yield x
40+
41+
42+
def gen_max(items):
43+
yield from [my_max(items)]
44+
45+
46+
import functools
47+
48+
49+
def wrap_sync_fold(function):
50+
@functools.wraps(function)
51+
def wrap(items):
52+
yield from [function(items)]
53+
54+
return wrap
55+
56+
57+
wrapped_max = wrap_sync_fold(my_max)
58+
59+
wrapped_drop_falsy = wrap_sync_fold(drop_falsy)

src/mario/traversals.py

Lines changed: 94 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,101 @@ async def __anext__(self):
6868
raise StopAsyncIteration
6969

7070

71-
async def async_apply(
72-
function: Callable[[Iterable[T]], Any], aiterable: AsyncIterable[T]
71+
import typing as t
72+
import functools
73+
import itertools
74+
import threading
75+
76+
77+
import trio
78+
import trio_typing
79+
80+
T = t.TypeVar("T")
81+
82+
83+
def _pull_values_from_async_iterator(
84+
in_trio: trio.BlockingTrioPortal,
85+
ait: t.AsyncIterator[T],
86+
send_to_trio: trio.abc.SendChannel[T],
87+
):
88+
"""Make a generator by asking Trio to async-iterate over the async iterable,
89+
then yield the results.
90+
91+
This function is run in a thread.
92+
"""
93+
while True:
94+
try:
95+
yield in_trio.run(ait.__anext__)
96+
except StopAsyncIteration:
97+
break
98+
99+
100+
def _threaded_sync_apply(
101+
in_trio: trio.BlockingTrioPortal,
102+
function: t.Callable[[t.Iterable[T]], t.Any],
103+
send_to_trio: trio.abc.SendChannel,
104+
ait: t.AsyncIterator[T],
105+
receive_from_thread: trio.abc.ReceiveChannel,
106+
iterator=None,
107+
):
108+
"""Extract values from async iterable into a sync iterator, apply function to
109+
it, and send the result back into Trio.
110+
111+
This function is run in a thread.
112+
113+
"""
114+
115+
try:
116+
for x in iterator:
117+
in_trio.run(send_to_trio.send, x)
118+
finally:
119+
in_trio.run(send_to_trio.aclose)
120+
121+
122+
async def sync_apply(
123+
sync_function: t.Callable[[t.Iterable[T]], t.Any], data: t.AsyncIterable[T]
73124
):
74-
async for item in IterableToAsyncIterable(
75-
[function((await x) for x in AsyncIterableToIterable(aiterable))]
76-
):
77-
if isinstance(item, types.CoroutineType):
78-
return await item
79-
return item
125+
"""Apply a sync Callable[Iterable] to an async iterable by pulling values in a
126+
thread.
127+
"""
128+
in_trio = trio.BlockingTrioPortal()
129+
send_to_trio, receive_from_thread = trio.open_memory_channel[t.Tuple[T, t.Any]](0)
130+
131+
async with trio.open_nursery() as n:
132+
133+
iterator = await sync_function(
134+
_pull_values_from_async_iterator(in_trio, data, send_to_trio)
135+
)
136+
n.start_soon(
137+
trio.run_sync_in_worker_thread,
138+
functools.partial(
139+
_threaded_sync_apply,
140+
function=sync_function,
141+
iterator=iterator,
142+
ait=data,
143+
in_trio=in_trio,
144+
send_to_trio=send_to_trio,
145+
receive_from_thread=receive_from_thread,
146+
),
147+
)
148+
149+
async for x in receive_from_thread:
150+
yield x
151+
152+
153+
import functools
154+
155+
156+
def wrap_sync_fold(function):
157+
@functools.wraps(function)
158+
def wrap(items):
159+
yield from [function(items)]
160+
161+
return wrap
162+
163+
164+
async def async_apply(function, data):
165+
return await function(data)
80166

81167

82168
@async_generator.asynccontextmanager

0 commit comments

Comments
 (0)