-
-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathfixer.py
More file actions
1147 lines (988 loc) · 41.6 KB
/
fixer.py
File metadata and controls
1147 lines (988 loc) · 41.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import asyncio
import contextlib
from typing import TYPE_CHECKING, Any, Final, Literal, cast
import discord
import emoji
from discord import app_commands
from discord.app_commands import locale_str
from discord.ext import commands
from loguru import logger
from pydantic import BaseModel, field_validator
from embed_fixer.core.translator import Translator
from embed_fixer.fixes import DOMAINS, AppendURLFix, Domain, DomainId, FixMethod, Website
from embed_fixer.models import GuildFixMethod, GuildSettings, IgnoreMe
from embed_fixer.utils.download_media import MediaDownloader
from embed_fixer.utils.fetch_info import PostInfoFetcher
from embed_fixer.utils.misc import (
append_path_to_url,
capture_exception,
domain_in_url,
extract_urls,
get_filesize,
remove_query_params,
replace_domain,
sanitize_username,
)
if TYPE_CHECKING:
from collections.abc import Sequence
from embed_fixer.bot import EmbedFixer, Interaction
# Suffix appended to the display name used for webhook-relayed messages; the same
# suffix is checked/stripped elsewhere in this module to recognise those messages.
USERNAME_SUFFIX: Final[str] = " (Embed Fixer)"
# Passed as ``delete_after`` when sending transient error notices.
ERROR_MSG_DELETE_AFTER: Final[int] = 10
DEFAULT_FILESIZE_LIMIT: Final[int] = 10 * 1024 * 1024 # 10 MB
# The ways a fixed message can end up being delivered.
type SendType = Literal["webhook", "reply", "channel", "interaction"]
class MockMessage:
    """Lightweight stand-in for a Discord message, used when fixing URLs from a slash command.

    Only the attributes listed in ``__init__`` exist; everything that is not
    supplied by the caller is set to a fixed placeholder value.
    """

    def __init__(
        self,
        content: str,
        channel: discord.interactions.InteractionChannel,
        guild: discord.Guild | None,
        author: discord.User | discord.Member,
    ) -> None:
        # Caller-supplied context.
        self.author = author
        self.guild = guild
        self.channel = channel
        self.content = content

        # Fixed placeholders: a mock message has no real id, attachments,
        # reply reference or webhook, and is never text-to-speech.
        self.id = 0
        self.tts = False
        self.attachments: list[discord.Attachment] = []
        self.reference: discord.MessageReference | None = None
        self.webhook_id: int | None = None
class Media(BaseModel):
    """One media item: its source URL and, optionally, an uploadable file for it."""

    url: str
    # ``None`` when there is no file to upload for this item; senders in this
    # module then post ``url`` as a plain link instead.
    file: discord.File | None = None

    # ``discord.File`` is not a type pydantic validates natively.
    model_config = {"arbitrary_types_allowed": True}
class PostExtractionResult(BaseModel):
    """Result of extracting a post: its media, its text content and author markdown."""

    medias: list[Media]
    content: str
    author_md: str

    @field_validator("author_md", mode="after")
    @classmethod
    def __remove_emojis(cls, v: str) -> str:
        # Replace every emoji in the author markdown with an empty string.
        return emoji.replace_emoji(v, "")
class FindFixResult(BaseModel):
    """Aggregate outcome of scanning one message for fixable URLs."""

    # True once at least one URL was rewritten or had media extracted.
    fix_found: bool
    # Media gathered from the posts that were extracted.
    medias: list[Media]
    # Cleaned source URLs of the posts whose media was extracted.
    sauces: list[str]
    # Post text and author markdown taken from an extraction result.
    content: str
    author_md: str
    # Every URL found in the message content, fixed or not.
    urls: list[str]
async def add_reaction_safe(message: discord.Message, emoji: str) -> None:
    """Add ``emoji`` to ``message`` as a reaction, tolerating the expected failures.

    A message that cannot be found is ignored silently; a permission failure
    is logged as a warning. Any other error propagates.
    """
    try:
        await message.add_reaction(emoji)
    except discord.NotFound:
        # Nothing left to react to.
        return
    except discord.Forbidden:
        logger.warning(
            f"Failed to add reaction to message {message.id} in channel {message.channel.id}"
        )
async def remove_reaction_safe(
    message: discord.Message, emoji: str, member: discord.Member
) -> None:
    """Remove ``member``'s ``emoji`` reaction from ``message``, tolerating the expected failures.

    A message that cannot be found is ignored silently; a permission failure
    is logged as a warning. Any other error propagates.
    """
    try:
        await message.remove_reaction(emoji, member)
    except discord.NotFound:
        # Nothing left to remove the reaction from.
        return
    except discord.Forbidden:
        logger.warning(
            f"Failed to remove reaction from message {message.id} in channel {message.channel.id}"
        )
