Files
sublime-music/sublime/adapters/manager.py
2020-05-12 13:36:37 -06:00

1087 lines
39 KiB
Python

import logging
import tempfile
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from time import sleep
from typing import (
Any,
Callable,
cast,
Generic,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
)
import requests
from sublime import util
from sublime.config import AppConfiguration
from .adapter_base import Adapter, CacheMissError, CachingAdapter, SongCacheStatus
from .api_objects import (
Album,
Artist,
Genre,
Playlist,
PlaylistDetails,
PlayQueue,
SearchResult,
Song,
)
from .filesystem import FilesystemAdapter
from .subsonic import SubsonicAdapter
T = TypeVar("T")


class Result(Generic[T]):
    """
    A result from an :class:`AdapterManager` function. This is effectively a wrapper
    around a :class:`concurrent.futures.Future`, but it resolves immediately if the
    data already exists.
    """

    _data: Optional[T] = None
    _future: Optional[Future] = None
    _default_value: Optional[T] = None
    _on_cancel: Optional[Callable[[], None]] = None

    def __init__(
        self,
        data_resolver: Union[T, Callable[[], T]],
        *args,
        is_download: bool = False,
        default_value: Optional[T] = None,
        on_cancel: Callable[[], None] = None,
    ):
        """
        Creates a :class:`Result` object.

        :param data_resolver: the actual data, or a function that will return the
            actual data. If the latter, the function will be executed by the thread
            pool.
        :param is_download: whether or not this result requires a file download. If it
            does, then it uses a separate executor.
        :param default_value: value to fall back to if the resolver raises.
        :param on_cancel: callable invoked when :meth:`cancel` is called.
        """
        if callable(data_resolver):
            # Downloads get their own executor so that long file transfers do
            # not starve ordinary metadata requests.
            executor = (
                AdapterManager.download_executor
                if is_download
                else AdapterManager.executor
            )
            self._future = executor.submit(data_resolver, *args)
            self._future.add_done_callback(self._on_future_complete)
        else:
            self._data = data_resolver

        self._default_value = default_value
        self._on_cancel = on_cancel

    def _on_future_complete(self, future: Future):
        try:
            self._data = future.result()
        except Exception as e:
            # Fix: compare against None instead of truthiness so that falsy
            # defaults such as "" or 0 are still honored.
            if self._default_value is not None:
                self._data = self._default_value
            else:
                raise e

    def result(self) -> T:
        """
        Retrieve the actual data. If the data exists already, then return it, otherwise,
        blocking-wait on the future's result.
        """
        try:
            if self._data is not None:
                return self._data
            if self._future is not None:
                return self._future.result()
            # Fix: raise instead of `assert 0`; asserts are stripped under -O
            # which would have made this silently return None.
            raise RuntimeError(
                "AdapterManager.Result had neither _data nor _future member!"
            )
        except Exception as e:
            # Remember the default so later calls succeed, but still surface
            # this failure to the caller.
            if self._default_value is not None:
                self._data = self._default_value
            raise e

    def add_done_callback(self, fn: Callable, *args):
        """Attaches the callable ``fn`` to the future."""
        if self._future is not None:
            self._future.add_done_callback(fn, *args)
        else:
            # Run the function immediately if it's not a future.
            fn(self, *args)

    def cancel(self) -> bool:
        """Cancel the future, or do nothing if the data already exists."""
        if self._on_cancel:
            self._on_cancel()
        if self._future is not None:
            return self._future.cancel()
        return True

    @property
    def data_is_available(self) -> bool:
        """
        Whether or not the data is available at the current moment. This can be used to
        determine whether or not the UI needs to put the callback into a
        :class:`GLib.idle_add` call.
        """
        return self._data is not None
class AdapterManager:
    # All Adapter subclasses the manager knows how to instantiate.
    available_adapters: Set[Any] = {FilesystemAdapter, SubsonicAdapter}
    # Hashes of resources currently being downloaded; guarded by download_set_lock.
    current_download_hashes: Set[str] = set()
    download_set_lock = threading.Lock()
    # General-purpose pool plus a dedicated pool for file downloads so that
    # long transfers don't starve ordinary requests.
    executor: ThreadPoolExecutor = ThreadPoolExecutor()
    download_executor: ThreadPoolExecutor = ThreadPoolExecutor()
    is_shutting_down: bool = False

    @dataclass
    class _AdapterManagerInternal:
        # Holds the per-server adapter pair and download bookkeeping; recreated
        # by AdapterManager.reset() whenever the configuration changes.
        ground_truth_adapter: Adapter
        caching_adapter: Optional[CachingAdapter] = None
        concurrent_download_limit: int = 5

        def __post_init__(self):
            # Temporary scratch directory for in-flight downloads; cleaned up
            # on shutdown.
            self._download_dir = tempfile.TemporaryDirectory()
            self.download_path = Path(self._download_dir.name)
            # Caps how many songs may download simultaneously.
            self.download_limiter_semaphore = threading.Semaphore(
                self.concurrent_download_limit
            )

        def shutdown(self):
            # Shut down both adapters before deleting the scratch directory.
            self.ground_truth_adapter.shutdown()
            if self.caching_adapter:
                self.caching_adapter.shutdown()
            self._download_dir.cleanup()

    # The singleton state; None until reset() is first called.
    _instance: Optional[_AdapterManagerInternal] = None
