@@ -22,6 +22,9 @@ class MessageBus:
2222 - Handler registration
2323 - Channel routing
2424 - Error handling
25+ - **Latest-wins per session**: when a new message arrives for a session
26+ that already has an in-flight or queued message, the older message is
27+ cancelled/skipped and only the newest message is processed.
2528 """
2629
2730 def __init__ (self , max_queue_size : int = 100 , max_concurrency : int = 4 ):
@@ -41,6 +44,21 @@ def __init__(self, max_queue_size: int = 100, max_concurrency: int = 4):
4144 self ._semaphore = asyncio .Semaphore (max_concurrency )
4245 self ._active_tasks : set [asyncio .Task ] = set ()
4346
47+ # Message coalescing: when multiple messages arrive for the same
48+ # session before processing begins, they are merged into a single
49+ # message so the agent sees the full context (e.g. a follow-up
50+ # clarification is kept together with the original request).
51+ # If a task is already in-flight, it is cancelled and the new
52+ # (merged) message is processed instead.
53+ self ._seq_counter : int = 0
54+ self ._latest_seq : dict [str , int ] = {}
55+ self ._session_locks : dict [str , asyncio .Lock ] = {}
56+ self ._session_tasks : dict [str , asyncio .Task ] = {}
57+ # Per-session accumulator: messages are buffered here on publish()
58+ # and drained at processing time so that all pending messages for a
59+ # session are coalesced into one.
60+ self ._session_buffer : dict [str , list [InboundMessage ]] = {}
61+
4462 def set_handler (self , handler : MessageHandler ) -> None :
4563 """
4664 Set the message handler (typically the agent).
@@ -74,12 +92,39 @@ async def publish(self, message: InboundMessage) -> bool:
7492 would freeze the channel's event handler (e.g. Discord gateway
7593 heartbeats), potentially causing a disconnect.
7694
95+ **Message coalescing**: the message is added to a per-session
96+ buffer *and* enqueued. When processing starts, all buffered
97+ messages for the session are merged into one so the agent sees
98+ the full context (follow-ups, corrections, etc.). If a task is
99+ already running for this session, it is cancelled and the new
100+ (coalesced) message takes over.
101+
77102 Args:
78103 message: Inbound message from a channel.
79104
80105 Returns:
81106 True if the message was enqueued, False if the queue is full.
82107 """
108+ # Assign a sequence number for ordering
109+ self ._seq_counter += 1
110+ message ._bus_seq = self ._seq_counter
111+ session_key = message .session_key or message .channel
112+ self ._latest_seq [session_key ] = self ._seq_counter
113+
114+ # Accumulate in per-session buffer for coalescing at processing time
115+ self ._session_buffer .setdefault (session_key , []).append (message )
116+
117+ # Cancel the currently running task for this session (if any).
118+ # The cancelled task will release its session lock, allowing the
119+ # new (coalesced) message to proceed once it is dequeued.
120+ existing_task = self ._session_tasks .get (session_key )
121+ if existing_task and not existing_task .done ():
122+ existing_task .cancel ()
123+ logger .info (
124+ f"Cancelling in-flight task for session { session_key } — "
125+ f"newer message arrived: { message .content [:50 ]} ..."
126+ )
127+
83128 try :
84129 self ._queue .put_nowait (message )
85130 logger .debug (f"Published message from { message .channel } : { message .content [:50 ]} ..." )
@@ -110,6 +155,9 @@ async def _process_message(self, message: InboundMessage) -> None:
110155 else :
111156 logger .warning (f"No outbound handler for channel: { target_channel } " )
112157
158+ except asyncio .CancelledError :
159+ # Let CancelledError propagate — the caller handles it.
160+ raise
113161 except Exception as e :
114162 logger .error (f"Error processing message: { e } " )
115163 # Send error response to ensure channel cleanup (typing, reactions).
@@ -129,11 +177,132 @@ async def _process_message(self, message: InboundMessage) -> None:
129177 except Exception as send_err :
130178 logger .error (f"Failed to send error response: { send_err } " )
131179
180+ def _get_session_lock (self , session_key : str ) -> asyncio .Lock :
181+ """Return (or create) a per-session lock for serialised processing."""
182+ lock = self ._session_locks .get (session_key )
183+ if lock is None :
184+ lock = asyncio .Lock ()
185+ self ._session_locks [session_key ] = lock
186+ return lock
187+
188+ @staticmethod
189+ def _coalesce_messages (messages : list [InboundMessage ]) -> InboundMessage :
190+ """Merge a list of messages into one, preserving the latest metadata.
191+
192+ The content of all messages is joined with newlines so the agent
193+ sees the full context. Media attachments are concatenated. All
194+ other fields (channel, session_key, metadata, …) are taken from
195+ the **last** message since it is the most recent user intent.
196+ """
197+ if len (messages ) == 1 :
198+ return messages [0 ]
199+
200+ base = messages [- 1 ] # newest message is the base
201+ merged_content = "\n " .join (m .content for m in messages )
202+
203+ # Merge media from all messages (deduplicated, order preserved)
204+ seen : set [str ] = set ()
205+ merged_media : list [str ] = []
206+ for m in messages :
207+ for path in m .media :
208+ if path not in seen :
209+ seen .add (path )
210+ merged_media .append (path )
211+
212+ # Build the coalesced message from the newest, replacing content/media
213+ coalesced = InboundMessage (
214+ content = merged_content ,
215+ channel = base .channel ,
216+ session_key = base .session_key ,
217+ sender_id = base .sender_id ,
218+ sender_name = base .sender_name ,
219+ message_id = base .message_id ,
220+ timestamp = base .timestamp ,
221+ media = merged_media ,
222+ metadata = base .metadata .copy () if base .metadata else {},
223+ )
224+ coalesced ._bus_seq = base ._bus_seq
225+ return coalesced
226+
132227 async def _process_with_semaphore (self , message : InboundMessage ) -> None :
133- """Process a single message under the concurrency semaphore."""
228+ """Process a single message under the concurrency semaphore.
229+
230+ **Per-session serialisation**: messages belonging to the same
231+ ``session_key`` are processed one at a time via a per-session lock
232+ so that a fast second message cannot run concurrently with the
233+ first.
234+
235+ **Message coalescing**: before starting actual work, all pending
236+ messages for this session are drained from ``_session_buffer``
237+ and merged into one. This means follow-up messages ("also use
238+ TypeScript") are kept together with the original request.
239+
240+ If this trigger message is not the latest for its session (i.e.
241+ a newer trigger was already enqueued), it yields to the newer
242+ trigger which will perform the coalescing instead.
243+
244+ **Cancellation-safe**: if this task is cancelled (because a newer
245+ message triggered cancellation via ``publish()``), the session
246+ lock and semaphore are properly released and the queue bookkeeping
247+ is maintained.
248+ """
249+ session_key = message .session_key or message .channel
250+ session_lock = self ._get_session_lock (session_key )
251+
134252 try :
135- async with self ._semaphore :
136- await self ._process_message (message )
253+ # Acquire per-session lock first (no semaphore slot consumed
254+ # while waiting, so other sessions are not starved).
255+ async with session_lock :
256+ # Only the trigger with the highest seq should coalesce
257+ # and process. Earlier triggers for the same session
258+ # exit here — the latest trigger will pick up all
259+ # buffered messages.
260+ msg_seq = message ._bus_seq
261+ latest = self ._latest_seq .get (session_key , 0 )
262+ if msg_seq < latest :
263+ logger .info (
264+ f"Skipping earlier trigger (seq={ msg_seq } , "
265+ f"latest={ latest } ) for session { session_key } "
266+ )
267+ return
268+
269+ # Drain the per-session buffer and coalesce
270+ buffered = self ._session_buffer .pop (session_key , [])
271+ if buffered :
272+ message = self ._coalesce_messages (buffered )
273+ if len (buffered ) > 1 :
274+ logger .info (
275+ f"Coalesced { len (buffered )} messages for "
276+ f"session { session_key } "
277+ )
278+
279+ # Register as the active task for this session
280+ current_task = asyncio .current_task ()
281+ self ._session_tasks [session_key ] = current_task # type: ignore[assignment]
282+
283+ try :
284+ async with self ._semaphore :
285+ await self ._process_message (message )
286+ except asyncio .CancelledError :
287+ logger .info (
288+ f"Task cancelled for session { session_key } : "
289+ f"{ message .content [:50 ]} ..."
290+ )
291+ # Do NOT re-raise inside the session_lock context —
292+ # we want to release the lock cleanly so the next
293+ # message can proceed.
294+ return
295+ finally :
296+ # Only clear if we are still the registered task
297+ if self ._session_tasks .get (session_key ) is current_task :
298+ self ._session_tasks .pop (session_key , None )
299+ except asyncio .CancelledError :
300+ # Cancelled while waiting for the session lock — nothing to
301+ # clean up, just exit silently.
302+ logger .debug (
303+ f"Task cancelled while waiting for lock, "
304+ f"session { session_key } : { message .content [:50 ]} ..."
305+ )
137306 finally :
138307 self ._queue .task_done ()
139308
0 commit comments