|
| 1 | +""" |
| 2 | +This module provides an Telegram bot API to interact with the user |
| 3 | +without a direct access to Home Security Service. |
| 4 | +
|
| 5 | +Main responsibilities of the servicer as follows: |
| 6 | + - Bot provides if hardware and the bot itself is alive. (/alive) |
| 7 | + - Bot provides if the service is dead or alive. (/health hss.service) |
| 8 | + - Bot restarts the service if the command is sent. (/restart hss.service) |
| 9 | + - Bot provides the latest N logs if wanted. (/logs hss.service:N) |
| 10 | + - Bot provides if protectors are in house, and whose. (/inhouse) |
| 11 | + - Bot provides an image-shot if wanted. (/imageshot) |
| 12 | + - Bot schedules a reboot for the hardware. (/reboot) |
| 13 | + - Bot provides a shell access to the hardware. (/shell) |
| 14 | +""" |
| 15 | +import asyncio |
| 16 | +import json |
| 17 | +from typing import Any |
| 18 | + |
| 19 | +import cv2 |
| 20 | +from pystemd import systemd1 as systemd |
| 21 | +from telebot.async_telebot import AsyncTeleBot |
| 22 | + |
| 23 | +from core.strategies.eye.picamera_strategy import PiCameraStrategy |
| 24 | +from core.strategies.wifi.admin_panel_strategy import AdminPanelStrategy |
| 25 | + |
| 26 | + |
| 27 | +def read_configurations() -> tuple[dict[str, Any], dict[str, Any]]: |
| 28 | + """ |
| 29 | + This method reads the configurations from the .config.json file. |
| 30 | + """ |
| 31 | + with open(".config.json", "r", encoding="utf-8") as file: |
| 32 | + _config = json.load(file) |
| 33 | + main_settings = _config['main_settings'] |
| 34 | + strategy_settings = _config['strategy_settings'] |
| 35 | + return main_settings, strategy_settings |
| 36 | + |
| 37 | + |
| 38 | +# Definitations |
| 39 | +MAIN_CONIGS, STRATEGY_CONFIGS = read_configurations() |
| 40 | +SERVICER_BOT = AsyncTeleBot(token=STRATEGY_CONFIGS["telegram_strategy"]["bot_key"]) |
| 41 | +KNOWN_LOG_LOCATIONS: dict[str, str] = { |
| 42 | + "hss.service": "/home/raspberry/.home-security-system/logs/hss.log" |
| 43 | +} |
| 44 | + |
| 45 | + |
| 46 | +@SERVICER_BOT.message_handler(commands=["info", "help", "hi"]) |
| 47 | +async def info(message): |
| 48 | + """ |
| 49 | + This method is called when the /info, /help or /hi command is sent. |
| 50 | + """ |
| 51 | + await SERVICER_BOT.reply_to(message, |
| 52 | + "Hi, I am the Home Security System Servicer Bot.\n\n" |
| 53 | + "Here are the commands you can use:\n" |
| 54 | + "/alive - provides if hardware and the bot itself is alive.\n" |
| 55 | + "/health hss.service - provides if the service is dead or alive.\n" |
| 56 | + "/restart hss.service - restarts the given service.\n" |
| 57 | + "/logs hss.service:N - provides the latest N logs.\n" |
| 58 | + "/inhouse - provides if protectors are in house, and whose.\n" |
| 59 | + "/imageshot - captures an image and sends.\n" |
| 60 | + "/reboot - reboots the hardware.\n" |
| 61 | + "/shell echo 'test'- provides a shell access to the hardware.\n" |
| 62 | + "/info, /help, /hi - this help text.\n") |
| 63 | + |
| 64 | + |
| 65 | +@SERVICER_BOT.message_handler(commands=['alive']) |
| 66 | +async def alive(message): |
| 67 | + """ |
| 68 | + This method is called when the /alive command is sent. |
| 69 | + """ |
| 70 | + await SERVICER_BOT.reply_to(message, "I am alive.") |
| 71 | + |
| 72 | + |
| 73 | +@SERVICER_BOT.message_handler(commands=['health']) |
| 74 | +async def health(message): |
| 75 | + """ |
| 76 | + This method is called when the /health command is sent. |
| 77 | + """ |
| 78 | + parameters = message.text[len('/health'):] |
| 79 | + service_name = parameters.strip().split(' ')[0] |
| 80 | + with systemd.Unit(service_name.encode("utf-8")) as service: |
| 81 | + active_state: str = service.Unit.ActiveState.decode("utf-8") |
| 82 | + sub_state: str = service.Unit.SubState.decode("utf-8") |
| 83 | + service_name: str = service.Unit.Description.decode("utf-8") |
| 84 | + main_pid: str = service.Service.MainPID |
| 85 | + await SERVICER_BOT.reply_to(message, |
| 86 | + f"Service: {service_name}\n" |
| 87 | + f"Active State: {active_state}\n" |
| 88 | + f"Sub State: {sub_state}\n" |
| 89 | + f"Main PID: {main_pid}") |
| 90 | + |
| 91 | + |
| 92 | +@SERVICER_BOT.message_handler(commands=['restart']) |
| 93 | +async def restart(message): |
| 94 | + """ |
| 95 | + This method is called when the /restart command is sent. |
| 96 | + """ |
| 97 | + parameters = message.text[len('/restart'):] |
| 98 | + service_name = parameters.strip().split(' ')[0] |
| 99 | + with systemd.Unit(service_name.encode("utf-8")) as service: |
| 100 | + service.Unit.Restart("fail") |
| 101 | + await SERVICER_BOT.reply_to(message, f"{service_name} is restarted.") |
| 102 | + |
| 103 | + |
| 104 | +@SERVICER_BOT.message_handler(commands=['logs']) |
| 105 | +async def logs(message): |
| 106 | + """ |
| 107 | + This method is called when the /logs command is sent. |
| 108 | + """ |
| 109 | + first_parameter = message.text[len('/logs'):].strip().split(' ')[0] |
| 110 | + service_name, last_n_lines = first_parameter.split(":") |
| 111 | + if service_name not in KNOWN_LOG_LOCATIONS: |
| 112 | + await SERVICER_BOT.reply_to(message, f"Unknown service: {service_name}") |
| 113 | + with open(KNOWN_LOG_LOCATIONS[service_name], "r") as log_file: |
| 114 | + logs = log_file.readlines()[-int(last_n_lines):] |
| 115 | + await SERVICER_BOT.reply_to(message, "".join(logs)) |
| 116 | + |
| 117 | + |
| 118 | +@SERVICER_BOT.message_handler(commands=['inhouse']) |
| 119 | +async def in_house(message): |
| 120 | + """ |
| 121 | + This method is called when the /in-house command is sent. |
| 122 | + """ |
| 123 | + protectors_list = MAIN_CONIGS["protectors"] |
| 124 | + strategy = AdminPanelStrategy(STRATEGY_CONFIGS["admin_panel_strategy"]) |
| 125 | + connected_macs = strategy._get_all_connected() |
| 126 | + connected_protectors = "\n\t- " + "\n\t- ".join([ |
| 127 | + protector['name'] for protector in protectors_list |
| 128 | + if protector['address'] in [device.address for device in connected_macs] |
| 129 | + ]) |
| 130 | + response = f"Connected MACs: {[device.address for device in connected_macs]}\n\n\n" \ |
| 131 | + f"Protectors in house: {connected_protectors}" |
| 132 | + await SERVICER_BOT.reply_to(message, response) |
| 133 | + |
| 134 | + |
| 135 | +@SERVICER_BOT.message_handler(commands=['imageshot']) |
| 136 | +async def image_shot(message): |
| 137 | + """ |
| 138 | + This method is called when the /image-shot command is sent. |
| 139 | + """ |
| 140 | + camera = PiCameraStrategy() |
| 141 | + frame = camera.get_frame() |
| 142 | + success, encoded_frame = cv2.imencode('.png', frame) |
| 143 | + if not success: |
| 144 | + await SERVICER_BOT.reply_to(message, "Failed to capture the image.") |
| 145 | + return |
| 146 | + await SERVICER_BOT.send_photo(message.chat.id, encoded_frame.tobytes()) |
| 147 | + del frame, encoded_frame |
| 148 | + |
| 149 | + |
| 150 | +@SERVICER_BOT.message_handler(commands=['reboot']) |
| 151 | +async def reboot(message): |
| 152 | + """ |
| 153 | + This method is called when the /reboot command is sent. |
| 154 | + """ |
| 155 | + await SERVICER_BOT.reply_to(message, "Rebooting the hardware.") |
| 156 | + with systemd.Manager() as manager: |
| 157 | + manager.Reboot() |
| 158 | + await SERVICER_BOT.reply_to(message, "Hardware is rebooted.") |
| 159 | + |
| 160 | + |
| 161 | +@SERVICER_BOT.message_handler(commands=["shell"]) |
| 162 | +async def shell_run(message): |
| 163 | + """ |
| 164 | + This method is called when the /shell command is sent. |
| 165 | + """ |
| 166 | + command = message.text[len("/shell"):].strip() |
| 167 | + process = await asyncio.create_subprocess_shell(command, |
| 168 | + stdout=asyncio.subprocess.PIPE, |
| 169 | + stderr=asyncio.subprocess.PIPE) |
| 170 | + stdout, stderr = await process.communicate() |
| 171 | + await SERVICER_BOT.reply_to(message, f"stdout: {stdout.decode()}\nstderr: {stderr.decode()}") |
| 172 | + |
| 173 | +if __name__ == "__main__": |
| 174 | + asyncio.run(SERVICER_BOT.polling()) |
0 commit comments