@@ -193,6 +193,8 @@ def __init__(
193193 self ._current_stream_id : str | None = None
194194 self ._pending_messages : list [PendingMessage ] = []
195195
196+ self ._manual_stream_id : str | None = None
197+
196198 # If a user input message is transformed into a response, we need to cancel
197199 # the next user input submit handling
198200 self ._suspend_input_handler : bool = False
@@ -552,21 +554,19 @@ async def append_message(
552554 """
553555 await self ._append_message (message , icon = icon )
554556
555- async def inject_message_chunk (
557+ async def append_message_chunk (
556558 self ,
557559 message_chunk : Any ,
558560 * ,
559561 operation : Literal ["append" , "replace" ] = "append" ,
560562 force : bool = False ,
561563 ):
562564 """
563- Inject a chunk of message content into the current message stream.
565+ Append a message chunk to the current message stream.
564566
565- Sometimes when streaming a message (i.e., `.append_message_stream()`), you may
566- want to inject a content into the streaming message while the stream is
567- busy doing other things (e.g., calling a tool). This method allows you to
568- inject any content you want into the current message stream (assuming one is
569- active).
567+ Append a chunk of message content to either the currently running
568+ `.append_message_stream()` or to one that was manually started with
569+ `.start_message_stream()`.
570570
571571 Parameters
572572 ----------
@@ -577,14 +577,16 @@ async def inject_message_chunk(
577577 force
578578 Whether to start a new stream if one is not currently active.
579579 """
580- stream_id = self ._current_stream_id
580+ # Can append to either an active `.start_message_stream()` or a
581+ # # `.append_message_stream()`
582+ stream_id = self ._manual_stream_id or self ._current_stream_id
581583 if stream_id is None :
582584 if not force :
583585 raise ValueError (
584- "Can't inject a message chunk when no message stream is active . "
585- "Use `force=True` to start a new stream if one is not currently active." ,
586+ "Can't append a message chunk without an active message stream. "
587+ "Use `force=True` to start a new message stream if one is not currently active." ,
586588 )
587- await self .start_message_stream (force = True )
589+ await self .start_message_stream ()
588590
589591 return await self ._append_message (
590592 message_chunk ,
@@ -593,41 +595,41 @@ async def inject_message_chunk(
593595 operation = operation ,
594596 )
595597
596- async def start_message_stream (self , * , force : bool = False ):
598+ async def start_message_stream (self ):
597599 """
598600 Start a new message stream.
599601
600- Parameters
601- ----------
602- force
603- Whether to force starting a new stream even if one is already active
602+ Starts a new message stream which can then be appended to using
603+ `.append_message_chunk()`.
604604 """
605- stream_id = self . _current_stream_id
606- if stream_id is not None :
607- if not force :
608- raise ValueError (
609- "Can't start a new message stream when a message stream is already active. "
610- "Use `force=True` to end a currently active stream and start a new one." ,
611- )
612- await self . end_message_stream ()
613-
614- id = _utils . private_random_id ()
615- return await self . _append_message ( "" , chunk = "start" , stream_id = id )
605+ # Since `._append_message()` manages a queue of message streams, we can just
606+ # start a new stream here. Note that, if a stream is already active, this
607+ # stream should start once the current stream ends.
608+ stream_id = _utils . private_random_id ()
609+ # Separately track the stream id so ``.append_message_chunk()``/`.end_message_stream()`
610+ self . _manual_stream_id = stream_id
611+ return await self . _append_message (
612+ "" ,
613+ chunk = "start" ,
614+ stream_id = stream_id ,
615+ )
616616
617617 async def end_message_stream (self ):
618618 """
619619 End the current message stream (if any).
620+
621+ Ends a message stream that was started with `.start_message_stream()`.
620622 """
621- stream_id = self ._current_stream_id
623+ stream_id = self ._manual_stream_id
622624 if stream_id is None :
623625 warnings .warn ("No currently active stream to end." , stacklevel = 2 )
624626 return
625627
626- with reactive . isolate ():
627- # TODO: .cancel() method should probably just handle this
628- self . latest_message_stream . cancel ()
629-
630- return await self . _append_message ( "" , chunk = "end" , stream_id = stream_id )
628+ return await self . _append_message (
629+ "" ,
630+ chunk = "end" ,
631+ stream_id = stream_id ,
632+ )
631633
632634 async def _append_message (
633635 self ,
@@ -771,8 +773,8 @@ def latest_message_stream(self) -> reactive.ExtendedTask[[], str]:
771773 """
772774 React to changes in the latest message stream.
773775
774- Reactively reads for the :class:`~shiny.reactive.ExtendedTask` behind the
775- latest message stream .
776+ Reactively reads for the :class:`~shiny.reactive.ExtendedTask` behind an
777+ `.append_message_stream()` .
776778
777779 From the return value (i.e., the extended task), you can then:
778780
0 commit comments