class FixerCog(commands.Cog):
def __init__(self, bot: EmbedFixer) -> None:
    """Keep a handle on the bot, create the post-info fetcher and build both context menus."""
    self.bot = bot
    self.fetch_info = PostInfoFetcher(bot.session)

    # The menus are only constructed here; they are attached to the command
    # tree in ``cog_load``.
    self.fix_embed_ctx = app_commands.ContextMenu(
        callback=self.fix_embed, name=locale_str("fix_embed")
    )
    self.extract_medias_ctx = app_commands.ContextMenu(
        callback=self.extract_medias, name=locale_str("extract_medias")
    )
async def cog_load(self) -> None:
    """Attach both context menus to the bot's command tree."""
    for menu in (self.fix_embed_ctx, self.extract_medias_ctx):
        self.bot.tree.add_command(menu)
async def cog_unload(self) -> None:
    """Detach both context menus from the bot's command tree."""
    for menu in (self.fix_embed_ctx, self.extract_medias_ctx):
        self.bot.tree.remove_command(menu.name, type=menu.type)
@staticmethod
def _skip_channel(settings: GuildSettings | None, channel_id: int) -> bool:
if settings is None:
return False
if settings.enable_fix_channels and channel_id not in settings.enable_fix_channels:
return True
return channel_id in settings.disable_fix_channels
async def _nsfw_skip(self, url: str, domain: Domain, *, is_nsfw_channel: bool) -> bool:
    """Decide whether ``url`` must be skipped because the channel is not NSFW.

    Nothing is skipped in an NSFW channel. Otherwise Kemono is always skipped,
    while Pixiv and Twitter are skipped only when the corresponding fetcher
    check reports the post as NSFW.
    """
    if is_nsfw_channel:
        return False

    domain_id = domain.id
    if domain_id == DomainId.PIXIV:
        return bool(await self.fetch_info.pixiv_is_nsfw(url))
    if domain_id == DomainId.KEMONO:
        return True
    if domain_id == DomainId.TWITTER:
        return bool(await self.fetch_info.twitter_is_nsfw(url))
    return False
@staticmethod
async def _get_original_author(
    message: discord.Message, guild: discord.Guild
) -> discord.Member | None:
    """Look up the guild member whose display name matches a relayed message's author.

    The message author's display name, minus ``USERNAME_SUFFIX``, is used as
    the member query; only an exact display-name match is returned.
    """
    wanted_name = message.author.display_name.removesuffix(USERNAME_SUFFIX)
    candidates = await guild.query_members(wanted_name, limit=100)

    # The query may return members that merely resemble the name, so compare exactly.
    for member in candidates or ():
        if member.display_name == wanted_name:
            return member
    return None
@staticmethod
async def _determine_fix_method(
settings: GuildSettings | None, domain: Domain
) -> FixMethod | None:
if not domain.fix_methods:
return None
guild_fix_method = (
None
if settings is None
else await GuildFixMethod.get_or_none(guild_id=settings.id, domain_id=domain.id)
)
if guild_fix_method is None:
fix_method = domain.default_fix_method
else:
fix_method = next(
(f for f in domain.fix_methods if f.id == guild_fix_method.fix_id), None
)
if fix_method is None:
fix_method = domain.default_fix_method
asyncio.create_task(guild_fix_method.delete())
return fix_method
@staticmethod
def _get_matching_domain_website(
    settings: GuildSettings | None, clean_url: str
) -> tuple[Domain | None, Website | None]:
    """Find the first enabled domain, and its website, that matches ``clean_url``.

    Domains listed in ``settings.disabled_domains`` are not considered.
    Returns ``(None, None)`` when nothing matches.
    """
    for candidate in DOMAINS:
        if settings is not None and candidate.id in settings.disabled_domains:
            continue
        for site in candidate.websites:
            if site.match(clean_url):
                return candidate, site
    return None, None
@staticmethod
def _apply_fxembed_translation(url: str, *, translang: str) -> str:
    """Return ``url`` with ``/{translang}`` appended to its path.

    FxEmbed (fxtwitter) translates a post when the target language is appended
    to the URL path; see
    https://github.com/FxEmbed/FxEmbed#translate-posts-xtwitter for details.
    """
    lang_segment = f"/{translang}"
    return append_path_to_url(url, lang_segment)
