-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtgdelete.py
More file actions
executable file
·113 lines (89 loc) · 3.48 KB
/
tgdelete.py
File metadata and controls
executable file
·113 lines (89 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/bin/env python3
from telethon.sync import TelegramClient, events
from telethon import functions, types
from telethon.tl.types import PeerUser, PeerChannel, PeerChat
import argparse
import asyncio
client = TelegramClient("telegram-deleter", "425874", "d2f7f845f91154f283e313ebb76245a2")
async def delete_chat(chatid, timer = 5):
# Fetch the chat from the ID
chat = await client.get_entity(chatid)
# It's more efficient on both speed and network usage
# , if I give the delete function a list of IDs
# instead of going one at a time.
messageids = list()
# Start iterating over the messages.
print(f"!!!! GATHERING ALL SELF MESSAGES FROM CHAT {chat.title} !!!!", flush = True)
async for message in client.iter_messages(chat, from_user = "me"):
print(f"[{chat.title}] {message.text}")
messageids.append(message.id)
if timer:
print(f"!!!! DELETING MESSAGES in {timer} seconds! !!!!")
await asyncio.sleep(timer)
await client.delete_messages(chat, messageids, revoke = True)
print(f"{len(messageids)} messages deleted!")
async def dump_chats():
async for chat in client.iter_dialogs():
if chat.name:
print(f"{chat.name} {chat.id}")
else:
print(f"[NO NAME] {chat.id}")
async def count_messages():
# TODO: make faster
counts = {}
async for chat in client.iter_dialogs():
print(f"Counting messages in {chat.name}...")
counts.update({chat.name: 0})
async for message in client.iter_messages(chat, from_user = "me"):
counts[chat.name]+=1
print(counts)
async def delete_chats(exclude = list()):
print("!!!! DELETING MESSAGES FROM ALL PUBLIC CHAT ROOMS !!!!")
async for chat in client.iter_dialogs():
if not chat.is_group:
print("Skipping non-group thread...")
continue
if chat.id in exclude:
print(f"Skipping {chat.name} due to user request...")
continue
await delete_chat(chat.id, timer = 2)
async def timer_delete(event):
if isinstance(event.message.peer_id, PeerUser):
chatid = event.message.peer_id.user_id
elif isinstance(event.message.peer_id, PeerChat):
chatid = event.message.peer_id.chat_id
elif isinstance(event.message.peer_id, PeerChannel):
chatid = event.message.peer_id.channel_id
if chatid in args.global_delete_exclude:
print(f"Excluding message {event.message.id} from timer delete...")
return
print(f"New message {event.message.id} in chat {chatid}")
print("Waiting 600 second(s) for secret termination...")
await asyncio.sleep(600)
await event.message.delete()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--delete-chat-id", type = int)
parser.add_argument("--delete-public-chats", action = "store_true")
parser.add_argument("--global-delete-exclude", type = int, nargs = "*", default = [])
parser.add_argument("--dump-chats", action = "store_true")
parser.add_argument("--count-messages", action = "store_true")
parser.add_argument("--timer-delete", action = "store_true")
args = parser.parse_args()
with client:
if args.timer_delete:
client.add_event_handler(timer_delete, events.NewMessage(from_users = "me"))
client.run_until_disconnected()
exit(0)
if args.dump_chats:
client.loop.run_until_complete(dump_chats())
exit(0)
if args.count_messages:
client.loop.run_until_complete(count_messages())
exit(0)
if args.delete_public_chats:
client.loop.run_until_complete(delete_chats(args.global_delete_exclude))
exit(0)
if not args.delete_chat_id:
raise Exception("Must have chat to delete from!")
client.loop.run_until_complete(delete_chat(args.delete_chat_id))