diff --git a/PKGBUILD b/PKGBUILD index 4178a94..2b81c9e 100644 --- a/PKGBUILD +++ b/PKGBUILD @@ -20,12 +20,15 @@ depends=( 'tesseract-data-eng' 'tesseract-data-rus' 'cliphist' + 'brightnessctl' + 'ddcutil' ) makedepends=( 'python-uv' 'git' 'python-virtualenv' ) +install=mewline.install options=('!debug') source=("git+$url.git") sha256sums=('SKIP') diff --git a/PKGBUILD.stable b/PKGBUILD.stable index 062ad53..0dfe9e2 100644 --- a/PKGBUILD.stable +++ b/PKGBUILD.stable @@ -20,12 +20,15 @@ depends=( 'tesseract-data-eng' 'tesseract-data-rus' 'cliphist' + 'brightnessctl' + 'ddcutil' ) makedepends=( 'python-uv' 'git' 'python-virtualenv' ) +install=mewline.install options=('!debug') source=("$url/archive/refs/tags/v$pkgver.tar.gz") sha256sums=('SKIP') # Автоматическая замена в workflow diff --git a/mewline.install b/mewline.install new file mode 100644 index 0000000..89e18b4 --- /dev/null +++ b/mewline.install @@ -0,0 +1,12 @@ +post_install() { + echo "=> Adding user \${SUDO_USER:-\$USER} to i2c group for external monitor brightness control..." + # Добавляем пользователя, который вызвал makepkg (или sudo pacman -U), в группу i2c + usermod -aG i2c "${SUDO_USER:-$USER}" || true + + echo "=> You might need to reload the i2c-dev module: sudo modprobe i2c-dev" + echo "=> Please logout and login again for group changes to take effect." +} + +post_upgrade() { + post_install +} diff --git a/src/mewline/services/__init__.py b/src/mewline/services/__init__.py index d0ba283..40888d3 100644 --- a/src/mewline/services/__init__.py +++ b/src/mewline/services/__init__.py @@ -5,6 +5,7 @@ from mewline.services.brightness import BrightnessService from mewline.services.cache_notification import NotificationCacheService from mewline.services.notifications import MyNotifications +from mewline.services.privacy import PrivacyService audio_service = Audio() @@ -12,6 +13,7 @@ cache_notification_service = NotificationCacheService() brightness_service = BrightnessService() battery_service = BatteryService() +privacy_service = PrivacyService() bluetooth_client = BluetoothClient() # to run notify closures thus display the status diff --git a/src/mewline/services/brightness.py b/src/mewline/services/brightness.py index 4ab3d60..d532920 100644 --- a/src/mewline/services/brightness.py +++ b/src/mewline/services/brightness.py @@ -1,4 +1,6 @@ import os +import re +import subprocess from pathlib import Path from fabric.core.service import Property @@ -13,44 +15,135 @@ def get_device(path: Path): + if not path.exists(): + return "" for item in path.iterdir(): if item.is_dir(): return item.name - return "" class BrightnessService(Service): - """Service to manage screen brightness levels.""" + """Service to manage screen brightness levels. + + Supports two backends: + - brightnessctl — for laptops / internal screens (backlight devices in + /sys/class/backlight). Changes are detected via a file monitor so the + UI stays in sync automatically. + - ddcutil — for external / desktop monitors that speak DDC/CI over I2C. + The first detected I2C bus is used. The brightness value (VCP 0x10) is + cached locally so reads are instant; writes are sent asynchronously so + the UI never freezes while waiting for the (slow) monitor response. + """ + + # ------------------------------------------------------------------ + # DDC helpers + # ------------------------------------------------------------------ + + def _ddc_detect_bus(self) -> str: + """Return the first I2C bus number (e.g. '6') found by ddcutil, or ''.""" + try: + out = subprocess.check_output( + ["ddcutil", "detect", "--brief"], + text=True, + stderr=subprocess.DEVNULL, + timeout=3, + ) + except Exception: + return "" + m = re.search(r"/dev/i2c-(\d+)", out) + return m.group(1) if m else "" + + def _ddc_get_brightness(self, bus: str) -> tuple[int, int]: + """Return (current, max) brightness from `ddcutil -b BUS getvcp 10 --brief`. + + Typical output: 'VCP 10 C 60 100' => current=60, max=100 + """ + try: + out = subprocess.check_output( + ["ddcutil", "-b", bus, "getvcp", "10", "--brief"], + text=True, + stderr=subprocess.DEVNULL, + timeout=3, + ).strip() + parts = out.split() + if len(parts) >= 5 and parts[0] == "VCP" and parts[1] == "10": + return int(parts[3]), int(parts[4]) + except Exception: # noqa: S110 + pass + return 0, 100 + + # ------------------------------------------------------------------ + # Initialisation + # ------------------------------------------------------------------ def __init__(self, **kwargs): super().__init__(**kwargs) + self.is_ddc: bool = False + self.ddc_bus: str = "" + self._ddc_cached_brightness: int = 0 + self.max_brightness_level: int = -1 + self.base_blacklight_path = Path("/sys/class/backlight") self.screen_device = get_device(self.base_blacklight_path) - self.screen_backlight_path = self.base_blacklight_path / self.screen_device - self.max_brightness_level = self.do_read_max_brightness( - self.screen_backlight_path - ) - if self.screen_device == "": - logger.warning("No backlight devices found!") + # --- Path 1: internal backlight (laptop) ---------------------- + if self.screen_device: + self.screen_backlight_path = ( + self.base_blacklight_path / self.screen_device + ) + self.max_brightness_level = self.do_read_max_brightness( + self.screen_backlight_path + ) + + self.screen_monitor = monitor_file( + str(self.screen_backlight_path / "brightness") + ) + self.screen_monitor.connect( + "changed", + lambda _, file, *args: self.emit( + "screen", + round(int(file.load_bytes()[0].get_data())), + ), + ) + + logger.info( + f"Brightness service initialised for backlight device: {self.screen_device}" + ) return - self.screen_monitor = monitor_file( - str(self.screen_backlight_path / "brightness") - ) - self.screen_monitor.connect( - "changed", - lambda _, file, *args: self.emit( - "screen", - round(int(file.load_bytes()[0].get_data())), - ), + # --- Path 2: external monitor via DDC/CI (desktop) ----------- + if executable_exists("ddcutil"): + bus = self._ddc_detect_bus() + if bus: + cur, mx = self._ddc_get_brightness(bus) + self.is_ddc = True + self.ddc_bus = bus + self._ddc_cached_brightness = cur + self.max_brightness_level = mx if mx > 0 else 100 + logger.info( + f"Brightness service initialised via ddcutil (i2c-{bus}), " + f"current={cur}, max={self.max_brightness_level}" + ) + return + else: + logger.warning( + "ddcutil is installed but no DDC/CI-capable monitors were found." + ) + else: + logger.warning("ddcutil is not installed — DDC/CI brightness unavailable.") + + logger.warning( + "No backlight device and no DDC/CI monitor detected. " + "Brightness control will be unavailable." ) - logger.info(f"Brightness service initialized for device: {self.screen_device}") + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ - def do_read_max_brightness(self, path: str) -> int: + def do_read_max_brightness(self, path: Path) -> int: """Reads the maximum brightness value from the specified path.""" max_brightness_path = os.path.join(path, "max_brightness") if os.path.exists(max_brightness_path): @@ -58,9 +151,17 @@ def do_read_max_brightness(self, path: str) -> int: return int(f.readline()) return -1 # Return -1 if file doesn't exist, indicating an error. + # ------------------------------------------------------------------ + # Property: screen_brightness + # ------------------------------------------------------------------ + @Property(int, "read-write") def screen_brightness(self) -> int: """Property to get or set the screen brightness.""" + if self.is_ddc: + # Reads from DDC are slow (~100 ms), so return the cached value. + return self._ddc_cached_brightness + brightness_path = self.screen_backlight_path / "brightness" if brightness_path.exists(): with open(brightness_path) as f: @@ -71,21 +172,38 @@ def screen_brightness(self) -> int: @screen_brightness.setter def screen_brightness(self, value: int): """Setter for screen brightness property.""" - if not (0 <= value <= self.max_brightness_level): - value = max(0, min(value, self.max_brightness_level)) + value = max(0, min(value, self.max_brightness_level)) + + # --- DDC/CI path (external monitor) -------------------------- + if self.is_ddc and self.ddc_bus: + self._ddc_cached_brightness = value + # Update the UI percentage immediately (don't wait for hardware) + if self.max_brightness_level > 0: + self.emit( + "screen", + int((value / self.max_brightness_level) * 100), + ) + try: + exec_shell_command_async( + f"ddcutil -b {self.ddc_bus} setvcp 10 {value}" + ) + except GLib.Error as e: + logger.error(f"Error setting ddcutil brightness: {e.message}") + except Exception as e: + logger.exception(f"Unexpected error setting ddcutil brightness: {e}") + return + # --- brightnessctl path (laptop) ----------------------------- try: if not executable_exists("brightnessctl"): logger.error("Command brightnessctl not found") + return exec_shell_command_async( f"brightnessctl --device '{self.screen_device}' set {value}" ) self.emit("screen", int((value / self.max_brightness_level) * 100)) - logger.info( - f"Set screen brightness to {value} (out of {self.max_brightness_level})" - ) except GLib.Error as e: logger.error(f"Error setting screen brightness: {e.message}") except Exception as e: diff --git a/src/mewline/services/privacy.py b/src/mewline/services/privacy.py new file mode 100644 index 0000000..083766b --- /dev/null +++ b/src/mewline/services/privacy.py @@ -0,0 +1,246 @@ +import json +import os +import subprocess +import threading + +from fabric.core.service import Property +from fabric.core.service import Service +from gi.repository import GLib + + +def _get_process_name(pid: str) -> str: + try: + with open(f"/proc/{pid}/comm") as f: + return f.read().strip() + except Exception: + return f"pid:{pid}" + + +class PrivacyService(Service): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._cam_active = False + self._mic_active = False + self._screen_active = False + self._loc_active = False + + self.cam_apps: list[str] = [] + self.mic_apps: list[str] = [] + self.screen_apps: list[str] = [] + self.loc_apps: list[str] = [] + + self._is_polling = False + GLib.timeout_add(2000, self._poll) + self._poll() + + @Property(bool, "readable", default_value=False) + def cam_active(self) -> bool: + return self._cam_active + + @Property(bool, "readable", default_value=False) + def mic_active(self) -> bool: + return self._mic_active + + @Property(bool, "readable", default_value=False) + def screen_active(self) -> bool: + return self._screen_active + + @Property(bool, "readable", default_value=False) + def loc_active(self) -> bool: + return self._loc_active + + def _poll(self): + if not self._is_polling: + self._is_polling = True + threading.Thread(target=self._do_poll, daemon=True).start() + return True + + def _do_poll(self): + cam, mic, screen, loc = False, False, False, False + cam_apps, mic_apps, screen_apps, loc_apps = [], [], [], [] + + # 1. Camera – /dev/video* open by some process + try: + for pid in os.listdir("/proc"): + if not pid.isdigit(): + continue + fd_dir = f"/proc/{pid}/fd" + if not os.access(fd_dir, os.R_OK): + continue + try: + for fd in os.listdir(fd_dir): + try: + if os.readlink(f"{fd_dir}/{fd}").startswith("/dev/video"): + cam = True + name = _get_process_name(pid) + if name not in cam_apps: + cam_apps.append(name) + break + except OSError: + continue + except OSError: + continue + except Exception: # noqa: S110 + pass + + # 2. PipeWire Dump parsing (Mic & Screen Sharing identical to privacy-dots.sh) + try: + out = subprocess.check_output( + ["pw-dump"], stderr=subprocess.DEVNULL, text=True, timeout=1 + ) + nodes = json.loads(out) + + # First pass: check if any mic is active globally (Audio/Source running) + # privacy_dots checks if Audio/Source or Audio/Source/Virtual is running + any_mic_running = False + for node in nodes: + if node.get("type") != "PipeWire:Interface:Node": + continue + + info = node.get("info", {}) + state = info.get("state") or node.get("state") + props = info.get("props", {}) + media_class = str(props.get("media.class", "")) + + if ( + media_class in ["Audio/Source", "Audio/Source/Virtual"] + ) and state == "running": + any_mic_running = True + break + + if any_mic_running: + mic = True + # Find the actual apps capturing the mic + for node in nodes: + if node.get("type") != "PipeWire:Interface:Node": + continue + + info = node.get("info", {}) + state = info.get("state") or node.get("state") + props = info.get("props", {}) + media_class = str(props.get("media.class", "")) + app_name = str( + props.get("application.name", "") or props.get("node.name", "") + ).strip() + + # privacy_dots logic: + # media.class == "Stream/Input/Audio" and state == "running" + if ( + media_class == "Stream/Input/Audio" + and state == "running" + and app_name + and app_name.lower() not in {"wireplumber", "pipewire"} + and app_name not in mic_apps + ): + mic_apps.append(app_name) + + # Second pass: check for screen sharing matching privacy_dots regex + # jq logic: test("^(xdph-streaming|gsr-default|game capture)") on media.name + for node in nodes: + info = node.get("info", {}) + props = info.get("props", {}) + + if not props: + continue + + media_name = str(props.get("media.name", "")).lower() + + if ( + media_name.startswith("xdph-streaming") + or media_name.startswith("gsr-default") + or media_name.startswith("game capture") + ): + screen = True + break + + if screen: + # Find the actual apps/names doing the screen share + for node in nodes: + if node.get("type") != "PipeWire:Interface:Node": + continue + + info = node.get("info", {}) + state = info.get("state") or node.get("state") + props = info.get("props", {}) + media_class = str(props.get("media.class", "")) + media_name = str(props.get("media.name", "")) + + if ( + media_class == "Stream/Input/Video" + or media_name == "gsr-default_output" + or media_name == "game capture" + ) and state == "running": + app_name = media_name or "Screen Share" + if app_name not in screen_apps: + screen_apps.append(app_name) + except Exception: # noqa: S110 + pass + + # 3. wlroots direct screencopy fallback (wf-recorder, wl-screenrec, etc) + try: + for recorder_bin in ["wf-recorder", "wl-screenrec"]: + try: + pids = subprocess.check_output( + ["pgrep", "-x", recorder_bin], + stderr=subprocess.DEVNULL, + text=True, + timeout=1, + ).strip() + + if pids: + screen = True + if recorder_bin not in screen_apps: + screen_apps.append(recorder_bin) + except subprocess.CalledProcessError: + pass + except Exception: # noqa: S110 + pass + + # 4. Location – geoclue running + try: + if subprocess.check_output( + ["pgrep", "-f", "geoclue"], + stderr=subprocess.DEVNULL, + text=True, + timeout=1, + ).strip(): + loc = True + loc_apps = ["geoclue"] + except Exception: # noqa: S110 + pass + + GLib.idle_add( + self._update_state, + cam, + mic, + screen, + loc, + cam_apps, + mic_apps, + screen_apps, + loc_apps, + ) + + def _update_state( + self, cam, mic, screen, loc, cam_apps, mic_apps, screen_apps, loc_apps + ): + self.cam_apps = cam_apps + self.mic_apps = mic_apps + self.screen_apps = screen_apps + self.loc_apps = loc_apps + + if cam != self._cam_active: + self._cam_active = cam + self.notify("cam-active") + if mic != self._mic_active: + self._mic_active = mic + self.notify("mic-active") + if screen != self._screen_active: + self._screen_active = screen + self.notify("screen-active") + if loc != self._loc_active: + self._loc_active = loc + self.notify("loc-active") + + self._is_polling = False + return False diff --git a/src/mewline/styles/combined_controls.scss b/src/mewline/styles/combined_controls.scss index ea96bf0..d493aff 100644 --- a/src/mewline/styles/combined_controls.scss +++ b/src/mewline/styles/combined_controls.scss @@ -79,3 +79,8 @@ } } +/* Privacy indicator dots - colours defined in theme variables */ +.privacy-dot-mic { color: theme.$privacy-dot-mic; } +.privacy-dot-cam { color: theme.$privacy-dot-cam; } +.privacy-dot-screen { color: theme.$privacy-dot-screen; } +.privacy-dot-loc { color: theme.$privacy-dot-loc; } diff --git a/src/mewline/styles/default_theme.scss b/src/mewline/styles/default_theme.scss index 9d2fc81..2638d20 100644 --- a/src/mewline/styles/default_theme.scss +++ b/src/mewline/styles/default_theme.scss @@ -13,4 +13,11 @@ $text-muted: #9399b2; $text-on-accent: #11111b; $shadow-color: rgba(0, 0, 0, 0.6); -$corners-color: #000000; \ No newline at end of file +$corners-color: #000000; + +// Privacy indicator dot colours. +// Override any of these in your theme file to customise the dots. +$privacy-dot-mic: #f38ba8; +$privacy-dot-cam: #fab387; +$privacy-dot-screen: #cba6f7; +$privacy-dot-loc: #89b4fa; diff --git a/src/mewline/styles/theme.scss b/src/mewline/styles/theme.scss index 06f5ead..77a2adf 100644 --- a/src/mewline/styles/theme.scss +++ b/src/mewline/styles/theme.scss @@ -1,16 +1,17 @@ $background-base: #11111b; $background-highlight: #181825; $background-element: #1e1e2e; - $accent-color: #b4befe; $accent-error: #f38ba8; $accent-warning: #fab387; $accent-success: #a6e3a1; - $text-color: #cdd6f4; $text-secondary: #585b70; $text-muted: #9399b2; $text-on-accent: #11111b; - $shadow-color: rgba(0, 0, 0, 0.6); $corners-color: #000000; +$privacy-dot-mic: #f38ba8; +$privacy-dot-cam: #fab387; +$privacy-dot-screen: #cba6f7; +$privacy-dot-loc: #89b4fa; diff --git a/src/mewline/utils/hyprland_monitors.py b/src/mewline/utils/hyprland_monitors.py index 41d0673..3b975d6 100644 --- a/src/mewline/utils/hyprland_monitors.py +++ b/src/mewline/utils/hyprland_monitors.py @@ -81,11 +81,6 @@ def get_active_gdk_monitor_id(self) -> int | None: self.get_active_hypr_monitor_name() ) - # Keep the old name around so nothing else breaks. - def get_current_gdk_monitor_id(self) -> int | None: - """Alias for get_active_gdk_monitor_id (backwards-compat).""" - return self.get_active_gdk_monitor_id() - # ------------------------------------------------------------------ # Cursor monitor # ------------------------------------------------------------------ diff --git a/src/mewline/utils/theming.py b/src/mewline/utils/theming.py index 4fd9989..e2e0a17 100644 --- a/src/mewline/utils/theming.py +++ b/src/mewline/utils/theming.py @@ -1,3 +1,4 @@ +import re from pathlib import Path from fabric import Application @@ -8,6 +9,14 @@ from mewline.errors.settings import ExecutableNotFoundError from mewline.utils.misc import executable_exists +# Matches top-level SCSS variable declarations: $name: value; +_SCSS_VAR_RE = re.compile(r"^\$([a-zA-Z0-9_-]+)\s*:\s*(.+?)\s*;", re.MULTILINE) + + +def _parse_scss_vars(content: str) -> dict[str, str]: + """Return {var_name: value} for every SCSS variable declaration in *content*.""" + return {m.group(1): m.group(2) for m in _SCSS_VAR_RE.finditer(content)} + def process_and_apply_css(app: Application): # Raise an error if sass is not found and exit the application @@ -21,27 +30,40 @@ def process_and_apply_css(app: Application): def copy_theme(path: Path): - """Function to get the system icon theme. + """Merge default theme variables with user overrides and write to theme.scss. - Args: - path (Path): path to theme + Variables present in the user theme file override the defaults; any + variable not defined by the user falls back to the value from + default_theme.scss. This guarantees that newly-introduced variables + (e.g. $privacy-dot-*) work correctly for existing themes that were + created before those variables existed. """ if path.stem == "default": path = cnst.DEFAULT_THEME_STYLE if not path.exists(): logger.warning( - f"Warning: The theme file '{path}' was not found.Using default theme." + f"Warning: The theme file '{path}' was not found. Using default theme." ) path = cnst.DEFAULT_THEME_STYLE try: - with open(path) as f: - content = f.read() + with open(cnst.DEFAULT_THEME_STYLE) as f: + default_vars = _parse_scss_vars(f.read()) + + user_vars: dict[str, str] = {} + if path != cnst.DEFAULT_THEME_STYLE: + with open(path) as f: + user_vars = _parse_scss_vars(f.read()) + + # User values take priority; missing keys fall back to defaults + merged = {**default_vars, **user_vars} with open(cnst.THEME_STYLE, "w") as f: - f.write(content) - logger.info(f"[THEME] '{path}' applied successfully.") + for name, value in merged.items(): + f.write(f"${name}: {value};\n") + + logger.info(f"[THEME] '{path}' applied successfully.") except FileNotFoundError: logger.error(f"Error: The theme file '{path}' was not found.") diff --git a/src/mewline/widgets/combined_controls.py b/src/mewline/widgets/combined_controls.py index 6bf1dc0..b402c0a 100644 --- a/src/mewline/widgets/combined_controls.py +++ b/src/mewline/widgets/combined_controls.py @@ -6,11 +6,13 @@ from fabric.widgets.revealer import Revealer from gi.repository import GLib from gi.repository import GObject +from gi.repository import Gtk from mewline import constants as cnst from mewline.config import cfg from mewline.services import audio_service from mewline.services import brightness_service +from mewline.services import privacy_service from mewline.shared.popover import Popover from mewline.shared.widget_container import ButtonWidget from mewline.utils.misc import convert_to_percent @@ -19,6 +21,11 @@ from mewline.utils.widget_utils import get_brightness_icon from mewline.utils.widget_utils import text_icon +# Opacity applied to inactive privacy dots (active dots use 1.0). +# The colour itself is controlled by $privacy-dot-{mic,cam,screen,loc} +# variables in the theme – see default_theme.scss. +_DOT_DIM_OPACITY = 0.12 + class CombinedControlsMenu(Popover): """Dropdown menu with sliders for speaker, microphone, brightness.""" @@ -33,22 +40,17 @@ def __init__(self, anchor_widget: GObject.GObject, osd_widget=None, **kwargs): self._updating_brightness_from_service = False self._updating_brightness = False - # Check if brightness control is available self.brightness_available = self._is_brightness_available() - # Sliders self.speaker_scale = create_scale(style_classes="cc-scale") self.mic_scale = create_scale(style_classes="cc-scale") if self.brightness_available: self.brightness_scale = create_scale(style_classes="cc-scale") - # Simple debounce helpers self._speaker_apply_src: int | None = None self._mic_apply_src: int | None = None self._brightness_apply_src: int | None = None - # Mute buttons as main icons - clickable buttons with nerd font icons - # Keep direct references to text icons for easy updates self.speaker_mute_icon = text_icon( get_audio_icon(self._get_speaker_volume(), self._get_speaker_muted()), size="16px", @@ -65,15 +67,16 @@ def __init__(self, anchor_widget: GObject.GObject, osd_widget=None, **kwargs): self.mic_mute_btn.children = self.mic_mute_icon self.mic_mute_btn.add_style_class("cc-mute-btn") - # Connect mute button clicks self.speaker_mute_btn.connect("clicked", self._on_speaker_mute_clicked) self.mic_mute_btn.connect("clicked", self._on_mic_mute_clicked) - # Percentage labels - self.speaker_label = text_icon("0%", size="14px") - self.mic_label = text_icon("0%", size="14px") + self.speaker_label = text_icon( + "0%", size="14px", style_classes="cc-percent-label" + ) + self.mic_label = text_icon("0%", size="14px", style_classes="cc-percent-label") + self.speaker_label.set_size_request(38, -1) + self.mic_label.set_size_request(38, -1) - # Build slider children list (mute buttons as main icons on the left) slider_children = [ Box( orientation="h", @@ -91,16 +94,26 @@ def __init__(self, anchor_widget: GObject.GObject, osd_widget=None, **kwargs): ), ] - # Add brightness only if available (no mute button for brightness) if self.brightness_available: - brightness_icon = text_icon(get_brightness_icon(self._get_brightness())) - self.brightness_label = text_icon("0%", size="14px") + brightness_icon = text_icon( + get_brightness_icon(self._get_brightness()), size="16px" + ) + brightness_icon_box = ButtonWidget() + brightness_icon_box.children = brightness_icon + brightness_icon_box.add_style_class("cc-mute-btn") + brightness_icon_box.set_can_focus(False) + + self.brightness_label = text_icon( + "0%", size="14px", style_classes="cc-percent-label" + ) + self.brightness_label.set_size_request(38, -1) + slider_children.append( Box( orientation="h", spacing=8, children=( - brightness_icon, + brightness_icon_box, self.brightness_scale, self.brightness_label, ), @@ -109,7 +122,7 @@ def __init__(self, anchor_widget: GObject.GObject, osd_widget=None, **kwargs): sliders_box = Box( orientation="v", - spacing=8, + spacing=12, style_classes="cc-menu", children=slider_children, all_visible=True, @@ -129,13 +142,11 @@ def __init__(self, anchor_widget: GObject.GObject, osd_widget=None, **kwargs): gap=2, ) - # Connect scale changes self.speaker_scale.connect("value-changed", self._on_speaker_changed) self.mic_scale.connect("value-changed", self._on_mic_changed) if self.brightness_available: self.brightness_scale.connect("value-changed", self._on_brightness_changed) - # Simple drag detection for immediate apply on release self.speaker_scale.connect( "button-release-event", self._on_scale_release, "speaker" ) @@ -145,23 +156,15 @@ def __init__(self, anchor_widget: GObject.GObject, osd_widget=None, **kwargs): "button-release-event", self._on_scale_release, "brightness" ) - # Listen to services to keep in sync self.audio.connect("notify::speaker", self._bind_speaker) self.audio.connect("notify::microphone", self._bind_microphone) if self.brightness_available: self.brightness.connect("screen", self._on_brightness_service) - # Do not continuously reposition on size-allocate to avoid jitter - - # Bind when available self._bind_speaker() self._bind_microphone() - - # Initial sync after labels exist self._sync_from_services() - # open/close inherited from Popover - def _get_speaker_volume(self) -> int: return round(self.audio.speaker.volume) if self.audio.speaker else 0 @@ -182,9 +185,7 @@ def _get_brightness(self) -> int: ) def _is_brightness_available(self) -> bool: - """Check if brightness control is available on this system.""" try: - # Try to get current brightness level return bool( hasattr(self.brightness, "max_brightness_level") and self.brightness.max_brightness_level > 0 @@ -195,33 +196,26 @@ def _is_brightness_available(self) -> bool: def _sync_from_services(self): sp = self._get_speaker_volume() mc = self._get_mic_volume() - # Initial sync without animation self.speaker_scale.set_value(sp) self.mic_scale.set_value(mc) self.speaker_label.set_text(f"{sp}%") self.mic_label.set_text(f"{mc}%") - if self.brightness_available: br = self._get_brightness() self.brightness_scale.set_value(br) self.brightness_label.set_text(f"{br}%") - - # Update mute button icons self._update_mute_buttons() def _on_speaker_changed(self, *_): if self.audio.speaker: - vol = self.speaker_scale.value - self.speaker_label.set_text(f"{int(vol)}%") - # Simple debounce + self.speaker_label.set_text(f"{int(self.speaker_scale.value)}%") if self._speaker_apply_src: GLib.source_remove(self._speaker_apply_src) self._speaker_apply_src = GLib.timeout_add(100, self._apply_speaker) def _on_mic_changed(self, *_): if self.audio.microphone: - vol = self.mic_scale.value - self.mic_label.set_text(f"{int(vol)}%") + self.mic_label.set_text(f"{int(self.mic_scale.value)}%") if self._mic_apply_src: GLib.source_remove(self._mic_apply_src) self._mic_apply_src = GLib.timeout_add(100, self._apply_mic) @@ -229,8 +223,7 @@ def _on_mic_changed(self, *_): def _on_brightness_changed(self, *_): if not self.brightness_available: return - val = self.brightness_scale.value - self.brightness_label.set_text(f"{int(val)}%") + self.brightness_label.set_text(f"{int(self.brightness_scale.value)}%") if self._brightness_apply_src: GLib.source_remove(self._brightness_apply_src) self._brightness_apply_src = GLib.timeout_add(80, self._apply_brightness) @@ -256,15 +249,19 @@ def _bind_microphone(self, *_): self._update_mic_from_service() def _on_brightness_service(self, *_): - if not self.brightness_available or self._brightness_apply_src or self._updating_brightness_from_service: - return # Don't update while we have a pending change + if ( + not self.brightness_available + or self._brightness_apply_src + or self._updating_brightness_from_service + ): + return val = self._get_brightness() self.brightness_scale.set_value(val) self.brightness_label.set_text(f"{int(val)}%") def _update_speaker_from_service(self, *_): if self._speaker_apply_src: - return # Don't update while we have a pending change + return sp = self._get_speaker_volume() self.speaker_scale.set_value(sp) self.speaker_label.set_text(f"{sp}%") @@ -272,50 +269,37 @@ def _update_speaker_from_service(self, *_): def _update_mic_from_service(self, *_): if self._mic_apply_src: - return # Don't update while we have a pending change + return mc = self._get_mic_volume() self.mic_scale.set_value(mc) self.mic_label.set_text(f"{mc}%") self._update_mute_buttons() def _update_mute_buttons(self): - """Update mute button icons based on current mute state.""" if self.audio.speaker: - vol = self._get_speaker_volume() - is_muted = self._get_speaker_muted() - icon = get_audio_icon(vol, is_muted) - # Use direct reference to text icon widget - self.speaker_mute_icon.set_text(icon) - + self.speaker_mute_icon.set_text( + get_audio_icon(self._get_speaker_volume(), self._get_speaker_muted()) + ) if self.audio.microphone: - is_muted = self._get_mic_muted() - icon = cnst.icons["microphone"]["muted" if is_muted else "active"] - # Use direct reference to text icon widget - self.mic_mute_icon.set_text(icon) + self.mic_mute_icon.set_text( + cnst.icons["microphone"]["muted" if self._get_mic_muted() else "active"] + ) def _on_speaker_mute_clicked(self, *_): - """Toggle speaker mute state.""" if self.audio.speaker: - current_muted = self._get_speaker_muted() - self.audio.speaker.set_muted(not current_muted) + self.audio.speaker.set_muted(not self._get_speaker_muted()) self._update_mute_buttons() - # Trigger OSD for speaker if self.osd_widget: self.osd_widget.show_audio_speaker() def _on_mic_mute_clicked(self, *_): - """Toggle microphone mute state.""" if self.audio.microphone: - current_muted = self._get_mic_muted() - self.audio.microphone.set_muted(not current_muted) + self.audio.microphone.set_muted(not self._get_mic_muted()) self._update_mute_buttons() - # Trigger OSD for microphone if self.osd_widget: self.osd_widget.show_audio_microphone() - # Apply helpers def _on_scale_release(self, widget, event, which): - # Immediate apply on mouse release if which == "speaker": self._apply_speaker() elif which == "mic": @@ -326,10 +310,8 @@ def _on_scale_release(self, widget, event, which): def _apply_speaker(self): if self.audio.speaker: - vol = int(self.speaker_scale.value) - vol = max(0, min(100, vol)) + vol = max(0, min(100, int(self.speaker_scale.value))) self.audio.speaker.set_volume(vol) - # Trigger OSD for speaker if self.osd_widget: self.osd_widget.show_audio_speaker() self._speaker_apply_src = None @@ -337,10 +319,8 @@ def _apply_speaker(self): def _apply_mic(self): if self.audio.microphone: - vol = int(self.mic_scale.value) - vol = max(0, min(100, vol)) + vol = max(0, min(100, int(self.mic_scale.value))) self.audio.microphone.set_volume(vol) - # Trigger OSD for microphone if self.osd_widget: self.osd_widget.show_audio_microphone() self._mic_apply_src = None @@ -349,19 +329,14 @@ def _apply_mic(self): def _apply_brightness(self): if not self.brightness_available: return False - self._updating_brightness_from_service = True - val = int(self.brightness_scale.value) - val = max(0, min(100, val)) - # translate percent back to raw units + val = max(0, min(100, int(self.brightness_scale.value))) target = int((val / 100.0) * self.brightness.max_brightness_level) self.brightness.screen_brightness = target - # Trigger OSD for brightness if self.osd_widget: self.osd_widget.show_brightness() self._brightness_apply_src = None - - GLib.timeout_add(100, self.unblock_service_updates) + GLib.timeout_add(100, self._unblock_service_updates) return False def _unblock_service_updates(self): @@ -370,35 +345,99 @@ def _unblock_service_updates(self): class CombinedControlsButton(Overlay): - """Capsule showing speaker, mic, brightness icons; toggles CombinedControlsMenu. - - Also supports mouse wheel scrolling to adjust speaker volume and show OSD. - """ + """Capsule showing speaker, mic, brightness icons + 2x2 privacy dots.""" def __init__(self, **kwargs): super().__init__(name="combined-controls", **kwargs) self.audio = audio_service self.brightness = brightness_service + self.privacy = privacy_service self.menu: CombinedControlsMenu | None = None - self.osd_widget = None # Will be set from outside + self.osd_widget = None - # Scroll debouncing self._scroll_debounce_src: int | None = None self._pending_scroll_updates = {} + self._updating_brightness = False + # ── Main icons ────────────────────────────────────────────────── self.icon_speaker = text_icon(get_audio_icon(0, False)) - self.icon_mic = text_icon(cnst.icons["microphone"]["active"]) # simplified + self.icon_mic = text_icon(cnst.icons["microphone"]["active"]) self.icon_brightness = text_icon(get_brightness_icon(self._get_brightness())) - # Initial icon syncs + for icon in (self.icon_speaker, self.icon_mic, self.icon_brightness): + icon.set_size_request(24, -1) + icon.set_halign(Gtk.Align.CENTER) + self._sync_icons() - # Connect services to update icons self.audio.connect("notify::speaker", self._bind_speaker) self.audio.connect("notify::microphone", self._bind_microphone) self.brightness.connect("screen", self._on_brightness_changed) - # Create separate EventBoxes for each icon to handle scroll events individually + # ── Privacy dots (2x2 grid = one icon slot) ──────────────────── + # Colours are defined by $privacy-dot-{mic,cam,screen,loc} in the + # theme. When inactive the dot is dimmed via _DOT_DIM_OPACITY so + # the accent colour still shows through subtly. + self.dot_mic = text_icon( + "●", size="10px", style_classes="privacy-dot-mic" + ) + self.dot_cam = text_icon( + "●", size="10px", style_classes="privacy-dot-cam" + ) + self.dot_screen = text_icon( + "●", size="10px", style_classes="privacy-dot-screen" + ) + self.dot_loc = text_icon( + "●", size="10px", style_classes="privacy-dot-loc" + ) + + for dot in (self.dot_mic, self.dot_cam, self.dot_screen, self.dot_loc): + dot.set_opacity(_DOT_DIM_OPACITY) + + dots_top = Box( + orientation="h", + spacing=3, + children=[self.dot_mic, self.dot_cam], + h_align="center", + v_align="center", + style="margin-bottom: -2px;", + ) + dots_bot = Box( + orientation="h", + spacing=3, + children=[self.dot_screen, self.dot_loc], + h_align="center", + v_align="center", + style="margin-top: -2px;", + ) + + dots_grid = Box( + orientation="v", + spacing=0, + children=[dots_top, dots_bot], + h_align="center", + v_align="center", + ) + dots_grid.set_size_request(24, -1) + + self.privacy_event_box = EventBox(child=dots_grid) + self.privacy_event_box.set_valign(Gtk.Align.CENTER) + self.privacy_event_box.set_halign(Gtk.Align.CENTER) + self.privacy_event_box.set_has_tooltip(True) + self.privacy_event_box.connect("query-tooltip", self._on_privacy_tooltip) + + # Connect to privacy service signals + for sig in ( + "notify::mic-active", + "notify::cam-active", + "notify::screen-active", + "notify::loc-active", + ): + self.privacy.connect(sig, self._update_privacy_dots) + + self._update_privacy_dots() + + # ── EventBoxes for scroll ─────────────────────────────────────── self.speaker_event_box = EventBox( child=self.icon_speaker, events=["scroll", "smooth-scroll"] ) @@ -409,7 +448,6 @@ def __init__(self, **kwargs): ) self.mic_event_box.connect("scroll-event", self._on_scroll_mic) - # Layout children with EventBoxes capsule_children = [self.speaker_event_box, self.mic_event_box] if self._is_brightness_available(): @@ -421,21 +459,51 @@ def __init__(self, **kwargs): ) capsule_children.append(self.brightness_event_box) + capsule_children.append(self.privacy_event_box) + inner = Box( orientation="h", - spacing=15, + spacing=4, children=capsule_children, style_classes="panel-box", ) - # Wrap everything in an EventBox to handle clicks on entire capsule self.main_event_box = EventBox(child=inner, events=["button-press-event"]) self.main_event_box.connect("button-press-event", self._on_clicked) - self.add(self.main_event_box) + # ── Privacy dots ─────────────────────────────────────────────────── + + def _update_privacy_dots(self, *_): + self.dot_mic.set_opacity(1.0 if self.privacy.mic_active else _DOT_DIM_OPACITY) + self.dot_cam.set_opacity(1.0 if self.privacy.cam_active else _DOT_DIM_OPACITY) + self.dot_screen.set_opacity( + 1.0 if self.privacy.screen_active else _DOT_DIM_OPACITY + ) + self.dot_loc.set_opacity(1.0 if self.privacy.loc_active else _DOT_DIM_OPACITY) + + def _on_privacy_tooltip(self, widget, x, y, keyboard_mode, tooltip): + lines = [] + if self.privacy.mic_active: + apps = ", ".join(self.privacy.mic_apps) or "unknown" + lines.append(f"󰍬 Mic: {apps}") + if self.privacy.cam_active: + apps = ", ".join(self.privacy.cam_apps) or "unknown" + lines.append(f"󰄀 Camera: {apps}") + if self.privacy.screen_active: + apps = ", ".join(self.privacy.screen_apps) or "unknown" + lines.append(f"󰹑 Screen: {apps}") + if self.privacy.loc_active: + apps = ", ".join(self.privacy.loc_apps) or "unknown" + lines.append(f" Location: {apps}") + if not lines: + lines = ["No active privacy accesses"] + tooltip.set_text("\n".join(lines)) + return True + + # ── General ──────────────────────────────────────────────────────── + def set_osd_widget(self, osd_widget): - """Set reference to OSD widget for triggering display.""" self.osd_widget = osd_widget def _on_clicked(self, *_): @@ -449,71 +517,49 @@ def _on_clicked(self, *_): self.menu.open() def _on_scroll_speaker(self, widget, event): - """Handle mouse wheel scroll to adjust speaker volume and trigger OSD.""" if not self.audio.speaker: return False - step = 5 - val_y = event.delta_y - - if val_y < 0: # scroll up - new_vol = min(100, self.audio.speaker.volume + step) - else: # scroll down - new_vol = max(0, self.audio.speaker.volume - step) - - # Store pending update and debounce + new_vol = ( + min(100, self.audio.speaker.volume + step) + if event.delta_y < 0 + else max(0, self.audio.speaker.volume - step) + ) self._pending_scroll_updates["speaker"] = new_vol self._debounce_scroll_updates() - return True def _on_scroll_mic(self, widget, event): - """Handle mouse wheel scroll to adjust microphone volume and trigger OSD.""" if not self.audio.microphone: return False - step = 5 - val_y = event.delta_y - - if val_y < 0: # scroll up - new_vol = min(100, self.audio.microphone.volume + step) - else: # scroll down - new_vol = max(0, self.audio.microphone.volume - step) - - # Store pending update and debounce + new_vol = ( + min(100, self.audio.microphone.volume + step) + if event.delta_y < 0 + else max(0, self.audio.microphone.volume - step) + ) self._pending_scroll_updates["microphone"] = new_vol self._debounce_scroll_updates() - return True def _on_scroll_brightness(self, widget, event): - """Handle mouse wheel scroll to adjust brightness and trigger OSD.""" if not self._is_brightness_available(): return False - step = 5 - val_y = event.delta_y - current = self._get_brightness() - if val_y < 0: # scroll up # noqa: SIM108 - new_val = min(100, current + step) - else: # scroll down - new_val = max(0, current - step) - - # Store pending update and debounce + new_val = ( + min(100, current + step) if event.delta_y < 0 else max(0, current - step) + ) self._pending_scroll_updates["brightness"] = new_val self._debounce_scroll_updates() - return True def _debounce_scroll_updates(self): - """Debounce scroll updates to prevent jitter.""" if self._scroll_debounce_src: GLib.source_remove(self._scroll_debounce_src) self._scroll_debounce_src = GLib.timeout_add(50, self._apply_scroll_updates) def _apply_scroll_updates(self): - """Apply pending scroll updates.""" for device, value in self._pending_scroll_updates.items(): if device == "speaker" and self.audio.speaker: self.audio.speaker.set_volume(value) @@ -529,9 +575,9 @@ def _apply_scroll_updates(self): self.brightness.screen_brightness = target if self.osd_widget: self.osd_widget.show_brightness() - - GLib.timeout_add(100, lambda: setattr(self, '_updating_brightness', False)) - + GLib.timeout_add( + 100, lambda: setattr(self, "_updating_brightness", False) + ) self._pending_scroll_updates.clear() self._scroll_debounce_src = None return False @@ -542,9 +588,7 @@ def _get_brightness(self) -> int: ) def _is_brightness_available(self) -> bool: - """Check if brightness control is available on this system.""" try: - # Try to get current brightness level return bool( hasattr(self.brightness, "max_brightness_level") and self.brightness.max_brightness_level > 0 @@ -554,14 +598,17 @@ def _is_brightness_available(self) -> bool: def _sync_icons(self): if self.audio.speaker: - vol = round(self.audio.speaker.volume) - self.icon_speaker.set_text(get_audio_icon(vol, self.audio.speaker.muted)) - + self.icon_speaker.set_text( + get_audio_icon( + round(self.audio.speaker.volume), self.audio.speaker.muted + ) + ) if self.audio.microphone: - is_muted = self.audio.microphone.muted - icon = cnst.icons["microphone"]["muted" if is_muted else "active"] - self.icon_mic.set_text(icon) - + self.icon_mic.set_text( + cnst.icons["microphone"][ + "muted" if self.audio.microphone.muted else "active" + ] + ) if self._is_brightness_available(): self.icon_brightness.set_text(get_brightness_icon(self._get_brightness())) @@ -583,11 +630,16 @@ def _on_brightness_changed(self, *_): def _update_speaker_icon(self, *_): if self.audio.speaker: - vol = round(self.audio.speaker.volume) - self.icon_speaker.set_text(get_audio_icon(vol, self.audio.speaker.muted)) + self.icon_speaker.set_text( + get_audio_icon( + round(self.audio.speaker.volume), self.audio.speaker.muted + ) + ) def _update_mic_icon(self, *_): if self.audio.microphone: - is_muted = self.audio.microphone.muted - icon = cnst.icons["microphone"]["muted" if is_muted else "active"] - self.icon_mic.set_text(icon) + self.icon_mic.set_text( + cnst.icons["microphone"][ + "muted" if self.audio.microphone.muted else "active" + ] + )