|
| 1 | +""" |
| 2 | +This python API is written for use with the Nordic Semiconductor's Power Profiler Kit II (PPK 2). |
| 3 | +The PPK2 uses Serial communication. |
| 4 | +The official nRF Connect Power Profiler was used as a reference: https://github.com/NordicSemiconductor/pc-nrfconnect-ppk |
| 5 | +""" |
| 6 | + |
| 7 | +import serial |
| 8 | +import time |
| 9 | +import struct |
| 10 | +import logging |
| 11 | + |
| 12 | +class PPK2_Command(): |
| 13 | + """Serial command opcodes""" |
| 14 | + NO_OP = 0x00 |
| 15 | + TRIGGER_SET = 0x01 |
| 16 | + AVG_NUM_SET = 0x02 # no-firmware |
| 17 | + TRIGGER_WINDOW_SET = 0x03 |
| 18 | + TRIGGER_INTERVAL_SET = 0x04 |
| 19 | + TRIGGER_SINGLE_SET = 0x05 |
| 20 | + AVERAGE_START = 0x06 |
| 21 | + AVERAGE_STOP = 0x07 |
| 22 | + RANGE_SET = 0x08 |
| 23 | + LCD_SET = 0x09 |
| 24 | + TRIGGER_STOP = 0x0a |
| 25 | + DEVICE_RUNNING_SET = 0x0c |
| 26 | + REGULATOR_SET = 0x0d |
| 27 | + SWITCH_POINT_DOWN = 0x0e |
| 28 | + SWITCH_POINT_UP = 0x0f |
| 29 | + TRIGGER_EXT_TOGGLE = 0x11 |
| 30 | + SET_POWER_MODE = 0x11 |
| 31 | + RES_USER_SET = 0x12 |
| 32 | + SPIKE_FILTERING_ON = 0x15 |
| 33 | + SPIKE_FILTERING_OFF = 0x16 |
| 34 | + GET_META_DATA = 0x19 |
| 35 | + RESET = 0x20 |
| 36 | + SET_USER_GAINS = 0x25 |
| 37 | + |
| 38 | + |
| 39 | +class PPK2_Modes(): |
| 40 | + """PPK2 measurement modes""" |
| 41 | + AMPERE_MODE = "AMPERE_MODE" |
| 42 | + SOURCE_MODE = "SOURCE_MODE" |
| 43 | + |
| 44 | + |
| 45 | +class PPK2_API(): |
| 46 | + def __init__(self, port): |
| 47 | + |
| 48 | + self.ser = None |
| 49 | + self.ser = serial.Serial(port) |
| 50 | + self.ser.baudrate = 9600 |
| 51 | + |
| 52 | + self.modifiers = { |
| 53 | + "Calibrated": None, |
| 54 | + "R": {"0": 1031.64, "1": 101.65, "2": 10.15, "3": 0.94, "4": 0.043}, |
| 55 | + "GS": {"0": 1, "1": 1, "2": 1, "3": 1, "4": 1}, |
| 56 | + "GI": {"0": 1, "1": 1, "2": 1, "3": 1, "4": 1}, |
| 57 | + "O": {"0": 0, "1": 0, "2": 0, "3": 0, "4": 0}, |
| 58 | + "S": {"0": 0, "1": 0, "2": 0, "3": 0, "4": 0}, |
| 59 | + "I": {"0": 0, "1": 0, "2": 0, "3": 0, "4": 0}, |
| 60 | + "UG": {"0": 1, "1": 1, "2": 1, "3": 1, "4": 1}, |
| 61 | + "HW": None, |
| 62 | + "IA": None |
| 63 | + } |
| 64 | + |
| 65 | + self.vdd_low = 800 |
| 66 | + self.vdd_high = 5000 |
| 67 | + |
| 68 | + self.current_vdd = 0 |
| 69 | + |
| 70 | + self.adc_mult = 1.8 / 163840 |
| 71 | + |
| 72 | + self.MEAS_ADC = self._generate_mask(14, 0) |
| 73 | + self.MEAS_RANGE = self._generate_mask(3, 14) |
| 74 | + self.MEAS_LOGIC = self._generate_mask(8, 24) |
| 75 | + |
| 76 | + self.mode = None |
| 77 | + |
| 78 | + self.rolling_avg = None |
| 79 | + self.rolling_avg4 = None |
| 80 | + self.prev_range = None |
| 81 | + self.consecutive_range_samples = 0 |
| 82 | + |
| 83 | + self.spike_filter_alpha = 0.18 |
| 84 | + self.spike_filter_alpha5 = 0.06 |
| 85 | + self.spike_filter_samples = 3 |
| 86 | + self.after_spike = 0 |
| 87 | + |
| 88 | + # adc measurement buffer remainder and len of remainder |
| 89 | + self.remainder = {"sequence": b'', "len": 0} |
| 90 | + |
| 91 | + def __del__(self): |
| 92 | + """Destructor""" |
| 93 | + try: |
| 94 | + if self.ser: |
| 95 | + self.ser.close() |
| 96 | + except Exception as e: |
| 97 | + logging.error(f"An error occured while closing ppk2_api: {e}") |
| 98 | + |
| 99 | + def _pack_struct(self, cmd_tuple): |
| 100 | + """Returns packed struct""" |
| 101 | + return struct.pack("B" * len(cmd_tuple), *cmd_tuple) |
| 102 | + |
| 103 | + def _write_serial(self, cmd_tuple): |
| 104 | + """Writes cmd bytes to serial""" |
| 105 | + try: |
| 106 | + cmd_packed = self._pack_struct(cmd_tuple) |
| 107 | + self.ser.write(cmd_packed) |
| 108 | + except Exception as e: |
| 109 | + logging.error(f"An error occured when writing to serial port: {e}") |
| 110 | + |
| 111 | + def _twos_comp(self, val): |
| 112 | + """Compute the 2's complement of int32 value""" |
| 113 | + if (val & (1 << (32 - 1))) != 0: |
| 114 | + val = val - (1 << 32) # compute negative value |
| 115 | + return val |
| 116 | + |
| 117 | + def _convert_source_voltage(self, mV): |
| 118 | + """Convert input voltage to device command""" |
| 119 | + # minimal possible mV is 800 |
| 120 | + if mV < self.vdd_low: |
| 121 | + mV = self.vdd_low |
| 122 | + |
| 123 | + # maximal possible mV is 5000 |
| 124 | + if mV > self.vdd_high: |
| 125 | + mV = self.vdd_high |
| 126 | + |
| 127 | + offset = 32 |
| 128 | + # get difference to baseline (the baseline is 800mV but the initial offset is 32) |
| 129 | + diff_to_baseline = mV - self.vdd_low + offset |
| 130 | + base_b_1 = 3 |
| 131 | + base_b_2 = 0 # is actually 32 - compensated with above offset |
| 132 | + |
| 133 | + # get the number of times we have to increase the first byte of the command |
| 134 | + ratio = int(diff_to_baseline / 256) |
| 135 | + remainder = diff_to_baseline % 256 # get the remainder for byte 2 |
| 136 | + |
| 137 | + set_b_1 = base_b_1 + ratio |
| 138 | + set_b_2 = base_b_2 + remainder |
| 139 | + |
| 140 | + return set_b_1, set_b_2 |
| 141 | + |
| 142 | + def _read_metadata(self): |
| 143 | + """Read metadata""" |
| 144 | + # try to get metadata from device |
| 145 | + for _ in range(0, 5): |
| 146 | + # it appears the second reading is the metadata |
| 147 | + read = self.ser.read(self.ser.in_waiting) |
| 148 | + time.sleep(0.1) |
| 149 | + |
| 150 | + # TODO add a read_until serial read function with a timeout |
| 151 | + if read != b'' and "END" in read.decode("utf-8"): |
| 152 | + return read.decode("utf-8") |
| 153 | + |
| 154 | + def _parse_metadata(self, metadata): |
| 155 | + """Parse metadata and store it to modifiers""" |
| 156 | + # TODO handle more robustly |
| 157 | + try: |
| 158 | + data_split = [row.split(": ") for row in metadata.split("\n")] |
| 159 | + |
| 160 | + for key in self.modifiers.keys(): |
| 161 | + for data_pair in data_split: |
| 162 | + if key == data_pair[0]: |
| 163 | + self.modifiers[key] = data_pair[1] |
| 164 | + for ind in range(0, 5): |
| 165 | + if key+str(ind) == data_pair[0]: |
| 166 | + if "R" in data_pair[0]: |
| 167 | + # problem on some PPK2s with wrong calibration values - this doesn't fix it |
| 168 | + if float(data_pair[1]) != 0: |
| 169 | + self.modifiers[key][str(ind)] = float( |
| 170 | + data_pair[1]) |
| 171 | + else: |
| 172 | + self.modifiers[key][str(ind)] = float( |
| 173 | + data_pair[1]) |
| 174 | + return True |
| 175 | + except Exception as e: |
| 176 | + # if exception triggers serial port is probably not correct |
| 177 | + return None |
| 178 | + |
| 179 | + def _generate_mask(self, bits, pos): |
| 180 | + pos = pos |
| 181 | + mask = ((2**bits-1) << pos) |
| 182 | + mask = self._twos_comp(mask) |
| 183 | + return {"mask": mask, "pos": pos} |
| 184 | + |
| 185 | + def _get_masked_value(self, value, meas): |
| 186 | + masked_value = (value & meas["mask"]) >> meas["pos"] |
| 187 | + if meas["pos"] == 24: |
| 188 | + if masked_value == 255: |
| 189 | + masked_value = -1 |
| 190 | + return masked_value |
| 191 | + |
| 192 | + def _handle_raw_data(self, adc_value): |
| 193 | + """Convert raw value to analog value""" |
| 194 | + try: |
| 195 | + current_measurement_range = min(self._get_masked_value( |
| 196 | + adc_value, self.MEAS_RANGE), 4) # 5 is the number of parameters |
| 197 | + adc_result = self._get_masked_value(adc_value, self.MEAS_ADC) * 4 |
| 198 | + bits = self._get_masked_value(adc_value, self.MEAS_LOGIC) |
| 199 | + analog_value = self.get_adc_result( |
| 200 | + current_measurement_range, adc_result) * 10**6 |
| 201 | + return analog_value |
| 202 | + except Exception as e: |
| 203 | + print("Measurement outside of range!") |
| 204 | + return None |
| 205 | + |
| 206 | + @staticmethod |
| 207 | + def list_devices(): |
| 208 | + import serial.tools.list_ports |
| 209 | + ports = serial.tools.list_ports.comports() |
| 210 | + devices = [port.device for port in ports if port.product == 'PPK2'] |
| 211 | + return devices |
| 212 | + |
| 213 | + def get_data(self): |
| 214 | + """Return readings of one sampling period""" |
| 215 | + sampling_data = self.ser.read(self.ser.in_waiting) |
| 216 | + return sampling_data |
| 217 | + |
| 218 | + def get_modifiers(self): |
| 219 | + """Gets and sets modifiers from device memory""" |
| 220 | + self._write_serial((PPK2_Command.GET_META_DATA, )) |
| 221 | + metadata = self._read_metadata() |
| 222 | + ret = self._parse_metadata(metadata) |
| 223 | + return ret |
| 224 | + |
| 225 | + def start_measuring(self): |
| 226 | + """Start continous measurement""" |
| 227 | + self._write_serial((PPK2_Command.AVERAGE_START, )) |
| 228 | + |
| 229 | + def stop_measuring(self): |
| 230 | + """Stop continous measurement""" |
| 231 | + self._write_serial((PPK2_Command.AVERAGE_STOP, )) |
| 232 | + |
| 233 | + def set_source_voltage(self, mV): |
| 234 | + """Inits device - based on observation only REGULATOR_SET is the command. |
| 235 | + The other two values correspond to the voltage level. |
| 236 | +
|
| 237 | + 800mV is the lowest setting - [3,32] - the values then increase linearly |
| 238 | + """ |
| 239 | + b_1, b_2 = self._convert_source_voltage(mV) |
| 240 | + self._write_serial((PPK2_Command.REGULATOR_SET, b_1, b_2)) |
| 241 | + self.current_vdd = mV |
| 242 | + |
| 243 | + def toggle_DUT_power(self, state): |
| 244 | + """Toggle DUT power based on parameter""" |
| 245 | + if state == "ON": |
| 246 | + self._write_serial( |
| 247 | + (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.TRIGGER_SET)) # 12,1 |
| 248 | + |
| 249 | + if state == "OFF": |
| 250 | + self._write_serial( |
| 251 | + (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.NO_OP)) # 12,0 |
| 252 | + |
| 253 | + def use_ampere_meter(self): |
| 254 | + """Configure device to use ampere meter""" |
| 255 | + self.mode = PPK2_Modes.AMPERE_MODE |
| 256 | + self._write_serial((PPK2_Command.SET_POWER_MODE, |
| 257 | + PPK2_Command.TRIGGER_SET)) # 17,1 |
| 258 | + |
| 259 | + def use_source_meter(self): |
| 260 | + """Configure device to use source meter""" |
| 261 | + self.mode = PPK2_Modes.SOURCE_MODE |
| 262 | + self._write_serial((PPK2_Command.SET_POWER_MODE, |
| 263 | + PPK2_Command.AVG_NUM_SET)) # 17,2 |
| 264 | + |
| 265 | + def get_adc_result(self, current_range, adc_value): |
| 266 | + """Get result of adc conversion""" |
| 267 | + current_range = str(current_range) |
| 268 | + result_without_gain = (adc_value - self.modifiers["O"][current_range]) * ( |
| 269 | + self.adc_mult / self.modifiers["R"][current_range]) |
| 270 | + adc = self.modifiers["UG"][current_range] * (result_without_gain * (self.modifiers["GS"][current_range] * result_without_gain + self.modifiers["GI"][current_range]) + ( |
| 271 | + self.modifiers["S"][current_range] * (self.current_vdd / 1000) + self.modifiers["I"][current_range])) |
| 272 | + |
| 273 | + prev_rolling_avg = self.rolling_avg |
| 274 | + prev_rolling_avg4 = self.rolling_avg4 |
| 275 | + |
| 276 | + # spike filtering / rolling average |
| 277 | + if self.rolling_avg is None: |
| 278 | + self.rolling_avg = adc |
| 279 | + else: |
| 280 | + self.rolling_avg = self.spike_filter_alpha * adc + (1 - self.spike_filter_alpha) * self.rolling_avg |
| 281 | + |
| 282 | + if self.rolling_avg4 is None: |
| 283 | + self.rolling_avg4 = adc |
| 284 | + else: |
| 285 | + self.rolling_avg4 = self.spike_filter_alpha5 * adc + (1 - self.spike_filter_alpha5) * self.rolling_avg4 |
| 286 | + |
| 287 | + if self.prev_range is None: |
| 288 | + self.prev_range = current_range |
| 289 | + |
| 290 | + if self.prev_range != current_range or self.after_spike > 0: |
| 291 | + if self.prev_range != current_range: |
| 292 | + self.consecutive_range_samples = 0 |
| 293 | + self.after_spike = self.spike_filter_samples |
| 294 | + else: |
| 295 | + self.consecutive_range_samples += 1 |
| 296 | + |
| 297 | + if current_range == "4": |
| 298 | + if self.consecutive_range_samples < 2: |
| 299 | + self.rolling_avg = prev_rolling_avg |
| 300 | + self.rolling_avg4 = prev_rolling_avg4 |
| 301 | + adc = self.rolling_avg4 |
| 302 | + else: |
| 303 | + adc = self.rolling_avg |
| 304 | + |
| 305 | + self.after_spike -= 1 |
| 306 | + |
| 307 | + self.prev_range = current_range |
| 308 | + return adc |
| 309 | + |
| 310 | + def _digital_to_analog(self, adc_value): |
| 311 | + """Convert discrete value to analog value""" |
| 312 | + return int.from_bytes(adc_value, byteorder="little", signed=False) # convert reading to analog value |
| 313 | + |
| 314 | + def get_samples(self, buf): |
| 315 | + """ |
| 316 | + Returns list of samples read in one sampling period. |
| 317 | + The number of sampled values depends on the delay between serial reads. |
| 318 | + Manipulation of samples is left to the user. |
| 319 | + See example for more info. |
| 320 | + """ |
| 321 | + |
| 322 | + sample_size = 4 # one analog value is 4 bytes in size |
| 323 | + offset = self.remainder["len"] |
| 324 | + samples = [] |
| 325 | + |
| 326 | + first_reading = ( |
| 327 | + self.remainder["sequence"] + buf[0:sample_size-offset])[:4] |
| 328 | + adc_val = self._digital_to_analog(first_reading) |
| 329 | + measurement = self._handle_raw_data(adc_val) |
| 330 | + if measurement is not None: |
| 331 | + samples.append(measurement) |
| 332 | + |
| 333 | + offset = sample_size - offset |
| 334 | + |
| 335 | + while offset <= len(buf) - sample_size: |
| 336 | + next_val = buf[offset:offset + sample_size] |
| 337 | + offset += sample_size |
| 338 | + adc_val = self._digital_to_analog(next_val) |
| 339 | + measurement = self._handle_raw_data(adc_val) |
| 340 | + if measurement is not None: |
| 341 | + samples.append(measurement) |
| 342 | + |
| 343 | + self.remainder["sequence"] = buf[offset:len(buf)] |
| 344 | + self.remainder["len"] = len(buf)-offset |
| 345 | + |
| 346 | + return samples # return list of samples, handle those lists in PPK2 API wrapper |
0 commit comments