Skip to content

Commit 0a711ce

Browse files
committed
feat(update-check): implement update check service and widget
This commit introduces a new UpdateCheckService that periodically checks for Windows and winget updates based on user-defined configurations. The service loads its settings from the application configuration, allowing users to enable or disable updates, set intervals, and exclude specific updates. Additionally, a corresponding UpdateCheckWidget is created to display the update information in the user interface, utilizing signals to communicate update counts and names. This feature enhances user experience by providing real-time feedback on available updates, promoting better system maintenance.
1 parent 8c69ee4 commit 0a711ce

File tree

3 files changed

+365
-0
lines changed

3 files changed

+365
-0
lines changed
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import win32com.client
2+
import subprocess
3+
import threading
4+
import logging
5+
from core.event_service import EventService
6+
from core.config import get_config
7+
8+
class UpdateCheckServiceConfig:
9+
@staticmethod
10+
def load_config():
11+
try:
12+
config = get_config(show_error_dialog=True)
13+
except Exception as e:
14+
logging.error(f"Error loading config: {e}")
15+
return None
16+
17+
if 'widgets' in config:
18+
for widget_name, widget_config in config['widgets'].items():
19+
if widget_config.get('type') == 'yasb.update_check.UpdateCheckWidget':
20+
winget_update_options = widget_config['options'].get('winget_update', {})
21+
windows_update_options = widget_config['options'].get('windows_update', {})
22+
winget_update_enabled = winget_update_options.get('enabled', False)
23+
windows_update_enabled = windows_update_options.get('enabled', False)
24+
winget_update_interval = int(winget_update_options.get('interval') * 60)
25+
windows_update_interval = int(windows_update_options.get('interval') * 60)
26+
winget_update_exclude = winget_update_options.get('exclude',[])
27+
windows_update_exclude = windows_update_options.get('exclude',[])
28+
return {
29+
'winget_update_enabled': winget_update_enabled,
30+
'windows_update_enabled': windows_update_enabled,
31+
'winget_update_interval': winget_update_interval,
32+
'windows_update_interval': windows_update_interval,
33+
'winget_update_exclude': winget_update_exclude,
34+
'windows_update_exclude': windows_update_exclude
35+
}
36+
else:
37+
logging.error("No widgets found in the configuration")
38+
return None
39+
40+
class UpdateCheckService:
41+
def __init__(self):
42+
self.event_service = EventService()
43+
44+
config = UpdateCheckServiceConfig.load_config()
45+
if config:
46+
self.winget_update_enabled = config['winget_update_enabled']
47+
self.windows_update_enabled = config['windows_update_enabled']
48+
self.winget_update_interval = config['winget_update_interval']
49+
self.windows_update_interval = config['windows_update_interval']
50+
self.winget_update_exclude = config['winget_update_exclude']
51+
self.windows_update_exclude = config['windows_update_exclude']
52+
else:
53+
self.winget_update_enabled = False
54+
self.windows_update_enabled = False
55+
self.winget_update_interval = 86400
56+
self.windows_update_interval = 86400
57+
58+
def start(self):
59+
if self.windows_update_enabled:
60+
self.start_windows_update_timer()
61+
if self.winget_update_enabled:
62+
self.start_winget_update_timer()
63+
64+
def start_windows_update_timer(self):
65+
thread = threading.Thread(target=self.windows_update_timer_callback)
66+
thread.daemon = True
67+
thread.start()
68+
69+
def start_winget_update_timer(self):
70+
thread = threading.Thread(target=self.winget_update_timer_callback)
71+
thread.daemon = True
72+
thread.start()
73+
74+
def windows_update_timer_callback(self):
75+
update_info = self.get_windows_update()
76+
self.emit_event('windows_update', update_info)
77+
threading.Timer(self.windows_update_interval, self.windows_update_timer_callback).start()
78+
79+
def winget_update_timer_callback(self):
80+
update_info = self.get_winget_update()
81+
self.emit_event('winget_update', update_info)
82+
threading.Timer(self.winget_update_interval, self.winget_update_timer_callback).start()
83+
84+
def emit_event(self, event_name, data):
85+
self.event_service.emit_event(event_name, data)
86+
87+
def get_windows_update(self):
88+
try:
89+
# Create the Windows Update Session
90+
update_session = win32com.client.Dispatch("Microsoft.Update.Session")
91+
update_searcher = update_session.CreateUpdateSearcher()
92+
# Search for updates that are not installed
93+
search_result = update_searcher.Search("IsInstalled=0")
94+
# Check if there are any updates available
95+
if (count := search_result.Updates.Count) > 0:
96+
update_names = [update.Title for update in search_result.Updates if update.Title not in self.windows_update_exclude]
97+
return {"count": count, "names": update_names}
98+
return {"count": 0, "names": []}
99+
except Exception as e:
100+
logging.error(f"Error running windows update: {e}")
101+
102+
def get_winget_update(self):
103+
try:
104+
result = subprocess.run(
105+
['winget', 'upgrade'],
106+
capture_output=True,
107+
text=True,
108+
check=True,
109+
shell=True,
110+
creationflags=subprocess.CREATE_NO_WINDOW
111+
)
112+
# Split the output into lines
113+
lines = result.stdout.strip().split('\n')
114+
# Find the line that starts with "Name", it contains the header
115+
fl = 0
116+
while not lines[fl].startswith("Name"):
117+
fl += 1
118+
# Line fl has the header, we can find char positions for Id, Version, Available, and Source
119+
id_start = lines[fl].index("Id")
120+
version_start = lines[fl].index("Version")
121+
available_start = lines[fl].index("Available")
122+
source_start = lines[fl].index("Source")
123+
# Now cycle through the real packages and split accordingly
124+
upgrade_list = []
125+
126+
for line in lines[fl + 1:]:
127+
# Stop processing when reaching the explicit targeting section
128+
if line.startswith("The following packages have an upgrade available"):
129+
break
130+
if len(line) > (available_start + 1) and not line.startswith('-'):
131+
name = line[:id_start].strip()
132+
if name in self.winget_update_exclude:
133+
continue
134+
id = line[id_start:version_start].strip()
135+
version = line[version_start:available_start].strip()
136+
available = line[available_start:source_start].strip()
137+
software = {
138+
"name": name,
139+
"id": id,
140+
"version": version,
141+
"available_version": available
142+
}
143+
upgrade_list.append(software)
144+
145+
update_names = [f"{software['name']} ({software['id']}): {software['version']} -> {software['available_version']}" for software in upgrade_list]
146+
count = len(upgrade_list)
147+
return {"count": count, "names": update_names}
148+
149+
except subprocess.CalledProcessError as e:
150+
logging.error(f"Error running winget upgrade: {e}")
151+
return {"count": 0, "names": []}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
DEFAULTS = {
2+
'windows_update': {
3+
'enabled': False,
4+
'label': '{count}',
5+
'interval': 1440,
6+
'exclude': []
7+
},
8+
'winget_update': {
9+
'enabled': False,
10+
'label': '{count}',
11+
'interval': 240,
12+
'exclude': []
13+
}
14+
}
15+
16+
VALIDATION_SCHEMA = {
17+
'windows_update': {
18+
'type': 'dict',
19+
'schema': {
20+
'enabled': {
21+
'type': 'boolean',
22+
'default': DEFAULTS['windows_update']['enabled']
23+
},
24+
'label': {
25+
'type': 'string',
26+
'default': DEFAULTS['windows_update']['label']
27+
},
28+
'interval': {
29+
'type': 'integer',
30+
'default': DEFAULTS['windows_update']['interval'],
31+
'min': 30,
32+
'max': 10080
33+
},
34+
'exclude': {
35+
'type': 'list',
36+
'default': DEFAULTS['windows_update']['exclude'],
37+
'schema': {
38+
'type': 'string'
39+
}
40+
}
41+
}
42+
},
43+
'winget_update': {
44+
'type': 'dict',
45+
'schema': {
46+
'enabled': {
47+
'type': 'boolean',
48+
'default': DEFAULTS['winget_update']['enabled']
49+
},
50+
'label': {
51+
'type': 'string',
52+
'default': DEFAULTS['winget_update']['label']
53+
},
54+
'interval': {
55+
'type': 'integer',
56+
'default': DEFAULTS['winget_update']['interval'],
57+
'min': 10,
58+
'max': 10080
59+
},
60+
'exclude': {
61+
'type': 'list',
62+
'default': DEFAULTS['winget_update']['exclude'],
63+
'schema': {
64+
'type': 'string'
65+
}
66+
}
67+
}
68+
}
69+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import logging
2+
import re
3+
from core.widgets.base import BaseWidget
4+
from core.validation.widgets.yasb.update_check import VALIDATION_SCHEMA
5+
from PyQt6.QtWidgets import QLabel, QHBoxLayout, QWidget
6+
from PyQt6.QtCore import Qt, pyqtSignal
7+
from core.event_service import EventService
8+
9+
try:
10+
from core.utils.widgets.update_check import UpdateCheckService
11+
except ImportError:
12+
UpdateCheckService = None
13+
logging.warning("Failed to load UpdateCheckService Event Listener")
14+
15+
class UpdateCheckWidget(BaseWidget):
16+
validation_schema = VALIDATION_SCHEMA
17+
event_listener = UpdateCheckService
18+
windows_update_signal = pyqtSignal(object)
19+
winget_update_signal = pyqtSignal(object)
20+
21+
def __init__(
22+
self,
23+
windows_update: dict[str, str],
24+
winget_update: dict[str, str]
25+
):
26+
super().__init__(class_name="update-check-widget")
27+
self._event_service = EventService()
28+
29+
self._windows_update = windows_update
30+
self._winget_update = winget_update
31+
32+
self._window_update_enabled = self._windows_update['enabled']
33+
self._windows_update_label = self._windows_update['label']
34+
35+
self._winget_update_enabled = self._winget_update['enabled']
36+
self._winget_update_label = self._winget_update['label']
37+
38+
self.windows_update_data = 0
39+
self.winget_update_data = 0
40+
41+
# Use the main layout directly
42+
self._create_dynamically_label(self._winget_update_label, self._windows_update_label)
43+
44+
self.windows_update_signal.connect(self._on_windows_update_signal)
45+
self._event_service.register_event("windows_update", self.windows_update_signal)
46+
47+
self.winget_update_signal.connect(self._on_winget_update_signal)
48+
self._event_service.register_event("winget_update", self.winget_update_signal)
49+
50+
self.check_and_hide()
51+
self._update_label('winget', 0, [])
52+
self._update_label('windows', 0, [])
53+
54+
def _create_dynamically_label(self, windows_label: str, winget_label: str):
55+
def process_content(label_text, label_type):
56+
# Create a new container for each set of labels
57+
container = QWidget()
58+
container_layout = QHBoxLayout()
59+
container_layout.setSpacing(0)
60+
container_layout.setContentsMargins(0, 0, 0, 0)
61+
container.setLayout(container_layout)
62+
class_name = "windows" if label_type == "windows" else "winget"
63+
container.setProperty("class", f"widget-container {class_name}")
64+
self.widget_layout.addWidget(container)
65+
# Initially hide the container
66+
container.hide()
67+
# Split label text into parts, separating only span tags
68+
label_parts = re.split(r'(<span.*?>.*?</span>)', label_text)
69+
label_parts = [part for part in label_parts if part]
70+
widgets = []
71+
72+
for part in label_parts:
73+
part = part.strip()
74+
if not part:
75+
continue
76+
if '<span' in part and '</span>' in part:
77+
class_name = re.search(r'class=(["\'])([^"\']+?)\1', part)
78+
class_result = class_name.group(2) if class_name else 'icon'
79+
icon = re.sub(r'<span.*?>|</span>', '', part).strip()
80+
label = QLabel(icon)
81+
label.setProperty("class", class_result)
82+
else:
83+
label = QLabel(part)
84+
label.setProperty("class", "label")
85+
86+
label.setAlignment(Qt.AlignmentFlag.AlignCenter)
87+
container_layout.addWidget(label)
88+
widgets.append(label)
89+
90+
return container, widgets
91+
92+
if self._winget_update_enabled:
93+
self._winget_container, self._widget_widget = process_content(self._winget_update_label, "winget")
94+
if self._window_update_enabled:
95+
self._windows_container, self._widget_windows = process_content(self._windows_update_label, "windows")
96+
97+
def _update_label(self, widget_type, data, names):
98+
if widget_type == 'winget':
99+
active_widgets = self._widget_widget
100+
active_label_content = self._winget_update_label
101+
container = self._winget_container
102+
elif widget_type == 'windows':
103+
active_widgets = self._widget_windows
104+
active_label_content = self._windows_update_label
105+
container = self._windows_container
106+
else:
107+
return
108+
109+
label_parts = re.split(r'(<span.*?>.*?</span>)', active_label_content)
110+
label_parts = [part for part in label_parts if part]
111+
widget_index = 0
112+
if data == 0:
113+
container.hide()
114+
return
115+
container.show()
116+
for part in label_parts:
117+
part = part.strip()
118+
if part and widget_index < len(active_widgets) and isinstance(active_widgets[widget_index], QLabel):
119+
if '<span' in part and '</span>' in part:
120+
# This part is an icon
121+
icon = re.sub(r'<span.*?>|</span>', '', part).strip()
122+
active_widgets[widget_index].setText(icon)
123+
else:
124+
# This part is text, replace {count} with actual data
125+
formatted_text = part.format(count=data)
126+
active_widgets[widget_index].setText(formatted_text)
127+
active_widgets[widget_index].setToolTip("\n".join(names))
128+
active_widgets[widget_index].setStyleSheet("""QToolTip { padding:4px;color: #cdd6f4;font-size:12px; background-color: #1e1e2e; border: 1px solid #313244;border-radius: 8px; }""")
129+
widget_index += 1
130+
131+
def _on_windows_update_signal(self, data):
132+
self.windows_update_data = data['count']
133+
self._update_label('windows', self.windows_update_data, data['names'])
134+
self.check_and_hide()
135+
136+
def _on_winget_update_signal(self, data):
137+
self.winget_update_data = data['count']
138+
self._update_label('winget', self.winget_update_data, data['names'])
139+
self.check_and_hide()
140+
141+
def check_and_hide(self):
142+
if self.windows_update_data == 0 and self.winget_update_data == 0:
143+
self.hide()
144+
else:
145+
self.show()

0 commit comments

Comments
 (0)