Skip to content

Commit e781086

Browse files
Create indentity_manager.py
1 parent c99f3f4 commit e781086

File tree

1 file changed

+145
-0
lines changed

1 file changed

+145
-0
lines changed
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import logging
2+
import subprocess
3+
import random
4+
import time
5+
import uuid
6+
import ipaddress
7+
from typing import Dict, Any
8+
import platform
9+
10+
class IdentityManager:
11+
def __init__(self, logger: logging.Logger):
12+
self.logger = logger
13+
self.original_mac = None
14+
self.original_ip = None
15+
self.interface = self._get_default_interface()
16+
17+
def _get_default_interface(self) -> str:
18+
"""Detects the default network interface."""
19+
system = platform.system()
20+
if system == "Linux":
21+
try:
22+
result = subprocess.run(["route", "-n"], capture_output=True, text=True, check=True)
23+
for line in result.stdout.splitlines():
24+
if "UG" in line and "0.0.0.0" in line:
25+
return line.split()[-1]
26+
except Exception as e:
27+
self.logger.error(f"Error detecting default interface: {e}")
28+
return "eth0" # Default to eth0 if detection fails
29+
elif system == "Darwin":
30+
try:
31+
result = subprocess.run(["route", "-n", "get", "default"], capture_output=True, text=True, check=True)
32+
for line in result.stdout.splitlines():
33+
if "interface:" in line:
34+
return line.split("interface:")[1].strip()
35+
except Exception as e:
36+
self.logger.error(f"Error detecting default interface: {e}")
37+
return "en0" # Default to en0 if detection fails
38+
else:
39+
self.logger.warning(f"Unsupported OS: {system}. Defaulting to eth0.")
40+
return "eth0"
41+
return "eth0"
42+
43+
def get_current_mac(self, interface: str = None) -> str:
44+
interface = interface or self.interface
45+
try:
46+
result = subprocess.run(["ifconfig", interface], capture_output=True, text=True, check=True)
47+
for line in result.stdout.splitlines():
48+
if "ether" in line or "lladdr" in line:
49+
parts = line.split("ether" if "ether" in line else "lladdr")
50+
if len(parts) > 1:
51+
return parts[1].strip().split(" ")[0]
52+
return None
53+
except Exception as e:
54+
self.logger.error(f"Error getting MAC address: {e}")
55+
return None
56+
57+
def get_current_ip(self, interface: str = None) -> str:
58+
interface = interface or self.interface
59+
try:
60+
result = subprocess.run(["ifconfig", interface], capture_output=True, text=True, check=True)
61+
for line in result.stdout.splitlines():
62+
if "inet " in line:
63+
return line.split("inet ")[1].strip().split(" ")[0]
64+
return None
65+
except Exception as e:
66+
self.logger.error(f"Error getting IP address: {e}")
67+
return None
68+
69+
def change_mac_address(self, interface: str = None, new_mac: str = None):
70+
interface = interface or self.interface
71+
if not new_mac:
72+
new_mac = self._generate_random_mac()
73+
try:
74+
self.logger.info(f"Changing MAC address on {interface} to {new_mac}")
75+
subprocess.run(["ifconfig", interface, "down"], check=True)
76+
subprocess.run(["ifconfig", interface, "hw", "ether", new_mac], check=True)
77+
subprocess.run(["ifconfig", interface, "up"], check=True)
78+
self.logger.info(f"MAC address on {interface} changed to {new_mac}")
79+
return True
80+
except Exception as e:
81+
self.logger.error(f"Error changing MAC address: {e}")
82+
return False
83+
84+
def restore_mac_address(self, interface: str = None):
85+
interface = interface or self.interface
86+
if self.original_mac:
87+
self.change_mac_address(interface, self.original_mac)
88+
self.logger.info(f"MAC address on {interface} restored to {self.original_mac}")
89+
else:
90+
self.logger.warning("Original MAC address not recorded.")
91+
92+
def change_ip_address(self, interface: str = None, new_ip: str = None):
93+
interface = interface or self.interface
94+
if not new_ip:
95+
new_ip = self._generate_random_ip()
96+
try:
97+
self.logger.info(f"Changing IP address on {interface} to {new_ip}")
98+
subprocess.run(["ifconfig", interface, "down"], check=True)
99+
subprocess.run(["ifconfig", interface, "inet", new_ip], check=True)
100+
subprocess.run(["ifconfig", interface, "up"], check=True)
101+
self.logger.info(f"IP address on {interface} changed to {new_ip}")
102+
return True
103+
except Exception as e:
104+
self.logger.error(f"Error changing IP address: {e}")
105+
return False
106+
107+
def restore_ip_address(self, interface: str = None):
108+
interface = interface or self.interface
109+
if self.original_ip:
110+
self.change_ip_address(interface, self.original_ip)
111+
self.logger.info(f"IP address on {interface} restored to {self.original_ip}")
112+
else:
113+
self.logger.warning("Original IP address not recorded.")
114+
115+
def _generate_random_mac(self) -> str:
116+
mac = [0x00, 0x16, 0x3e,
117+
random.randint(0x00, 0x7f),
118+
random.randint(0x00, 0xff),
119+
random.randint(0x00, 0xff)]
120+
return ':'.join(map(lambda x: "%02x" % x, mac))
121+
122+
def _generate_random_ip(self) -> str:
123+
while True:
124+
ip = f"10.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}"
125+
try:
126+
ipaddress.ip_address(ip)
127+
return ip
128+
except ValueError:
129+
continue
130+
131+
def start_tor_session(self, data: Dict[str, Any] = None):
132+
self.logger.info(f"Starting Tor session - Data: {data}")
133+
self.original_mac = self.get_current_mac()
134+
self.original_ip = self.get_current_ip()
135+
self.change_mac_address()
136+
self.change_ip_address()
137+
time.sleep(random.uniform(2, 5))
138+
self.logger.info("Tor session started")
139+
140+
def stop_tor_session(self, data: Dict[str, Any] = None):
141+
self.logger.info(f"Stopping Tor session - Data: {data}")
142+
self.restore_mac_address()
143+
self.restore_ip_address()
144+
time.sleep(random.uniform(1, 3))
145+
self.logger.info("Tor session stopped")

0 commit comments

Comments
 (0)