|
| 1 | +import os |
| 2 | +import shutil |
| 3 | +from ampy.files import Files |
| 4 | +from ampy.pyboard import Pyboard |
| 5 | +import serial, threading, time, subprocess |
| 6 | +import serial.tools.list_ports |
| 7 | +import filecmp |
| 8 | + |
| 9 | + |
| 10 | +# Folder to store the last synced XRPLib files |
| 11 | +TEMP_FOLDER = ".temp_xrplib" |
| 12 | + |
| 13 | + |
| 14 | +def ensure_directory_exists(files, directory): |
| 15 | + """Ensure that a directory exists on the Pico. If not, create it.""" |
| 16 | + try: |
| 17 | + if directory != "/": # Root directory always exists |
| 18 | + files.mkdir(directory) |
| 19 | + print(f"Directory {directory} created on Pico.") |
| 20 | + except Exception as e: |
| 21 | + # Ignore the error if the directory already exists |
| 22 | + pass |
| 23 | + |
| 24 | +def copy_file_to_pico(files, local_file_path, pico_destination_path): |
| 25 | + """Copy a single file to the Pico, replacing it if it already exists.""" |
| 26 | + try: |
| 27 | + with open(local_file_path, 'rb') as local_file: |
| 28 | + file_content = local_file.read() |
| 29 | + |
| 30 | + # Write or replace the file on the Pico |
| 31 | + files.put(pico_destination_path, file_content) |
| 32 | + print(f"Successfully copied {local_file_path} to {pico_destination_path} on Pico.") |
| 33 | + |
| 34 | + except Exception as e: |
| 35 | + print(f"Failed to copy file {local_file_path}. Error: {str(e)}") |
| 36 | + |
| 37 | +def copy_directory_to_pico(files, local_directory, pico_destination_directory): |
| 38 | + """Copy an entire directory to the Pico, including all subdirectories and files""" |
| 39 | + try: |
| 40 | + for root, dirs, files_list in os.walk(local_directory): |
| 41 | + relative_path = os.path.relpath(root, local_directory) |
| 42 | + pico_dir_path = os.path.join(pico_destination_directory, relative_path).replace("\\", "/") |
| 43 | + ensure_directory_exists(files, pico_dir_path) |
| 44 | + |
| 45 | + for file_name in files_list: |
| 46 | + local_file_path = os.path.join(root, file_name) |
| 47 | + pico_file_path = os.path.join(pico_dir_path, file_name).replace("\\", "/") |
| 48 | + copy_file_to_pico(files, local_file_path, pico_file_path) |
| 49 | + |
| 50 | + except Exception as e: |
| 51 | + print(f"Failed to copy directory {local_directory}. Error: {str(e)}") |
| 52 | + finally: |
| 53 | + pyb.close() # Ensure the connection to the Pico is closed |
| 54 | + |
| 55 | +def create_temp_folder(): |
| 56 | + """Create a temporary folder to store the last synced XRPLib files.""" |
| 57 | + if not os.path.exists(TEMP_FOLDER): |
| 58 | + os.makedirs(TEMP_FOLDER) |
| 59 | + |
| 60 | +def compare_and_copy(files, local_directory, pico_destination_directory): |
| 61 | + """Compare local XRPLib with the last synced version and copy only changed files.""" |
| 62 | + create_temp_folder() |
| 63 | + for root, dirs, files_list in os.walk(local_directory): |
| 64 | + relative_path = os.path.relpath(root, local_directory) |
| 65 | + pico_dir_path = os.path.join(pico_destination_directory, relative_path).replace("\\", "/") |
| 66 | + temp_dir_path = os.path.join(TEMP_FOLDER, relative_path) |
| 67 | + |
| 68 | + if not os.path.exists(temp_dir_path): |
| 69 | + os.makedirs(temp_dir_path) |
| 70 | + |
| 71 | + for file_name in files_list: |
| 72 | + local_file_path = os.path.join(root, file_name) |
| 73 | + temp_file_path = os.path.join(temp_dir_path, file_name) |
| 74 | + pico_file_path = os.path.join(pico_dir_path, file_name).replace("\\", "/") |
| 75 | + |
| 76 | + if not os.path.exists(temp_file_path) or not filecmp.cmp(local_file_path, temp_file_path, shallow=False): |
| 77 | + # If the file doesn't exist in the temp folder, or it has changed, copy it |
| 78 | + copy_file_to_pico(files, local_file_path, pico_file_path) |
| 79 | + # Update the temp folder with the new file |
| 80 | + shutil.copy2(local_file_path, temp_file_path) |
| 81 | + |
| 82 | +def read_serial_output(port, baudrate=115200, timeout=1): |
| 83 | + """Read and print serial output from the Pico W.""" |
| 84 | + try: |
| 85 | + with serial.Serial(port, baudrate, timeout=timeout) as ser: |
| 86 | + print("Reading serial output...") |
| 87 | + while True: |
| 88 | + if ser.in_waiting > 0: |
| 89 | + output = ser.read(ser.in_waiting).decode('utf-8') |
| 90 | + print(output, end='') # Print without adding extra newlines |
| 91 | + time.sleep(0.1) # Small delay to prevent high CPU usage |
| 92 | + except Exception as e: |
| 93 | + print(f"Failed to read serial output. Error: {str(e)}") |
| 94 | + |
| 95 | +def run_pico_script(port, script_name): |
| 96 | + """Run a MicroPython script on the Pico using ampy.""" |
| 97 | + try: |
| 98 | + subprocess.run(['ampy', '-p', port, 'run', script_name], check=True) |
| 99 | + print(f"Successfully ran {script_name} on Pico.") |
| 100 | + except subprocess.CalledProcessError as e: |
| 101 | + print(f"Failed to run script {script_name}. Error: {str(e)}") |
| 102 | + |
| 103 | +def list_files_on_pico(port): |
| 104 | + """List all files and folders on the Pico.""" |
| 105 | + try: |
| 106 | + pyb = Pyboard(port) |
| 107 | + files = Files(pyb) |
| 108 | + file_list = files.ls('/') |
| 109 | + |
| 110 | + print("Files and folders on Pico:") |
| 111 | + for file in file_list: |
| 112 | + print(file) |
| 113 | + |
| 114 | + except Exception as e: |
| 115 | + print(f"Failed to list files. Error: {str(e)}") |
| 116 | + |
| 117 | +def copy_all_files_to_pico(port, directory=True, main=True, telemetry=True): |
| 118 | + pyb = Pyboard(port) |
| 119 | + files = Files(pyb) |
| 120 | + |
| 121 | + if directory: |
| 122 | + compare_and_copy(files, "XRPLib", "lib/XRPLib") |
| 123 | + if main: |
| 124 | + copy_file_to_pico(files, "mainprogram.py", "mainprogram.py") |
| 125 | + if telemetry: |
| 126 | + copy_file_to_pico(files, "telemetry.txt", "telemetry.txt") |
| 127 | + |
| 128 | +def detect_pico_port(): |
| 129 | + """Auto-detect the Pico W's serial port.""" |
| 130 | + ports = list(serial.tools.list_ports.comports()) |
| 131 | + for port in ports: |
| 132 | + if "usbmodem" in port.device or "COM" in port.device: # Adjust this pattern if needed |
| 133 | + return port.device |
| 134 | + raise IOError("Pico W not found. Please check the connection.") |
| 135 | + |
| 136 | +if __name__ == "__main__": |
| 137 | + |
| 138 | + # Auto-detect the Pico W port |
| 139 | + pico_port = detect_pico_port() |
| 140 | + print(f"Pico W detected on port: {pico_port}") |
| 141 | + |
| 142 | + # Copy any changed files to the Pico |
| 143 | + copy_all_files_to_pico(pico_port, telemetry=False) |
| 144 | + |
| 145 | + # Run the mainprogram.py script on the Pico |
| 146 | + run_pico_script(pico_port, "mainprogram.py") |
0 commit comments