-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
167 lines (135 loc) · 5.41 KB
/
main.py
File metadata and controls
167 lines (135 loc) · 5.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import asyncio
import logging
from datetime import datetime # <--- Добавлено
from typing import Callable, Awaitable, Dict, Any
from aiogram import Bot, Dispatcher, types, BaseMiddleware
from aiogram.client.default import DefaultBotProperties
from aiogram.filters import Command
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.types import TelegramObject
from config import SETTINGS
from db.models import init_db, initialize_services
# Import the scheduler jobs, including the new deduct_daily_balances function
from scheduler.jobs import check_api_balances, check_planned_alerts, deduct_daily_balances
from handlers import callii as callii_handlers
from handlers import balance as balance_handlers
from handlers import wazzup as wazzup_handlers
# Logging setup: root logger at INFO, each record rendered as
# "timestamp - level - logger name - message".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
# --- Middlewares ---
class TargetChatFilter(BaseMiddleware):
    """
    Access-control middleware: an event reaches its handler only when it
    originates from the chat configured as ``SETTINGS.TARGET_CHAT_ID``.
    """

    async def __call__(
        self,
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: Dict[str, Any],
    ) -> Any:
        """
        Resolve the chat the event belongs to and forward it if allowed.

        Returns the handler's result for events from the target chat and
        ``None`` (without calling the handler) for everything else.
        """
        # Events that expose a truthy ``chat`` attribute carry the chat directly.
        direct_chat = getattr(event, "chat", None)
        if direct_chat:
            chat_id = direct_chat.id
        # Callback queries are resolved through the message they are attached to.
        elif isinstance(event, types.CallbackQuery) and event.message:
            chat_id = event.message.chat.id
        else:
            chat_id = None

        if chat_id == SETTINGS.TARGET_CHAT_ID:
            return await handler(event, data)

        # Anything else is dropped silently on purpose: no reply is sent.
        return None
class DBSessionMiddleware(BaseMiddleware):
    """
    Dependency-injection middleware: opens a database session for each
    event and exposes it to the handler as ``data["session"]``.
    """

    def __init__(self, session_factory):
        """Remember the factory that is called to open a session per event."""
        super().__init__()
        self.session_factory = session_factory

    async def __call__(
        self,
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: Dict[str, Any],
    ) -> Any:
        """
        Run ``handler`` with a fresh session stored under ``data["session"]``.

        The session context is exited once the handler has finished, and the
        handler's result is returned unchanged.
        """
        session_scope = self.session_factory()
        async with session_scope as db_session:
            data["session"] = db_session
            return await handler(event, data)
# --- Scheduler ---
async def _run_scheduled_job(session_factory, job, *leading_args) -> bool:
    """
    Run one scheduled job inside its own session and error boundary.

    Opens a session via ``session_factory()`` and awaits
    ``job(*leading_args, session)``.

    Returns:
        True when the job finished without raising, False otherwise.
        Failures are logged with a traceback and swallowed so the caller can
        carry on with the remaining jobs. Cancellation is not intercepted.
    """
    try:
        async with session_factory() as session:
            await job(*leading_args, session)
    except Exception as exc:
        logger.exception("Scheduler error: %s", exc)
        return False
    return True


