Files
sublime-music/sublime_music/ui/player_controls/manager.py
Benjamin Schaaf 4ba2e09cf1 WIP
2021-12-20 22:09:08 +11:00

408 lines
15 KiB
Python

import copy
from datetime import timedelta
from functools import partial
from typing import Any, Optional, Callable, Dict, Set, Tuple

from gi.repository import GObject, Gtk, GLib

from .. import util
from ...adapters import AdapterManager, Result, SongCacheStatus
from ...adapters.api_objects import Song
from ...config import AppConfiguration
from ..state import RepeatType
from ..common import IconButton, IconToggleButton, SpinnerImage, SpinnerPicture
class Manager(GObject.GObject):
    """State shared by the player-control widgets.

    Holds the volume and scrubber adjustments, the play-queue list store and
    the device popover, exposes display state as GObject properties, and
    forwards ``update`` / ``set_cover_art`` calls to every control registered
    through ``add_control``.
    """

    # Signals declared on this object; the tuple at the end of each entry
    # lists the argument types.
    __gsignals__ = {
        # Argument: the ID of the selected player.
        "device-update": (GObject.SignalFlags.RUN_FIRST, GObject.TYPE_NONE, (str,)),
        "song-clicked": (
            GObject.SignalFlags.RUN_FIRST,
            GObject.TYPE_NONE,
            (int, object, object),
        ),
        "songs-removed": (GObject.SignalFlags.RUN_FIRST, GObject.TYPE_NONE, (object,)),
        "refresh-window": (
            GObject.SignalFlags.RUN_FIRST,
            GObject.TYPE_NONE,
            (object, bool),
        ),
        # Argument: the scrubber value at the time of the change.
        "seek": (GObject.SignalFlags.RUN_FIRST, GObject.TYPE_NONE, (float,)),
    }

    # Adjustments for the volume and the song position; instantiated in
    # __init__.
    volume = GObject.Property(type=Gtk.Adjustment)
    scrubber = GObject.Property(type=Gtk.Adjustment)
    # Stream-cache progress in seconds, written by update_song_progress.
    scrubber_cache = GObject.Property(type=int, default=0)
    play_queue_open = GObject.Property(type=bool, default=False)
    # Rows of the play queue; the column layout is defined in __init__.
    play_queue_store = GObject.Property(type=Gtk.ListStore)
    flap_open = GObject.Property(type=bool, default=False)
    offline_mode = GObject.Property(type=bool, default=False)
    # True while the application state has a current song (set in update).
    has_song = GObject.Property(type=bool, default=False)

    # Display state for the transport buttons, recomputed on every update().
    play_button_icon = GObject.Property(type=str)
    play_button_tooltip = GObject.Property(type=str)
    repeat_button_icon = GObject.Property(type=str)
    repeat_button_active = GObject.Property(type=bool, default=False)
    shuffle_button_active = GObject.Property(type=bool, default=False)

    # Text for the total-duration and elapsed-time labels; "-:--" is shown
    # whenever the scrubber's upper bound is 0.
    duration_label = GObject.Property(type=str, default="-:--")
    progress_label = GObject.Property(type=str, default="-:--")

    # True while update_song_progress is writing to the scrubber, so that
    # on_scrubber_changed does not emit "seek" for those changes.
    updating_scrubber = False
    # Incremented by update() before each cover-art request; update_cover_art
    # ignores a result whose token no longer matches.
    cover_art_update_order_token = 0
    # Incremented by update_player_queue when the queue contents differ; the
    # deferred row callbacks ignore results carrying an older token.
    play_queue_update_order_token = 0
def __init__(self):
    """Create the adjustments, the play-queue store and the device popover."""
    super().__init__()

    # Volume starts at 1 within [0, 1] and moves in steps of 0.01.
    self.volume = Gtk.Adjustment(1, 0, 1, 0.01, 0, 0)

    # The seek handler is connected before the label handler so that it runs
    # first on "value-changed".
    self.scrubber = scrubber = Gtk.Adjustment()
    scrubber.set_step_increment(1)
    for signal_name, handler in (
        ("value-changed", self.on_scrubber_changed),
        ("value-changed", self.update_progress_label),
        ("changed", self.update_duration_label),
    ):
        scrubber.connect(signal_name, handler)

    column_types = (
        bool,  # playable
        str,  # image filename
        str,  # title, album, artist
        bool,  # playing
        str,  # song ID
    )
    self.play_queue_store = store = Gtk.ListStore(*column_types)

    # Drag-and-drop reordering of the song list shows up as row insertions
    # and deletions on the model; both go to the same handler.
    for signal_name in ("row-inserted", "row-deleted"):
        store.connect(signal_name, self.on_play_queue_model_row_move)

    self._controls = []

    # Device popover: a header row (title + refresh button) above the
    # device list.
    header_row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
    header_row.add(
        Gtk.Label(
            label="<b>Devices</b>",
            use_markup=True,
            halign=Gtk.Align.START,
            margin=5,
        )
    )
    refresh_button = IconButton("view-refresh-symbolic", "Refresh device list")
    refresh_button.set_action_name("app.refresh-devices")
    header_row.pack_end(refresh_button, False, False, 0)

    self.device_list = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
    list_container = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
    list_container.add(self.device_list)

    popover_box = Gtk.Box(
        orientation=Gtk.Orientation.VERTICAL,
        name="device-popover-box",
    )
    popover_box.add(header_row)
    popover_box.pack_end(list_container, True, True, 0)

    self.device_popover = Gtk.PopoverMenu(modal=True, name="device-popover")
    self.device_popover.add(popover_box)