@staticmethod
def register_adapter(adapter_class: Type):
    """Add ``adapter_class`` to the set of adapters the manager can use."""
    if issubclass(adapter_class, Adapter):
        AdapterManager.available_adapters.add(adapter_class)
    else:
        raise TypeError("Attempting to register a class that is not an adapter.")
def __init__(self):
    """
    Disallowed: this class is a namespace of static methods and must never be
    instantiated.
    """
    message = (
        "Do not instantiate the AdapterManager. "
        "Only use the static methods on the class."
    )
    raise Exception(message)
@staticmethod
def shutdown():
    """Stop both thread pools and shut down whichever adapters are active."""
    logging.info("AdapterManager shutdown start")
    AdapterManager.is_shutting_down = True
    AdapterManager.executor.shutdown()
    AdapterManager.download_executor.shutdown()
    instance = AdapterManager._instance
    if instance:
        instance.shutdown()

    logging.info("AdapterManager shutdown complete")
@staticmethod
def reset(config: AppConfiguration):
    """
    Tear down the current adapter pair (if any) and build a fresh one from
    ``config``. Must be called before any data-retrieval method is used.
    """
    # First, shutdown the current one...
    if AdapterManager._instance:
        AdapterManager._instance.shutdown()

    # TODO: actually do stuff with the config to determine which adapters
    # to create, etc.
    assert config.server is not None
    # Per-server data root, keyed by a hash of the server config.
    source_data_dir = Path(config.cache_location, config.server.strhash())
    # "g" holds the ground-truth adapter's data, "c" the caching adapter's.
    source_data_dir.joinpath("g").mkdir(parents=True, exist_ok=True)
    source_data_dir.joinpath("c").mkdir(parents=True, exist_ok=True)

    ground_truth_adapter_type = SubsonicAdapter
    ground_truth_adapter = ground_truth_adapter_type(
        {
            # Each adapter declares which config keys it needs.
            key: getattr(config.server, key)
            for key in ground_truth_adapter_type.get_config_parameters()
        },
        source_data_dir.joinpath("g"),
    )

    caching_adapter_type = FilesystemAdapter
    caching_adapter = None
    # Only create a cache when the ground-truth adapter's data is cacheable.
    if caching_adapter_type and ground_truth_adapter_type.can_be_cached:
        caching_adapter = caching_adapter_type(
            {
                key: getattr(config.server, key)
                for key in caching_adapter_type.get_config_parameters()
            },
            source_data_dir.joinpath("c"),
            is_cache=True,
        )

    AdapterManager._instance = AdapterManager._AdapterManagerInternal(
        ground_truth_adapter,
        caching_adapter=caching_adapter,
        concurrent_download_limit=config.concurrent_download_limit,
    )
# Data Helper Methods
# ==================================================================================
TAdapter = TypeVar("TAdapter", bound=Adapter)

@staticmethod
def _adapter_can_do(adapter: Optional[TAdapter], action_name: str) -> bool:
    """
    Whether ``adapter`` exists, can currently serve requests, and exposes a
    truthy ``can_<action_name>`` attribute.
    """
    if adapter is None:
        return False
    if not adapter.can_service_requests:
        return False
    return getattr(adapter, f"can_{action_name}", False)
@staticmethod
def _cache_can_do(action_name: str) -> bool:
    """Whether the caching adapter can currently perform ``action_name``."""
    instance = AdapterManager._instance
    if instance is None:
        return False
    return AdapterManager._adapter_can_do(instance.caching_adapter, action_name)
@staticmethod
def _ground_truth_can_do(action_name: str) -> bool:
    """Whether the ground truth adapter can currently perform ``action_name``."""
    instance = AdapterManager._instance
    if instance is None:
        return False
    return AdapterManager._adapter_can_do(instance.ground_truth_adapter, action_name)
@staticmethod
def _can_use_cache(force: bool, action_name: str) -> bool:
    """The cache may be used unless ``force`` demands fresh data."""
    return (not force) and AdapterManager._cache_can_do(action_name)
@staticmethod
def _any_adapter_can_do(action_name: str) -> bool:
    """True when either the ground truth or the caching adapter supports it."""
    if AdapterManager._instance is None:
        return False
    return any(
        can(action_name)
        for can in (
            AdapterManager._ground_truth_can_do,
            AdapterManager._cache_can_do,
        )
    )
@staticmethod
def _create_ground_truth_result(
    function_name: str, *args, before_download: Callable[[], None] = None, **kwargs
) -> Result:
    """
    Creates a Result using the given ``function_name`` on the ground truth adapter.
    """

    def resolve() -> Any:
        assert AdapterManager._instance
        if before_download:
            before_download()
        adapter = AdapterManager._instance.ground_truth_adapter
        return getattr(adapter, function_name)(*args, **kwargs)

    return Result(resolve)
