-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot.py
More file actions
88 lines (67 loc) · 2.38 KB
/
bot.py
File metadata and controls
88 lines (67 loc) · 2.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import config
import asyncio
import aioschedule
from loguru import logger
from googlesheet_table import GoogleTable
from aiogram import Bot, types
from aiogram.utils import executor
from aiogram.dispatcher import Dispatcher
from aiogram.utils.exceptions import ChatNotFound
logger.add(
config.settings['LOG_FILE'],
format='{time} {level} {message}',
level='DEBUG',
)
class BirthDateTelegramBot(Bot):
def __init__(self, token, parse_mode, google_table=None):
super().__init__(token, parse_mode=parse_mode)
self._google_table: GoogleTable = google_table
bot = BirthDateTelegramBot(
token=config.settings['TOKEN'],
parse_mode=types.ParseMode.HTML,
google_table=GoogleTable('creds.json', config.settings['GOOGLESHEET_URL']),
)
dispatcher = Dispatcher(bot)
SERJ_ID = config.settings['USER_ID']
@dispatcher.message_handler(content_types=['sticker'])
async def cat_sticker_handler(message: types.Message):
await bot.send_sticker(
message.from_user.id,
sticker='CAACAgIAAxkBAAIBBWNJstZXmnX9_z310cvPE0RbloHAAAKvFgACFXAhSGhYH3WyKi5kKgQ'
)
async def create_answer(birth_dates: dict) -> str:
answers = []
for value in birth_dates.values():
if not value:
continue
names = ' ✨\n✨ '.join(list(value.values())[0])
answers.append(f'❗️ {list(value.keys())[0]} - 🥳 день рождения 🥳 этих ребят:\n\n✨ {names} ✨')
return '\n\n'.join(answers)
async def find_birth_dates() -> dict:
return {
0: bot._google_table.search_names(),
1: bot._google_table.search_names(time_delta=1),
7: bot._google_table.search_names(time_delta=7),
}
async def send_message():
birth_dates = await find_birth_dates()
answer = await create_answer(birth_dates)
if not answer:
return
try:
await bot.send_message(SERJ_ID, answer)
except ChatNotFound as error:
logger.info(f'{error}: sending error')
return
except Exception as error:
logger.warning(error)
return
async def scheduler():
aioschedule.every().day.at('09:00').do(send_message)
while True:
await aioschedule.run_pending()
await asyncio.sleep(1)
async def on_startup(_):
asyncio.create_task(scheduler())
if __name__ == '__main__':
executor.start_polling(dispatcher, skip_updates=True, on_startup=on_startup)