diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index ca031881..1d3458d4 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -41,6 +41,11 @@ jobs:
run: |
pip install -U .
+ - name: Run Pyright
+ run: |
+ pip install pyright
+ pyright mypylib mypyconsole mytoncore/models.py mytoncore/utils.py # wip support whole project
+
- name: Run pytest
run: |
pytest --import-mode=append # to test built package
diff --git a/modules/alert_bot.py b/modules/alert_bot.py
index ba725316..2012528f 100644
--- a/modules/alert_bot.py
+++ b/modules/alert_bot.py
@@ -1,5 +1,7 @@
import dataclasses
import time
+from typing import Optional
+
import requests
from modules.module import MtcModule
@@ -196,6 +198,7 @@ def __init__(self, ton, local, *args, **kwargs):
self.chat_id = None
self.last_db_check = 0
self.initial_sync = None
+ self.wallet: Optional[str] = None
def send_message(self, text: str, silent: bool = False, disable_web_page_preview: bool = False):
if self.token is None:
@@ -237,7 +240,7 @@ def send_alert(self, alert_name: str, *args, track_active: bool = True, **kwargs
ADNL: {self.adnl}'''
if self.ton.using_validator():
- text += f"\nWallet: {self.wallet}"
+ text += f"\nWallet: {self.wallet or 'n/a'}"
text += f'''
Time: {time_} ({int(time.time())})
@@ -250,7 +253,7 @@ def send_alert(self, alert_name: str, *args, track_active: bool = True, **kwargs
if track_active:
self._set_alert_active(alert_name, True)
- def resolve_alert(self, alert_name: str, ok_alert_name: str = None, **kwargs):
+ def resolve_alert(self, alert_name: str, ok_alert_name: Optional[str] = None, **kwargs):
if not self._is_alert_active(alert_name):
return
ok_alert_name = ok_alert_name or f"{alert_name}_ok"
@@ -292,7 +295,8 @@ def init(self):
self.hostname = get_hostname()
adnl = self.ton.GetAdnlAddr()
self.adnl = adnl
- self.wallet = self.ton.GetValidatorWallet().addrB64
+ w = self.ton.GetValidatorWallet()
+ self.wallet = w.addrB64 if w else None
self.ip = self.ton.get_node_ip()
self.set_global_vars()
self.initial_sync = self.ton.in_initial_sync()
@@ -415,6 +419,7 @@ def check_validator_wallet_balance(self):
if not validator_status.is_working or validator_status.out_of_sync >= 20:
return
validator_wallet = self.ton.GetValidatorWallet()
+ assert validator_wallet is not None, "Could not get validator wallet"
validator_account = self.ton.GetAccount(validator_wallet.addrB64)
if validator_account.status != "empty" and validator_account.balance < 10:
self.send_alert("low_wallet_balance", wallet=validator_wallet.addrB64, balance=validator_account.balance)
@@ -424,6 +429,7 @@ def check_validator_wallet_balance(self):
def check_efficiency(self):
if not self.ton.using_validator():
return
+ assert self.validator_module is not None, "Validator module is not initialized"
validator = self.validator_module.find_myself(self.ton.GetValidatorsList())
if validator is None or validator.efficiency is None:
return
@@ -526,7 +532,7 @@ def check_stake_returned(self):
trs = self.ton.GetAccountHistory(self.ton.GetAccount(res["walletAddr"]), limit=10)
for tr in trs:
- if tr.time >= config['endWorkTime'] + FREEZE_PERIOD and tr.srcAddr == '3333333333333333333333333333333333333333333333333333333333333333' and tr.body.startswith('F96F7324'): # Elector Recover Stake Response
+ if tr.time >= config['endWorkTime'] + FREEZE_PERIOD and tr.src_addr == '3333333333333333333333333333333333333333333333333333333333333333' and tr.body.startswith('F96F7324'): # Elector Recover Stake Response
self.send_alert("stake_returned", stake=round(tr.value), address=res["walletAddr"], reward=round(tr.value - res.get('stake', 0), 2))
return
self.send_alert("stake_not_returned", address=res["walletAddr"])
diff --git a/modules/backups.py b/modules/backups.py
index b9abbac1..35088733 100644
--- a/modules/backups.py
+++ b/modules/backups.py
@@ -42,7 +42,7 @@ def create_backup(self, args):
if not check_usage_args_min_max_len("create_backup", args, 0, 3):
return
tmp_dir = self.create_tmp_ton_dir()
- command_args = ["-m", self.ton.local.buffer.my_work_dir, "-t", tmp_dir]
+ command_args = ["-m", self.ton.local.my_work_dir, "-t", tmp_dir]
user = pop_user_from_args(args)
if len(args) == 1:
command_args += ["-d", args[0]]
@@ -84,7 +84,7 @@ def restore_backup(self, args):
color_print(f"{{red}}Could not create backup: {e}{{endc}}")
ip = str(ip2int(get_own_ip()))
- command_args = ["-m", self.ton.local.buffer.my_work_dir, "-n", args[0], "-i", ip]
+ command_args = ["-m", self.ton.local.my_work_dir, "-n", args[0], "-i", ip]
if self.run_restore_backup(command_args, user=user) == 0:
self.ton.local.load_db()
diff --git a/modules/btc_teleport.py b/modules/btc_teleport.py
index 8b64c872..8e72a216 100644
--- a/modules/btc_teleport.py
+++ b/modules/btc_teleport.py
@@ -16,7 +16,7 @@ class BtcTeleportModule(MtcModule):
def __init__(self, ton, local, *args, **kwargs):
super().__init__(ton, local, *args, **kwargs)
- self.keystore_path = self.ton.local.buffer.my_work_dir + '/btc_oracle_keystore'
+ self.keystore_path = self.ton.local.my_work_dir + '/btc_oracle_keystore'
self.repo_name = 'ton-teleport-btc-periphery'
self.src_dir = f"/usr/src/{self.repo_name}"
self.bin_dir = self.src_dir + '/out'
diff --git a/modules/controller.py b/modules/controller.py
index 1ea2f1e7..700daea5 100644
--- a/modules/controller.py
+++ b/modules/controller.py
@@ -190,10 +190,10 @@ def do_check_liquid_pool(self):
history = self.ton.GetAccountHistory(account, 5000)
addrs_list = list()
for message in history:
- if message.srcAddr is None or message.value is None:
+ if message.src_addr is None or message.value is None:
continue
- src_addr_full = f"{message.srcWorkchain}:{message.srcAddr}"
- dest_add_full = f"{message.destWorkchain}:{message.destAddr}"
+ src_addr_full = f"{message.src_workchain}:{message.src_addr}"
+ dest_add_full = f"{message.dest_workchain}:{message.dest_addr}"
if src_addr_full == account.addrFull:
fromto = dest_add_full
else:
diff --git a/modules/nominator_pool.py b/modules/nominator_pool.py
index 0383a676..70f21971 100644
--- a/modules/nominator_pool.py
+++ b/modules/nominator_pool.py
@@ -85,7 +85,7 @@ def update_validator_set(self, args):
def do_deposit_to_pool(self, pool_addr, amount):
wallet = self.ton.GetValidatorWallet()
- bocPath = self.ton.local.buffer.my_temp_dir + wallet.name + "validator-deposit-query.boc"
+ bocPath = self.ton.local.my_temp_dir + wallet.name + "validator-deposit-query.boc"
fiftScript = self.ton.contractsDir + "nominator-pool/func/validator-deposit.fif"
args = [fiftScript, bocPath]
self.ton.fift.Run(args)
diff --git a/modules/utilities.py b/modules/utilities.py
index d9620a03..6f84563a 100644
--- a/modules/utilities.py
+++ b/modules/utilities.py
@@ -44,10 +44,10 @@ def get_history_table(self, addr, limit):
typeText = color_text("{red}{bold}{endc}")
table += [["Time", typeText, "Coins", "From/To"]]
for message in history:
- if message.srcAddr is None:
+ if message.src_addr is None:
continue
- srcAddrFull = f"{message.srcWorkchain}:{message.srcAddr}"
- destAddFull = f"{message.destWorkchain}:{message.destAddr}"
+ srcAddrFull = f"{message.src_workchain}:{message.src_addr}"
+ destAddFull = f"{message.dest_workchain}:{message.dest_addr}"
if srcAddrFull == account.addrFull:
type = color_text("{red}{bold}>>>{endc}")
fromto = destAddFull
diff --git a/modules/validator.py b/modules/validator.py
index 4cfa8341..b79cc9b1 100644
--- a/modules/validator.py
+++ b/modules/validator.py
@@ -1,8 +1,9 @@
+from __future__ import annotations
import json
import time
from modules.btc_teleport import BtcTeleportModule
-from mypylib.mypylib import color_print, get_timestamp
+from mypylib.mypylib import Dict, color_print, get_timestamp
from modules.module import MtcModule
from mytoncore.utils import hex_shard_to_int, hex2b64
from mytonctrl.console_cmd import check_usage_two_args, add_command, check_usage_args_min_max_len
@@ -46,7 +47,7 @@ def vote_complaint(self, args):
self.ton.VoteComplaint(election_id, complaint_hash)
color_print("VoteComplaint - {green}OK{endc}")
- def find_myself(self, validators: list) -> dict:
+ def find_myself(self, validators: list) -> Dict | None:
adnl_addr = self.ton.GetAdnlAddr()
for validator in validators:
if validator.get("adnlAddr") == adnl_addr:
diff --git a/mypyconsole/mypyconsole.py b/mypyconsole/mypyconsole.py
index 7809c400..7f7d9996 100644
--- a/mypyconsole/mypyconsole.py
+++ b/mypyconsole/mypyconsole.py
@@ -6,6 +6,9 @@
import readline
from collections import deque
+from mypylib import MyPyClass
+
+
class MyPyConsoleItem():
def __init__(self, cmd: str, func, desc: str, usage: str = ''):
self.cmd = cmd
@@ -15,12 +18,12 @@ def __init__(self, cmd: str, func, desc: str, usage: str = ''):
#end define
#end class
-class MyPyConsole():
+class MyPyConsole:
RED = '\033[31m'
GREEN = '\033[92m'
ENDC = '\033[0m'
- def __init__(self):
+ def __init__(self, local: MyPyClass):
self.debug = False
self.name = "console"
self.color = self.GREEN
@@ -30,7 +33,7 @@ def __init__(self):
self.start_function = None
self.menu_items = list()
self.history = deque(maxlen=100)
- self.local = None
+ self.local: MyPyClass = local
self.add_item("help", self.help, "Print help text")
self.add_item("clear", self.clear, "Clear console")
self.add_item("history", self.print_history, "Print last commands")
diff --git a/mypylib/__init__.py b/mypylib/__init__.py
index e91a69b5..40e628a7 100644
--- a/mypylib/__init__.py
+++ b/mypylib/__init__.py
@@ -5,9 +5,7 @@
"Dict",
"MyPyClass",
"parse",
- "ping",
"get_request",
- "dir",
"b2mb",
"search_file_in_dir",
"search_dir_in_dir",
diff --git a/mypylib/mypylib.py b/mypylib/mypylib.py
index b6c96daa..a32f24be 100644
--- a/mypylib/mypylib.py
+++ b/mypylib/mypylib.py
@@ -1,5 +1,4 @@
-#!/usr/bin/env python3
-# -*- coding: utf_8 -*-
+from __future__ import annotations
import os
import re
@@ -20,6 +19,8 @@
import threading
import subprocess
import datetime as date_time_library
+from types import FrameType
+from typing import Any, Callable, Literal, Mapping, Sequence
from urllib.request import urlopen
from urllib.error import URLError
@@ -28,40 +29,43 @@
ERROR = "error"
DEBUG = "debug"
+Callback = Callable[..., Any]
+
class Dict(dict):
- def __init__(self, *args, **kwargs):
+
+ def __init__(self, *args: Mapping[str, Any], **kwargs: Any) -> None:
for item in args:
self._parse_dict(item)
self._parse_dict(kwargs)
- #end define
- def _parse_dict(self, d):
+ def _parse_dict(self, d: Mapping[str, Any]) -> None:
for key, value in d.items():
if type(value) in [dict, Dict]:
value = Dict(value)
if type(value) == list:
value = self._parse_list(value)
self[key] = value
- #end define
- def _parse_list(self, lst):
+ def _parse_list(self, lst: list[Any]) -> list[Any]:
result = list()
for value in lst:
if type(value) in [dict, Dict]:
value = Dict(value)
result.append(value)
return result
- #end define
- def __setattr__(self, key, value):
+ def __getitem__(self, key: str) -> Any:
+ return super().__getitem__(key)
+
+ def get(self, key: str, default: Any = None) -> Any:
+ return super().get(key, default)
+
+ def __setattr__(self, key: str, value: Any) -> None:
self[key] = value
- #end define
- def __getattr__(self, key):
+ def __getattr__(self, key: str) -> Any:
return self.get(key)
- #end define
-#end class
class bcolors:
@@ -86,7 +90,8 @@ class bcolors:
BOLD = bold
UNDERLINE = underline
- def get_args(*args):
+ @staticmethod
+ def get_args(*args: Any) -> str:
text = ""
for item in args:
if item is None:
@@ -95,43 +100,50 @@ def get_args(*args):
return text
#end define
- def magenta_text(*args):
+ @staticmethod
+ def magenta_text(*args: Any) -> str:
text = bcolors.get_args(*args)
text = bcolors.magenta + text + bcolors.endc
return text
#end define
- def blue_text(*args):
+ @staticmethod
+ def blue_text(*args: Any) -> str:
text = bcolors.get_args(*args)
text = bcolors.blue + text + bcolors.endc
return text
#end define
- def green_text(*args):
+ @staticmethod
+ def green_text(*args: Any) -> str:
text = bcolors.get_args(*args)
text = bcolors.green + text + bcolors.endc
return text
#end define
- def yellow_text(*args):
+ @staticmethod
+ def yellow_text(*args: Any) -> str:
text = bcolors.get_args(*args)
text = bcolors.yellow + text + bcolors.endc
return text
#end define
- def red_text(*args):
+ @staticmethod
+ def red_text(*args: Any) -> str:
text = bcolors.get_args(*args)
text = bcolors.red + text + bcolors.endc
return text
#end define
- def bold_text(*args):
+ @staticmethod
+ def bold_text(*args: Any) -> str:
text = bcolors.get_args(*args)
text = bcolors.bold + text + bcolors.endc
return text
#end define
- def underline_text(*args):
+ @staticmethod
+ def underline_text(*args: Any) -> str:
text = bcolors.get_args(*args)
text = bcolors.underline + text + bcolors.endc
return text
@@ -143,27 +155,47 @@ def underline_text(*args):
class MyPyClass:
- def __init__(self, file):
- self.working = True
- self.file = file
- self.db = Dict()
+ def __init__(self, file: str) -> None:
+ self.working: bool = True
+ self.file: str = file
+ self.db: Dict = Dict()
self.db.config = Dict()
- self.buffer = Dict()
- self.buffer.old_db = Dict()
- self.buffer.log_list = list()
- self.buffer.thread_count = None
- self.buffer.memory_using = None
- self.buffer.free_space_memory = None
+ self.old_db = Dict()
+ self.log_list = list()
+ self.thread_count: int | None = None
+ self.thread_count_old: int | None = None
+ self.memory_using: float | None = None
+ self.free_space_memory: float | None = None
+
+ self.buffer: Dict = Dict()
+
+ self.my_name: str = self.get_my_name()
+ self.my_full_name: str = self.get_my_full_name()
+ self.my_path: str = self.get_my_path()
+ self.my_work_dir: str = self.get_my_work_dir()
+ self.my_temp_dir: str = self.get_my_temp_dir()
+ self.log_file_name: str = self.my_work_dir + self.my_name + ".log"
+ self.db_path: str = self.my_work_dir + self.my_name + ".db"
+ self.pid_file_path: str = self.my_work_dir + self.my_name + ".pid"
+
+ self.translate_dict: dict | None = None
- self.refresh()
+ os.makedirs(self.my_work_dir, exist_ok=True)
+ os.makedirs(self.my_temp_dir, exist_ok=True)
+
+ self.load_db()
+ self.set_default_config()
+
+ # Remove old log file
+ if self.db.config.isDeleteOldLogFile and os.path.isfile(self.log_file_name):
+ os.remove(self.log_file_name)
# Catch the shutdown signal
signal.signal(signal.SIGINT, self.exit)
signal.signal(signal.SIGTERM, self.exit)
- #end define
- def start_service(self, service_name: str, sleep: int = 1):
+ def start_service(self, service_name: str, sleep: int = 1) -> None:
self.add_log(f"Start/restart {service_name} service", "debug")
args = ["systemctl", "restart", service_name]
subprocess.run(args)
@@ -172,43 +204,13 @@ def start_service(self, service_name: str, sleep: int = 1):
time.sleep(sleep)
# end define
- def stop_service(self, service_name: str):
+ def stop_service(self, service_name: str) -> None:
self.add_log(f"Stop {service_name} service", "debug")
args = ["systemctl", "stop", service_name]
subprocess.run(args)
# end define
- def refresh(self):
- # Get program, log and database file name
- user = get_username()
- my_name = self.get_my_name()
- my_work_dir = self.get_my_work_dir()
- self.buffer.my_name = my_name
- self.buffer.my_dir = self.get_my_dir()
- self.buffer.my_full_name = self.get_my_full_name()
- self.buffer.my_path = self.get_my_path()
- self.buffer.my_work_dir = my_work_dir
- self.buffer.my_temp_dir = self.get_my_temp_dir()
- self.buffer.log_file_name = my_work_dir + my_name + ".log"
- self.buffer.db_path = my_work_dir + my_name + ".db"
- self.buffer.pid_file_path = my_work_dir + my_name + ".pid"
- self.buffer.venvs_dir = f"/home/{user}/.local/venv"
-
- # Check all directorys
- os.makedirs(self.buffer.my_work_dir, exist_ok=True)
- os.makedirs(self.buffer.my_temp_dir, exist_ok=True)
-
- # Load local database
- self.load_db()
- self.set_default_config()
-
- # Remove old log file
- if self.db.config.isDeleteOldLogFile and os.path.isfile(self.buffer.log_file_name):
- os.remove(self.buffer.log_file_name)
- #end if
- #end define
-
- def run(self):
+ def run(self) -> None:
# Check args
if "-ef" in sys.argv:
file = open(os.devnull, 'w')
@@ -234,10 +236,10 @@ def run(self):
self.start_cycle(self.write_log, sec=1)
if self.db.config.isLocaldbSaving is True:
self.start_cycle(self.save_db, sec=1)
- self.buffer.thread_count_old = threading.active_count()
+ self.thread_count_old = threading.active_count()
# Logging the start of the program
- self.add_log(f"Start program `{self.buffer.my_path}`")
+ self.add_log(f"Start program `{self.my_path}`")
#end define
def set_default_config(self):
@@ -262,7 +264,7 @@ def set_default_config(self):
#end define
def start_only_one_process(self):
- pid_file_path = self.buffer.pid_file_path
+ pid_file_path = self.pid_file_path
if os.path.isfile(pid_file_path):
file = open(pid_file_path, 'r')
pid_str = file.read()
@@ -273,7 +275,7 @@ def start_only_one_process(self):
full_process_name = " ".join(process.cmdline())
except:
full_process_name = ""
- if full_process_name.find(self.buffer.my_full_name) > -1:
+ if full_process_name.find(self.my_full_name) > -1:
print("The process is already running")
sys.exit(1)
#end if
@@ -283,7 +285,7 @@ def start_only_one_process(self):
def write_pid(self):
pid = os.getpid()
pid_str = str(pid)
- pid_file_path = self.buffer.pid_file_path
+ pid_file_path = self.pid_file_path
with open(pid_file_path, 'w') as file:
file.write(pid_str)
#end define
@@ -293,19 +295,19 @@ def self_test(self):
memory_using = b2mb(process.memory_info().rss)
free_space_memory = b2mb(psutil.virtual_memory().available)
thread_count = threading.active_count()
- self.buffer.free_space_memory = free_space_memory
- self.buffer.memory_using = memory_using
- self.buffer.thread_count = thread_count
+ self.free_space_memory = free_space_memory
+ self.memory_using = memory_using
+ self.thread_count = thread_count
if memory_using > self.db.config.memoryUsinglimit:
self.db.config.memoryUsinglimit += 50
self.add_log(f"Memory using: {memory_using}Mb, free: {free_space_memory}Mb", WARNING)
#end define
def print_self_testing_result(self):
- thread_count_old = self.buffer.thread_count_old
- thread_count_new = self.buffer.thread_count
- memory_using = self.buffer.memory_using
- free_space_memory = self.buffer.free_space_memory
+ thread_count_old = self.thread_count_old
+ thread_count_new = self.thread_count
+ memory_using = self.memory_using
+ free_space_memory = self.free_space_memory
self.add_log(color_text("{blue}Self testing informatinon:{endc}"))
self.add_log(f"Threads: {thread_count_new} -> {thread_count_old}")
self.add_log(f"Memory using: {memory_using}Mb, free: {free_space_memory}Mb")
@@ -325,7 +327,6 @@ def get_my_full_name(self):
#end define
def get_my_name(self):
- '''return "test"'''
my_full_name = self.get_my_full_name()
my_name = my_full_name[:my_full_name.rfind('.')]
return my_name
@@ -337,36 +338,22 @@ def get_my_path(self):
return my_path
#end define
- def get_my_dir(self):
- '''return "/some_dir/"'''
- my_path = self.get_my_path()
- # my_dir = my_path[:my_path.rfind('/')+1]
- my_dir = os.path.dirname(my_path)
- my_dir = dir(my_dir)
- return my_dir
- #end define
-
def get_my_work_dir(self):
- '''return "/usr/local/bin/test/" or "/home/user/.local/share/test/"'''
if self.check_root_permission():
- # https://ru.wikipedia.org/wiki/FHS
- program_files_dir = "/usr/local/bin/"
+ program_files_dir = "/usr/local/bin"
else:
- # https://habr.com/ru/post/440620/
- user_home_dir = dir(os.getenv("HOME"))
- program_files_dir = dir(os.getenv("XDG_DATA_HOME", user_home_dir + ".local/share/"))
- my_name = self.get_my_name()
- my_work_dir = dir(program_files_dir + my_name)
+ program_files_dir = os.getenv("XDG_DATA_HOME")
+ if not program_files_dir:
+ user_home_dir = os.getenv("HOME")
+ if user_home_dir is None:
+ raise Exception("HOME environment variable is not set")
+ program_files_dir = os.path.join(user_home_dir, ".local", "share")
+ my_work_dir = os.path.join(program_files_dir, self.my_name, "")
return my_work_dir
- #end define
def get_my_temp_dir(self):
- '''return "/tmp/test/"'''
- temp_files_dir = "/tmp/" # https://ru.wikipedia.org/wiki/FHS
- my_name = self.get_my_name()
- my_temp_dir = dir(temp_files_dir + my_name)
+ my_temp_dir = os.path.join("/tmp", self.my_name, "")
return my_temp_dir
- #end define
def get_lang(self):
lang = os.getenv("LANG", "en")
@@ -420,18 +407,18 @@ def add_log(self, input_text, mode=INFO):
log_text = mode_text + time_text + thread_text + input_text
# Queue for recording
- self.buffer.log_list.append(log_text)
+ self.log_list.append(log_text)
# Print log text
print(log_text)
#end define
def write_log(self):
- log_file_name = self.buffer.log_file_name
+ log_file_name = self.log_file_name
with open(log_file_name, 'a') as file:
- while len(self.buffer.log_list) > 0:
- log_text = self.buffer.log_list.pop(0)
+ while len(self.log_list) > 0:
+ log_text = self.log_list.pop(0)
file.write(log_text + '\n')
#end while
#end with
@@ -478,26 +465,26 @@ def base64_to_dict_with_decompress(self, item):
return data
#end define
- def exit(self, signum=None, frame=None):
+ def exit(self, signum: int | None = None, frame: FrameType | None = None) -> None:
self.working = False
- if os.path.isfile(self.buffer.pid_file_path):
- os.remove(self.buffer.pid_file_path)
+ if os.path.isfile(self.pid_file_path):
+ os.remove(self.pid_file_path)
self.save()
sys.exit(0)
#end define
- def read_file(self, path):
+ def read_file(self, path: str) -> str:
with open(path, 'rt') as file:
text = file.read()
return text
#end define
- def write_file(self, path, text=""):
+ def write_file(self, path: str, text: str = "") -> None:
with open(path, 'wt') as file:
file.write(text)
#end define
- def read_db(self, db_path):
+ def read_db(self, db_path: str) -> Dict:
err = None
for i in range(10):
try:
@@ -508,14 +495,14 @@ def read_db(self, db_path):
raise Exception(f"read_db error: {err}")
#end define
- def read_db_process(self, db_path):
+ def read_db_process(self, db_path: str) -> Dict:
text = self.read_file(db_path)
data = json.loads(text)
return Dict(data)
#end define
- def write_db(self, data: dict):
- db_path = os.path.realpath(self.buffer.db_path)
+ def write_db(self, data: Mapping[str, Any]) -> None:
+ db_path = os.path.realpath(self.db_path)
text = json.dumps(data, indent=4)
self.lock_file(db_path)
try:
@@ -523,7 +510,7 @@ def write_db(self, data: dict):
finally:
self.unlock_file(db_path)
- def _write_file_atomic(self, path: str, text=""):
+ def _write_file_atomic(self, path: str, text: str = "") -> None:
tmp_path = f"{path}.tmp.{os.getpid()}.{threading.get_ident()}"
try:
with open(tmp_path, 'wt') as file:
@@ -537,7 +524,7 @@ def _write_file_atomic(self, path: str, text=""):
except OSError:
pass
- def lock_file(self, path):
+ def lock_file(self, path: str) -> None:
pid_path = path + ".lock"
for i in range(300):
if os.path.isfile(pid_path):
@@ -548,7 +535,7 @@ def lock_file(self, path):
raise Exception("lock_file error: time out.")
#end define
- def unlock_file(self, path):
+ def unlock_file(self, path: str) -> None:
pid_path = path + ".lock"
try:
os.remove(pid_path)
@@ -640,29 +627,29 @@ def mtdp_fcfc(self, key, local_data, file_data, old_file_data):
raise Exception(f"mtdp_fcfc error: {key} -> {tmp.local_item_type}, {tmp.file_item_type}, {tmp.old_file_item_type}")
#end define
- def save_db(self):
- file_data = self.read_db(self.buffer.db_path)
- need_write_local_data = self.merge_three_dicts(self.db, file_data, self.buffer.old_db)
- self.buffer.old_db = Dict(self.db)
+ def save_db(self) -> None:
+ file_data = self.read_db(self.db_path)
+ need_write_local_data = self.merge_three_dicts(self.db, file_data, self.old_db)
+ self.old_db = Dict(self.db)
if need_write_local_data:
self.write_db(self.db)
#end define
- def save(self):
+ def save(self) -> None:
self.save_db()
self.write_log()
#end define
- def load_db(self, db_path=False):
+ def load_db(self, db_path: str | None = None) -> bool:
result = False
- if not db_path:
- db_path = self.buffer.db_path
+ if db_path is None:
+ db_path = self.db_path
if not os.path.isfile(db_path):
self.write_db(self.db)
try:
file_data = self.read_db(db_path)
self.db = Dict(file_data)
- self.buffer.old_db = Dict(file_data)
+ self.old_db = Dict(file_data)
self.set_default_config()
result = True
except Exception as err:
@@ -670,12 +657,11 @@ def load_db(self, db_path=False):
return result
#end define
- def get_settings(self, file_path):
+ def get_settings(self, file_path: str) -> None:
try:
- file = open(file_path)
- text = file.read()
- file.close()
- self.db = json.loads(text)
+ with open(file_path) as file:
+ text = file.read()
+ self.db = Dict(json.loads(text))
self.save_db()
print("get setting successful: " + file_path)
self.exit()
@@ -683,15 +669,15 @@ def get_settings(self, file_path):
self.add_log(f"get_settings error: {err}", WARNING)
#end define
- def get_python3_path(self):
+ def get_python3_path(self) -> str:
python3_path = "/usr/bin/python3"
if platform.system() == "OpenBSD":
python3_path = "/usr/local/bin/python3"
return python3_path
#end define
- def fork_daemon(self):
- my_path = self.buffer.my_path
+ def fork_daemon(self) -> None:
+ my_path = self.my_path
python3_path = self.get_python3_path()
cmd = " ".join([python3_path, my_path, "-ef", '&'])
os.system(cmd)
@@ -699,9 +685,9 @@ def fork_daemon(self):
self.exit()
#end define
- def add_to_crone(self):
+ def add_to_crone(self) -> None:
python3_path = self.get_python3_path()
- cron_text = f"@reboot {python3_path} \"{self.buffer.my_path}\" -d\n"
+ cron_text = f"@reboot {python3_path} \"{self.my_path}\" -d\n"
os.system("crontab -l > mycron")
with open("mycron", 'a') as file:
file.write(cron_text)
@@ -710,8 +696,7 @@ def add_to_crone(self):
self.exit()
#end define
- def try_function(self, func, **kwargs):
- args = kwargs.get("args")
+ def try_function(self, func: Callback, args: Sequence[Any] | None = None) -> Any:
result = None
try:
if args is None:
@@ -723,9 +708,9 @@ def try_function(self, func, **kwargs):
return result
#end define
- def start_thread(self, func, **kwargs):
- name = kwargs.get("name", func.__name__)
- args = kwargs.get("args")
+ def start_thread(self, func: Callback, name: str | None = None, args: Sequence[Any] | None = None) -> None:
+ if name is None:
+ name = func.__name__
if args is None:
threading.Thread(target=func, name=name, daemon=True).start()
else:
@@ -733,33 +718,34 @@ def start_thread(self, func, **kwargs):
self.add_log("Thread {name} started".format(name=name), "debug")
#end define
- def cycle(self, func, sec, args):
+ def cycle(self, func: Callback, sec: float, args: Sequence[Any] | None) -> None:
while self.working:
self.try_function(func, args=args)
time.sleep(sec)
#end define
- def start_cycle(self, func, **kwargs):
- name = kwargs.get("name", func.__name__)
- args = kwargs.get("args")
- sec = kwargs.get("sec")
+ def start_cycle(self, func: Callback, sec: float, args: Sequence[Any] | None = None, name: str | None = None) -> None:
+ if name is None:
+ name = func.__name__
self.start_thread(self.cycle, name=name, args=(func, sec, args))
#end define
- def init_translator(self, file_path=None):
+ def init_translator(self, file_path: str | None = None) -> None:
if file_path is None:
file_path = self.db.translate_file_path
- file = open(file_path, encoding="utf-8")
- text = file.read()
- file.close()
- self.buffer.translate = json.loads(text)
+ assert file_path is not None
+ with open(file_path, encoding="utf-8") as file:
+ text = file.read()
+ self.translate_dict = json.loads(text)
#end define
- def translate(self, text):
+ def translate(self, text: str) -> str:
+ if self.translate_dict is None:
+ return text
lang = self.get_lang()
text_list = text.split(' ')
for item in text_list:
- sitem = self.buffer.translate.get(item)
+ sitem = self.translate_dict.get(item)
if sitem is None:
continue
ritem = sitem.get(lang)
@@ -769,7 +755,7 @@ def translate(self, text):
#end define
#end class
-def get_hash_md5(file_name):
+def get_hash_md5(file_name: str) -> str:
blocksize = 65536
hasher = hashlib.md5()
with open(file_name, 'rb') as file:
@@ -780,7 +766,7 @@ def get_hash_md5(file_name):
return hasher.hexdigest()
#end define
-def parse(text, search, search2=None):
+def parse(text: str | None, search: str | None, search2: str | None = None) -> str | None:
if search is None or text is None:
return None
if search not in text:
@@ -791,33 +777,18 @@ def parse(text, search, search2=None):
return text
#end define
-def ping(hostname):
- process = subprocess.run(["ping", "-c", 1, "-w", 3, hostname], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
- if process.returncode == 0:
- result = True
- else:
- result = False
- return result
-#end define
-
-def get_request(url):
+def get_request(url: str) -> str:
link = urlopen(url)
data = link.read()
text = data.decode("utf-8")
return text
#end define
-def dir(input_dir):
- if input_dir[-1:] != '/':
- input_dir += '/'
- return input_dir
-#end define
-
-def b2mb(item):
+def b2mb(item: int | str) -> float:
return round(int(item) / 1000 / 1000, 2)
#end define
-def search_file_in_dir(path, file_name):
+def search_file_in_dir(path: str, file_name: str) -> str | None:
result = None
for entry in os.scandir(path):
if entry.name.startswith('.'):
@@ -834,7 +805,7 @@ def search_file_in_dir(path, file_name):
return result
#end define
-def search_dir_in_dir(path, dir_name):
+def search_dir_in_dir(path: str, dir_name: str) -> str | None:
result = None
for entry in os.scandir(path):
if entry.name.startswith('.'):
@@ -850,15 +821,15 @@ def search_dir_in_dir(path, dir_name):
return result
#end define
-def get_dir_from_path(path):
+def get_dir_from_path(path: str) -> str:
return path[:path.rfind('/') + 1]
#end define
-def get_full_name_from_path(path):
+def get_full_name_from_path(path: str) -> str:
return path[path.rfind('/') + 1:]
#end define
-def print_table(arr):
+def print_table(arr: Sequence[Sequence[Any]]) -> None:
buff = dict()
for i in range(len(arr[0])):
buff[i] = list()
@@ -875,11 +846,11 @@ def print_table(arr):
print()
#end define
-def get_timestamp():
+def get_timestamp() -> int:
return int(time.time())
#end define
-def color_text(text):
+def color_text(text: str) -> str:
for cname in bcolors.colors:
item = '{' + cname + '}'
if item in text:
@@ -887,36 +858,32 @@ def color_text(text):
return text
#end define
-def color_print(text):
+def color_print(text: str) -> None:
text = color_text(text)
print(text)
#end define
-def get_load_avg():
+def get_load_avg() -> list[float]:
psys = platform.system()
if psys in ["FreeBSD", "Darwin", "OpenBSD"]:
loadavg = subprocess.check_output(["sysctl", "-n", "vm.loadavg"]).decode('utf-8')
if psys != "OpenBSD":
m = re.match(r"{ (\d+\.\d+) (\d+\.\d+) (\d+\.\d+).+", loadavg)
else:
- m = re.match("(\d+\.\d+) (\d+\.\d+) (\d+\.\d+)", loadavg)
+ m = re.match(r"(\d+\.\d+) (\d+\.\d+) (\d+\.\d+)", loadavg)
if m:
loadavg_arr = [m.group(1), m.group(2), m.group(3)]
else:
- loadavg_arr = [0.00, 0.00, 0.00]
+ loadavg_arr = ["0.00", "0.00", "0.00"]
else:
file = open("/proc/loadavg")
loadavg = file.read()
file.close()
loadavg_arr = loadavg.split(' ')
- output = loadavg_arr[:3]
- for i in range(len(output)):
- output[i] = float(output[i])
- return output
-#end define
+ return [float(item) for item in loadavg_arr[:3]]
-def get_internet_interface_name():
+def get_internet_interface_name() -> str:
if platform.system() == "OpenBSD":
cmd = "ifconfig egress"
text = subprocess.getoutput(cmd)
@@ -937,24 +904,25 @@ def get_internet_interface_name():
return interface_name
#end define
-def thr_sleep():
+def thr_sleep() -> None:
while True:
time.sleep(10)
#end define
-def timestamp2datetime(timestamp, format="%d.%m.%Y %H:%M:%S"):
+def timestamp2datetime(timestamp: int | float, format: str = "%d.%m.%Y %H:%M:%S") -> str:
datetime = time.localtime(timestamp)
result = time.strftime(format, datetime)
return result
#end define
-def timeago(timestamp=False):
+def timeago(timestamp: int | date_time_library.datetime | Literal[False] = False) -> str:
"""
Get a datetime object or a int() Epoch timestamp and return a
pretty string like 'an hour ago', 'Yesterday', '3 months ago',
'just now', etc
"""
now = date_time_library.datetime.now()
+ diff = date_time_library.timedelta(0)
if type(timestamp) is int:
diff = now - date_time_library.datetime.fromtimestamp(timestamp)
elif isinstance(timestamp, date_time_library.datetime):
@@ -987,7 +955,7 @@ def timeago(timestamp=False):
return str(day_diff // 365) + " years ago"
#end define
-def time2human(diff):
+def time2human(diff: int | float) -> str:
dt = date_time_library.timedelta(seconds=diff)
if dt.days < 0:
return ''
@@ -1002,18 +970,18 @@ def time2human(diff):
return str(dt.days) + " days"
#end define
-def dec2hex(dec):
+def dec2hex(dec: int) -> str:
h = hex(dec)[2:]
if len(h) % 2 > 0:
h = '0' + h
return h
#end define
-def hex2dec(h):
+def hex2dec(h: str) -> int:
return int(h, base=16)
#end define
-def run_as_root(args):
+def run_as_root(args: list[str]) -> int:
psys = platform.system()
if os.geteuid() != 0:
if psys == "Linux":
@@ -1030,7 +998,7 @@ def run_as_root(args):
return exit_code
#end define
-def get_username():
+def get_username() -> str | None:
user = os.getenv("USER")
return user
#end define
@@ -1121,15 +1089,15 @@ def add2systemd(**kwargs):
subprocess.run(args)
#end define
-def ip2int(addr):
+def ip2int(addr: str) -> int:
return struct.unpack("!i", socket.inet_aton(addr))[0]
#end define
-def int2ip(dec):
+def int2ip(dec: int) -> str:
return socket.inet_ntoa(struct.pack("!i", dec))
#end define
-def get_service_status(name):
+def get_service_status(name: str) -> bool:
status = False
psys = platform.system()
if psys == "OpenBSD":
@@ -1141,7 +1109,7 @@ def get_service_status(name):
return status
#end define
-def get_service_uptime(name):
+def get_service_uptime(name: str) -> int | None:
property = "ExecMainStartTimestampMonotonic"
args = ["systemctl", "show", name, "--property=" + property]
process = subprocess.run(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=3)
@@ -1158,7 +1126,7 @@ def get_service_uptime(name):
return uptime
#end define
-def get_service_pid(name):
+def get_service_pid(name: str) -> int | None:
property = "MainPID"
args = ["systemctl", "show", name, "--property=" + property]
process = subprocess.run(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=3)
@@ -1166,11 +1134,14 @@ def get_service_pid(name):
err = process.stderr.decode("utf-8")
if len(err) > 0:
return
- pid = int(parse(output, f"{property}=", '\n'))
+ pid_text = parse(output, f"{property}=", '\n')
+ if pid_text is None:
+ return None
+ pid = int(pid_text)
return pid
#end define
-def get_git_hash(git_path, short=False):
+def get_git_hash(git_path: str, short: bool = False) -> str | None:
args = ["git", "rev-parse", "HEAD"]
if short is True:
args.insert(2, '--short')
@@ -1184,8 +1155,9 @@ def get_git_hash(git_path, short=False):
return buff[0]
#end define
-def get_git_url(git_path):
+def get_git_url(git_path: str) -> str | None:
args = ["git", "remote", "-v"]
+ output = ""
try:
process = subprocess.run(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=git_path, timeout=3)
@@ -1194,7 +1166,7 @@ def get_git_url(git_path):
except Exception as ex:
err = str(ex)
if len(err) > 0:
- return
+ return None
lines = output.split('\n')
url = None
for line in lines:
@@ -1205,7 +1177,7 @@ def get_git_url(git_path):
return url
#end define
-def get_git_author_and_repo(git_path):
+def get_git_author_and_repo(git_path: str) -> tuple[str | None, str | None]:
author = None
repo = None
url = get_git_url(git_path)
@@ -1219,7 +1191,7 @@ def get_git_author_and_repo(git_path):
return author, repo
#end define
-def get_git_last_remote_commit(git_path, branch="master"):
+def get_git_last_remote_commit(git_path: str, branch: str = "master") -> str | None:
author, repo = get_git_author_and_repo(git_path)
if author is None or repo is None:
return
@@ -1234,14 +1206,14 @@ def get_git_last_remote_commit(git_path, branch="master"):
return sha
#end define
-def get_git_branch(git_path):
+def get_git_branch(git_path: str) -> str | None:
args = ["git", "branch", "-v"]
process = subprocess.run(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=git_path,
timeout=3)
output = process.stdout.decode("utf-8")
err = process.stderr.decode("utf-8")
if len(err) > 0:
- return
+ return None
lines = output.split('\n')
branch = None
for line in lines:
@@ -1252,8 +1224,10 @@ def get_git_branch(git_path):
return branch
#end define
-def check_git_update(git_path):
+def check_git_update(git_path: str) -> bool | None:
branch = get_git_branch(git_path)
+ if branch is None:
+ return None
new_hash = get_git_last_remote_commit(git_path, branch)
old_hash = get_git_hash(git_path)
result = False
@@ -1264,7 +1238,7 @@ def check_git_update(git_path):
return result
#end define
-def read_config_from_file(config_path:str):
+def read_config_from_file(config_path: str) -> Dict:
file = open(config_path, 'rt')
text = file.read()
file.close()
@@ -1273,20 +1247,9 @@ def read_config_from_file(config_path:str):
#end define
-def write_config_to_file(config_path:str, data:dict):
+def write_config_to_file(config_path: str, data: Mapping[str, Any]) -> None:
text = json.dumps(data, indent=4)
file = open(config_path, 'wt')
file.write(text)
file.close()
#end define
-
-def get_own_ip():
- pat = re.compile(r"^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)(\.(?!$)|$)){4}$")
- requests.packages.urllib3.util.connection.HAS_IPV6 = False
- ip = requests.get("https://ifconfig.me/ip").text
- if not pat.fullmatch(ip):
- ip = requests.get("https://ipinfo.io/ip").text
- if not pat.fullmatch(ip):
- raise Exception('Cannot get own IP address')
- return ip
-#end define
diff --git a/mytoncore/functions.py b/mytoncore/functions.py
index 97daf8d0..7de3c5ae 100755
--- a/mytoncore/functions.py
+++ b/mytoncore/functions.py
@@ -687,14 +687,14 @@ def backup_mytoncore_logs(local: MyPyClass, ton: MyTonCore):
return
now = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
log_backup_tmp_path = os.path.join(logs_path, 'mytoncore_log_' + now + '.log')
- subprocess.run(["cp", local.buffer.log_file_name, log_backup_tmp_path])
+ subprocess.run(["cp", local.log_file_name, log_backup_tmp_path])
ton.clear_dir(logs_path)
def check_mytoncore_db(local: MyPyClass, ton: MyTonCore):
try:
- local.read_db(local.buffer.db_path)
- backup_path = local.buffer.db_path + ".backup"
+ local.read_db(local.db_path)
+ backup_path = local.db_path + ".backup"
if not os.path.isfile(backup_path) or time.time() - os.path.getmtime(backup_path) > 3600*6:
ton.create_self_db_backup()
return
@@ -757,6 +757,6 @@ def mytoncore():
from mypylib.mypylib import MyPyClass
local = MyPyClass('mytoncore.py')
- print('Local DB path:', local.buffer.db_path)
+ print('Local DB path:', local.db_path)
Init(local)
General(local)
diff --git a/mytoncore/models.py b/mytoncore/models.py
index db803d9b..7a0cc0f5 100644
--- a/mytoncore/models.py
+++ b/mytoncore/models.py
@@ -1,168 +1,128 @@
+# pyright: strict
+
+from __future__ import annotations
import os
+from dataclasses import dataclass
+@dataclass
class Wallet:
- def __init__(self, name, path, version):
- self.name = name
- self.path = path
- self.addrFilePath = f"{path}.addr"
- self.privFilePath = f"{path}.pk"
- self.bocFilePath = f"{path}-query.boc"
- self.addrFull = None
- self.workchain = None
- self.addr = None
- self.addrB64 = None
- self.addrB64_init = None
- self.oldseqno = None
- self.account = None
- self.subwallet = None
- self.version = version
- #end define
-
- def Delete(self):
- os.remove(self.addrFilePath)
- os.remove(self.privFilePath)
- #end define
-#end class
-
-
+ name: str
+ path: str
+ version: str
+ addrFull: str | None = None
+ workchain: int | None = None
+ addr: str | None = None
+ addrB64: str | None = None
+ addrB64_init: str | None = None
+ oldseqno: int | None = None
+ subwallet: int | None = None
+
+ def __post_init__(self):
+ self.addrFilePath: str = f"{self.path}.addr"
+ self.privFilePath: str = f"{self.path}.pk"
+ self.bocFilePath: str = f"{self.path}-query.boc"
+
+ def Delete(self):
+ os.remove(self.addrFilePath)
+ os.remove(self.privFilePath)
+
+
+@dataclass
class Account:
- def __init__(self, workchain, addr):
- self.workchain = workchain
- self.addr = addr
- self.addrB64 = None
- self.addrFull = None
- self.status = "empty"
- self.balance = 0
- self.lt = None
- self.hash = None
- self.codeHash = None
- #end define
-#end class
-
-
-class Block():
- def __init__(self, str=None):
- self.workchain = None
- self.shardchain = None
- self.seqno = None
- self.rootHash = None
- self.fileHash = None
- self.ParsBlock(str)
- #end define
-
- def ParsBlock(self, str):
- if str is None:
- return
- buff = str.split(':')
- self.rootHash = buff[1]
- self.fileHash = buff[2]
- buff = buff[0]
- buff = buff.replace('(', '')
- buff = buff.replace(')', '')
- buff = buff.split(',')
- self.workchain = int(buff[0])
- self.shardchain = buff[1]
- self.seqno = int(buff[2])
- #end define
-
- def __str__(self):
- result = f"({self.workchain},{self.shardchain},{self.seqno}):{self.rootHash}:{self.fileHash}"
- return result
- #end define
-
- def __repr__(self):
- return self.__str__()
- #end define
-
- def __eq__(self, other):
- if other is None:
- return False
- return self.rootHash == other.rootHash and self.fileHash == other.fileHash
- #end define
-#end class
-
-
-class Trans():
- def __init__(self, block, addr=None, lt=None, hash=None):
- self.block = block
- self.addr = addr
- self.lt = lt
- self.hash = hash
- #end define
-
- def __str__(self):
- return str(self.__dict__)
- #end define
-
- def __repr__(self):
- return self.__str__()
- #end define
-
- def __eq__(self, other):
- if other is None:
- return False
- return self.hash == other.hash
- #end define
-#end class
-
-
-class Message():
- def __init__(self):
- self.trans = None
- self.type = None
- self.time = None
- self.srcWorkchain = None
- self.destWorkchain = None
- self.srcAddr = None
- self.destAddr = None
- self.value = None
- self.body = None
- self.comment = None
- self.ihr_fee = None
- self.fwd_fee = None
- self.total_fees = None
- self.ihr_disabled = None
- self.hash = None
- #end define
-
- def GetFullAddr(self, workchain, addr):
- if addr is None:
- return
- return f"{workchain}:{addr}"
- #end define
-
- def __str__(self):
- return str(self.__dict__)
- #end define
-
- def __repr__(self):
- return self.__str__()
- #end define
-
- def __eq__(self, other):
- if other is None:
- return False
- return self.hash == other.hash
- #end define
-#end class
-
-
+ workchain: int
+ addr: str
+ addrB64: str | None = None
+ addrFull: str | None = None
+ status: str = "empty"
+ balance: float = 0
+ lt: str | None = None
+ hash: str | None = None
+ codeHash: str | None = None
+
+
+@dataclass
+class Block:
+ workchain: int
+ shardchain: str
+ seqno: int
+ rootHash: str
+ fileHash: str
+
+ @classmethod
+ def from_str(cls, s: str):
+ buff = s.split(":")
+ root_hash = buff[1]
+ file_hash = buff[2]
+ buff = buff[0]
+ buff = buff.replace("(", "")
+ buff = buff.replace(")", "")
+ buff = buff.split(",")
+ workchain = int(buff[0])
+ shardchain = buff[1]
+ seqno = int(buff[2])
+ return cls(workchain, shardchain, seqno, root_hash, file_hash)
+
+ def __str__(self):
+ result = f"({self.workchain},{self.shardchain},{self.seqno}):{self.rootHash}:{self.fileHash}"
+ return result
+
+ def __repr__(self):
+ return self.__str__()
+
+
+@dataclass
+class Transaction:
+ block: Block
+ type: str | None
+ time: int | None
+ total_fees: float | None
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return self.__str__()
+
+
+@dataclass
+class Message:
+ transaction: Transaction
+ src_workchain: int | None
+ dest_workchain: int | None
+ src_addr: str | None
+ dest_addr: str | None
+ value: float | None
+ body: str | None
+ comment: str | None
+ ihr_fee: float | None
+ fwd_fee: float | None
+ ihr_disabled: bool | None
+
+ @property
+ def time(self):
+ return self.transaction.time
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return self.__str__()
+
+
+@dataclass
class Pool:
- def __init__(self, name, path):
- self.name = name
- self.path = path
- self.addrFilePath = f"{path}.addr"
- self.bocFilePath = f"{path}-query.boc"
- self.addrFull = None
- self.workchain = None
- self.addr = None
- self.addrB64 = None
- self.addrB64_init = None
- self.account = None
- #end define
-
- def Delete(self):
- os.remove(self.addrFilePath)
- #end define
-#end class
+ name: str
+ path: str
+ addrFull: str | None = None
+ workchain: int | None = None
+ addr: str | None = None
+ addrB64: str | None = None
+ addrB64_init: str | None = None
+
+ def __post_init__(self):
+ self.addrFilePath: str = f"{self.path}.addr"
+ self.bocFilePath: str = f"{self.path}-query.boc"
+
+ def Delete(self):
+ os.remove(self.addrFilePath)
diff --git a/mytoncore/mytoncore.py b/mytoncore/mytoncore.py
index c6c35e5e..382f382e 100644
--- a/mytoncore/mytoncore.py
+++ b/mytoncore/mytoncore.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import os
import base64
import time
@@ -18,12 +20,12 @@
from mytoncore.validator_console import ValidatorConsole
from mytoncore.fift import Fift
from mytoncore.models import (
- Wallet,
- Account,
- Block,
- Trans,
- Message,
- Pool,
+ Wallet,
+ Account,
+ Block,
+ Transaction,
+ Message,
+ Pool,
)
from mypylib.mypylib import (
@@ -59,10 +61,10 @@ def Refresh(self):
self.local.load_db(self.dbFile)
if not self.walletsDir:
- self.walletsDir = self.local.buffer.my_work_dir + "wallets/"
- self.contractsDir = self.local.buffer.my_work_dir + "contracts/"
- self.poolsDir = self.local.buffer.my_work_dir + "pools/"
- self.tempDir = self.local.buffer.my_temp_dir
+ self.walletsDir = self.local.my_work_dir + "wallets/"
+ self.contractsDir = self.local.my_work_dir + "contracts/"
+ self.poolsDir = self.local.my_work_dir + "pools/"
+ self.tempDir = self.local.my_temp_dir
self.nodeName = self.local.db.get("nodeName")
if self.nodeName is None:
@@ -101,7 +103,7 @@ def Refresh(self):
#end define
def CheckConfigFile(self, fift, liteClient):
- mconfig_path = self.local.buffer.db_path
+ mconfig_path = self.local.db_path
backup_path = mconfig_path + ".backup"
if fift is None or liteClient is None:
self.local.add_log("The config file is broken", "warning")
@@ -118,7 +120,7 @@ def CheckConfigFile(self, fift, liteClient):
def create_self_db_backup(self):
self.local.add_log("Create backup config file", "info")
- mconfig_path = self.local.buffer.db_path
+ mconfig_path = self.local.db_path
backup_path = mconfig_path + ".backup"
backup_tmp_path = backup_path + '.tmp'
subprocess.run(["cp", mconfig_path, backup_tmp_path])
@@ -206,8 +208,10 @@ def GetAccount(self, inputAddr):
account.addr = xhex2hex(address)
account.addrB64 = self.AddrFull2AddrB64(addrFull)
account.addrFull = addrFull
- account.status = status
- account.balance = ng2g(value)
+ if status is not None:
+ account.status = status
+ if value is not None:
+ account.balance = ng2g(value)
account.lt = parse(result, "lt = ", ' ')
account.hash = parse(result, "hash = ", '\n')
account.codeHash = codeHash
@@ -222,7 +226,7 @@ def GetCodeHash(self, code):
return codeHash
#end define
- def GetAccountHistory(self, account, limit):
+ def GetAccountHistory(self, account, limit) -> list[Message]:
self.local.add_log("start GetAccountHistory function", "debug")
addr = f"{account.workchain}:{account.addr}"
lt = account.lt
@@ -253,78 +257,47 @@ def LastTransDump(self, addr, lt, transHash, count=10):
#outmsg = self.GetVarFromDict(item, "outmsg_cnt")
total_fees = self.GetVarFromDict(item, "total_fees.grams.value")
messages = self.GetMessagesFromTransaction(item)
- transData = dict()
- transData["type"] = type
- transData["trans"] = Trans(Block(block_str))
- transData["time"] = time
- #transData["outmsg"] = outmsg
- transData["total_fees"] = total_fees
- history += self.ParsMessages(messages, transData)
+ tr = Transaction(block=Block.from_str(block_str), type=type, time=time, total_fees=ng2g(total_fees))
+ history += self.parse_messages(messages, tr)
return history, prevTransLt, prevTransHash
#end define
- def ParsMessages(self, messages, transData):
+ def parse_messages(self, messages: list[dict], tr: Transaction):
history = list()
- #for item in messages:
for data in messages:
ihr_disabled = self.GetVarFromDict(data, "message.ihr_disabled")
- bounce = self.GetVarFromDict(data, "message.bounce")
- bounced = self.GetVarFromDict(data, "message.bounced")
- srcWorkchain = self.GetVarFromDict(data, "message.info.src.workchain_id")
+ src_workchain = self.GetVarFromDict(data, "message.info.src.workchain_id")
address = self.GetVarFromDict(data, "message.info.src.address")
- srcAddr = xhex2hex(address)
- #if address:
- # src = "{}:{}".format(workchain, xhex2hex(address))
- #end if
+ src_addr = xhex2hex(address)
- destWorkchain = self.GetVarFromDict(data, "message.info.dest.workchain_id")
+ dest_workchain = self.GetVarFromDict(data, "message.info.dest.workchain_id")
address = self.GetVarFromDict(data, "message.info.dest.address")
- destAddr = xhex2hex(address)
- #if address:
- # dest = "{}:{}".format(workchain, xhex2hex(address))
- #end if
+ dest_addr = xhex2hex(address)
grams = self.GetVarFromDict(data, "message.info.value.grams.value")
ihr_fee = self.GetVarFromDict(data, "message.info.ihr_fee.value")
fwd_fee = self.GetVarFromDict(data, "message.info.fwd_fee.value")
- # import_fee = self.GetVarFromDict(data, "message.info.import_fee.value")
- #body = self.GetVarFromDict(data, "message.body.value")
message = self.GetItemFromDict(data, "message")
body = self.GetItemFromDict(message, "body")
value = self.GetItemFromDict(body, "value")
body = self.GetBodyFromDict(value)
comment = self.GetComment(body)
- #storage_ph
- #credit_ph
- #compute_ph.gas_fees
- #compute_ph.gas_used
- #compute_ph.gas_limit
-
- message = Message()
- message.type = transData.get("type")
- message.block = transData.get("block")
- message.trans = transData.get("trans")
- message.time = transData.get("time")
- #message.outmsg = transData.get("outmsg")
- message.total_fees = ng2g(transData.get("total_fees"))
- message.ihr_disabled = ihr_disabled
- message.bounce = bounce
- message.bounced = bounced
- message.srcWorkchain = srcWorkchain
- message.destWorkchain = destWorkchain
- message.srcAddr = srcAddr
- message.destAddr = destAddr
- message.value = ng2g(grams)
- message.body = body
- message.comment = comment
- message.ihr_fee = ng2g(ihr_fee)
- message.fwd_fee = ng2g(fwd_fee)
- #message.storage_ph = storage_ph
- #message.credit_ph = credit_ph
- #message.compute_ph = compute_ph
+ message = Message(
+ transaction=tr,
+ ihr_disabled=ihr_disabled,
+ src_workchain=src_workchain,
+ dest_workchain=dest_workchain,
+ src_addr=src_addr,
+ dest_addr=dest_addr,
+ value=ng2g(grams),
+ body=body,
+ comment=comment,
+ ihr_fee=ng2g(ihr_fee),
+ fwd_fee=ng2g(fwd_fee)
+ )
history.append(message)
#end for
return history
@@ -652,7 +625,7 @@ def GetLastBlock(self):
for line in lines:
if "latest masterchain block" in line:
buff = line.split(' ')
- block = Block(buff[7])
+ block = Block.from_str(buff[7])
break
return block
#end define
@@ -701,54 +674,10 @@ def GetBlock(self, workchain, shardchain, seqno):
cmd = cmd.format(workchain=workchain, shardchain=shardchain, seqno=seqno)
result = self.liteClient.Run(cmd)
block_str = parse(result, "block header of ", ' ')
- block = Block(block_str)
+ block = Block.from_str(block_str)
return block
#end define
- def GetTransactions(self, block):
- transactions = list()
- cmd = "listblocktrans {block} 999999".format(block=block)
- result = self.liteClient.Run(cmd)
- lines = result.split('\n')
- for line in lines:
- if "transaction #" in line:
- buff = line.split(' ')
- trans_id = buff[1]
- trans_id = trans_id.replace('#', '')
- trans_id = trans_id.replace(':', '')
- trans_addr = buff[3]
- trans_lt = buff[5]
- trans_hash = buff[7]
- trans = Trans(block, trans_addr, trans_lt, trans_hash)
- transactions.append(trans)
- return transactions
- #end define
-
- def GetTrans(self, trans):
- addr = f"{trans.block.workchain}:{trans.addr}"
- messageList = list()
- cmd = f"dumptrans {trans.block} {addr} {trans.lt}"
- result = self.liteClient.Run(cmd)
- data = self.Result2Dict(result)
- for key, item in data.items():
- if "transaction is" not in key:
- continue
- description = self.GetKeyFromDict(item, "description")
- type = self.GetVar(description, "trans_")
- time = self.GetVarFromDict(item, "time")
- #outmsg = self.GetVarFromDict(item, "outmsg_cnt")
- total_fees = self.GetVarFromDict(item, "total_fees.grams.value")
- messages = self.GetMessagesFromTransaction(item)
- transData = dict()
- transData["type"] = type
- transData["trans"] = trans
- transData["time"] = time
- #transData["outmsg"] = outmsg
- transData["total_fees"] = total_fees
- messageList += self.ParsMessages(messages, transData)
- return messageList
- #end define
-
def GetShards(self, block=None):
shards = list()
if block:
@@ -762,7 +691,7 @@ def GetShards(self, block=None):
buff = line.split(' ')
shard_id = buff[1]
shard_id = shard_id.replace('#', '')
- shard_block = Block(buff[3])
+ shard_block = Block.from_str(buff[3])
shard = {"id": shard_id, "block": shard_block}
shards.append(shard)
return shards
@@ -1824,7 +1753,7 @@ def GetValidatorConfig(self):
def GetOverlaysStats(self):
self.local.add_log("start GetOverlaysStats function", "debug")
- resultFilePath = self.local.buffer.my_temp_dir + "getoverlaysstats.json"
+ resultFilePath = self.local.my_temp_dir + "getoverlaysstats.json"
result = self.validatorConsole.Run(f"getoverlaysstatsjson {resultFilePath}")
if "wrote stats" not in result:
raise Exception(f"GetOverlaysStats error: {result}")
@@ -1901,7 +1830,7 @@ def MoveCoins(self, wallet, dest, coins, **kwargs):
#end if
seqno = self.GetSeqno(wallet)
- resultFilePath = self.local.buffer.my_temp_dir + wallet.name + "_wallet-query"
+ resultFilePath = self.local.my_temp_dir + wallet.name + "_wallet-query"
if "v1" in wallet.version:
fiftScript = "wallet.fif"
args = [fiftScript, wallet.path, dest, seqno, coins, "-m", mode, resultFilePath]
@@ -1932,7 +1861,7 @@ def MoveCoinsFromHW(self, wallet, destList, **kwargs):
return
#end if
- orderFilePath = self.local.buffer.my_temp_dir + wallet.name + "_order.txt"
+ orderFilePath = self.local.my_temp_dir + wallet.name + "_order.txt"
lines = list()
for dest, coins in destList:
lines.append("SEND {dest} {coins}".format(dest=dest, coins=coins))
@@ -1946,7 +1875,7 @@ def MoveCoinsFromHW(self, wallet, destList, **kwargs):
elif "v2" in wallet.version:
fiftScript = "highload-wallet-v2.fif"
seqno = self.GetSeqno(wallet)
- resultFilePath = self.local.buffer.my_temp_dir + wallet.name + "_wallet-query"
+ resultFilePath = self.local.my_temp_dir + wallet.name + "_wallet-query"
args = [fiftScript, wallet.path, wallet.subwallet, seqno, orderFilePath, resultFilePath]
if flags:
args += flags
@@ -2790,11 +2719,11 @@ def GetVarFromDict(self, data, search):
return result
#end define
- def GetVar(self, text, search):
+ def GetVar(self, text, search) -> str | None:
if search is None or text is None:
- return
+ return None
if search not in text:
- return
+ return None
text = text[text.find(search) + len(search):]
if text[0] in [':', '=', ' ']:
text = text[1:]
@@ -3327,7 +3256,7 @@ def ImportShardOverlayCert(self):
history = self.GetAccountHistory(account, 10)
vwl = self.GetValidatorsWalletsList()
for message in history:
- srcAddrFull = f"{message.srcWorkchain}:{message.srcAddr}"
+ srcAddrFull = f"{message.src_workchain}:{message.src_addr}"
srcAddrFull = self.AddrFull2AddrB64(srcAddrFull)
if srcAddrFull not in vwl:
continue
@@ -3401,7 +3330,7 @@ def DownloadContract(self, url, branch=None):
def WithdrawFromPoolProcess(self, poolAddr, amount):
self.local.add_log("start WithdrawFromPoolProcess function", "debug")
wallet = self.GetValidatorWallet()
- bocPath = self.local.buffer.my_temp_dir + wallet.name + "validator-withdraw-query.boc"
+ bocPath = self.local.my_temp_dir + wallet.name + "validator-withdraw-query.boc"
fiftScript = self.contractsDir + "nominator-pool/func/validator-withdraw.fif"
args = [fiftScript, amount, bocPath]
self.fift.Run(args)
diff --git a/mytoncore/utils.py b/mytoncore/utils.py
index f3f0879a..b5f0af63 100644
--- a/mytoncore/utils.py
+++ b/mytoncore/utils.py
@@ -1,90 +1,83 @@
+from __future__ import annotations
+
import base64
import json
import re
import subprocess
+
try:
# Python 3.9+
from importlib.resources import files, as_file
except ImportError:
# Python < 3.9
- from importlib_resources import files, as_file
+ from importlib_resources import files, as_file # pyright: ignore[reportMissingImports]
-def str2b64(s):
+def str2b64(s: str):
b = s.encode("utf-8")
b64 = base64.b64encode(b)
b64 = b64.decode("utf-8")
return b64
-# end define
-def b642str(b64):
- b64 = b64.encode("utf-8")
- b = base64.b64decode(b64)
- s = b.decode("utf-8")
+def b642str(b64: str):
+ b = base64.b64decode(b64.encode())
+ s = b.decode()
return s
-# end define
-def dict2b64(d):
+def dict2b64(d: dict):
s = json.dumps(d)
b64 = str2b64(s)
return b64
-# end define
-def b642dict(b64):
+def b642dict(b64: str):
s = b642str(b64)
d = json.loads(s)
return d
-# end define
-def hex2b64(input): # TODO: remove duplicates
- hexBytes = bytes.fromhex(input)
- b64Bytes = base64.b64encode(hexBytes)
- b64String = b64Bytes.decode()
- return b64String
-# end define
+def hex2b64(inp: str): # TODO: remove duplicates
+ hex_bytes = bytes.fromhex(inp)
+ b64_bytes = base64.b64encode(hex_bytes)
+ b64_string = b64_bytes.decode()
+ return b64_string
-def b642hex(input):
- b64Bytes = input.encode()
- hexBytes = base64.b64decode(b64Bytes)
- hexString = hexBytes.hex()
- return hexString
-# end define
+def b642hex(inp: str):
+ b64_bytes = inp.encode()
+ hex_bytes = base64.b64decode(b64_bytes)
+ hex_string = hex_bytes.hex()
+ return hex_string
-def xhex2hex(x):
+def xhex2hex(x: str) -> str | None:
try:
b = x[1:]
h = b.lower()
return h
except Exception:
return None
-#end define
-def hex2base64(h): # TODO: remove duplicates
+
+def hex2base64(h: str): # TODO: remove duplicates
b = bytes.fromhex(h)
b64 = base64.b64encode(b)
s = b64.decode("utf-8")
return s
-#end define
-def str2bool(str):
- if str == "true":
+def str2bool(s: str):
+ if s == "true":
return True
return False
-# end define
-def ng2g(ng):
+def ng2g(ng: int | None) -> float | None:
if ng is None:
- return
- return int(ng)/10**9
-#end define
+ return None
+ return int(ng) / 10**9
def parse_db_stats(path: str):
@@ -103,23 +96,25 @@ def parse_db_stats(path: str):
result[s[0]] = {}
result[s[0]] = {k: float(v) for k, v in items}
return result
-# end define
-def get_hostname():
+
+def get_hostname() -> str:
return subprocess.run(["hostname"], stdout=subprocess.PIPE).stdout.decode().strip()
+
def hex_shard_to_int(shard_id_str: str) -> dict:
try:
- wc, shard_hex = shard_id_str.split(':')
+ wc, shard_hex = shard_id_str.split(":")
wc = int(wc)
shard = int(shard_hex, 16)
- if shard >= 2 ** 63:
- shard -= 2 ** 64
+ if shard >= 2**63:
+ shard -= 2**64
return {"workchain": wc, "shard": shard}
except (ValueError, IndexError):
raise Exception(f'Invalid shard ID "{shard_id_str}"')
-def signed_int_to_hex64(value):
+
+def signed_int_to_hex64(value: int):
if value < 0:
value = (1 << 64) + value
return f"{value:016X}"
@@ -148,11 +143,11 @@ def _count_trailing_zeroes64(value: int) -> int:
if u == 0:
return 64
return ((u & -u).bit_length()) - 1
+
return 63 - _count_trailing_zeroes64(_to_unsigned64(shard_id))
def shard_prefix(shard_id: int, length_: int):
-
def _to_signed64(v: int) -> int:
return v - (1 << 64) if v >= (1 << 63) else v
@@ -162,7 +157,9 @@ def _to_signed64(v: int) -> int:
x = _lower_bit64(u)
y = 1 << (63 - length_)
if y < x:
- raise ValueError("requested prefix length is longer (more specific) than current shard id")
+ raise ValueError(
+ "requested prefix length is longer (more specific) than current shard id"
+ )
mask_non_lower = (~(y - 1)) & _MASK64 # equals -y mod 2^64; clears bits below y
res_u = (u & mask_non_lower) | y
return _to_signed64(res_u)
diff --git a/mytonctrl/mytonctrl.py b/mytonctrl/mytonctrl.py
index 560de264..682076e2 100755
--- a/mytonctrl/mytonctrl.py
+++ b/mytonctrl/mytonctrl.py
@@ -212,9 +212,9 @@ def check_installer_user(local):
def pre_up(local: MyPyClass, ton: MyTonCore):
try:
- check_mytonctrl_update(local)
- check_installer_user(local)
- check_vport(local, ton)
+ local.try_function(check_mytonctrl_update, args=[local])
+ local.try_function(check_installer_user, args=[local])
+ local.try_function(check_vport, args=[local, ton])
warnings(local, ton)
except Exception as e:
local.add_log(f'PreUp error: {e}', 'error')
@@ -275,6 +275,8 @@ def check_git(input_args, default_repo, text, default_branch='master'):
if '--url' in input_args:
git_url = pop_arg_from_args(input_args, '--url')
+ if not git_url:
+ raise Exception("git url is empty after --url flag")
if branch is None:
if '#' in git_url:
ref_fragment = git_url.rsplit('#', 1)[1]
@@ -425,10 +427,10 @@ def run_benchmark(ton, args):
print_table(table)
#end define
-def check_mytonctrl_update(local):
- git_path = local.buffer.my_dir
+def check_mytonctrl_update(local: MyPyClass):
+ git_path = '/usr/src/mytonctrl'
result = check_git_update(git_path)
- if result is True:
+ if result:
color_print(local.translate("mytonctrl_update_available"))
def print_warning(local, warning_name: str):
@@ -1068,6 +1070,6 @@ def mytonctrl():
local = MyPyClass('mytonctrl.py')
mytoncore_local = MyPyClass('mytoncore.py')
ton = MyTonCore(mytoncore_local)
- console = MyPyConsole()
+ console = MyPyConsole(local)
Init(local, ton, console, sys.argv[1:])
console.Run()
diff --git a/mytoninstaller/mytoninstaller.py b/mytoninstaller/mytoninstaller.py
index a8552554..1dd836cc 100644
--- a/mytoninstaller/mytoninstaller.py
+++ b/mytoninstaller/mytoninstaller.py
@@ -348,7 +348,7 @@ def General(local, console):
###
def mytoninstaller():
local = MyPyClass(__file__)
- console = MyPyConsole()
+ console = MyPyConsole(local)
Init(local, console)
if len(sys.argv) > 1:
diff --git a/tests/conftest.py b/tests/conftest.py
index beb7f9f4..ea1a2542 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -7,7 +7,7 @@
from mypyconsole.mypyconsole import MyPyConsole
from mytoncore.mytoncore import MyTonCore
from mytonctrl.mytonctrl import Init
-from mypylib.mypylib import MyPyClass, dir as ensure_dir, Dict
+from mypylib.mypylib import MyPyClass
from tests.helpers import remove_colors
@@ -18,10 +18,10 @@ def __init__(self, file_path: str, work_dir: str, temp_dir: str):
super().__init__(file_path)
def get_my_work_dir(self):
- return ensure_dir(self._work_dir)
+ return self._work_dir
def get_my_temp_dir(self):
- return ensure_dir(self._temp_dir)
+ return self._temp_dir
def self_test(self):
pass
@@ -39,8 +39,8 @@ def write_log(self):
@pytest.fixture()
def local(tmp_path):
- work_dir = str(tmp_path / "work")
- temp_dir = str(tmp_path / "tmp")
+ work_dir = str(tmp_path / "work") + '/'
+ temp_dir = str(tmp_path / "tmp") + '/'
file_path = str(tmp_path / "tests_runner.py")
os.makedirs(work_dir, exist_ok=True)
os.makedirs(temp_dir, exist_ok=True)
@@ -108,7 +108,7 @@ def execute(self, command: str, no_color: bool = False) -> str:
@pytest.fixture()
def cli(local, ton) -> TestMyPyConsole:
- console = TestMyPyConsole()
+ console = TestMyPyConsole(local)
mp = pytest.MonkeyPatch()
mp.setattr(MyTonCore, "using_pool", lambda self: True)
mp.setattr(MyTonCore, "using_nominator_pool", lambda self: True)
diff --git a/tests/integration/test_backup_commands.py b/tests/integration/test_backup_commands.py
index c9f1252b..e11315e4 100644
--- a/tests/integration/test_backup_commands.py
+++ b/tests/integration/test_backup_commands.py
@@ -38,18 +38,18 @@ def run_create_backup(_, args: list, user: str):
output = cli.execute("create_backup", no_color=True)
assert "create_backup - OK" in output
assert fun_user is None
- assert fun_args == ["-m", ton.local.buffer.my_work_dir, "-t", str(tmp_dir)]
+ assert fun_args == ["-m", ton.local.my_work_dir, "-t", str(tmp_dir)]
output = cli.execute("create_backup /to_dir/", no_color=True)
assert "create_backup - OK" in output
assert fun_user is None
- assert fun_args == ["-m", ton.local.buffer.my_work_dir, "-t", str(tmp_dir), "-d", "/to_dir/"]
+ assert fun_args == ["-m", ton.local.my_work_dir, "-t", str(tmp_dir), "-d", "/to_dir/"]
assert not Path(tmp_dir).exists()
output = cli.execute("create_backup /to_dir/ -u yungwine", no_color=True)
assert "create_backup - OK" in output
assert fun_user == 'yungwine'
- assert fun_args == ["-m", ton.local.buffer.my_work_dir, "-t", str(tmp_dir), "-d", "/to_dir/"]
+ assert fun_args == ["-m", ton.local.my_work_dir, "-t", str(tmp_dir), "-d", "/to_dir/"]
assert not Path(tmp_dir).exists()
return_code = 1
@@ -79,7 +79,7 @@ def fake_run_as_root(args: list):
def fake_run_restore_backup(*args, **kwargs):
# really do update db after restore
- with open(ton.local.buffer.db_path, 'w') as f:
+ with open(ton.local.db_path, 'w') as f:
new_db = ton.local.db.copy()
new_db.update({"abc": 123})
f.write(json.dumps(new_db))
@@ -94,7 +94,7 @@ def fake_run_restore_backup(*args, **kwargs):
def assert_happy_run_args(outp: str, user: str):
assert 'restore_backup - OK' in outp
exit_mock.assert_called_once() # exited after restore_backup
- assert run_args == ['bash', backup_path, '-u', user, '-m', ton.local.buffer.my_work_dir, '-n',
+ assert run_args == ['bash', backup_path, '-u', user, '-m', ton.local.my_work_dir, '-n',
'backup.tar.gz', '-i', '2130706433']
assert ton.local.db.get('abc') == 123 # db updated after restore_backup
diff --git a/tests/integration/test_btc_teleport_commands.py b/tests/integration/test_btc_teleport_commands.py
index b2033deb..9097e973 100644
--- a/tests/integration/test_btc_teleport_commands.py
+++ b/tests/integration/test_btc_teleport_commands.py
@@ -22,7 +22,7 @@ def fake_run_as_root(args: list):
output = cli.execute("remove_btc_teleport", no_color=True)
assert "Removed btc_teleport" in output
- assert run_args == ['bash', script_path, '-s', '/usr/src/ton-teleport-btc-periphery', '-k', ton.local.buffer.my_work_dir + '/btc_oracle_keystore']
+ assert run_args == ['bash', script_path, '-s', '/usr/src/ton-teleport-btc-periphery', '-k', ton.local.my_work_dir + '/btc_oracle_keystore']
# bad args
run_args = []
@@ -42,11 +42,11 @@ def fake_run_as_root(args: list):
monkeypatch.setattr(MyTonCore, "GetConfig34", lambda self: {"mainValidators": 10})
output = cli.execute("remove_btc_teleport --force", no_color=True)
assert "Removed btc_teleport" in output
- assert run_args == ['bash', script_path, '-s', '/usr/src/ton-teleport-btc-periphery', '-k', ton.local.buffer.my_work_dir + '/btc_oracle_keystore']
+ assert run_args == ['bash', script_path, '-s', '/usr/src/ton-teleport-btc-periphery', '-k', ton.local.my_work_dir + '/btc_oracle_keystore']
# non-masterchain validator
monkeypatch.setattr(MyTonCore, "GetValidatorIndex", lambda self: 10)
monkeypatch.setattr(MyTonCore, "GetConfig34", lambda self: {"mainValidators": 10})
output = cli.execute("remove_btc_teleport", no_color=True)
assert "Removed btc_teleport" in output
- assert run_args == ['bash', script_path, '-s', '/usr/src/ton-teleport-btc-periphery', '-k', ton.local.buffer.my_work_dir + '/btc_oracle_keystore']
+ assert run_args == ['bash', script_path, '-s', '/usr/src/ton-teleport-btc-periphery', '-k', ton.local.my_work_dir + '/btc_oracle_keystore']
diff --git a/tests/integration/test_mypylib_basic.py b/tests/integration/test_mypylib_basic.py
new file mode 100644
index 00000000..9d83a8ab
--- /dev/null
+++ b/tests/integration/test_mypylib_basic.py
@@ -0,0 +1,112 @@
+import json
+import sys
+from pathlib import Path
+
+import pytest
+
+from mypylib.mypylib import DEBUG, INFO, Dict, MyPyClass
+
+
+@pytest.fixture
+def local(tmp_path: Path, monkeypatch) -> MyPyClass:
+ home_dir = tmp_path / "home"
+ temp_dir = tmp_path / "tmp"
+ home_dir.mkdir()
+ temp_dir.mkdir()
+
+ monkeypatch.setenv("HOME", str(home_dir))
+ monkeypatch.delenv("XDG_DATA_HOME", raising=False)
+ monkeypatch.setattr(MyPyClass, "check_root_permission", lambda self: False)
+ monkeypatch.setattr(MyPyClass, "get_my_temp_dir", lambda self: str(temp_dir) + '/')
+
+ file_path = tmp_path / "tests_runner.py"
+ file_path.write_text("# integration test runner\n")
+ return MyPyClass(str(file_path))
+
+
+def test_mypyclass_initializes_runtime_paths_and_defaults(local, tmp_path):
+ expected_work_dir = tmp_path / "home" / ".local" / "share" / "tests_runner"
+
+ assert local.working is True
+ assert local.my_name == "tests_runner"
+ assert local.my_full_name == "tests_runner.py"
+ assert local.my_path == str(tmp_path / "tests_runner.py")
+ assert local.my_work_dir == str(expected_work_dir) + '/'
+ assert local.my_temp_dir == str(tmp_path / "tmp") + '/'
+ assert local.db_path == str(expected_work_dir / "tests_runner.db")
+ assert local.log_file_name == str(expected_work_dir / "tests_runner.log")
+ assert local.pid_file_path == str(expected_work_dir / "tests_runner.pid")
+ assert Path(local.db_path).is_file()
+
+ assert local.db.config.logLevel == INFO
+ assert local.db.config.isLimitLogFile is True
+ assert local.db.config.isDeleteOldLogFile is False
+ assert local.db.config.isStartOnlyOneProcess is True
+ assert local.db.config.isLocaldbSaving is False
+ assert local.db.config.isWritingLogFile is True
+ assert local.db.config.logFileSizeLines == 16384
+
+
+def test_mypyclass_save_db_syncs_local_and_disk_changes(local):
+ db_path = Path(local.db_path)
+
+ local.db.runtime = Dict(enabled=True)
+ local.db.config.memoryUsinglimit = 123
+ local.save_db()
+
+ persisted = json.loads(db_path.read_text())
+ assert persisted["runtime"]["enabled"] is True
+ assert persisted["config"]["memoryUsinglimit"] == 123
+ assert local.old_db.runtime.enabled is True
+
+ persisted["external"] = {"source": "disk"}
+ persisted["config"]["logLevel"] = DEBUG
+ db_path.write_text(json.dumps(persisted))
+
+ local.save_db()
+
+ assert local.db.runtime.enabled is True
+ assert local.db.external.source == "disk"
+ assert local.db.config.logLevel == DEBUG
+ assert local.old_db.external.source == "disk"
+ assert local.old_db.config.logLevel == DEBUG
+
+
+def test_mypyclass_write_log_flushes_queue_and_trims_file(local):
+ local.db.config.logFileSizeLines = 2
+
+ for i in range(260):
+ local.add_log(f"log line {i}")
+
+ assert len(local.log_list) == 260
+
+ local.write_log()
+
+ lines = Path(local.log_file_name).read_text().splitlines()
+ assert local.log_list == []
+ assert len(lines) == 2
+ assert "log line 258" in lines[0]
+ assert "log line 259" in lines[1]
+
+
+def test_mypyclass_exit_persists_state_and_cleans_up_pid_file(local, monkeypatch):
+ db_path = Path(local.db_path)
+ log_path = Path(local.log_file_name)
+ pid_path = Path(local.pid_file_path)
+
+ local.db.shutdown = "graceful"
+ local.add_log("shutting down")
+ local.write_pid()
+ assert pid_path.is_file()
+
+ exit_codes = []
+ monkeypatch.setattr(sys, "exit", lambda code=0: exit_codes.append(code))
+
+ local.exit()
+
+ persisted = json.loads(db_path.read_text())
+ assert local.working is False
+ assert exit_codes == [0]
+ assert not pid_path.exists()
+ assert persisted["shutdown"] == "graceful"
+ assert "shutting down" in log_path.read_text()