1+ import time
2+ import csv
3+ import datetime
4+ from threading import Thread
5+ # import numpy as np
6+ # import matplotlib.pyplot as plt
7+ # import matplotlib
8+ from src .ppk2_api import PPK2_API
9+
10+ class PowerProfiler ():
11+ def __init__ (self , serial_port = None , source_voltage_mV = 3300 , filename = None ):
12+ """Initialize PPK2 power profiler with serial"""
13+ self .measuring = None
14+ self .measurement_thread = None
15+ self .ppk2 = None
16+
17+ try :
18+ if serial_port :
19+ self .ppk2 = PPK2_API (serial_port )
20+ else :
21+ serial_port = self .discover_port ()
22+ if serial_port :
23+ self .ppk2 = PPK2_API (serial_port )
24+ ret = self .ppk2 .get_modifiers () # try to read modifiers, if it fails serial port is probably not correct
25+ except Exception as e :
26+ ret = None
27+ raise e
28+
29+ if not ret :
30+ self .ppk2 = None
31+ #raise Exception(f"Error when initing PowerProfiler with serial port {serial_port}")
32+ else :
33+ self .ppk2 .use_source_meter ()
34+
35+ self .source_voltage_mV = source_voltage_mV
36+
37+ self .ppk2 .set_source_voltage (self .source_voltage_mV ) # set to 3.3V
38+
39+ self .measuring = False
40+ self .current_measurements = []
41+
42+ # local variables used to calculate power consumption
43+ self .measurement_start_time = None
44+ self .measurement_stop_time = None
45+
46+ time .sleep (1 )
47+
48+ self .stop = False
49+
50+ self .measurement_thread = Thread (target = self .measurement_loop , daemon = True )
51+ self .measurement_thread .start ()
52+
53+ # write to csv
54+ self .filename = filename
55+ if self .filename is not None :
56+ with open (self .filename , 'w' , newline = '' ) as file :
57+ writer = csv .writer (file )
58+ row = []
59+ for key in ["ts" , "avg1000" ]:
60+ row .append (key )
61+ writer .writerow (row )
62+
63+ def write_csv_rows (self , samples ):
64+ """Write csv row"""
65+ with open (self .filename , 'a' , newline = '' ) as file :
66+ writer = csv .writer (file )
67+ for sample in samples :
68+ row = [datetime .datetime .now ().strftime ('%d-%m-%Y %H:%M:%S.%f' ), sample ]
69+ writer .writerow (row )
70+
71+ def delete_power_profiler (self ):
72+ """Join thread"""
73+ self .measuring = False
74+ self .stop = True
75+
76+ if self .measurement_thread :
77+ self .measurement_thread .join ()
78+ self .measurement_thread = None
79+
80+ if self .ppk2 :
81+ self .disable_power ()
82+
83+ def discover_port (self ):
84+ """Discovers ppk2 serial port"""
85+ ppk2s_connected = PPK2_API .list_devices ()
86+ if (len (ppk2s_connected ) == 1 ):
87+ ppk2_port = ppk2s_connected [0 ]
88+ print (f'Found PPK2 at { ppk2_port } ' )
89+ return ppk2_port
90+ else :
91+ print (f'Too many connected PPK2\' s: { ppk2s_connected } ' )
92+ return None
93+
94+ def enable_power (self ):
95+ """Enable ppk2 power"""
96+ if self .ppk2 :
97+ self .ppk2 .toggle_DUT_power ("ON" )
98+ return True
99+ return False
100+
101+ def disable_power (self ):
102+ """Disable ppk2 power"""
103+ if self .ppk2 :
104+ self .ppk2 .toggle_DUT_power ("OFF" )
105+ return True
106+ return False
107+
108+ def measurement_loop (self ):
109+ """Endless measurement loop will run in a thread"""
110+ while True and not self .stop :
111+ if self .measuring : # read data if currently measuring
112+ read_data = self .ppk2 .get_data ()
113+ if read_data != b'' :
114+ #samples = self.ppk2.get_samples(read_data)
115+ samples = self ._average_samples (self .ppk2 .get_samples (read_data ), 1024 ) # optionally average samples
116+ self .current_measurements += samples # can easily sum lists, will append individual data
117+ time .sleep (0.001 ) # TODO figure out correct sleep duration
118+
119+ def _average_samples (self , list , window_size ):
120+ """Average samples based on window size"""
121+ chunks = [list [val :val + window_size ] for val in range (0 , len (list ), window_size )]
122+ avgs = []
123+ for chunk in chunks :
124+ avgs .append (sum (chunk ) / len (chunk ))
125+
126+ return avgs
127+
128+ def start_measuring (self ):
129+ """Start measuring"""
130+ if not self .measuring : # toggle measuring flag only if currently not measuring
131+ self .current_measurements = [] # reset current measurements
132+ self .ppk2 .start_measuring () # send command to ppk2
133+ self .measuring = True # set internal flag
134+ self .measurement_start_time = time .time ()
135+
136+ def stop_measuring (self ):
137+ """Stop measuring and return average of period"""
138+ self .measurement_stop_time = time .time ()
139+ self .measuring = False
140+ self .ppk2 .stop_measuring () # send command to ppk2
141+
142+ #samples_average = self._average_samples(self.current_measurements, 1000)
143+ if self .filename is not None :
144+ self .write_csv_rows (self .current_measurements )
145+
146+ def get_min_current_mA (self ):
147+ return min (self .current_measurements ) / 1000
148+
149+ def get_max_current_mA (self ):
150+ return max (self .current_measurements ) / 1000
151+
152+ def get_average_current_mA (self ):
153+ """Returns average current of last measurement in mA"""
154+ if len (self .current_measurements ) == 0 :
155+ return 0
156+
157+ average_current_mA = (sum (self .current_measurements ) / len (self .current_measurements )) / 1000 # measurements are in microamperes, divide by 1000
158+ return average_current_mA
159+
160+ def get_average_power_consumption_mWh (self ):
161+ """Return average power consumption of last measurement in mWh"""
162+ average_current_mA = self .get_average_current_mA () # convert microamperes to milliamperes
163+ average_power_mW = (self .source_voltage_mV / 1000 ) * average_current_mA # divide by 1000 as source voltage is in millivolts - this gives us milliwatts
164+ measurement_duration_h = self .get_measurement_duration_s () / 3600 # duration in seconds, divide by 3600 to get hours
165+ average_consumption_mWh = average_power_mW * measurement_duration_h
166+ return average_consumption_mWh
167+
168+ def get_average_charge_mC (self ):
169+ """Returns average charge in milli coulomb"""
170+ average_current_mA = self .get_average_current_mA ()
171+ measurement_duration_s = self .get_measurement_duration_s () # in seconds
172+ return average_current_mA * measurement_duration_s
173+
174+ def get_measurement_duration_s (self ):
175+ """Returns duration of measurement"""
176+ measurement_duration_s = (self .measurement_stop_time - self .measurement_start_time ) # measurement duration in seconds
177+ return measurement_duration_s
178+
179+ # pp = PowerProfiler("/dev/ttyACM1")
180+ # pp.start_measuring()
181+ # time.sleep(10)
182+ # pp.stop_measuring()
0 commit comments