async def _find_fixes(  # noqa: C901, PLR0912, PLR0914, PLR0915
    self,
    message: discord.Message | MockMessage,
    *,
    settings: GuildSettings | None,
    filesize_limit: int,
    extract_media: bool = False,
    is_ctx_menu: bool = False,
) -> FindFixResult:
    """Scan ``message.content`` for supported URLs and fix them.

    For every URL found, the matching domain is looked up and then either the
    post's media is extracted (when ``extract_media`` is set or the channel is
    in ``settings.extract_media_channels``) or the URL is rewritten using the
    fix method chosen for that domain. ``message.content`` is modified in
    place: extracted URLs are removed and fixed URLs are replaced.

    Args:
        message: The message (or mock message) whose content is scanned.
        settings: Guild settings, or ``None``.
        filesize_limit: Upper bound handed to the extractor; extracted files
            larger than this are kept as URL-only media.
        extract_media: Only extract media; never fall back to URL rewriting.
        is_ctx_menu: Whether the call comes from a context menu. This also
            makes the channel count as NSFW for the skip check.

    Returns:
        A ``FindFixResult`` summarising what was found.
    """
    channel_id = message.channel.id
    fix_found = False
    medias: list[Media] = []
    sauces: list[str] = []
    content = ""
    author_md = ""

    is_nsfw_channel = (
        is_ctx_menu
        # Text or Voice channel is nsfw
        or (
            isinstance(message.channel, (discord.TextChannel, discord.VoiceChannel))
            and message.channel.nsfw
        )
        # Thread's parent is nsfw
        or (
            isinstance(message.channel, discord.Thread)
            and message.channel.parent is not None
            and message.channel.parent.nsfw
        )
    )

    urls = extract_urls(message.content)

    for url, spoilered in urls:
        # Matching is done on the URL without query parameters and without "www.".
        try:
            clean_url = remove_query_params(url).replace("www.", "")
        except ValueError:
            logger.warning(f"Invalid URL found: {url}")
            continue

        domain, website = self._get_matching_domain_website(settings, clean_url)
        if domain is None or website is None:
            continue

        logger.debug(f"Matched domain {domain.id!r} for URL: {clean_url}")

        if await self._nsfw_skip(url, domain, is_nsfw_channel=is_nsfw_channel):
            continue

        if extract_media or (
            settings is not None and channel_id in settings.extract_media_channels
        ):
            # Show an hourglass on real messages while the extraction runs.
            # NOTE(review): the task returned by create_task is not stored; asyncio
            # keeps only weak references to tasks — consider retaining it.
            if not is_ctx_menu and isinstance(message, discord.Message):
                asyncio.create_task(add_reaction_safe(message, "⌛"))

            # Spoiler the files if the link itself was spoilered, or if the channel is
            # NSFW and image spoilers are not disabled for it.
            spoiler = spoilered or (
                is_nsfw_channel
                and (settings is not None and channel_id not in settings.disable_image_spoilers)
            )
            result = await self._extract_post_info(
                domain.id, url, spoiler=spoiler, filesize_limit=filesize_limit
            )
            # Files over the limit are downgraded to URL-only media.
            medias.extend(
                Media(url=media.url)
                if (media.file and get_filesize(media.file.fp) > filesize_limit)
                else media
                for media in result.medias
            )
            content, author_md = result.content, result.author_md
            logger.debug(f"Extracted {len(result.medias)} media files from {url}")

            # NOTE(review): ``medias`` accumulates across URLs, so this branch is also
            # taken when an earlier URL produced media but this one did not — confirm
            # that is intended (``result.medias`` would be the per-URL check).
            if medias:
                fix_found = True
                # Drop the original link (including its spoiler bars) from the text.
                if spoilered:
                    message.content = message.content.replace(f"||{url}||", "")
                else:
                    message.content = message.content.replace(url, "")
                sauces.append(clean_url)
                continue

        # Media-only mode never falls back to rewriting the URL.
        if extract_media:
            continue

        fix_method = await self._determine_fix_method(settings, domain)
        if (
            fix_method is None
            or not fix_method.fixes
            or (website.skip_method_ids and fix_method.id in website.skip_method_ids)
        ):
            logger.debug(f"No valid fix method for domain {domain.id!r} and URL: {clean_url}")
            continue

        # Apply the first fix of the method that is applicable to this URL.
        for fix in fix_method.fixes:
            if isinstance(fix, AppendURLFix):
                new_url = f"https://{fix.domain}?url={clean_url}"
                if domain.id == DomainId.FACEBOOK:
                    # For facebook, replace /v/ with /r/, found by @zzxc.
                    new_url = new_url.replace("/v/", "/r/")
            else:
                if not domain_in_url(clean_url, fix.old_domain):
                    continue
                new_url = replace_domain(clean_url, fix.old_domain, fix.new_domain)
                logger.debug(f"Replaced domain {fix.old_domain} with {fix.new_domain}")

            if (
                fix_method.id == 1
                and settings is not None
                and settings.translate_target_lang
            ):  # FxEmbed
                new_url = self._apply_fxembed_translation(
                    new_url, translang=settings.translate_target_lang
                )

            if fix_method.has_ads and (
                settings is not None and not settings.show_original_link_btn
            ):
                # If the fix method has ads and the guild hasn't enabled
                # "Show Original Link Button", recommend enabling it.
                # NOTE(review): this prepends once per fixed URL, so a message with
                # several such URLs gets the line repeated — confirm.
                guild_lang = await Translator.get_guild_lang(message.guild)
                recommend_msg = self.bot.translator.get(
                    guild_lang, "recommend_original_link_btn"
                )
                message.content = f"-# {recommend_msg}\n{message.content}"

            fix_found = True
            message.content = message.content.replace(url, new_url)
            break

    return FindFixResult(
        fix_found=fix_found,
        medias=medias,
        sauces=sauces,
        content=content,
        author_md=author_md,
        urls=[url for url, _ in urls],
    )
