11import ctypes
2+ import logging
3+ import winreg
4+
25from ctypes import wintypes
36
7+ logger = logging .getLogger (__name__ )
8+
9+
410# Define ULONG_PTR
511if ctypes .sizeof (ctypes .c_void_p ) == 8 : # 64-bit system
612 ULONG_PTR = ctypes .c_uint64
915
1016# Relation types define the type of processor data returned (for core info, we use RelationProcessorCore)
1117RelationProcessorCore = 0 # Type used to identify cores
12-
18+ RelationAll = 0xffff
19+ # Constants for GetSystemCpuSetInformation API
20+ SystemLogicalProcessorInformation = 0 # Not used here, kept for reference
21+ SystemCpuSetInformation = 1
1322
1423# Define GROUP_AFFINITY structure
1524class GROUP_AFFINITY (ctypes .Structure ):
@@ -19,7 +28,6 @@ class GROUP_AFFINITY(ctypes.Structure):
1928 ("Reserved" , wintypes .WORD * 3 ),
2029 ]
2130
22-
2331# Define PROCESSOR_RELATIONSHIP structure
2432class PROCESSOR_RELATIONSHIP (ctypes .Structure ):
2533 _fields_ = [
@@ -30,6 +38,23 @@ class PROCESSOR_RELATIONSHIP(ctypes.Structure):
3038 ("GroupMask" , GROUP_AFFINITY * 1 ), # Array of group masks
3139 ]
3240
41+ class SYSTEM_CPU_SET_INFORMATION (ctypes .Structure ):
42+ _fields_ = [
43+ ("Size" , wintypes .DWORD ), # Size of the structure
44+ ("Type" , wintypes .DWORD ), # Should be 1 (SystemCpuSetInformation)
45+ ("Id" , wintypes .DWORD ), # Logical CPU ID
46+ ("Group" , wintypes .WORD ), # CPU group number
47+ ("LogicalProcessorIndex" , wintypes .BYTE ),
48+ ("CoreIndex" , wintypes .BYTE ),
49+ ("LastLevelCacheIndex" , wintypes .BYTE ),
50+ ("NumaNodeIndex" , wintypes .BYTE ),
51+ ("EfficiencyClass" , wintypes .BYTE ), # Efficiency class field
52+ ("AllFlags" , wintypes .BYTE ),
53+ ("SchedulingClass" , wintypes .BYTE ), # Scheduling class field
54+ ("Reserved" , wintypes .BYTE * 9 ),
55+ ("Reserved2" , wintypes .DWORD ),
56+ ("GroupAffinity" , GROUP_AFFINITY ),
57+ ]
3358
3459# SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX base container
3560class SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX (ctypes .Structure ):
@@ -40,8 +65,8 @@ class SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX(ctypes.Structure):
4065 ]
4166
4267
43- def get_processor_info () -> list [tuple [int , int ]]:
44- ret : list [tuple [int , int ]] = []
68+ def get_processor_info () -> list [tuple [int , int , int ]]:
69+ ret : list [tuple [int , int , int ]] = []
4570
4671 # Load the kernel32.dll library
4772 kernel32 = ctypes .WinDLL ("kernel32" , use_last_error = True )
@@ -64,63 +89,134 @@ def get_processor_info() -> list[tuple[int, int]]:
6489 if not logical_proc_info_ex (RelationProcessorCore , buffer , ctypes .byref (required_size )):
6590 raise ctypes .WinError (ctypes .get_last_error ())
6691
92+ # Call GetSystemCpuSetInformation to retrieve scheduling data
93+ cpu_set_info = get_cpu_set_information () # Use existing helper to fetch CPU set info
94+ scheduling_class_map = {cpu ["Logical Processor Index" ]: cpu ["Scheduling Class" ] for cpu in cpu_set_info }
95+
6796 # Parse the buffer
6897 offset = 0
6998 while offset < required_size .value :
7099 info = ctypes .cast (ctypes .byref (buffer , offset ),
71100 ctypes .POINTER (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX )).contents
72101 if info .Relationship == RelationProcessorCore :
73- flags = info .Processor .Flags
74102 efficiency_class = info .Processor .EfficiencyClass
75- ret .append ((efficiency_class , flags ))
103+ mask = info .Processor .GroupMask [0 ].Mask
104+ # Map each bit in the affinity mask to logical processors
105+ for i in range (mask .bit_length ()): # Iterate over each bit in the mask (logical processors)
106+ if mask & (1 << i ): # Check if logical processor `i` is part of this physical core group
107+ scheduling_class = scheduling_class_map .get (i , 0 ) # Get Scheduling Class (default to 0)
108+ # Only associate this logical processor (physical core + index connection)
109+ ret .append ((efficiency_class , scheduling_class , mask ))
110+ break # Use only the first logical processor for this physical core
76111
77112 # Move to the next structure in the buffer
78113 offset += info .Size
79114
80115 return ret
81116
82117
83- def get_p_core_affinity ():
84- kernel32 = ctypes .WinDLL ("kernel32" , use_last_error = True )
85- logical_proc_info_ex = kernel32 .GetLogicalProcessorInformationEx
86- logical_proc_info_ex .restype = wintypes .BOOL
87- logical_proc_info_ex .argtypes = [wintypes .DWORD , ctypes .c_void_p , ctypes .POINTER (wintypes .DWORD )]
118+ def get_cpu_set_information ():
119+ """
120+ Queries logical CPU information using GetSystemCpuSetInformation and retrieves relevant fields.
88121
89- # Step 1: Retrieve the buffer size required
90- required_size = wintypes .DWORD (0 )
91- if not logical_proc_info_ex (RelationProcessorCore , None , ctypes .byref (required_size )):
92- if ctypes .get_last_error () != 122 : # ERROR_INSUFFICIENT_BUFFER
93- raise ctypes .WinError (ctypes .get_last_error ())
94-
95- # Step 2: Allocate the buffer
96- buffer = ctypes .create_string_buffer (required_size .value )
122+ Returns:
123+ list[dict]: A list of dictionaries containing information about each logical CPU.
124+ """
125+ kernel32 = ctypes .WinDLL ("Kernel32.dll" )
97126
98- # Step 3: Populate the buffer with processor information
99- if not logical_proc_info_ex (RelationProcessorCore , buffer , ctypes .byref (required_size )):
100- raise ctypes .WinError (ctypes .get_last_error ())
101-
102- # Step 4: Parse the buffer to find all P-cores
103- affinity_mask = 0
127+ kernel32 .GetSystemCpuSetInformation .argtypes = [
128+ ctypes .c_void_p , # PSYSTEM_CPU_SET_INFORMATION (pointer, can be None)
129+ wintypes .ULONG , # ULONG BufferLength (size of the buffer, 0 at first call)
130+ ctypes .POINTER (wintypes .ULONG ), # PULONG ReturnedLength (pointer to get size)
131+ wintypes .HANDLE , # HANDLE Process (handle to the process, NULL for current)
132+ wintypes .ULONG # ULONG Flags (reserved, must be 0)
133+ ]
134+ kernel32 .GetSystemCpuSetInformation .restype = wintypes .BOOL
135+
136+ # Determine required buffer size
137+ buffer_size = wintypes .DWORD (0 )
138+ result = kernel32 .GetSystemCpuSetInformation (
139+ None , # No buffer since we're querying the size
140+ 0 , # BufferLength = 0
141+ ctypes .byref (buffer_size ), # Retrieve required buffer size
142+ None , # Current process
143+ 0 # Reserved
144+ )
145+ if buffer_size .value == 0 :
146+ raise RuntimeError ("GetSystemCpuSetInformation failed to get buffer size" )
147+
148+ # Allocate buffer
149+ buffer = ctypes .create_string_buffer (buffer_size .value )
150+
151+ # Retrieve CPU set information
152+ result = kernel32 .GetSystemCpuSetInformation (
153+ ctypes .byref (buffer ), # Provide allocated buffer
154+ buffer_size , # Size of the buffer
155+ ctypes .byref (buffer_size ), # Size of actual data written
156+ None , # Current process
157+ 0 # Reserved
158+ )
159+
160+ if result == 0 :
161+ raise RuntimeError ("GetSystemCpuSetInformation failed to retrieve information" )
162+
163+ # Parse buffer
164+ cpu_info_list = []
104165 offset = 0
105- while offset < required_size .value :
106- info = ctypes .cast (
166+ while offset < buffer_size .value :
167+ # Cast buffer to SYSTEM_CPU_SET_INFORMATION structure
168+ cpu_info = ctypes .cast (
107169 ctypes .byref (buffer , offset ),
108- ctypes .POINTER (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX )
170+ ctypes .POINTER (SYSTEM_CPU_SET_INFORMATION )
109171 ).contents
110172
111- if info .Relationship == RelationProcessorCore : # Only handle processor core relationships
112- efficiency_class = info .Processor .EfficiencyClass
113- flags = info .Processor .Flags
114- mask = info .Processor .GroupMask [0 ].Mask # Logical processors bitmask
173+ # Add relevant fields to the dictionary
174+ cpu_info_list .append ({
175+ "CPU Id" : cpu_info .Id ,
176+ "Logical Processor Index" : cpu_info .LogicalProcessorIndex ,
177+ "Core Index" : cpu_info .CoreIndex ,
178+ "Efficiency Class" : cpu_info .EfficiencyClass ,
179+ "Scheduling Class" : cpu_info .SchedulingClass
180+ })
115181
116- # Check if it's a P-core (EfficiencyClass > 0)
117- if efficiency_class > 0 : # Adjust to match your P-core/E-core criteria
118- affinity_mask |= mask # Add this core's logical processors to the affinity mask
182+ # Move to the next structure
183+ offset += cpu_info .Size
119184
120- # Move to the next entry in the buffer
121- offset += info .Size
185+ return cpu_info_list
186+
187+
188+ def get_p_core_affinity () -> int :
189+ """
190+ Calculate the affinity mask for all logical processors associated with performance cores (P-cores).
122191
123- return affinity_mask
192+ Returns:
193+ int: An affinity mask where each bit represents a logical processor, with `1` for P-cores.
194+ """
195+ processor_info = get_processor_info () # Retrieve individual core data
196+ p_core_affinity_mask = 0 # Initialize to zero
197+
198+ for efficiency_class , _ , mask in processor_info :
199+ if efficiency_class > 0 : # Check if this is a P-core
200+ p_core_affinity_mask |= mask # Add this core's bitmask to the affinity mask
201+
202+ return p_core_affinity_mask
203+
204+
205+ def get_e_core_affinity () -> int :
206+ """
207+ Calculate the affinity mask for all logical processors associated with efficiency cores (E-cores).
208+
209+ Returns:
210+ int: An affinity mask where each bit represents a logical processor, with `1` for P-cores.
211+ """
212+ processor_info = get_processor_info () # Retrieve individual core data
213+ e_core_affinity_mask = 0 # Initialize to zero
214+
215+ for efficiency_class , _ , mask in processor_info :
216+ if efficiency_class == 0 : # Check if this is an E-core
217+ e_core_affinity_mask |= mask # Add this core's bitmask to the affinity mask
218+
219+ return e_core_affinity_mask
124220
125221
126222def get_cpus_from_affinity (affinity_mask : int ) -> list [int ]:
@@ -136,7 +232,97 @@ def get_cpus_from_affinity(affinity_mask: int) -> list[int]:
136232 return core_ids
137233
138234
235+ def determine_performance_class (efficiency_class : int , flags : int ) -> int :
236+ """
237+ Determines the performance class of a processor core.
238+
239+ Args:
240+ efficiency_class (int): The processor core's efficiency class as given
241+ by GetLogicalProcessorInformationEx.
242+ flags (int): Processor flags or the group affinity mask.
243+
244+ Returns:
245+ int: The core's performance class.
246+ """
247+ # Example: Map efficiency class & flags to performance class
248+ # You may override or extend this logic depending on architecture
249+ if efficiency_class == 0 :
250+ return 0 # Efficiency cores (E-cores)
251+ elif efficiency_class == 1 :
252+ return 1 # Performance cores (P-cores)
253+ else :
254+ # AMD or Custom: Infer based on flags or other attributes
255+ return (flags >> 2 ) & 0b11 # Example: Mock performance tier from 'flags'
256+
257+
258+ def get_performance_classes () -> dict :
259+ """
260+ Groups logical cores by their performance class.
261+
262+ Returns:
263+ dict: A dictionary mapping each performance class to its associated logical cores.
264+ Keys are `performance_class` integers, and values are combined bitmasks of logical processors.
265+ """
266+ processor_info = get_processor_info () # Retrieve individual core data
267+ performance_classes = {} # Dictionary to store logical processors mapped by performance class
268+
269+ for _ , performance_class , mask in processor_info :
270+ # Group logical processors by performance class
271+ if performance_class not in performance_classes :
272+ performance_classes [performance_class ] = 0
273+ performance_classes [performance_class ] |= mask # Aggregate logical processors for the class using bitwise OR
274+
275+ return performance_classes
276+
277+
278+ def get_cpu_name ():
279+ # Open the registry key for the CPU information
280+ registry_key = winreg .OpenKey (
281+ winreg .HKEY_LOCAL_MACHINE ,
282+ r"HARDWARE\DESCRIPTION\System\CentralProcessor\0"
283+ )
284+
285+ # Read the value of ProcessorNameString
286+ cpu_name , _ = winreg .QueryValueEx (registry_key , "ProcessorNameString" )
287+
288+ # Close the registry key
289+ winreg .CloseKey (registry_key )
290+
291+ return cpu_name .strip ()
292+
293+
294+ def log_cpu ():
295+ logger .debug (f"CPU: { get_cpu_name ()} " )
296+ processor_info = get_processor_info ()
297+ efficiency_classes = sorted ({entry [0 ] for entry in processor_info })
298+ if len (efficiency_classes ) > 1 :
299+ logger .info (f"CPU cores have different efficiency classes: { efficiency_classes } " )
300+ for efficiency_class in sorted (efficiency_classes , reverse = True ):
301+ for eclass , pclass , mask in processor_info :
302+ if eclass != efficiency_class :
303+ continue
304+ logger .debug (f"logical cores with efficiency class { efficiency_class } : { get_cpus_from_affinity (mask )} " )
305+ else :
306+ logger .debug (f"all CPU cores have the same efficiency class { efficiency_classes [0 ]} " )
307+ performance_classes = sorted ({entry [1 ] for entry in processor_info })
308+ if len (performance_classes ) > 1 :
309+ logger .debug (f"CPU cores have different performance classes: { performance_classes } " )
310+ for performance_class in sorted (performance_classes , reverse = True ):
311+ for eclass , pclass , mask in processor_info :
312+ if pclass != performance_class :
313+ continue
314+ logger .debug (f"logical cores with performance class { performance_class } : { get_cpus_from_affinity (mask )} " )
315+ else :
316+ logger .debug (f"all CPU cores have the same performance class { performance_classes [0 ]} " )
317+
318+
139319if __name__ == '__main__' :
320+ print (get_cpu_name ())
140321 print (get_processor_info ())
141- print (get_p_core_affinity ())
142- print (get_cpus_from_affinity (get_p_core_affinity ()))
322+ p_core_affinity_mask = get_p_core_affinity ()
323+ print (f"Performance Cores: { get_cpus_from_affinity (p_core_affinity_mask )} " )
324+ e_core_affinity_mask = get_e_core_affinity ()
325+ print (f"Efficiency Cores: { get_cpus_from_affinity (e_core_affinity_mask )} " )
326+ performance_classes = get_performance_classes ()
327+ for plcass , affinity_mask in performance_classes .items ():
328+ print (f"Performance Class { plcass } : { get_cpus_from_affinity (affinity_mask )} " )
0 commit comments