Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 186 additions & 73 deletions tools/espota.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,101 +81,182 @@
sys.stderr.flush()


def serve(remote_addr, local_addr, remote_port, local_port, password, filename, command=FLASH): # noqa: C901
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = (local_addr, local_port)
logging.info("Starting on %s:%s", str(server_address[0]), str(server_address[1]))
try:
sock.bind(server_address)
sock.listen(1)
except Exception as e:
logging.error("Listen Failed: %s", str(e))
return 1

content_size = os.path.getsize(filename)
with open(filename, "rb") as f:
file_md5 = hashlib.md5(f.read()).hexdigest()
logging.info("Upload size: %d", content_size)
message = "%d %d %d %s\n" % (command, local_port, content_size, file_md5)

# Wait for a connection
def send_invitation_and_get_auth_challenge(remote_addr, remote_port, message, md5_target):
"""
Send invitation to ESP device and get authentication challenge.
Returns (success, auth_data, error_message) tuple.
"""
remote_address = (remote_addr, int(remote_port))
inv_tries = 0
data = ""

msg = "Sending invitation to %s " % remote_addr
sys.stderr.write(msg)
sys.stderr.flush()

while inv_tries < 10:
inv_tries += 1
sock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
remote_address = (remote_addr, int(remote_port))
try:
sent = sock2.sendto(message.encode(), remote_address) # noqa: F841
except: # noqa: E722
sys.stderr.write("failed\n")
sys.stderr.flush()
sock2.close()
logging.error("Host %s Not Found", remote_addr)
return 1
return False, None, "Host %s Not Found" % remote_addr

sock2.settimeout(TIMEOUT)
try:
data = sock2.recv(69).decode() # "AUTH " + 64-char SHA256 nonce
if md5_target:
data = sock2.recv(37).decode() # "AUTH " + 32-char MD5 nonce
else:
data = sock2.recv(69).decode() # "AUTH " + 64-char SHA256 nonce
sock2.close()
break
except: # noqa: E722
sys.stderr.write(".")
sys.stderr.flush()
sock2.close()

sys.stderr.write("\n")
sys.stderr.flush()

if inv_tries == 10:
logging.error("No response from the ESP")
return 1
if data != "OK":
if data.startswith("AUTH"):
nonce = data.split()[1]
return False, None, "No response from the ESP"

return True, data, None


# Generate client nonce (cnonce)
cnonce_text = "%s%u%s%s" % (filename, content_size, file_md5, remote_addr)
cnonce = hashlib.sha256(cnonce_text.encode()).hexdigest()
def authenticate(remote_addr, remote_port, password, md5_target, filename, content_size, file_md5, nonce):
"""
Perform authentication with the ESP device using either MD5 or SHA256 method.
Returns (success, error_message) tuple.
"""
cnonce_text = "%s%u%s%s" % (filename, content_size, file_md5, remote_addr)
remote_address = (remote_addr, int(remote_port))

# PBKDF2-HMAC-SHA256 challenge/response protocol
# The ESP32 stores the password as SHA256 hash, so we need to hash the password first
# 1. Hash the password with SHA256 (to match ESP32 storage)
password_hash = hashlib.sha256(password.encode()).hexdigest()
if md5_target:
# Generate client nonce (cnonce)
cnonce = hashlib.md5(cnonce_text.encode()).hexdigest()

# 2. Derive key using PBKDF2-HMAC-SHA256 with the password hash
salt = nonce + ":" + cnonce
derived_key = hashlib.pbkdf2_hmac("sha256", password_hash.encode(), salt.encode(), 10000)
derived_key_hex = derived_key.hex()
# MD5 challenge/response protocol (insecure, use only for compatibility with old firmwares)
# 1. Hash the password with MD5 (to match ESP32 storage)
password_hash = hashlib.md5(password.encode()).hexdigest()

# 2. Create challenge response
challenge = "%s:%s:%s" % (password_hash, nonce, cnonce)
response = hashlib.md5(challenge.encode()).hexdigest()
expected_response_length = 32
else:
# Generate client nonce (cnonce)
cnonce = hashlib.sha256(cnonce_text.encode()).hexdigest()

# PBKDF2-HMAC-SHA256 challenge/response protocol
# The ESP32 stores the password as SHA256 hash, so we need to hash the password first
# 1. Hash the password with SHA256 (to match ESP32 storage)
password_hash = hashlib.sha256(password.encode()).hexdigest()

# 2. Derive key using PBKDF2-HMAC-SHA256 with the password hash
salt = nonce + ":" + cnonce
derived_key = hashlib.pbkdf2_hmac("sha256", password_hash.encode(), salt.encode(), 10000)
derived_key_hex = derived_key.hex()

# 3. Create challenge response
challenge = derived_key_hex + ":" + nonce + ":" + cnonce
response = hashlib.sha256(challenge.encode()).hexdigest()
expected_response_length = 64

# Send authentication response
sock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
message = "%d %s %s\n" % (AUTH, cnonce, response)
sock2.sendto(message.encode(), remote_address)
sock2.settimeout(10)
try:
data = sock2.recv(expected_response_length).decode()
except: # noqa: E722
sock2.close()
return False, "No Answer to our Authentication"

if data != "OK":
sock2.close()
return False, data

sock2.close()
return True, None
except Exception as e:
sock2.close()
return False, str(e)

# 3. Create challenge response
challenge = derived_key_hex + ":" + nonce + ":" + cnonce
response = hashlib.sha256(challenge.encode()).hexdigest()