async def _extract_post_info(
    self, domain_id: DomainId, url: str, *, spoiler: bool = False, filesize_limit: int
) -> PostExtractionResult:
    """Fetch a post's text, author and media for the supported domains.

    Pixiv, Twitter, Bluesky and Kemono are handled; any other domain, and any
    failure while fetching, yields an empty result. The media URLs are then
    handed to ``MediaDownloader`` and paired with whatever files it produced.
    The returned content is cut to 2000 characters.
    """
    logger.debug(f"Extracting post info from {url} for domain {domain_id!r}")

    info = None
    content = ""
    media_urls: list[str] = []

    try:
        if domain_id is DomainId.PIXIV:
            info = await self.fetch_info.pixiv(url)
            if info is not None:
                content, media_urls = info.description, info.image_urls
        elif domain_id is DomainId.TWITTER:
            info = await self.fetch_info.twitter(url)
            if info is not None:
                content, media_urls = info.text, [media.url for media in info.medias]
        elif domain_id is DomainId.BLUESKY:
            info = await self.fetch_info.bluesky(url)
            if info is not None:
                content, media_urls = info.record.text, info.media_urls
        elif domain_id is DomainId.KEMONO:
            # Kemono only provides media URLs; there is no post info object.
            media_urls = await self.fetch_info.kemono(url)
        else:
            return PostExtractionResult(medias=[], content="", author_md="")
    except Exception:
        logger.exception(f"Failed to extract post info from {url} for domain {domain_id!r}")
        return PostExtractionResult(medias=[], content="", author_md="")

    logger.debug(f"Extracted media URLs: {media_urls}")

    downloader = MediaDownloader(self.bot.session, media_urls=media_urls)
    await downloader.start(spoiler=spoiler, filesize_limit=filesize_limit)

    # A URL without a downloaded file is kept, with ``file`` left as ``None``.
    medias: list[Media] = [
        Media(url=media_url, file=downloader.files.get(media_url)) for media_url in media_urls
    ]
    logger.debug(f"Downloaded {len(medias)} media files")

    author_md = "" if info is None else info.author_md
    return PostExtractionResult(medias=medias, content=content[:2000], author_md=author_md)
async def _resolve_author_mention(
    self, resolved_ref: discord.Message, message: discord.Message
) -> str:
    """Return how the author of the replied-to message should be referred to.

    The result is a mention, except when that author is the author of
    ``message`` itself (or cannot be determined), in which case a plain
    display name is returned so nobody is pinged.
    """
    assert message.guild is not None

    replier_id = message.author.id

    # Replying to a normal message: its author is known directly.
    if resolved_ref.webhook_id is None:
        ref_author = resolved_ref.author
        return ref_author.display_name if ref_author.id == replier_id else ref_author.mention

    # Replying to a webhook message: look up the member behind it first.
    original = await self._get_original_author(resolved_ref, message.guild)
    if original is None:
        return resolved_ref.author.display_name
    return original.display_name if original.id == replier_id else original.mention
