import copy from datetime import timedelta from typing import Any, Optional, Callable, Dict, Set, Tuple from functools import partial import bleach from gi.repository import GObject, Gtk, GLib, GdkPixbuf from .. import util from ...util import resolve_path from ...adapters import AdapterManager, Result from ...adapters.api_objects import Song from ...config import AppConfiguration from ..state import RepeatType from ..common import IconButton, IconToggleButton, SpinnerImage, SpinnerPicture class Manager(GObject.GObject): __gsignals__ = { "device-update": (GObject.SignalFlags.RUN_FIRST, GObject.TYPE_NONE, (str,)), "song-clicked": ( GObject.SignalFlags.RUN_FIRST, GObject.TYPE_NONE, (int, object, object), ), "songs-removed": (GObject.SignalFlags.RUN_FIRST, GObject.TYPE_NONE, (object,)), "refresh-window": ( GObject.SignalFlags.RUN_FIRST, GObject.TYPE_NONE, (object, bool), ), "seek": (GObject.SignalFlags.RUN_FIRST, GObject.TYPE_NONE, (float,)), } volume = GObject.Property(type=Gtk.Adjustment) scrubber = GObject.Property(type=Gtk.Adjustment) scrubber_cache = GObject.Property(type=int, default=0) play_queue_open = GObject.Property(type=bool, default=False) play_queue_store = GObject.Property(type=Gtk.ListStore) flap_reveal_progress = GObject.Property(type=float, default=False) offline_mode = GObject.Property(type=bool, default=False) has_song = GObject.Property(type=bool, default=False) play_button_icon = GObject.Property(type=str) play_button_tooltip = GObject.Property(type=str) repeat_button_icon = GObject.Property(type=str) repeat_button_active = GObject.Property(type=bool, default=False) shuffle_button_active = GObject.Property(type=bool, default=False) duration_label = GObject.Property(type=str, default="-:--") progress_label = GObject.Property(type=str, default="-:--") updating_scrubber = False cover_art_update_order_token = 0 play_queue_update_order_token = 0 def __init__(self): super().__init__() self.volume = Gtk.Adjustment.new(1, 0, 1, 0.01, 0, 0) self.scrubber = Gtk.Adjustment() self.scrubber.set_step_increment(1) self.scrubber.connect("value-changed", self.on_scrubber_changed) self.scrubber.connect("value-changed", self.update_progress_label) self.scrubber.connect("changed", self.update_duration_label) self.play_queue_store = Gtk.ListStore( bool, # playable str, # image filename str, # title, album, artist bool, # playing str, # song ID ) self.play_queue_store_art_cache = {} # Set up drag-and-drop on the song list for editing the order of the # playlist. self.play_queue_store.connect("row-inserted", self.on_play_queue_model_row_move) self.play_queue_store.connect("row-deleted", self.on_play_queue_model_row_move) self._controls = [] # Device popover self.device_popover = Gtk.PopoverMenu(modal=True, name="device-popover") device_popover_box = Gtk.Box( orientation=Gtk.Orientation.VERTICAL, name="device-popover-box", ) device_popover_header = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL) popover_label = Gtk.Label( label="Devices", use_markup=True, halign=Gtk.Align.START, margin=5, ) device_popover_header.add(popover_label) refresh_devices = IconButton("view-refresh-symbolic", "Refresh device list") refresh_devices.set_action_name("app.refresh-devices") device_popover_header.pack_end(refresh_devices, False, False, 0) device_popover_box.add(device_popover_header) device_list_and_loading = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) self.device_list = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) device_list_and_loading.add(self.device_list) device_popover_box.pack_end(device_list_and_loading, True, True, 0) self.device_popover.add(device_popover_box) def add_control(self, control): self._controls.append(control) def update(self, app_config: AppConfiguration, force: bool = False): self.has_song = app_config.state.current_song is not None self.update_song_progress( app_config.state.song_progress, app_config.state.current_song and app_config.state.current_song.duration, app_config.state.song_stream_cache_progress) if self.has_song: self.cover_art_update_order_token += 1 self.update_cover_art( app_config.state.current_song.cover_art, order_token=self.cover_art_update_order_token) for control in self._controls: control.update(app_config, force=force) if not self.has_song: self.set_cover_art(None, False) self.update_player_queue(app_config) self.update_device_list(app_config) icon = "pause" if app_config.state.playing else "start" self.play_button_icon = f"media-playback-{icon}-symbolic" self.play_button_tooltip = "Pause" if app_config.state.playing else "Play" repeat_on = app_config.state.repeat_type in ( RepeatType.REPEAT_QUEUE, RepeatType.REPEAT_SONG, ) self.repeat_button_icon = app_config.state.repeat_type.icon self.repeat_button_active = repeat_on self.shuffle_button_active = app_config.state.shuffle_on def update_song_progress(self, progress: Optional[timedelta], duration: Optional[timedelta], cache_progess: Optional[timedelta]): self.updating_scrubber = True if progress is None or duration is None: self.scrubber.set_value(0) self.scrubber.set_upper(0) self.scrubber_cache = 0 self.updating_scrubber = False return self.scrubber.set_value(progress.total_seconds()) self.scrubber.set_upper(duration.total_seconds()) if cache_progess is not None: self.scrubber_cache = cache_progess.total_seconds() else: self.scrubber_cache = duration.total_seconds() self.updating_scrubber = False @util.async_callback( partial(AdapterManager.get_cover_art_uri, scheme="file"), before_download=lambda self: self.set_cover_art(None, True), on_failure=lambda self, e: self.set_cover_art(None, False), ) def update_cover_art( self, cover_art_filename: str, app_config: AppConfiguration, force: bool = False, order_token: int = None, is_partial: bool = False, ): if order_token != self.cover_art_update_order_token: return self.set_cover_art(cover_art_filename, False) def set_cover_art(self, cover_art_filename: Optional[str], loading: bool): for control in self._controls: control.set_cover_art(cover_art_filename, loading) def get_play_queue_cover_art(self, filename: Optional[str], playing: bool): if filename in self.play_queue_store_art_cache: return self.play_queue_store_art_cache[filename] if not filename: return None pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(filename, 50, 50, True) # If this is the playing song, then overlay the play icon. if playing: play_overlay_pixbuf = GdkPixbuf.Pixbuf.new_from_file( str(resolve_path("ui/images/play-queue-play.png")) ) play_overlay_pixbuf.composite( pixbuf, 0, 0, 50, 50, 0, 0, 1, 1, GdkPixbuf.InterpType.NEAREST, 200 ) self.play_queue_store_art_cache[filename] = pixbuf return pixbuf def on_play_queue_model_row_move(self, *args): self.play_queue_store_art_cache = {} # TODO return # If we are programatically editing the song list, don't do anything. if self.editing_play_queue_song_list: return # We get both a delete and insert event, I think it's deterministic # which one comes first, but just in case, we have this # reordering_play_queue_song_list flag. if self.reordering_play_queue_song_list: currently_playing_index = [ i for i, s in enumerate(self.play_queue_store) if s[3] # playing ][0] self.state.emit( "refresh-window", { "current_song_index": currently_playing_index, "play_queue": tuple(s[-1] for s in self.play_queue_store), }, False, ) self.reordering_play_queue_song_list = False else: self.reordering_play_queue_song_list = True def update_player_queue(self, app_config: AppConfiguration): new_store = [] def calculate_label(song_details: Song) -> str: title = bleach.clean(song_details.title) # TODO (#71): use walrus once MYPY works with this # album = bleach.clean(album.name if (album := song_details.album) else None) # artist = bleach.clean(artist.name if (artist := song_details.artist) else None) # noqa album = bleach.clean(song_details.album.name if song_details.album else None) artist = bleach.clean(song_details.artist.name if song_details.artist else None) return f"{title}\n{util.dot_join(album, artist)}" def make_idle_index_capturing_function( idx: int, order_tok: int, fn: Callable[[int, int, Any], None], ) -> Callable[[Result], None]: return lambda f: GLib.idle_add(fn, idx, order_tok, f.result()) def on_cover_art_future_done( idx: int, order_token: int, cover_art_filename: str, ): if order_token != self.play_queue_update_order_token: return self.play_queue_store[idx][1] = cover_art_filename def get_cover_art_filename_or_create_future( cover_art_id: Optional[str], idx: int, order_token: int ) -> Optional[str]: cover_art_result = AdapterManager.get_cover_art_uri(cover_art_id, "file") if not cover_art_result.data_is_available: cover_art_result.add_done_callback( make_idle_index_capturing_function( idx, order_token, on_cover_art_future_done ) ) return None # The cover art is already cached. return cover_art_result.result() def on_song_details_future_done(idx: int, order_token: int, song_details: Song): if order_token != self.play_queue_update_order_token: return self.play_queue_store[idx][2] = calculate_label(song_details) # Cover Art filename = get_cover_art_filename_or_create_future( song_details.cover_art, idx, order_token ) if filename: self.play_queue_store[idx][1] = filename current_play_queue = [x[-1] for x in self.play_queue_store] if app_config.state.play_queue != current_play_queue: self.play_queue_update_order_token += 1 song_details_results = [] for i, (song_id, cached_status) in enumerate( zip( app_config.state.play_queue, AdapterManager.get_cached_statuses(app_config.state.play_queue), ) ): song_details_result = AdapterManager.get_song_details(song_id) cover_art_filename = "" label = "\n" if song_details_result.data_is_available: # We have the details of the song already cached. song_details = song_details_result.result() label = calculate_label(song_details) filename = get_cover_art_filename_or_create_future( song_details.cover_art, i, self.play_queue_update_order_token ) if filename: cover_art_filename = filename else: song_details_results.append((i, song_details_result)) new_store.append( [ ( not self.offline_mode or cached_status in (SongCacheStatus.CACHED, SongCacheStatus.PERMANENTLY_CACHED) ), cover_art_filename, label, i == app_config.state.current_song_index, song_id, ] ) util.diff_song_store(self.play_queue_store, new_store) # Do this after the diff to avoid race conditions. for idx, song_details_result in song_details_results: song_details_result.add_done_callback( make_idle_index_capturing_function( idx, self.play_queue_update_order_token, on_song_details_future_done, ) ) def popup_devices(self, relative_to): self.device_popover.set_relative_to(relative_to) self.device_popover.popup() self.device_popover.show_all() _current_player_id = None _current_available_players: Dict[type, Set[Tuple[str, str]]] = {} def update_device_list(self, app_config: AppConfiguration): if ( self._current_available_players == app_config.state.available_players and self._current_player_id == app_config.state.current_device ): return self._current_player_id = app_config.state.current_device self._current_available_players = copy.deepcopy( app_config.state.available_players ) for c in self.device_list.get_children(): self.device_list.remove(c) for i, (player_type, players) in enumerate( app_config.state.available_players.items() ): if len(players) == 0: continue if i > 0: self.device_list.add( Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL) ) self.device_list.add( Gtk.Label( label=f"{player_type.name} Devices", halign=Gtk.Align.START, name="device-type-section-title", ) ) for player_id, player_name in sorted(players, key=lambda p: p[1]): icon = ( "audio-volume-high-symbolic" if player_id == app_config.state.current_device else None ) button = IconButton(icon, label=player_name) button.get_style_context().add_class("menu-button") button.connect( "clicked", lambda _, player_id: self.state.emit("device-update", player_id), player_id, ) self.device_list.add(button) self.device_list.show_all() def on_scrubber_changed(self, _: Any): if self.updating_scrubber: return self.emit("seek", self.scrubber.get_value()) def update_progress_label(self, _: Any): if self.scrubber.get_upper() == 0: self.progress_label = "-:--" else: self.progress_label = util.format_song_duration( int(self.scrubber.get_value())) def update_duration_label(self, _: Any): upper = self.scrubber.get_upper() if upper == 0: self.duration_label = "-:--" else: self.duration_label = util.format_song_duration(int(upper))