|
| 1 | +import _overlapped |
| 2 | +import _thread |
1 | 3 | import _winapi |
2 | 4 | import math |
3 | | -import msvcrt |
4 | | -import os |
5 | | -import subprocess |
6 | | -import uuid |
| 5 | +import struct |
7 | 6 | import winreg |
8 | | -from test.support import os_helper |
9 | | -from test.libregrtest.utils import print_warning |
10 | 7 |
|
11 | 8 |
|
12 | | -# Max size of asynchronous reads |
13 | | -BUFSIZE = 8192 |
14 | 9 | # Seconds per measurement |
15 | 10 | SAMPLING_INTERVAL = 1 |
16 | 11 | # Exponential damping factor to compute exponentially weighted moving average |
|
19 | 14 | # Initialize the load using the arithmetic mean of the first NVALUE values |
20 | 15 | # of the Processor Queue Length |
21 | 16 | NVALUE = 5 |
22 | | -# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names |
23 | | -# of typeperf are registered |
24 | | -COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion" |
25 | | - r"\Perflib\CurrentLanguage") |
26 | 17 |
|
27 | 18 |
|
28 | 19 | class WindowsLoadTracker(): |
29 | 20 | """ |
30 | | - This class asynchronously interacts with the `typeperf` command to read |
31 | | - the system load on Windows. Multiprocessing and threads can't be used |
32 | | - here because they interfere with the test suite's cases for those |
33 | | - modules. |
| 21 | + This class asynchronously reads the performance counters to calculate |
| 22 | + the system load on Windows. A "raw" thread is used here to prevent |
| 23 | + interference with the test suite's cases for the threading module. |
34 | 24 | """ |
35 | 25 |
|
36 | 26 | def __init__(self): |
| 27 | + # Pre-flight test for access to the performance data; |
| 28 | + # `PermissionError` will be raised if not allowed |
| 29 | + winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA) |
| 30 | + |
37 | 31 | self._values = [] |
38 | 32 | self._load = None |
39 | | - self._buffer = '' |
40 | | - self._popen = None |
41 | | - self.start() |
42 | | - |
43 | | - def start(self): |
44 | | - # Create a named pipe which allows for asynchronous IO in Windows |
45 | | - pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4()) |
46 | | - |
47 | | - open_mode = _winapi.PIPE_ACCESS_INBOUND |
48 | | - open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE |
49 | | - open_mode |= _winapi.FILE_FLAG_OVERLAPPED |
50 | | - |
51 | | - # This is the read end of the pipe, where we will be grabbing output |
52 | | - self.pipe = _winapi.CreateNamedPipe( |
53 | | - pipe_name, open_mode, _winapi.PIPE_WAIT, |
54 | | - 1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL |
55 | | - ) |
56 | | - # The write end of the pipe which is passed to the created process |
57 | | - pipe_write_end = _winapi.CreateFile( |
58 | | - pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL, |
59 | | - _winapi.OPEN_EXISTING, 0, _winapi.NULL |
60 | | - ) |
61 | | - # Open up the handle as a python file object so we can pass it to |
62 | | - # subprocess |
63 | | - command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0) |
64 | | - |
65 | | - # Connect to the read end of the pipe in overlap/async mode |
66 | | - overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True) |
67 | | - overlap.GetOverlappedResult(True) |
68 | | - |
69 | | - # Spawn off the load monitor |
70 | | - counter_name = self._get_counter_name() |
71 | | - command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)] |
72 | | - self._popen = subprocess.Popen(' '.join(command), |
73 | | - stdout=command_stdout, |
74 | | - cwd=os_helper.SAVEDCWD) |
75 | | - |
76 | | - # Close our copy of the write end of the pipe |
77 | | - os.close(command_stdout) |
78 | | - |
79 | | - def _get_counter_name(self): |
80 | | - # accessing the registry to get the counter localization name |
81 | | - with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey: |
82 | | - counters = winreg.QueryValueEx(perfkey, 'Counter')[0] |
83 | | - |
84 | | - # Convert [key1, value1, key2, value2, ...] list |
85 | | - # to {key1: value1, key2: value2, ...} dict |
86 | | - counters = iter(counters) |
87 | | - counters_dict = dict(zip(counters, counters)) |
88 | | - |
89 | | - # System counter has key '2' and Processor Queue Length has key '44' |
90 | | - system = counters_dict['2'] |
91 | | - process_queue_length = counters_dict['44'] |
92 | | - return f'"\\{system}\\{process_queue_length}"' |
93 | | - |
94 | | - def close(self, kill=True): |
95 | | - if self._popen is None: |
| 33 | + self._running = _overlapped.CreateEvent(None, True, False, None) |
| 34 | + self._stopped = _overlapped.CreateEvent(None, True, False, None) |
| 35 | + |
| 36 | + _thread.start_new_thread(self._update_load, (), {}) |
| 37 | + |
| 38 | + def _update_load(self, |
| 39 | + # localize module access to prevent shutdown errors |
| 40 | + _wait=_winapi.WaitForSingleObject, |
| 41 | + _signal=_overlapped.SetEvent): |
| 42 | + # run until signaled to stop |
| 43 | + while _wait(self._running, 1000): |
| 44 | + self._calculate_load() |
| 45 | + # notify stopped |
| 46 | + _signal(self._stopped) |
| 47 | + |
| 48 | + def _calculate_load(self, |
| 49 | + # localize module access to prevent shutdown errors |
| 50 | + _query=winreg.QueryValueEx, |
| 51 | + _hkey=winreg.HKEY_PERFORMANCE_DATA, |
| 52 | + _unpack=struct.unpack_from): |
| 53 | + # get the 'System' object |
| 54 | + data, _ = _query(_hkey, '2') |
| 55 | + # PERF_DATA_BLOCK { |
| 56 | + # WCHAR Signature[4] 8 + |
| 57 | + # DWOWD LittleEndian 4 + |
| 58 | + # DWORD Version 4 + |
| 59 | + # DWORD Revision 4 + |
| 60 | + # DWORD TotalByteLength 4 + |
| 61 | + # DWORD HeaderLength = 24 byte offset |
| 62 | + # ... |
| 63 | + # } |
| 64 | + obj_start, = _unpack('L', data, 24) |
| 65 | + # PERF_OBJECT_TYPE { |
| 66 | + # DWORD TotalByteLength |
| 67 | + # DWORD DefinitionLength |
| 68 | + # DWORD HeaderLength |
| 69 | + # ... |
| 70 | + # } |
| 71 | + data_start, defn_start = _unpack('4xLL', data, obj_start) |
| 72 | + data_base = obj_start + data_start |
| 73 | + defn_base = obj_start + defn_start |
| 74 | + # find the 'Processor Queue Length' counter (index=44) |
| 75 | + while defn_base < data_base: |
| 76 | + # PERF_COUNTER_DEFINITION { |
| 77 | + # DWORD ByteLength |
| 78 | + # DWORD CounterNameTitleIndex |
| 79 | + # ... [7 DWORDs/28 bytes] |
| 80 | + # DWORD CounterOffset |
| 81 | + # } |
| 82 | + size, idx, offset = _unpack('LL28xL', data, defn_base) |
| 83 | + defn_base += size |
| 84 | + if idx == 44: |
| 85 | + counter_offset = data_base + offset |
| 86 | + # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD) |
| 87 | + processor_queue_length, = _unpack('L', data, counter_offset) |
| 88 | + break |
| 89 | + else: |
96 | 90 | return |
97 | 91 |
|
98 | | - self._load = None |
99 | | - |
100 | | - if kill: |
101 | | - self._popen.kill() |
102 | | - self._popen.wait() |
103 | | - self._popen = None |
104 | | - |
105 | | - def __del__(self): |
106 | | - self.close() |
107 | | - |
108 | | - def _parse_line(self, line): |
109 | | - # typeperf outputs in a CSV format like this: |
110 | | - # "07/19/2018 01:32:26.605","3.000000" |
111 | | - # (date, process queue length) |
112 | | - tokens = line.split(',') |
113 | | - if len(tokens) != 2: |
114 | | - raise ValueError |
115 | | - |
116 | | - value = tokens[1] |
117 | | - if not value.startswith('"') or not value.endswith('"'): |
118 | | - raise ValueError |
119 | | - value = value[1:-1] |
120 | | - return float(value) |
121 | | - |
122 | | - def _read_lines(self): |
123 | | - overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True) |
124 | | - bytes_read, res = overlapped.GetOverlappedResult(False) |
125 | | - if res != 0: |
126 | | - return () |
127 | | - |
128 | | - output = overlapped.getbuffer() |
129 | | - output = output.decode('oem', 'replace') |
130 | | - output = self._buffer + output |
131 | | - lines = output.splitlines(True) |
132 | | - |
133 | | - # bpo-36670: typeperf only writes a newline *before* writing a value, |
134 | | - # not after. Sometimes, the written line in incomplete (ex: only |
135 | | - # timestamp, without the process queue length). Only pass the last line |
136 | | - # to the parser if it's a valid value, otherwise store it in |
137 | | - # self._buffer. |
138 | | - try: |
139 | | - self._parse_line(lines[-1]) |
140 | | - except ValueError: |
141 | | - self._buffer = lines.pop(-1) |
| 92 | + # We use an exponentially weighted moving average, imitating the |
| 93 | + # load calculation on Unix systems. |
| 94 | + # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation |
| 95 | + # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average |
| 96 | + if self._load is not None: |
| 97 | + self._load = (self._load * LOAD_FACTOR_1 |
| 98 | + + processor_queue_length * (1.0 - LOAD_FACTOR_1)) |
| 99 | + elif len(self._values) < NVALUE: |
| 100 | + self._values.append(processor_queue_length) |
142 | 101 | else: |
143 | | - self._buffer = '' |
| 102 | + self._load = sum(self._values) / len(self._values) |
144 | 103 |
|
145 | | - return lines |
| 104 | + def close(self, kill=True): |
| 105 | + self.__del__() |
| 106 | + return |
| 107 | + |
| 108 | + def __del__(self, |
| 109 | + # localize module access to prevent shutdown errors |
| 110 | + _wait=_winapi.WaitForSingleObject, |
| 111 | + _close=_winapi.CloseHandle, |
| 112 | + _signal=_overlapped.SetEvent): |
| 113 | + if self._running is not None: |
| 114 | + # tell the update thread to quit |
| 115 | + _signal(self._running) |
| 116 | + # wait for the update thread to signal done |
| 117 | + _wait(self._stopped, -1) |
| 118 | + # cleanup events |
| 119 | + _close(self._running) |
| 120 | + _close(self._stopped) |
| 121 | + self._running = self._stopped = None |
146 | 122 |
|
147 | 123 | def getloadavg(self): |
148 | | - if self._popen is None: |
149 | | - return None |
150 | | - |
151 | | - returncode = self._popen.poll() |
152 | | - if returncode is not None: |
153 | | - self.close(kill=False) |
154 | | - return None |
155 | | - |
156 | | - try: |
157 | | - lines = self._read_lines() |
158 | | - except BrokenPipeError: |
159 | | - self.close() |
160 | | - return None |
161 | | - |
162 | | - for line in lines: |
163 | | - line = line.rstrip() |
164 | | - |
165 | | - # Ignore the initial header: |
166 | | - # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length" |
167 | | - if 'PDH-CSV' in line: |
168 | | - continue |
169 | | - |
170 | | - # Ignore blank lines |
171 | | - if not line: |
172 | | - continue |
173 | | - |
174 | | - try: |
175 | | - processor_queue_length = self._parse_line(line) |
176 | | - except ValueError: |
177 | | - print_warning("Failed to parse typeperf output: %a" % line) |
178 | | - continue |
179 | | - |
180 | | - # We use an exponentially weighted moving average, imitating the |
181 | | - # load calculation on Unix systems. |
182 | | - # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation |
183 | | - # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average |
184 | | - if self._load is not None: |
185 | | - self._load = (self._load * LOAD_FACTOR_1 |
186 | | - + processor_queue_length * (1.0 - LOAD_FACTOR_1)) |
187 | | - elif len(self._values) < NVALUE: |
188 | | - self._values.append(processor_queue_length) |
189 | | - else: |
190 | | - self._load = sum(self._values) / len(self._values) |
191 | | - |
192 | 124 | return self._load |
0 commit comments