async def _send_fixes(
    self,
    message: discord.Message,
    result: FindFixResult,
    *,
    guild_settings: GuildSettings | None,
    filesize_limit: int,
    interaction: Interaction | None = None,
) -> SendType | None:
    """Finish composing the fixed message and send it.

    The message's own attachments are added to the extracted media, optional
    post content and source links are appended to the text, a "replying to"
    header is prepended when the message was a reply, and the result is sent
    with files when there is any media and as plain text otherwise.
    """
    medias, sauces = result.medias, result.sauces

    # Carry the original attachments over to the fixed message.
    attachment_medias = [
        Media(url=attachment.url, file=await attachment.to_file())
        for attachment in message.attachments
    ]
    medias += attachment_medias

    show_post_content = (
        guild_settings is not None
        and message.channel.id in guild_settings.show_post_content_channels
    )
    if show_post_content:
        extra = ""
        if result.author_md:
            extra += f"\n-# {result.author_md}"
        if result.content:
            extra += f"\n{result.content}"
        message.content += extra

    # With several sources they are listed (spoilered) in the text instead of
    # being passed on as a list.
    if len(sauces) > 1:
        listed = "\n".join(f"<{sauce}>" for sauce in sauces)
        message.content += f"\n||{listed}||"
        sauces.clear()

    # The original message may be removed once the fix is posted, so a reply is
    # re-expressed as a header line in the fixed message, sent silently.
    silent = False
    reference = message.reference
    resolved_ref = None if reference is None else reference.resolved
    if isinstance(resolved_ref, discord.Message) and message.guild is not None:
        mention = await self._resolve_author_mention(resolved_ref, message)
        replying_to = self.bot.translator.get(
            await Translator.get_guild_lang(message.guild),
            "replying_to",
            user=mention,
            url=resolved_ref.jump_url,
        )
        message.content = f"{replying_to}\n{message.content}"
        silent = True

    if not medias:
        return await self._send_message(
            message,
            urls=result.urls,
            guild_settings=guild_settings,
            interaction=interaction,
            silent=silent,
        )

    return await self._send_files(
        message,
        medias,
        sauces,
        guild_settings=guild_settings,
        filesize_limit=filesize_limit,
        interaction=interaction,
        silent=silent,
    )
@staticmethod
def _batch_medias(medias: list[Media], filesize_limit: int) -> list[list[Media]]:
"""Batch medias by count (max 10) and total size (max filesize_limit)."""
batches: list[list[Media]] = []
current_batch: list[Media] = []
current_size = 0
for media in medias:
file_size = 0 if media.file is None else get_filesize(media.file.fp)
# If adding this file would exceed limits, start a new batch
if current_batch and (
len(current_batch) >= 10 or current_size + file_size > filesize_limit
):
batches.append(current_batch)
current_batch = []
current_size = 0
current_batch.append(media)
current_size += file_size
if current_batch:
batches.append(current_batch)
return batches
async def _send_files( # noqa: PLR0913
self,
message: discord.Message,
medias: list[Media],
sauces: list[str],
*,
guild_settings: GuildSettings | None,
filesize_limit: int,
interaction: Interaction | None = None,
silent: bool = False,
) -> SendType | None:
"""Send multiple files in batches of 10 and within filesize limit."""
guild_lang: str | None = None
send_type: SendType | None = None
for chunk in self._batch_medias(medias, filesize_limit):
kwargs: dict[str, Any] = {"silent": silent}
if sauces:
if guild_lang is None:
guild_lang = await Translator.get_guild_lang(message.guild)
view = discord.ui.View()
view.add_item(
discord.ui.Button(
url=sauces[0], label=self.bot.translator.get(guild_lang, "sauce")
)
)
kwargs["view"] = view
files: list[discord.File] = []
for media in chunk:
if media.file is not None:
files.append(media.file)
else:
message.content += f"\n{media.url}"
send_type = await self._send_message(
message,
guild_settings=guild_settings,
medias=chunk,
interaction=interaction,
**kwargs,
)
message.content = ""
return send_type
def _add_original_link_button(
    self,
    urls: list[str] | None,
    show_original_link_btn: bool,
    guild_lang: str,
    kwargs: dict[str, Any],
) -> None:
    """Put a view with an "original link" button into ``kwargs`` when the feature is on.

    The button points at the first entry of ``urls``. ``kwargs`` is left
    untouched when the feature is off or there are no URLs.
    """
    if not (show_original_link_btn and urls):
        return

    label = self.bot.translator.get(guild_lang, "original_link")
    view = discord.ui.View()
    view.add_item(discord.ui.Button(url=urls[0], label=label))
    kwargs["view"] = view
async def _send_via_interaction(
    self,
    interaction: Interaction,
    message: discord.Message,
    files: list[discord.File],
    **kwargs: Any,
) -> discord.Message:
    """Deliver ``message`` through ``interaction`` and return the interaction's original response.

    The followup is used when the interaction has already been responded to,
    the initial response otherwise. All mentions are disabled.
    """
    no_mentions = discord.AllowedMentions(
        everyone=False, users=False, roles=False, replied_user=False
    )

    # Both senders take the same arguments, so just pick the right one.
    if interaction.response.is_done():
        send = interaction.followup.send
    else:
        send = interaction.response.send_message

    await send(
        message.content,
        tts=message.tts,
        files=files,
        allowed_mentions=no_mentions,
        **kwargs,
    )
    return await interaction.original_response()
