Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/langbot/pkg/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ..plugin import connector as plugin_connector
from ..pipeline import pool
from ..pipeline import controller, pipelinemgr
from ..pipeline import aggregator as message_aggregator
from ..utils import version as version_mgr, proxy as proxy_mgr
from ..persistence import mgr as persistencemgr
from ..api.http.controller import main as http_controller
Expand Down Expand Up @@ -96,6 +97,8 @@ class Application:

query_pool: pool.QueryPool = None

msg_aggregator: message_aggregator.MessageAggregator = None

ctrl: controller.Controller = None

pipeline_mgr: pipelinemgr.PipelineManager = None
Expand Down
5 changes: 5 additions & 0 deletions src/langbot/pkg/core/stages/build_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .. import stage, app
from ...utils import version, proxy
from ...pipeline import pool, controller, pipelinemgr
from ...pipeline import aggregator as message_aggregator
from ...plugin import connector as plugin_connector
from ...command import cmdmgr
from ...provider.session import sessionmgr as llm_session_mgr
Expand Down Expand Up @@ -137,6 +138,10 @@ async def run(self, ap: app.Application):
await pipeline_mgr.initialize()
ap.pipeline_mgr = pipeline_mgr

# Initialize message aggregator (after pipeline_mgr, as it needs pipeline config)
msg_aggregator_inst = message_aggregator.MessageAggregator(ap)
ap.msg_aggregator = msg_aggregator_inst

rag_mgr_inst = rag_mgr.RAGManager(ap)
await rag_mgr_inst.initialize()
ap.rag_mgr = rag_mgr_inst
Expand Down
280 changes: 280 additions & 0 deletions src/langbot/pkg/pipeline/aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
"""Message Aggregator Module

This module provides message aggregation/debounce functionality.
When users send multiple messages consecutively, the aggregator will wait
for a configurable delay period and merge them into a single message
before processing.
"""

from __future__ import annotations

import asyncio
import time
import typing
from dataclasses import dataclass, field

import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.provider.session as provider_session
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter

if typing.TYPE_CHECKING:
from ..core import app


@dataclass
class PendingMessage:
    """A pending message waiting to be aggregated"""

    # UUID of the bot that received the message
    bot_uuid: str
    # Whether the message came from a person chat or a group chat
    launcher_type: provider_session.LauncherTypes
    # ID of the chat (person or group) the message arrived in
    launcher_id: typing.Union[int, str]
    # ID of the user who actually sent the message
    sender_id: typing.Union[int, str]
    # Original platform event, kept alongside the chain for dispatching
    message_event: platform_events.MessageEvent
    # Parsed content of the incoming message
    message_chain: platform_message.MessageChain
    # Platform adapter the message arrived through
    adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter
    # Pipeline whose configuration governs this message; may be None
    pipeline_uuid: typing.Optional[str]
    # Arrival time (seconds since epoch), recorded when the object is built
    timestamp: float = field(default_factory=time.time)


@dataclass
class SessionBuffer:
    """Buffer for a single session's pending messages"""

    # Key this buffer is stored under in MessageAggregator.buffers
    session_id: str
    # Messages accumulated during the current debounce window, oldest first
    messages: list[PendingMessage] = field(default_factory=list)
    # Debounce timer task; cancelled and replaced whenever a new message lands
    timer_task: typing.Optional[asyncio.Task] = None
    # Timestamp of the most recent message added to this buffer
    last_message_time: float = field(default_factory=time.time)


class MessageAggregator:
    """Message aggregator that buffers and merges consecutive messages

    This class implements a debounce mechanism for incoming messages.
    When a message arrives, it starts a timer. If more messages arrive
    before the timer expires, they are buffered. When the timer expires,
    all buffered messages are merged and sent to the query pool.
    """

    # Application container; used to reach pipeline_mgr and query_pool
    ap: app.Application

    buffers: dict[str, SessionBuffer]
    """Session ID -> SessionBuffer mapping"""

    lock: asyncio.Lock
    """Lock for thread-safe buffer operations"""

    def __init__(self, ap: app.Application):
        # Buffers start empty; entries are created lazily per session
        # the first time a message for that session is buffered.
        self.ap = ap
        self.buffers = {}
        self.lock = asyncio.Lock()

