-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
145 lines (104 loc) · 4.01 KB
/
main.py
File metadata and controls
145 lines (104 loc) · 4.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from dotenv import dotenv_values
import asyncio
from telethon import TelegramClient, events, functions, errors
from telethon.utils import get_extension
from datetime import datetime
import os
import aiofiles
from aiocsv import AsyncReader, AsyncWriter
from enum import Enum, auto
# Telegram API credentials are read from a local ``.env`` file
# (keys ``API_ID`` and ``API_HASH``).
config = dotenv_values('.env')
api_id, api_hash = int(config['API_ID']), config['API_HASH']

# Client the handlers in this file are registered on; 'anon' is passed as
# the first positional argument of TelegramClient.
bot = TelegramClient('anon', api_id, api_hash)

# Value handed to ``chats=`` by every NewMessage handler in this file.
chats = []
class State(Enum):
    """Steps of the per-user conversation tracked in ``conversation_state``."""

    # The user has been asked for a file and the bot is waiting for it.
    WAIT_FILE = auto()
# Conversation progress per user: keys are ``event.sender_id`` values,
# values are ``State`` members. A missing key means no conversation is open.
conversation_state = {}
def get_current_time():
    """Return the current local time as ``DD_MM_YYYY_HH_MM_SS``.

    The underscore-only format keeps the value safe to embed in file names.
    """
    return f"{datetime.now():%d_%m_%Y_%H_%M_%S}"
async def _check_username(username):
    """Return the result of ``account.CheckUsernameRequest`` for *username*.

    A ``FloodWaitError`` is handled by sleeping for the number of seconds the
    error reports and then retrying the same username, so a rate limit never
    causes a username to be silently dropped. Any other error (including
    ``UsernameInvalidError``) propagates to the caller.
    """
    while True:
        try:
            return await bot(functions.account.CheckUsernameRequest(
                username=username
            ))
        except errors.FloodWaitError as flood:
            await asyncio.sleep(flood.seconds)


async def parse_csv(path):
    """Check every username listed in a CSV file and save the accepted ones.

    The first column of each row is checked with
    ``account.CheckUsernameRequest``. Usernames whose check returns a truthy
    result are written, prefixed with ``@``, to a sibling file named
    ``<path without extension>_out<extension>``.

    Args:
        path: Path of the UTF-8 encoded CSV file to read.

    Returns:
        A ``(msg, saved_path)`` tuple where ``msg`` is a statistics message
        with the percentage of accepted ("Good") and rejected ("Bad")
        usernames and ``saved_path`` is the path of the written output file.
    """
    good_names = []
    cnt = 0
    async with aiofiles.open(path, mode="r", encoding="utf-8", newline="") as afp:
        async for row in AsyncReader(afp):
            # Blank lines and empty first cells are not usernames; skipping
            # them avoids an IndexError and keeps them out of the statistics.
            username = row[0].strip() if row else ''
            if not username:
                continue
            cnt += 1
            try:
                if await _check_username(username):
                    good_names.append(['@' + username])
            except errors.UsernameInvalidError:
                # Counted in ``cnt`` (and therefore as "Bad"), but not saved.
                continue

    # splitext only touches the final extension, so dots elsewhere in the
    # path (or a missing extension) no longer corrupt the output name.
    root, ext = os.path.splitext(path)
    saved_path = f'{root}_out{ext}'
    async with aiofiles.open(saved_path, mode="w", encoding="utf-8", newline="") as afp:
        writer = AsyncWriter(afp, dialect="unix")
        await writer.writerows(good_names)

    if cnt:
        good_p = 100 * (len(good_names) / cnt)
        bad_p = 100 - good_p
    else:
        # Nothing was checked: report zeros instead of dividing by zero.
        good_p = bad_p = 0.0
    msg = 'Stats:\n' + f'Good:\t{good_p}%\n' + f'Bad:\t{bad_p}%'
    return msg, saved_path
@bot.on(events.NewMessage(incoming=True, pattern="/start", chats=chats))  # Delete on commit
async def start(event):
    """Reply to ``/start`` with a greeting that points the user to ``/test``."""
    sender = await event.get_sender()
    # Not every account has a public username; fall back to the first name
    # (and finally a neutral word) so the greeting never reads "Hello, None!".
    display_name = (
        getattr(sender, 'username', None)
        or getattr(sender, 'first_name', None)
        or 'there'
    )
    msg = (
        f"Hello, **{display_name}**!"
        "\nThis is a username checker bot."
        "\nType /test and follow further instructions"
    )
    await event.reply(msg)
@bot.on(events.NewMessage(incoming=True, pattern="/help", chats=chats))  # Chat id here
async def bot_help(event):
    """Reply to ``/help`` with the command list and usage instructions."""
    # Adjacent literals are used instead of backslash continuations so that
    # source indentation can never leak into the message, and the space that
    # was missing between "replies," and "send" is restored.
    msg = (
        "/start - begin interaction with bot"
        "\n/help - print this help message"
        "\nHow to use this bot:"
        "\nYou should send the bot a /test command and after it replies, "
        "send a csv file that contains nicknames you want to check"
    )
    await event.reply(msg)
@bot.on(events.NewMessage(incoming=True, chats=chats))  # Chat id here
async def check_nicknames(event):
    """Drive the two-step "send /test, then upload a CSV" conversation.

    * No conversation open: a message containing ``/test`` opens one and the
      user is asked for a file; any other message is ignored.
    * Waiting for a file: a message without a document gets a reminder and
      the conversation stays open; a document with an accepted extension is
      downloaded, checked with ``parse_csv`` and the result file is sent
      back; any other document is rejected with "Wrong file format".

    Temporary files are removed even when downloading, parsing or sending
    fails.
    """
    sender_id = event.sender_id
    state = conversation_state.get(sender_id)

    if state is None:
        # Only the /test command may open a conversation.
        if '/test' in event.raw_text:
            await event.respond('Hi! Please send a file')
            conversation_state[sender_id] = State.WAIT_FILE
        return

    if state != State.WAIT_FILE:
        return

    if not event.document:
        # Keep the conversation open so the user can still upload the file.
        await bot.send_message(
            event.chat_id,
            "Please send me a file"
        )
        return

    # Close the conversation before the first await below, so another
    # message from the same user that arrives while this upload is being
    # processed cannot start a duplicate run; pop() also never raises.
    conversation_state.pop(sender_id, None)

    # TODO: Fix issues when sending file from iPhone
    extension = get_extension(event.document)
    if extension not in ('.csv', '.xls'):
        await bot.send_message(
            event.chat_id,
            "Wrong file format"
        )
        return

    await event.respond('Working...')
    file_path = f"tmp/{sender_id}_{get_current_time()}.csv"
    saved_path = None
    try:
        await bot.download_file(
            event.document,
            file_path
        )
        msg, saved_path = await parse_csv(file_path)
        await bot.send_file(
            event.chat_id,
            saved_path,
            caption=msg
        )
    finally:
        # Clean up whatever was created, regardless of where a failure hit.
        for leftover in (file_path, saved_path):
            if leftover and os.path.exists(leftover):
                os.remove(leftover)
# Only connect when executed as a script, so importing this module (for
# tests or reuse of the helpers) does not log in and block forever.
if __name__ == "__main__":
    with bot:
        # NOTE(review): entering the client context probably starts the
        # client already; the explicit start() is kept unchanged - confirm
        # against the Telethon docs before removing it.
        bot.start()
        # Blocks, processing updates, until the client disconnects.
        bot.run_until_disconnected()