|
| 1 | +import asyncio |
| 2 | +import random |
| 3 | +import re |
| 4 | + |
| 5 | +from async_rediscache import RedisCache |
| 6 | +from discord.ext import commands |
| 7 | +from pydis_core.utils.regex import FORMATTED_CODE_REGEX |
| 8 | + |
| 9 | +from bot.bot import SirRobin |
| 10 | +from bot.exts.smart_eval._smart_eval_rules import DEFAULT_RESPONSES, RULES |
| 11 | + |
# Donation-tier table for the Smart Eval command.
# Key: minimum number of GPU donations needed to unlock the tier.
# Value: (response time in seconds, intelligence level) at that tier.
# NOTE: thresholds are listed in ascending order and get_gpu_capabilities
# relies on dict insertion order when walking them — keep them sorted.
DONATION_LEVELS = {
    # Number of donations: (response time, intelligence level)
    0: (15, 0),
    10: (10, 1),
    20: (8, 2),
    30: (6, 3),
    40: (5, 4),
    50: (4, 5),
}
| 21 | + |
class SmartEval(commands.Cog):
    """Cog that handles all Smart Eval functionality."""

    # RedisCache[user_id: int, hardware: str]
    # Maps a donor's Discord user ID to the (quackified) GPU name they donated.
    smarte_donation_cache = RedisCache()

    def __init__(self, bot: SirRobin):
        self.bot = bot

    async def cog_load(self) -> None:
        """Run startup tasks needed when cog is first loaded."""

    async def get_gpu_capabilities(self) -> tuple[int, int]:
        """
        Get the GPU capabilities based on the number of donated GPUs.

        Returns the (response_time, intelligence_level) pair for the highest
        donation threshold reached so far.
        """
        total_donations = await self.total_donations()
        # Start at the base tier, then walk the ascending thresholds until one
        # is out of reach, keeping the last tier that was unlocked.
        response_time, intelligence_level = DONATION_LEVELS[0]
        for donation_level, (tier_time, tier_level) in DONATION_LEVELS.items():
            if total_donations < donation_level:
                break
            response_time, intelligence_level = tier_time, tier_level

        return response_time, intelligence_level

    async def improve_gpu_name(self, hardware_name: str) -> str:
        """Quackify and pythonify the given GPU name."""
        hardware_name = hardware_name.replace("NVIDIA", "NQUACKIA")
        hardware_name = hardware_name.replace("Radeon", "Quackeon")
        hardware_name = hardware_name.replace("GeForce", "PyForce")
        # "RTX" must be handled before "RX" so the shorter rule cannot
        # mangle part of an RTX name.
        hardware_name = hardware_name.replace("RTX", "PyTX")
        hardware_name = hardware_name.replace("RX", "PyX")
        hardware_name = hardware_name.replace("Iris", "Pyris")

        # Some adjustments to prevent low hanging markdown escape
        hardware_name = hardware_name.replace("*", "")
        hardware_name = hardware_name.replace("_", " ")

        return hardware_name

    @commands.command()
    async def donations(self, ctx: commands.Context) -> None:
        """Display the number of donations received so far."""
        total_donations = await self.total_donations()
        # Only the intelligence level is reported; response time is unused here.
        _, intelligence_level = await self.get_gpu_capabilities()
        msg = (
            f"Currently, I have received {total_donations} GPU donations, "
            f"and am at intelligence level {intelligence_level}! "
        )

        # Calculate donations needed to reach the next intelligence level.
        # Stays 0 when the highest tier has already been reached.
        donations_needed = 0
        for donation_level in DONATION_LEVELS:
            if donation_level > total_donations:
                donations_needed = donation_level - total_donations
                break

        if donations_needed:
            msg += (
                f"\n\nTo reach the next intelligence level, I need {donations_needed} more donations! "
                f"Please consider donating your GPU to help me out. "
            )

        await ctx.reply(msg)

    async def total_donations(self) -> int:
        """Get the total number of donations (one cache entry per donor)."""
        return await self.smarte_donation_cache.length()

    @commands.command(aliases=[])
    @commands.max_concurrency(1, commands.BucketType.user)
    async def donate(self, ctx: commands.Context, *, hardware: str | None = None) -> None:
        """
        Donate your GPU to help power our Smart Eval command.

        Provide the name of your GPU when running the command.
        """
        # Each user may only donate once; remind repeat donors what we took.
        if await self.smarte_donation_cache.contains(ctx.author.id):
            stored_hardware = await self.smarte_donation_cache.get(ctx.author.id)
            await ctx.reply(
                "I can only take one donation per person. "
                f"Thank you for donating your *{stored_hardware}* to our Smart Eval command."
            )
            return

        if hardware is None:
            # Message fix: previously read "use the GPU it to improve".
            await ctx.reply(
                "Thank you for your interest in donating your hardware to support my Smart Eval command."
                " If you provide the name of your GPU, through the magic of the internet, "
                "I will be able to use the GPU to improve my Smart Eval outputs."
                " \n\nTo donate, re-run the donate command specifying your hardware: "
                "`&donate Your Hardware Name Goes Here`."
            )
            return

        msg = "Thank you for donating your GPU to our Smart Eval command."
        fake_hardware = await self.improve_gpu_name(hardware)
        await self.smarte_donation_cache.set(ctx.author.id, fake_hardware)

        if fake_hardware != hardware:
            msg += (
                f" I did decide that instead of *{hardware}*, it would be better if you donated *{fake_hardware}*."
                " So I've recorded that GPU donation instead."
            )
        msg += "\n\nIt will be used wisely and definitely not for shenanigans!"
        await ctx.reply(msg)

    @commands.command(aliases=["smarte"])
    @commands.max_concurrency(1, commands.BucketType.user)
    async def smart_eval(self, ctx: commands.Context, *, code: str) -> None:
        """Evaluate your Python code with PyDis's newest chatbot."""
        # Only the response time is needed; the intelligence level is implied
        # by which RULES entries exist, not used directly here.
        response_time, _ = await self.get_gpu_capabilities()

        # Reject anything not wrapped in inline/block code markdown.
        if match := FORMATTED_CODE_REGEX.match(code):
            code = match.group("code")
        else:
            await ctx.reply(
                "Uh oh! You didn't post anything I can recognize as code. Please put it in a codeblock."
            )
            return

        # Collect every canned response whose trigger pattern matches the
        # code, substituting captured groups into the response templates.
        matching_responses = []
        for pattern, responses in RULES.items():
            match = re.search(pattern, code)
            if not match:
                continue
            groups = match.groups()
            for response in responses:
                matching_responses.append(response.format(*groups) if groups else response)

        if not matching_responses:
            matching_responses = DEFAULT_RESPONSES
        final_response = random.choice(matching_responses)

        # Simulate "thinking" time: fewer donated GPUs means a slower reply.
        async with ctx.typing():
            await asyncio.sleep(response_time)

        if len(final_response) <= 1000:
            await ctx.reply(final_response)
        else:
            await ctx.reply(
                "There's definitely something wrong but I'm just not sure how to put it concisely into words."
            )
0 commit comments