@staticmethod
def _create_download_fn(uri: str, params_hash: str) -> Callable[[], str]:
    """
    Create a function to download the given URI to a temporary file, and return the
    filename. The returned function will spin-loop if the resource is already being
    downloaded to prevent multiple requests for the same download.
    """

    def download_fn() -> str:
        assert AdapterManager._instance
        download_tmp_filename = AdapterManager._instance.download_path.joinpath(
            params_hash
        )
        # Atomically check whether someone else is already downloading this
        # resource, and mark it as in-flight either way.
        resource_downloading = False
        with AdapterManager.download_set_lock:
            if params_hash in AdapterManager.current_download_hashes:
                resource_downloading = True
            AdapterManager.current_download_hashes.add(params_hash)

        # TODO figure out how to retry if the other request failed.
        if resource_downloading:
            logging.info(f"{uri} already being downloaded.")

            # The resource is already being downloaded. Busy loop until
            # it has completed. Then, just return the path to the
            # resource.
            t = 0.0
            # Give up waiting after 20 seconds; see TODO below.
            while params_hash in AdapterManager.current_download_hashes and t < 20:
                sleep(0.2)
                t += 0.2
                # TODO handle the timeout
        else:
            logging.info(f"{uri} not found. Downloading...")
            try:
                data = requests.get(uri)

                # TODO (#122): make better
                # `not data` uses Response truthiness, i.e. a non-2xx/3xx
                # status is treated as failure.
                if not data:
                    raise Exception("Download failed!")
                # Subsonic servers report errors as JSON bodies even for
                # binary endpoints; treat that as a failed download.
                if "json" in data.headers.get("Content-Type", ""):
                    raise Exception("Didn't expect JSON!")

                with open(download_tmp_filename, "wb+") as f:
                    f.write(data.content)
            finally:
                # Always release the download set lock, even if there's an error.
                with AdapterManager.download_set_lock:
                    AdapterManager.current_download_hashes.discard(params_hash)

        logging.info(f"{uri} downloaded. Returning.")
        return str(download_tmp_filename)

    return download_fn
@staticmethod
def _create_caching_done_callback(
    cache_key: CachingAdapter.CachedDataKey, params: Tuple[Any, ...]
) -> Callable[[Result], None]:
    """
    Create a function to let the caching_adapter ingest new data.

    :param cache_key: the cache key to ingest.
    :param params: the parameters to uniquely identify the cached item.
    """

    def ingest(result: Result):
        assert AdapterManager._instance
        caching_adapter = AdapterManager._instance.caching_adapter
        assert caching_adapter
        caching_adapter.ingest_new_data(cache_key, params, result.result())

    return ingest
@staticmethod
def _get_scheme() -> str:
    """
    Return the most-preferred URI scheme supported by the ground truth adapter
    (``https`` before ``http``; anything else sorts last).
    """
    # TODO eventually this will come from the players
    assert AdapterManager._instance
    scheme_priority = ("https", "http")

    def priority(scheme: str) -> int:
        # Fix: the original used scheme_priority.index directly as the sort
        # key, which raised ValueError for any scheme outside the priority
        # list (e.g. "file"). Unknown schemes now simply sort last.
        if scheme in scheme_priority:
            return scheme_priority.index(scheme)
        return len(scheme_priority)

    schemes = sorted(
        AdapterManager._instance.ground_truth_adapter.supported_schemes,
        key=priority,
    )
    if not schemes:
        raise Exception("The ground truth adapter supports no URI schemes.")
    return schemes[0]
R = TypeVar("R")