def serve(remote_addr, local_addr, remote_port, local_port, password, md5_target, filename, command=FLASH): # noqa: C901
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = (local_addr, local_port)
logging.info("Starting on %s:%s", str(server_address[0]), str(server_address[1]))
try:
sock.bind(server_address)
sock.listen(1)
except Exception as e:
logging.error("Listen Failed: %s", str(e))
return 1

content_size = os.path.getsize(filename)
with open(filename, "rb") as f:
file_md5 = hashlib.md5(f.read()).hexdigest()
logging.info("Upload size: %d", content_size)
message = "%d %d %d %s\n" % (command, local_port, content_size, file_md5)

# Send invitation and get authentication challenge
success, data, error = send_invitation_and_get_auth_challenge(remote_addr, remote_port, message, md5_target)
if not success:
logging.error(error)
return 1

if data != "OK":
if data.startswith("AUTH"):
nonce = data.split()[1]

# Try authentication with the specified method first
sys.stderr.write("Authenticating...")
sys.stderr.flush()
message = "%d %s %s\n" % (AUTH, cnonce, response)
sock2.sendto(message.encode(), remote_address)
sock2.settimeout(10)
try:
data = sock2.recv(64).decode() # SHA256 produces 64 character response
except: # noqa: E722
sys.stderr.write("FAIL\n")
logging.error("No Answer to our Authentication")
sock2.close()
return 1
if data != "OK":
sys.stderr.write("FAIL\n")
logging.error("%s", data)
sock2.close()
sys.exit(1)
return 1
auth_success, auth_error = authenticate(remote_addr, remote_port, password, md5_target, filename, content_size, file_md5, nonce)

if not auth_success:
# If authentication failed and we're not already using MD5, try with MD5
if not md5_target:
sys.stderr.write("FAIL\n")
logging.warning("Authentication failed with SHA256, retrying with MD5: %s", auth_error)

# Restart the entire process with MD5 to get a fresh nonce
success, data, error = send_invitation_and_get_auth_challenge(remote_addr, remote_port, message, True)
if not success:
logging.error("Failed to re-establish connection for MD5 retry: %s", error)
return 1

if data.startswith("AUTH"):
nonce = data.split()[1]
sys.stderr.write("Retrying with MD5...")
sys.stderr.flush()
auth_success, auth_error = authenticate(remote_addr, remote_port, password, True, filename, content_size, file_md5, nonce)
else:
auth_success = False
auth_error = "Expected AUTH challenge for MD5 retry, got: " + data

if not auth_success:
sys.stderr.write("FAIL\n")
logging.error("Authentication failed with both SHA256 and MD5: %s", auth_error)
return 1
else:
# Already tried MD5 and it failed
sys.stderr.write("FAIL\n")
logging.error("Authentication failed: %s", auth_error)
return 1

sys.stderr.write("OK\n")
else:
logging.error("Bad Answer: %s", data)
sock2.close()
return 1
sock2.close()

logging.info("Waiting for device...")

Expand Down Expand Up @@ -207,7 +288,9 @@
try:
connection.sendall(chunk)
res = connection.recv(10)
last_response_contained_ok = "OK" in res.decode()
response_text = res.decode().strip()
last_response_contained_ok = "OK" in response_text
logging.debug("Chunk response: '%s'", response_text)
except Exception as e:
sys.stderr.write("\n")
logging.error("Error Uploading: %s", str(e))
Expand All @@ -222,26 +305,41 @@
sys.stderr.write("\n")
logging.info("Waiting for result...")
count = 0
while count < 5:
received_any_response = False
while count < 10: # Increased from 5 to 10 attempts
count += 1
connection.settimeout(60)
connection.settimeout(30) # Reduced from 60s to 30s per attempt
try:
data = connection.recv(32).decode()
logging.info("Result: %s", data)
data = connection.recv(32).decode().strip()
received_any_response = True
logging.info("Result attempt %d: '%s'", count, data)

if "OK" in data:
logging.info("Success")
connection.close()
return 0
elif data: # Got some response but not OK
logging.warning("Unexpected response from device: '%s'", data)

except socket.timeout:
logging.debug("Timeout waiting for result (attempt %d/10)", count)
continue
except Exception as e:
logging.error("Error receiving result: %s", str(e))
connection.close()
return 1

logging.error("Error response from device")
connection.close()
return 1
logging.debug("Error receiving result (attempt %d/10): %s", count, str(e))
# Don't return error here, continue trying
continue

# After all attempts, provide detailed error information
if received_any_response:
logging.warning("Upload completed but device sent unexpected response(s). This may still be successful.")
logging.warning("Device might be rebooting to apply firmware - this is normal.")
connection.close()
return 0 # Consider it successful if we got any response and upload completed
else:
logging.error("No response from device after upload completion")
logging.error("This could indicate device reboot (normal) or network issues")
connection.close()
return 1
except Exception as e: # noqa: E722
logging.error("Error: %s", str(e))
finally:
Expand Down Expand Up @@ -269,6 +367,14 @@

# authentication
parser.add_argument("-a", "--auth", dest="auth", help="Set authentication password.", action="store", default="")
parser.add_argument(
"-m",
"--md5-target",
dest="md5_target",
help="Target device is using MD5 checksum. This is insecure, use only for compatibility with old firmwares.",
action="store_true",
default=False,
)

# image
parser.add_argument("-f", "--file", dest="image", help="Image file.", metavar="FILE", default=None)
Expand Down Expand Up @@ -335,7 +441,14 @@
command = SPIFFS

return serve(
options.esp_ip, options.host_ip, options.esp_port, options.host_port, options.auth, options.image, command
options.esp_ip,
options.host_ip,
options.esp_port,
options.host_port,
options.auth,
options.md5_target,
options.image,
command
)


Expand Down
Loading