
According to https://www.freedesktop.org/wiki/Specifications/mpris-spec/metadata artUrl must use scheme 'file://' for local files. Fixes the issue for me in Gnome
456 lines
16 KiB
Python
456 lines
16 KiB
Python
import functools
|
|
import logging
|
|
import re
|
|
from collections import defaultdict
|
|
from datetime import timedelta
|
|
from typing import Any, Callable, DefaultDict, Dict, List, Match, Optional, Tuple
|
|
|
|
from deepdiff import DeepDiff
|
|
from gi.repository import Gio, GLib
|
|
|
|
from ..adapters import AdapterManager, CacheMissError
|
|
from ..config import AppConfiguration
|
|
from ..players import PlayerManager
|
|
from ..ui.state import RepeatType
|
|
from ..util import resolve_path
|
|
|
|
|
|
def dbus_propagate(param_self: Any = None) -> Callable:
|
|
"""Wraps a function which causes changes to DBus properties."""
|
|
|
|
def decorator(function: Callable) -> Callable:
|
|
@functools.wraps(function)
|
|
def wrapper(*args):
|
|
function(*args)
|
|
if (param_self or args[0]).dbus_manager:
|
|
(param_self or args[0]).dbus_manager.property_diff()
|
|
|
|
return wrapper
|
|
|
|
return decorator
|
|
|
|
|
|
class DBusManager:
|
|
second_microsecond_conversion = 1000000
|
|
|
|
current_state: Dict = {}
|
|
|
|
def __init__(
|
|
self,
|
|
connection: Gio.DBusConnection,
|
|
do_on_method_call: Callable[
|
|
[
|
|
Gio.DBusConnection,
|
|
str,
|
|
str,
|
|
str,
|
|
str,
|
|
GLib.Variant,
|
|
Gio.DBusMethodInvocation,
|
|
],
|
|
None,
|
|
],
|
|
on_set_property: Callable[
|
|
[Gio.DBusConnection, str, str, str, str, GLib.Variant], None
|
|
],
|
|
get_config_and_player_manager: Callable[
|
|
[], Tuple[AppConfiguration, Optional[PlayerManager]]
|
|
],
|
|
):
|
|
self.get_config_and_player_manager = get_config_and_player_manager
|
|
self.do_on_method_call = do_on_method_call
|
|
self.on_set_property = on_set_property
|
|
self.connection = connection
|
|
|
|
def dbus_name_acquired(connection: Gio.DBusConnection, name: str):
|
|
specs = [
|
|
"org.mpris.MediaPlayer2.xml",
|
|
"org.mpris.MediaPlayer2.Player.xml",
|
|
"org.mpris.MediaPlayer2.Playlists.xml",
|
|
"org.mpris.MediaPlayer2.TrackList.xml",
|
|
]
|
|
for spec in specs:
|
|
spec_path = resolve_path("dbus/mpris_specs", spec)
|
|
with open(spec_path) as f:
|
|
node_info = Gio.DBusNodeInfo.new_for_xml(f.read())
|
|
|
|
connection.register_object(
|
|
"/org/mpris/MediaPlayer2",
|
|
node_info.interfaces[0],
|
|
self.on_method_call,
|
|
self.on_get_property,
|
|
self.on_set_property,
|
|
)
|
|
|
|
# TODO (#127): I have no idea what to do here.
|
|
def dbus_name_lost(*args):
|
|
pass
|
|
|
|
self.bus_number = Gio.bus_own_name_on_connection(
|
|
connection,
|
|
"org.mpris.MediaPlayer2.sublimemusic",
|
|
Gio.BusNameOwnerFlags.NONE,
|
|
dbus_name_acquired,
|
|
dbus_name_lost,
|
|
)
|
|
|
|
def shutdown(self):
|
|
logging.info("DBusManager is shutting down.")
|
|
self.property_diff()
|
|
Gio.bus_unown_name(self.bus_number)
|
|
|
|
def on_get_property(
|
|
self,
|
|
connection: Gio.DBusConnection,
|
|
sender: str,
|
|
path: str,
|
|
interface: str,
|
|
property_name: str,
|
|
) -> GLib.Variant:
|
|
value = self.property_dict().get(interface, {}).get(property_name)
|
|
return DBusManager.to_variant(value)
|
|
|
|
def on_method_call(
|
|
self,
|
|
connection: Gio.DBusConnection,
|
|
sender: str,
|
|
path: str,
|
|
interface: str,
|
|
method: str,
|
|
params: GLib.Variant,
|
|
invocation: Gio.DBusMethodInvocation,
|
|
):
|
|
# TODO (#127): I don't really know if this works.
|
|
if interface == "org.freedesktop.DBus.Properties":
|
|
if method == "Get":
|
|
invocation.return_value(
|
|
self.on_get_property(connection, sender, path, interface, *params)
|
|
)
|
|
elif method == "Set":
|
|
self.on_set_property(connection, sender, path, interface, *params)
|
|
elif method == "GetAll":
|
|
all_properties = {
|
|
k: DBusManager.to_variant(v)
|
|
for k, v in self.property_dict()[interface].items()
|
|
}
|
|
invocation.return_value(GLib.Variant("(a{sv})", (all_properties,)))
|
|
|
|
return
|
|
self.do_on_method_call(
|
|
connection,
|
|
sender,
|
|
path,
|
|
interface,
|
|
method,
|
|
params,
|
|
invocation,
|
|
)
|
|
|
|
@staticmethod
|
|
def to_variant(value: Any) -> GLib.Variant:
|
|
if callable(value):
|
|
return DBusManager.to_variant(value())
|
|
|
|
if isinstance(value, GLib.Variant):
|
|
return value
|
|
|
|
if type(value) == tuple:
|
|
return GLib.Variant(*value)
|
|
|
|
if type(value) == dict:
|
|
return GLib.Variant(
|
|
"a{sv}",
|
|
{k: DBusManager.to_variant(v) for k, v in value.items()},
|
|
)
|
|
|
|
variant_type = {list: "as", str: "s", int: "i", float: "d", bool: "b"}.get(
|
|
type(value)
|
|
)
|
|
if not variant_type:
|
|
return value
|
|
return GLib.Variant(variant_type, value)
|
|
|
|
_escape_re = re.compile(r"[^\w]+")
|
|
|
|
@staticmethod
|
|
@functools.lru_cache(maxsize=1024)
|
|
def _escape_id(id: str) -> str:
|
|
"""
|
|
Escapes an ID for use in a DBus object identifier.
|
|
|
|
>>> DBusManager._escape_id("tr-1843")
|
|
'tr_0x45_1843'
|
|
>>> DBusManager._escape_id("bc9c7726-8739-4add-8df0-88c6233f37df")
|
|
'bc9c7726_0x45_8739_0x45_4add_0x45_8df0_0x45_88c6233f37df'
|
|
>>> DBusManager._escape_id("spaces spaces everywhere")
|
|
'spaces_0x32_spaces_0x32_everywhere'
|
|
>>> DBusManager._escape_id("already/has/slashes")
|
|
'already_0x47_has_0x47_slashes'
|
|
"""
|
|
|
|
def replace(m: Match[str]) -> str:
|
|
return f"_0x{ord(m[0])}_"
|
|
|
|
return DBusManager._escape_re.sub(replace, id)
|
|
|
|
def property_dict(self) -> Dict[str, Any]:
|
|
config, player_manager = self.get_config_and_player_manager()
|
|
if config is None or player_manager is None:
|
|
return {}
|
|
|
|
state = config.state
|
|
has_current_song = state.current_song is not None
|
|
has_next_song = False
|
|
if state.repeat_type in (RepeatType.REPEAT_QUEUE, RepeatType.REPEAT_SONG):
|
|
has_next_song = True
|
|
elif has_current_song:
|
|
has_next_song = state.current_song_index < len(state.play_queue) - 1
|
|
|
|
active_playlist = self.get_active_playlist(state.active_playlist_id)
|
|
|
|
playlist_count = 0
|
|
try:
|
|
get_playlists_result = AdapterManager.get_playlists(allow_download=False)
|
|
if get_playlists_result.data_is_available:
|
|
playlist_count = len(get_playlists_result.result())
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"org.mpris.MediaPlayer2": {
|
|
"CanQuit": True,
|
|
"CanRaise": True,
|
|
"HasTrackList": True,
|
|
"Identity": "Sublime Music",
|
|
"DesktopEntry": "sublime-music",
|
|
"SupportedUriSchemes": [],
|
|
"SupportedMimeTypes": [],
|
|
},
|
|
"org.mpris.MediaPlayer2.Player": {
|
|
"PlaybackStatus": {
|
|
(False, False): "Stopped",
|
|
(False, True): "Stopped",
|
|
(True, False): "Paused",
|
|
(True, True): "Playing",
|
|
}[player_manager.song_loaded, state.playing],
|
|
"LoopStatus": state.repeat_type.as_mpris_loop_status(),
|
|
"Rate": 1.0,
|
|
"Shuffle": state.shuffle_on,
|
|
"Metadata": self.get_mpris_metadata(
|
|
state.current_song_index,
|
|
state.play_queue,
|
|
)
|
|
if state.current_song
|
|
else {},
|
|
"Volume": 0.0 if state.is_muted else state.volume / 100,
|
|
"Position": (
|
|
"x",
|
|
int(
|
|
max(state.song_progress.total_seconds(), 0)
|
|
* self.second_microsecond_conversion
|
|
),
|
|
),
|
|
"MinimumRate": 1.0,
|
|
"MaximumRate": 1.0,
|
|
"CanGoNext": has_current_song and has_next_song,
|
|
"CanGoPrevious": has_current_song,
|
|
"CanPlay": True,
|
|
"CanPause": True,
|
|
"CanSeek": True,
|
|
"CanControl": True,
|
|
},
|
|
"org.mpris.MediaPlayer2.TrackList": {
|
|
"Tracks": DBusManager.get_dbus_playlist(state.play_queue),
|
|
"CanEditTracks": False,
|
|
},
|
|
"org.mpris.MediaPlayer2.Playlists": {
|
|
"PlaylistCount": playlist_count,
|
|
"Orderings": ["Alphabetical", "Created", "Modified"],
|
|
"ActivePlaylist": ("(b(oss))", active_playlist),
|
|
},
|
|
}
|
|
|
|
@functools.lru_cache(maxsize=10)
|
|
def get_active_playlist(
|
|
self, active_playlist_id: Optional[str]
|
|
) -> Tuple[bool, GLib.Variant]:
|
|
if not active_playlist_id or not AdapterManager.can_get_playlist_details():
|
|
return (False, GLib.Variant("(oss)", ("/", "", "")))
|
|
|
|
try:
|
|
playlist = AdapterManager.get_playlist_details(
|
|
active_playlist_id, allow_download=False
|
|
).result()
|
|
|
|
try:
|
|
cover_art = AdapterManager.get_cover_art_uri(
|
|
playlist.cover_art, "file", allow_download=False
|
|
).result()
|
|
except CacheMissError:
|
|
cover_art = ""
|
|
|
|
return (
|
|
True,
|
|
GLib.Variant(
|
|
"(oss)",
|
|
(
|
|
"/playlist/" + DBusManager._escape_id(playlist.id),
|
|
playlist.name,
|
|
cover_art,
|
|
),
|
|
),
|
|
)
|
|
except Exception:
|
|
logging.exception("Couldn't get playlist details")
|
|
return (False, GLib.Variant("(oss)", ("/", "", "")))
|
|
|
|
@functools.lru_cache(maxsize=10)
|
|
def get_mpris_metadata(
|
|
self, idx: int, play_queue: Tuple[str, ...]
|
|
) -> Dict[str, Any]:
|
|
try:
|
|
song = AdapterManager.get_song_details(
|
|
play_queue[idx], allow_download=False
|
|
).result()
|
|
except Exception:
|
|
return {}
|
|
|
|
trackid = DBusManager.get_dbus_playlist(play_queue)[idx]
|
|
duration = (
|
|
"x",
|
|
int(
|
|
(song.duration or timedelta(0)).total_seconds()
|
|
* self.second_microsecond_conversion
|
|
),
|
|
)
|
|
|
|
try:
|
|
cover_art = AdapterManager.get_cover_art_uri(
|
|
song.cover_art, "file", allow_download=False
|
|
).result()
|
|
cover_art = "file://" + cover_art
|
|
except CacheMissError:
|
|
cover_art = ""
|
|
|
|
artist_name = song.artist.name if song.artist else ""
|
|
return {
|
|
"mpris:trackid": trackid,
|
|
"mpris:length": duration,
|
|
# Art URIs should be sent as (UTF-8) strings.
|
|
# Local files should use the "file://" schema.
|
|
"mpris:artUrl": cover_art,
|
|
# TODO (#71) use walrus once MYPY isn't retarded
|
|
"xesam:album": (song.album.name if song.album else ""),
|
|
"xesam:albumArtist": [artist_name],
|
|
"xesam:artist": [artist_name],
|
|
"xesam:title": song.title,
|
|
}
|
|
|
|
@staticmethod
|
|
@functools.lru_cache(maxsize=20)
|
|
def get_dbus_playlist(play_queue: Tuple[str, ...]) -> List[str]:
|
|
"""
|
|
Gets a playlist formatted for DBus. If multiples of the same element exist in
|
|
the queue, it will use ``/0`` after the song ID to differentiate between the
|
|
instances.
|
|
|
|
>>> DBusManager.get_dbus_playlist(("2", "1", "3", "1"))
|
|
['/song/2/0', '/song/1/0', '/song/3/0', '/song/1/1']
|
|
"""
|
|
seen_counts: DefaultDict[str, int] = defaultdict(int)
|
|
tracks = []
|
|
for song_id in play_queue:
|
|
num_seen = seen_counts[song_id]
|
|
tracks.append(f"/song/{DBusManager._escape_id(song_id)}/{num_seen}")
|
|
seen_counts[song_id] += 1
|
|
|
|
return tracks
|
|
|
|
diff_parse_re = re.compile(r"root\['(.*?)'\]\['(.*?)'\](?:\[.*\])?")
|
|
|
|
def property_diff(self):
|
|
new_property_dict = self.property_dict()
|
|
diff = DeepDiff(self.current_state, new_property_dict)
|
|
|
|
changes = defaultdict(dict)
|
|
|
|
for path, change in diff.get("values_changed", {}).items():
|
|
interface, property_name = self.diff_parse_re.match(path).groups()
|
|
changes[interface][property_name] = change["new_value"]
|
|
|
|
if diff.get("dictionary_item_added"):
|
|
changes = new_property_dict
|
|
|
|
for interface, changed_props in changes.items():
|
|
# If the metadata has changed, just make the entire Metadata object
|
|
# part of the update.
|
|
if "Metadata" in changed_props.keys():
|
|
changed_props["Metadata"] = new_property_dict[interface]["Metadata"]
|
|
|
|
# Special handling for when the position changes (a seek).
|
|
# Technically, I'm sending this signal too often, but I don't think
|
|
# it really matters.
|
|
if (
|
|
interface == "org.mpris.MediaPlayer2.Player"
|
|
and "Position" in changed_props
|
|
):
|
|
self.connection.emit_signal(
|
|
None,
|
|
"/org/mpris/MediaPlayer2",
|
|
interface,
|
|
"Seeked",
|
|
GLib.Variant("(x)", (changed_props["Position"][1],)),
|
|
)
|
|
|
|
# Do not emit the property change.
|
|
del changed_props["Position"]
|
|
|
|
# Special handling for when the track list changes.
|
|
# Technically, I'm supposed to use `TrackAdded` and `TrackRemoved`
|
|
# signals when minor changes occur, but the docs also say that:
|
|
#
|
|
# > It is left up to the implementation to decide when a change to
|
|
# > the track list is invasive enough that this signal should be
|
|
# > emitted instead of a series of TrackAdded and TrackRemoved
|
|
# > signals.
|
|
#
|
|
# So I think that any change is invasive enough that I should use
|
|
# this signal.
|
|
if (
|
|
interface == "org.mpris.MediaPlayer2.TrackList"
|
|
and "Tracks" in changed_props
|
|
):
|
|
track_list = changed_props["Tracks"]
|
|
if len(track_list) > 0:
|
|
current_track = new_property_dict["org.mpris.MediaPlayer2.Player"][
|
|
"Metadata"
|
|
].get("mpris:trackid", track_list[0])
|
|
self.connection.emit_signal(
|
|
None,
|
|
"/org/mpris/MediaPlayer2",
|
|
interface,
|
|
"TrackListReplaced",
|
|
GLib.Variant("(aoo)", (track_list, current_track)),
|
|
)
|
|
|
|
self.connection.emit_signal(
|
|
None,
|
|
"/org/mpris/MediaPlayer2",
|
|
"org.freedesktop.DBus.Properties",
|
|
"PropertiesChanged",
|
|
GLib.Variant(
|
|
"(sa{sv}as)",
|
|
(
|
|
interface,
|
|
{
|
|
k: DBusManager.to_variant(v)
|
|
for k, v in changed_props.items()
|
|
},
|
|
[],
|
|
),
|
|
),
|
|
)
|
|
|
|
# Update state for next diff.
|
|
self.current_state = new_property_dict
|