-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsingle_bot.py
More file actions
106 lines (84 loc) · 3.82 KB
/
single_bot.py
File metadata and controls
106 lines (84 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import os
from contextlib import suppress
import sentry_sdk
from aiogram import Bot, Dispatcher
from aiogram.client.default import DefaultBotProperties
from aiogram.exceptions import TelegramBadRequest
from aiogram.fsm.storage.base import DefaultKeyBuilder
from aiogram.fsm.storage.redis import RedisStorage
from loguru import logger
from bot.routers.supports import router as support_router
from config.bot_config import bot_config
async def aiogram_on_startup_polling(dispatcher: Dispatcher, bot: Bot) -> None:
await bot.delete_webhook(drop_pending_updates=True)
with suppress(TelegramBadRequest):
await bot.send_message(chat_id=bot_config.ADMIN_ID, text='Single bot started')
logger.info("Started polling")
async def aiogram_on_shutdown_polling(dispatcher: Dispatcher, bot: Bot) -> None:
logger.debug("Stopping polling")
await bot.session.close()
await dispatcher.storage.close()
logger.info("Stopped polling")
def main():
# Настройка логирования
logger.add("logs/SingleSupportBot.log", rotation="1 MB", level='INFO')
# Получаем токен бота поддержки из конфига
support_token = bot_config.single_bot_token
if not support_token:
logger.error("Не найден токен бота поддержки (single_bot_token)")
return
# Создаем экземпляр бота
bot = Bot(token=support_token, default=DefaultBotProperties(parse_mode='HTML'))
# Получаем информацию о боте
bot_info = asyncio.run(bot.get_me())
# Создаем конфигурацию для одиночного бота
single_bot_config = {
str(bot_info.id): {
"id": bot_info.id,
"username": bot_info.username,
"token": support_token,
"start_message": bot_config.SINGLE_START_MESSAGE,
"security_policy": bot_config.SINGLE_SECURITY_POLICY,
"master_chat": bot_config.SINGLE_MASTER_CHAT or bot_config.ADMIN_ID,
"master_thread": None,
"no_start_message": False,
"special_commands": 0,
"mark_bad": False,
"owner": bot_config.ADMIN_ID,
"can_work": True,
"ignore_commands": False,
"use_local_names": False,
"local_names": {},
"use_auto_reply": bot_config.SINGLE_USE_AUTO_REPLY,
"auto_reply": bot_config.SINGLE_AUTO_REPLY,
"ignore_users": []
}
}
# Устанавливаем конфигурацию в bot_config
bot_config.json_config = single_bot_config
# Настраиваем хранилище состояний
storage = RedisStorage.from_url(url=bot_config.REDIS_URL,
key_builder=DefaultKeyBuilder(with_bot_id=True, with_destiny=True))
# Создаем диспетчер
dispatcher = Dispatcher(storage=storage)
from bot.middlewares.db import DbSessionMiddleware
dispatcher.update.middleware(DbSessionMiddleware())
# Регистрируем маршрутизатор поддержки
dispatcher.include_router(support_router)
# Регистрируем обработчики запуска и остановки
dispatcher.startup.register(aiogram_on_startup_polling)
dispatcher.shutdown.register(aiogram_on_shutdown_polling)
# Запускаем бота на поллинге
asyncio.run(dispatcher.start_polling(bot))
if __name__ == '__main__':
# Инициализация Sentry
if len(bot_config.SENTRY_DSN) > 10:
sentry_sdk.init(
dsn=bot_config.SENTRY_DSN,
traces_sample_rate=1.0,
profiles_sample_rate=1.0,
)
main()