async def _get_target_channel(
self, message: discord.Message, funnel_target_channel: int | None
) -> discord.abc.GuildChannel | discord.abc.MessageableChannel | discord.abc.PrivateChannel:
"""Get the target channel for sending the message."""
if funnel_target_channel is None:
return message.channel
return self.bot.get_channel(funnel_target_channel) or await self.bot.fetch_channel(
funnel_target_channel
)
async def _send_via_webhook_or_reply(
    self,
    message: discord.Message,
    webhook_channel: discord.abc.GuildChannel
    | discord.abc.MessageableChannel
    | discord.abc.PrivateChannel,
    files: list[discord.File],
    guild_settings: GuildSettings | None,
    **kwargs: Any,
) -> tuple[discord.Message | None, SendType]:
    """Send the fixed message through a webhook, or fall back to the channel or a reply.

    Returns the sent message together with the way it was delivered. A failing
    webhook send is reported in the channel and then re-raised.
    """
    webhook = await self._get_or_create_webhook(webhook_channel, message.guild)

    if webhook is None:
        # No webhook available (e.g. in a thread).
        post_to_channel = (
            guild_settings is not None and guild_settings.delete_original_message_in_threads
        )
        if post_to_channel:
            sent = await message.channel.send(
                message.content, tts=message.tts, files=files, **kwargs
            )
            return sent, "channel"

        # Otherwise reply to the original (its embed is suppressed later by the caller).
        sent = await message.reply(
            message.content, tts=message.tts, files=files, mention_author=False, **kwargs
        )
        return sent, "reply"

    try:
        sent = await self._send_webhook(message, webhook, files=files, **kwargs)
    except Exception as e:
        err_message = self.bot.translator.get(
            await Translator.get_guild_lang(message.guild), "failed_to_send_webhook"
        )
        await message.channel.send(
            f"{err_message}\n\n{e}", delete_after=ERROR_MSG_DELETE_AFTER
        )
        raise
    return sent, "webhook"
async def _handle_http_exception(
self,
e: discord.HTTPException,
message: discord.Message,
medias: Sequence[Media],
guild_settings: GuildSettings | None,
**kwargs: Any,
) -> None:
"""Handle HTTP exceptions during message sending."""
if e.code == 40005: # Request entity too large
message.content += "\n".join(media.url for media in medias)
await self._send_message(message, guild_settings=guild_settings, **kwargs)
else:
raise e
async def _send_message(
    self,
    message: discord.Message,
    *,
    urls: list[str] | None = None,
    guild_settings: GuildSettings | None,
    medias: Sequence[Media] | None = None,
    interaction: Interaction | None = None,
    **kwargs: Any,
) -> SendType | None:
    """Send ``message`` through the interaction if there is one, else via webhook/channel/reply.

    Returns how the message was delivered, or ``None`` when the send failed
    with an HTTP error and was passed to ``_handle_http_exception``.
    """
    medias = medias or []
    files = [media.file for media in medias if media.file is not None]

    # Read the relevant guild settings, with neutral values when there are none.
    if guild_settings is None:
        disable_delete_reaction = None
        delete_msg_emoji = None
        funnel_target_channel = None
        show_original_link_btn = False
    else:
        disable_delete_reaction = guild_settings.disable_delete_reaction
        delete_msg_emoji = guild_settings.delete_msg_emoji
        funnel_target_channel = guild_settings.funnel_target_channel
        show_original_link_btn = guild_settings.show_original_link_btn

    # The guild language is only needed when the button is actually added.
    if show_original_link_btn and urls:
        guild_lang = await Translator.get_guild_lang(message.guild)
        self._add_original_link_button(urls, show_original_link_btn, guild_lang, kwargs)

    fix_message: discord.Message | None = None
    if interaction is None:
        try:
            target = await self._get_target_channel(message, funnel_target_channel)
            fix_message, send_type = await self._send_via_webhook_or_reply(
                message, target, files, guild_settings=guild_settings, **kwargs
            )
        except discord.HTTPException as e:
            await self._handle_http_exception(e, message, medias, guild_settings, **kwargs)
            return None
    else:
        fix_message = await self._send_via_interaction(interaction, message, files, **kwargs)
        send_type = "interaction"

    await self._add_delete_reaction(
        message, interaction, fix_message, disable_delete_reaction, delete_msg_emoji
    )
    return send_type
