|
26 | 26 | import base64
|
27 | 27 | import binascii
|
28 | 28 | import datetime
|
| 29 | +import logging |
29 | 30 | import re
|
30 | 31 | from textwrap import shorten
|
31 | 32 | from typing import TYPE_CHECKING, Any, Self, TypeAlias
|
32 | 33 |
|
33 | 34 | import discord
|
| 35 | +import mystbin |
34 | 36 | import yarl
|
35 | 37 | from discord.ext import commands
|
36 | 38 |
|
|
45 | 47 |
|
46 | 48 | ModLogType: TypeAlias = PythonistaAPIWebsocketPayload[ModLogPayload]
|
47 | 49 |
|
| 50 | +logger = logging.getLogger(__name__) |
| 51 | + |
| 52 | +BASE_BADBIN_RE = r"https://(?P<site>{domains})/(?P<slug>[a-zA-Z0-9]+)[.]?(?P<ext>[a-z]{{1,8}})?" |
48 | 53 | TOKEN_RE = re.compile(r"[a-zA-Z0-9_-]{23,28}\.[a-zA-Z0-9_-]{6,7}\.[a-zA-Z0-9_-]{27}")
|
49 | 54 | PROSE_LOOKUP = {
|
50 | 55 | 1: "banned",
|
@@ -105,6 +110,12 @@ def __init__(self, bot: core.Bot, /) -> None:
|
105 | 110 | self.dpy_mod_cache: dict[int, discord.User | discord.Member] = {}
|
106 | 111 | self._req_lock = asyncio.Lock()
|
107 | 112 |
|
| 113 | + domains = core.CONFIG["BADBIN"]["domains"] |
| 114 | + formatted = BASE_BADBIN_RE.format(domains="|".join(domains)) |
| 115 | + |
| 116 | + self.BADBIN_RE = re.compile(formatted) |
| 117 | + logger.info("Badbin initialized with following domains: %s", ", ".join(domains)) |
| 118 | + |
108 | 119 | async def github_request(
|
109 | 120 | self,
|
110 | 121 | method: str,
|
@@ -195,6 +206,48 @@ async def find_discord_tokens(self, message: discord.Message) -> None:
|
195 | 206 | )
|
196 | 207 | await message.reply(msg)
|
197 | 208 |
|
| 209 | + async def pull_badbin_content(self, site: str, slug: str, *, fail_hard: bool = True) -> str: |
| 210 | + async with self.bot.session.get(f"https://{site}/raw/{slug}") as f: |
| 211 | + if 200 > f.status > 299: |
| 212 | + if fail_hard: |
| 213 | + f.raise_for_status() |
| 214 | + else: |
| 215 | + err = f"Could not read {slug} from {site}. Details: {f.status}\n\n{await f.read()}" |
| 216 | + logger.error(err) |
| 217 | + return err # if we don't fail hard, we'll return the error message in the new paste. |
| 218 | + |
| 219 | + return (await f.read()).decode() |
| 220 | + |
| 221 | + async def post_mystbin_content(self, contents: list[tuple[str, str]]) -> tuple[str, str | None]: |
| 222 | + response = await self.bot.mb_client.create_paste( |
| 223 | + files=[mystbin.File(filename=a, content=b, attachment_url=None) for a, b in contents] |
| 224 | + ) |
| 225 | + return response.id, response.notice or None |
| 226 | + |
| 227 | + @commands.Cog.listener("on_message") |
| 228 | + async def find_badbins(self, message: discord.Message) -> None: |
| 229 | + matches = self.BADBIN_RE.findall(message.content) |
| 230 | + |
| 231 | + if matches: |
| 232 | + contents: list[tuple[str, str]] = [] |
| 233 | + |
| 234 | + for match in matches: |
| 235 | + site, slug, ext = match |
| 236 | + |
| 237 | + if site is None or slug is None: |
| 238 | + continue |
| 239 | + |
| 240 | + contents.append((await self.pull_badbin_content(site, slug), f"migrated.{ext or 'txt'}")) |
| 241 | + |
| 242 | + if contents: |
| 243 | + key, notice = await self.post_mystbin_content(contents) |
| 244 | + msg = f"I've detected a badbin and have uploaded your pastes here: https://mystb.in/{key}" |
| 245 | + |
| 246 | + if notice: |
| 247 | + msg += "\nnotice: " + notice |
| 248 | + |
| 249 | + await message.reply(msg, mention_author=False) |
| 250 | + |
198 | 251 | @commands.Cog.listener()
|
199 | 252 | async def on_papi_dpy_modlog(self, payload: ModLogType, /) -> None:
|
200 | 253 | moderation_payload = payload["payload"]
|
|
0 commit comments