`
+ Kang the sticker (add to your pack).
+"""
+
+import contextlib
+import os
+import random, string
+from secrets import token_hex
+
+from telethon import errors
+from telethon.errors.rpcerrorlist import StickersetInvalidError
+from telethon.tl.functions.messages import GetStickerSetRequest as GetSticker
+from telethon.tl.functions.messages import UploadMediaRequest
+from telethon.tl.functions.stickers import AddStickerToSetRequest as AddSticker
+from telethon.tl.functions.stickers import CreateStickerSetRequest
+from telethon.tl.types import (
+ DocumentAttributeSticker,
+ InputPeerSelf,
+ InputStickerSetEmpty,
+)
+from telethon.errors.rpcerrorlist import ChatSendInlineForbiddenError
+from telethon.tl.types import InputStickerSetItem as SetItem
+from telethon.tl.types import InputStickerSetShortName, User
+from telethon.utils import get_display_name, get_extension, get_input_document
+from telethon.errors import PeerIdInvalidError
+
+from . import LOGS, asst, fetch, udB, ultroid_cmd, get_string,resize_photo_sticker,quotly
+
+async def packExists(packId):
+ source = await fetch(f"https://t.me/addstickers/{packId}")
+ return (
+ not b"""
+ A Telegram user has created the Sticker Set.
+
"""
+ in source
+ )
+
+async def GetUniquePackName():
+ packName = f"{random.choice(string.ascii_lowercase)}{token_hex(random.randint(4, 8))}_by_{asst.me.username}"
+ return await GetUniquePackName() if await packExists(packName) else packName
+
+
+# TODO: simplify if possible
+
+def getName(sender, packType: str):
+ title = f"{get_display_name(sender)}'s Kang Pack"
+ if packType != "static":
+ title += f" ({packType.capitalize()})"
+ return title
+
+async def AddToNewPack(file, emoji, sender_id, title: str):
+ sn = await GetUniquePackName()
+ return await asst(
+ CreateStickerSetRequest(
+ user_id=sender_id,
+ title=title,
+ short_name=sn,
+ stickers=[SetItem(file, emoji=emoji)],
+ software="@TeamUltroid",
+ )
+ )
+
+async def inline_query_fallback(ult):
+ try:
+ result = await ult.client.inline_query(asst.me.username, "startbot")
+ if result:
+ await result[0].click(ult.chat_id, hide_via=True)
+ except (ChatSendInlineForbiddenError):
+ await ult.eor(
+ f"Inline mode is disabled in this chat.\n\n"
+ f"To create or manage your sticker pack, you need to start the assistant bot first.\n\n"
+ f"Click the button below to start it:\n"
+ f"[Start Bot](https://t.me/{asst.me.username})",
+ parse_mode="md"
+ )
+ return
+
+@ultroid_cmd(pattern="kang", manager=True)
+async def kang_func(ult):
+ """kang (reply message)
+ Create sticker and add to pack"""
+ sender = await ult.get_sender()
+ if not isinstance(sender, User):
+ return
+ sender_id = sender.id
+ if not ult.is_reply:
+ return await ult.eor("`Reply to a message..`", time=5)
+ try:
+ emoji = ult.text.split(maxsplit=1)[1]
+ except IndexError:
+ emoji = None
+ reply = await ult.get_reply_message()
+ ult = await ult.eor(get_string("com_1"))
+ type_, dl = "static", None
+ if reply.sticker:
+ file = get_input_document(reply.sticker)
+ if not emoji:
+ emoji = reply.file.emoji
+ name = reply.file.name
+ ext = get_extension(reply.media)
+ attr = list(
+ filter(
+ lambda prop: isinstance(prop, DocumentAttributeSticker),
+ reply.document.attributes,
+ )
+ )
+ inPack = attr and not isinstance(attr[0].stickerset, InputStickerSetEmpty)
+ with contextlib.suppress(KeyError):
+ type_ = {".webm": "video", ".tgs": "animated"}[ext]
+ if type_ or not inPack:
+ dl = await reply.download_media()
+ elif reply.photo:
+ dl = await reply.download_media()
+ name = "sticker.webp"
+ image = resize_photo_sticker(dl)
+ image.save(name, "WEBP")
+ try:
+ os.remove(dl)
+ except:
+ pass
+ dl = name
+ elif reply.text:
+ try:
+ reply = await ult.get_reply_message()
+ replied_to = await reply.get_reply_message()
+ sender_user = await ult.client.get_entity(reply.sender_id)
+ quotly_file = await quotly.create_quotly(
+ reply, bg="black", reply=replied_to, sender=sender_user)
+ except Exception as er:
+ return await ult.edit(f"Quotly error: {er}")
+ message = await reply.reply("Quotly by Ultroid", file=quotly_file)
+ dl = quotly_file
+ else:
+ return await ult.eor("`Reply to sticker or text to add it in your pack...`")
+ if not emoji:
+ emoji = "🏵"
+ if dl:
+ upl = await asst.upload_file(dl)
+ file = get_input_document(await asst(UploadMediaRequest(InputPeerSelf(), upl)))
+ try:
+ os.remove(dl)
+ except:
+ pass
+ get_ = udB.get_key("STICKERS") or {}
+ title = getName(sender, type_)
+ if not get_.get(sender_id) or not get_.get(sender_id, {}).get(type_):
+ try:
+ pack = await AddToNewPack(file, emoji, sender.id, title)
+ except (ValueError, PeerIdInvalidError) as e:
+ await inline_query_fallback(ult)
+ return
+ except Exception as er:
+ return await ult.eor(str(er))
+ sn = pack.set.short_name
+ if not get_.get(sender_id):
+ get_.update({sender_id: {type_: [sn]}})
+ else:
+ get_[sender_id].update({type_: [sn]})
+ udB.set_key("STICKERS", get_)
+ return await ult.edit(
+ f"**Kanged Successfully!\nEmoji :** {emoji}\n**Link :** [Click Here](https://t.me/addstickers/{sn})",
+ link_preview=False
+ )
+ name = get_[sender_id][type_][-1]
+ try:
+ await asst(GetSticker(InputStickerSetShortName(name), hash=0))
+ except StickersetInvalidError:
+ get_[sender_id][type_].remove(name)
+ try:
+ await asst(
+ AddSticker(InputStickerSetShortName(name), SetItem(file, emoji=emoji))
+ )
+ except (errors.StickerpackStickersTooMuchError, errors.StickersTooMuchError):
+ try:
+ pack = await AddToNewPack(file, emoji, sender.id, title)
+ sn = pack.set.short_name
+ except (ValueError, PeerIdInvalidError) as e:
+ await inline_query_fallback(ult)
+ return
+ except Exception as er:
+ return await ult.eor(str(er))
+ get_[sender_id][type_].append(pack.set.short_name)
+ udB.set_key("STICKERS", get_)
+ return await ult.edit(
+ f"**Created New Kang Pack!\nEmoji :** {emoji}\n**Link :** [Click Here](https://t.me/addstickers/{sn})",
+ link_preview=False
+ )
+ except Exception as er:
+ LOGS.exception(er)
+ return await ult.edit(str(er))
+ await ult.edit(
+ f"Sticker Added to Pack Successfully\n**Link :** [Click Here](https://t.me/addstickers/{name})",
+ link_preview=False
+ )
+
+
+@ultroid_cmd(pattern="listpack", manager=True)
+async def do_magic(ult):
+ """Get list of sticker packs."""
+ ko = udB.get_key("STICKERS") or {}
+ if not ko.get(ult.sender_id):
+ return await ult.reply("No Sticker Pack Found!")
+ al_ = []
+ ul = ko[ult.sender_id]
+ for _ in ul.keys():
+ al_.extend(ul[_])
+ msg = "• **Stickers Owned by You!**\n\n"
+ for _ in al_:
+ try:
+ pack = await ult.client(GetSticker(InputStickerSetShortName(_), hash=0))
+ msg += f"• [{pack.set.title}](https://t.me/addstickers/{_})\n"
+ except StickersetInvalidError:
+ for type_ in ["animated", "video", "static"]:
+ if ul.get(type_) and _ in ul[type_]:
+ ul[type_].remove(_)
+ udB.set_key("STICKERS", ko)
+ await ult.reply(msg)
diff --git a/plugins/stickertools.py b/plugins/stickertools.py
index 95839b2da..0a7a9a9e0 100644
--- a/plugins/stickertools.py
+++ b/plugins/stickertools.py
@@ -13,9 +13,6 @@
• `{i}tiny `
To create Tiny stickers.
-• `{i}kang `
- Kang the sticker (add to your pack).
-
• `{i}packkang `
Kang the Complete sticker set (with custom name).
@@ -146,238 +143,6 @@ async def pack_kangish(_):
)
-@ultroid_cmd(
- pattern="kang",
-)
-async def hehe(args):
- ultroid_bot = args.client
- xx = await args.eor(get_string("com_1"))
- user = ultroid_bot.me
- username = user.username
- username = f"@{username}" if username else user.first_name
- message = await args.get_reply_message()
- photo = None
- is_anim, is_vid = False, False
- emoji = None
- if not message:
- return await xx.eor(get_string("sts_6"))
- if message.photo:
- photo = io.BytesIO()
- photo = await ultroid_bot.download_media(message.photo, photo)
- elif message.file and "image" in message.file.mime_type.split("/"):
- photo = io.BytesIO()
- await ultroid_bot.download_file(message.media.document, photo)
- if (
- DocumentAttributeFilename(file_name="sticker.webp")
- in message.media.document.attributes
- ):
- emoji = message.media.document.attributes[1].alt
-
- elif message.file and "video" in message.file.mime_type.split("/"):
- xy = await message.download_media()
- if (message.file.duration or 0) <= 10:
- is_vid = True
- photo = await con.create_webm(xy)
- else:
- y = cv2.VideoCapture(xy)
- heh, lol = y.read()
- cv2.imwrite("ult.webp", lol)
- photo = "ult.webp"
- elif message.file and "tgsticker" in message.file.mime_type:
- await ultroid_bot.download_file(
- message.media.document,
- "AnimatedSticker.tgs",
- )
- attributes = message.media.document.attributes
- for attribute in attributes:
- if isinstance(attribute, DocumentAttributeSticker):
- emoji = attribute.alt
- is_anim = True
- photo = 1
- elif message.message:
- photo = await quotly.create_quotly(message)
- else:
- return await xx.edit(get_string("com_4"))
- if not udB.get_key("language") or udB.get_key("language") == "en":
- ra = random.choice(KANGING_STR)
- else:
- ra = get_string("sts_11")
- await xx.edit(f"`{ra}`")
- if photo:
- splat = args.text.split()
- pack = 1
- if not emoji:
- emoji = "🏵"
- if len(splat) == 3:
- pack = splat[2] # User sent ultroid_both
- emoji = splat[1]
- elif len(splat) == 2:
- if splat[1].isnumeric():
- pack = int(splat[1])
- else:
- emoji = splat[1]
-
- packname = f"ult_{user.id}_{pack}"
- packnick = f"{username}'s Pack {pack}"
- cmd = "/newpack"
- file = io.BytesIO()
-
- if is_vid:
- packname += "_vid"
- packnick += " (Video)"
- cmd = "/newvideo"
- elif is_anim:
- packname += "_anim"
- packnick += " (Animated)"
- cmd = "/newanimated"
- else:
- image = con.resize_photo_sticker(photo)
- file.name = "sticker.png"
- image.save(file, "PNG")
-
- response = await async_searcher(f"http://t.me/addstickers/{packname}")
- htmlstr = response.split("\n")
-
- if (
- " A Telegram user has created the Sticker Set."
- not in htmlstr
- ):
- async with ultroid_bot.conversation("@Stickers") as conv:
- try:
- await conv.send_message("/addsticker")
- except YouBlockedUserError:
- LOGS.info("Unblocking @Stickers for kang...")
- await ultroid_bot(functions.contacts.UnblockRequest("stickers"))
- await conv.send_message("/addsticker")
- await conv.get_response()
- await conv.send_message(packname)
- x = await conv.get_response()
- if x.text.startswith("Alright! Now send me the video sticker."):
- await conv.send_file(photo, force_document=True)
- x = await conv.get_response()
- t = "50" if (is_anim or is_vid) else "120"
- while t in x.message:
- pack += 1
- packname = f"ult_{user.id}_{pack}"
- packnick = f"{username}'s Pack {pack}"
- if is_anim:
- packname += "_anim"
- packnick += " (Animated)"
- elif is_vid:
- packnick += " (Video)"
- packname += "_vid"
- await xx.edit(get_string("sts_13").format(pack))
- await conv.send_message("/addsticker")
- await conv.get_response()
- await conv.send_message(packname)
- x = await conv.get_response()
- if x.text.startswith("Alright! Now send me the video sticker."):
- await conv.send_file(photo, force_document=True)
- x = await conv.get_response()
- if x.text in ["Invalid pack selected.", "Invalid set selected."]:
- await conv.send_message(cmd)
- await conv.get_response()
- await conv.send_message(packnick)
- await conv.get_response()
- if is_anim:
- await conv.send_file("AnimatedSticker.tgs")
- remove("AnimatedSticker.tgs")
- else:
- if is_vid:
- file = photo
- else:
- file.seek(0)
- await conv.send_file(file, force_document=True)
- await conv.get_response()
- await conv.send_message(emoji)
- await conv.get_response()
- await conv.send_message("/publish")
- if is_anim:
- await conv.get_response()
- await conv.send_message(f"<{packnick}>")
- await conv.get_response()
- await conv.send_message("/skip")
- await conv.get_response()
- await conv.send_message(packname)
- await conv.get_response()
- await xx.edit(
- get_string("sts_7").format(packname),
- parse_mode="md",
- )
- return
- if is_anim:
- await conv.send_file("AnimatedSticker.tgs")
- remove("AnimatedSticker.tgs")
- elif "send me an emoji" not in x.message:
- if is_vid:
- file = photo
- else:
- file.seek(0)
- await conv.send_file(file, force_document=True)
- rsp = await conv.get_response()
- if "Sorry, the file type is invalid." in rsp.text:
- await xx.edit(
- get_string("sts_8"),
- )
- return
- await conv.send_message(emoji)
- await conv.get_response()
- await conv.send_message("/done")
- await conv.get_response()
- await ultroid_bot.send_read_acknowledge(conv.chat_id)
- else:
- await xx.edit("`Brewing a new Pack...`")
- async with ultroid_bot.conversation("Stickers") as conv:
- await conv.send_message(cmd)
- await conv.get_response()
- await conv.send_message(packnick)
- await conv.get_response()
- if is_anim:
- await conv.send_file("AnimatedSticker.tgs")
- remove("AnimatedSticker.tgs")
- else:
- if is_vid:
- file = photo
- else:
- file.seek(0)
- await conv.send_file(file, force_document=True)
- rsp = await conv.get_response()
- if "Sorry, the file type is invalid." in rsp.text:
- await xx.edit(
- get_string("sts_8"),
- )
- return
- await conv.send_message(emoji)
- await conv.get_response()
- await conv.send_message("/publish")
- if is_anim:
- await conv.get_response()
- await conv.send_message(f"<{packnick}>")
-
- await conv.get_response()
- await conv.send_message("/skip")
- await conv.get_response()
- await conv.send_message(packname)
- await conv.get_response()
- await ultroid_bot.send_read_acknowledge(conv.chat_id)
- await xx.edit(
- get_string("sts_12").format(emoji, packname),
- parse_mode="md",
- )
- try:
- os.remove(photo)
- except BaseException:
- pass
- try:
- os.remove("AnimatedSticker.tgs")
- except BaseException:
- pass
- try:
- os.remove("ult.webp")
- except BaseException:
- pass
-
-
@ultroid_cmd(
pattern="round$",
)
diff --git a/plugins/utilities.py b/plugins/utilities.py
index f8b62f643..7ba0dd0bd 100644
--- a/plugins/utilities.py
+++ b/plugins/utilities.py
@@ -94,6 +94,7 @@
MessageMediaPhoto,
MessageMediaDocument,
DocumentAttributeVideo,
+
)
from telethon.utils import get_peer_id
@@ -743,14 +744,21 @@ async def get_restricted_msg(event):
chat, msg = get_chat_and_msgid(match)
if not (chat and msg):
return await event.eor(
- "Invalid link!\nEg: `https://t.me/TeamUltroid/3` or `https://t.me/c/1313492028/3`"
+ "Invalid link!\nExamples:\n"
+ "`https://t.me/TeamUltroid/3`\n"
+ "`https://t.me/c/1313492028/3`\n"
+ "`tg://openmessage?user_id=1234567890&message_id=1`"
)
try:
- message = await event.client.get_messages(chat, ids=msg)
+ input_entity = await event.client.get_input_entity(chat)
+ message = await event.client.get_messages(input_entity, ids=msg)
except BaseException as er:
return await event.eor(f"**ERROR**\n`{er}`")
+ if not message:
+ return await event.eor("`Message not found or may not exist.`")
+
try:
await event.client.send_message(event.chat_id, message)
await xx.try_delete()
diff --git a/pyUltroid/fns/helper.py b/pyUltroid/fns/helper.py
index 105208bd1..4b65d9c55 100644
--- a/pyUltroid/fns/helper.py
+++ b/pyUltroid/fns/helper.py
@@ -14,9 +14,9 @@
from traceback import format_exc
from urllib.parse import unquote
from urllib.request import urlretrieve
-
+from PIL import Image
+import requests
from .. import run_as_module
-
if run_as_module:
from ..configs import Var
@@ -617,3 +617,49 @@ async def shutdown(ult):
)
else:
sys.exit()
+
+
+
+
+def resize_photo_sticker(photo):
+ """Resize the given photo to 512x512 (for creating telegram sticker)."""
+ image = Image.open(photo)
+ if (image.width and image.height) < 512:
+ size1 = image.width
+ size2 = image.height
+ if image.width > image.height:
+ scale = 512 / size1
+ size1new = 512
+ size2new = size2 * scale
+ else:
+ scale = 512 / size2
+ size1new = size1 * scale
+ size2new = 512
+ size1new = math.floor(size1new)
+ size2new = math.floor(size2new)
+ sizenew = (size1new, size2new)
+ image = image.resize(sizenew)
+ else:
+ maxsize = (512, 512)
+ image.thumbnail(maxsize)
+ return image
+
+
+
+def fetch_sync(url, re_json=False, evaluate=None, method="GET", *args, **kwargs):
+ methods = {"POST": requests.post, "HEAD": requests.head, "GET": requests.get}
+ method = "POST" if kwargs.pop("post", False) else "GET"
+ output = requests.request(method, url, *args, **kwargs)
+
+
+ if callable(evaluate):
+ return evaluate(output)
+ elif re_json:
+ # type: ignore
+ if "application/json" in output.headers.get("content-type", ""):
+ return output.json()
+ return output.text
+ return output.content
+
+
+async_searcher = fetch = run_async(fetch_sync)
diff --git a/pyUltroid/fns/tools.py b/pyUltroid/fns/tools.py
index 5172d5281..d86d4091d 100644
--- a/pyUltroid/fns/tools.py
+++ b/pyUltroid/fns/tools.py
@@ -1080,14 +1080,19 @@ def safe_load(file, *args, **kwargs):
return out
+
def get_chat_and_msgid(link):
- matches = re.findall("https:\\/\\/t\\.me\\/(c\\/|)(.*)\\/(.*)", link)
- if not matches:
- return None, None
- _, chat, msg_id = matches[0]
- if chat.isdigit():
- chat = int("-100" + chat)
- return chat, int(msg_id)
+ m = re.findall(r"t\.me\/(c\/)?([^\/]+)\/(\d+)", link)
+ if m:
+ is_channel, chat, msg_id = m[0]
+ if is_channel:
+ chat = int("-100" + chat)
+ return chat, int(msg_id)
+
+ m = re.findall(r"user_id=(\d+)&message_id=(\d+)", link)
+ if m:
+ return int(m[0][0]), int(m[0][1])
# --------- END --------- #
+
diff --git a/requirements.txt b/requirements.txt
index cfa340590..43bc09068 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -12,9 +12,11 @@ aiohttp
aiohttp-cors
catbox-uploader
cloudscraper
-pynacltelegraph
+pynacl
+telegraph
enhancer
requests
aiohttp
catbox-uploader
-cloudscraper
\ No newline at end of file
+cloudscraper
+redis