@staticmethod
def _get_from_cache_or_ground_truth(
    function_name: str,
    *args: Any,
    cache_key: CachingAdapter.CachedDataKey = None,
    before_download: Callable[[], None] = None,
    use_ground_truth_adapter: bool = False,
    allow_download: bool = True,
    on_result_finished: Callable[[Result], None] = None,
    **kwargs: Any,
) -> Result[R]:
    """
    Get data from one of the adapters.

    :param function_name: The function to call on the adapter.
    :param args: The arguments to pass to the adapter function (also used for the
        cache parameter tuple to uniquely identify the request).
    :param cache_key: The cache key to use to invalidate caches and ingest caches.
    :param before_download: Function to call before doing a network request.
    :param allow_download: Whether or not to allow a network request to retrieve the
        data.
    :param on_result_finished: A function to run after the result received from the
        ground truth adapter. (Has no effect if the result is from the caching
        adapter.)
    :param kwargs: The keyword arguments to pass to the adapter function.
    """
    assert AdapterManager._instance
    logging.info(f"START: {function_name}")

    partial_data = None
    if AdapterManager._can_use_cache(use_ground_truth_adapter, function_name):
        assert AdapterManager._instance.caching_adapter
        try:
            logging.info(f"END: TRY SERVE FROM CACHE: {function_name}")
            # The cache call is synchronous; wrap the value in an
            # already-resolved Result.
            return Result(
                getattr(AdapterManager._instance.caching_adapter, function_name)(
                    *args, **kwargs
                )
            )
        except CacheMissError as e:
            # Keep any stale partial data as a fallback for the no-download
            # path below.
            partial_data = e.partial_data
            logging.info(f"Cache Miss on {function_name}.")
        except Exception:
            logging.exception(f"Error on {function_name} retrieving from cache.")

    # A forced refresh means any cached copy of this item is now suspect.
    if (
        cache_key
        and AdapterManager._instance.caching_adapter
        and use_ground_truth_adapter
    ):
        AdapterManager._instance.caching_adapter.invalidate_data(cache_key, args)

    # TODO don't short circuit if not allow_download because it could be the
    # filesystem adapter.
    if not allow_download or not AdapterManager._ground_truth_can_do(function_name):
        logging.info(f"END: NO DOWNLOAD: {function_name}")
        if partial_data:
            # Fix: use lazy %-formatting; the original passed partial_data as
            # an extra positional argument with no placeholder in the message,
            # which made the logging module emit an internal formatting error.
            logging.debug("partial_data exists, returning %s", partial_data)
            return Result(cast(AdapterManager.R, partial_data))
        raise Exception(f"No adapters can service {function_name} at the moment.")

    result: Result[AdapterManager.R] = AdapterManager._create_ground_truth_result(
        function_name, *args, before_download=before_download, **kwargs,
    )
    if AdapterManager._instance.caching_adapter:
        if cache_key:
            result.add_done_callback(
                AdapterManager._create_caching_done_callback(cache_key, args)
            )

        if on_result_finished:
            result.add_done_callback(on_result_finished)

    logging.info(f"END: {function_name}")
    logging.debug(result)
    return result
# TODO abstract more stuff
# Usage and Availability Properties
# ==================================================================================
# Each of these reports whether the corresponding data-retrieval method can
# currently be serviced by at least one adapter.
@staticmethod
def can_get_playlists() -> bool:
    return AdapterManager._any_adapter_can_do("get_playlists")

@staticmethod
def can_get_playlist_details() -> bool:
    return AdapterManager._any_adapter_can_do("get_playlist_details")

@staticmethod
def can_create_playlist() -> bool:
    return AdapterManager._any_adapter_can_do("create_playlist")

@staticmethod
def can_update_playlist() -> bool:
    return AdapterManager._any_adapter_can_do("update_playlist")

@staticmethod
def can_delete_playlist() -> bool:
    return AdapterManager._any_adapter_can_do("delete_playlist")

@staticmethod
def can_get_song_filename_or_stream() -> bool:
    # Either a cached file URI or a stream URI satisfies this capability.
    return AdapterManager._any_adapter_can_do("get_song_uri")

@staticmethod
def can_batch_download_songs() -> bool:
    # We can only download from the ground truth adapter.
    return AdapterManager._ground_truth_can_do("get_song_uri")

@staticmethod
def can_get_genres() -> bool:
    return AdapterManager._any_adapter_can_do("get_genres")

@staticmethod
def can_scrobble_song() -> bool:
    return AdapterManager._any_adapter_can_do("scrobble_song")

@staticmethod
def can_get_artists() -> bool:
    return AdapterManager._any_adapter_can_do("get_artists")

@staticmethod
def can_get_artist() -> bool:
    return AdapterManager._any_adapter_can_do("get_artist")

@staticmethod
def can_get_play_queue() -> bool:
    # Play queues live only on the server, never in the cache.
    return AdapterManager._ground_truth_can_do("get_play_queue")

@staticmethod
def can_save_play_queue() -> bool:
    return AdapterManager._ground_truth_can_do("save_play_queue")

@staticmethod
def can_search() -> bool:
    return AdapterManager._any_adapter_can_do("search")
# Data Retrieval Methods
# ==================================================================================
@staticmethod
def get_playlists(
    before_download: Callable[[], None] = lambda: None,
    force: bool = False,  # TODO: rename to use_ground_truth_adapter?
    allow_download: bool = True,
) -> Result[Sequence[Playlist]]:
    """Retrieve all playlists, preferring the cache unless ``force`` is set."""
    return AdapterManager._get_from_cache_or_ground_truth(
        "get_playlists",
        use_ground_truth_adapter=force,
        allow_download=allow_download,
        before_download=before_download,
        cache_key=CachingAdapter.CachedDataKey.PLAYLISTS,
    )
@staticmethod
def get_playlist_details(
    playlist_id: str,
    before_download: Callable[[], None] = lambda: None,
    force: bool = False,  # TODO: rename to use_ground_truth_adapter?
    allow_download: bool = True,
) -> Result[PlaylistDetails]:
    """Retrieve the full details (including songs) of one playlist."""
    return AdapterManager._get_from_cache_or_ground_truth(
        "get_playlist_details",
        playlist_id,
        use_ground_truth_adapter=force,
        allow_download=allow_download,
        before_download=before_download,
        cache_key=CachingAdapter.CachedDataKey.PLAYLIST_DETAILS,
    )