def _get_session_id(
self,
bot_uuid: str,
launcher_type: provider_session.LauncherTypes,
launcher_id: typing.Union[int, str],
) -> str:
"""Generate a unique session ID"""
Comment on lines +80 to +81
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_get_session_id() currently keys buffers only by bot_uuid, launcher_type, and launcher_id. For group messages this means messages from different senders in the same group can be merged together, and the merged query will also keep sender_id from the first buffered message (misattributing content). Include sender_id in the buffer/session key (at least for group chats) so aggregation matches the “same user” behavior described in the metadata and avoids cross-user merges.

Suggested change
) -> str:
"""Generate a unique session ID"""
sender_id: typing.Union[int, str, None] = None,
) -> str:
"""Generate a unique session ID
For group or multi-user contexts, callers SHOULD supply ``sender_id``
so that aggregation is scoped per sender and avoids cross-user merges.
If ``sender_id`` is omitted, the session ID falls back to the
historical behavior and does not distinguish between senders.
"""
# Include sender_id in the session key when provided to avoid
# aggregating messages from different users in the same launcher
if sender_id is not None:
return f'{bot_uuid}:{launcher_type.value}:{launcher_id}:{sender_id}'

Copilot uses AI. Check for mistakes.
return f'{bot_uuid}:{launcher_type.value}:{launcher_id}'

async def _get_aggregation_config(self, pipeline_uuid: typing.Optional[str]) -> tuple[bool, float]:
"""Get aggregation configuration for a pipeline

Returns:
tuple: (enabled, delay_seconds)
"""
default_enabled = True
default_delay = 1.5

if pipeline_uuid is None:
return default_enabled, default_delay

# Get pipeline from pipeline manager
pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid)
if pipeline is None:
return default_enabled, default_delay

config = pipeline.pipeline_entity.config or {}
trigger_config = config.get('trigger', {})
aggregation_config = trigger_config.get('message-aggregation', {})

enabled = aggregation_config.get('enabled', default_enabled)
delay = aggregation_config.get('delay', default_delay)

# Clamp delay to valid range
delay = max(1.0, min(10.0, float(delay)))
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delay = ... float(delay) will raise ValueError if the pipeline config contains a non-numeric value (e.g. user-edited JSON/YAML, corrupted DB value), which would break message processing. Consider wrapping the cast in a try/except and falling back to default_delay before clamping.

Suggested change
delay = aggregation_config.get('delay', default_delay)
# Clamp delay to valid range
delay = max(1.0, min(10.0, float(delay)))
delay_raw = aggregation_config.get('delay', default_delay)
try:
delay = float(delay_raw)
except (TypeError, ValueError):
delay = default_delay
# Clamp delay to valid range
delay = max(1.0, min(10.0, delay))

Copilot uses AI. Check for mistakes.

return enabled, delay