async def _add_delete_reaction(
    self,
    message: discord.Message,
    interaction: Interaction | None,
    fix_message: discord.Message | None,
    disable_delete_reaction: bool | None,
    delete_msg_emoji: str | None,
) -> None:
    """React to the fixed message with the guild's delete emoji, if applicable.

    Nothing happens when the reaction is disabled, no emoji is configured, the
    message was sent through an interaction, or there is no fixed message. When
    reacting fails, a short-lived notice is posted as a reply instead.
    """
    if (
        disable_delete_reaction
        or not delete_msg_emoji
        or interaction is not None
        or fix_message is None
    ):
        return

    async def notify(key: str, **fmt: Any) -> None:
        # Best effort: the bot may not be allowed to reply either.
        with contextlib.suppress(discord.Forbidden):
            await fix_message.reply(
                self.bot.translator.get(
                    await Translator.get_guild_lang(message.guild), key, **fmt
                ),
                delete_after=ERROR_MSG_DELETE_AFTER,
            )

    guild_id = message.guild.id if message.guild else "DM"
    err_message = f"Failed to add reaction {delete_msg_emoji!r} to message {fix_message.id} in {guild_id}"

    try:
        await fix_message.add_reaction(delete_msg_emoji)
    except discord.Forbidden:
        logger.warning(err_message)
        await notify("no_perms_to_add_reactions")
    except discord.HTTPException as e:
        if e.code != 50035:  # Invalid Form Body (emoji), user error so ignore
            capture_exception(e)
        await notify("add_reaction_error", emoji=delete_msg_emoji)
async def _send_webhook(
    self,
    message: discord.Message,
    webhook: discord.Webhook,
    files: list[discord.File] | None = None,
    **kwargs: Any,
) -> discord.Message:
    """Post ``message`` through ``webhook`` under the author's name and avatar.

    The webhook username is the author's sanitized display name followed by
    ``USERNAME_SUFFIX``. The call waits for, and returns, the sent message.
    """
    author = message.author
    relay_name = f"{sanitize_username(author.display_name)}{USERNAME_SUFFIX}"
    attachments = files or []

    return await webhook.send(
        message.content,
        username=relay_name,
        avatar_url=author.display_avatar.url,
        tts=message.tts,
        wait=True,
        files=attachments,
        **kwargs,
    )
async def _get_or_create_webhook(
    self,
    channel: discord.abc.GuildChannel
    | discord.abc.MessageableChannel
    | discord.abc.PrivateChannel,
    guild: discord.Guild | None,
) -> discord.Webhook | None:
    """Return the bot's webhook for ``channel``, creating it when it does not exist yet.

    ``None`` is returned for anything that is not a text channel, and when the
    bot is not allowed to list the channel's webhooks (in which case a
    short-lived notice is posted, if possible).
    """
    if not isinstance(channel, discord.TextChannel):
        return None

    try:
        existing = await channel.webhooks()
    except discord.Forbidden:
        with contextlib.suppress(discord.Forbidden):
            await channel.send(
                self.bot.translator.get(
                    await Translator.get_guild_lang(guild), "no_perms_to_manage_webhooks"
                ),
                delete_after=ERROR_MSG_DELETE_AFTER,
            )
        return None

    # The bot's webhook is identified by carrying the bot user's name.
    webhook_name = self.bot.user.name
    found = discord.utils.get(existing, name=webhook_name)
    if found is not None:
        return found

    avatar = await self.bot.user.display_avatar.read()
    return await channel.create_webhook(name=webhook_name, avatar=avatar)
async def _handle_reply(self, message: discord.Message, resolved_ref: discord.Message) -> None:
if message.content.startswith("$"):
return
guild = message.guild
if guild is None:
return
author = await self._get_original_author(resolved_ref, guild)
# Can't find author or author is a bot or the author is the same as the message author
if author is None or author.bot or author.id == message.author.id:
return
try:
await message.reply(
self.bot.translator.get(
await Translator.get_guild_lang(guild),
"replying_to",
user=author.mention,
url=resolved_ref.jump_url,
),
mention_author=False,
silent=True,
)
except discord.Forbidden:
logger.warning(f"No permission to send reply in {message.channel.id=} in {guild.id=}")
except discord.HTTPException as e:
if e.code == 50035: # Invalid Form Body, mostly due to "Unknown message", so ignore
return
capture_exception(e)
@commands.Cog.listener()
async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent) -> None:  # noqa: PLR0911
    """Delete a fixed message when its original author reacts with the guild's delete emoji.

    Only messages produced by this bot are considered: webhook messages whose
    author name carries ``USERNAME_SUFFIX``, and replies sent by the bot user.
    """
    if payload.guild_id is None or payload.user_id == self.bot.user.id:
        return

    settings, _ = await GuildSettings.get_or_create(id=payload.guild_id)
    if str(payload.emoji) != settings.delete_msg_emoji:
        return

    channel = self.bot.get_channel(payload.channel_id) or await self.bot.fetch_channel(
        payload.channel_id
    )
    unsupported = (discord.ForumChannel, discord.CategoryChannel, discord.abc.PrivateChannel)
    if isinstance(channel, unsupported):
        return

    try:
        message = await channel.fetch_message(payload.message_id)
    except discord.NotFound:
        return
    except discord.Forbidden:
        logger.warning(
            f"Failed to fetch message in {channel!r}, bot perms: {channel.permissions_for(channel.guild.me)}"
        )
        return

    guild = message.guild
    if guild is None:
        return

    # Case 1: the reaction is on a webhook message carrying the USERNAME_SUFFIX.
    if message.webhook_id is not None and USERNAME_SUFFIX in message.author.display_name:
        author = await self._get_original_author(message, guild)
        if author is None:
            return
    # Case 2: the reaction is on a message the bot itself sent as a reply to another message.
    elif (
        message.reference is not None
        and isinstance(message.reference.resolved, discord.Message)
        and message.author.id == self.bot.user.id
    ):
        message_ref = cast("discord.MessageReference", message.reference)
        resolved_ref = cast("discord.Message", message_ref.resolved)
        author = resolved_ref.author
    else:
        return

    # Only the original author may delete the fixed message.
    if payload.user_id != author.id:
        return

    try:
        await message.delete()
    except discord.Forbidden:
        logger.warning(f"Failed to delete message in {channel.id=} in {guild.id=}")
        with contextlib.suppress(discord.Forbidden):
            await message.reply(
                self.bot.translator.get(
                    await Translator.get_guild_lang(guild), "no_perms_to_delete_msg"
                ),
                delete_after=ERROR_MSG_DELETE_AFTER,
            )
