|
2 | 2 | # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net> |
3 | 3 |
|
4 | 4 |
|
| 5 | +import os |
5 | 6 | from collections import deque |
6 | 7 | from contextlib import suppress |
7 | 8 | from types import TracebackType |
8 | 9 | from typing import TYPE_CHECKING, Any, Callable, ContextManager, Deque, Dict, NamedTuple, Optional, Sequence, Type, Union, cast |
9 | 10 |
|
10 | | -from kitty.fast_data_types import monotonic |
| 11 | +from kitty.constants import kitten_exe, running_in_kitty |
| 12 | +from kitty.fast_data_types import monotonic, safe_pipe |
11 | 13 | from kitty.types import DecoratedFunc, ParsedShortcut |
12 | 14 | from kitty.typing import ( |
13 | 15 | AbstractEventLoop, |
@@ -47,6 +49,98 @@ def is_click(a: ButtonEvent, b: ButtonEvent) -> bool: |
47 | 49 | return x*x + y*y <= 4 |
48 | 50 |
|
49 | 51 |
|
| 52 | +class KittenUI: |
| 53 | + allow_remote_control: bool = False |
| 54 | + remote_control_password: bool | str = False |
| 55 | + |
| 56 | + def __init__(self, func: Callable[[list[str]], str], allow_remote_control: bool, remote_control_password: bool | str): |
| 57 | + self.func = func |
| 58 | + self.allow_remote_control = allow_remote_control |
| 59 | + self.remote_control_password = remote_control_password |
| 60 | + self.password = self.to = '' |
| 61 | + self.rc_fd = -1 |
| 62 | + self.initialized = False |
| 63 | + |
| 64 | + def initialize(self) -> None: |
| 65 | + if self.initialized: |
| 66 | + return |
| 67 | + self.initialized = True |
| 68 | + if running_in_kitty(): |
| 69 | + return |
| 70 | + if self.allow_remote_control: |
| 71 | + self.to = os.environ.get('KITTY_LISTEN_ON', '') |
| 72 | + self.rc_fd = int(self.to.partition(':')[-1]) |
| 73 | + os.set_inheritable(self.rc_fd, False) |
| 74 | + if (self.remote_control_password or self.remote_control_password == '') and not self.password: |
| 75 | + import socket |
| 76 | + with socket.fromfd(self.rc_fd, socket.AF_UNIX, socket.SOCK_STREAM) as s: |
| 77 | + data = s.recv(256) |
| 78 | + if not data.endswith(b'\n'): |
| 79 | + raise Exception(f'The remote control password was invalid: {data!r}') |
| 80 | + self.password = data.strip().decode() |
| 81 | + |
| 82 | + def __call__(self, args: list[str]) -> str: |
| 83 | + self.initialize() |
| 84 | + return self.func(args) |
| 85 | + |
| 86 | + def allow_indiscriminate_remote_control(self, enable: bool = True) -> None: |
| 87 | + if self.rc_fd > -1: |
| 88 | + if enable: |
| 89 | + os.set_inheritable(self.rc_fd, True) |
| 90 | + if self.password: |
| 91 | + os.environ['KITTY_RC_PASSWORD'] = self.password |
| 92 | + else: |
| 93 | + os.set_inheritable(self.rc_fd, False) |
| 94 | + if self.password: |
| 95 | + os.environ.pop('KITTY_RC_PASSWORD', None) |
| 96 | + |
| 97 | + def remote_control(self, cmd: str | Sequence[str], **kw: Any) -> Any: |
| 98 | + if not self.allow_remote_control: |
| 99 | + raise ValueError('Remote control is not enabled, remember to use allow_remote_control=True') |
| 100 | + prefix = [kitten_exe(), '@'] |
| 101 | + r = -1 |
| 102 | + pass_fds = list(kw.get('pass_fds') or ()) |
| 103 | + try: |
| 104 | + if self.rc_fd > -1: |
| 105 | + pass_fds.append(self.rc_fd) |
| 106 | + if self.password and self.rc_fd > -1: |
| 107 | + r, w = safe_pipe(False) |
| 108 | + os.write(w, self.password.encode()) |
| 109 | + os.close(w) |
| 110 | + prefix += ['--password-file', f'fd:{r}', '--use-password', 'always'] |
| 111 | + pass_fds.append(r) |
| 112 | + if pass_fds: |
| 113 | + kw['pass_fds'] = tuple(pass_fds) |
| 114 | + if isinstance(cmd, str): |
| 115 | + cmd = ' '.join(prefix) |
| 116 | + else: |
| 117 | + cmd = prefix + list(cmd) |
| 118 | + import subprocess |
| 119 | + if self.rc_fd > -1: |
| 120 | + is_inheritable = os.get_inheritable(self.rc_fd) |
| 121 | + if not is_inheritable: |
| 122 | + os.set_inheritable(self.rc_fd, True) |
| 123 | + try: |
| 124 | + return subprocess.run(cmd, **kw) |
| 125 | + finally: |
| 126 | + if self.rc_fd > -1 and not is_inheritable: |
| 127 | + os.set_inheritable(self.rc_fd, False) |
| 128 | + finally: |
| 129 | + if r > -1: |
| 130 | + os.close(r) |
| 131 | + |
| 132 | + |
| 133 | +def kitten_ui( |
| 134 | + allow_remote_control: bool = KittenUI.allow_remote_control, |
| 135 | + remote_control_password: bool | str = KittenUI.allow_remote_control, |
| 136 | +) -> Callable[[Callable[[list[str]], str]], KittenUI]: |
| 137 | + |
| 138 | + def wrapper(impl: Callable[..., Any]) -> KittenUI: |
| 139 | + return KittenUI(impl, allow_remote_control, remote_control_password) |
| 140 | + |
| 141 | + return wrapper |
| 142 | + |
| 143 | + |
50 | 144 | class Handler: |
51 | 145 |
|
52 | 146 | image_manager_class: Optional[Type[ImageManagerType]] = None |
|
0 commit comments