Skip to content

Commit 6b0e3d4

Browse files
committed
feat(telegram): thread-level isolation
1 parent 772e4fc commit 6b0e3d4

File tree

2 files changed

+41
-8
lines changed

2 files changed

+41
-8
lines changed

src/langbot/pkg/platform/botmgr.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,17 @@ async def on_friend_message(
7575

7676
# Only add to query pool if no webhook requested to skip pipeline
7777
if not skip_pipeline:
78+
launcher_id = event.sender.id
79+
80+
if hasattr(adapter, 'get_launcher_id'):
81+
custom_launcher_id = adapter.get_launcher_id(event)
82+
if custom_launcher_id:
83+
launcher_id = custom_launcher_id
84+
7885
await self.ap.query_pool.add_query(
7986
bot_uuid=self.bot_entity.uuid,
8087
launcher_type=provider_session.LauncherTypes.PERSON,
81-
launcher_id=event.sender.id,
88+
launcher_id=launcher_id,
8289
sender_id=event.sender.id,
8390
message_event=event,
8491
message_chain=event.message_chain,
@@ -111,10 +118,17 @@ async def on_group_message(
111118

112119
# Only add to query pool if no webhook requested to skip pipeline
113120
if not skip_pipeline:
121+
launcher_id = event.group.id
122+
123+
if hasattr(adapter, 'get_launcher_id'):
124+
custom_launcher_id = adapter.get_launcher_id(event)
125+
if custom_launcher_id:
126+
launcher_id = custom_launcher_id
127+
114128
await self.ap.query_pool.add_query(
115129
bot_uuid=self.bot_entity.uuid,
116130
launcher_type=provider_session.LauncherTypes.GROUP,
117-
launcher_id=event.group.id,
131+
launcher_id=launcher_id,
118132
sender_id=event.sender.id,
119133
message_event=event,
120134
message_chain=event.message_chain,

src/langbot/pkg/platform/sources/telegram.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,12 @@ async def reply_message(
197197
}
198198
if self.config['markdown_card'] is True:
199199
args['parse_mode'] = 'MarkdownV2'
200+
201+
if message_source.source_platform_object.message.message_thread_id:
202+
args['message_thread_id'] = message_source.source_platform_object.message.message_thread_id
203+
200204
if quote_origin:
201205
args['reply_to_message_id'] = message_source.source_platform_object.message.id
202-
203-
if message_source.source_platform_object.message.message_thread_id:
204-
args['message_thread_id'] = message_source.source_platform_object.message.message_thread_id
205206

206207
await self.bot.send_message(**args)
207208

@@ -234,12 +235,12 @@ async def reply_message_chunk(
234235
'chat_id': message_source.source_platform_object.effective_chat.id,
235236
'text': content,
236237
}
238+
if message_source.source_platform_object.message.message_thread_id:
239+
args['message_thread_id'] = message_source.source_platform_object.message.message_thread_id
240+
237241
if quote_origin:
238242
args['reply_to_message_id'] = message_source.source_platform_object.message.id
239243

240-
if message_source.source_platform_object.message.message_thread_id:
241-
args['message_thread_id'] = message_source.source_platform_object.message.message_thread_id
242-
243244
if self.config['markdown_card'] is True:
244245
args['parse_mode'] = 'MarkdownV2'
245246

@@ -267,6 +268,24 @@ async def reply_message_chunk(
267268
# self.seq = 1 # 消息回复结束之后重置seq
268269
self.msg_stream_id.pop(message_id) # 消息回复结束之后删除流式消息id
269270

271+
def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None:
272+
if not isinstance(event.source_platform_object, Update):
273+
return None
274+
275+
message = event.source_platform_object.message
276+
if not message:
277+
return None
278+
279+
# specifically handle telegram forum topic and private thread(not supported by official client yet but supported by bot api)
280+
if message.message_thread_id:
281+
# check if it is a group
282+
if isinstance(event, platform_events.GroupMessage):
283+
return f'{event.group.id}#{message.message_thread_id}'
284+
elif isinstance(event, platform_events.FriendMessage):
285+
return f'{event.sender.id}#{message.message_thread_id}'
286+
287+
return None
288+
270289
async def is_stream_output_supported(self) -> bool:
271290
is_stream = False
272291
if self.config.get('enable-stream-reply', None):

0 commit comments

Comments
 (0)