-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathbot.py
More file actions
523 lines (440 loc) · 21.8 KB
/
bot.py
File metadata and controls
523 lines (440 loc) · 21.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
import os
import re
import json
import unicodedata
from dotenv import load_dotenv
from telegram import Update, ChatPermissions, ParseMode
from telegram.ext import Updater, MessageHandler, Filters, CallbackContext, CommandHandler, Filters
from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone, time
from combot.scheduled_warnings import messages
from combot.brand_assets import messages as brand_assets_messages
# Load .env vars so the lookups below can see them
load_dotenv()

# Bot token and target group chat id, both taken from the environment
# (each is None when the variable is not set).
BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN')
GROUP_CHAT_ID = os.environ.get('GROUP_CHAT_ID')

# Filter definitions (JSON)
FILTERS_FILE = "filters/filters.json"
# Metrics (JSON)
METRICS_FILE = "filters/metrics.json"
# Folder holding the media that accompanies filters
MEDIA_FOLDER = "media"

# Phrase lists: one file per moderation action, plus the whitelist
BAN_PHRASES_FILE = "blocklists/ban_phrases.txt"
MUTE_PHRASES_FILE = "blocklists/mute_phrases.txt"
DELETE_PHRASES_FILE = "blocklists/delete_phrases.txt"
WHITELIST_PHRASES_FILE = "whitelists/whitelist_phrases.txt"
def normalize_name(name: str) -> str:
    """Reduce a display name or username to a canonical form for comparison.

    Steps, in order: NFKD-decompose, drop combining marks, delete every
    character that is not an ASCII letter, digit, underscore or space,
    lowercase, strip the ends and collapse runs of whitespace to one space.

    Args:
        name: The raw name to normalize.

    Returns:
        The normalized name (possibly an empty string).
    """
    name = unicodedata.normalize("NFKD", name)
    name = ''.join(c for c in name if not unicodedata.combining(c))
    name = re.sub(r'[^a-zA-Z0-9_ ]+', '', name)
    name = name.lower()
    name = name.strip()
    name = re.sub(r'\s+', ' ', name)
    return name


# Suspicious names to auto-ban.
# normalize_name() must already be defined when this list is built, which is
# why the function sits above it.
SUSPICIOUS_USERNAMES = [normalize_name(name) for name in [
    "dev", "developer", "admin", "mod", "owner", "arc", "arc_agent", "arc agent",
    "arch_agent", "arch agent", "support", "helpdesk", "administrator", "arc admin", "arc_admin"
]]

# Phrases that trigger a ban when they appear in a name or username
BIO_PHRASES = [
    "verify in bio", "link in bio", "read bio", "look at bio", "info in bio"
]

# Mute duration in seconds (3 days)
MUTE_DURATION = 3 * 24 * 60 * 60

# auto spam detection variables
SPAM_THRESHOLD = 3
TIME_WINDOW = timedelta(seconds=15)
# message text -> deque of (user_id, timestamp) sightings
SPAM_TRACKER = defaultdict(lambda: deque(maxlen=SPAM_THRESHOLD))
SPAM_RECORDS = {}  # stores flagged spam messages for 5 minutes
SPAM_RECORD_DURATION = timedelta(minutes=5)


def get_admin_ids(context, chat_id):
    """Return the user ids of the chat's current administrators.

    Args:
        context: Callback context whose ``bot`` is used for the lookup.
        chat_id: Id of the chat to query.

    Returns:
        A list with one user id per administrator.
    """
    # Fetch chat admins dynamically
    chat_admins = context.bot.get_chat_administrators(chat_id)
    return [admin.user.id for admin in chat_admins]
def get_admin_names(context, chat_id):
    """Collect the normalized full names of the chat's human administrators.

    Administrators that are bots are left out; every remaining full name is
    passed through ``normalize_name``.
    """
    normalized_names = []
    for member in context.bot.get_chat_administrators(chat_id):
        if member.user.is_bot:
            continue
        normalized_names.append(normalize_name(member.user.full_name))
    return normalized_names
