|
80 | 80 | UserSubmitFunction1, |
81 | 81 | ] |
82 | 82 |
|
83 | | -PendingMessage = Tuple[Any, Literal["start", "end", True], Union[str, None]] |
| 83 | +ChunkOption = Literal["start", "end", True, False] |
| 84 | + |
| 85 | +PendingMessage = Tuple[Any, ChunkOption, Union[str, None]] |
84 | 86 |
|
85 | 87 |
|
86 | 88 | @add_example(ex_dir="../templates/chat/starters/hello") |
@@ -570,6 +572,11 @@ async def append_message( |
570 | 572 | similar) is specified in model's completion method. |
571 | 573 | ::: |
572 | 574 | """ |
| 575 | + # If we're in a stream, queue the message |
| 576 | + if self._current_stream_id: |
| 577 | + self._pending_messages.append((message, False, None)) |
| 578 | + return |
| 579 | + |
573 | 580 | msg = normalize_message(message) |
574 | 581 | msg = await self._transform_message(msg) |
575 | 582 | if msg is None: |
@@ -668,20 +675,20 @@ async def message_stream(self): |
668 | 675 | await self._append_message_chunk( |
669 | 676 | "", |
670 | 677 | chunk="end", |
671 | | - stream_id=self._current_stream_id, |
| 678 | + stream_id=cast(str, self._current_stream_id), |
672 | 679 | ) |
673 | 680 |
|
674 | 681 | async def _append_message_chunk( |
675 | 682 | self, |
676 | 683 | message: Any, |
677 | 684 | *, |
678 | 685 | chunk: Literal[True, "start", "end"] = True, |
| 686 | + stream_id: str, |
679 | 687 | operation: Literal["append", "replace"] = "append", |
680 | | - stream_id: str | None = None, |
681 | 688 | icon: HTML | Tag | TagList | None = None, |
682 | 689 | ) -> None: |
683 | | - # If currently we're in a stream, handle other messages (outside the stream) later |
684 | | - if not self._can_append_message(stream_id): |
| 690 | + # If currently we're in a *different* stream, queue the message chunk |
| 691 | + if self._current_stream_id and self._current_stream_id != stream_id: |
685 | 692 | self._pending_messages.append((message, chunk, stream_id)) |
686 | 693 | return |
687 | 694 |
|
@@ -876,24 +883,21 @@ async def _append_message_stream( |
876 | 883 | await self._flush_pending_messages() |
877 | 884 |
|
878 | 885 | async def _flush_pending_messages(self): |
879 | | - still_pending: list[PendingMessage] = [] |
880 | | - for msg, chunk, stream_id in self._pending_messages: |
881 | | - if self._can_append_message(stream_id): |
882 | | - await self._append_message_chunk(msg, chunk=chunk, stream_id=stream_id) |
| 886 | + pending = self._pending_messages |
| 887 | + self._pending_messages = [] |
| 888 | + for msg, chunk, stream_id in pending: |
| 889 | + if chunk is False: |
| 890 | + await self.append_message(msg) |
883 | 891 | else: |
884 | | - still_pending.append((msg, chunk, stream_id)) |
885 | | - self._pending_messages = still_pending |
886 | | - |
887 | | - def _can_append_message(self, stream_id: str | None) -> bool: |
888 | | - if self._current_stream_id is None: |
889 | | - return True |
890 | | - return self._current_stream_id == stream_id |
| 892 | + await self._append_message_chunk( |
| 893 | + msg, chunk=chunk, stream_id=cast(str, stream_id) |
| 894 | + ) |
891 | 895 |
|
892 | 896 | # Send a message to the UI |
893 | 897 | async def _send_append_message( |
894 | 898 | self, |
895 | 899 | message: TransformedMessage | ChatMessage, |
896 | | - chunk: Literal["start", "end", True, False] = False, |
| 900 | + chunk: ChunkOption = False, |
897 | 901 | operation: Literal["append", "replace"] = "append", |
898 | 902 | icon: HTML | Tag | TagList | None = None, |
899 | 903 | ): |
|
0 commit comments