@staticmethod
def create_playlist(
    name: str, songs: Sequence[Song] = None
) -> Result[Optional[PlaylistDetails]]:
    """Create a playlist on the server, then reflect the result in the cache."""

    def on_result_finished(f: Result[Optional[PlaylistDetails]]):
        assert AdapterManager._instance
        caching_adapter = AdapterManager._instance.caching_adapter
        assert caching_adapter

        playlist = f.result()
        if playlist:
            # The server returned the new playlist: cache its details.
            caching_adapter.ingest_new_data(
                CachingAdapter.CachedDataKey.PLAYLIST_DETAILS,
                (playlist.id,),
                playlist,
            )
        else:
            # No details came back: force a playlist-list refresh next time.
            caching_adapter.invalidate_data(
                CachingAdapter.CachedDataKey.PLAYLISTS, ()
            )

    return AdapterManager._get_from_cache_or_ground_truth(
        "create_playlist",
        name,
        songs=songs,
        on_result_finished=on_result_finished,
        use_ground_truth_adapter=True,
    )
@staticmethod
def update_playlist(
    playlist_id: str,
    name: str = None,
    comment: str = None,
    public: bool = False,
    song_ids: Sequence[str] = None,
    append_song_ids: Sequence[str] = None,
    before_download: Callable[[], None] = lambda: None,
) -> Result[PlaylistDetails]:
    """Update a playlist; always goes to the ground truth adapter."""
    return AdapterManager._get_from_cache_or_ground_truth(
        "update_playlist",
        playlist_id,
        use_ground_truth_adapter=True,
        cache_key=CachingAdapter.CachedDataKey.PLAYLIST_DETAILS,
        before_download=before_download,
        name=name,
        comment=comment,
        public=public,
        song_ids=song_ids,
        append_song_ids=append_song_ids,
    )
@staticmethod
def delete_playlist(playlist_id: str):
    """Delete the playlist on the server and purge its cached details."""
    # TODO: make non-blocking?
    instance = AdapterManager._instance
    assert instance
    instance.ground_truth_adapter.delete_playlist(playlist_id)

    if instance.caching_adapter:
        instance.caching_adapter.delete_data(
            CachingAdapter.CachedDataKey.PLAYLIST_DETAILS, (playlist_id,)
        )
# TODO allow this to take a set of schemes and unify with get_cover_art_filename
@staticmethod
def get_cover_art_uri(cover_art_id: str = None) -> str:
    """Return a URI for the given cover art, or ``""`` when unavailable."""
    assert AdapterManager._instance
    supported = AdapterManager._ground_truth_can_do("get_cover_art_uri")
    if not supported or not cover_art_id:
        return ""

    return AdapterManager._instance.ground_truth_adapter.get_cover_art_uri(
        cover_art_id, AdapterManager._get_scheme()
    )
@staticmethod
def get_cover_art_filename(
    cover_art_id: str = None,
    before_download: Callable[[], None] = lambda: None,
    force: bool = False,  # TODO: rename to use_ground_truth_adapter?
    allow_download: bool = True,
) -> Result[str]:  # TODO: convert to return bytes?
    """
    Resolve ``cover_art_id`` to a local file path, downloading the art if
    necessary; falls back to the bundled default album art.
    """
    # Start with the bundled placeholder; it is replaced by better fallbacks
    # (e.g. a stale cached copy) as they are discovered below.
    existing_cover_art_filename = str(
        Path(__file__).parent.joinpath("images/default-album-art.png")
    )
    if cover_art_id is None:
        return Result(existing_cover_art_filename)

    assert AdapterManager._instance

    # There could be partial data if the cover art exists, but for some reason was
    # marked out-of-date.
    if AdapterManager._can_use_cache(force, "get_cover_art_uri"):
        assert AdapterManager._instance.caching_adapter
        try:
            return Result(
                AdapterManager._instance.caching_adapter.get_cover_art_uri(
                    cover_art_id, "file"
                )
            )
        except CacheMissError as e:
            # Prefer the stale cached file over the default art.
            if e.partial_data is not None:
                existing_cover_art_filename = cast(str, e.partial_data)
            logging.info(f'Cache Miss on {"get_cover_art_uri"}.')
        except Exception:
            logging.exception(
                f'Error on {"get_cover_art_uri"} retrieving from cache.'
            )

    if not allow_download:
        return Result(existing_cover_art_filename)

    # A forced refresh invalidates any cached copy of this cover art.
    if AdapterManager._instance.caching_adapter and force:
        AdapterManager._instance.caching_adapter.invalidate_data(
            CachingAdapter.CachedDataKey.COVER_ART_FILE, (cover_art_id,)
        )

    if not AdapterManager._ground_truth_can_do("get_cover_art_uri"):
        return Result(existing_cover_art_filename)

    # TODO: make _get_from_cache_or_ground_truth work with downloading
    if before_download:
        before_download()
    future: Result[str] = Result(
        AdapterManager._create_download_fn(
            AdapterManager._instance.ground_truth_adapter.get_cover_art_uri(
                cover_art_id, AdapterManager._get_scheme()
            ),
            util.params_hash("cover_art", cover_art_id),
        ),
        is_download=True,
        # If the download fails, fall back to the best filename found above.
        default_value=existing_cover_art_filename,
    )

    if AdapterManager._instance.caching_adapter:
        future.add_done_callback(
            AdapterManager._create_caching_done_callback(
                CachingAdapter.CachedDataKey.COVER_ART_FILE, (cover_art_id,),
            )
        )

    return future
