-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMPRISController.py
More file actions
192 lines (166 loc) · 8.3 KB
/
MPRISController.py
File metadata and controls
192 lines (166 loc) · 8.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import json
import dbus_next
from dbus_next.introspection import Interface, Node
from dbus_next.aio.proxy_object import ProxyInterface, ProxyObject
import platform
import asyncio
import sys
from threading import Thread, Event
from typing import Any, List, Optional, cast, override
from .MediaControllerTypes import MediaPlaybackStateInner, default_media_playback_state, MediaControllerBase
from lib.Logger import log
if platform.system() == "Linux":
from dbus_next.aio.message_bus import MessageBus
from dbus_next.constants import BusType
else:
MessageBus = None
import time
class MPRISController(MediaControllerBase):
def __init__(self):
super().__init__()
if MessageBus is None:
log('error', 'MPRISController requires dbus-next, which is not available on this platform.')
raise NotImplementedError("MPRISController is not implemented for this platform.")
self._loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
self._stop_event: Event = Event()
self._last_state: Optional[MediaPlaybackStateInner] = None
self._player_iface: ProxyInterface | None = None
self._poll_thread: Thread = Thread(target=self._run, daemon=True)
self._poll_thread.start()
def _run(self):
asyncio.set_event_loop(self._loop)
self._loop.run_until_complete(self._init_player())
while not self._stop_event.is_set():
self._loop.run_until_complete(self._poll())
time.sleep(1)
async def _init_player(self):
if not MessageBus:
log('error', 'MPRISController requires dbus-next, which is not available on this platform.')
raise NotImplementedError("MPRISController is not implemented for this platform.")
self._bus = await MessageBus().connect()
names = await self._list_names()
mpris_names = [name for name in names if name.startswith("org.mpris.MediaPlayer2.")]
log('debug', f'MPRIS names found: {mpris_names}')
for name in mpris_names:
try:
introspection: Node = await self._bus.introspect(name, "/org/mpris/MediaPlayer2")
proxy_obj: ProxyObject = self._bus.get_proxy_object(name, "/org/mpris/MediaPlayer2", introspection)
player_iface: ProxyInterface = proxy_obj.get_interface("org.mpris.MediaPlayer2.Player")
status: str = await player_iface.get_playback_status()
log('debug', f'Player {name} status: {status}')
if status == "Playing":
self._player_iface = player_iface
return
except Exception:
log('debug', f'Failed to introspect or get player interface for {name}, continuing...')
log('debug', f'Exception details: {sys.exc_info()[1]}')
continue
# fallback to first player found
if mpris_names:
log('debug', 'No active MPRIS player found, using the first available player.')
introspection = await self._bus.introspect(mpris_names[0], "/org/mpris/MediaPlayer2")
proxy_obj = self._bus.get_proxy_object(mpris_names[0], "/org/mpris/MediaPlayer2", introspection)
self._player_iface = proxy_obj.get_interface("org.mpris.MediaPlayer2.Player")
async def _list_names(self) -> list[str]:
introspection = await self._bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus")
proxy = self._bus.get_proxy_object("org.freedesktop.DBus", "/org/freedesktop/DBus", introspection)
iface = proxy.get_interface("org.freedesktop.DBus")
return await iface.call_list_names()
async def _poll(self):
if not self._player_iface:
return
try:
state = await self._get_media_playback_state()
if state != self._last_state:
log('debug', f'Media playback state changed: {state}')
self._last_state = state
if self.on_media_playback_info_changed:
self.on_media_playback_info_changed(state)
except Exception:
log('error', 'Error while polling media playback state')
log('debug', f'Exception details: {sys.exc_info()[1]}')
pass
async def _get_media_playback_state(self) -> MediaPlaybackStateInner:
if not self._player_iface:
log('debug', 'No player interface available, returning default state')
return default_media_playback_state()
try:
metadata = await self._player_iface.get_metadata()
playback_status = await self._player_iface.get_playback_status()
# Fix the Shuffle property, since VLC is being stupid.
introspection: Node = self._player_iface.introspection
for prop in introspection.properties:
# log('debug', vars(prop))
if prop.name == "Shuffle" and prop.signature == "d":
# log('debug', f'Found Incorrect Shuffle property: {prop.name} with type {prop.signature}. Fixing to boolean.')
# Fix the type of Shuffle property to boolean
prop.signature = "b"
shuffle = cast(bool, await self._player_iface.get_shuffle()) if hasattr(self._player_iface, 'get_shuffle') else None
# if hasattr(self._player_iface, 'get_shuffle'):
# log('debug', f'get_shuffle: {shuffle}')
loop_status = await self._player_iface.get_loop_status() if hasattr(self._player_iface, 'get_loop_status') else None
# shuffle = False
# loop_status = None
artists_list = cast(list[str], (cast(dbus_next.signature.Variant, metadata.get("xesam:artist")) or {"value":None}).value)
# Concatenate artists
artists = ', '.join(artists_list) if artists_list else None
return {
"artist": artists,
"subtitle": cast(str, (cast(dbus_next.signature.Variant, metadata.get("xesam:album")) or {"value":None}).value),
"title": cast(str, (cast(dbus_next.signature.Variant, metadata.get("xesam:title")) or {"value":None}).value),
"is_shuffle_active": bool(shuffle) if shuffle is not None else None,
"auto_repeat_mode": loop_status,
"playback_status": playback_status
}
except Exception:
log('error', 'Error getting media playback state')
log('debug', f'Exception details: {sys.exc_info()[1]}')
# Print exception location for debugging
log('debug', f'Exception location: {sys.exc_info()[2].tb_frame.f_code.co_filename}:{sys.exc_info()[2].tb_lineno}')
return default_media_playback_state()
# async def _safe_get_property(self, prop):
# try:
# return await self._player_iface.get_property(prop)
# except Exception:
# return None
@staticmethod
def _run_coroutine_in_loop(loop: asyncio.AbstractEventLoop, coro):
asyncio.run_coroutine_threadsafe(coro, loop)
@override
def play(self) -> bool:
if self._player_iface:
self._run_coroutine_in_loop(self._loop, self._player_iface.call_play())
return True
return False
@override
def pause(self) -> bool:
if self._player_iface:
self._run_coroutine_in_loop(self._loop, self._player_iface.call_pause())
return True
return False
@override
def stop(self) -> bool:
if self._player_iface:
self._run_coroutine_in_loop(self._loop, self._player_iface.call_stop())
return True
return False
@override
def prev_track(self) -> bool:
if self._player_iface:
self._run_coroutine_in_loop(self._loop, self._player_iface.call_previous())
return True
return False
@override
def next_track(self) -> bool:
if self._player_iface:
self._run_coroutine_in_loop(self._loop, self._player_iface.call_next())
return True
return False
@override
def get_media_playback_state(self) -> MediaPlaybackStateInner:
return self._last_state or default_media_playback_state()
@override
def cleanup(self):
self._stop_event.set()
self._poll_thread.join(timeout=2)
self._loop.stop()