From 916d2c041c04d311f349fb0de890201ce884b70d Mon Sep 17 00:00:00 2001 From: Carson Date: Tue, 11 Mar 2025 20:49:20 -0500 Subject: [PATCH 01/21] feat(Chat): Add .start_message_stream(), .end_message_stream(), and .inject_message_chunk(). Append instead of replace messages unless transforms are used --- shiny/ui/_chat.py | 241 ++++++++++++------ shiny/ui/_chat_types.py | 22 +- .../shiny/components/chat/inject/app.py | 52 ++++ .../chat/inject/test_chat_inject.py | 29 +++ tests/pytest/test_chat.py | 96 ++++--- 5 files changed, 302 insertions(+), 138 deletions(-) create mode 100644 tests/playwright/shiny/components/chat/inject/app.py create mode 100644 tests/playwright/shiny/components/chat/inject/test_chat_inject.py diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index a2467a0f4..e5822c302 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -1,6 +1,7 @@ from __future__ import annotations import inspect +import warnings from typing import ( Any, AsyncIterable, @@ -38,7 +39,7 @@ as_provider_message, ) from ._chat_tokenizer import TokenEncoding, TokenizersEncoding, get_default_tokenizer -from ._chat_types import ChatMessage, ClientMessage, TransformedMessage +from ._chat_types import ChatMessage, ClientMessage, Role, TransformedMessage from ._html_deps_py_shiny import chat_deps from .fill import as_fill_item, as_fillable_container @@ -231,18 +232,18 @@ async def _init_chat(): @reactive.effect(priority=9999) @reactive.event(self._user_input) async def _on_user_input(): - msg = ChatMessage(content=self._user_input(), role="user") + content = self._user_input() # It's possible that during the transform, a message is appended, so get # the length now, so we can insert the new message at the right index n_pre = len(self._messages()) - msg_post = await self._transform_message(msg) - if msg_post is not None: - self._store_message(msg_post) + content, _ = await self._transform_content(content, role="user") + if content is not None: + self._store_content(content, 
role="user") self._suspend_input_handler = False else: # A transformed value of None is a special signal to suspend input # handling (i.e., don't generate a response) - self._store_message(as_transformed_message(msg), index=n_pre) + self._store_content(content or "", role="user", index=n_pre) await self._remove_loading_message() self._suspend_input_handler = True @@ -483,14 +484,15 @@ def messages( res: list[ChatMessage | ProviderMessage] = [] for i, m in enumerate(messages): transform = False - if m["role"] == "assistant": + if m.role == "assistant": transform = transform_assistant - elif m["role"] == "user": + elif m.role == "user": transform = transform_user == "all" or ( transform_user == "last" and i == len(messages) - 1 ) - content_key = m["transform_key" if transform else "pre_transform_key"] - chat_msg = ChatMessage(content=str(m[content_key]), role=m["role"]) + key = "transform_key" if transform else "pre_transform_key" + content_val = getattr(m, getattr(m, key)) + chat_msg = ChatMessage(content=str(content_val), role=m.role) if not isinstance(format, MISSING_TYPE): chat_msg = as_provider_message(chat_msg, format) res.append(chat_msg) @@ -550,11 +552,89 @@ async def append_message( """ await self._append_message(message, icon=icon) + async def inject_message_chunk( + self, + message_chunk: Any, + *, + operation: Literal["append", "replace"] = "append", + force: bool = False, + ): + """ + Inject a chunk of message content into the current message stream. + + Sometimes when streaming a message (i.e., `.append_message_stream()`), you may + want to inject a content into the streaming message while the stream is + busy doing other things (e.g., calling a tool). This method allows you to + inject any content you want into the current message stream (assuming one is + active). + + Parameters + ---------- + message_chunk + A message chunk to inject. + operation + Whether to append or replace the current message stream content. 
+ force + Whether to start a new stream if one is not currently active. + """ + stream_id = self._current_stream_id + if stream_id is None: + if not force: + raise ValueError( + "Can't inject a message chunk when no message stream is active. " + "Use `force=True` to start a new stream if one is not currently active.", + ) + await self.start_message_stream(force=True) + + return await self._append_message( + message_chunk, + chunk=True, + stream_id=stream_id, + operation=operation, + ) + + async def start_message_stream(self, *, force: bool = False): + """ + Start a new message stream. + + Parameters + ---------- + force + Whether to force starting a new stream even if one is already active + """ + stream_id = self._current_stream_id + if stream_id is not None: + if not force: + raise ValueError( + "Can't start a new message stream when a message stream is already active. " + "Use `force=True` to end a currently active stream and start a new one.", + ) + await self.end_message_stream() + + id = _utils.private_random_id() + return await self._append_message("", chunk="start", stream_id=id) + + async def end_message_stream(self): + """ + End the current message stream (if any). 
+ """ + stream_id = self._current_stream_id + if stream_id is None: + warnings.warn("No currently active stream to end.", stacklevel=2) + return + + with reactive.isolate(): + # TODO: .cancel() method should probably just handle this + self.latest_message_stream.cancel() + + return await self._append_message("", chunk="end", stream_id=stream_id) + async def _append_message( self, message: Any, *, chunk: ChunkOption = False, + operation: Literal["append", "replace"] = "append", stream_id: str | None = None, icon: HTML | Tag | TagList | None = None, ) -> None: @@ -570,27 +650,39 @@ async def _append_message( if chunk is False: msg = normalize_message(message) - chunk_content = None else: msg = normalize_message_chunk(message) - # Update the current stream message - chunk_content = msg["content"] - self._current_stream_message += chunk_content - msg["content"] = self._current_stream_message - if chunk == "end": + if operation == "replace": self._current_stream_message = "" + self._current_stream_message += msg["content"] - msg = await self._transform_message( - msg, chunk=chunk, chunk_content=chunk_content - ) - if msg is None: - return - self._store_message(msg, chunk=chunk) - await self._send_append_message( - msg, - chunk=chunk, - icon=icon, - ) + try: + content, transformed = await self._transform_content( + msg["content"], role=msg["role"], chunk=chunk + ) + # Act like nothing happened if content transformed to None + if content is None: + return + # Store if this is a whole message or the end of a streaming message + if chunk is False: + self._store_content(content, role=msg["role"]) + elif chunk == "end": + # Transforming content requires replacing all the content, so take + # it as is. Otherwise, store the accumulated stream message. 
+ self._store_content( + content=content if transformed else self._current_stream_message, + role=msg["role"], + ) + await self._send_append_message( + content=content, + role=msg["role"], + chunk=chunk, + operation="replace" if transformed else operation, + icon=icon, + ) + finally: + if chunk == "end": + self._current_stream_message = "" async def append_message_stream( self, @@ -737,11 +829,13 @@ def _can_append_message(self, stream_id: str | None) -> bool: # Send a message to the UI async def _send_append_message( self, - message: TransformedMessage, + content: str | HTML, + role: Role, chunk: ChunkOption = False, + operation: Literal["append", "replace"] = "append", icon: HTML | Tag | TagList | None = None, ): - if message["role"] == "system": + if role == "system": # System messages are not displayed in the UI return @@ -756,15 +850,15 @@ async def _send_append_message( elif chunk == "end": chunk_type = "message_end" - content = message["content_client"] content_type = "html" if isinstance(content, HTML) else "markdown" # TODO: pass along dependencies for both content and icon (if any) msg = ClientMessage( content=str(content), - role=message["role"], + role=role, content_type=content_type, chunk_type=chunk_type, + operation=operation, ) if icon is not None: @@ -892,44 +986,35 @@ async def _transform_wrapper(content: str, chunk: str, done: bool): else: return _set_transform(fn) - async def _transform_message( + async def _transform_content( self, - message: ChatMessage, + content: str, + role: Role, chunk: ChunkOption = False, - chunk_content: str | None = None, - ) -> TransformedMessage | None: - res = as_transformed_message(message) - key = res["transform_key"] - - if message["role"] == "user" and self._transform_user is not None: - content = await self._transform_user(message["content"]) - - elif message["role"] == "assistant" and self._transform_assistant is not None: - content = await self._transform_assistant( - message["content"], - chunk_content or 
"", + ) -> tuple[str | HTML | None, bool]: + content2 = content + transformed = False + if role == "user" and self._transform_user is not None: + content2 = await self._transform_user(content) + transformed = True + elif role == "assistant" and self._transform_assistant is not None: + all_content = content if chunk is False else self._current_stream_message + content2 = await self._transform_assistant( + all_content, + content, chunk == "end" or chunk is False, ) - else: - return res - - if content is None: - return None - - res[key] = content # type: ignore + transformed = True - return res + return (content2, transformed) # Just before storing, handle chunk msg type and calculate tokens - def _store_message( + def _store_content( self, - message: TransformedMessage, - chunk: ChunkOption = False, + content: str | HTML, + role: Role, index: int | None = None, ) -> None: - # Don't actually store chunks until the end - if chunk is True or chunk == "start": - return None with reactive.isolate(): messages = self._messages() @@ -937,12 +1022,14 @@ def _store_message( if index is None: index = len(messages) + msg = TransformedMessage.from_content(content=content, role=role) + messages = list(messages) - messages.insert(index, message) + messages.insert(index, msg) self._messages.set(tuple(messages)) - if message["role"] == "user": - self._latest_user_input.set(message) + if role == "user": + self._latest_user_input.set(msg) return None @@ -966,9 +1053,9 @@ def _trim_messages( n_other_messages: int = 0 token_counts: list[int] = [] for m in messages: - count = self._get_token_count(m["content_server"]) + count = self._get_token_count(m.content_server) token_counts.append(count) - if m["role"] == "system": + if m.role == "system": n_system_tokens += count n_system_messages += 1 else: @@ -989,7 +1076,7 @@ def _trim_messages( n_other_messages2: int = 0 token_counts.reverse() for i, m in enumerate(reversed(messages)): - if m["role"] == "system": + if m.role == "system": 
messages2.append(m) continue remaining_non_system_tokens -= token_counts[i] @@ -1012,13 +1099,13 @@ def _trim_anthropic_messages( self, messages: tuple[TransformedMessage, ...], ) -> tuple[TransformedMessage, ...]: - if any(m["role"] == "system" for m in messages): + if any(m.role == "system" for m in messages): raise ValueError( "Anthropic requires a system prompt to be specified in it's `.create()` method " "(not in the chat messages with `role: system`)." ) for i, m in enumerate(messages): - if m["role"] == "user": + if m.role == "user": return messages[i:] return () @@ -1064,7 +1151,8 @@ def user_input(self, transform: bool = False) -> str | None: if msg is None: return None key = "content_server" if transform else "content_client" - return str(msg[key]) + val = getattr(msg, key) + return str(val) def _user_input(self) -> str: id = self.user_input_id @@ -1308,21 +1396,4 @@ def chat_ui( return res -def as_transformed_message(message: ChatMessage) -> TransformedMessage: - if message["role"] == "user": - transform_key = "content_server" - pre_transform_key = "content_client" - else: - transform_key = "content_client" - pre_transform_key = "content_server" - - return TransformedMessage( - content_client=message["content"], - content_server=message["content"], - role=message["role"], - transform_key=transform_key, - pre_transform_key=pre_transform_key, - ) - - CHAT_INSTANCES: WeakValueDictionary[str, Chat] = WeakValueDictionary() diff --git a/shiny/ui/_chat_types.py b/shiny/ui/_chat_types.py index b10bfdf29..11af67706 100644 --- a/shiny/ui/_chat_types.py +++ b/shiny/ui/_chat_types.py @@ -1,5 +1,6 @@ from __future__ import annotations +from dataclasses import dataclass from typing import Literal, TypedDict from htmltools import HTML @@ -17,16 +18,35 @@ class ChatMessage(TypedDict): # A message once transformed have been applied -class TransformedMessage(TypedDict): +@dataclass +class TransformedMessage: content_client: str | HTML content_server: str role: Role 
transform_key: Literal["content_client", "content_server"] pre_transform_key: Literal["content_client", "content_server"] + @classmethod + def from_content(cls, content: str | HTML, role: Role) -> TransformedMessage: + if role == "user": + transform_key = "content_server" + pre_transform_key = "content_client" + else: + transform_key = "content_client" + pre_transform_key = "content_server" + + return cls( + content_client=content, + content_server=str(content), + role=role, + transform_key=transform_key, + pre_transform_key=pre_transform_key, + ) + # A message that can be sent to the client class ClientMessage(ChatMessage): content_type: Literal["markdown", "html"] chunk_type: Literal["message_start", "message_end"] | None + operation: Literal["append", "replace"] icon: NotRequired[str] diff --git a/tests/playwright/shiny/components/chat/inject/app.py b/tests/playwright/shiny/components/chat/inject/app.py new file mode 100644 index 000000000..5f8566598 --- /dev/null +++ b/tests/playwright/shiny/components/chat/inject/app.py @@ -0,0 +1,52 @@ +import asyncio + +from shiny import reactive +from shiny.express import input, render, ui + +ui.page_opts(title="Hello Chat") + +chat = ui.Chat(id="chat") +chat.ui() + + +async def generator(): + yield "Starting stream..." 
+ await asyncio.sleep(0.5) + yield "...stream complete" + + +@reactive.effect +async def _(): + await chat.append_message_stream(generator()) + + +@reactive.effect +async def _(): + await chat.inject_message_chunk("injected chunk") + + +ui.input_action_button("run_test", "Run test") + + +@reactive.effect +@reactive.event(input.run_test) +async def _(): + await chat.start_message_stream() + for chunk in ["can ", "inject ", "chunks"]: + await asyncio.sleep(0.2) + await chat.inject_message_chunk(chunk) + await chat.end_message_stream() + + +ui.input_action_button("run_test2", "Run test 2") + + +@reactive.effect +@reactive.event(input.run_test2) +async def _(): + await chat.append_message_stream(["can ", "append ", "chunks"]) + + +@render.code +def message_out(): + return str(chat.messages()) diff --git a/tests/playwright/shiny/components/chat/inject/test_chat_inject.py b/tests/playwright/shiny/components/chat/inject/test_chat_inject.py new file mode 100644 index 000000000..28d15ba53 --- /dev/null +++ b/tests/playwright/shiny/components/chat/inject/test_chat_inject.py @@ -0,0 +1,29 @@ +from playwright.sync_api import Page, expect +from utils.deploy_utils import skip_on_webkit + +from shiny.playwright import controller +from shiny.run import ShinyAppProc + + +@skip_on_webkit +def test_validate_chat_inject(page: Page, local_app: ShinyAppProc) -> None: + page.goto(local_app.url) + + TIMEOUT = 30 * 1000 + + chat = controller.Chat(page, "chat") + expect(chat.loc).to_be_visible(timeout=TIMEOUT) + + chat.expect_latest_message( + "Starting stream...injected chunk...stream complete", + timeout=TIMEOUT, + ) + + btn = controller.InputActionButton(page, "run_test") + expect(btn.loc).to_be_visible(timeout=TIMEOUT) + btn.click() + + chat.expect_latest_message( + "can inject chunks", + timeout=TIMEOUT, + ) diff --git a/tests/pytest/test_chat.py b/tests/pytest/test_chat.py index 0b04e4a6d..ba4ed71e4 100644 --- a/tests/pytest/test_chat.py +++ b/tests/pytest/test_chat.py @@ -7,14 +7,12 
@@ import pytest from shiny import Session -from shiny._namespaces import Root -from shiny.module import ResolvedId +from shiny._namespaces import ResolvedId, Root from shiny.session import session_context from shiny.types import MISSING from shiny.ui import Chat -from shiny.ui._chat import as_transformed_message from shiny.ui._chat_normalize import normalize_message, normalize_message_chunk -from shiny.ui._chat_types import ChatMessage +from shiny.ui._chat_types import ChatMessage, Role, TransformedMessage # ---------------------------------------------------------------------- # Helpers @@ -43,6 +41,10 @@ def is_type_in_union(type: object, union: object) -> bool: return False +def transformed_message(content: str, role: Role): + return TransformedMessage.from_content(content=content, role=role) + + def test_chat_message_trimming(): with session_context(test_session): chat = Chat(id="chat") @@ -53,11 +55,9 @@ def generate_content(token_count: int) -> str: return " ".join(["foo" for _ in range(1, n)]) msgs = ( - as_transformed_message( - { - "content": generate_content(102), - "role": "system", - } + transformed_message( + content=generate_content(102), + role="system", ), ) @@ -66,18 +66,8 @@ def generate_content(token_count: int) -> str: chat._trim_messages(msgs, token_limits=(100, 0), format=MISSING) msgs = ( - as_transformed_message( - { - "content": generate_content(100), - "role": "system", - } - ), - as_transformed_message( - { - "content": generate_content(2), - "role": "user", - } - ), + transformed_message(content=generate_content(100), role="system"), + transformed_message(content=generate_content(2), role="user"), ) # Throws since only the system message fits @@ -93,30 +83,24 @@ def generate_content(token_count: int) -> str: content3 = generate_content(2) msgs = ( - as_transformed_message( - { - "content": content1, - "role": "system", - } + transformed_message( + content=content1, + role="system", ), - as_transformed_message( - { - "content": content2, 
- "role": "user", - } + transformed_message( + content=content2, + role="user", ), - as_transformed_message( - { - "content": content3, - "role": "user", - } + transformed_message( + content=content3, + role="user", ), ) # Should discard the 1st user message trimmed = chat._trim_messages(msgs, token_limits=(103, 0), format=MISSING) assert len(trimmed) == 2 - contents = [msg["content_server"] for msg in trimmed] + contents = [msg.content_server for msg in trimmed] assert contents == [content1, content3] content1 = generate_content(50) @@ -125,38 +109,48 @@ def generate_content(token_count: int) -> str: content4 = generate_content(2) msgs = ( - as_transformed_message( - {"content": content1, "role": "system"}, + transformed_message( + content=content1, + role="system", ), - as_transformed_message( - {"content": content2, "role": "user"}, + transformed_message( + content=content2, + role="user", ), - as_transformed_message( - {"content": content3, "role": "system"}, + transformed_message( + content=content3, + role="system", ), - as_transformed_message( - {"content": content4, "role": "user"}, + transformed_message( + content=content4, + role="user", ), ) # Should discard the 1st user message trimmed = chat._trim_messages(msgs, token_limits=(103, 0), format=MISSING) assert len(trimmed) == 3 - contents = [msg["content_server"] for msg in trimmed] + contents = [msg.content_server for msg in trimmed] assert contents == [content1, content3, content4] content1 = generate_content(50) content2 = generate_content(10) msgs = ( - as_transformed_message({"content": content1, "role": "assistant"}), - as_transformed_message({"content": content2, "role": "user"}), + transformed_message( + content=content1, + role="assistant", + ), + transformed_message( + content=content2, + role="user", + ), ) # Anthropic requires 1st message to be a user message trimmed = chat._trim_messages(msgs, token_limits=(30, 0), format="anthropic") assert len(trimmed) == 1 - contents = 
[msg["content_server"] for msg in trimmed] + contents = [msg.content_server for msg in trimmed] assert contents == [content2] @@ -404,9 +398,7 @@ def test_as_google_message(): def test_as_langchain_message(): - from langchain_core.language_models.base import ( - LanguageModelInput, - ) + from langchain_core.language_models.base import LanguageModelInput from langchain_core.language_models.base import ( Sequence as LangchainSequence, # pyright: ignore[reportPrivateImportUsage] ) From 4c32a0d90b4bc6940fe84cf907ae80cc68d11b2f Mon Sep 17 00:00:00 2001 From: Carson Date: Tue, 11 Mar 2025 22:44:24 -0500 Subject: [PATCH 02/21] Go back to a more minimal change --- shiny/ui/_chat.py | 86 ++++++++++++++++++++++++----------------- shiny/ui/_chat_types.py | 10 ++--- 2 files changed, 55 insertions(+), 41 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index e5822c302..daff42b5b 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -232,18 +232,20 @@ async def _init_chat(): @reactive.effect(priority=9999) @reactive.event(self._user_input) async def _on_user_input(): - content = self._user_input() + msg = ChatMessage(content=self._user_input(), role="user") # It's possible that during the transform, a message is appended, so get # the length now, so we can insert the new message at the right index n_pre = len(self._messages()) - content, _ = await self._transform_content(content, role="user") - if content is not None: - self._store_content(content, role="user") + msg_post, _ = await self._transform_message(msg) + if msg_post is not None: + self._store_message(msg_post) self._suspend_input_handler = False else: # A transformed value of None is a special signal to suspend input # handling (i.e., don't generate a response) - self._store_content(content or "", role="user", index=n_pre) + self._store_message( + TransformedMessage.from_message(msg), index=n_pre + ) await self._remove_loading_message() self._suspend_input_handler = True @@ -657,25 +659,29 @@ async 
def _append_message( self._current_stream_message += msg["content"] try: - content, transformed = await self._transform_content( - msg["content"], role=msg["role"], chunk=chunk - ) + msg_t, transformed = await self._transform_message(msg, chunk=chunk) # Act like nothing happened if content transformed to None - if content is None: + if msg_t is None: return # Store if this is a whole message or the end of a streaming message if chunk is False: - self._store_content(content, role=msg["role"]) + self._store_message(msg_t) elif chunk == "end": # Transforming content requires replacing all the content, so take # it as is. Otherwise, store the accumulated stream message. - self._store_content( - content=content if transformed else self._current_stream_message, - role=msg["role"], - ) + if transformed: + self._store_message(msg_t) + else: + self._store_message( + TransformedMessage.from_message( + ChatMessage( + content=self._current_stream_message, role="assistant" + ) + ) + ) await self._send_append_message( - content=content, - role=msg["role"], + content=msg_t.content_client, + role=msg_t.role, chunk=chunk, operation="replace" if transformed else operation, icon=icon, @@ -986,33 +992,43 @@ async def _transform_wrapper(content: str, chunk: str, done: bool): else: return _set_transform(fn) - async def _transform_content( + async def _transform_message( self, - content: str, - role: Role, + message: ChatMessage, chunk: ChunkOption = False, - ) -> tuple[str | HTML | None, bool]: - content2 = content + ) -> tuple[TransformedMessage | None, bool]: + res = TransformedMessage.from_message(message) + key = res.transform_key transformed = False - if role == "user" and self._transform_user is not None: - content2 = await self._transform_user(content) + + if message["role"] == "user" and self._transform_user is not None: + content = await self._transform_user(message["content"]) transformed = True - elif role == "assistant" and self._transform_assistant is not None: - 
all_content = content if chunk is False else self._current_stream_message - content2 = await self._transform_assistant( + elif message["role"] == "assistant" and self._transform_assistant is not None: + all_content = ( + message["content"] if chunk is False else self._current_stream_message + ) + setattr(res, res.pre_transform_key, all_content) + content = await self._transform_assistant( all_content, - content, + message["content"], chunk == "end" or chunk is False, ) transformed = True + else: + return (res, transformed) + + if content is None: + return (None, transformed) + + setattr(res, key, content) - return (content2, transformed) + return (res, transformed) # Just before storing, handle chunk msg type and calculate tokens - def _store_content( + def _store_message( self, - content: str | HTML, - role: Role, + message: TransformedMessage, index: int | None = None, ) -> None: @@ -1022,14 +1038,12 @@ def _store_content( if index is None: index = len(messages) - msg = TransformedMessage.from_content(content=content, role=role) - messages = list(messages) - messages.insert(index, msg) + messages.insert(index, message) self._messages.set(tuple(messages)) - if role == "user": - self._latest_user_input.set(msg) + if message.role == "user": + self._latest_user_input.set(message) return None diff --git a/shiny/ui/_chat_types.py b/shiny/ui/_chat_types.py index 11af67706..f7bb282af 100644 --- a/shiny/ui/_chat_types.py +++ b/shiny/ui/_chat_types.py @@ -27,8 +27,8 @@ class TransformedMessage: pre_transform_key: Literal["content_client", "content_server"] @classmethod - def from_content(cls, content: str | HTML, role: Role) -> TransformedMessage: - if role == "user": + def from_message(cls, message: ChatMessage) -> TransformedMessage: + if message["role"] == "user": transform_key = "content_server" pre_transform_key = "content_client" else: @@ -36,9 +36,9 @@ def from_content(cls, content: str | HTML, role: Role) -> TransformedMessage: pre_transform_key = "content_server" 
return cls( - content_client=content, - content_server=str(content), - role=role, + content_client=message["content"], + content_server=message["content"], + role=message["role"], transform_key=transform_key, pre_transform_key=pre_transform_key, ) From 81a99d276cae823307df4c6c0d0d31dc6550fa11 Mon Sep 17 00:00:00 2001 From: Carson Date: Wed, 12 Mar 2025 09:35:34 -0500 Subject: [PATCH 03/21] Cleanup --- shiny/ui/_chat.py | 75 +++++++++++++++++++++++------------------------ 1 file changed, 36 insertions(+), 39 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index daff42b5b..fe6c55307 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -236,16 +236,14 @@ async def _on_user_input(): # It's possible that during the transform, a message is appended, so get # the length now, so we can insert the new message at the right index n_pre = len(self._messages()) - msg_post, _ = await self._transform_message(msg) + msg_post = await self._transform_message(msg) if msg_post is not None: self._store_message(msg_post) self._suspend_input_handler = False else: # A transformed value of None is a special signal to suspend input # handling (i.e., don't generate a response) - self._store_message( - TransformedMessage.from_message(msg), index=n_pre - ) + self._store_message(msg, index=n_pre) await self._remove_loading_message() self._suspend_input_handler = True @@ -659,31 +657,28 @@ async def _append_message( self._current_stream_message += msg["content"] try: - msg_t, transformed = await self._transform_message(msg, chunk=chunk) - # Act like nothing happened if content transformed to None - if msg_t is None: + msg = await self._transform_message(msg, chunk=chunk) + # Act like nothing happened if transformed to None + if msg is None: return - # Store if this is a whole message or the end of a streaming message - if chunk is False: - self._store_message(msg_t) + msg_store = msg + # Transforming requires *replacing* content + if isinstance(msg, TransformedMessage): + 
operation = "replace" elif chunk == "end": - # Transforming content requires replacing all the content, so take - # it as is. Otherwise, store the accumulated stream message. - if transformed: - self._store_message(msg_t) - else: - self._store_message( - TransformedMessage.from_message( - ChatMessage( - content=self._current_stream_message, role="assistant" - ) - ) - ) + # When not transforming, ensure full message is stored + msg_store = ChatMessage( + content=self._current_stream_message, + role="assistant", + ) + # Only store full messages + if chunk is False or chunk == "end": + self._store_message(msg_store) + # Send the message to the client await self._send_append_message( - content=msg_t.content_client, - role=msg_t.role, + message=msg, chunk=chunk, - operation="replace" if transformed else operation, + operation=operation, icon=icon, ) finally: @@ -835,13 +830,15 @@ def _can_append_message(self, stream_id: str | None) -> bool: # Send a message to the UI async def _send_append_message( self, - content: str | HTML, - role: Role, + message: TransformedMessage | ChatMessage, chunk: ChunkOption = False, operation: Literal["append", "replace"] = "append", icon: HTML | Tag | TagList | None = None, ): - if role == "system": + if not isinstance(message, TransformedMessage): + message = TransformedMessage.from_message(message) + + if message.role == "system": # System messages are not displayed in the UI return @@ -856,12 +853,13 @@ async def _send_append_message( elif chunk == "end": chunk_type = "message_end" + content = message.content_client content_type = "html" if isinstance(content, HTML) else "markdown" # TODO: pass along dependencies for both content and icon (if any) msg = ClientMessage( content=str(content), - role=role, + role=message.role, content_type=content_type, chunk_type=chunk_type, operation=operation, @@ -996,14 +994,11 @@ async def _transform_message( self, message: ChatMessage, chunk: ChunkOption = False, - ) -> tuple[TransformedMessage | 
None, bool]: + ) -> ChatMessage | TransformedMessage | None: res = TransformedMessage.from_message(message) - key = res.transform_key - transformed = False if message["role"] == "user" and self._transform_user is not None: content = await self._transform_user(message["content"]) - transformed = True elif message["role"] == "assistant" and self._transform_assistant is not None: all_content = ( message["content"] if chunk is False else self._current_stream_message @@ -1014,24 +1009,26 @@ async def _transform_message( message["content"], chunk == "end" or chunk is False, ) - transformed = True else: - return (res, transformed) + return message if content is None: - return (None, transformed) + return None - setattr(res, key, content) + setattr(res, res.transform_key, content) - return (res, transformed) + return res # Just before storing, handle chunk msg type and calculate tokens def _store_message( self, - message: TransformedMessage, + message: TransformedMessage | ChatMessage, index: int | None = None, ) -> None: + if not isinstance(message, TransformedMessage): + message = TransformedMessage.from_message(message) + with reactive.isolate(): messages = self._messages() From 08c104a70546ba4f7e8eba67d47626e9c3147b9c Mon Sep 17 00:00:00 2001 From: Carson Date: Wed, 12 Mar 2025 10:21:41 -0500 Subject: [PATCH 04/21] Polish API and clarify behavior --- shiny/ui/_chat.py | 74 ++++++++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 36 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index fe6c55307..d3390038e 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -39,7 +39,7 @@ as_provider_message, ) from ._chat_tokenizer import TokenEncoding, TokenizersEncoding, get_default_tokenizer -from ._chat_types import ChatMessage, ClientMessage, Role, TransformedMessage +from ._chat_types import ChatMessage, ClientMessage, TransformedMessage from ._html_deps_py_shiny import chat_deps from .fill import as_fill_item, as_fillable_container 
@@ -193,6 +193,8 @@ def __init__( self._current_stream_id: str | None = None self._pending_messages: list[PendingMessage] = [] + self._manual_stream_id: str | None = None + # If a user input message is transformed into a response, we need to cancel # the next user input submit handling self._suspend_input_handler: bool = False @@ -552,7 +554,7 @@ async def append_message( """ await self._append_message(message, icon=icon) - async def inject_message_chunk( + async def append_message_chunk( self, message_chunk: Any, *, @@ -560,13 +562,11 @@ async def inject_message_chunk( force: bool = False, ): """ - Inject a chunk of message content into the current message stream. + Append a message chunk to the current message stream. - Sometimes when streaming a message (i.e., `.append_message_stream()`), you may - want to inject a content into the streaming message while the stream is - busy doing other things (e.g., calling a tool). This method allows you to - inject any content you want into the current message stream (assuming one is - active). + Append a chunk of message content to either the currently running + `.append_message_stream()` or to one that was manually started with + `.start_message_stream()`. Parameters ---------- @@ -577,14 +577,16 @@ async def inject_message_chunk( force Whether to start a new stream if one is not currently active. """ - stream_id = self._current_stream_id + # Can append to either an active `.start_message_stream()` or a + # # `.append_message_stream()` + stream_id = self._manual_stream_id or self._current_stream_id if stream_id is None: if not force: raise ValueError( - "Can't inject a message chunk when no message stream is active. " - "Use `force=True` to start a new stream if one is not currently active.", + "Can't append a message chunk without an active message stream. 
" + "Use `force=True` to start a new message stream if one is not currently active.", ) - await self.start_message_stream(force=True) + await self.start_message_stream() return await self._append_message( message_chunk, @@ -593,41 +595,41 @@ async def inject_message_chunk( operation=operation, ) - async def start_message_stream(self, *, force: bool = False): + async def start_message_stream(self): """ Start a new message stream. - Parameters - ---------- - force - Whether to force starting a new stream even if one is already active + Starts a new message stream which can then be appended to using + `.append_message_chunk()`. """ - stream_id = self._current_stream_id - if stream_id is not None: - if not force: - raise ValueError( - "Can't start a new message stream when a message stream is already active. " - "Use `force=True` to end a currently active stream and start a new one.", - ) - await self.end_message_stream() - - id = _utils.private_random_id() - return await self._append_message("", chunk="start", stream_id=id) + # Since `._append_message()` manages a queue of message streams, we can just + # start a new stream here. Note that, if a stream is already active, this + # stream should start once the current stream ends. + stream_id = _utils.private_random_id() + # Separately track the stream id so ``.append_message_chunk()``/`.end_message_stream()` + self._manual_stream_id = stream_id + return await self._append_message( + "", + chunk="start", + stream_id=stream_id, + ) async def end_message_stream(self): """ End the current message stream (if any). + + Ends a message stream that was started with `.start_message_stream()`. 
""" - stream_id = self._current_stream_id + stream_id = self._manual_stream_id if stream_id is None: warnings.warn("No currently active stream to end.", stacklevel=2) return - with reactive.isolate(): - # TODO: .cancel() method should probably just handle this - self.latest_message_stream.cancel() - - return await self._append_message("", chunk="end", stream_id=stream_id) + return await self._append_message( + "", + chunk="end", + stream_id=stream_id, + ) async def _append_message( self, @@ -771,8 +773,8 @@ def latest_message_stream(self) -> reactive.ExtendedTask[[], str]: """ React to changes in the latest message stream. - Reactively reads for the :class:`~shiny.reactive.ExtendedTask` behind the - latest message stream. + Reactively reads for the :class:`~shiny.reactive.ExtendedTask` behind an + `.append_message_stream()`. From the return value (i.e., the extended task), you can then: From edc30f02be2de933bdd5124b972b55162cff0385 Mon Sep 17 00:00:00 2001 From: Carson Date: Wed, 12 Mar 2025 11:37:19 -0500 Subject: [PATCH 05/21] Fix test --- tests/playwright/shiny/components/chat/inject/app.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/playwright/shiny/components/chat/inject/app.py b/tests/playwright/shiny/components/chat/inject/app.py index 5f8566598..7ddb6db27 100644 --- a/tests/playwright/shiny/components/chat/inject/app.py +++ b/tests/playwright/shiny/components/chat/inject/app.py @@ -22,7 +22,7 @@ async def _(): @reactive.effect async def _(): - await chat.inject_message_chunk("injected chunk") + await chat.append_message_chunk("injected chunk") ui.input_action_button("run_test", "Run test") @@ -34,7 +34,7 @@ async def _(): await chat.start_message_stream() for chunk in ["can ", "inject ", "chunks"]: await asyncio.sleep(0.2) - await chat.inject_message_chunk(chunk) + await chat.append_message_chunk(chunk) await chat.end_message_stream() From 648d9cc20277f274aea195149708161a19a181ed Mon Sep 17 00:00:00 2001 From: Carson Date: 
Thu, 13 Mar 2025 12:22:28 -0500 Subject: [PATCH 06/21] wip first pass at properly nested streams --- shiny/ui/_chat.py | 149 +++++++++++++----- .../shiny/components/chat/inject/app.py | 59 ++++--- 2 files changed, 143 insertions(+), 65 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index 6f68ced39..b38ae7ed3 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -2,6 +2,7 @@ import inspect import warnings +from contextlib import asynccontextmanager from typing import ( Any, AsyncIterable, @@ -202,7 +203,11 @@ def __init__( self._current_stream_id: str | None = None self._pending_messages: list[PendingMessage] = [] + # Identifier for a manual stream (i.e., one started with `.start_message_stream()`) self._manual_stream_id: str | None = None + # If a manual stream gets nested within another stream, we need to keep track of + # the accumulated message separately + self._nested_stream_message: str = "" # If a user input message is transformed into a response, we need to cancel # the next user input submit handling @@ -578,34 +583,40 @@ async def append_message_chunk( message_chunk: Any, *, operation: Literal["append", "replace"] = "append", - force: bool = False, ): """ Append a message chunk to the current message stream. - Append a chunk of message content to either the currently running - `.append_message_stream()` or to one that was manually started with - `.start_message_stream()`. + Append a chunk of message content to either a stream started with + `.message_stream()` or an active `.append_message_stream()`. Parameters ---------- message_chunk A message chunk to inject. operation - Whether to append or replace the current message stream content. - force - Whether to start a new stream if one is not currently active. + Whether to append or replace the *current* message stream content. 
+
+        Note
+        ----
+        A useful pattern for displaying tool calls in a chat is for the tools to display
+        content using an "inner" `.message_stream()` while the response generation is
+        happening in an "outer" `.append_message_stream()`. This allows the inner stream
+        to display "ephemeral" content, then eventually show a final state with
+        `.append_message_chunk(operation="replace")`.
+
+        Raises
+        ------
+        ValueError
+            If there is no active stream (i.e., no `.message_stream()` or
+            `.append_message_stream()`).
         """
-        # Can append to either an active `.start_message_stream()` or a
-        # # `.append_message_stream()`
-        stream_id = self._manual_stream_id or self._current_stream_id
+        stream_id = self._current_stream_id
         if stream_id is None:
-            if not force:
-                raise ValueError(
-                    "Can't append a message chunk without an active message stream. "
-                    "Use `force=True` to start a new message stream if one is not currently active.",
-                )
-            await self.start_message_stream()
+            raise ValueError(
+                "Can't .append_message_chunk() without an active message stream. "
+                "Use .message_stream() or .append_message_stream() to start one."
+            )

         return await self._append_message(
             message_chunk,
@@ -614,40 +625,84 @@ async def append_message_chunk(
             operation=operation,
         )

-    async def start_message_stream(self):
+    @asynccontextmanager
+    async def message_stream(self):
         """
-        Start a new message stream.
+        Message stream context manager.
+
+        A context manager for streaming messages into the chat. Note this stream
+        can occur within a longer running `.append_message_stream()` or used on its own.

-        Starts a new message stream which can then be appended to using
-        `.append_message_chunk()`.
+        Note
+        ----
+        A useful pattern for displaying tool calls in a chat interface is for the
+        tool to display using `.message_stream()` while the response generation
+        is happening through `.append_message_stream()`.
This allows the inner stream + to display "ephemeral" content, then eventually show a final state + with `.append_message_chunk(operation="replace")`. """ - # Since `._append_message()` manages a queue of message streams, we can just - # start a new stream here. Note that, if a stream is already active, this - # stream should start once the current stream ends. - stream_id = _utils.private_random_id() - # Separately track the stream id so ``.append_message_chunk()``/`.end_message_stream()` - self._manual_stream_id = stream_id + await self._start_stream() + try: + yield + finally: + await self._end_stream() + + async def _start_stream(self): + if self._manual_stream_id is not None: + # TODO: support this? + raise ValueError("Nested .message_stream() isn't currently supported.") + # If we're currently streaming (i.e., through append_message_stream()), then + # end the client message stream (since we start a new one below) + if self._current_stream_id is not None: + await self._send_append_message( + message=ChatMessage(content="", role="assistant"), + chunk="end", + operation="append", + ) + # Regardless whether this is an "inner" stream, we start a new message on the + # client so it can handle `operation="replace"` without having to track where + # the inner stream started. + self._manual_stream_id = _utils.private_random_id() + stream_id = self._current_stream_id or self._manual_stream_id return await self._append_message( "", chunk="start", stream_id=stream_id, + # TODO: find a cleaner way to do this, and remove the gap between the messages + icon=( + HTML("") + if self._is_nested_stream + else None + ), ) - async def end_message_stream(self): - """ - End the current message stream (if any). - - Ends a message stream that was started with `.start_message_stream()`. 
- """ - stream_id = self._manual_stream_id - if stream_id is None: - warnings.warn("No currently active stream to end.", stacklevel=2) + async def _end_stream(self): + if self._manual_stream_id is None and self._current_stream_id is None: + warnings.warn( + "Tried to end a message stream, but one isn't currently active.", + stacklevel=2, + ) return - return await self._append_message( - "", - chunk="end", - stream_id=stream_id, + if self._is_nested_stream: + # If inside another stream, just update server-side message state + self._current_stream_message += self._nested_stream_message + self._nested_stream_message = "" + else: + # Otherwise, end this "manual" message stream + await self._append_message( + "", chunk="end", stream_id=self._manual_stream_id + ) + + self._manual_stream_id = None + return + + @property + def _is_nested_stream(self): + return ( + self._current_stream_id is not None + and self._manual_stream_id is not None + and self._current_stream_id != self._manual_stream_id ) async def _append_message( @@ -673,9 +728,14 @@ async def _append_message( msg = normalize_message(message) else: msg = normalize_message_chunk(message) - if operation == "replace": - self._current_stream_message = "" - self._current_stream_message += msg.content + if self._is_nested_stream: + if operation == "replace": + self._nested_stream_message = "" + self._nested_stream_message += msg.content + else: + if operation == "replace": + self._current_stream_message = "" + self._current_stream_message += msg.content try: msg = await self._transform_message(msg, chunk=chunk) @@ -704,7 +764,10 @@ async def _append_message( ) finally: if chunk == "end": - self._current_stream_message = "" + if self._is_nested_stream: + self._nested_stream_message = "" + else: + self._current_stream_message = "" async def append_message_stream( self, diff --git a/tests/playwright/shiny/components/chat/inject/app.py b/tests/playwright/shiny/components/chat/inject/app.py index 7ddb6db27..42093e04a 100644 
--- a/tests/playwright/shiny/components/chat/inject/app.py +++ b/tests/playwright/shiny/components/chat/inject/app.py @@ -9,42 +9,57 @@ chat.ui() -async def generator(): - yield "Starting stream..." - await asyncio.sleep(0.5) - yield "...stream complete" - - +# Launch a stream on load @reactive.effect async def _(): - await chat.append_message_stream(generator()) + await chat.append_message_stream(mock_stream()) -@reactive.effect -async def _(): - await chat.append_message_chunk("injected chunk") +async def mock_stream(): + yield "Starting outer stream...\n\n" + await asyncio.sleep(0.5) + await mock_tool() + await asyncio.sleep(0.5) + yield "\n\n...outer stream complete" -ui.input_action_button("run_test", "Run test") +# While the "outer" `.append_message_stream()` is running, +# start an "inner" stream with .message_stream() +async def mock_tool(): + steps = [ + "Starting inner stream 🔄...\n\n", + "Progress: 0%...", + "Progress: 50%...", + "Progress: 100%...", + ] + async with chat.message_stream(): + for chunk in steps: + await chat.append_message_chunk(chunk) + await asyncio.sleep(0.5) + await chat.append_message_chunk( + "Completed inner stream ✅", + operation="replace", + ) -@reactive.effect -@reactive.event(input.run_test) -async def _(): - await chat.start_message_stream() - for chunk in ["can ", "inject ", "chunks"]: - await asyncio.sleep(0.2) - await chat.append_message_chunk(chunk) - await chat.end_message_stream() +@chat.on_user_submit +async def _(user_input: str): + await chat.append_message_stream(f"You said: {user_input}") -ui.input_action_button("run_test2", "Run test 2") +ui.input_action_button("add_stream_basic", "Add .message_stream()") @reactive.effect -@reactive.event(input.run_test2) +@reactive.event(input.add_stream_basic) async def _(): - await chat.append_message_stream(["can ", "append ", "chunks"]) + async with chat.message_stream(): + await chat.append_message_chunk("Running test...") + await asyncio.sleep(1) + await 
chat.append_message_chunk("Test complete!") + + +# TODO: more tests, like submitting input, etc. @render.code From 3786b792f6d7f3ce179884adf2b7411dfd10122e Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 13 Mar 2025 17:54:33 -0500 Subject: [PATCH 07/21] Support nested streams and simplify logic --- shiny/ui/_chat.py | 208 ++++++++---------- .../shiny/components/chat/inject/app.py | 49 +++-- 2 files changed, 122 insertions(+), 135 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index b38ae7ed3..f86390573 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -1,7 +1,6 @@ from __future__ import annotations import inspect -import warnings from contextlib import asynccontextmanager from typing import ( Any, @@ -81,9 +80,7 @@ UserSubmitFunction1, ] -ChunkOption = Literal["start", "end", True, False] - -PendingMessage = Tuple[Any, ChunkOption, Union[str, None]] +PendingMessage = Tuple[Any, Literal["start", "end", True], Union[str, None]] @add_example(ex_dir="../templates/chat/starters/hello") @@ -199,15 +196,12 @@ def __init__( self.on_error = on_error # Chunked messages get accumulated (using this property) before changing state - self._current_stream_message = "" + self._current_stream_message: str = "" self._current_stream_id: str | None = None self._pending_messages: list[PendingMessage] = [] - # Identifier for a manual stream (i.e., one started with `.start_message_stream()`) - self._manual_stream_id: str | None = None - # If a manual stream gets nested within another stream, we need to keep track of - # the accumulated message separately - self._nested_stream_message: str = "" + # For tracking message stream state when entering/exiting nested streams + self._message_stream_checkpoint: str = "" # If a user input message is transformed into a response, we need to cancel # the next user input submit handling @@ -576,7 +570,16 @@ async def append_message( similar) is specified in model's completion method. 
::: """ - await self._append_message(message, icon=icon) + msg = normalize_message(message) + msg = await self._transform_message(msg) + if msg is None: + return + self._store_message(msg) + await self._send_append_message( + message=msg, + chunk=False, + icon=icon, + ) async def append_message_chunk( self, @@ -618,9 +621,8 @@ async def append_message_chunk( "Use .message_stream() or .append_message_stream() to start one." ) - return await self._append_message( + return await self._append_message_chunk( message_chunk, - chunk=True, stream_id=stream_id, operation=operation, ) @@ -641,75 +643,39 @@ async def message_stream(self): to display "ephemeral" content, then eventually show a final state with `.append_message_chunk(operation="replace")`. """ - await self._start_stream() + # Save the current stream state in a checkpoint (so that we can handle + # ``.append_message_chunk(operation="replace")` correctly) + old_checkpoint = self._message_stream_checkpoint + self._message_stream_checkpoint = self._current_stream_message + + # No stream currently exists, start one + is_root_stream = not self._current_stream_id + if is_root_stream: + await self._append_message_chunk( + "", + chunk="start", + stream_id=_utils.private_random_id(), + ) + try: yield finally: - await self._end_stream() - - async def _start_stream(self): - if self._manual_stream_id is not None: - # TODO: support this? - raise ValueError("Nested .message_stream() isn't currently supported.") - # If we're currently streaming (i.e., through append_message_stream()), then - # end the client message stream (since we start a new one below) - if self._current_stream_id is not None: - await self._send_append_message( - message=ChatMessage(content="", role="assistant"), - chunk="end", - operation="append", - ) - # Regardless whether this is an "inner" stream, we start a new message on the - # client so it can handle `operation="replace"` without having to track where - # the inner stream started. 
- self._manual_stream_id = _utils.private_random_id() - stream_id = self._current_stream_id or self._manual_stream_id - return await self._append_message( - "", - chunk="start", - stream_id=stream_id, - # TODO: find a cleaner way to do this, and remove the gap between the messages - icon=( - HTML("") - if self._is_nested_stream - else None - ), - ) - - async def _end_stream(self): - if self._manual_stream_id is None and self._current_stream_id is None: - warnings.warn( - "Tried to end a message stream, but one isn't currently active.", - stacklevel=2, - ) - return - - if self._is_nested_stream: - # If inside another stream, just update server-side message state - self._current_stream_message += self._nested_stream_message - self._nested_stream_message = "" - else: - # Otherwise, end this "manual" message stream - await self._append_message( - "", chunk="end", stream_id=self._manual_stream_id - ) - - self._manual_stream_id = None - return - - @property - def _is_nested_stream(self): - return ( - self._current_stream_id is not None - and self._manual_stream_id is not None - and self._current_stream_id != self._manual_stream_id - ) + # Restore the previous stream state + self._message_stream_checkpoint = old_checkpoint + + # If this was the root stream, end it + if is_root_stream: + await self._append_message_chunk( + "", + chunk="end", + stream_id=self._current_stream_id, + ) - async def _append_message( + async def _append_message_chunk( self, message: Any, *, - chunk: ChunkOption = False, + chunk: Literal[True, "start", "end"] = True, operation: Literal["append", "replace"] = "append", stream_id: str | None = None, icon: HTML | Tag | TagList | None = None, @@ -724,37 +690,40 @@ async def _append_message( if chunk == "end": self._current_stream_id = None - if chunk is False: - msg = normalize_message(message) + # Normalize into a ChatMessage() + msg = normalize_message_chunk(message) + + # Remember this content chunk for passing to transformer + this_chunk = 
msg.content + + # Transforming requires replacing + if self._needs_transform(msg): + operation = "replace" + + if operation == "replace": + # Replace up to the latest checkpoint + self._current_stream_message = self._message_stream_checkpoint + this_chunk + msg.content = self._current_stream_message else: - msg = normalize_message_chunk(message) - if self._is_nested_stream: - if operation == "replace": - self._nested_stream_message = "" - self._nested_stream_message += msg.content - else: - if operation == "replace": - self._current_stream_message = "" - self._current_stream_message += msg.content + self._current_stream_message += msg.content try: - msg = await self._transform_message(msg, chunk=chunk) - # Act like nothing happened if transformed to None - if msg is None: - return - msg_store = msg - # Transforming requires *replacing* content - if isinstance(msg, TransformedMessage): - operation = "replace" + if self._needs_transform(msg): + msg = await self._transform_message( + msg, chunk=chunk, chunk_content=this_chunk + ) + # Act like nothing happened if transformed to None + if msg is None: + return + if chunk == "end": + self._store_message(msg) elif chunk == "end": - # When not transforming, ensure full message is stored - msg_store = ChatMessage( - content=self._current_stream_message, - role="assistant", + # When `operation="append"`, msg.content is just a chunk, but we must + # store the full message + self._store_message( + ChatMessage(content=self._current_stream_message, role=msg.role) ) - # Only store full messages - if chunk is False or chunk == "end": - self._store_message(msg_store) + # Send the message to the client await self._send_append_message( message=msg, @@ -764,10 +733,8 @@ async def _append_message( ) finally: if chunk == "end": - if self._is_nested_stream: - self._nested_stream_message = "" - else: - self._current_stream_message = "" + self._current_stream_message = "" + self._message_stream_checkpoint = "" async def 
append_message_stream( self, @@ -898,21 +865,21 @@ async def _append_message_stream( id = _utils.private_random_id() empty = ChatMessageDict(content="", role="assistant") - await self._append_message(empty, chunk="start", stream_id=id, icon=icon) + await self._append_message_chunk(empty, chunk="start", stream_id=id, icon=icon) try: async for msg in message: - await self._append_message(msg, chunk=True, stream_id=id) + await self._append_message_chunk(msg, chunk=True, stream_id=id) return self._current_stream_message finally: - await self._append_message(empty, chunk="end", stream_id=id) + await self._append_message_chunk(empty, chunk="end", stream_id=id) await self._flush_pending_messages() async def _flush_pending_messages(self): still_pending: list[PendingMessage] = [] for msg, chunk, stream_id in self._pending_messages: if self._can_append_message(stream_id): - await self._append_message(msg, chunk=chunk, stream_id=stream_id) + await self._append_message_chunk(msg, chunk=chunk, stream_id=stream_id) else: still_pending.append((msg, chunk, stream_id)) self._pending_messages = still_pending @@ -926,7 +893,7 @@ def _can_append_message(self, stream_id: str | None) -> bool: async def _send_append_message( self, message: TransformedMessage | ChatMessage, - chunk: ChunkOption = False, + chunk: Literal["start", "end", True, False] = False, operation: Literal["append", "replace"] = "append", icon: HTML | Tag | TagList | None = None, ): @@ -1092,32 +1059,35 @@ async def _transform_wrapper(content: str, chunk: str, done: bool): async def _transform_message( self, message: ChatMessage, - chunk: ChunkOption = False, - ) -> ChatMessage | TransformedMessage | None: + chunk: Literal["start", "end", True, False] = False, + chunk_content: str = "", + ) -> TransformedMessage | None: res = TransformedMessage.from_chat_message(message) if message.role == "user" and self._transform_user is not None: content = await self._transform_user(message.content) elif message.role == "assistant" 
and self._transform_assistant is not None: - all_content = ( - message.content if chunk is False else self._current_stream_message - ) - setattr(res, res.pre_transform_key, all_content) content = await self._transform_assistant( - all_content, message.content, + chunk_content, chunk == "end" or chunk is False, ) else: - return message + return res if content is None: return None setattr(res, res.transform_key, content) - return res + def _needs_transform(self, message: ChatMessage) -> bool: + if message.role == "user" and self._transform_user is not None: + return True + elif message.role == "assistant" and self._transform_assistant is not None: + return True + return False + # Just before storing, handle chunk msg type and calculate tokens def _store_message( self, diff --git a/tests/playwright/shiny/components/chat/inject/app.py b/tests/playwright/shiny/components/chat/inject/app.py index 42093e04a..c3c7c2db3 100644 --- a/tests/playwright/shiny/components/chat/inject/app.py +++ b/tests/playwright/shiny/components/chat/inject/app.py @@ -3,7 +3,7 @@ from shiny import reactive from shiny.express import input, render, ui -ui.page_opts(title="Hello Chat") +ui.page_opts(title="Hello message streams") chat = ui.Chat(id="chat") chat.ui() @@ -15,30 +15,47 @@ async def _(): await chat.append_message_stream(mock_stream()) +SLEEP_TIME = 0.25 + + async def mock_stream(): yield "Starting outer stream...\n\n" - await asyncio.sleep(0.5) + await asyncio.sleep(SLEEP_TIME) await mock_tool() - await asyncio.sleep(0.5) + await asyncio.sleep(SLEEP_TIME) yield "\n\n...outer stream complete" -# While the "outer" `.append_message_stream()` is running, -# start an "inner" stream with .message_stream() async def mock_tool(): - steps = [ - "Starting inner stream 🔄...\n\n", - "Progress: 0%...", - "Progress: 50%...", - "Progress: 100%...", - ] + # While the "outer" `.append_message_stream()` is running, + # start an "inner" stream with .message_stream() async with chat.message_stream(): - for 
chunk in steps: - await chat.append_message_chunk(chunk) - await asyncio.sleep(0.5) + await chat.append_message_chunk("\n\nStarting inner stream 1 🔄...") + await asyncio.sleep(SLEEP_TIME) + await chat.append_message_chunk("Progress: 0%") + await asyncio.sleep(SLEEP_TIME) + + async with chat.message_stream(): + await chat.append_message_chunk("\n\nStarting nested stream 2 🔄...") + await asyncio.sleep(SLEEP_TIME) + await chat.append_message_chunk("Progress: 0%") + await asyncio.sleep(SLEEP_TIME) + await chat.append_message_chunk(" Progress: 50%") + await asyncio.sleep(SLEEP_TIME) + await chat.append_message_chunk(" Progress: 100%") + await asyncio.sleep(SLEEP_TIME) + await chat.append_message_chunk( + "\n\nCompleted _another_ inner stream ✅", operation="replace" + ) + + await chat.append_message_chunk("\n\nBack to stream 1...") + await chat.append_message_chunk(" Progress: 50%") + await asyncio.sleep(SLEEP_TIME) + await chat.append_message_chunk(" Progress: 100%") + await asyncio.sleep(SLEEP_TIME) + await chat.append_message_chunk( - "Completed inner stream ✅", - operation="replace", + "\n\nCompleted inner _and nested_ stream ✅", operation="replace" ) From 260f9022fcb84e6ca6c74dfbc32f2984ace97363 Mon Sep 17 00:00:00 2001 From: Carson Date: Fri, 14 Mar 2025 10:28:10 -0500 Subject: [PATCH 08/21] .append_message() should also queue when a stream is active --- shiny/ui/_chat.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index f86390573..4c57576e1 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -80,7 +80,9 @@ UserSubmitFunction1, ] -PendingMessage = Tuple[Any, Literal["start", "end", True], Union[str, None]] +ChunkOption = Literal["start", "end", True, False] + +PendingMessage = Tuple[Any, ChunkOption, Union[str, None]] @add_example(ex_dir="../templates/chat/starters/hello") @@ -570,6 +572,11 @@ async def append_message( similar) is specified in model's 
completion method. ::: """ + # If we're in a stream, queue the message + if self._current_stream_id: + self._pending_messages.append((message, False, None)) + return + msg = normalize_message(message) msg = await self._transform_message(msg) if msg is None: @@ -668,7 +675,7 @@ async def message_stream(self): await self._append_message_chunk( "", chunk="end", - stream_id=self._current_stream_id, + stream_id=cast(str, self._current_stream_id), ) async def _append_message_chunk( @@ -676,12 +683,12 @@ async def _append_message_chunk( message: Any, *, chunk: Literal[True, "start", "end"] = True, + stream_id: str, operation: Literal["append", "replace"] = "append", - stream_id: str | None = None, icon: HTML | Tag | TagList | None = None, ) -> None: - # If currently we're in a stream, handle other messages (outside the stream) later - if not self._can_append_message(stream_id): + # If currently we're in a *different* stream, queue the message chunk + if self._current_stream_id and self._current_stream_id != stream_id: self._pending_messages.append((message, chunk, stream_id)) return @@ -876,24 +883,21 @@ async def _append_message_stream( await self._flush_pending_messages() async def _flush_pending_messages(self): - still_pending: list[PendingMessage] = [] - for msg, chunk, stream_id in self._pending_messages: - if self._can_append_message(stream_id): - await self._append_message_chunk(msg, chunk=chunk, stream_id=stream_id) + pending = self._pending_messages + self._pending_messages = [] + for msg, chunk, stream_id in pending: + if chunk is False: + await self.append_message(msg) else: - still_pending.append((msg, chunk, stream_id)) - self._pending_messages = still_pending - - def _can_append_message(self, stream_id: str | None) -> bool: - if self._current_stream_id is None: - return True - return self._current_stream_id == stream_id + await self._append_message_chunk( + msg, chunk=chunk, stream_id=cast(str, stream_id) + ) # Send a message to the UI async def 
_send_append_message( self, message: TransformedMessage | ChatMessage, - chunk: Literal["start", "end", True, False] = False, + chunk: ChunkOption = False, operation: Literal["append", "replace"] = "append", icon: HTML | Tag | TagList | None = None, ): From bd6d40f8d95711ca6e488237ce857c27fceef79e Mon Sep 17 00:00:00 2001 From: Carson Date: Fri, 14 Mar 2025 11:12:55 -0500 Subject: [PATCH 09/21] Fix/simplify transform logic --- shiny/ui/_chat.py | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index 4c57576e1..2eff2ef25 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -692,32 +692,26 @@ async def _append_message_chunk( self._pending_messages.append((message, chunk, stream_id)) return - # Update current stream state self._current_stream_id = stream_id - if chunk == "end": - self._current_stream_id = None - # Normalize into a ChatMessage() + # Normalize various message types into a ChatMessage() msg = normalize_message_chunk(message) - # Remember this content chunk for passing to transformer - this_chunk = msg.content - - # Transforming requires replacing - if self._needs_transform(msg): - operation = "replace" - if operation == "replace": - # Replace up to the latest checkpoint - self._current_stream_message = self._message_stream_checkpoint + this_chunk + self._current_stream_message = self._message_stream_checkpoint + msg.content msg.content = self._current_stream_message else: self._current_stream_message += msg.content try: if self._needs_transform(msg): + # Transforming may change the meaning of msg.content to be a *replace* + # not *append*. So, update msg.content and the operation accordingly. 
+ chunk_content = msg.content + msg.content = self._current_stream_message + operation = "replace" msg = await self._transform_message( - msg, chunk=chunk, chunk_content=this_chunk + msg, chunk=chunk, chunk_content=chunk_content ) # Act like nothing happened if transformed to None if msg is None: @@ -740,6 +734,7 @@ async def _append_message_chunk( ) finally: if chunk == "end": + self._current_stream_id = None self._current_stream_message = "" self._message_stream_checkpoint = "" @@ -1063,7 +1058,7 @@ async def _transform_wrapper(content: str, chunk: str, done: bool): async def _transform_message( self, message: ChatMessage, - chunk: Literal["start", "end", True, False] = False, + chunk: ChunkOption = False, chunk_content: str = "", ) -> TransformedMessage | None: res = TransformedMessage.from_chat_message(message) From 0c2e9ecf7998aa1b6204cef689aa2b18d012b22e Mon Sep 17 00:00:00 2001 From: Carson Date: Fri, 14 Mar 2025 11:48:28 -0500 Subject: [PATCH 10/21] Reduce diff --- shiny/ui/_chat.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index 2eff2ef25..b9ea062af 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -502,9 +502,11 @@ def messages( transform = transform_user == "all" or ( transform_user == "last" and i == len(messages) - 1 ) - key = "transform_key" if transform else "pre_transform_key" - content_val = getattr(m, getattr(m, key)) - chat_msg = ChatMessageDict(content=str(content_val), role=m.role) + content_key = getattr( + m, "transform_key" if transform else "pre_transform_key" + ) + content = getattr(m, content_key) + chat_msg = ChatMessageDict(content=str(content), role=m.role) if not isinstance(format, MISSING_TYPE): chat_msg = as_provider_message(chat_msg, format) res.append(chat_msg) From a05b447a4b7de6028ce8656dd62907f1a7d85c1f Mon Sep 17 00:00:00 2001 From: Carson Date: Fri, 14 Mar 2025 13:30:12 -0500 Subject: [PATCH 11/21] Update test --- 
.../shiny/components/chat/inject/app.py | 84 ------------------- .../chat/inject/test_chat_inject.py | 29 ------- .../components/chat/message-stream/app.py | 75 +++++++++++++++++ .../test_chat_message_stream.py | 46 ++++++++++ 4 files changed, 121 insertions(+), 113 deletions(-) delete mode 100644 tests/playwright/shiny/components/chat/inject/app.py delete mode 100644 tests/playwright/shiny/components/chat/inject/test_chat_inject.py create mode 100644 tests/playwright/shiny/components/chat/message-stream/app.py create mode 100644 tests/playwright/shiny/components/chat/message-stream/test_chat_message_stream.py diff --git a/tests/playwright/shiny/components/chat/inject/app.py b/tests/playwright/shiny/components/chat/inject/app.py deleted file mode 100644 index c3c7c2db3..000000000 --- a/tests/playwright/shiny/components/chat/inject/app.py +++ /dev/null @@ -1,84 +0,0 @@ -import asyncio - -from shiny import reactive -from shiny.express import input, render, ui - -ui.page_opts(title="Hello message streams") - -chat = ui.Chat(id="chat") -chat.ui() - - -# Launch a stream on load -@reactive.effect -async def _(): - await chat.append_message_stream(mock_stream()) - - -SLEEP_TIME = 0.25 - - -async def mock_stream(): - yield "Starting outer stream...\n\n" - await asyncio.sleep(SLEEP_TIME) - await mock_tool() - await asyncio.sleep(SLEEP_TIME) - yield "\n\n...outer stream complete" - - -async def mock_tool(): - # While the "outer" `.append_message_stream()` is running, - # start an "inner" stream with .message_stream() - async with chat.message_stream(): - await chat.append_message_chunk("\n\nStarting inner stream 1 🔄...") - await asyncio.sleep(SLEEP_TIME) - await chat.append_message_chunk("Progress: 0%") - await asyncio.sleep(SLEEP_TIME) - - async with chat.message_stream(): - await chat.append_message_chunk("\n\nStarting nested stream 2 🔄...") - await asyncio.sleep(SLEEP_TIME) - await chat.append_message_chunk("Progress: 0%") - await asyncio.sleep(SLEEP_TIME) - await 
chat.append_message_chunk(" Progress: 50%") - await asyncio.sleep(SLEEP_TIME) - await chat.append_message_chunk(" Progress: 100%") - await asyncio.sleep(SLEEP_TIME) - await chat.append_message_chunk( - "\n\nCompleted _another_ inner stream ✅", operation="replace" - ) - - await chat.append_message_chunk("\n\nBack to stream 1...") - await chat.append_message_chunk(" Progress: 50%") - await asyncio.sleep(SLEEP_TIME) - await chat.append_message_chunk(" Progress: 100%") - await asyncio.sleep(SLEEP_TIME) - - await chat.append_message_chunk( - "\n\nCompleted inner _and nested_ stream ✅", operation="replace" - ) - - -@chat.on_user_submit -async def _(user_input: str): - await chat.append_message_stream(f"You said: {user_input}") - - -ui.input_action_button("add_stream_basic", "Add .message_stream()") - - -@reactive.effect -@reactive.event(input.add_stream_basic) -async def _(): - async with chat.message_stream(): - await chat.append_message_chunk("Running test...") - await asyncio.sleep(1) - await chat.append_message_chunk("Test complete!") - - -# TODO: more tests, like submitting input, etc. 
- - -@render.code -def message_out(): - return str(chat.messages()) diff --git a/tests/playwright/shiny/components/chat/inject/test_chat_inject.py b/tests/playwright/shiny/components/chat/inject/test_chat_inject.py deleted file mode 100644 index 28d15ba53..000000000 --- a/tests/playwright/shiny/components/chat/inject/test_chat_inject.py +++ /dev/null @@ -1,29 +0,0 @@ -from playwright.sync_api import Page, expect -from utils.deploy_utils import skip_on_webkit - -from shiny.playwright import controller -from shiny.run import ShinyAppProc - - -@skip_on_webkit -def test_validate_chat_inject(page: Page, local_app: ShinyAppProc) -> None: - page.goto(local_app.url) - - TIMEOUT = 30 * 1000 - - chat = controller.Chat(page, "chat") - expect(chat.loc).to_be_visible(timeout=TIMEOUT) - - chat.expect_latest_message( - "Starting stream...injected chunk...stream complete", - timeout=TIMEOUT, - ) - - btn = controller.InputActionButton(page, "run_test") - expect(btn.loc).to_be_visible(timeout=TIMEOUT) - btn.click() - - chat.expect_latest_message( - "can inject chunks", - timeout=TIMEOUT, - ) diff --git a/tests/playwright/shiny/components/chat/message-stream/app.py b/tests/playwright/shiny/components/chat/message-stream/app.py new file mode 100644 index 000000000..a0217fe99 --- /dev/null +++ b/tests/playwright/shiny/components/chat/message-stream/app.py @@ -0,0 +1,75 @@ +import asyncio + +from shiny import reactive +from shiny.express import input, render, ui + +SLEEP_TIME = 0.75 + +ui.page_opts(title="Hello chat message streams") + +with ui.sidebar(style="height:100%"): + ui.input_action_button("basic_stream", "Add message stream") + ui.input_action_button("nested_stream", "Add nested stream") + + ui.h6("Message state:", class_="mt-auto mb-0") + + @render.code + def message_state(): + return str(chat.messages()) + + +chat = ui.Chat(id="chat") +chat.ui() + + +# TODO: test submitting input after adding stream +@chat.on_user_submit +async def _(user_input: str): + await 
chat.append_message(f"You said: {user_input}") + + +@reactive.effect +@reactive.event(input.basic_stream) +async def _(): + chunks = [ + "Starting stream 1 🔄...\n\n", + "Progress: 0%", + " Progress: 50%", + " Progress: 100%", + ] + async with chat.message_stream(): + for chunk in chunks: + await chat.append_message_chunk(chunk) + await asyncio.sleep(SLEEP_TIME) + await chat.append_message_chunk("Completed stream 1 ✅", operation="replace") + + +# TODO: add test here for nested .message_stream() + + +@reactive.effect +@reactive.event(input.nested_stream) +async def _(): + await chat.append_message_stream(mock_stream()) + + +async def mock_stream(): + yield "Starting outer stream...\n\n" + await asyncio.sleep(SLEEP_TIME) + await mock_tool() + await asyncio.sleep(SLEEP_TIME) + yield "\n\n...outer stream complete" + + +async def mock_tool(): + chunks = [ + "Starting inner stream 🔄...\n\n", + "Progress: 0%", + " Progress: 50%", + " Progress: 100%", + ] + for chunk in chunks: + await chat.append_message_chunk(chunk, operation="replace") + + +# TODO: more tests, like submitting input, etc. 
diff --git a/tests/playwright/shiny/components/chat/message-stream/test_chat_message_stream.py b/tests/playwright/shiny/components/chat/message-stream/test_chat_message_stream.py new file mode 100644 index 000000000..05fcbe789 --- /dev/null +++ b/tests/playwright/shiny/components/chat/message-stream/test_chat_message_stream.py @@ -0,0 +1,46 @@ +from playwright.sync_api import Page, expect +from utils.deploy_utils import skip_on_webkit + +from shiny.playwright import controller +from shiny.run import ShinyAppProc + + +@skip_on_webkit +def test_validate_chat_inject(page: Page, local_app: ShinyAppProc) -> None: + page.goto(local_app.url) + + TIMEOUT = 30 * 1000 + + chat = controller.Chat(page, "chat") + expect(chat.loc).to_be_visible(timeout=TIMEOUT) + + basic_stream = controller.InputActionButton(page, "basic_stream") + expect(basic_stream.loc).to_be_visible(timeout=TIMEOUT) + basic_stream.click() + + # TODO: how to test the progress messages? + chat.expect_latest_message( + "Completed stream 1 ✅", + timeout=TIMEOUT, + ) + + chat.set_user_input("Hello") + chat.send_user_input() + chat.expect_latest_message("You said: Hello", timeout=TIMEOUT) + + nested_stream = controller.InputActionButton(page, "nested_stream") + expect(nested_stream.loc).to_be_visible(timeout=TIMEOUT) + nested_stream.click() + + # TODO: how to test the progress messages? 
+ chat.expect_latest_message( + "Starting outer stream...\n\nCompleted inner stream ✅\n\n...outer stream complete", + timeout=TIMEOUT, + ) + + chat.set_user_input("Hello") + chat.send_user_input() + chat.expect_latest_message("You said: Hello", timeout=TIMEOUT) + + # TODO: test message state + # message_state = controller.OutputCode(page, "message_state") From bb171f73415d2685201c1e2b7d26e55b8a76a616 Mon Sep 17 00:00:00 2001 From: Carson Date: Fri, 14 Mar 2025 16:00:49 -0500 Subject: [PATCH 12/21] Yield a MessageStream() instance with a .append() and .restore() method --- shiny/ui/_chat.py | 62 ++++++++++++++----- .../components/chat/message-stream/app.py | 17 +++-- 2 files changed, 56 insertions(+), 23 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index b9ea062af..aa7a68ed7 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -590,12 +590,7 @@ async def append_message( icon=icon, ) - async def append_message_chunk( - self, - message_chunk: Any, - *, - operation: Literal["append", "replace"] = "append", - ): + async def append_message_chunk(self, message_chunk: Any): """ Append a message chunk to the current message stream. @@ -606,8 +601,6 @@ async def append_message_chunk( ---------- message_chunk A message chunk to inject. - operation - Whether to append or replace the *current* message stream content. Note ---- @@ -633,7 +626,6 @@ async def append_message_chunk( return await self._append_message_chunk( message_chunk, stream_id=stream_id, - operation=operation, ) @asynccontextmanager @@ -644,6 +636,12 @@ async def message_stream(self): A context manager for streaming messages into the chat. Note this stream can occur within a longer running `.append_message_stream()` or used on its own. + Yields + ------ + : + A `MessageStream` instance with a method for `.append()`ing message chunks + and a method for `.restore()`ing the stream back to it's initial state. 
+ Note ---- A useful pattern for displaying tool calls in a chat interface is for the @@ -658,16 +656,14 @@ async def message_stream(self): self._message_stream_checkpoint = self._current_stream_message # No stream currently exists, start one - is_root_stream = not self._current_stream_id + stream_id = self._current_stream_id + is_root_stream = stream_id is None if is_root_stream: - await self._append_message_chunk( - "", - chunk="start", - stream_id=_utils.private_random_id(), - ) + stream_id = _utils.private_random_id() + await self._append_message_chunk("", chunk="start", stream_id=stream_id) try: - yield + yield MessageStream(self, stream_id) finally: # Restore the previous stream state self._message_stream_checkpoint = old_checkpoint @@ -677,7 +673,7 @@ async def message_stream(self): await self._append_message_chunk( "", chunk="end", - stream_id=cast(str, self._current_stream_id), + stream_id=stream_id, ) async def _append_message_chunk( @@ -1496,4 +1492,36 @@ def chat_ui( return res +class MessageStream: + """""" + + def __init__(self, chat: Chat, stream_id: str): + self._chat = chat + self._stream_id = stream_id + + async def restore(self): + """ + Restore the stream back to its initial state. + """ + await self._chat._append_message_chunk( + "", + operation="replace", + stream_id=self._stream_id, + ) + + async def append(self, message_chunk: Any): + """ + Append a message chunk to the stream. 
+ + Parameters + ----------- + message_chunk + A message chunk to append to this stream + """ + await self._chat._append_message_chunk( + message_chunk, + stream_id=self._stream_id, + ) + + CHAT_INSTANCES: WeakValueDictionary[str, Chat] = WeakValueDictionary() diff --git a/tests/playwright/shiny/components/chat/message-stream/app.py b/tests/playwright/shiny/components/chat/message-stream/app.py index a0217fe99..b9113e93f 100644 --- a/tests/playwright/shiny/components/chat/message-stream/app.py +++ b/tests/playwright/shiny/components/chat/message-stream/app.py @@ -3,7 +3,7 @@ from shiny import reactive from shiny.express import input, render, ui -SLEEP_TIME = 0.75 +SLEEP_TIME = 0.25 ui.page_opts(title="Hello chat message streams") @@ -37,11 +37,12 @@ async def _(): " Progress: 50%", " Progress: 100%", ] - async with chat.message_stream(): + async with chat.message_stream() as stream: for chunk in chunks: - await chat.append_message_chunk(chunk) + await stream.append(chunk) await asyncio.sleep(SLEEP_TIME) - await chat.append_message_chunk("Completed stream 1 ✅", operation="replace") + await stream.restore() + await stream.append("Completed stream 1 ✅") # TODO: add test here for nested .message_stream() @@ -68,8 +69,12 @@ async def mock_tool(): " Progress: 50%", " Progress: 100%", ] - for chunk in chunks: - await chat.append_message_chunk(chunk, operation="replace") + async with chat.message_stream() as stream: + for chunk in chunks: + await stream.append(chunk) + await asyncio.sleep(SLEEP_TIME) + await stream.restore() + await stream.append("Completed inner stream ✅") # TODO: more tests, like submitting input, etc. 
From 5e822fc220e959b001859d62248fd5753c0015d0 Mon Sep 17 00:00:00 2001 From: Carson Date: Mon, 17 Mar 2025 10:25:05 -0500 Subject: [PATCH 13/21] Cut public .append_message_chunk() method; rename context manager method --- shiny/ui/_chat.py | 70 +++++-------------- .../components/chat/message-stream/app.py | 6 +- 2 files changed, 21 insertions(+), 55 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index aa7a68ed7..68e32642a 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -590,68 +590,32 @@ async def append_message( icon=icon, ) - async def append_message_chunk(self, message_chunk: Any): - """ - Append a message chunk to the current message stream. - - Append a chunk of message content to either a stream started with - `.message_stream()` or an active `.append_message_stream()`. - - Parameters - ---------- - message_chunk - A message chunk to inject. - - Note - ---- - A useful pattern for displaying tool calls in a chat is for the tools to display - content using an "inner" `.message_stream()` while the response generation is - happening in an "outer" `.append_message_stream()`. This allows the inner stream - to display "ephemeral" content, then eventually show a final state with - `.append_message_chunk(operation="replace")`. - - Raises - ------ - ValueError - If there is active stream (i.e., no `.message_stream()` or - `.append_message_stream()`) - """ - stream_id = self._current_stream_id - if stream_id is None: - raise ValueError( - "Can't .append_message_chunk() without an active message stream. " - "Use .message_stream() or .append_message_stream() to start one." - ) - - return await self._append_message_chunk( - message_chunk, - stream_id=stream_id, - ) - @asynccontextmanager - async def message_stream(self): + async def append_message_context(self): """ Message stream context manager. - A context manager for streaming messages into the chat. 
Note this stream - can occur within a longer running `.append_message_stream()` or used on its own. + A context manager for streaming messages into the chat. Note this context + manager can be used in isolation, nested within itself, or used while a + long-running `.append_message_stream()` is in progress. Yields ------ : - A `MessageStream` instance with a method for `.append()`ing message chunks - and a method for `.restore()`ing the stream back to it's initial state. + A `MessageStream` class instance, which has a method for `.append()`ing + message chunks to as well as way to `.restore()` the stream back to it's + initial state. Note ---- - A useful pattern for displaying tool calls in a chat interface is for the - tool to display using `.message_stream()` while the the response generation - is happening through `.append_message_stream()`. This allows the inner stream - to display "ephemeral" content, then eventually show a final state - with `.append_message_chunk(operation="replace")`. + A useful pattern for displaying tool calls in a chatbot is for the tool to + display using `.append_message_context()` while the the response generation is + happening through `.append_message_stream()`. This allows the tool to display + things like progress updates (or other "ephemeral" content) and optionally + `.restore()` the stream back to it's initial state when ready to display the + "final" content. 
""" - # Save the current stream state in a checkpoint (so that we can handle - # ``.append_message_chunk(operation="replace")` correctly) + # Checkpoint the current stream state so operation="replace" can return to it old_checkpoint = self._message_stream_checkpoint self._message_stream_checkpoint = self._current_stream_message @@ -665,7 +629,7 @@ async def message_stream(self): try: yield MessageStream(self, stream_id) finally: - # Restore the previous stream state + # Restore the checkpoint self._message_stream_checkpoint = old_checkpoint # If this was the root stream, end it @@ -1493,7 +1457,9 @@ def chat_ui( class MessageStream: - """""" + """ + An object to yield from a `.append_message_context()` context manager. + """ def __init__(self, chat: Chat, stream_id: str): self._chat = chat diff --git a/tests/playwright/shiny/components/chat/message-stream/app.py b/tests/playwright/shiny/components/chat/message-stream/app.py index b9113e93f..8d95e055e 100644 --- a/tests/playwright/shiny/components/chat/message-stream/app.py +++ b/tests/playwright/shiny/components/chat/message-stream/app.py @@ -37,7 +37,7 @@ async def _(): " Progress: 50%", " Progress: 100%", ] - async with chat.message_stream() as stream: + async with chat.append_message_context() as stream: for chunk in chunks: await stream.append(chunk) await asyncio.sleep(SLEEP_TIME) @@ -45,7 +45,7 @@ async def _(): await stream.append("Completed stream 1 ✅") -# TODO: add test here for nested .message_stream() +# TODO: add test here for nested .append_message_context() @reactive.effect @@ -69,7 +69,7 @@ async def mock_tool(): " Progress: 50%", " Progress: 100%", ] - async with chat.message_stream() as stream: + async with chat.append_message_context() as stream: for chunk in chunks: await stream.append(chunk) await asyncio.sleep(SLEEP_TIME) From 24f05e59cccc849bfd58794999129da02f043ada Mon Sep 17 00:00:00 2001 From: Carson Date: Mon, 17 Mar 2025 11:10:03 -0500 Subject: [PATCH 14/21] Improve docstring --- 
shiny/ui/_chat.py | 44 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index 68e32642a..fb2718019 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -595,16 +595,50 @@ async def append_message_context(self): """ Message stream context manager. - A context manager for streaming messages into the chat. Note this context - manager can be used in isolation, nested within itself, or used while a - long-running `.append_message_stream()` is in progress. + A context manager for appending streaming messages into the chat. This context + manager can: + + 1. Be used in isolation to append a new streaming message to the chat. + * Compared to `.append_message_stream()` this method is more flexible but + isn't non-blocking by default (i.e., it doesn't launch an extended task). + 2. Be nested within itself + * Nesting is primarily useful for making checkpoints to `.restore()` back + to (see the example below). + 3. Be used from within a `.append_message_stream()` + * Useful for inserting additional content from another context into the + stream (e.g., see the note about tool calls below). Yields ------ : A `MessageStream` class instance, which has a method for `.append()`ing - message chunks to as well as way to `.restore()` the stream back to it's - initial state. + message content chunks to as well as way to `.restore()` the stream back to + it's initial state. Note that `.append()` supports the same message content + types as `.append_message()`. 
+ + Example + ------- + ```python + import asyncio + + from shiny import reactive + from shiny.express import ui + + chat = ui.Chat(id="my_chat") + chat.ui() + + @reactive.effect + async def _(): + async with chat.append_message_context() as msg: + await msg.append("Starting stream...\n\nProgress:") + async with chat.append_message_context() as progress: + for x in [0, 50, 100]: + await progress.append(f" {x}%") + await asyncio.sleep(1) + await progress.restore() + await msg.restore() + await msg.append("Completed stream") + ``` Note ---- From 4995ade9094d2d07291c7116b8b5c7f23e06f3ad Mon Sep 17 00:00:00 2001 From: Carson Date: Mon, 17 Mar 2025 12:01:27 -0500 Subject: [PATCH 15/21] More complete playwright test --- .../chat/append_message_context/app.py | 120 ++++++++++++++++++ .../test_chat_append_message_context.py | 94 ++++++++++++++ .../components/chat/message-stream/app.py | 80 ------------ .../test_chat_message_stream.py | 46 ------- 4 files changed, 214 insertions(+), 126 deletions(-) create mode 100644 tests/playwright/shiny/components/chat/append_message_context/app.py create mode 100644 tests/playwright/shiny/components/chat/append_message_context/test_chat_append_message_context.py delete mode 100644 tests/playwright/shiny/components/chat/message-stream/app.py delete mode 100644 tests/playwright/shiny/components/chat/message-stream/test_chat_message_stream.py diff --git a/tests/playwright/shiny/components/chat/append_message_context/app.py b/tests/playwright/shiny/components/chat/append_message_context/app.py new file mode 100644 index 000000000..6bdda9c8d --- /dev/null +++ b/tests/playwright/shiny/components/chat/append_message_context/app.py @@ -0,0 +1,120 @@ +import asyncio + +from shiny import reactive +from shiny.express import input, render, ui + +SLEEP_TIME = 0.25 + +ui.page_opts(title="Hello chat message streams") + +with ui.sidebar(style="height:100%"): + ui.input_action_button("stream_1", "Stream 1") + ui.input_action_button("stream_2", "Stream 
2") + ui.input_action_button("stream_3", "Stream 3") + ui.input_action_button("stream_4", "Stream 4") + ui.input_action_button("stream_5", "Stream 5") + ui.input_action_button("stream_6", "Stream 6") + + ui.h6("Message state:", class_="mt-auto mb-0") + + @render.code + def message_state(): + return str(chat.messages()) + + +chat = ui.Chat(id="chat") +chat.ui() + + +@chat.on_user_submit +async def _(user_input: str): + await chat.append_message(f"You said: {user_input}") + + +@reactive.effect +@reactive.event(input.stream_1) +async def _(): + async with chat.append_message_context() as msg: + await msg.append("Basic") + await asyncio.sleep(SLEEP_TIME) + await msg.append(" stream") + + +@reactive.effect +@reactive.event(input.stream_2) +async def _(): + async with chat.append_message_context() as msg: + await msg.append("Basic") + await asyncio.sleep(SLEEP_TIME) + await msg.append(" stream") + await asyncio.sleep(SLEEP_TIME) + await msg.restore() + await asyncio.sleep(SLEEP_TIME) + await msg.append("Finished") + + +@reactive.effect +@reactive.event(input.stream_3) +async def _(): + async with chat.append_message_context() as outer: + await outer.append("Outer start") + await asyncio.sleep(SLEEP_TIME) + async with chat.append_message_context() as inner: + await inner.append("Inner start") + await asyncio.sleep(SLEEP_TIME) + await inner.append("Inner end") + await asyncio.sleep(SLEEP_TIME) + await outer.append("Outer end") + + +@reactive.effect +@reactive.event(input.stream_4) +async def _(): + async with chat.append_message_context() as outer: + await outer.append("Outer start") + await asyncio.sleep(SLEEP_TIME) + async with chat.append_message_context() as inner: + await inner.append("Inner start") + await asyncio.sleep(SLEEP_TIME) + await inner.restore() + await inner.append("Inner end") + await asyncio.sleep(SLEEP_TIME) + await outer.append("Outer end") + + +@reactive.effect +@reactive.event(input.stream_5) +async def _(): + async with chat.append_message_context() 
as outer: + await outer.append("Outer start") + await asyncio.sleep(SLEEP_TIME) + await outer.restore() + async with chat.append_message_context() as inner: + await inner.append("Inner start") + await asyncio.sleep(SLEEP_TIME) + await inner.append("Inner end") + await asyncio.sleep(SLEEP_TIME) + await outer.append("Outer end") + + +@reactive.effect +@reactive.event(input.stream_6) +async def _(): + await chat.append_message_stream(outer_stream()) + + +async def outer_stream(): + yield "Outer start" + await asyncio.sleep(SLEEP_TIME) + await inner_stream() + await asyncio.sleep(SLEEP_TIME) + yield "Outer end" + + +async def inner_stream(): + async with chat.append_message_context() as stream: + await stream.append("Inner start") + await asyncio.sleep(SLEEP_TIME) + await stream.append("Inner progress") + await stream.restore() + await stream.append("Inner end") diff --git a/tests/playwright/shiny/components/chat/append_message_context/test_chat_append_message_context.py b/tests/playwright/shiny/components/chat/append_message_context/test_chat_append_message_context.py new file mode 100644 index 000000000..cee1ac56a --- /dev/null +++ b/tests/playwright/shiny/components/chat/append_message_context/test_chat_append_message_context.py @@ -0,0 +1,94 @@ +from playwright.sync_api import Page, expect +from utils.deploy_utils import skip_on_webkit + +from shiny.playwright import controller +from shiny.run import ShinyAppProc + + +@skip_on_webkit +def test_validate_chat_append_message_context( + page: Page, local_app: ShinyAppProc +) -> None: + page.goto(local_app.url) + + TIMEOUT = 30 * 1000 + + chat = controller.Chat(page, "chat") + expect(chat.loc).to_be_visible(timeout=TIMEOUT) + + stream_1 = controller.InputActionButton(page, "stream_1") + expect(stream_1.loc).to_be_visible(timeout=TIMEOUT) + stream_1.click() + + chat.expect_latest_message("Basic stream", timeout=TIMEOUT) + + stream_2 = controller.InputActionButton(page, "stream_2") + 
expect(stream_2.loc).to_be_visible(timeout=TIMEOUT) + stream_2.click() + + chat.expect_latest_message("Finished", timeout=TIMEOUT) + + chat.set_user_input("Hello") + chat.send_user_input() + chat.expect_latest_message("You said: Hello", timeout=TIMEOUT) + + stream_3 = controller.InputActionButton(page, "stream_3") + expect(stream_3.loc).to_be_visible(timeout=TIMEOUT) + stream_3.click() + + chat.expect_latest_message( + "Outer startInner startInner endOuter end", + timeout=TIMEOUT, + ) + + stream_4 = controller.InputActionButton(page, "stream_4") + expect(stream_4.loc).to_be_visible(timeout=TIMEOUT) + stream_4.click() + + chat.expect_latest_message( + "Outer startInner endOuter end", + timeout=TIMEOUT, + ) + + stream_5 = controller.InputActionButton(page, "stream_5") + expect(stream_5.loc).to_be_visible(timeout=TIMEOUT) + stream_5.click() + + chat.expect_latest_message( + "Inner startInner endOuter end", + timeout=TIMEOUT, + ) + + stream_6 = controller.InputActionButton(page, "stream_6") + expect(stream_6.loc).to_be_visible(timeout=TIMEOUT) + stream_6.click() + + chat.expect_latest_message( + "Outer startInner endOuter end", + timeout=TIMEOUT, + ) + + chat.set_user_input("Goodbye") + chat.send_user_input() + chat.expect_latest_message("You said: Goodbye", timeout=TIMEOUT) + + # Test server-side message state + message_state = controller.OutputCode(page, "message_state") + message_state_expected = tuple( + [ + {"content": "Basic stream", "role": "assistant"}, + {"content": "Finished", "role": "assistant"}, + {"content": "Hello", "role": "user"}, + {"content": "You said: Hello", "role": "assistant"}, + { + "content": "Outer startInner startInner endOuter end", + "role": "assistant", + }, + {"content": "Outer startInner endOuter end", "role": "assistant"}, + {"content": "Inner startInner endOuter end", "role": "assistant"}, + {"content": "Outer startInner endOuter end", "role": "assistant"}, + {"content": "Goodbye", "role": "user"}, + {"content": "You said: Goodbye", 
"role": "assistant"}, + ] + ) + message_state.expect_value(str(message_state_expected)) diff --git a/tests/playwright/shiny/components/chat/message-stream/app.py b/tests/playwright/shiny/components/chat/message-stream/app.py deleted file mode 100644 index 8d95e055e..000000000 --- a/tests/playwright/shiny/components/chat/message-stream/app.py +++ /dev/null @@ -1,80 +0,0 @@ -import asyncio - -from shiny import reactive -from shiny.express import input, render, ui - -SLEEP_TIME = 0.25 - -ui.page_opts(title="Hello chat message streams") - -with ui.sidebar(style="height:100%"): - ui.input_action_button("basic_stream", "Add message stream") - ui.input_action_button("nested_stream", "Add nested stream") - - ui.h6("Message state:", class_="mt-auto mb-0") - - @render.code - def message_state(): - return str(chat.messages()) - - -chat = ui.Chat(id="chat") -chat.ui() - - -# TODO: test submitting input after adding stream -@chat.on_user_submit -async def _(user_input: str): - await chat.append_message(f"You said: {user_input}") - - -@reactive.effect -@reactive.event(input.basic_stream) -async def _(): - chunks = [ - "Starting stream 1 🔄...\n\n", - "Progress: 0%", - " Progress: 50%", - " Progress: 100%", - ] - async with chat.append_message_context() as stream: - for chunk in chunks: - await stream.append(chunk) - await asyncio.sleep(SLEEP_TIME) - await stream.restore() - await stream.append("Completed stream 1 ✅") - - -# TODO: add test here for nested .append_message_context() - - -@reactive.effect -@reactive.event(input.nested_stream) -async def _(): - await chat.append_message_stream(mock_stream()) - - -async def mock_stream(): - yield "Starting outer stream...\n\n" - await asyncio.sleep(SLEEP_TIME) - await mock_tool() - await asyncio.sleep(SLEEP_TIME) - yield "\n\n...outer stream complete" - - -async def mock_tool(): - chunks = [ - "Starting inner stream 🔄...\n\n", - "Progress: 0%", - " Progress: 50%", - " Progress: 100%", - ] - async with chat.append_message_context() as 
stream: - for chunk in chunks: - await stream.append(chunk) - await asyncio.sleep(SLEEP_TIME) - await stream.restore() - await stream.append("Completed inner stream ✅") - - -# TODO: more tests, like submitting input, etc. diff --git a/tests/playwright/shiny/components/chat/message-stream/test_chat_message_stream.py b/tests/playwright/shiny/components/chat/message-stream/test_chat_message_stream.py deleted file mode 100644 index 05fcbe789..000000000 --- a/tests/playwright/shiny/components/chat/message-stream/test_chat_message_stream.py +++ /dev/null @@ -1,46 +0,0 @@ -from playwright.sync_api import Page, expect -from utils.deploy_utils import skip_on_webkit - -from shiny.playwright import controller -from shiny.run import ShinyAppProc - - -@skip_on_webkit -def test_validate_chat_inject(page: Page, local_app: ShinyAppProc) -> None: - page.goto(local_app.url) - - TIMEOUT = 30 * 1000 - - chat = controller.Chat(page, "chat") - expect(chat.loc).to_be_visible(timeout=TIMEOUT) - - basic_stream = controller.InputActionButton(page, "basic_stream") - expect(basic_stream.loc).to_be_visible(timeout=TIMEOUT) - basic_stream.click() - - # TODO: how to test the progress messages? - chat.expect_latest_message( - "Completed stream 1 ✅", - timeout=TIMEOUT, - ) - - chat.set_user_input("Hello") - chat.send_user_input() - chat.expect_latest_message("You said: Hello", timeout=TIMEOUT) - - nested_stream = controller.InputActionButton(page, "nested_stream") - expect(nested_stream.loc).to_be_visible(timeout=TIMEOUT) - nested_stream.click() - - # TODO: how to test the progress messages? 
- chat.expect_latest_message( - "Starting outer stream...\n\nCompleted inner stream ✅\n\n...outer stream complete", - timeout=TIMEOUT, - ) - - chat.set_user_input("Hello") - chat.send_user_input() - chat.expect_latest_message("You said: Hello", timeout=TIMEOUT) - - # TODO: test message state - # message_state = controller.OutputCode(page, "message_state") From 7ac2ba19ee39757899493a4ce82f0c8842988e78 Mon Sep 17 00:00:00 2001 From: Carson Date: Tue, 18 Mar 2025 12:02:48 -0500 Subject: [PATCH 16/21] Rename append_message_context() -> message_stream_context() --- shiny/ui/_chat.py | 10 +++++----- .../chat/append_message_context/app.py | 18 +++++++++--------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index fb2718019..981cbe17f 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -591,7 +591,7 @@ async def append_message( ) @asynccontextmanager - async def append_message_context(self): + async def message_stream_context(self): """ Message stream context manager. @@ -629,9 +629,9 @@ async def append_message_context(self): @reactive.effect async def _(): - async with chat.append_message_context() as msg: + async with chat.message_stream_context() as msg: await msg.append("Starting stream...\n\nProgress:") - async with chat.append_message_context() as progress: + async with chat.message_stream_context() as progress: for x in [0, 50, 100]: await progress.append(f" {x}%") await asyncio.sleep(1) @@ -643,7 +643,7 @@ async def _(): Note ---- A useful pattern for displaying tool calls in a chatbot is for the tool to - display using `.append_message_context()` while the the response generation is + display using `.message_stream_context()` while the the response generation is happening through `.append_message_stream()`. 
This allows the tool to display things like progress updates (or other "ephemeral" content) and optionally `.restore()` the stream back to it's initial state when ready to display the @@ -1492,7 +1492,7 @@ def chat_ui( class MessageStream: """ - An object to yield from a `.append_message_context()` context manager. + An object to yield from a `.message_stream_context()` context manager. """ def __init__(self, chat: Chat, stream_id: str): diff --git a/tests/playwright/shiny/components/chat/append_message_context/app.py b/tests/playwright/shiny/components/chat/append_message_context/app.py index 6bdda9c8d..af93dce89 100644 --- a/tests/playwright/shiny/components/chat/append_message_context/app.py +++ b/tests/playwright/shiny/components/chat/append_message_context/app.py @@ -34,7 +34,7 @@ async def _(user_input: str): @reactive.effect @reactive.event(input.stream_1) async def _(): - async with chat.append_message_context() as msg: + async with chat.message_stream_context() as msg: await msg.append("Basic") await asyncio.sleep(SLEEP_TIME) await msg.append(" stream") @@ -43,7 +43,7 @@ async def _(): @reactive.effect @reactive.event(input.stream_2) async def _(): - async with chat.append_message_context() as msg: + async with chat.message_stream_context() as msg: await msg.append("Basic") await asyncio.sleep(SLEEP_TIME) await msg.append(" stream") @@ -56,10 +56,10 @@ async def _(): @reactive.effect @reactive.event(input.stream_3) async def _(): - async with chat.append_message_context() as outer: + async with chat.message_stream_context() as outer: await outer.append("Outer start") await asyncio.sleep(SLEEP_TIME) - async with chat.append_message_context() as inner: + async with chat.message_stream_context() as inner: await inner.append("Inner start") await asyncio.sleep(SLEEP_TIME) await inner.append("Inner end") @@ -70,10 +70,10 @@ async def _(): @reactive.effect @reactive.event(input.stream_4) async def _(): - async with chat.append_message_context() as outer: + async 
with chat.message_stream_context() as outer: await outer.append("Outer start") await asyncio.sleep(SLEEP_TIME) - async with chat.append_message_context() as inner: + async with chat.message_stream_context() as inner: await inner.append("Inner start") await asyncio.sleep(SLEEP_TIME) await inner.restore() @@ -85,11 +85,11 @@ async def _(): @reactive.effect @reactive.event(input.stream_5) async def _(): - async with chat.append_message_context() as outer: + async with chat.message_stream_context() as outer: await outer.append("Outer start") await asyncio.sleep(SLEEP_TIME) await outer.restore() - async with chat.append_message_context() as inner: + async with chat.message_stream_context() as inner: await inner.append("Inner start") await asyncio.sleep(SLEEP_TIME) await inner.append("Inner end") @@ -112,7 +112,7 @@ async def outer_stream(): async def inner_stream(): - async with chat.append_message_context() as stream: + async with chat.message_stream_context() as stream: await stream.append("Inner start") await asyncio.sleep(SLEEP_TIME) await stream.append("Inner progress") From f8ff4252044e1a4aa66080dce94c31d742ff3598 Mon Sep 17 00:00:00 2001 From: Carson Date: Tue, 18 Mar 2025 19:23:00 -0500 Subject: [PATCH 17/21] Rename .restore() -> .clear() --- shiny/ui/_chat.py | 14 +++++++------- .../components/chat/append_message_context/app.py | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index 981cbe17f..ece0df7cc 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -602,7 +602,7 @@ async def message_stream_context(self): * Compared to `.append_message_stream()` this method is more flexible but isn't non-blocking by default (i.e., it doesn't launch an extended task). 2. Be nested within itself - * Nesting is primarily useful for making checkpoints to `.restore()` back + * Nesting is primarily useful for making checkpoints to `.clear()` back to (see the example below). 3. 
Be used from within a `.append_message_stream()` * Useful for inserting additional content from another context into the @@ -612,7 +612,7 @@ async def message_stream_context(self): ------ : A `MessageStream` class instance, which has a method for `.append()`ing - message content chunks to as well as way to `.restore()` the stream back to + message content chunks to as well as way to `.clear()` the stream back to it's initial state. Note that `.append()` supports the same message content types as `.append_message()`. @@ -635,8 +635,8 @@ async def _(): for x in [0, 50, 100]: await progress.append(f" {x}%") await asyncio.sleep(1) - await progress.restore() - await msg.restore() + await progress.clear() + await msg.clear() await msg.append("Completed stream") ``` @@ -646,7 +646,7 @@ async def _(): display using `.message_stream_context()` while the the response generation is happening through `.append_message_stream()`. This allows the tool to display things like progress updates (or other "ephemeral" content) and optionally - `.restore()` the stream back to it's initial state when ready to display the + `.clear()` the stream back to it's initial state when ready to display the "final" content. """ # Checkpoint the current stream state so operation="replace" can return to it @@ -1499,9 +1499,9 @@ def __init__(self, chat: Chat, stream_id: str): self._chat = chat self._stream_id = stream_id - async def restore(self): + async def clear(self): """ - Restore the stream back to its initial state. + Set the stream back to its original state. 
""" await self._chat._append_message_chunk( "", diff --git a/tests/playwright/shiny/components/chat/append_message_context/app.py b/tests/playwright/shiny/components/chat/append_message_context/app.py index af93dce89..ebfac81ef 100644 --- a/tests/playwright/shiny/components/chat/append_message_context/app.py +++ b/tests/playwright/shiny/components/chat/append_message_context/app.py @@ -48,7 +48,7 @@ async def _(): await asyncio.sleep(SLEEP_TIME) await msg.append(" stream") await asyncio.sleep(SLEEP_TIME) - await msg.restore() + await msg.clear() await asyncio.sleep(SLEEP_TIME) await msg.append("Finished") @@ -76,7 +76,7 @@ async def _(): async with chat.message_stream_context() as inner: await inner.append("Inner start") await asyncio.sleep(SLEEP_TIME) - await inner.restore() + await inner.clear() await inner.append("Inner end") await asyncio.sleep(SLEEP_TIME) await outer.append("Outer end") @@ -88,7 +88,7 @@ async def _(): async with chat.message_stream_context() as outer: await outer.append("Outer start") await asyncio.sleep(SLEEP_TIME) - await outer.restore() + await outer.clear() async with chat.message_stream_context() as inner: await inner.append("Inner start") await asyncio.sleep(SLEEP_TIME) @@ -116,5 +116,5 @@ async def inner_stream(): await stream.append("Inner start") await asyncio.sleep(SLEEP_TIME) await stream.append("Inner progress") - await stream.restore() + await stream.clear() await stream.append("Inner end") From 0ffdf219d3d7ce2010391322237f404b2f55b4a1 Mon Sep 17 00:00:00 2001 From: Carson Date: Tue, 18 Mar 2025 19:40:55 -0500 Subject: [PATCH 18/21] Update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 55638d52b..24d64d85a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Both `ui.Chat()` and `ui.MarkdownStream()` now support arbirary Shiny UI elements inside of messages. 
This allows for gathering input from the user (e.g., `ui.input_select()`), displaying of rich output (e.g., `render.DataGrid()`), and more. (#1868) +* Added a new `.message_stream_context()` method to `ui.Chat()`. This context manager is a useful alternative to `.append_message_stream()` when you want to: (1) Nest a stream within another and/or +(2) Overwrite/replace streaming content. (#1906) + ### Changes * Express mode's `app_opts()` requires all arguments to be keyword-only. If you are using positional arguments, you will need to update your code. (#1895) From c9fad380899b010b3ec65035cf2c018a49305594 Mon Sep 17 00:00:00 2001 From: Carson Date: Wed, 19 Mar 2025 19:18:54 -0500 Subject: [PATCH 19/21] Drop .clear() method in favor of replace --- shiny/ui/_chat.py | 11 ++++++++--- .../components/chat/append_message_context/app.py | 12 ++++-------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index ece0df7cc..c48099e2a 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -1499,12 +1499,17 @@ def __init__(self, chat: Chat, stream_id: str): self._chat = chat self._stream_id = stream_id - async def clear(self): + async def replace(self, message_chunk: Any): """ - Set the stream back to its original state. + Replace the content of the stream with new content. + + Parameters + ----------- + message_chunk + The new content to replace the current content. 
""" await self._chat._append_message_chunk( - "", + message_chunk, operation="replace", stream_id=self._stream_id, ) diff --git a/tests/playwright/shiny/components/chat/append_message_context/app.py b/tests/playwright/shiny/components/chat/append_message_context/app.py index ebfac81ef..e8e9e7569 100644 --- a/tests/playwright/shiny/components/chat/append_message_context/app.py +++ b/tests/playwright/shiny/components/chat/append_message_context/app.py @@ -48,9 +48,7 @@ async def _(): await asyncio.sleep(SLEEP_TIME) await msg.append(" stream") await asyncio.sleep(SLEEP_TIME) - await msg.clear() - await asyncio.sleep(SLEEP_TIME) - await msg.append("Finished") + await msg.replace("Finished") @reactive.effect @@ -76,8 +74,7 @@ async def _(): async with chat.message_stream_context() as inner: await inner.append("Inner start") await asyncio.sleep(SLEEP_TIME) - await inner.clear() - await inner.append("Inner end") + await inner.replace("Inner end") await asyncio.sleep(SLEEP_TIME) await outer.append("Outer end") @@ -88,7 +85,7 @@ async def _(): async with chat.message_stream_context() as outer: await outer.append("Outer start") await asyncio.sleep(SLEEP_TIME) - await outer.clear() + await outer.replace("") async with chat.message_stream_context() as inner: await inner.append("Inner start") await asyncio.sleep(SLEEP_TIME) @@ -116,5 +113,4 @@ async def inner_stream(): await stream.append("Inner start") await asyncio.sleep(SLEEP_TIME) await stream.append("Inner progress") - await stream.clear() - await stream.append("Inner end") + await stream.replace("Inner end") From bd5e6105f22efb74e116d0b59f5576ca0b06d621 Mon Sep 17 00:00:00 2001 From: Carson Date: Wed, 19 Mar 2025 19:20:55 -0500 Subject: [PATCH 20/21] Rename test files --- .../{append_message_context => message_stream_context}/app.py | 0 .../test_chat_message_stream_context.py} | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename tests/playwright/shiny/components/chat/{append_message_context => 
message_stream_context}/app.py (100%) rename tests/playwright/shiny/components/chat/{append_message_context/test_chat_append_message_context.py => message_stream_context/test_chat_message_stream_context.py} (98%) diff --git a/tests/playwright/shiny/components/chat/append_message_context/app.py b/tests/playwright/shiny/components/chat/message_stream_context/app.py similarity index 100% rename from tests/playwright/shiny/components/chat/append_message_context/app.py rename to tests/playwright/shiny/components/chat/message_stream_context/app.py diff --git a/tests/playwright/shiny/components/chat/append_message_context/test_chat_append_message_context.py b/tests/playwright/shiny/components/chat/message_stream_context/test_chat_message_stream_context.py similarity index 98% rename from tests/playwright/shiny/components/chat/append_message_context/test_chat_append_message_context.py rename to tests/playwright/shiny/components/chat/message_stream_context/test_chat_message_stream_context.py index cee1ac56a..12f8f6193 100644 --- a/tests/playwright/shiny/components/chat/append_message_context/test_chat_append_message_context.py +++ b/tests/playwright/shiny/components/chat/message_stream_context/test_chat_message_stream_context.py @@ -6,7 +6,7 @@ @skip_on_webkit -def test_validate_chat_append_message_context( +def test_validate_chat_message_stream_context( page: Page, local_app: ShinyAppProc ) -> None: page.goto(local_app.url) From 2a6d9343defe5d2ecaf6071328964337e2dc6574 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 20 Mar 2025 11:24:01 -0500 Subject: [PATCH 21/21] Include the operation when queueing message chunks --- shiny/ui/_chat.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/shiny/ui/_chat.py b/shiny/ui/_chat.py index c48099e2a..b4024f4f3 100644 --- a/shiny/ui/_chat.py +++ b/shiny/ui/_chat.py @@ -82,7 +82,12 @@ ChunkOption = Literal["start", "end", True, False] -PendingMessage = Tuple[Any, ChunkOption, Union[str, None]] 
+PendingMessage = Tuple[ + Any, + ChunkOption, + Literal["append", "replace"], + Union[str, None], +] @add_example(ex_dir="../templates/chat/starters/hello") @@ -576,7 +581,7 @@ async def append_message( """ # If we're in a stream, queue the message if self._current_stream_id: - self._pending_messages.append((message, False, None)) + self._pending_messages.append((message, False, "append", None)) return msg = normalize_message(message) @@ -685,7 +690,7 @@ async def _append_message_chunk( ) -> None: # If currently we're in a *different* stream, queue the message chunk if self._current_stream_id and self._current_stream_id != stream_id: - self._pending_messages.append((message, chunk, stream_id)) + self._pending_messages.append((message, chunk, operation, stream_id)) return self._current_stream_id = stream_id @@ -876,12 +881,15 @@ async def _append_message_stream( async def _flush_pending_messages(self): pending = self._pending_messages self._pending_messages = [] - for msg, chunk, stream_id in pending: + for msg, chunk, operation, stream_id in pending: if chunk is False: await self.append_message(msg) else: await self._append_message_chunk( - msg, chunk=chunk, stream_id=cast(str, stream_id) + msg, + chunk=chunk, + operation=operation, + stream_id=cast(str, stream_id), ) # Send a message to the UI