# TODO allow this to take a set of schemes
@staticmethod
def get_song_filename_or_stream(
    song: Song, format: str = None, force_stream: bool = False,
) -> Tuple[str, bool]:  # TODO probably don't need to return a tuple anymore
    """
    Resolve ``song`` to something playable: a cached ``file`` URI when
    possible, otherwise a streaming URI from the ground truth adapter.

    Returns a tuple of ``(uri, is_stream)``; raises when neither source can
    provide the song.
    """
    # NOTE(review): the ``format`` parameter is currently unused — confirm
    # whether it should be forwarded to the adapter.
    assert AdapterManager._instance

    cached_song_filename = None
    if AdapterManager._can_use_cache(force_stream, "get_song_uri"):
        assert AdapterManager._instance.caching_adapter
        try:
            return (
                AdapterManager._instance.caching_adapter.get_song_uri(
                    song.id, "file"
                ),
                False,
            )
        except CacheMissError as e:
            # A stale cached file can still be used as a last resort below.
            if e.partial_data is not None:
                cached_song_filename = cast(str, e.partial_data)
            logging.info(f'Cache Miss on {"get_song_filename_or_stream"}.')
        except Exception:
            logging.exception(
                f'Error on {"get_song_filename_or_stream"} retrieving from cache.'
            )

    if not AdapterManager._ground_truth_can_do("get_song_uri"):
        # No server available: fall back to the stale cached file, if any.
        if force_stream or cached_song_filename is None:
            raise Exception("Can't stream the song.")
        return (cached_song_filename, False)

    # TODO implement subsonic extension to get the hash of the song and compare
    # here. That way of the cache gets blown away, but not the song files, it will
    # not have to re-download.
    if force_stream and not AdapterManager._ground_truth_can_do("stream"):
        raise Exception("Can't stream the song.")

    return (
        AdapterManager._instance.ground_truth_adapter.get_song_uri(
            song.id, AdapterManager._get_scheme(), stream=True,
        ),
        True,
    )
@staticmethod
def batch_download_songs(
    song_ids: Sequence[str],
    before_download: Callable[[str], None],
    on_song_download_complete: Callable[[str], None],
    one_at_a_time: bool = False,
    delay: float = 0.0,
) -> Result[None]:
    """
    Download each of the ``song_ids`` (and its cover art) into the cache,
    honoring the concurrent download limit. Returns a cancellable result.

    :param before_download: called with the song ID before its download starts.
    :param on_song_download_complete: called with the song ID after its file is
        ingested into the cache.
    :param one_at_a_time: wait for each song to finish before starting the next.
    :param delay: seconds to sleep before starting the batch.
    """
    assert AdapterManager._instance

    # This only really makes sense if we have a caching_adapter.
    if not AdapterManager._instance.caching_adapter:
        return Result(None)

    # Flipped by on_cancel(); checked at every step so an in-flight batch
    # stops as soon as possible.
    cancelled = False

    def do_download_song(song_id: str):
        if AdapterManager.is_shutting_down or cancelled:
            # NOTE(review): this early return skips the semaphore release in
            # the finally block below, leaking one download permit — confirm
            # whether that is intended.
            return

        assert AdapterManager._instance
        assert AdapterManager._instance.caching_adapter

        logging.info(f"Downloading {song_id}")
        try:
            # Download the actual song file.
            try:
                # If the song file is already cached, return immediately.
                AdapterManager._instance.caching_adapter.get_song_uri(
                    song_id, "file"
                )
            except CacheMissError:
                # The song is not already cached.
                if before_download:
                    before_download(song_id)

                # Download the song.
                song_tmp_filename = AdapterManager._create_download_fn(
                    AdapterManager._instance.ground_truth_adapter.get_song_uri(
                        song_id, AdapterManager._get_scheme()
                    ),
                    util.params_hash("song", song_id),
                )()
                AdapterManager._instance.caching_adapter.ingest_new_data(
                    CachingAdapter.CachedDataKey.SONG_FILE,
                    (song_id,),
                    song_tmp_filename,
                )
                on_song_download_complete(song_id)

            # Download the corresponding cover art.
            song = AdapterManager.get_song_details(song_id).result()
            AdapterManager.get_cover_art_filename(song.cover_art).result()
        finally:
            # Release the semaphore lock. This will allow the next song in the queue
            # to be downloaded. I'm doing this in the finally block so that it
            # always runs, regardless of whether an exception is thrown or the
            # function returns.
            AdapterManager._instance.download_limiter_semaphore.release()

    def do_batch_download_songs():
        sleep(delay)
        for song_id in song_ids:
            if cancelled:
                return

            # Only allow a certain number of songs to be downloaded
            # simultaneously.
            AdapterManager._instance.download_limiter_semaphore.acquire()

            # Prevents further songs from being downloaded.
            if AdapterManager.is_shutting_down:
                break

            result = Result(do_download_song, song_id, is_download=True)
            if one_at_a_time:
                # Wait the file to download.
                result.result()

    def on_cancel():
        nonlocal cancelled
        cancelled = True

    return Result(do_batch_download_songs, is_download=True, on_cancel=on_cancel)