def add_control(self, control):
    """Register *control* to receive ``update`` and ``set_cover_art`` calls."""
    self._controls.extend((control,))
def update(self, app_config: AppConfiguration, force: bool = False):
    """Synchronise this manager and all registered controls with the app state.

    Order of operations: song progress, cover-art request (only when a song
    is current), the registered controls, cover-art reset (only when no song
    is current), play queue, device list, and finally the transport-button
    properties.
    """
    state = app_config.state
    song = state.current_song

    self.has_song = song is not None
    self.update_song_progress(
        state.song_progress,
        song and song.duration,
        state.song_stream_cache_progress,
    )

    if self.has_song:
        # A fresh token makes any cover-art result still in flight stale.
        self.cover_art_update_order_token += 1
        self.update_cover_art(
            song.cover_art,
            order_token=self.cover_art_update_order_token,
        )

    for control in self._controls:
        control.update(app_config, force=force)

    if not self.has_song:
        self.set_cover_art(None, False)

    self.update_player_queue(app_config)
    self.update_device_list(app_config)

    if state.playing:
        icon, tooltip = "pause", "Pause"
    else:
        icon, tooltip = "start", "Play"
    self.play_button_icon = f"media-playback-{icon}-symbolic"
    self.play_button_tooltip = tooltip

    repeat_type = state.repeat_type
    self.repeat_button_icon = repeat_type.icon
    self.repeat_button_active = repeat_type in (
        RepeatType.REPEAT_QUEUE,
        RepeatType.REPEAT_SONG,
    )
    self.shuffle_button_active = state.shuffle_on
def update_song_progress(self,
                         progress: Optional[timedelta],
                         duration: Optional[timedelta],
                         cache_progess: Optional[timedelta]):
    """Write the playback position, duration and cache progress to the scrubber.

    Args:
        progress: Elapsed time of the current song, or ``None``.
        duration: Total length of the current song, or ``None``.
        cache_progess: (sic) How much of the stream is cached; when ``None``
            the cache indicator is set to the full duration.

    If either ``progress`` or ``duration`` is ``None`` the scrubber and the
    cache indicator are reset to 0.

    ``updating_scrubber`` is held ``True`` for the duration of the call so
    that ``on_scrubber_changed`` does not turn these programmatic changes
    into "seek" emissions.
    """
    self.updating_scrubber = True
    try:
        if progress is None or duration is None:
            self.scrubber.set_value(0)
            self.scrubber.set_upper(0)
            self.scrubber_cache = 0
            return

        total_seconds = duration.total_seconds()

        # Widen the range BEFORE setting the value: Gtk.Adjustment.set_value
        # clamps to the current [lower, upper] range, so with a stale upper
        # bound (0 after a reset, or a shorter previous song) the new
        # position would be clamped and lost.
        self.scrubber.set_upper(total_seconds)
        self.scrubber.set_value(progress.total_seconds())

        cached_seconds = (
            total_seconds if cache_progess is None else cache_progess.total_seconds()
        )
        # scrubber_cache is declared as an int property: store whole seconds.
        self.scrubber_cache = int(cached_seconds)
    finally:
        # Always release the guard, even if a setter raised, so user seeks
        # are not silently ignored afterwards.
        self.updating_scrubber = False
@util.async_callback(
    partial(AdapterManager.get_cover_art_uri, scheme="file"),
    before_download=lambda self: self.set_cover_art(None, True),
    on_failure=lambda self, e: self.set_cover_art(None, False),
)
def update_cover_art(
    self,
    cover_art_filename: str,
    app_config: AppConfiguration,
    force: bool = False,
    order_token: int = None,
    is_partial: bool = False,
):
    """Show *cover_art_filename* on the controls if this request is current.

    The result is applied only when ``order_token`` equals the manager's
    ``cover_art_update_order_token``; anything else is treated as stale and
    dropped.

    NOTE(review): presumably ``util.async_callback`` calls this with the
    resolved cover-art file once the lookup completes -- confirm against the
    decorator's implementation.
    """
    is_current_request = order_token == self.cover_art_update_order_token
    if is_current_request:
        self.set_cover_art(cover_art_filename, False)