@commands.Cog.listener("on_message")
async def embed_fixer(self, message: discord.Message) -> None:
    """Entry point for every incoming message: find fixable links and post the fix.

    Messages are ignored when they come from this bot (directly or through its
    webhook), are outside a guild, belong to a user on the ignore list, or are
    excluded by the guild settings (channel, bot visibility, role whitelist).
    When a fix is found it is sent and the original message is deleted or has
    its embed suppressed, depending on how the fix was delivered. When no fix
    is found but the message replies to a webhook message, the reply notice is
    handled instead.
    """
    # Leave the bot's own "jsk py" invocations alone.
    if message.content.startswith(f"{self.bot.user.mention} jsk py"):
        return

    channel, guild, author = message.channel, message.guild, message.author
    if (
        (message.webhook_id is not None and USERNAME_SUFFIX in author.display_name)
        or self.bot.user.id == author.id
        or guild is None
        or await IgnoreMe.contains(author.id)
    ):
        return

    guild_settings, _ = await GuildSettings.get_or_create(id=guild.id)

    # With a role whitelist configured, members holding none of those roles are skipped.
    whitelist_role_skip = (
        isinstance(author, discord.Member)
        and guild_settings.whitelist_role_ids
        and not any(role.id in guild_settings.whitelist_role_ids for role in author.roles)
    )
    if (
        self._skip_channel(guild_settings, channel.id)
        or (not guild_settings.bot_visibility and author.bot)
        or whitelist_role_skip
    ):
        return

    # Any failure while looking for fixes is reported and ends the handling.
    try:
        result = await self._find_fixes(
            message, settings=guild_settings, filesize_limit=guild.filesize_limit
        )
    except Exception as e:
        capture_exception(e)
        return

    logger.debug(f"FindFixResult for message {message.id} in {guild.id=}: {result}")

    if result.fix_found:
        try:
            send_type = await self._send_fixes(
                message,
                result,
                guild_settings=guild_settings,
                filesize_limit=guild.filesize_limit,
            )
        except discord.HTTPException:
            logger.warning(f"Failed to send fixes in {channel.id=} in {guild.id=}")
            return
        except Exception as e:
            capture_exception(e)
            return

        if send_type in {"webhook", "channel"}:
            # send_type is only "channel" when delete_original_message_in_threads is enabled
            # this is checked in _send_via_webhook_or_reply.
            await self.delete_message_safe(message, channel, guild)
        elif send_type == "reply":
            await self.suppress_embed_safe(message, channel, guild)

        # Clear the hourglass that may have been added while extracting media.
        # NOTE(review): assumed to run for every send type — confirm the intended nesting.
        await remove_reaction_safe(message, "⌛", guild.me)

    # If this is a normal message (no embed fix found) replying to a webhook message,
    # reply to this message containing mention to the original author of the webhook message.
    elif (
        message.reference is not None
        and isinstance(resolved_ref := message.reference.resolved, discord.Message)
        and resolved_ref.webhook_id is not None
        and not author.bot
        and not guild_settings.disable_webhook_reply
    ):
        await self._handle_reply(message, resolved_ref)
async def suppress_embed_safe(
self,
message: discord.Message,
channel: discord.abc.MessageableChannel,