# combot security message
def post_security_message(context: CallbackContext, index: int):
    """Replace the group's pinned message with security warning ``messages[index]``.

    Both phases are best-effort: any exception is printed with a
    ``[Security]`` prefix and swallowed, and the second phase runs even when
    the first one failed.

    Args:
        context: Callback context whose ``bot`` performs the API calls.
        index: Position of the warning text inside ``messages``.
    """
    # Phase 1: unpin and delete whatever is pinned right now.
    try:
        pinned = context.bot.get_chat(GROUP_CHAT_ID).pinned_message
        if pinned:
            try:
                context.bot.unpin_chat_message(
                    chat_id=GROUP_CHAT_ID, message_id=pinned.message_id
                )
            except Exception as e:
                print(f"[Security] Failed to unpin message: {e}")
            # Deletion is attempted even when unpinning failed.
            try:
                context.bot.delete_message(
                    chat_id=GROUP_CHAT_ID, message_id=pinned.message_id
                )
            except Exception as e:
                print(f"[Security] Failed to delete message: {e}")
    except Exception as e:
        print(f"[Security] Failed to retrieve chat or pinned message: {e}")

    # Phase 2: send the warning and pin it without notifying members.
    try:
        posted = context.bot.send_message(
            chat_id=GROUP_CHAT_ID, text=messages[index], parse_mode=ParseMode.HTML
        )
        context.bot.pin_chat_message(
            chat_id=GROUP_CHAT_ID, message_id=posted.message_id, disable_notification=True
        )
    except Exception as e:
        print(f"[Security] Failed to pin message: {e}")
# combot brand assets
def post_brand_assets(context: CallbackContext, index: int = 0):
    """Replace the group's pinned message with ``brand_assets_messages[index]``.

    Both phases are best-effort: any exception is printed with a
    ``[Brand Assets]`` prefix and swallowed, and the second phase runs even
    when the first one failed.

    Args:
        context: Callback context whose ``bot`` performs the API calls.
        index: Position of the text inside ``brand_assets_messages``
            (defaults to the first entry).
    """
    # Phase 1: unpin and delete whatever is pinned right now.
    try:
        pinned = context.bot.get_chat(GROUP_CHAT_ID).pinned_message
        if pinned:
            try:
                context.bot.unpin_chat_message(
                    chat_id=GROUP_CHAT_ID, message_id=pinned.message_id
                )
            except Exception as e:
                print(f"[Brand Assets] Failed to unpin message: {e}")
            # Deletion is attempted even when unpinning failed.
            try:
                context.bot.delete_message(
                    chat_id=GROUP_CHAT_ID, message_id=pinned.message_id
                )
            except Exception as e:
                print(f"[Brand Assets] Failed to delete message: {e}")
    except Exception as e:
        print(f"[Brand Assets] Failed to retrieve chat or pinned message: {e}")

    # Phase 2: send the brand-assets text and pin it without notifying members.
    try:
        posted = context.bot.send_message(
            chat_id=GROUP_CHAT_ID, text=brand_assets_messages[index], parse_mode=ParseMode.HTML
        )
        context.bot.pin_chat_message(
            chat_id=GROUP_CHAT_ID, message_id=posted.message_id, disable_notification=True
        )
    except Exception as e:
        print(f"[Brand Assets] Failed to send or pin message: {e}")
# Load filters as dict
def load_filters(file_path):
    """Parse the UTF-8 JSON document at ``file_path`` and return the result."""
    with open(file_path, 'r', encoding='utf-8') as handle:
        document = handle.read()
    return json.loads(document)
# Filter definitions parsed from FILTERS_FILE; this runs once, when the module is loaded.
FILTERS = load_filters(FILTERS_FILE)
# Load metrics
def load_metrics(file_path):
    """Parse the UTF-8 JSON document at ``file_path`` and return the result."""
    with open(file_path, 'r', encoding='utf-8') as handle:
        document = handle.read()
    return json.loads(document)