def set_cover_art(self, cover_art_filename: Optional[str], loading: bool):
    """Pass the cover-art file and loading flag to every registered control."""
    registered_controls = self._controls
    for widget in registered_controls:
        widget.set_cover_art(cover_art_filename, loading)
def on_play_queue_model_row_move(self, *args):
    """Handle "row-inserted" / "row-deleted" from the play-queue store.

    Currently a no-op: the unconditional ``return`` below disables the
    handler, so nothing after it ever runs.
    """
    # TODO
    return

    # NOTE(review): everything below is unreachable. It also reads
    # ``self.editing_play_queue_song_list``,
    # ``self.reordering_play_queue_song_list`` and ``self.state``, none of
    # which are assigned in __init__ or declared on the class in the visible
    # code -- these need to exist before the early return is removed.

    # If we are programmatically editing the song list, don't do anything.
    if self.editing_play_queue_song_list:
        return
    # We get both a delete and insert event, I think it's deterministic
    # which one comes first, but just in case, we have this
    # reordering_play_queue_song_list flag.
    if self.reordering_play_queue_song_list:
        # Index of the row whose "playing" column is set.
        currently_playing_index = [
            i for i, s in enumerate(self.play_queue_store) if s[3]  # playing
        ][0]
        self.state.emit(
            "refresh-window",
            {
                "current_song_index": currently_playing_index,
                # Last column of each row is the song ID.
                "play_queue": tuple(s[-1] for s in self.play_queue_store),
            },
            False,
        )
        self.reordering_play_queue_song_list = False
    else:
        # First of the two events: remember it and wait for the second.
        self.reordering_play_queue_song_list = True
def update_player_queue(self, app_config: AppConfiguration):
    """Rebuild the play-queue store rows from ``app_config.state.play_queue``.

    A row is built for every song ID in the queue. When the song details or
    the cover art are not available yet, the row gets placeholder values and
    a done-callback is registered that fills the row in later from the GLib
    main loop. Every callback carries the order token that was current when
    it was registered and does nothing once the token has moved on, so a
    late result cannot be written into a row that now belongs to another
    song.
    """
    play_queue = app_config.state.play_queue

    def calculate_label(song_details: Song) -> str:
        """Return the two-line markup label (bold title, then album/artist)."""
        title = util.esc(song_details.title)
        # TODO (#71): use walrus once MYPY works with this
        album = util.esc(song_details.album.name if song_details.album else None)
        artist = util.esc(song_details.artist.name if song_details.artist else None)
        return f"<b>{title}</b>\n{util.dot_join(album, artist)}"

    def make_idle_index_capturing_function(
        idx: int,
        order_tok: int,
        fn: Callable[[int, int, Any], None],
    ) -> Callable[[Result], None]:
        """Bind the row index and token now; run *fn* via GLib.idle_add."""
        return lambda f: GLib.idle_add(fn, idx, order_tok, f.result())

    def on_cover_art_future_done(
        idx: int,
        order_token: int,
        cover_art_filename: str,
    ):
        if order_token != self.play_queue_update_order_token:
            return

        self.play_queue_store[idx][1] = cover_art_filename

    def get_cover_art_filename_or_create_future(
        cover_art_id: Optional[str], idx: int, order_token: int
    ) -> Optional[str]:
        """Return the cover-art file if available, else schedule a row update."""
        cover_art_result = AdapterManager.get_cover_art_uri(cover_art_id, "file")
        if not cover_art_result.data_is_available:
            cover_art_result.add_done_callback(
                make_idle_index_capturing_function(
                    idx, order_token, on_cover_art_future_done
                )
            )
            return None

        # The cover art is already cached.
        return cover_art_result.result()

    def on_song_details_future_done(idx: int, order_token: int, song_details: Song):
        if order_token != self.play_queue_update_order_token:
            return

        self.play_queue_store[idx][2] = calculate_label(song_details)

        # Cover Art
        filename = get_cover_art_filename_or_create_future(
            song_details.cover_art, idx, order_token
        )
        if filename:
            self.play_queue_store[idx][1] = filename

    # The last column of each row is the song ID.
    current_play_queue = [row[-1] for row in self.play_queue_store]
    # Compare as lists: a list never equals a tuple in Python, so if the
    # state stores the queue as a tuple a direct comparison would always
    # report a change, bumping the token on every update and invalidating
    # callbacks that are still valid.
    if list(play_queue) != current_play_queue:
        self.play_queue_update_order_token += 1
    current_token = self.play_queue_update_order_token

    cached_states = (SongCacheStatus.CACHED, SongCacheStatus.PERMANENTLY_CACHED)
    new_store = []
    song_details_results = []

    for i, (song_id, cached_status) in enumerate(
        zip(play_queue, AdapterManager.get_cached_statuses(play_queue))
    ):
        song_details_result = AdapterManager.get_song_details(song_id)

        # Placeholders used until details / cover art arrive.
        cover_art_filename = ""
        label = "\n"

        if song_details_result.data_is_available:
            # We have the details of the song already cached.
            song_details = song_details_result.result()
            label = calculate_label(song_details)

            filename = get_cover_art_filename_or_create_future(
                song_details.cover_art, i, current_token
            )
            if filename:
                cover_art_filename = filename
        else:
            song_details_results.append((i, song_details_result))

        new_store.append(
            [
                # playable: always when online, otherwise only if cached
                (not self.offline_mode or cached_status in cached_states),
                cover_art_filename,
                label,
                i == app_config.state.current_song_index,
                song_id,
            ]
        )

    util.diff_song_store(self.play_queue_store, new_store)

    # Do this after the diff to avoid race conditions.
    for idx, song_details_result in song_details_results:
        song_details_result.add_done_callback(
            make_idle_index_capturing_function(
                idx,
                current_token,
                on_song_details_future_done,
            )
        )
