-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
361 lines (288 loc) · 11.1 KB
/
utils.py
File metadata and controls
361 lines (288 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
import platform
import subprocess
from PyQt6.QtWidgets import QLabel, QFrame
from PyQt6.QtCore import pyqtSignal
import sys, os
from PyQt6.QtWidgets import (
QApplication,
QWidget,
QHBoxLayout,
QPushButton,
QButtonGroup,
)
from PyQt6.QtCore import Qt, pyqtSignal # Import pyqtSignal
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Tuple
import re
# Default colour used by create_separator() for separator lines.
COLOR_DARK_BORDER = "#6272a4"
# Absolute directory containing this module; the config paths below are anchored to it.
script_dir = os.path.dirname(os.path.abspath(__file__))
# <script_dir>/configs/settings.ini
settings_path = os.path.join(script_dir, "configs", "settings.ini")
# <script_dir>/configs
configs_dir = os.path.join(script_dir, "configs")
# <script_dir>/src_static/defaults.ini
default_settings_path = os.path.join(script_dir, "src_static", "defaults.ini")
# NOTE(review): unlike the paths above this one is relative (not joined with
# script_dir), so it depends on the caller's working directory - confirm intended.
font_directory = os.path.join("src_static", "Open_Sans")
# Object Names for Styling
BTN_GREEN = "btnGreen"
BTN_RED = "btnRed"
BTN_BLUE = "btnBlue"
def parse_duration(s: str) -> timedelta:
    """Parse a duration string in SLURM format to a timedelta object.

    Supported layouts (the ones SLURM documents for time values):

    * ``MM``            - minutes
    * ``MM:SS``         - minutes and seconds
    * ``HH:MM:SS``      - hours, minutes and seconds
    * ``D-HH``          - days and hours
    * ``D-HH:MM``       - days, hours and minutes
    * ``D-HH:MM:SS``    - days, hours, minutes and seconds

    Args:
        s: The duration string; surrounding whitespace is ignored.

    Returns:
        The equivalent ``timedelta``.

    Raises:
        ValueError: If a component is not an integer or the string has an
            unsupported number of ``:``-separated fields.
    """
    text = s.strip()
    days = 0
    has_days = "-" in text
    if has_days:
        # Split on the first dash only; anything odd after it is rejected
        # by the int() conversion below.
        day_part, _, time_part = text.partition("-")
        days = int(day_part)
    else:
        time_part = text

    fields = [int(piece) for piece in time_part.split(":")]
    if not 1 <= len(fields) <= 3:
        raise ValueError(f"Invalid time format: {s}")

    if has_days:
        # With a day prefix the fields are read left to right as
        # hours[:minutes[:seconds]].
        hours, minutes, seconds = (fields + [0, 0])[:3]
    elif len(fields) == 3:
        hours, minutes, seconds = fields
    else:
        # Without a day prefix a short form is minutes[:seconds].
        hours = 0
        minutes, seconds = (fields + [0])[:2]

    return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
def determine_job_status(state: str, exit_code: str = None) -> str:
    """
    Map a raw SLURM job state (plus optional exit code) to a refined status.

    Args:
        state: SLURM job state such as 'COMPLETED' or 'NODE_FAIL'.
        exit_code: Exit code string, usually "exit_status:signal" (e.g. '0:0').

    Returns:
        str: One of the refined statuses, or ``state`` unchanged when it is
        not a recognised SLURM state.
    """
    if state in ("COMPLETED",):
        # Without an exit code there is nothing to second-guess.
        if not exit_code:
            return "COMPLETED"
        try:
            status_number = int(exit_code.split(":")[0])
        except (ValueError, IndexError):
            # Unparseable exit code: trust the COMPLETED state.
            return "COMPLETED"
        # A non-zero exit status means the job actually failed.
        return "COMPLETED" if status_number == 0 else "FAILED"

    # Refined status -> raw SLURM states that collapse into it.
    state_groups = (
        ("FAILED", ("FAILED", "NODE_FAIL", "BOOT_FAIL", "OUT_OF_MEMORY")),
        ("CANCELLED", ("CANCELLED", "TIMEOUT", "REVOKED", "DEADLINE")),
        ("RUNNING", ("RUNNING", "COMPLETING")),
        ("PENDING", ("PENDING",)),
        ("SUSPENDED", ("SUSPENDED", "PREEMPTED")),
        ("STOPPED", ("STOPPED",)),
    )
    for refined, raw_states in state_groups:
        if state in raw_states:
            return refined

    # Unknown states pass through untouched.
    return state
def parse_memory_size(size_str):
    """Convert a memory size string to a whole number of bytes.

    The string is a number optionally followed by a unit suffix. Recognised
    suffixes (case-insensitive) are ``B``, ``K``/``KB``, ``M``/``MB``,
    ``G``/``GB``, ``T``/``TB`` and ``P``/``PB``, using powers of 1024.
    A string with no suffix at all (e.g. ``"1024"``) is taken as bytes.
    Any other single trailing character is dropped and the number is
    treated as bytes, as before.

    Args:
        size_str: Text such as ``"4G"``, ``"512 MB"`` or ``"1024"``.

    Returns:
        int: The size in bytes, truncated towards zero.

    Raises:
        ValueError: If the string is empty or its numeric part is not a
            valid number.
    """
    # Strip any whitespace and make uppercase for consistency
    text = size_str.strip().upper()
    if not text:
        raise ValueError("Empty memory size string")

    multipliers = {
        "B": 1,
        "K": 1024,
        "M": 1024**2,
        "G": 1024**3,
        "T": 1024**4,
        "P": 1024**5,
    }

    # A value ending in a digit (or a bare trailing dot) has no unit; without
    # this check the last digit would be mistaken for the unit and lost.
    if text[-1].isdigit() or text[-1] == ".":
        return int(float(text))

    if text[-2:] in ("KB", "MB", "GB", "TB", "PB"):
        number_text, unit = text[:-2], text[-2]
    else:
        number_text, unit = text[:-1], text[-1]

    # Unknown suffixes keep the legacy behaviour of counting as bytes.
    return int(float(number_text) * multipliers.get(unit, 1))
class ClickableLabel(QLabel):
    """QLabel variant that announces mouse presses through a signal."""

    # Emitted with no arguments each time a mouse-press event is received.
    clicked = pyqtSignal()

    def mousePressEvent(self, event):
        """Emit ``clicked`` and then run QLabel's own press handling."""
        self.clicked.emit()
        super().mousePressEvent(event)
class ButtonGroupWidget(QWidget):
    """Horizontal row of mutually exclusive, checkable buttons.

    The row contains "ALL", "ME", "PROD" and "STUD"; "ALL" starts checked.
    The most recently clicked button gets the BTN_BLUE object name and every
    other button BTN_GREEN.
    """

    # Carries the text of the button that was just clicked.
    selectionChanged = pyqtSignal(str)

    def __init__(self, parent=None):
        super().__init__(parent)

        # Passing ``self`` to the layout constructor installs it on this widget.
        row = QHBoxLayout(self)
        row.setContentsMargins(0, 0, 0, 0)
        row.setSpacing(5)

        self.button_group = QButtonGroup(self)
        self.button_group.setExclusive(True)  # at most one checked button

        self.buttons = {}
        for label in ("ALL", "ME", "PROD", "STUD"):
            button = QPushButton(label)
            button.setObjectName(BTN_GREEN)
            button.setCheckable(True)  # keep the pressed state after release
            row.addWidget(button)
            self.button_group.addButton(button)
            self.buttons[label] = button

        # Start with "ALL" selected and styled as the active choice.
        if "ALL" in self.buttons:
            default_button = self.buttons["ALL"]
            default_button.setChecked(True)
            self.selectionChanged.emit("ALL")
            self._update_button_styles(default_button)

        # Each click first emits selectionChanged, then refreshes the styling.
        clicked_signal = self.button_group.buttonClicked
        clicked_signal.connect(self._handle_button_click_and_emit)
        clicked_signal.connect(self._update_button_styles)

    def _handle_button_click_and_emit(self, clicked_button):
        """Slot for ``buttonClicked``: re-emit the button's text via selectionChanged."""
        self.selectionChanged.emit(clicked_button.text())

    def _update_button_styles(self, clicked_button):
        """Slot for ``buttonClicked``: give the clicked button BTN_BLUE, the rest BTN_GREEN."""
        for button in self.buttons.values():
            button.setObjectName(
                BTN_BLUE if button is clicked_button else BTN_GREEN
            )
            # Re-polish after the object-name change.
            button.style().polish(button)

    def get_checked_button_text(self):
        """Return the text of the currently checked button, or None if there is none."""
        current = self.button_group.checkedButton()
        return current.text() if current else None
def create_separator(shape=QFrame.Shape.HLine, color=COLOR_DARK_BORDER):
    """Build a one-pixel-thick, sunken QFrame line filled with ``color``.

    A horizontal line (``QFrame.Shape.HLine``) gets a fixed height of 1;
    any other shape gets a fixed width of 1.
    """
    line = QFrame()
    line.setFrameShape(shape)
    line.setFrameShadow(QFrame.Shadow.Sunken)
    line.setStyleSheet(f"background-color: {color};")

    # Pin the thin dimension: height for horizontal lines, width otherwise.
    is_horizontal = shape == QFrame.Shape.HLine
    pin_thickness = line.setFixedHeight if is_horizontal else line.setFixedWidth
    pin_thickness(1)
    return line
def _expand_node_range(node_string: str) -> List[str]:
"""
Expands a Slurm node string with ranges into a full list of node names.
Example: 'hpc-[01-03],hpc-10' -> ['hpc-01', 'hpc-02', 'hpc-03', 'hpc-10']
"""
match = re.search(r'\[([\d,-]+)\]', node_string)
if not match:
return [node_string]
prefix = node_string[:match.start()]
suffix = node_string[match.end():]
range_spec = match.group(1)
nodes = []
for part in range_spec.split(','):
if '-' in part:
start_str, end_str = part.split('-')
start, end = int(start_str), int(end_str)
padding = len(start_str)
for i in range(start, end + 1):
node_num = str(i).zfill(padding)
nodes.append(f"{prefix}{node_num}{suffix}")
else:
nodes.append(f"{prefix}{part}{suffix}")
return nodes
def _split_csv_outside_brackets(value: str) -> List[str]:
"""Split comma-separated values while preserving bracketed ranges."""
parts = []
current = []
depth = 0
for char in value:
if char == "[":
depth += 1
elif char == "]" and depth > 0:
depth -= 1
if char == "," and depth == 0:
part = "".join(current).strip()
if part:
parts.append(part)
current = []
continue
current.append(char)
tail = "".join(current).strip()
if tail:
parts.append(tail)
return parts
def _split_reservation_blocks(raw_text: str) -> List[str]:
"""Split reservation output into blocks, handling with/without blank lines."""
blocks = []
current = []
for raw_line in raw_text.splitlines():
line = raw_line.strip()
if not line:
if current:
blocks.append(" ".join(current))
current = []
continue
if line.startswith("ReservationName=") and current:
blocks.append(" ".join(current))
current = [line]
continue
current.append(line)
if current:
blocks.append(" ".join(current))
return blocks
def parse_slurm_reservations(raw_text: str) -> List[Dict[str, Any]]:
    """
    Parse 'scontrol show reservation' output into maintenance reservations.

    Only the fields ReservationName, Nodes, StartTime, EndTime, Duration,
    Flags and State are kept. 'Nodes' becomes a list of expanded node names,
    'Flags' a list of flag strings, and the rest stay stripped strings.
    A reservation is returned only if its name contains "maint"
    (case-insensitive) or its flags include MAINT.

    Args:
        raw_text: The string output from the scontrol command.

    Returns:
        A list of dictionaries, one per retained reservation.
    """
    wanted_keys = frozenset((
        'ReservationName',
        'Nodes',
        'StartTime',
        'EndTime',
        'Duration',
        'Flags',
        'State',
    ))
    # key=value pairs; a value runs until the next " key=" or end of text.
    pair_pattern = re.compile(r"(\w+)=([\s\S]*?)(?=\s+\w+=|$)")

    kept = []
    for block in _split_reservation_blocks(raw_text):
        if not block.strip():
            continue

        flattened = " ".join(block.split())
        record = {}
        for key, raw_value in pair_pattern.findall(flattened):
            if key not in wanted_keys:
                continue
            cleaned = raw_value.strip()
            if key == 'Nodes':
                # Split the node list first, then expand each bracketed range.
                expanded = []
                for expression in _split_csv_outside_brackets(cleaned):
                    expanded += _expand_node_range(expression)
                record[key] = expanded
            elif key == 'Flags':
                record[key] = list(filter(None, cleaned.split(',')))
            else:
                record[key] = cleaned

        name_lower = str(record.get("ReservationName", "")).lower()
        upper_flags = {flag.upper() for flag in record.get("Flags", [])}
        if record and ("maint" in name_lower or "MAINT" in upper_flags):
            kept.append(record)
    return kept