@staticmethod
def batch_delete_cached_songs(
    song_ids: Sequence[str], on_song_delete: Callable[[str], None]
):
    """Remove each song's cached file, notifying ``on_song_delete`` per song."""
    assert AdapterManager._instance

    # Deleting cached files only makes sense when a cache exists.
    caching_adapter = AdapterManager._instance.caching_adapter
    if not caching_adapter:
        return

    for song_id in song_ids:
        caching_adapter.delete_data(
            CachingAdapter.CachedDataKey.SONG_FILE, (song_id,)
        )
        on_song_delete(song_id)
@staticmethod
def get_song_details(
    song_id: str,
    allow_download: bool = True,
    before_download: Callable[[], None] = lambda: None,
    force: bool = False,
) -> Result[Song]:
    """Retrieve the details of a single song."""
    return AdapterManager._get_from_cache_or_ground_truth(
        "get_song_details",
        song_id,
        use_ground_truth_adapter=force,
        allow_download=allow_download,
        before_download=before_download,
        cache_key=CachingAdapter.CachedDataKey.SONG_DETAILS,
    )
@staticmethod
def get_genres(force: bool = False) -> Result[Sequence[Genre]]:
    """Retrieve the list of genres."""
    return AdapterManager._get_from_cache_or_ground_truth(
        "get_genres",
        use_ground_truth_adapter=force,
        cache_key=CachingAdapter.CachedDataKey.GENRES,
    )
@staticmethod
def scrobble_song(song: Song):
    """Tell the server that ``song`` was played (fire-and-forget)."""
    assert AdapterManager._instance
    AdapterManager._create_ground_truth_result("scrobble_song", song)
@staticmethod
def get_artists(
    force: bool = False, before_download: Callable[[], None] = lambda: None
) -> Result[Sequence[Artist]]:
    """
    Retrieve all artists, sorted with leading ignored articles (e.g. "The")
    stripped from the sort key.

    :param force: bypass the cache and use the ground truth adapter.
    :param before_download: called before any network request is made.
    """

    def do_get_artists() -> Sequence[Artist]:
        artists: Sequence[Artist] = AdapterManager._get_from_cache_or_ground_truth(
            "get_artists",
            use_ground_truth_adapter=force,
            before_download=before_download,
            cache_key=CachingAdapter.CachedDataKey.ARTISTS,
        ).result()

        # Articles are optional; sorting still works without them.
        ignored_articles: Set[str] = set()
        if AdapterManager._any_adapter_can_do("get_ignored_articles"):
            try:
                ignored_articles = AdapterManager._get_from_cache_or_ground_truth(
                    "get_ignored_articles",
                    use_ground_truth_adapter=force,
                    cache_key=CachingAdapter.CachedDataKey.IGNORED_ARTICLES,
                ).result()
            except Exception:
                logging.exception("Failed to retrieve ignored_articles")

        def strip_ignored_articles(artist: Artist) -> str:
            name_parts = artist.name.split()
            # Fix: an empty or whitespace-only artist name used to raise
            # IndexError here; treat it as having no article to strip.
            if name_parts and name_parts[0] in ignored_articles:
                name_parts = name_parts[1:]
            return " ".join(name_parts)

        return sorted(artists, key=strip_ignored_articles)

    return Result(do_get_artists)
@staticmethod
def get_artist(
    artist_id: str,
    before_download: Callable[[], None] = lambda: None,
    force: bool = False,
) -> Result[Artist]:
    """Retrieve the details of a single artist."""

    def on_result_finished(f: Result[Artist]):
        # A forced refresh may have changed the artist's albums, so their
        # cached entries must be invalidated too.
        if not force:
            return

        assert AdapterManager._instance
        caching_adapter = AdapterManager._instance.caching_adapter
        assert caching_adapter

        artist = f.result()
        if artist:
            for album in artist.albums or []:
                caching_adapter.invalidate_data(
                    CachingAdapter.CachedDataKey.ALBUM, (album.id,)
                )

    return AdapterManager._get_from_cache_or_ground_truth(
        "get_artist",
        artist_id,
        use_ground_truth_adapter=force,
        before_download=before_download,
        cache_key=CachingAdapter.CachedDataKey.ARTIST,
        on_result_finished=on_result_finished,
    )
# Albums
@staticmethod
def get_albums(
    before_download: Callable[[], None] = lambda: None, force: bool = False,
) -> Result[Sequence[Album]]:
    """Retrieve the list of albums."""
    return AdapterManager._get_from_cache_or_ground_truth(
        "get_albums",
        use_ground_truth_adapter=force,
        before_download=before_download,
        cache_key=CachingAdapter.CachedDataKey.ALBUMS,
    )
