|
5 | 5 | from typing import Any, Optional, Union |
6 | 6 | from pyrogram.raw.types.messages import Messages |
7 | 7 | from WebStreamer.server.exceptions import FIleNotFound |
| 8 | +from datetime import datetime |
8 | 9 |
|
9 | 10 |
|
10 | 11 | async def parse_file_id(message: "Message") -> Optional[FileId]: |
@@ -56,6 +57,34 @@ def get_hash(media_msg: Union[str, Message], length: int) -> str: |
56 | 57 | long_hash = hashlib.sha256(unique_id.encode("UTF-8")).hexdigest() |
57 | 58 | return long_hash[:length] |
58 | 59 |
|
59 | | -def get_name(media_msg: Message) -> str: |
60 | | - media = get_media_from_message(media_msg) |
61 | | - return getattr(media, 'file_name', "") or "" |
| 60 | + |
| 61 | +def get_name(media_msg: Message | FileId) -> str: |
| 62 | + |
| 63 | + if isinstance(media_msg, Message): |
| 64 | + media = get_media_from_message(media_msg) |
| 65 | + file_name = getattr(media, "file_name", "") |
| 66 | + |
| 67 | + elif isinstance(media_msg, FileId): |
| 68 | + file_name = getattr(media_msg, "file_name", "") |
| 69 | + |
| 70 | + if not file_name: |
| 71 | + if isinstance(media_msg, Message) and media_msg.media: |
| 72 | + media_type = media_msg.media.value |
| 73 | + elif media_msg.file_type: |
| 74 | + media_type = media_msg.file_type.name.lower() |
| 75 | + else: |
| 76 | + media_type = "file" |
| 77 | + |
| 78 | + formats = { |
| 79 | + "photo": "jpg", "audio": "mp3", "voice": "ogg", |
| 80 | + "video": "mp4", "animation": "mp4", "video_note": "mp4", |
| 81 | + "sticker": "webp" |
| 82 | + } |
| 83 | + |
| 84 | + ext = formats.get(media_type) |
| 85 | + ext = "." + ext if ext else "" |
| 86 | + |
| 87 | + date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") |
| 88 | + file_name = f"{media_type}-{date}{ext}" |
| 89 | + |
| 90 | + return file_name |
0 commit comments