Skip to content

Commit f09ba6a

Browse files
authored
fix: Add the file upload function and optimize the media message proc… (langbot-app#2002)
* fix: Add the file upload function and optimize the media message processing * fix: Optimize the message processing logic, improve the concatenation of text elements and the sending of media messages * fix: Simplify the file request construction and message processing logic to enhance code readability
1 parent 1eda076 commit f09ba6a

File tree

1 file changed

+196
-37
lines changed
  • src/langbot/pkg/platform/sources

1 file changed

+196
-37
lines changed

src/langbot/pkg/platform/sources/lark.py

Lines changed: 196 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
import lark_oapi
4-
from lark_oapi.api.im.v1 import CreateImageRequest, CreateImageRequestBody
4+
from lark_oapi.api.im.v1 import CreateImageRequest, CreateImageRequestBody, CreateFileRequest, CreateFileRequestBody
55
import traceback
66
import typing
77
import asyncio
@@ -141,6 +141,88 @@ async def upload_image_to_lark(msg: platform_message.Image, api_client: lark_oap
141141
traceback.print_exc()
142142
return None
143143

144+
@staticmethod
async def upload_file_to_lark(
    file_bytes: bytes,
    api_client: lark_oapi.Client,
    file_type: str,
    file_name: str = 'file',
    duration: typing.Optional[int] = None,
) -> typing.Optional[str]:
    """Upload a file to Lark and return the file_key, or None if upload fails.

    The bytes are spooled to a temporary file on disk, handed to the Lark
    SDK as an open binary stream, and the temporary file is removed again
    before returning, whether or not the upload succeeded.

    Args:
        file_bytes: Raw file bytes.
        api_client: Lark API client.
        file_type: Lark file type, e.g. 'opus', 'mp4', 'pdf', 'doc', etc.
        file_name: Display name for the file.
        duration: Duration in milliseconds (for audio files).

    Returns:
        The ``file_key`` reported by Lark, or ``None`` when the API call is
        rejected or any exception is raised along the way. Errors are
        printed, never propagated.
    """
    temp_file_path: typing.Optional[str] = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            # Record the path before writing so a failed write still gets
            # cleaned up by the ``finally`` clause below.
            temp_file_path = temp_file.name
            temp_file.write(file_bytes)

        # Keep the read handle in a ``with`` block: it must be closed before
        # the temp file is unlinked (an open handle blocks deletion on
        # Windows) and must not leak when the request fails.
        with open(temp_file_path, 'rb') as upload_stream:
            body_builder = (
                CreateFileRequestBody.builder()
                .file_type(file_type)
                .file_name(file_name)
                .file(upload_stream)
            )
            if duration is not None:
                body_builder = body_builder.duration(duration)

            request = CreateFileRequest.builder().request_body(body_builder.build()).build()

            response = await api_client.im.v1.file.acreate(request)

        if not response.success():
            print(
                f'client.im.v1.file.create failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}'
            )
            return None

        return response.data.file_key
    except Exception as e:
        print(f'Failed to upload file to Lark: {e}')
        traceback.print_exc()
        return None
    finally:
        if temp_file_path is not None:
            try:
                os.unlink(temp_file_path)
            except OSError as cleanup_error:
                # A cleanup failure must not turn a successful upload into
                # a reported failure; just make it visible.
                print(f'Failed to remove temporary file {temp_file_path}: {cleanup_error}')
193+
194+
@staticmethod
async def _get_media_bytes(
    msg: typing.Union[platform_message.Voice, platform_message.File],
) -> typing.Optional[bytes]:
    """Get bytes from a Voice or File message (base64, url, or path).

    Exactly one source is consulted, chosen in this order of precedence:
    ``msg.base64``, then ``msg.url``, then ``msg.path``. If the chosen
    source fails there is no fallback to the next one.

    Args:
        msg: Message component exposing ``base64``, ``url`` and ``path``
            attributes.

    Returns:
        The media bytes, or ``None`` when no source is set or the selected
        source could not be read. Failures are printed instead of being
        silently discarded; nothing is raised to the caller.
    """
    if msg.base64:
        try:
            base64_str = msg.base64
            # Drop everything up to the first comma, i.e. a data-URI style
            # prefix such as "data:...;base64," when one is present.
            if ',' in base64_str:
                base64_str = base64_str.split(',', 1)[1]
            return base64.b64decode(base64_str)
        except Exception as e:
            print(f'Failed to decode base64 media data: {e}')
            return None

    if msg.url:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(msg.url) as resp:
                    if resp.status == 200:
                        return await resp.read()
                    # Only a 200 response is treated as usable media.
                    print(f'Failed to download media: unexpected HTTP status {resp.status}')
        except Exception as e:
            print(f'Failed to download media: {e}')
        return None

    if msg.path:
        try:
            with open(msg.path, 'rb') as f:
                return f.read()
        except Exception as e:
            print(f'Failed to read media file {msg.path}: {e}')
        return None

    return None
225+
144226
@staticmethod
145227
async def yiri2target(
146228
message_chain: platform_message.MessageChain, api_client: lark_oapi.Client
@@ -150,10 +232,10 @@ async def yiri2target(
150232
Returns:
151233
Tuple of (text_elements, media_items):
152234
- text_elements: List of paragraphs for post message format
153-
- image_keys: List of image_key strings for separate image messages
235+
- media_items: List of dicts with 'msg_type' and 'content' for separate media messages
154236
"""
155237
message_elements = []
156-
image_keys = []
238+
media_items = []
157239
pending_paragraph = []
158240

159241
# Regex pattern to match Markdown image syntax: ![alt](url)
@@ -196,40 +278,77 @@ async def process_text_with_images(text: str) -> typing.Tuple[str, list]:
196278
# Check for and extract Markdown images from text
197279
cleaned_text, extracted_urls = await process_text_with_images(text)
198280

199-
# Add cleaned text if not empty
281+
# Split by blank lines to create separate paragraphs for Lark post format.
282+
# Lark truncates md elements at the first \n\n, so we must use the
283+
# post format's native paragraph structure instead.
200284
if cleaned_text:
201-
pending_paragraph.append({'tag': 'md', 'text': cleaned_text})
285+
segments = re.split(r'\n\s*\n', cleaned_text)
286+
for i, segment in enumerate(segments):
287+
segment = segment.strip()
288+
if not segment:
289+
continue
290+
if i > 0 and pending_paragraph:
291+
message_elements.append(pending_paragraph)
292+
pending_paragraph = []
293+
pending_paragraph.append({'tag': 'md', 'text': segment})
202294

203295
# Process extracted image URLs
204296
for url in extracted_urls:
205-
# Create a temporary Image message to upload
206297
temp_image = platform_message.Image(url=url)
207298
image_key = await LarkMessageConverter.upload_image_to_lark(temp_image, api_client)
208299
if image_key:
209-
image_keys.append(image_key)
300+
media_items.append({'msg_type': 'image', 'content': {'image_key': image_key}})
210301

211302
elif isinstance(msg, platform_message.At):
212303
pending_paragraph.append({'tag': 'at', 'user_id': msg.target, 'style': []})
213304
elif isinstance(msg, platform_message.AtAll):
214305
pending_paragraph.append({'tag': 'at', 'user_id': 'all', 'style': []})
215306
elif isinstance(msg, platform_message.Image):
216-
# Upload image and get image_key
217307
image_key = await LarkMessageConverter.upload_image_to_lark(msg, api_client)
218308
if image_key:
219-
# Store image_key for separate image message
220-
image_keys.append(image_key)
309+
media_items.append({'msg_type': 'image', 'content': {'image_key': image_key}})
310+
elif isinstance(msg, platform_message.Voice):
311+
data = await LarkMessageConverter._get_media_bytes(msg)
312+
if data:
313+
duration = int(msg.length * 1000) if msg.length else None
314+
file_key = await LarkMessageConverter.upload_file_to_lark(
315+
data, api_client, file_type='opus', file_name='voice.opus', duration=duration
316+
)
317+
if file_key:
318+
media_items.append({'msg_type': 'audio', 'content': {'file_key': file_key}})
319+
elif isinstance(msg, platform_message.File):
320+
data = await LarkMessageConverter._get_media_bytes(msg)
321+
if data:
322+
file_name = msg.name or 'file'
323+
# Guess file_type from extension
324+
ext = os.path.splitext(file_name)[1].lstrip('.').lower() if file_name else ''
325+
file_type_map = {
326+
'opus': 'opus',
327+
'mp4': 'mp4',
328+
'pdf': 'pdf',
329+
'doc': 'doc',
330+
'docx': 'doc',
331+
'xls': 'xls',
332+
'xlsx': 'xls',
333+
'ppt': 'ppt',
334+
'pptx': 'ppt',
335+
}
336+
file_type = file_type_map.get(ext, 'stream')
337+
file_key = await LarkMessageConverter.upload_file_to_lark(
338+
data, api_client, file_type=file_type, file_name=file_name
339+
)
340+
if file_key:
341+
media_items.append({'msg_type': 'file', 'content': {'file_key': file_key}})
221342
elif isinstance(msg, platform_message.Forward):
222343
for node in msg.node_list:
223-
sub_elements, sub_image_keys = await LarkMessageConverter.yiri2target(
224-
node.message_chain, api_client
225-
)
344+
sub_elements, sub_media = await LarkMessageConverter.yiri2target(node.message_chain, api_client)
226345
message_elements.extend(sub_elements)
227-
image_keys.extend(sub_image_keys)
346+
media_items.extend(sub_media)
228347

229348
if pending_paragraph:
230349
message_elements.append(pending_paragraph)
231350

232-
return message_elements, image_keys
351+
return message_elements, media_items
233352

234353
@staticmethod
235354
async def target2yiri(
@@ -917,23 +1036,40 @@ async def reply_message(
9171036
):
9181037
# 不再需要了,因为message_id已经被包含到message_chain中
9191038
# lark_event = await self.event_converter.yiri2target(message_source)
920-
text_elements, image_keys = await self.message_converter.yiri2target(message, self.api_client)
1039+
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
9211040

9221041
# Send text message if there are text elements
9231042
if text_elements:
924-
final_content = {
925-
'zh_Hans': {
926-
'title': '',
927-
'content': text_elements,
928-
},
929-
}
1043+
# Determine msg_type based on content: use 'post' if at mentions
1044+
# are present (requires post paragraph structure), otherwise 'text'
1045+
needs_post = any(ele['tag'] == 'at' for paragraph in text_elements for ele in paragraph)
1046+
1047+
if needs_post:
1048+
msg_type = 'post'
1049+
final_content = json.dumps(
1050+
{
1051+
'zh_Hans': {
1052+
'title': '',
1053+
'content': text_elements,
1054+
},
1055+
}
1056+
)
1057+
else:
1058+
msg_type = 'text'
1059+
parts = []
1060+
for paragraph in text_elements:
1061+
para_text = ''.join(ele.get('text', '') for ele in paragraph)
1062+
if para_text:
1063+
parts.append(para_text)
1064+
final_content = json.dumps({'text': '\n\n'.join(parts)})
1065+
9301066
request: ReplyMessageRequest = (
9311067
ReplyMessageRequest.builder()
9321068
.message_id(message_source.message_chain.message_id)
9331069
.request_body(
9341070
ReplyMessageRequestBody.builder()
935-
.content(json.dumps(final_content))
936-
.msg_type('post')
1071+
.content(final_content)
1072+
.msg_type(msg_type)
9371073
.reply_in_thread(False)
9381074
.uuid(str(uuid.uuid4()))
9391075
.build()
@@ -963,17 +1099,15 @@ async def reply_message(
9631099
f'client.im.v1.message.reply failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
9641100
)
9651101

966-
# Send image messages separately using msg_type='image'
967-
for image_key in image_keys:
968-
image_content = json.dumps({'image_key': image_key})
969-
1102+
# Send media messages separately (image, audio, file, etc.)
1103+
for media in media_items:
9701104
request: ReplyMessageRequest = (
9711105
ReplyMessageRequest.builder()
9721106
.message_id(message_source.message_chain.message_id)
9731107
.request_body(
9741108
ReplyMessageRequestBody.builder()
975-
.content(image_content)
976-
.msg_type('image')
1109+
.content(json.dumps(media['content']))
1110+
.msg_type(media['msg_type'])
9771111
.reply_in_thread(False)
9781112
.uuid(str(uuid.uuid4()))
9791113
.build()
@@ -1000,7 +1134,7 @@ async def reply_message(
10001134

10011135
if not response.success():
10021136
raise Exception(
1003-
f'client.im.v1.message.reply (image) failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
1137+
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
10041138
)
10051139

10061140
async def reply_message_chunk(
@@ -1018,15 +1152,16 @@ async def reply_message_chunk(
10181152
message_id = bot_message.resp_message_id
10191153
msg_seq = bot_message.msg_sequence
10201154
if msg_seq % 8 == 0 or is_final:
1021-
text_elements, image_keys = await self.message_converter.yiri2target(message, self.api_client)
1155+
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
10221156

10231157
text_message = ''
10241158
if text_elements:
1025-
for ele in text_elements[0]:
1026-
if ele['tag'] == 'text':
1027-
text_message += ele['text']
1028-
elif ele['tag'] == 'md':
1029-
text_message += ele['text']
1159+
parts = []
1160+
for paragraph in text_elements:
1161+
para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
1162+
if para_text:
1163+
parts.append(para_text)
1164+
text_message = '\n\n'.join(parts)
10301165

10311166
# content = {
10321167
# 'type': 'card_json',
@@ -1076,6 +1211,30 @@ async def reply_message_chunk(
10761211
)
10771212
return
10781213

1214+
# Send media messages when streaming is done
1215+
if is_final and media_items:
1216+
for media in media_items:
1217+
media_request: ReplyMessageRequest = (
1218+
ReplyMessageRequest.builder()
1219+
.message_id(message_source.message_chain.message_id)
1220+
.request_body(
1221+
ReplyMessageRequestBody.builder()
1222+
.content(json.dumps(media['content']))
1223+
.msg_type(media['msg_type'])
1224+
.reply_in_thread(False)
1225+
.uuid(str(uuid.uuid4()))
1226+
.build()
1227+
)
1228+
.build()
1229+
)
1230+
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
1231+
media_request, req_opt
1232+
)
1233+
if not media_response.success():
1234+
raise Exception(
1235+
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
1236+
)
1237+
10791238
async def is_muted(self, group_id: int) -> bool:
10801239
return False
10811240

0 commit comments

Comments
 (0)