async def add_message(
    self,
    bot_uuid: str,
    launcher_type: provider_session.LauncherTypes,
    launcher_id: typing.Union[int, str],
    sender_id: typing.Union[int, str],
    message_event: platform_events.MessageEvent,
    message_chain: platform_message.MessageChain,
    adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
    pipeline_uuid: typing.Optional[str] = None,
) -> None:
    """Add a message to the aggregation buffer.

    If aggregation is disabled for the pipeline, the message is sent
    directly to the query pool. Otherwise it is buffered and will be
    merged with other messages from the same sender in the same session
    once the debounce delay elapses without new messages arriving.

    Args:
        bot_uuid: UUID of the bot that received the message.
        launcher_type: Person or group launcher type.
        launcher_id: ID of the chat the message arrived in.
        sender_id: ID of the user who sent the message.
        message_event: Original platform event for this message.
        message_chain: Parsed content of the incoming message.
        adapter: Platform adapter the message arrived through.
        pipeline_uuid: Pipeline whose config controls aggregation;
            None means built-in defaults are used.
    """
    enabled, delay = await self._get_aggregation_config(pipeline_uuid)

    if not enabled:
        # Aggregation disabled: bypass buffering and dispatch directly
        await self.ap.query_pool.add_query(
            bot_uuid=bot_uuid,
            launcher_type=launcher_type,
            launcher_id=launcher_id,
            sender_id=sender_id,
            message_event=message_event,
            message_chain=message_chain,
            adapter=adapter,
            pipeline_uuid=pipeline_uuid,
        )
        return

    # Scope the buffer key per sender as well: in group chats, consecutive
    # messages from DIFFERENT users must never be merged into one query
    # (that would misattribute content to the first buffered sender).
    session_id = f'{self._get_session_id(bot_uuid, launcher_type, launcher_id)}:{sender_id}'

    pending_msg = PendingMessage(
        bot_uuid=bot_uuid,
        launcher_type=launcher_type,
        launcher_id=launcher_id,
        sender_id=sender_id,
        message_event=message_event,
        message_chain=message_chain,
        adapter=adapter,
        pipeline_uuid=pipeline_uuid,
    )

    async with self.lock:
        if session_id in self.buffers:
            buffer = self.buffers[session_id]
            # Debounce: cancel the running timer so the wait window
            # restarts from this newest message.
            if buffer.timer_task and not buffer.timer_task.done():
                buffer.timer_task.cancel()
                try:
                    await buffer.timer_task
                except asyncio.CancelledError:
                    pass
            buffer.messages.append(pending_msg)
        else:
            buffer = SessionBuffer(
                session_id=session_id,
                messages=[pending_msg],
            )
            self.buffers[session_id] = buffer

        buffer.last_message_time = time.time()
        # Start a fresh debounce timer for this session
        buffer.timer_task = asyncio.create_task(self._delayed_flush(session_id, delay))

async def _delayed_flush(self, session_id: str, delay: float) -> None:
    """Wait for *delay* seconds, then flush the session's buffer.

    Runs as the per-session debounce timer task. If a new message for
    the session arrives before the delay elapses, add_message cancels
    this task and schedules a replacement, restarting the window.
    """
    try:
        await asyncio.sleep(delay)
        await self._flush_buffer(session_id)
    except asyncio.CancelledError:
        # Timer was cancelled, new message arrived; the replacement
        # timer task is responsible for the eventual flush.
        pass

async def _flush_buffer(self, session_id: str) -> None:
    """Drain the buffer for *session_id* and hand the result to the query pool.

    The buffer is removed under the lock, then dispatched outside the
    lock so the (potentially slow) query-pool call never blocks other
    sessions' buffer operations.
    """
    async with self.lock:
        buffered = self.buffers.pop(session_id, None)

    if buffered is None or not buffered.messages:
        # Nothing to do: buffer already flushed or never populated
        return

    # A single buffered message is forwarded as-is; multiple messages
    # are combined into one synthetic message first.
    if len(buffered.messages) == 1:
        outgoing = buffered.messages[0]
    else:
        outgoing = self._merge_messages(buffered.messages)

    await self.ap.query_pool.add_query(
        bot_uuid=outgoing.bot_uuid,
        launcher_type=outgoing.launcher_type,
        launcher_id=outgoing.launcher_id,
        sender_id=outgoing.sender_id,
        message_event=outgoing.message_event,
        message_chain=outgoing.message_chain,
        adapter=outgoing.adapter,
        pipeline_uuid=outgoing.pipeline_uuid,
    )

