Skip to content

Commit cf36d18

Browse files
committed
Make stream() async so is non-blocking
1 parent 3b4a5a4 commit cf36d18

File tree

3 files changed

+44
-29
lines changed

3 files changed

+44
-29
lines changed

shiny/api-examples/MarkdownStream/app-core.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import time
1+
import asyncio
22

33
import requests
44

5-
from shiny import App, ui
5+
from shiny import App, ui, reactive
66

77
app_ui = ui.page_fluid(
88
ui.card(
@@ -23,13 +23,16 @@ def server(input, output, session):
2323
readme_chunks = readme.text.replace("\n", " \n ").split(" ")
2424

2525
# Generate words from the README.md file (with a small delay)
26-
def chunk_generator():
26+
async def chunk_generator():
2727
for chunk in readme_chunks:
28-
time.sleep(0.02)
28+
await asyncio.sleep(0.02)
2929
yield chunk + " "
3030

3131
md = ui.MarkdownStream("shiny-readme")
32-
md.stream(chunk_generator())
32+
33+
@reactive.effect
34+
async def _():
35+
await md.stream(chunk_generator())
3336

3437

3538
app = App(app_ui, server)

shiny/api-examples/MarkdownStream/app-express.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
import time
1+
import asyncio
22

33
import requests
44

5+
from shiny import reactive
56
from shiny.express import session, ui
67

78
ui.page_opts(full_width=True)
@@ -14,10 +15,10 @@
1415

1516

1617
# Generate words from the README.md file (with a small delay)
17-
def chunk_generator():
18+
async def chunk_generator():
1819
for chunk in readme_chunks:
1920
if not session.is_stub_session():
20-
time.sleep(0.02)
21+
await asyncio.sleep(0.02)
2122
yield chunk + " "
2223

2324

@@ -27,4 +28,9 @@ def chunk_generator():
2728
ui.card_header("Shiny README.md")
2829
md.ui()
2930

30-
md.stream(chunk_generator())
31+
md.ui(height=300)
32+
33+
34+
@reactive.effect
35+
async def _():
36+
await md.stream(chunk_generator())

shiny/ui/_markdown_stream.py

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
from contextlib import contextmanager
2-
from typing import Iterable, Literal, Union
1+
from contextlib import asynccontextmanager
2+
from typing import AsyncIterable, Iterable, Literal, Union
33

44
from htmltools import css
55

6-
from .. import reactive
6+
from .. import _utils, reactive
77
from .._docstring import add_example
88
from .._typing_extensions import TypedDict
99
from ..session import require_active_session
@@ -125,7 +125,11 @@ def ui(
125125
height=height,
126126
)
127127

128-
def stream(self, content: Iterable[str], clear: bool = True):
128+
async def stream(
129+
self,
130+
content: Iterable[str] | AsyncIterable[str],
131+
clear: bool = True,
132+
):
129133
"""
130134
Stream content into the markdown stream.
131135
@@ -138,13 +142,15 @@ def stream(self, content: Iterable[str], clear: bool = True):
138142
Whether to clear the existing content before streaming the new content.
139143
"""
140144

145+
content = _utils.wrap_async_iterable(content)
146+
141147
@reactive.extended_task
142148
async def _task():
143149
if clear:
144-
self._replace("")
145-
with self._streaming_dot():
146-
for c in content:
147-
self._append(c)
150+
await self._replace("")
151+
async with self._streaming_dot():
152+
async for c in content:
153+
await self._append(c)
148154

149155
_task()
150156

@@ -157,31 +163,31 @@ async def _handle_error():
157163
await self._raise_exception(e)
158164
_handle_error.destroy() # type: ignore
159165

160-
def _append(self, content: str):
166+
async def _append(self, content: str):
161167
msg: ContentMessage = {
162168
"id": self.id,
163169
"content": content,
164170
"operation": "append",
165171
}
166172

167-
self._send_custom_message(msg)
173+
await self._send_custom_message(msg)
168174

169-
def _replace(self, content: str):
175+
async def _replace(self, content: str):
170176
msg: ContentMessage = {
171177
"id": self.id,
172178
"content": content,
173179
"operation": "replace",
174180
}
175181

176-
self._send_custom_message(msg)
182+
await self._send_custom_message(msg)
177183

178-
@contextmanager
179-
def _streaming_dot(self):
184+
@asynccontextmanager
185+
async def _streaming_dot(self):
180186
start: isStreamingMessage = {
181187
"id": self.id,
182188
"isStreaming": True,
183189
}
184-
self._send_custom_message(start)
190+
await self._send_custom_message(start)
185191

186192
try:
187193
yield
@@ -190,7 +196,7 @@ def _streaming_dot(self):
190196
"id": self.id,
191197
"isStreaming": False,
192198
}
193-
self._send_custom_message(end)
199+
await self._send_custom_message(end)
194200

195201
async def _raise_exception(self, e: BaseException):
196202
if self.on_error == "unhandled":
@@ -200,12 +206,12 @@ async def _raise_exception(self, e: BaseException):
200206
msg = f"Error in MarkdownStream('{self.id}'): {str(e)}"
201207
raise NotifyException(msg, sanitize=sanitize) from e
202208

203-
def _send_custom_message(self, msg: Union[ContentMessage, isStreamingMessage]):
209+
async def _send_custom_message(
210+
self, msg: Union[ContentMessage, isStreamingMessage]
211+
):
204212
if self._session.is_stub_session():
205213
return
206-
self._session._send_message_sync(
207-
{"custom": {"shinyMarkdownStreamMessage": msg}}
208-
)
214+
await self._session.send_custom_message("shinyMarkdownStreamMessage", msg)
209215

210216

211217
@add_example()

0 commit comments

Comments
 (0)