def popup_devices(self, relative_to):
    """Attach the device popover to *relative_to* and show it."""
    popover = self.device_popover
    popover.set_relative_to(relative_to)
    popover.popup()
    popover.show_all()
# Snapshot of what the device list currently shows; update_device_list uses
# it to skip rebuilding the widgets when nothing changed. The dict is only
# ever replaced (never mutated in place), so the shared default is safe.
_current_player_id = None
_current_available_players: Dict[type, Set[Tuple[str, str]]] = {}

def update_device_list(self, app_config: AppConfiguration):
    """Rebuild the device popover's list when players or the device changed.

    Does nothing if both the available players and the current device match
    the snapshot from the previous rebuild. Otherwise every child of
    ``device_list`` is removed and one section per non-empty player type is
    added: a title label followed by one button per player, sorted by
    player name. The button of the current device gets a volume icon.
    Clicking a button emits "device-update" with that player's ID.
    """
    state = app_config.state
    if (
        self._current_available_players == state.available_players
        and self._current_player_id == state.current_device
    ):
        return

    self._current_player_id = state.current_device
    # Deep copy so that later in-place changes to the state's collection are
    # still detected by the comparison above.
    self._current_available_players = copy.deepcopy(state.available_players)

    for child in self.device_list.get_children():
        self.device_list.remove(child)

    # Track whether a section was actually rendered: deciding by enumeration
    # index would put a separator at the very top whenever the first player
    # type has no players.
    section_added = False
    for player_type, players in state.available_players.items():
        if not players:
            continue

        if section_added:
            self.device_list.add(
                Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL)
            )
        section_added = True

        self.device_list.add(
            Gtk.Label(
                label=f"{player_type.name} Devices",
                halign=Gtk.Align.START,
                name="device-type-section-title",
            )
        )

        for player_id, player_name in sorted(players, key=lambda p: p[1]):
            icon = (
                "audio-volume-high-symbolic"
                if player_id == state.current_device
                else None
            )
            button = IconButton(icon, label=player_name)
            button.get_style_context().add_class("menu-button")
            # "device-update" is declared on this object, so emit it here;
            # player_id is passed as connect() user data so each button
            # keeps its own ID.
            button.connect(
                "clicked",
                lambda _, player_id: self.emit("device-update", player_id),
                player_id,
            )
            self.device_list.add(button)

    self.device_list.show_all()
def on_scrubber_changed(self, _: Any):
    """Emit "seek" with the scrubber value, unless ``updating_scrubber`` is set."""
    if not self.updating_scrubber:
        self.emit("seek", self.scrubber.get_value())
def update_progress_label(self, _: Any):
    """Set ``progress_label`` from the scrubber value ("-:--" if upper is 0)."""
    adjustment = self.scrubber
    if adjustment.get_upper() != 0:
        elapsed_seconds = int(adjustment.get_value())
        self.progress_label = util.format_song_duration(elapsed_seconds)
    else:
        self.progress_label = "-:--"
def update_duration_label(self, _: Any):
    """Set ``duration_label`` from the scrubber's upper bound ("-:--" if 0)."""
    total_seconds = self.scrubber.get_upper()
    if total_seconds != 0:
        self.duration_label = util.format_song_duration(int(total_seconds))
    else:
        self.duration_label = "-:--"