def _merge_messages(self, messages: list[PendingMessage]) -> PendingMessage:
    """Merge multiple pending messages into a single one.

    The first message supplies all metadata (sender, launcher, adapter,
    and, crucially, the original ``message_event``). Message chains from
    all messages are concatenated with newline separators into one
    merged chain.

    Args:
        messages: Buffered messages in arrival order; must be non-empty.

    Returns:
        PendingMessage: a new pending message carrying the merged chain.
    """
    if len(messages) == 1:
        return messages[0]

    base = messages[0]

    # Build merged message chain: components of each message in order,
    # with a plain-text newline between consecutive messages.
    merged_chain = platform_message.MessageChain([])
    for index, pending in enumerate(messages):
        if index > 0:
            merged_chain.append(platform_message.Plain('\n'))
        for component in pending.message_chain:
            merged_chain.append(component)

    # NOTE: deliberately do NOT assign merged_chain onto
    # base.message_event — adapters rely on the event's original chain
    # (e.g. its message_id) when replying/quoting, and QueryPool takes
    # message_chain separately, so the event stays unmodified.
    return PendingMessage(
        bot_uuid=base.bot_uuid,
        launcher_type=base.launcher_type,
        launcher_id=base.launcher_id,
        sender_id=base.sender_id,
        message_event=base.message_event,
        message_chain=merged_chain,
        adapter=base.adapter,
        pipeline_uuid=base.pipeline_uuid,
    )

async def flush_all(self) -> None:
    """Immediately flush every pending buffer.

    Intended for shutdown: cancels each session's debounce timer and
    drains its buffer so no already-received messages are lost.
    """
    # Snapshot the session IDs under the lock; buffers may change while
    # we iterate, so each session is re-checked below.
    async with self.lock:
        pending_ids = list(self.buffers)

    for sid in pending_ids:
        async with self.lock:
            buf = self.buffers.get(sid)
            timer = buf.timer_task if buf else None
            if timer is not None and not timer.done():
                timer.cancel()
                try:
                    await timer
                except asyncio.CancelledError:
                    pass
        await self._flush_buffer(sid)
4 changes: 2 additions & 2 deletions src/langbot/pkg/platform/botmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async def on_friend_message(
if custom_launcher_id:
launcher_id = custom_launcher_id

await self.ap.query_pool.add_query(
await self.ap.msg_aggregator.add_message(
bot_uuid=self.bot_entity.uuid,
launcher_type=provider_session.LauncherTypes.PERSON,
launcher_id=launcher_id,
Expand Down Expand Up @@ -125,7 +125,7 @@ async def on_group_message(
if custom_launcher_id:
launcher_id = custom_launcher_id

await self.ap.query_pool.add_query(
await self.ap.msg_aggregator.add_message(
bot_uuid=self.bot_entity.uuid,
launcher_type=provider_session.LauncherTypes.GROUP,
launcher_id=launcher_id,
Expand Down
4 changes: 4 additions & 0 deletions src/langbot/templates/default-pipeline-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
"prefix": [],
"regexp": []
},
"message-aggregation": {
"enabled": true,
"delay": 1.5
},
"misc": {
"combine-quote-message": true
}
Expand Down
28 changes: 28 additions & 0 deletions src/langbot/templates/metadata/pipeline/trigger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,34 @@ stages:
type: array[string]
required: true
default: []
- name: message-aggregation
label:
en_US: Message Aggregation
zh_Hans: 消息聚合
description:
en_US: When a user sends multiple messages consecutively, wait for a period and merge them into one before processing
zh_Hans: 当用户连续发送多条消息时,等待一段时间后合并为一条消息再处理(防抖)
config:
- name: enabled
label:
en_US: Enable Message Aggregation
zh_Hans: 启用消息聚合
description:
en_US: If enabled, consecutive messages from the same user will be merged after a delay
zh_Hans: 如果启用,同一用户连续发送的消息将在延迟后合并处理
type: boolean
required: true
default: true
- name: delay
label:
en_US: Aggregation Delay (seconds)
zh_Hans: 聚合延迟(秒)
description:
en_US: 'Wait time before merging messages. Range: 1.0-10.0 seconds.'
zh_Hans: '合并消息前的等待时间。范围:1.0-10.0 秒。'
type: float
required: true
default: 1.5
- name: misc
label:
en_US: Misc
Expand Down
Loading