@staticmethod
def get_album(
    album_id: str,
    before_download: Callable[[], None] = lambda: None,
    force: bool = False,
) -> Result[Album]:
    """Retrieve a single album and its songs."""
    return AdapterManager._get_from_cache_or_ground_truth(
        "get_album",
        album_id,
        use_ground_truth_adapter=force,
        cache_key=CachingAdapter.CachedDataKey.ALBUM,
        before_download=before_download,
    )
@staticmethod
def get_play_queue() -> Result[Optional[PlayQueue]]:
    """Fetch the saved play queue from the server, caching its songs."""
    assert AdapterManager._instance
    future: Result[
        Optional[PlayQueue]
    ] = AdapterManager._create_ground_truth_result("get_play_queue")

    if AdapterManager._instance.caching_adapter:

        def ingest_songs(f: Result):
            assert AdapterManager._instance
            caching_adapter = AdapterManager._instance.caching_adapter
            assert caching_adapter

            play_queue = f.result()
            if play_queue:
                # Cache the details of every song in the queue.
                for song in play_queue.songs:
                    caching_adapter.ingest_new_data(
                        CachingAdapter.CachedDataKey.SONG_DETAILS, (song.id,), song
                    )

        future.add_done_callback(ingest_songs)

    return future
@staticmethod
def save_play_queue(
    song_ids: Sequence[int],
    current_song_index: int = None,
    position: timedelta = None,
):
    """Persist the play queue to the server (fire-and-forget)."""
    assert AdapterManager._instance
    AdapterManager._create_ground_truth_result(
        "save_play_queue",
        song_ids,
        position=position,
        current_song_index=current_song_index,
    )
@staticmethod
def search(
    query: str,
    search_callback: Callable[[SearchResult], None],
    before_download: Callable[[], None] = lambda: None,
) -> Result[bool]:
    """
    Run ``query`` first against the caching adapter (fast, local), then the
    ground truth adapter (slow, possibly networked), calling ``search_callback``
    with progressively better :class:`SearchResult` objects.

    Returns a cancellable ``Result[bool]`` telling whether the search ran to
    completion.
    """
    if query == "":
        search_callback(SearchResult(""))
        return Result(True)

    before_download()

    # Keep track of if the result is cancelled and if it is, then don't do anything
    # with any results.
    cancelled = False

    # This function actually does the search and calls the search_callback when each
    # of the futures completes. Returns whether or not it was cancelled.
    def do_search() -> bool:
        # Sleep for a little while before returning the local results. They are less
        # expensive to retrieve (but they still incur some overhead due to the GTK
        # UI main loop queue).
        sleep(0.3)
        if cancelled:
            logging.info(f"Cancelled query {query} before caching adapter")
            return False

        assert AdapterManager._instance

        # Caching Adapter Results
        search_result = SearchResult(query)
        if AdapterManager._can_use_cache(False, "search"):
            assert AdapterManager._instance.caching_adapter
            try:
                logging.info(
                    f"Returning caching adapter search results for '{query}'."
                )
                search_result.update(
                    AdapterManager._instance.caching_adapter.search(query)
                )
                search_callback(search_result)
            except Exception:
                logging.exception("Error on caching adapter search")

        if not AdapterManager._ground_truth_can_do("search"):
            return False

        # Wait longer to see if the user types anything else so we don't peg the
        # server with tons of requests.
        sleep(
            1 if AdapterManager._instance.ground_truth_adapter.is_networked else 0.2
        )
        if cancelled:
            logging.info(f"Cancelled query {query} before server results")
            return False

        try:
            ground_truth_search_results = AdapterManager._instance.ground_truth_adapter.search(  # noqa: E501
                query
            )
            search_result.update(ground_truth_search_results)
            search_callback(search_result)
        except Exception:
            # Fix: this message was missing its f-prefix, so the literal text
            # "{query}" was logged instead of the actual query.
            logging.exception(
                f"Failed getting search results from server for query '{query}'"
            )
        else:
            # Fix: ingest only when the server search succeeded. Previously
            # this ran even after an exception, raising NameError on the
            # unbound ground_truth_search_results.
            if AdapterManager._instance.caching_adapter:
                AdapterManager._instance.caching_adapter.ingest_new_data(
                    CachingAdapter.CachedDataKey.SEARCH_RESULTS,
                    (),
                    ground_truth_search_results,
                )
        return True

    # When the future is cancelled (this will happen if a new search is created),
    # set cancelled to True so that the search function can abort.
    def on_cancel():
        nonlocal cancelled
        cancelled = True

    return Result(do_search, on_cancel=on_cancel)
# Cache Status Methods
# ==================================================================================
@staticmethod
def get_cached_status(song: Song) -> SongCacheStatus:
    """Report whether ``song`` is cached, currently downloading, or not cached."""
    assert AdapterManager._instance
    caching_adapter = AdapterManager._instance.caching_adapter
    if not caching_adapter:
        return SongCacheStatus.NOT_CACHED

    song_hash = util.params_hash("song", song.id)
    if song_hash in AdapterManager.current_download_hashes:
        return SongCacheStatus.DOWNLOADING

    return caching_adapter.get_cached_status(song)