Skip to content

fix(python): Fixes for Python code scanning alerts #11668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion .github/scripts/merge_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@


def load_package(filename):
pkg = json.load(open(filename))["packages"][0]
with open(filename) as f:
pkg = json.load(f)["packages"][0]
print("Loaded package {0} from {1}".format(pkg["name"], filename), file=sys.stderr)
print("{0} platform(s), {1} tools".format(len(pkg["platforms"]), len(pkg["tools"])), file=sys.stderr)
return pkg
Expand Down
96 changes: 92 additions & 4 deletions libraries/WiFi/examples/WiFiUDPClient/udp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,92 @@
# for messages from the ESP32 board and prints them
import socket
import sys
import subprocess
import platform

def get_interface_ips():
"""Get all available interface IP addresses"""
interface_ips = []

# Try using system commands to get interface IPs
system = platform.system().lower()

try:
if system == "darwin" or system == "linux":
# Use 'ifconfig' on macOS/Linux
result = subprocess.run(['ifconfig'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
lines = result.stdout.split('\n')
for line in lines:
if 'inet ' in line and '127.0.0.1' not in line:
# Extract IP address from ifconfig output
parts = line.strip().split()
for i, part in enumerate(parts):
if part == 'inet':
if i + 1 < len(parts):
ip = parts[i + 1]
if ip not in interface_ips and ip != '127.0.0.1':
interface_ips.append(ip)
break
elif system == "windows":
# Use 'ipconfig' on Windows
result = subprocess.run(['ipconfig'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
lines = result.stdout.split('\n')
for line in lines:
if 'IPv4 Address' in line and '127.0.0.1' not in line:
# Extract IP address from ipconfig output
if ':' in line:
ip = line.split(':')[1].strip()
if ip not in interface_ips and ip != '127.0.0.1':
interface_ips.append(ip)
except (subprocess.TimeoutExpired, subprocess.SubprocessError, FileNotFoundError):
pass

# Fallback: try to get IPs using socket methods
if not interface_ips:
try:
# Get all IP addresses associated with the hostname
hostname = socket.gethostname()
ip_list = socket.gethostbyname_ex(hostname)[2]
for ip in ip_list:
if ip not in interface_ips and ip != '127.0.0.1':
interface_ips.append(ip)
except socket.gaierror:
pass

# Fail if no interfaces found
if not interface_ips:
print("Error: No network interfaces found. Please check your network configuration.")
sys.exit(1)

return interface_ips

def select_interface(interface_ips):
"""Ask user to select which interface to bind to"""
if len(interface_ips) == 1:
print(f"Using interface: {interface_ips[0]}")
return interface_ips[0]

print("Multiple network interfaces detected:")
for i, ip in enumerate(interface_ips, 1):
print(f" {i}. {ip}")

while True:
try:
choice = input(f"Select interface (1-{len(interface_ips)}): ").strip()
choice_idx = int(choice) - 1
if 0 <= choice_idx < len(interface_ips):
selected_ip = interface_ips[choice_idx]
print(f"Selected interface: {selected_ip}")
return selected_ip
else:
print(f"Please enter a number between 1 and {len(interface_ips)}")
except ValueError:
print("Please enter a valid number")
except KeyboardInterrupt:
print("\nExiting...")
sys.exit(1)

try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
Expand All @@ -10,15 +96,17 @@
print("Failed to create socket. Error Code : " + str(msg[0]) + " Message " + msg[1])
sys.exit()

# Get available interfaces and let user choose
interface_ips = get_interface_ips()
selected_ip = select_interface(interface_ips)

try:
s.bind(("", 3333))
s.bind((selected_ip, 3333))
except socket.error as msg:
print("Bind failed. Error: " + str(msg[0]) + ": " + msg[1])
sys.exit()

print("Server listening")

print("Server listening")
print(f"Server listening on {selected_ip}:3333")

while 1:
d = s.recvfrom(1024)
Expand Down
Binary file modified tools/espota.exe
Binary file not shown.
8 changes: 6 additions & 2 deletions tools/espota.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@
return 1

content_size = os.path.getsize(filename)
file_md5 = hashlib.md5(open(filename, "rb").read()).hexdigest()
with open(filename, "rb") as f:
file_md5 = hashlib.md5(f.read()).hexdigest()
logging.info("Upload size: %d", content_size)
message = "%d %d %d %s\n" % (command, local_port, content_size, file_md5)

Expand Down Expand Up @@ -163,6 +164,7 @@
sock2.close()

logging.info("Waiting for device...")

try:
sock.settimeout(10)
connection, client_address = sock.accept()
Expand All @@ -172,6 +174,7 @@
logging.error("No response from device")
sock.close()
return 1

try:
with open(filename, "rb") as f:
if PROGRESS:
Expand Down Expand Up @@ -225,7 +228,8 @@
logging.error("Error response from device")
connection.close()
return 1

except Exception as e: # noqa: E722
logging.error("Error: %s", str(e))
finally:
connection.close()

Expand Down
Binary file modified tools/get.exe
Binary file not shown.
61 changes: 54 additions & 7 deletions tools/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,49 @@
dist_dir = current_dir + "/dist/"


def is_safe_archive_path(path):
# Check for absolute paths (both Unix and Windows style)
if path.startswith('/') or (len(path) > 1 and path[1] == ':' and path[2] in '\\/'):
raise ValueError(f"Absolute path not allowed: {path}")

# Normalize the path to handle any path separators
normalized_path = os.path.normpath(path)

# Check for directory traversal attempts using normalized path
if ".." in normalized_path.split(os.sep):
raise ValueError(f"Directory traversal not allowed: {path}")

# Additional check for paths that would escape the target directory
if normalized_path.startswith(".."):
raise ValueError(f"Path would escape target directory: {path}")

# Check for any remaining directory traversal patterns in the original path
# This catches cases that might not be normalized properly
path_parts = path.replace('\\', '/').split('/')
if '..' in path_parts:
raise ValueError(f"Directory traversal not allowed: {path}")

return True


def safe_tar_extract(tar_file, destination):
# Validate all paths before extraction
for member in tar_file.getmembers():
is_safe_archive_path(member.name)

# If all paths are safe, proceed with extraction
tar_file.extractall(destination, filter="tar")


def safe_zip_extract(zip_file, destination):
# Validate all paths before extraction
for name in zip_file.namelist():
is_safe_archive_path(name)

# If all paths are safe, proceed with extraction
zip_file.extractall(destination)


def sha256sum(filename, blocksize=65536):
hash = hashlib.sha256()
with open(filename, "rb") as f:
Expand Down Expand Up @@ -212,6 +255,10 @@ def unpack(filename, destination, force_extract, checksum): # noqa: C901
print("File corrupted or incomplete!")
cfile = None
file_is_corrupted = True
except ValueError as e:
print(f"Security validation failed: {e}")
cfile = None
file_is_corrupted = True

if file_is_corrupted:
corrupted_filename = filename + ".corrupted"
Expand Down Expand Up @@ -243,15 +290,15 @@ def unpack(filename, destination, force_extract, checksum): # noqa: C901
if filename.endswith("tar.gz"):
if not cfile:
cfile = tarfile.open(filename, "r:gz")
cfile.extractall(destination, filter="tar")
safe_tar_extract(cfile, destination)
elif filename.endswith("tar.xz"):
if not cfile:
cfile = tarfile.open(filename, "r:xz")
cfile.extractall(destination, filter="tar")
safe_tar_extract(cfile, destination)
elif filename.endswith("zip"):
if not cfile:
cfile = zipfile.ZipFile(filename)
cfile.extractall(destination)
safe_zip_extract(cfile, destination)
else:
raise NotImplementedError("Unsupported archive type")

Expand Down Expand Up @@ -348,9 +395,8 @@ def get_tool(tool, force_download, force_extract):
urlretrieve(url, local_path, report_progress, context=ctx)
elif "Windows" in sys_name:
r = requests.get(url)
f = open(local_path, "wb")
f.write(r.content)
f.close()
with open(local_path, "wb") as f:
f.write(r.content)
else:
is_ci = os.environ.get("GITHUB_WORKSPACE")
if is_ci:
Expand All @@ -374,7 +420,8 @@ def get_tool(tool, force_download, force_extract):


def load_tools_list(filename, platform):
tools_info = json.load(open(filename))["packages"][0]["tools"]
with open(filename, "r") as f:
tools_info = json.load(f)["packages"][0]["tools"]
tools_to_download = []
for t in tools_info:
if platform == "x86_64-mingw32":
Expand Down
Loading