diff --git a/application/app/application.py b/application/app/application.py index 80f1f46..0775d60 100644 --- a/application/app/application.py +++ b/application/app/application.py @@ -1,36 +1,42 @@ +import asyncio import platform from abc import ABC + from application.app.setup.console_setup import ConsoleSetup from application.app.setup.logger_setup import LoggerSetup from application.app.setup.p2p_setup import P2PNodeSetup from application.app.setup.tor_setup import TorSetup from application.logger.logger import Logger from application.settings import APP_NAME +from application.settings import PROXY_HOST +from application.settings import PROXY_PORT from application.settings import NODE_HOST from application.settings import NODE_PORT from application.version import APP_VERSION +from application.p2p.node import Node +from application.app.de_app import DeApp class DeProtocol(ABC): def __init__(self): self.setups = {} - self.on_start() - def on_start(self, proxy_host='127.0.0.1', proxy_port=9050): + async def on_start(self, proxy_host: str = PROXY_HOST, proxy_port: int = PROXY_PORT) -> None: self.setups = { 'logger': LoggerSetup(), 'tor': TorSetup(proxy_host, proxy_port), 'p2p': P2PNodeSetup(NODE_HOST, NODE_PORT), + 'shell': ConsoleSetup() } for setup in self.setups.values(): - setup.setup() - - ConsoleSetup(self.setups['p2p'].node, self.setups['tor'].tor_service).setup() + await setup.setup() + app = DeApp(self.setups['p2p'].node, self.setups['tor'].tor_service, self.setups['shell'].shell) Logger.get_instance().info(f"Starting {APP_NAME} version {APP_VERSION}, running on {platform.system()}") + await app.start() - def on_stop(self): + def on_stop(self) -> None: pass diff --git a/application/app/de_app.py b/application/app/de_app.py new file mode 100644 index 0000000..27ef392 --- /dev/null +++ b/application/app/de_app.py @@ -0,0 +1,15 @@ +# from application.console.simple_console import DeConsole +from application.network.tor_network import TorService +from application.p2p.node import Node + + +class DeApp: + def __init__(self, node: Node, tor_service: TorService, shell): + self.node = node + self.tor_service = tor_service + self.shell = shell + + async def start(self) -> None: + init_shell_vars = [self, self.tor_service, self.node] + await self.shell.start(*init_shell_vars) + diff --git a/application/app/setup/console_setup.py b/application/app/setup/console_setup.py index e3933b9..b17aae4 100644 --- a/application/app/setup/console_setup.py +++ b/application/app/setup/console_setup.py @@ -6,14 +6,10 @@ class ConsoleSetup(SetupABC): - def __init__(self, node, tor_service): - self.shell = None - self.node = node - self.tor_service = tor_service + def __init__(self): + self.shell: DeConsole = None - def setup(self): + async def setup(self): if USE_CONSOLE: Logger.get_instance().warning("Running DeProtocol in CONSOLE MODE!") - self.shell = DeConsole(self.node, self.tor_service) - self.shell.start() - Logger.get_instance().info("Console started correctly!") + self.shell = DeConsole() diff --git a/application/app/setup/logger_setup.py b/application/app/setup/logger_setup.py index a15ea15..ecc5d95 100644 --- a/application/app/setup/logger_setup.py +++ b/application/app/setup/logger_setup.py @@ -2,16 +2,15 @@ from application.app.setup.setup import SetupABC from application.logger.logger import Logger -from application.settings import APP_NAME, PROXY_HOST, PROXY_PORT, PROXY_TYPE, NODE_HOST, NODE_PORT, DEBUG, \ - DEFAULT_LOG_LEVEL +from application.settings import APP_NAME, DEFAULT_LOG_LEVEL class LoggerSetup(SetupABC): def __init__(self): - self.logger = None + self.logger: Logger = None - def setup(self): + async def setup(self) -> None: log_level = DEFAULT_LOG_LEVEL self.logger = Logger(name=APP_NAME, level=log_level) diff --git a/application/app/setup/p2p_setup.py b/application/app/setup/p2p_setup.py index 73feda0..ea0776c 100644 --- a/application/app/setup/p2p_setup.py +++ b/application/app/setup/p2p_setup.py @@ -4,13 +4,12 @@ class P2PNodeSetup(SetupABC): - def __init__(self, node_host, node_port): + def __init__(self, node_host: str, node_port: int): self.node = None self.node_host = node_host self.node_port = node_port - def setup(self): - # Start Node + async def setup(self) -> None: self.node = Node(self.node_host, self.node_port) - self.node.start() + await self.node.start() Logger.get_instance().info(f"Node started correctly! Host:Port -> {self.node_host}:{self.node_port}") diff --git a/application/app/setup/setup.py b/application/app/setup/setup.py index cd920a0..6890613 100644 --- a/application/app/setup/setup.py +++ b/application/app/setup/setup.py @@ -4,5 +4,5 @@ class SetupABC(ABC): @abstractmethod - def setup(self): + def setup(self) -> None: pass diff --git a/application/app/setup/tor_setup.py b/application/app/setup/tor_setup.py index b0304ce..bf9e9f3 100644 --- a/application/app/setup/tor_setup.py +++ b/application/app/setup/tor_setup.py @@ -4,27 +4,32 @@ from application.logger.logger import Logger from application.network.tor_network import TorService from application.settings import PROXY_TYPE +from application.settings import PROXY_PORT +from application.settings import PROXY_HOST +from application.settings import CONTROL_PORT from application.utils.tor_utils import TorUtils + class TorSetup(SetupABC): - def __init__(self, proxy_host='127.0.0.1', proxy_port=9050): + def __init__(self, proxy_host: str = PROXY_HOST, proxy_port: int = PROXY_PORT): self.tor_service = None self.tor_client = None self.proxy_host = proxy_host self.proxy_port = proxy_port - def setup(self): + async def setup(self) -> None: # Configure socks to use Tor proxy by default socks.setdefaultproxy(PROXY_TYPE, self.proxy_host, self.proxy_port) + Logger.get_instance().info( f'Default proxy configuration set: {PROXY_TYPE} - {self.proxy_host}:{self.proxy_port}') # Download and install Tor Client self.tor_client = TorUtils() - self.tor_client.download_and_install() + await self.tor_client.install() # Start Tor Service - self.tor_service = TorService(9051) - self.tor_service.start() + self.tor_service = TorService(CONTROL_PORT) + await self.tor_service.start() Logger.get_instance().info("Tor Service started correctly!") diff --git a/application/console/simple_console.py b/application/console/simple_console.py index 610b4fa..42b2024 100644 --- a/application/console/simple_console.py +++ b/application/console/simple_console.py @@ -1,34 +1,57 @@ -import threading +import asyncio import time from application.logger.logger import Logger +from application.app.de_app import DeApp +from application.settings import NODE_PORT -class DeConsole(threading.Thread): +class DeConsole: + def __init__(self): + self.app = None + self.tor_service = None + self.node = None + self.terminate_flag = asyncio.Event() - def __init__(self, node, tor_service): - super().__init__() - self.node = node - self.tor_service = tor_service - self.terminate_flag = threading.Event() + async def start(self, *args, **kwargs) -> None: + + def set_cls_vars() -> None: + if args: + self.app, self.tor_service, self.node = args + Logger.get_instance().info("Console started correctly!") + + try: + set_cls_vars() + except Exception as exs: + Logger.get_instance().error(exs) - def run(self): while not self.terminate_flag.is_set(): try: - self.handle_console() + + await self.handle_console() + except KeyboardInterrupt: Logger.get_instance().info("User requested stopping the protocol, stopping!") self.stop() + + # BUG: add exception for bug "Invalid protocol version" + except Exception as exc: Logger.get_instance().info(f"An exception is stopping DeProtocol! ({exc})") self.stop() + Logger.get_instance().info("DeProtocol successfully closed, see you soon!") - def stop(self): + def stop(self) -> None: self.terminate_flag.set() - def handle_console(self): - cmd = input("\nDEPROTO>") + async def handle_console(self) -> None: + + async def async_input(prompt='') -> str: + print(prompt, end='', flush=True) + return (await asyncio.get_running_loop().run_in_executor(None, input)).rstrip('\n') + + cmd = await async_input("\nDEPROTO>") if cmd == "help": print( """ @@ -42,12 +65,12 @@ def handle_console(self): ) if "connect " in cmd: args = cmd.replace("connect ", "") - self.node.connect(args, port=65432) + await self.node.connect(args, port=65432) if "msg " in cmd: args = cmd.replace("msg ", "") Logger.get_instance().info(f"Sent message: {args}") - self.node.message("msg", args) + await self.node.message("msg", args) if cmd == "stop": self.node.stop() @@ -69,11 +92,10 @@ def handle_console(self): if cmd == "peers": self.print_peers() - def print_peers(self): + def print_peers(self) -> None: print(f"Address: {self.tor_service.get_address()}") - Logger.get_instance().info(self.node.peers) print("--------------") - for i in self.node.nodes_connected: + for i in self.node.node_connections: print( i.id + " (" @@ -82,6 +104,6 @@ def print_peers(self): + str(time.time() - int(i.last_ping)) + "s" ) - if len(self.node.peers) == 0: + if len(self.node.node_connections) == 0: print("NO PEERS CONNECTED") print("--------------") \ No newline at end of file diff --git a/application/logger/logger.py b/application/logger/logger.py index 88d8317..4f5e645 100644 --- a/application/logger/logger.py +++ b/application/logger/logger.py @@ -7,7 +7,7 @@ class Logger: """ Represents a Logger object handles logging functionalities executed as a singleton""" _instance = None - def __init__(self, name, level=logging.INFO, fmt='[%(asctime)s] %(levelname)s: %(message)s', + def __init__(self, name: str, level=logging.INFO, fmt='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S'): self.logger = logging.getLogger(name) self.logger.setLevel(level) @@ -28,22 +28,22 @@ def __init__(self, name, level=logging.INFO, fmt='[%(asctime)s] %(levelname)s: % #stem_logger.addHandler(logging.StreamHandler()) - def level(self, level="trace"): + def level(self, level="trace") -> None: self.logger.setLevel(level) - def info(self, msg, *args, **kwargs): + def info(self, msg: str, *args, **kwargs) -> None: """ Information prints in logging terms """ self.logger.info(msg, *args, **kwargs) - def warning(self, msg, *args, **kwargs): + def warning(self, msg: str, *args, **kwargs) -> None: """ Warning prints in logging terms """ self.logger.warning(msg, *args, **kwargs) - def error(self, msg, *args, **kwargs): + def error(self, msg: str, *args, **kwargs) -> None: """ Error prints in logging terms""" self.logger.error(msg, *args, **kwargs) - def debug(self, msg, *args, **kwargs): + def debug(self, msg: str, *args, **kwargs) -> None: """ Debug prints in logging terms """ self.logger.debug(msg, *args, **kwargs) diff --git a/application/network/tor_network.py b/application/network/tor_network.py index 72e4cb1..f6714e1 100644 --- a/application/network/tor_network.py +++ b/application/network/tor_network.py @@ -2,31 +2,38 @@ import os.path import socket +import asyncio +import stem.util import stem.control import stem.process from application.logger.logger import Logger from application.settings import TOR_BINARIES_PATH +from application.settings import NODE_PORT +from application.settings import NODE_HOST +from application.settings import PROXY_PORT +from application.utils.tor_utils import TorUtils class TorService: - def __init__(self, port): + + def __init__(self, port: int): self.port = port self.tor_process = None self.tor_controller = None self.hidden_service = None - def start(self): + def set_tor_config(self) -> None: try: self.tor_process = stem.process.launch_tor_with_config( config={ - 'SocksPort': '9050', + 'SocksPort': f'{PROXY_PORT}', # 9050 'SocksPolicy': 'accept *', - 'ControlPort': str(self.port), + 'ControlPort': f"{self.port}", # 9051 'DataDirectory': os.path.join(os.getcwd(), 'tor_data'), 'HiddenServiceDir': os.path.join(os.getcwd(), 'tor_hidden_service'), - 'HiddenServicePort': '80 127.0.0.1:65432' + 'HiddenServicePort': f'80 {NODE_HOST}:{NODE_PORT}', # localhost : 65432/65433 }, tor_cmd=os.path.join(os.getcwd(), TOR_BINARIES_PATH), init_msg_handler=self._print_bootstrap_lines, @@ -35,45 +42,37 @@ def start(self): except Exception as e: Logger.get_instance().error(e) - self.tor_controller = stem.control.Controller.from_port(port=self.port) - self.tor_controller.authenticate() - self.tor_controller.new_circuit() - - bytes_read = self.tor_controller.get_info("traffic/read") - bytes_written = self.tor_controller.get_info("traffic/written") - + async def get_info_by_traffic(self) -> None: + bytes_read = await self.tor_controller.get_info("traffic/read") + bytes_written = await self.tor_controller.get_info("traffic/written") Logger.get_instance().info(f'Tor relay has read {bytes_read} bytes and written {bytes_written}.') - self.hidden_service = self.tor_controller.create_ephemeral_hidden_service( - {'80': '127.0.0.1:65432'}, await_publication=True + async def create_hidden_service(self) -> None: + self.hidden_service = await self.tor_controller.create_ephemeral_hidden_service( + {'80': f'{NODE_HOST}:{NODE_PORT}'}, await_publication=True ) Logger.get_instance().info(f"Hidden service created with address: {self.hidden_service.service_id}.onion") - def stop(self): + async def start(self) -> None: + self.set_tor_config() + await self.create_tor_session() + + async def create_tor_session(self) -> None: + self.tor_controller = await TorUtils.establish_tor_connection(return_=True) + await self.get_info_by_traffic() + await self.create_hidden_service() + + async def stop(self) -> None: if self.tor_controller: - self.tor_controller.close() + await self.tor_controller.close() Logger.get_instance().info("Tor Service was closed successfully") if self.tor_process: self.tor_process.kill() Logger.get_instance().warning("Tor Service process was killed!") - def _print_bootstrap_lines(self, line): + def _print_bootstrap_lines(self, line: str) -> None: if "Bootstrapped" in line: Logger.get_instance().info(line) - # unused - def connect(self, addr, port): - circuit = self.tor_controller.new_circuit() - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(10) - - # Connect the socket to the hidden service via the Tor circuit - s.connect((addr, port)) - s = self.tor_controller.attach_stream(circuit, s) - - s.send("test") - response = s.recv(1024) - print(response) - - def get_address(self): + def get_address(self) -> str: return self.hidden_service.service_id diff --git a/application/p2p/deprecated_node.py b/application/p2p/deprecated_node.py deleted file mode 100644 index cfbeec4..0000000 --- a/application/p2p/deprecated_node.py +++ /dev/null @@ -1,415 +0,0 @@ -import hashlib -import ipaddress -import json -import socket -import threading -import time - -import socks -import stem - -from application.logger.logger import Logger -from application.p2p.file_transfer import FileDownloader -from application.p2p.file_transfer import FileManager -from application.p2p.node_connection import NodeConnection -from application.p2p.pinger import Pinger -from application.protocol import HandshakePacket -from application.protocol.packet_handler import PacketHandler -from application.utils import crypto_funcs as cf -from application.utils import portforwardlib - -msg_del_time = 30 -PORT = 65432 -FILE_PORT = 65433 - - -class Node(threading.Thread): - def __init__(self, host="", port=65432, file_port=65433): - super(Node, self).__init__() - - self.terminate_flag = threading.Event() - self.pinger = Pinger(self) # start pinger - #self.file_manager = FileManager() - # self.fileServer = fileServer(self, file_port) - self.debug = True - - self.dead_time = ( - 45 # time to disconect from node if not pinged, nodes ping after 20s - ) - - self.host = host - self.ip = host # own ip, will be changed by connection later - self.port = port - self.file_port = file_port - - self.nodes_connected = [] - - self.requested = [] # list of files we have requested. - self.msgs = {} # hashes of recieved messages - self.peers = [] - - self.publickey, self.private_key = cf.generate_keys() - self.id = cf.serialize_key(self.publickey) - - self.max_peers = 10 - - hostname = socket.gethostname() - - self.local_ip = socket.gethostbyname(hostname) - - self.banned = [] - - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - Logger.get_instance().info("Initialisation of the Node on port: " + str(self.port)) - self.socket.bind((self.host, self.port)) - self.socket.settimeout(10.0) - self.socket.listen(1) - self.packet_handler = None - - def debug_print(self, msg): - if self.debug: - Logger.get_instance().info(str(msg)) - - def network_send(self, message, exc=[]): - for i in self.nodes_connected: - if i.connected_host in exc: - pass - else: - i.send(json.dumps(message)) - - def connect_to(self, host, port=PORT): - - if not self.check_ip_to_connect(host): - self.debug_print("connect_to: Cannot connect!!") - return False - - if len(self.nodes_connected) >= self.max_peers: - self.debug_print("Peers limit reached.") - return True - - for node in self.nodes_connected: - if node.connected_host == host: - print("[connect_to]: Already connected with this node.") - return True - - try: - sock = socks.socksocket() - sock.settimeout(60) - self.debug_print("connecting to %s port %s" % (host, port)) - sock.setproxy(socks.PROXY_TYPE_SOCKS5, "localhost", 9050, True) - - tor_controller = stem.control.Controller.from_port(port=9051) - tor_controller.authenticate() - - tor_controller.new_circuit() - - sock.connect((host, 80)) - self.packet_handler = PacketHandler(sock) - - handshake_packet = HandshakePacket(self.id) - self.packet_handler.send_packet(handshake_packet) - rec = self.packet_handler.receive_packet() - connected_node_id = rec.payload.decode("utf-8") - - if self.id == connected_node_id: - self.debug_print("Possible own ip: " + host) - if ipaddress.ip_address(host).is_private: - self.local_ip = host - else: - self.ip = host - self.banned.append(host) - sock.close() - return False - - thread_client = self.create_new_connection( - sock, connected_node_id, host, port - ) - thread_client.start() - self.nodes_connected.append(thread_client) - self.node_connected(thread_client) - - except Exception as e: - self.debug_print( - "connect_to: Could not connect with node. (" + str(e) + ")" - ) - - def create_new_connection(self, connection, id, host, port): - return NodeConnection(self, connection, id, host, port) - - def stop(self): - self.terminate_flag.set() - portforwardlib.forwardPort( - self.host, - self.port, - None, - None, - True, - "TCP", - 0, - "PYHTON-P2P-NODE", - True, - ) - portforwardlib.forwardPort( - self.host, - self.file_port, - None, - None, - True, - "TCP", - 0, - "PYHTON-P2P-FILESERVER", - True, - ) - - def run(self): - self.pinger.start() - # self.fileServer.start() - while ( - not self.terminate_flag.is_set() - ): # Check whether the thread needs to be closed - try: - connection, client_address = self.socket.accept() - - self.packet_handler = PacketHandler(connection) - rec = self.packet_handler.receive_packet() - connected_node_id = rec.payload.decode("utf-8") - handshake_packet = HandshakePacket(self.id) - self.packet_handler.send_packet(handshake_packet) - - if self.id != connected_node_id: - thread_client = self.create_new_connection( - connection, - connected_node_id, - client_address[0], - client_address[1], - ) - thread_client.start() - - self.nodes_connected.append(thread_client) - - self.node_connected(thread_client) - - else: - connection.close() - - except socket.timeout: - pass - - except Exception as e: - raise e - - time.sleep(0.01) - - self.pinger.stop() - self.fileServer.stop() - for t in self.nodes_connected: - t.stop() - - self.socket.close() - print("Node stopped") - - def ConnectToNodes(self): - for i in self.peers: - if not self.connect_to(i, PORT): - del self.peers[self.peers.index(i)] # delete wrong / own ip from peers - - def send_message(self, data, reciever=None): - # time that the message was sent - if reciever: - data = cf.encrypt(data, cf.load_key(reciever)) - self.message("msg", data, {"rnid": reciever}) - - def message(self, type, data, overides={}, ex=[]): - # time that the message was sent - dict = {"type": type, "data": data} - if "time" not in dict: - dict["time"] = str(time.time()) - - if "snid" not in dict: - # sender node id - dict["snid"] = str(self.id) - - if "rnid" not in dict: - # reciever node id - dict["rnid"] = None - - if "sig" not in dict: - dict["sig"] = cf.sign(data, self.private_key) - - dict = {**dict, **overides} - self.network_send(dict, ex) - - def send_peers(self): - self.message("peers", self.peers) - - def check_validity(self, msg): - if not ( - "time" in msg - and "type" in msg - and "snid" in msg - and "sig" in msg - and "rnid" in msg - ): - return False - - if not cf.verify(msg["data"], msg["sig"], cf.load_key(msg["snid"])): - self.debug_print( - f"Error validating signature of message from {msg['snid']}" - ) - return False - - if msg["type"] == "resp": - if "ip" not in msg and "localip" not in msg: - return False - return True - - def check_expired(self, dta): - sth = str(dta) - hash_object = hashlib.md5(sth.encode("utf-8")) - msghash = str(hash_object.hexdigest()) - - # check if the message hasn't expired. - if float(time.time()) - float(dta["time"]) < float(msg_del_time): - if msghash not in self.msgs: - self.msgs[msghash] = time.time() - return False - else: - # if message is expired - self.debug_print("expired:" + dta["msg"]) - return True - - def announce(self, dta, n): - self.message(dta["type"], dta["data"], dta, ex=n) - if len(self.msgs) > len(self.peers) * 20: - for i in self.msgs.copy(): - if time.time() - self.msgs[i] > msg_del_time: - del self.msgs[i] - - def encryption_handler(self, dta): - if dta["rnid"] == self.id: - dta["data"] = cf.decrypt(dta["data"], self.private_key) - return dta - elif dta["rnid"] is None: - return dta - else: - return False - - def data_handler(self, dta, n): - if not self.check_validity(dta): - return False - - if self.check_expired(dta): - return False - else: - self.announce(dta, n) - - dta = self.encryption_handler(dta) - if not dta: - return False - - type = dta["type"] - data = dta["data"] - - if type == "peers": - # peers handling - for i in data: - if self.check_ip_to_connect(i): - self.peers.append(i) - - self.debug_print("Known Peers: " + str(self.peers)) - self.ConnectToNodes() # cpnnect to new nodes - return True - - if type == "msg": - self.on_message(data, dta["snid"], bool(dta["rnid"])) - - if type == "req": - if self.file_manager.have_file(data): - self.message( - "resp", - data, - {"ip": self.ip, "localip": self.local_ip}, - ) - self.debug_print( - "recieved request for file: " + data + " and we have it." - ) - else: - self.debug_print( - "recieved request for file: " + data + " but we do not have it." - ) - - if type == "resp": - self.debug_print("node: " + dta["snid"] + " has file " + data) - if data in self.requested: - print("node " + dta["snid"] + " has our file!") - if dta["ip"] == "": - if dta["localip"] != "": - ip = dta["localip"] - else: - ip = dta["ip"] - - downloader = FileDownloader( - ip, FILE_PORT, str(data), self.fileServer.dirname, self.file_manager - ) - downloader.start() - - def check_ip_to_connect(self, ip): - if ( - ip not in self.peers - and ip != "" - and ip != self.ip - and ip != self.local_ip - and ip not in self.banned - ): - return True - else: - return False - - def on_message(self, data, sender, private): - self.debug_print("Incomig Message: " + data) - - def loadstate(self, file="state.json"): - with open(file, "r") as f: - peers = json.load(f) - for i in peers: - self.connect_to(i) - - def savestate(self, file="state.json"): - with open(file, "w+") as f: - json.dump(self.peers, f) - - """ - def requestFile(self, fhash): - if fhash not in self.requested and fhash not in self.file_manager.getallfiles(): - self.requested.append(fhash) - self.message("req", fhash) - - def addfile(self, path): - s = self.file_manager.addfile(path) - self.file_manager.refresh() - return s - - def setfiledir(self, path): - self.fileServer.dirname = path - self.file_manager.download_path = path - """ - - def node_connected(self, node): - self.debug_print("Connected to node: " + node.connected_host) - if node.connected_host not in self.peers: - self.peers.append(node.connected_host) - self.send_peers() - - def node_disconnected(self, node): - self.debug_print("Disconnected from: " + node.connected_host) - if node.connected_host in self.peers: - self.peers.remove(node.connected_host) - - def node_message(self, node, data): - try: - json.loads(data) - except json.decoder.JSONDecodeError: - self.debug_print(f"Error loading message from {node.id}") - return - self.data_handler(json.loads(data), [node.connected_host, self.ip]) diff --git a/application/p2p/handler/connection_handler.py b/application/p2p/handler/connection_handler.py index b15573c..eb44537 100644 --- a/application/p2p/handler/connection_handler.py +++ b/application/p2p/handler/connection_handler.py @@ -1,42 +1,44 @@ +import asyncio from application.p2p.node_connection import NodeConnection from application.protocol import HandshakePacket from application.protocol.packet_handler import PacketHandler class ConnectionHandler: - def __init__(self, conn, addr, node): - super().__init__() - self.conn = conn + def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, addr: tuple, node): + self.reader = reader + self.writer = writer self.addr = addr self.node = node - def start(self): - packet_handler = PacketHandler(self.conn) + async def send_initial_packet(self, packet_handler: PacketHandler) -> None: + handshake_packet = HandshakePacket(self.node.id) + await packet_handler.send_packet(handshake_packet) + + async def start(self) -> None: - self.send_initial_packet(packet_handler) + packet_handler = PacketHandler(self.reader, self.writer) + await self.send_initial_packet(packet_handler) - rec = packet_handler.receive_packet() + rec = await packet_handler.receive_packet() connected_node_id = rec.payload.decode("utf-8") if self.node.id != connected_node_id: - thread_client = self.create_new_connection( - self.conn, + async_client = self.create_new_connection( + self.reader, + self.writer, connected_node_id, self.addr[0], self.addr[1], ) - thread_client.start() - self.node.node_connections.append(thread_client) - - self.node.node_connected(thread_client) + self.node.node_connections.append(async_client) + asyncio.create_task(async_client.start()) else: - self.conn.close() + self.writer.close() - def send_initial_packet(self, packet_handler): - handshake_packet = HandshakePacket(self.node.id) - packet_handler.send_packet(handshake_packet) + def create_new_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, id: str, + host: str, port: int) -> NodeConnection: - def create_new_connection(self, connection, id, host, port): - return NodeConnection(self.node, connection, id, host, port) + return NodeConnection(self.node, reader, writer, id, host, port) diff --git a/application/p2p/node.py b/application/p2p/node.py index ad60053..0b1c807 100644 --- a/application/p2p/node.py +++ b/application/p2p/node.py @@ -2,82 +2,81 @@ import hashlib import json import socket -import threading +import asyncio import time import socks import stem +from siosocks.io.asyncio import open_connection +from siosocks.io.asyncio import socks_server_handler + from application.logger.logger import Logger from application.p2p.handler.connection_handler import ConnectionHandler -from application.p2p.deprecated_node import PORT +from application.p2p.node_connection import NodeConnection from application.p2p.pinger import Pinger from application.p2p.proxied_socket import Socket +from application.settings import NODE_PORT +from application.settings import PROXY_PORT, PROXY_HOST +from application.settings import CONTROL_PORT from application.utils import crypto_funcs as cf +from application.utils.tor_utils import TorUtils + -class Node(threading.Thread): - def __init__(self, host='', port=65432, onion_address=''): - super().__init__() - self.packet_handler = None - self.terminate_flag = threading.Event() - self.dead_time = 45 +class Node: + + def __init__(self, host='', port: int = NODE_PORT, onion_address=''): + # self.packet_handler = None + self.terminate_flag = asyncio.Event() + self.loop = asyncio.get_running_loop() + self.dead_time = 100 self.host = host self.port = port self.onion_address = onion_address self.node_connections = [] self.msgs = {} - self.peers = [] self.banned_address = [] self.public_key, self.private_key = cf.generate_keys() self.id = cf.serialize_key(self.public_key) - self.pinger = Pinger(self).start() - - def node_connected(self, node): - Logger.get_instance().info(f"Connected to node: {node.connected_host}") - if node.connected_host not in self.peers: - self.peers.append(node.connected_host) - # self.send_peers() - - def run(self): - with Socket(self.host, self.port) as sock: - while not self.terminate_flag.is_set(): - try: - conn, addr = sock.accept() - connection_handler = ConnectionHandler(conn, addr, self) - connection_handler.start() - except socket.timeout: - continue - except socket.error as exc: - if exc.errno == errno.ECONNRESET: - Logger.get_instance().error("SocketClosed: %s" % str(exc)) - except Exception as exc: - Logger.get_instance().error(exc) - - def connect(self, host, port=PORT): - if self.is_valid_address(host): - sock = socks.socksocket() - sock.settimeout(60) - sock.setproxy(socks.PROXY_TYPE_SOCKS5, "localhost", 9050, True) - - tor_controller = stem.control.Controller.from_port(port=9051) - tor_controller.authenticate() - tor_controller.new_circuit() + self.pinger = Pinger(self) - Logger.get_instance().info(f"connecting to {host} port {port}") + async def start(self) -> None: + server = await asyncio.start_server(self.handle_connection, + self.host, self.port) + asyncio.create_task(self._serve(server)) - sock.connect((host, 80)) + async def _serve(self, server: asyncio.AbstractServer) -> None: + async with server: + Logger.get_instance().info(f'Serving on {self.host}:{self.port}\n') + await server.serve_forever() - connection_handler = ConnectionHandler(sock, (host, port), self) - connection_handler.start() + async def handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + addr = writer.get_extra_info("peername") + connection_handler = ConnectionHandler(reader, writer, addr, self) + asyncio.create_task(connection_handler.start()) - def is_valid_address(self, address): + async def connect(self, host: str, port: int) -> None: + if self.is_valid_address(host): + await TorUtils.establish_tor_connection() + try: + reader, writer = await open_connection(host, 80, socks_host=PROXY_HOST, + socks_port=PROXY_PORT, socks_version=5) + Logger.get_instance().info(f"Connected to {host} port {port}") + except Exception as exs: + Logger.get_instance().error(exs) + return + + connection_handler = ConnectionHandler(reader, writer, (host, port), self) + # self.loop.create_task(connection_handler.start()) + await connection_handler.start() + + def is_valid_address(self, address: str) -> bool: if address is self.host or address in self.banned_address: return False - for node in self.node_connections: if node.connected_host == self.host: Logger.get_instance().info("Already connected with this node.") @@ -85,21 +84,26 @@ def is_valid_address(self, address): return True - def stop(self): + def stop(self) -> None: self.terminate_flag.set() + def node_connected(self, node: NodeConnection) -> None: + Logger.get_instance().info(f"Connected to node: {node.connected_host}") + if node.connected_host not in self.node_connections: + self.node_connections.append(node.connected_host) + ''' TODO: All methods from here until final of the file are methods that breaks SOLID. They must be refactored ASAP so we separate responsibilities. ''' - def network_send(self, message, exc=[]): - for i in self.node_connections: - if i.connected_host in exc: + async def network_send(self, message: dict, exc=[]) -> None: + for node_ in self.node_connections: + if node_.connected_host in exc: pass else: - i.send(json.dumps(message)) + await node_.send(json.dumps(message)) - def node_message(self, node, data): + def recieve_message_from_node(self, node, data: str) -> None: try: json.loads(data) except json.decoder.JSONDecodeError: @@ -107,7 +111,7 @@ def node_message(self, node, data): return self.data_handler(json.loads(data), [node.connected_host, self.host]) - def message(self, type, data, overides={}, ex=[]): + async def message(self, type: str, data: str, overides={}, ex=[]) -> None: # time that the message was sent dict = {"type": type, "data": data} if "time" not in dict: @@ -125,10 +129,17 @@ def message(self, type, data, overides={}, ex=[]): dict["sig"] = cf.sign(data, self.private_key) dict = {**dict, **overides} - self.network_send(dict, ex) + await self.network_send(dict, ex) + + async def start_pinger(self): + await self.pinger.start() + + def stop_pinger(self): + self.pinger.stop() + # TODO: change the structure of the lower functions under SOLID @staticmethod - def check_validity(msg): + def check_validity(msg: dict) -> bool: if not ( "time" in msg and "type" in msg @@ -149,7 +160,7 @@ def check_validity(msg): return False return True - def encryption_handler(self, dta): + def encryption_handler(self, dta: dict): if dta["rnid"] == self.id: dta["data"] = cf.decrypt(dta["data"], self.private_key) return dta @@ -158,10 +169,10 @@ def encryption_handler(self, dta): else: return False - def on_message(self, data, sender, private): + def on_message(self, data: str, sender, private) -> None: Logger.get_instance().info("Incomig Message: " + data) - def data_handler(self, dta, n): + def data_handler(self, dta: dict, n) -> bool: if not Node.check_validity(dta): return False @@ -176,7 +187,7 @@ def data_handler(self, dta, n): if type == "msg": self.on_message(data, dta["snid"], bool(dta["rnid"])) - def node_disconnected(self, node): + def node_disconnected(self, node) -> None: Logger.get_instance().info("Disconnected from: " + node.connected_host) - if node.connected_host in self.peers: - self.peers.remove(node.connected_host) + if node.connected_host in self.node_connections: + self.node_connections.remove(node.connected_host) diff --git a/application/p2p/node_connection.py b/application/p2p/node_connection.py index 28f99d8..85c1f77 100644 --- a/application/p2p/node_connection.py +++ b/application/p2p/node_connection.py @@ -1,25 +1,27 @@ +import asyncio +import json import socket import sys import threading import time +from collections import deque import socks from application.logger.logger import Logger from application.utils import crypto_funcs as cf +# from application.p2p.node import Node -class NodeConnection(threading.Thread): - def __init__(self, main_node, sock, id, host, port): - - super(NodeConnection, self).__init__() - socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, '127.0.0.1', 9050) +class NodeConnection: + def __init__(self, main_node, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, id: str, host: str, port: int): self.connected_host = host self.connected_port = port self.main_node = main_node - self.sock = sock - self.terminate_flag = threading.Event() + self.reader = reader + self.writer = writer + self.terminate_flag = asyncio.Event() self.last_ping = time.time() # Variable for parsing the incoming json messages self.buffer = "" @@ -31,79 +33,101 @@ def __init__(self, main_node, sock, id, host, port): Logger.get_instance().info( "NodeConnection.send: Started with client (" + self.connected_host - + ") '" + + ")" + ":" + str(self.connected_port) - + "'" + + "" ) - def send(self, data): + async def stop(self) -> None: + await self.disconnect_nodes() + self.terminate_flag.set() - try: - data = f"{data}-TSN" - self.sock.sendall(data.encode("utf-8")) + def is_first_peer(self) -> bool: + if len(self.main_node.node_connections) > 1: + return False + return True - except Exception as e: - Logger.get_instance().error( - "NodeConnection.send: Unexpected ercontent/ror:" - + str(sys.exc_info()[0]) - ) - Logger.get_instance().error(f"Exception: {str(e)}") - self.terminate_flag.set() + # BUG: always returns True + def is_node_dead(self): + return time.time() - self.last_ping > self.main_node.dead_time - def stop(self): - self.terminate_flag.set() + async def start(self) -> None: + if self.is_first_peer(): + asyncio.create_task(self.main_node.start_pinger()) + await self._run() - def run(self): - self.sock.settimeout(60.0) + async def _run(self): while not self.terminate_flag.is_set(): - if time.time() - self.last_ping > self.main_node.dead_time: - self.terminate_flag.set() - Logger.get_instance().warning(f"node{self.id} is dead") + # if self.is_node_dead: + # await self.stop() + # Logger.get_instance().warning(f"node {self.id} is dead") - line = "" + line = await self.receive_packet() + if line != "": + self.handle_buffer_overflow(line) + # Get the messages by finding the message ending -TSN + index = self.buffer.find("-TSN") + while index > 0: + index = self.process_message_from_buffer(index) - try: - line = self.sock.recv(4096) + await asyncio.sleep(0.01) - except socket.timeout: - # self.main_node.debug_print("NodeConnection: timeout") - pass + await self.stop() - except Exception as e: - self.terminate_flag.set() - Logger.get_instance().error( - f"NodeConnection: Socket has been terminated ({line})" - ) - Logger.get_instance().error(e) + async def send(self, data: json.JSONEncoder) -> None: + try: + data = f"{data}-TSN" + self.writer.write(data.encode('utf-8')) + await self.writer.drain() - if line != "": - try: - # BUG: possible buffer overflow when no -TSN is found! - self.buffer += str(line.decode("utf-8")) + except ConnectionResetError: + Logger.get_instance().error("Connection is lost") + await self.stop() - except Exception as e: - print(f"NodeConnection: Decoding line error | {str(e)}") + except Exception as e: + Logger.get_instance().error( + "NodeConnection.send: Unexpected ercontent/ror:" + + str(sys.exc_info()[0]) + ) + Logger.get_instance().error(f"Exception: {str(e)}") + await self.stop() - # Get the messages by finding the message ending -TSN - index = self.buffer.find("-TSN") - while index > 0: - message = self.buffer[:index] - self.buffer = self.buffer[index + 4::] + async def receive_packet(self): + try: + line = await self.reader.read(4096) + except Exception as e: + self.terminate_flag.set() + Logger.get_instance().error( + f"NodeConnection: Socket has been terminated ({e})" + ) + Logger.get_instance().error(e) + return "" + return line + + def handle_buffer_overflow(self, line): + try: + self.buffer += str(line.decode("utf-8")) + except Exception as e: + print(f"NodeConnection: Decoding line error | {str(e)}") + + def process_message_from_buffer(self, index): + message = self.buffer[:index] + self.buffer = self.buffer[index + 4::] - if message == "ping": - self.last_ping = time.time() - # self.main_node.debug_print("ping from " + self.id) - else: - self.main_node.node_message(self, message) + if message == "ping": + self.last_ping = time.time() + else: + self.main_node.recieve_message_from_node(self, message) - index = self.buffer.find("-TSN") + return self.buffer.find("-TSN") - time.sleep(0.01) + async def disconnect_nodes(self): + if self.is_first_peer(): + self.main_node.stop_pinger() self.main_node.node_disconnected(self) - self.sock.settimeout(None) - self.sock.close() + self.writer.close() del self.main_node.node_connections[self.main_node.node_connections.index(self)] - time.sleep(1) + await asyncio.sleep(1) diff --git a/application/p2p/pinger.py b/application/p2p/pinger.py index 11c3a04..6417e4d 100644 --- a/application/p2p/pinger.py +++ b/application/p2p/pinger.py @@ -1,25 +1,25 @@ -import threading -import time +import asyncio from application.logger.logger import Logger -class Pinger(threading.Thread): +class Pinger: + def __init__(self, parent): - self.terminate_flag = threading.Event() - super(Pinger, self).__init__() # CAll Thread.__init__() + self.terminate_flag = asyncio.Event() self.parent = parent self.dead_time = 30 # time to disconect from node if not pinged def stop(self): self.terminate_flag.set() - def run(self): + async def start(self): Logger.get_instance().info("Pinger Started") - while ( - not self.terminate_flag.is_set() - ): # Check whether the thread needs to be closed - for i in self.parent.node_connections: - i.send("ping") - time.sleep(20) + while not self.terminate_flag.is_set(): + for node_ in self.parent.node_connections: + await node_.send("ping") + await asyncio.sleep(20) Logger.get_instance().info("Pinger stopped") + + + diff --git a/application/peering/__init__.py b/application/peering/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/application/peering/peer.py b/application/peering/peer.py deleted file mode 100644 index 2e62fb9..0000000 --- a/application/peering/peer.py +++ /dev/null @@ -1,21 +0,0 @@ -# pylint: skip-file - -from pythonp2p import Node - -from application.logger.logger import Logger - - -class PeerNode(Node): - - def __init__(self, host='127.0.0.1', port=65433): - super().__init__(host, port) - self.start() - - def broadcast(self, data): - self.send_message(data) - - def connect(self, ip): - self.connect_to(ip) - - def on_message(self, data, sender, private): - Logger.get_instance().info(data) diff --git a/application/protocol/packet_decoder.py b/application/protocol/packet_decoder.py index 7cfe990..1eea2b4 100644 --- a/application/protocol/packet_decoder.py +++ b/application/protocol/packet_decoder.py @@ -5,7 +5,7 @@ class PacketDecoder: @staticmethod - def decode_packet(packet_bytes): + def decode_packet(packet_bytes: bytes): raw_packet = Packet.from_bytes(packet_bytes) packet_type = PacketType.from_int(raw_packet.type) return PacketFactory.create_packet(packet_type=packet_type, payload=raw_packet.payload) diff --git a/application/protocol/packet_handler.py b/application/protocol/packet_handler.py index ee059c1..206989e 100644 --- a/application/protocol/packet_handler.py +++ b/application/protocol/packet_handler.py @@ -1,41 +1,48 @@ +import asyncio + +from application.protocol.packets.handshake import HandshakePacket from application.protocol import PacketEncoder, PacketDecoder from application.protocol.packet_factory import PacketFactory from application.protocol.type import PacketType class PacketHandler: - def __init__(self, sock): - self.sock = sock + def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + self.reader = reader + self.writer = writer self.receive_buffer = bytearray() self.send_buffer = bytearray() self.sequence_number = 0 self.packet_encoder = PacketEncoder() self.packet_decoder = PacketDecoder() - def send_packet(self, packet): + async def send_packet(self, packet: HandshakePacket) -> None: packet.sequence_number = self.sequence_number encoded_packet = self.packet_encoder.encode_packet(packet) - self.sock.sendall(encoded_packet) + self.writer.write(encoded_packet) self.sequence_number += 1 + await self.writer.drain() - def receive_packet(self): - data = self.sock.recv(4096) + # TODO: change function output and condition + async def receive_packet(self): + data = await self.reader.read(4096) if not data: raise ConnectionError('Connection closed by peer') self.receive_buffer.extend(data) if packet := self.packet_decoder.decode_packet(self.receive_buffer): self.receive_buffer = self.receive_buffer[packet.size:] - return packet + return pack return None - def send_file(self, file_path): + # TODO: add async func + def send_file(self, file_path: str) -> None: with open(file_path, 'rb') as f: for data in iter(lambda: f.read(4096), b''): packet = PacketFactory.create_packet(payload=data) self.send_packet(packet) self.send_packet(PacketFactory.create_packet('')) - def receive_file(self, file_path): + def receive_file(self, file_path: str) -> None: with open(file_path, 'wb') as f: packet = self.receive_packet() while packet.TYPE is not PacketType.END_FILE: diff --git a/application/protocol/packets/packet.py b/application/protocol/packets/packet.py index 7a17b4d..48bd697 100644 --- a/application/protocol/packets/packet.py +++ b/application/protocol/packets/packet.py @@ -11,16 +11,16 @@ def __init__(self, packet_type, sequence_number, payload): self.payload = payload @property - def size(self): + def size(self) -> int: return self.HEADER_LENGTH + len(self.payload) - def to_bytes(self): + def to_bytes(self) -> bytes: payload_length = len(self.payload) header = struct.pack('!BBIH', self.PROTOCOL_VERSION, self.type.value, self.sequence_number, payload_length) return header + self.payload.encode() @classmethod - def from_bytes(cls, byte_data): + def from_bytes(cls, byte_data: bytes): header = byte_data[:cls.HEADER_LENGTH] payload = byte_data[cls.HEADER_LENGTH:] protocol_version, packet_type, sequence_number, payload_length = struct.unpack('!BBIH', header) diff --git a/application/settings.py b/application/settings.py index 47d1641..fdbc409 100644 --- a/application/settings.py +++ b/application/settings.py @@ -1,8 +1,10 @@ import logging import platform - import socks +from application.utils.utils import get_free_port + + APP_NAME = 'DeProtocol' DEFAULT_LOG_LEVEL = logging.DEBUG DEBUG = True @@ -11,8 +13,10 @@ PROXY_PORT = 9050 PROXY_TYPE = socks.PROXY_TYPE_SOCKS5 +CONTROL_PORT = 9051 + NODE_HOST = '127.0.0.1' -NODE_PORT = 65432 +NODE_PORT = get_free_port(65432) system_os = platform.system() @@ -26,8 +30,8 @@ 'path': 'bin/tor/tor', }, 'Darwin': { - 'url': 'https://dist.torproject.org/torbrowser/12.0.5/tor-expert-bundle-12.0.5-osx-x86_64.tar.gz', - 'path': 'Contents/Resources/TorBrowser/Tor/tor', + 'url': 'https://dist.torproject.org/torbrowser/12.0.5/tor-expert-bundle-12.0.5-macos-x86_64.tar.gz', + 'path': 'bin/tor/tor', } } diff --git a/application/utils/portforwardlib.py b/application/utils/portforwardlib.py index aad14b5..b3ea05f 100644 --- a/application/utils/portforwardlib.py +++ b/application/utils/portforwardlib.py @@ -45,8 +45,8 @@ def discover(): for _ in range(10): try: data, from_addr = sock.recvfrom(1024) - # ip = fromaddr[0] - # print "from ip: %s"%ip + ip = fromaddr[0] + print( "from ip: %s"%ip) parsed = re.findall( r"(?P.*?): (?P.*?)\r\n", str(data, "utf-8") ) @@ -59,7 +59,7 @@ def discover(): paths.append(router_path) except socket.error: - """no data yet""" + print("""no data yet""") break return paths @@ -210,11 +210,11 @@ def get_my_ip(router_ip=None): return ret -def forwardPort( - e_port, i_port, router, lan_ip, disable, protocol, time, description, verbose -): - if verbose: - Logger.get_instance().info("Discovering routers...") +def forwardPort(): +# e_port, i_port, router, lan_ip, disable, protocol, time, description, verbose +# ): +# if verbose: +# Logger.get_instance().info("Discovering routers...") res = discover() diff --git a/application/utils/tor_utils.py b/application/utils/tor_utils.py index fb485b1..d1026f6 100644 --- a/application/utils/tor_utils.py +++ b/application/utils/tor_utils.py @@ -1,32 +1,38 @@ # pylint: skip-file import os -import platform -import subprocess import tarfile +import asyncio +import aiohttp +import aiofiles -import requests from tqdm import tqdm +import stem.control from application.logger.logger import Logger from application.settings import TOR_BINARIES_FILENAME from application.settings import TOR_BINARIES_URL +from application.settings import CONTROL_PORT class TorUtils: - @staticmethod - def download_and_install(): + + async def install(self): if os.path.isfile(TOR_BINARIES_FILENAME): os.remove(TOR_BINARIES_FILENAME) Logger.get_instance().warning( 'A tor installation was found in your system, if DeProtocol is not working please delete tor.tar.gz' ) return - response = requests.get(TOR_BINARIES_URL) - Logger.get_instance().info("Tor Client binaries downloaded") - with open('tor.tar.gz', 'wb') as f: - f.write(response.content) + async with aiohttp.ClientSession() as session: + response = await session.get(TOR_BINARIES_URL) + result = await response.content.read() + Logger.get_instance().info("Tor Client binaries downloaded") + async with aiofiles.open('tor.tar.gz', 'wb') as f: + await f.write(result) + self.write() + def write(self): try: with tarfile.open('tor.tar.gz', 'r:gz') as tar: members = tar.getmembers() @@ -35,3 +41,24 @@ def download_and_install(): Logger.get_instance().info('Tor Client binaries were successfully decompressed') except Exception as exc: Logger.get_instance().error(exc) + + @staticmethod + async def establish_tor_connection(return_=False): + try: + tor_controller = stem.control.Controller.from_port(port=CONTROL_PORT) + await tor_controller.authenticate() + await tor_controller.new_circuit() + await asyncio.sleep(0.2) + if return_: + return tor_controller + except TypeError as exs: + Logger.get_instance().error( + """ + + You are using an outdated version of stem that does not support the current functionality of the protocol. + Since pip does not detect the latest version of stem, install the current version in the following way: + 1. pip uninstall stem + 2. pip install git+https://github.com/torproject/stem.git + 3. restart app + """) + raise exs diff --git a/main.py b/main.py index b6ae121..7503ccf 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,8 @@ +import asyncio from application.app.application import DeProtocol +from application.utils.portforwardlib import forwardPort if __name__ == '__main__': main_app = DeProtocol() + asyncio.run(main_app.on_start()) + diff --git a/requirements.txt b/requirements.txt index b9fb96a..0958107 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ pytest==7.3.0 pythonp2p==1.7.1 requests==2.28.2 six==1.16.0 -stem==1.8.1 +# stem==1.8.0 tomli==2.0.1 tqdm==4.65.0 urllib3==1.26.15