# Contents of METRICS_FILE; this runs once, when the module is loaded.
METRICS = load_metrics(METRICS_FILE)
# Load blocklist/whitelisted words/phrases from files
def load_phrases(file_path):
    """Read one phrase per line from a UTF-8 text file.

    Each line is stripped and lowercased. Lines that are empty after
    stripping are skipped: an empty phrase wrapped in ``\\b...\\b`` matches
    practically any message, so a stray blank line in a blocklist would
    otherwise trigger that list's action for everyone.

    Args:
        file_path: Path of the phrase file.

    Returns:
        The non-empty phrases, in file order.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        cleaned = (line.strip().lower() for line in file)
        return [phrase for phrase in cleaned if phrase]
# Phrase lists, each read once when the module is loaded.
BAN_PHRASES = load_phrases(BAN_PHRASES_FILE)  # phrases loaded from the ban list file
MUTE_PHRASES = load_phrases(MUTE_PHRASES_FILE)  # phrases loaded from the mute list file
DELETE_PHRASES = load_phrases(DELETE_PHRASES_FILE)  # phrases loaded from the delete list file
WHITELIST_PHRASES = load_phrases(WHITELIST_PHRASES_FILE)  # phrases loaded from the whitelist file
def contains_multiplication_phrase(text):
    """Search ``text`` for a number written next to an ``x`` ("100x", "x 5").

    Matching is case-insensitive because the text is lowercased first.
    Returns the ``re.Match`` for the first hit, or ``None`` when there is none.
    """
    # Digits (optionally space-separated) followed by 'x', or 'x' followed by such digits
    multiplier = re.compile(r"(?:\d\s*)+x|x\s*(?:\d\s*)+")
    lowered = text.lower()
    return multiplier.search(lowered)
def contains_give_sol_phrase(text):
    """Search ``text`` for "give <number> sol" / "give <number> solana".

    Matching is case-insensitive because the text is lowercased first.
    Returns the ``re.Match`` for the first hit (group 1 is the number,
    group 2 the currency word), or ``None`` when there is none.
    """
    # 'give', then a number, then 'sol' or 'solana'; whitespace between them is optional
    giveaway = re.compile(r"give\s*(\d+)\s*(sol|solana)")
    lowered = text.lower()
    return giveaway.search(lowered)
# check for spam
def check_for_spam(message_text, user_id):
    """Record one sighting of ``message_text`` and report whether it is now spam.

    The sighting ``(user_id, now)`` is added to ``SPAM_TRACKER[message_text]``
    and sightings older than ``TIME_WINDOW`` are discarded. When at least
    ``SPAM_THRESHOLD`` sightings remain, the text is stamped in
    ``SPAM_RECORDS`` with the current time.

    Args:
        message_text: The message text used as the tracking key.
        user_id: Id of the user who sent it.

    Returns:
        The distinct user ids among the recent sightings when the threshold
        is reached, otherwise an empty list.
    """
    now = datetime.now(timezone.utc)
    # track user and timestamp of the message
    print(f"Checking for spam: {message_text} from user: {user_id}")
    SPAM_TRACKER[message_text].append((user_id, now))
    # Filter out old messages that are outside of the time window
    recent = [entry for entry in SPAM_TRACKER[message_text] if now - entry[1] <= TIME_WINDOW]
    # NOTE(review): the replacement deque has no maxlen, unlike the default
    # one created by SPAM_TRACKER - confirm that this is intended.
    SPAM_TRACKER[message_text] = deque(recent)
    print(f"Recent messages for '{message_text}': {recent}")
    # If recent messages reach the threshold, flag as spam
    if len(recent) >= SPAM_THRESHOLD:
        print(f"Spam detected for message: '{message_text}'")
        # flag message as spam and store for 5 minutes in memory
        SPAM_RECORDS[message_text] = now  # only store message and timestamp
        spammer_ids = list(set([entry[0] for entry in recent]))  # Return list of user_ids to mute
        print(f"Flagging {len(spammer_ids)} users for spam: {spammer_ids}")
        return spammer_ids
    # A former "expired window" cleanup branch was removed here: it required
    # recent[0] to be older than TIME_WINDOW, which the filter above rules
    # out, so it could never run. This function therefore never deletes
    # SPAM_TRACKER keys.
    return []
# check for recent spam and mute spammers
def check_recent_spam(message_text):
    """Tell whether ``message_text`` has a still-valid entry in ``SPAM_RECORDS``.

    Returns ``None`` when the text has no entry; otherwise ``True`` if the
    entry is at most ``SPAM_RECORD_DURATION`` old and ``False`` if it is older.
    """
    now = datetime.now(timezone.utc)
    timestamp = SPAM_RECORDS.get(message_text)
    if not timestamp:
        # No record: hand back the falsy lookup result itself.
        return timestamp
    print(f"Message '{message_text}' is flagged as spam, timestamp: {timestamp}")
    age = now - timestamp
    return age <= SPAM_RECORD_DURATION
# clean up spam records
def cleanup_spam_records(context: CallbackContext):
    """Periodic job: drop expired spam bookkeeping.

    1. Removes every ``SPAM_RECORDS`` entry older than
       ``SPAM_RECORD_DURATION`` and logs each removal (or logs that there was
       nothing to remove).
    2. Removes every ``SPAM_TRACKER`` entry whose newest sighting is older
       than ``TIME_WINDOW``. ``check_for_spam`` would discard those sightings
       on its next call anyway, so detection results are unaffected; without
       this step a key is kept for every distinct message text ever seen.

    Args:
        context: Job callback context (unused).
    """
    now = datetime.now(timezone.utc)

    expired_messages = []
    # Iterate over a snapshot so entries can be removed while looping.
    for message_text, timestamp in list(SPAM_RECORDS.items()):
        if now - timestamp > SPAM_RECORD_DURATION:
            expired_messages.append(message_text)
            SPAM_RECORDS.pop(message_text, None)
            print(f"[CLEANUP] Removed expired spam record: '{message_text}'")
    if not expired_messages:
        print("[CLEANUP] No expired spam messages to remove.")

    # Sightings are appended chronologically, so the last one is the newest.
    idle_texts = [
        text
        for text, sightings in list(SPAM_TRACKER.items())
        if not sightings or now - sightings[-1][1] > TIME_WINDOW
    ]
    for text in idle_texts:
        SPAM_TRACKER.pop(text, None)
    if idle_texts:
        print(f"[CLEANUP] Dropped {len(idle_texts)} idle spam tracker entries.")
def contains_non_x_links(text: str) -> bool:
    """Return True when ``text`` contains an http(s) URL that is not an X/Twitter link.

    A URL is allowed only if it *starts* with ``http(s)://[www.]x.com/`` or
    ``http(s)://[www.]twitter.com/`` followed by a path. The check is anchored
    at the start of each URL so that an allowed address embedded later in the
    URL (e.g. ``https://evil.example/?r=https://x.com/a``) does not whitelist
    it. Scheme and host are matched case-insensitively.

    Args:
        text: The raw message text.

    Returns:
        True if at least one URL is not an X/Twitter link, False otherwise
        (including when the text has no URL at all).
    """
    # Matches all URLs
    url_pattern = r'(https?://[^\s]+)'
    urls = re.findall(url_pattern, text, flags=re.IGNORECASE)
    for url in urls:
        # Allow only Twitter/X links; re.match anchors at the start of the URL
        if not re.match(r'https?://(www\.)?(x\.com|twitter\.com)/[^\s]+', url, flags=re.IGNORECASE):
            return True  # Found a non-X link
    return False
# Suspicious auto-ban function
def handle_new_members(update, context):
    """Screen every user that just joined and ban the ones with a suspicious identity.

    For each new member the normalized full name and username go through
    three checks, in this order:

    1. the name equals a human admin's normalized name (impersonation),
    2. the name or username contains an entry of ``SUSPICIOUS_USERNAMES``,
    3. the name or username contains an entry of ``BIO_PHRASES``.

    A successful ban in check 1 or 2 moves on to the next member; when the
    ban call fails the error is printed and the remaining checks still run.
    """
    message = update.message
    joined = message.new_chat_members if message is not None else None
    if not joined:
        return

    chat_id = message.chat.id
    admin_names = get_admin_names(context, chat_id)

    for new_user in joined:
        # Placeholders keep the log line and the normalization below well-defined
        name = new_user.full_name or "No Name"
        username = new_user.username or "No Username"
        user_id = new_user.id
        if new_user.username:
            name_info = f"Name: {name}, Username: @{username}"
        else:
            name_info = f"Name: {name} (no username)"
        print(f"[JOIN] {name_info} (ID: {user_id})")

        # Normalize names and usernames
        name_norm = normalize_name(name)
        username_norm = normalize_name(username)

        # Check for admin impersonation
        if name_norm in admin_names:
            try:
                context.bot.ban_chat_member(chat_id, user_id)
                print(f"[BANNED] Name '{name}' normalized to '{name_norm}' matches an admin name. Banned for impersonation.")
                continue
            except Exception as e:
                print(f"[ERROR] Failed to ban user with admin name {user_id}: {e}")

        # Check for suspicious keywords
        if any(keyword in name_norm or keyword in username_norm for keyword in SUSPICIOUS_USERNAMES):
            try:
                context.bot.ban_chat_member(chat_id, user_id)
                print(f"[BANNED] Suspicious user auto-banned: {name_info}")
                continue
            except Exception as e:
                print(f"[ERROR] Failed to ban {user_id}: {e}")

        # Check for bio phrases
        if any(keyword in name_norm or keyword in username_norm for keyword in BIO_PHRASES):
            try:
                context.bot.ban_chat_member(chat_id, user_id)
                print(f"[BANNED] User with suspicious name (bio phrase): {name_info}")
            except Exception as e:
                print(f"[ERROR] Failed to ban user with bio phrase in name {user_id}: {e}")
def list_filters(update: Update, context: CallbackContext):
    """Reply with the list of filter triggers stored in ``FILTERS_FILE``.

    The file is re-read on every call. Triggers are sorted alphabetically
    (ignoring case and leading slashes) and shown exactly as stored, each
    wrapped in backticks. When the full listing is longer than 4000
    characters it is sent as several messages of up to 80 triggers each.

    Args:
        update: The incoming update carrying the command.
        context: Callback context (unused).
    """
    # effective_message also covers an edited command, for which
    # update.message is None.
    message = update.effective_message
    if message is None:
        return

    # Load the latest filters
    with open(FILTERS_FILE, 'r', encoding='utf-8') as f:
        stored_filters = json.load(f)

    # Sort triggers alphabetically; the leading slash is ignored for sorting only
    sorted_triggers = sorted(stored_filters, key=lambda k: k.lstrip('/').lower())
    formatted_triggers = [f"`{trigger}`" for trigger in sorted_triggers]

    header = "*Available Filters:*\n"
    response = header + "\n".join(formatted_triggers)
    # Telegram messages max out at 4096 characters
    if len(response) <= 4000:
        message.reply_text(response, parse_mode="Markdown")
        return

    # NOTE(review): chunks are sized by item count, not by length; 80 very
    # long triggers could still exceed the limit.
    for i in range(0, len(formatted_triggers), 80):  # 80 items per message chunk
        chunk = header + "\n".join(formatted_triggers[i:i+80])
        message.reply_text(chunk, parse_mode="Markdown")
def _trigger_pattern(trigger):
    """Build the regex that finds a filter trigger as a standalone token.

    The trigger may have an optional leading ``/`` and an optional
    ``_suffix``; it must not be glued to other word characters.
    """
    normalized_trigger = trigger.strip().lower()
    # use word boundaries but allow underscores to be appended
    return rf'(?<!\w)/?{re.escape(normalized_trigger)}(_\w+)?(?!\w)'


def _contains_phrase(phrase, message_text):
    """Return True when ``phrase`` occurs in ``message_text`` on word boundaries."""
    # An empty phrase would reduce the pattern to r'\b\b', which matches
    # practically any message.
    if not phrase:
        return False
    return re.search(r'\b' + re.escape(phrase) + r'\b', message_text) is not None


def _mute_member(context, chat_id, user_id, start):
    """Revoke ``user_id``'s right to send messages until ``start`` + MUTE_DURATION seconds."""
    until_date = start + timedelta(seconds=MUTE_DURATION)
    permissions = ChatPermissions(can_send_messages=False)
    context.bot.restrict_chat_member(chat_id=chat_id, user_id=user_id, permissions=permissions, until_date=until_date)


def _ban_for_identity(context, chat_id, user, admin_names):
    """Ban ``user`` when the name/username looks like staff or a scam lure.

    Returns True when a ban was carried out. A failed ban is logged and the
    remaining checks still run.
    """
    user_id = user.id
    name_normalized = normalize_name(user.full_name)
    username_normalized = normalize_name(user.username or "")
    combined_identity = f"{name_normalized} {username_normalized}"

    # Check for suspicious keywords
    if any(keyword in combined_identity for keyword in SUSPICIOUS_USERNAMES):
        try:
            context.bot.ban_chat_member(chat_id=chat_id, user_id=user_id)
            print(f"[BANNED] Suspicious keyword match in name/username: {user.full_name} (@{user.username})")
            return True
        except Exception as e:
            print(f"[ERROR] Failed to ban suspicious user {user_id}: {e}")

    # Check for bio-like phrases
    if any(keyword in combined_identity for keyword in BIO_PHRASES):
        try:
            context.bot.ban_chat_member(chat_id=chat_id, user_id=user_id)
            print(f"[BANNED] Bio phrase detected in name/username: {combined_identity}")
            return True
        except Exception as e:
            print(f"[ERROR] Failed to ban user with bio phrase {user_id}: {e}")

    # Check for impersonation
    if name_normalized in admin_names:
        try:
            context.bot.ban_chat_member(chat_id=chat_id, user_id=user_id)
            print(f"[BANNED] Impersonation detected: {user.full_name} matched an admin name")
            return True
        except Exception as e:
            print(f"[ERROR] Failed to ban impersonator {user_id}: {e}")

    return False


def _moderate_message(context, message, user, chat_id, message_text, admin_names):
    """Apply every moderation rule to a non-admin message.

    Returns True when the message was acted upon (ban, mute or delete) and
    the caller must stop processing it; False when it passed all rules.
    """
    user_id = user.id
    message_id = message.message_id

    if _ban_for_identity(context, chat_id, user, admin_names):
        return True

    # check if message is too short
    if len(message_text.strip()) < 2:
        context.bot.delete_message(chat_id=chat_id, message_id=message_id)
        return True

    # Delete message if it contains non-X links (checked on the original-case text)
    if contains_non_x_links(message.text or ""):
        print(f"[LINK FILTER] Message from user {user_id} contains non-X links. Deleting.")
        context.bot.delete_message(chat_id=chat_id, message_id=message_id)
        return True

    # Check for multiplication spam, then "give x sol" / "give x solana" spam
    if contains_multiplication_phrase(message_text) or contains_give_sol_phrase(message_text):
        context.bot.delete_message(chat_id=chat_id, message_id=message_id)
        return True

    # Block forwarded messages from non-admins
    if message.forward_date or message.forward_from or message.forward_from_chat:
        print(f"[FORWARD DETECTED] User {user_id} forwarded a message.")
        context.bot.delete_message(chat_id=chat_id, message_id=message_id)
        return True

    # 1. autospam - messages that hit a filter trigger are exempt
    should_skip_spam_check = False
    for trigger in FILTERS:
        if re.search(_trigger_pattern(trigger), message_text):
            should_skip_spam_check = True
            print(f"[SPAM CHECK SKIPPED] Message '{message_text}' matched FILTER trigger: '{trigger}'")
            break

    # 2. autospam - whitelisted messages are exempt
    if not should_skip_spam_check and message_text.strip() in WHITELIST_PHRASES:
        print(f"[SPAM CHECK SKIPPED] Message '{message_text}' matched WHITELIST.")
        should_skip_spam_check = True

    # 3. autospam - check for spam
    if not should_skip_spam_check:
        spammer_ids = check_for_spam(message_text, user_id)
        # A text flagged recently also counts against the current sender.
        if check_recent_spam(message_text) and user_id not in spammer_ids:
            spammer_ids.append(user_id)
        if spammer_ids:
            print(f"Muting spammers for message: '{message_text}'")
            for spammer_id in set(spammer_ids):
                try:
                    _mute_member(context, chat_id, spammer_id, message.date)
                    print(f"Muted user {spammer_id} for spam message.")
                except Exception as e:
                    print(f"Failed to mute spammer {spammer_id}: {e}")
            return True

    # Check for banned phrases
    for phrase in BAN_PHRASES:
        if _contains_phrase(phrase, message_text):
            print(f"[BAN MATCH] Phrase: '{phrase}' matched in message: '{message_text}'")
            context.bot.ban_chat_member(chat_id=chat_id, user_id=user.id)
            message.reply_text(f"arc angel fallen. {user.first_name} has been banned.")
            return True

    # Check for muted phrases
    for phrase in MUTE_PHRASES:
        if _contains_phrase(phrase, message_text):
            print(f"[MUTE MATCH] Phrase: '{phrase}' matched in message: '{message_text}'")
            _mute_member(context, chat_id, user.id, message.date)
            message.reply_text(f"{user.first_name} has been muted for 3 days.")
            return True

    # Check for deleted phrases
    for phrase in DELETE_PHRASES:
        if _contains_phrase(phrase, message_text):
            print(f"[DELETE MATCH] Phrase: '{phrase}' matched in message: '{message_text}'")
            context.bot.delete_message(chat_id=chat_id, message_id=message_id)
            return True

    return False


def _send_filter_response(context, message, chat_id, filter_data):
    """Send the response configured for a matched filter.

    Media of type gif/animation, image or video is sent from MEDIA_FOLDER
    with the response text as caption. Whenever no media was actually sent
    (none configured, file missing, or unknown type) the response text, if
    any, is sent as a plain reply instead.
    """
    response_text = filter_data.get("response_text", "")
    media_file = filter_data.get("media")
    media_type = filter_data.get("type", "gif").lower()

    media_sent = False
    if media_file:
        media_path = os.path.join(MEDIA_FOLDER, media_file)
        if os.path.exists(media_path):
            caption = response_text or None
            with open(media_path, 'rb') as media:
                if media_type in ["gif", "animation"]:
                    context.bot.send_animation(chat_id=chat_id, animation=media, caption=caption)
                    media_sent = True
                elif media_type == "image":
                    context.bot.send_photo(chat_id=chat_id, photo=media, caption=caption)
                    media_sent = True
                elif media_type == "video":
                    context.bot.send_video(chat_id=chat_id, video=media, caption=caption)
                    media_sent = True

    if not media_sent and response_text:
        message.reply_text(response_text)


# (command regex, JSON file, key, text used when the key is absent,
#  label for the error reply, extra reply_text keyword arguments)
_STORED_MESSAGE_COMMANDS = (
    (r'(?<!\w)/metrics(?!\w)', "filters/metrics.json", "last_metrics_message",
     "⚠️ Metrics message is missing or invalid.", "metrics", {}),
    (r'(?<!\w)/growth(?!\w)', "filters/growth.json", "last_weekly_metrics_message",
     "⚠️ Weekly metrics message is missing or invalid.", "weekly metrics", {}),
    (r'(?<!\w)/posts(?!\w)', "filters/posts.json", "latest_posts_message",
     "⚠️ Latest posts message is missing or invalid.", "posts",
     {"disable_web_page_preview": False, "parse_mode": "Markdown"}),
)


def _reply_with_stored_message(message, path, key, missing_text, error_label, reply_kwargs):
    """Reply with the text stored under ``key`` in the JSON file at ``path``.

    Any failure, including a failure to send the reply itself, is reported
    back to the chat as an error message.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        message.reply_text(data.get(key, missing_text), **reply_kwargs)
    except Exception as e:
        message.reply_text(f"⚠️ Error reading {error_label}: {e}")


def check_message(update: Update, context: CallbackContext):
    """Moderate an incoming text message, then answer filters and commands.

    Flow:

    1. Updates carrying neither a message nor a channel post are logged and
       ignored.
    2. Messages from non-admins are moderated: identity bans, too-short
       messages, non-X links, "Nx"/"give N sol" spam, forwards, repeated-text
       spam, then the ban/mute/delete phrase lists. The first rule that acts
       ends processing.
    3. For everyone, the first matching ``FILTERS`` trigger is answered;
       otherwise ``/metrics``, ``/growth`` and ``/posts`` are answered from
       their JSON files.

    Updates without a sender (``effective_user`` is None) skip step 2,
    because there is no user to ban or mute.

    Args:
        update: The incoming update.
        context: Callback context whose ``bot`` performs the API calls.
    """
    message = update.message or update.channel_post  # Handle both messages and channel posts
    if not message:
        print("==== No message or channel post detected ====")
        return
    # Logged only after the guard: update.message is None for channel posts.
    print(f"[GROUP MESSAGE] {message.text}")

    message_text = (message.text or "").lower()
    chat_id = update.effective_chat.id
    user = update.effective_user

    # One administrator lookup serves both the id check and the name check.
    chat_admins = context.bot.get_chat_administrators(chat_id)
    admin_ids = {admin.user.id for admin in chat_admins}

    # Ignore messages from admins
    if user is not None and user.id not in admin_ids:
        admin_names = [
            normalize_name(admin.user.full_name)
            for admin in chat_admins
            if not admin.user.is_bot
        ]
        if _moderate_message(context, message, user, chat_id, message_text, admin_names):
            return

    # Filter Responses (apply to all)
    for trigger, filter_data in FILTERS.items():
        if re.search(_trigger_pattern(trigger), message_text):
            _send_filter_response(context, message, chat_id, filter_data)
            return  # Respond only once

    # Stored-message commands (apply to all)
    for pattern, path, key, missing_text, error_label, reply_kwargs in _STORED_MESSAGE_COMMANDS:
        if re.search(pattern, message_text):
            _reply_with_stored_message(message, path, key, missing_text, error_label, reply_kwargs)
            return
def main():
    """Create the updater, register scheduled jobs and handlers, and poll until stopped.

    Jobs: security warning 0 daily at 08:00 and warning 1 daily at 16:00,
    brand assets daily at 00:00, spam-record cleanup every 60 seconds.
    Handlers, in registration order: the ``/filters`` command, new-member
    screening, then ``check_message`` for text and command messages.
    """
    print("starting bot")
    updater = Updater(BOT_TOKEN, use_context=True)
    dispatcher = updater.dispatcher
    scheduler = updater.job_queue

    # Scheduled jobs
    for warning_index, hour in ((0, 8), (1, 16)):
        # The default argument pins the index for this iteration's callback.
        scheduler.run_daily(
            lambda context, warning_index=warning_index: post_security_message(context, warning_index),
            time=time(hour=hour, minute=0),
        )
    scheduler.run_daily(post_brand_assets, time=time(hour=0, minute=0))
    scheduler.run_repeating(cleanup_spam_records, interval=60, first=60)

    # Message and command handlers
    handlers = (
        CommandHandler("filters", list_filters),
        MessageHandler(Filters.status_update.new_chat_members, handle_new_members),
        MessageHandler(Filters.text | Filters.command, check_message),
    )
    for handler in handlers:
        dispatcher.add_handler(handler)

    updater.start_polling()
    updater.idle()


if __name__ == '__main__':
    main()