async def scheduler_loop(bot: Bot, session_factory):
    """
    Background loop for scheduled tasks.

    Every cycle performs, in order:
      1. ``deduct_daily_balances`` - on the first cycle and whenever the
         server-local calendar date differs from the last successful run.
      2. ``check_api_balances``, followed by a short pause.
      3. ``check_planned_alerts``, followed by
         ``PLANNED_ALERT_CHECK_INTERVAL`` seconds of sleep.

    Each job gets its own session and its own error boundary, so a failing
    job is logged and skipped for this cycle instead of preventing the jobs
    after it from running.

    Args:
        bot: Bot instance passed through to the notification jobs.
        session_factory: Zero-argument callable returning an async context
            manager that yields a database session.

    Returns:
        None, once the surrounding task is cancelled.
    """
    # Pause between the API balance check and the planned-alert check.
    POST_API_CHECK_PAUSE = 60
    PLANNED_ALERT_CHECK_INTERVAL = 600
    # NOTE(review): the previous revision also declared an unused
    # API_CHECK_INTERVAL = 3600; API balances are in fact checked once per
    # cycle. Confirm whether an hourly cadence was intended.

    # Date of the last successful daily deduction.
    # NOTE(review): this lives in memory only, so the first cycle after a
    # restart always calls deduct_daily_balances - confirm that the job
    # itself guards against deducting twice on the same day.
    last_deduction_date = None

    while True:
        try:
            # Use the server's local time (no TIMEZONE setting is applied).
            today = datetime.now().date()

            # --- 1. Daily deduction (once per calendar date) ---
            if last_deduction_date != today:
                # The date is recorded only on success, so a failed
                # deduction is retried on the next cycle.
                if await _run_scheduled_job(session_factory, deduct_daily_balances):
                    last_deduction_date = today
                    logger.info("Daily balances deducted.")

            # --- 2. Recurring checks ---
            await _run_scheduled_job(session_factory, check_api_balances, bot)
            await asyncio.sleep(POST_API_CHECK_PAUSE)

            await _run_scheduled_job(session_factory, check_planned_alerts, bot)
            await asyncio.sleep(PLANNED_ALERT_CHECK_INTERVAL)
        except asyncio.CancelledError:
            # Cancellation is the normal shutdown signal: finish quietly so
            # that awaiting the task does not raise.
            break
async def main():
    """
    Application entry point.

    Initializes the database and services, builds the bot and dispatcher,
    registers middlewares and routers, starts the background scheduler and
    then polls for updates until polling stops. On the way out the scheduler
    task is cancelled and the bot's HTTP session is always closed.
    """
    # 1. Database Initialization
    session_factory = await init_db(SETTINGS.DATABASE_URL)
    await initialize_services(session_factory)

    # 2. Bot Setup
    bot = Bot(
        token=SETTINGS.BOT_TOKEN,
        default=DefaultBotProperties(parse_mode="Markdown"),
    )
    dp = Dispatcher(storage=MemoryStorage())

    # 3. Middleware Registration
    db_middleware = DBSessionMiddleware(session_factory)
    chat_filter = TargetChatFilter()
    routers = [
        balance_handlers.router,
        callii_handlers.router,
        wazzup_handlers.router,
    ]

    # Apply middlewares to all routers and the dispatcher.
    # NOTE(review): aiogram appears to resolve inner middlewares along the
    # whole router chain, so registering on both the dispatcher and the
    # included routers may run each middleware twice for router handlers -
    # confirm before trimming this down to the dispatcher only.
    for r in routers + [dp]:
        # Filter first, then DB injection
        r.message.middleware(chat_filter)
        r.callback_query.middleware(chat_filter)
        r.message.middleware(db_middleware)
        r.callback_query.middleware(db_middleware)

    # 4. Include Routers
    dp.include_routers(*routers)

    @dp.message(Command("start"))
    async def command_start_handler(message: types.Message) -> None:
        """Reply to /start with a short status message."""
        # Legacy "Markdown" parse mode marks bold with single asterisks;
        # the former "**...**" was not rendered as bold.
        await message.answer(
            "👋 *Система мониторинга активна.*\n\n"
            "Я работаю в фоновом режиме. Используйте /balance для проверки статуса."
        )

    # 5. Start Scheduler & Polling
    scheduler_task = asyncio.create_task(scheduler_loop(bot, session_factory))
    logger.info("Bot started...")
    try:
        await dp.start_polling(bot)
    finally:
        scheduler_task.cancel()
        try:
            await scheduler_task
        except asyncio.CancelledError:
            # Raised when the task was cancelled before it could handle the
            # cancellation itself; that is an expected shutdown outcome.
            pass
        finally:
            # Close the HTTP session even if waiting for the scheduler failed.
            await bot.session.close()
# Script entry point: drive main() on a fresh event loop when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the bot; log it instead of
        # printing a traceback.
        logger.info("Bot stopped by user.")