import os import glob import threading import shutil import json import hashlib from collections import defaultdict from time import sleep from concurrent.futures import ThreadPoolExecutor, Future from enum import EnumMeta, Enum from datetime import datetime from pathlib import Path from typing import ( Any, List, Optional, Union, Callable, Set, DefaultDict, Tuple, ) import requests from libremsonic.config import AppConfiguration, ServerConfiguration from libremsonic.server import Server from libremsonic.server.api_object import APIObject from libremsonic.server.api_objects import ( Playlist, PlaylistWithSongs, Child, # Non-ID3 versions Artist, ArtistInfo, Directory, # ID3 versions ArtistID3, ArtistInfo2, ArtistWithAlbumsID3, AlbumWithSongsID3, ) class Singleton(type): def __getattr__(cls, name): if not CacheManager._instance: return None # If the cache has a function to do the thing we want, use it. If # not, then go directly to the server (this is useful for things # that just send data to the server.) if hasattr(CacheManager._instance, name): return getattr(CacheManager._instance, name) else: return getattr(CacheManager._instance.server, name) return None class SongCacheStatus(Enum): NOT_CACHED = 0 CACHED = 1 PERMANENTLY_CACHED = 2 DOWNLOADING = 3 class CacheManager(metaclass=Singleton): executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=50) should_exit: bool = False class CacheEncoder(json.JSONEncoder): def default(self, obj): if type(obj) == datetime: return int(obj.timestamp() * 1000) elif type(obj) == set: return list(obj) elif isinstance(obj, APIObject): return {k: v for k, v in obj.__dict__.items() if v is not None} elif isinstance(obj, EnumMeta): return None return json.JSONEncoder.default(self, obj) class __CacheManagerInternal: # Thread lock for preventing threads from overriding the state while # it's being saved. cache_lock = threading.Lock() cache: DefaultDict[str, Any] = defaultdict(dict) permanently_cached_paths: Set[str] = set() # The server instance. server: Server browse_by_tags: bool download_set_lock = threading.Lock() current_downloads: Set[Path] = set() # TODO make this configurable. download_limiter_semaphore = threading.Semaphore(5) def __init__( self, app_config: AppConfiguration, server_config: ServerConfiguration, ): self.app_config = app_config self.browse_by_tags = self.app_config.servers[ self.app_config.current_server].browse_by_tags self.server = Server( name=server_config.name, hostname=server_config.server_address, username=server_config.username, password=server_config.password, ) self.load_cache_info() def load_cache_info(self): cache_meta_file = self.calculate_abs_path('.cache_meta') if not cache_meta_file.exists(): return with open(cache_meta_file, 'r') as f: try: meta_json = json.load(f) except json.decoder.JSONDecodeError: return cache_configs = [ # Playlists ('playlists', Playlist, list), ('playlist_details', PlaylistWithSongs, dict), ('song_details', Child, dict), # Non-ID3 caches ('albums', Child, list), ('album_details', Child, dict), ('artists', Artist, list), ('artist_details', Directory, dict), ('artist_infos', ArtistInfo, dict), # ID3 caches ('albums_id3', AlbumWithSongsID3, list), ('album_details_id3', AlbumWithSongsID3, dict), ('artists_id3', ArtistID3, list), ('artist_details_id3', ArtistWithAlbumsID3, dict), ('artist_infos_id3', ArtistInfo2, dict), ] for name, type_name, default in cache_configs: if default == list: self.cache[name] = [ type_name.from_json(x) for x in meta_json.get(name, []) ] elif default == dict: self.cache[name] = { id: type_name.from_json(x) for id, x in meta_json.get(name, {}).items() } def save_cache_info(self): os.makedirs(self.app_config.cache_location, exist_ok=True) cache_meta_file = self.calculate_abs_path('.cache_meta') with open(cache_meta_file, 'w+') as f, self.cache_lock: f.write( json.dumps(self.cache, indent=2, cls=CacheManager.CacheEncoder)) def save_file(self, absolute_path: Path, data: bytes): # Make the necessary directories and write to file. os.makedirs(absolute_path.parent, exist_ok=True) with open(absolute_path, 'wb+') as f: f.write(data) def calculate_abs_path(self, *relative_paths): return Path( self.app_config.cache_location).joinpath(*relative_paths) def calculate_download_path(self, *relative_paths): """ Determine where to temporarily put the file as it is downloading. """ xdg_cache_home = (os.environ.get('XDG_CACHE_HOME') or os.path.expanduser('~/.cache')) return Path(xdg_cache_home).joinpath('libremsonic', *relative_paths) def return_cached_or_download( self, relative_path: Union[Path, str], download_fn: Callable[[], bytes], before_download: Callable[[], None] = lambda: None, force: bool = False, ): abs_path = self.calculate_abs_path(relative_path) download_path = self.calculate_download_path(relative_path) if not abs_path.exists() or force: resource_downloading = False with self.download_set_lock: if abs_path in self.current_downloads: resource_downloading = True self.current_downloads.add(abs_path) if resource_downloading: print(abs_path, 'already being downloaded.') # The resource is already being downloaded. Busy loop until # it has completed. Then, just return the path to the # resource. while abs_path in self.current_downloads: sleep(0.5) return str(abs_path) else: print(abs_path, 'not found. Downloading...') os.makedirs(download_path.parent, exist_ok=True) before_download() self.save_file(download_path, download_fn()) # Move the file to its cache download location. os.makedirs(abs_path.parent, exist_ok=True) if download_path.exists(): shutil.move(download_path, abs_path) with self.download_set_lock: self.current_downloads.discard(abs_path) print(abs_path, 'downloaded. Returning.') return str(abs_path) def delete_cached(self, relative_path: Union[Path, str]): """ :param relative_path: The path to the cached element to delete. Note that this can be a globed path. """ abs_path = self.calculate_abs_path(relative_path) for path in glob.glob(str(abs_path)): Path(path).unlink() def get_playlists( self, before_download: Callable[[], None] = lambda: None, force: bool = False, ) -> Future: def do_get_playlists() -> List[Playlist]: if not self.cache.get('playlists') or force: before_download() playlists = self.server.get_playlists().playlist with self.cache_lock: self.cache['playlists'] = playlists self.save_cache_info() return self.cache['playlists'] return CacheManager.executor.submit(do_get_playlists) def get_playlist( self, playlist_id: int, before_download: Callable[[], None] = lambda: None, force: bool = False, ) -> Future: def do_get_playlist() -> PlaylistWithSongs: cache_name = "playlist_details" if playlist_id not in self.cache.get(cache_name) or force: before_download() playlist = self.server.get_playlist(playlist_id) with self.cache_lock: self.cache['playlist_details'][playlist_id] = playlist # Playlists have the song details, so save those too. for song in (playlist.entry or []): self.cache['song_details'][song.id] = song self.save_cache_info() playlist_details = self.cache['playlist_details'][playlist_id] # Invalidate the cached photo if we are forcing a retrieval # from the server. if force: self.delete_cached( f'cover_art/{playlist_details.coverArt}_*') return playlist_details return CacheManager.executor.submit(do_get_playlist) def get_artists( self, before_download: Callable[[], None] = lambda: None, force: bool = False, ) -> Future: def do_get_artists() -> List[Union[Artist, ArtistID3]]: cache_name = 'artists' + ('_id3' if self.browse_by_tags else '') server_fn = (self.server.get_artists if self.browse_by_tags else self.server.get_indexes) if not self.cache.get(cache_name) or force: before_download() raw_artists = server_fn() artists: List[Union[Artist, ArtistID3]] = [] for index in raw_artists.index: artists.extend(index.artist) with self.cache_lock: self.cache[cache_name] = artists self.save_cache_info() return self.cache[cache_name] return CacheManager.executor.submit(do_get_artists) def get_artist( self, artist_id, before_download: Callable[[], None] = lambda: None, force: bool = False, ) -> Future: def do_get_artist() -> Union[ArtistWithAlbumsID3, Child]: # TODO: implement the non-ID3 version cache_name = f"artist_details{'_id3' if self.browse_by_tags else ''}" server_fn = (self.server.get_artist if self.browse_by_tags else self.server.get_music_directory) if artist_id not in self.cache.get(cache_name, {}) or force: before_download() artist = server_fn(artist_id) with self.cache_lock: self.cache[cache_name][artist_id] = artist self.save_cache_info() return self.cache[cache_name][artist_id] return CacheManager.executor.submit(do_get_artist) def get_artist_info( self, artist_id, before_download: Callable[[], None] = lambda: None, force: bool = False, ) -> Future: def do_get_artist_info() -> Union[ArtistInfo, ArtistInfo2]: cache_name = f"artist_infos{'_id3' if self.browse_by_tags else ''}" server_fn = (self.server.get_artist_info2 if self.browse_by_tags else self.server.get_artist_info) if artist_id not in self.cache.get(cache_name, {}) or force: before_download() artist_info = server_fn(id=artist_id) if artist_info: with self.cache_lock: self.cache[cache_name][artist_id] = artist_info self.save_cache_info() return self.cache[cache_name][artist_id] return CacheManager.executor.submit(do_get_artist_info) def get_artist_artwork( self, artist: Union[Artist, ArtistID3], before_download: Callable[[], None] = lambda: None, force: bool = False, ) -> Future: def do_get_artist_artwork_filename() -> str: artist_info = CacheManager.get_artist_info(artist.id).result() lastfm_url = ''.join(artist_info.largeImageUrl) # If it is the placeholder LastFM URL, then just use the cover # art filename given by the server. if lastfm_url == 'https://lastfm-img2.akamaized.net/i/u/300x300/2a96cbd8b46e442fc41c2b86b821562f.png': if isinstance(artist, ArtistWithAlbumsID3): return CacheManager.get_cover_art_filename( artist.coverArt, size=300).result() elif (isinstance(artist, Directory) and len(artist.child) > 0): # Retrieve the first album's cover art return CacheManager.get_cover_art_filename( artist.child[0].coverArt, size=300).result() url_hash = hashlib.md5(lastfm_url.encode('utf-8')).hexdigest() return self.return_cached_or_download( f'cover_art/artist.{url_hash}', lambda: requests.get(lastfm_url).content, before_download=before_download, force=force, ) return CacheManager.executor.submit(do_get_artist_artwork_filename) def get_albums( self, type_: str, before_download: Callable[[], None] = lambda: None, force: bool = False, ) -> Future: def do_get_albums() -> List[Child]: cache_name = f"albums{'_id3' if self.browse_by_tags else ''}" server_fn = (self.server.get_album_list2 if self.browse_by_tags else self.server.get_album_list) if not self.cache.get(cache_name) or force: before_download() albums = server_fn(type_) with self.cache_lock: self.cache[cache_name] = albums.album self.save_cache_info() return self.cache[cache_name] return CacheManager.executor.submit(do_get_albums) def batch_delete_cached_songs( self, song_ids: List[int], on_song_delete: Callable[[], None], ) -> Future: def do_delete_cached_songs(): # Do the actual download. for song_id in song_ids: song_details_future = CacheManager.get_song_details( song_id) def filename_future_done(f): relative_path = f.result().path abs_path = self.calculate_abs_path(relative_path) if abs_path.exists(): abs_path.unlink() on_song_delete() song_details_future.add_done_callback(filename_future_done) return CacheManager.executor.submit(do_delete_cached_songs) def batch_download_songs( self, song_ids: List[int], before_download: Callable[[], None], on_song_download_complete: Callable[[int], None], ) -> Future: # TODO handle application close somehow. I think we will need to # raise some sort of an exception, not sure. def do_download_song(song_id): if CacheManager.should_exit: return # Do the actual download. song_details_future = CacheManager.get_song_details(song_id) song = song_details_future.result() self.return_cached_or_download( song.path, lambda: self.server.download(song.id), before_download=before_download, ) self.download_limiter_semaphore.release() on_song_download_complete(song_id) def do_batch_download_songs(): self.current_downloads = self.current_downloads.union( set(song_ids)) for song_id in song_ids: self.download_limiter_semaphore.acquire() CacheManager.executor.submit(do_download_song, song_id) return CacheManager.executor.submit(do_batch_download_songs) def get_cover_art_filename( self, id: str, before_download: Callable[[], None] = lambda: None, size: Union[str, int] = 200, force: bool = False, ) -> Future: def do_get_cover_art_filename() -> str: return self.return_cached_or_download( f'cover_art/{id}_{size}', lambda: self.server.get_cover_art(id, str(size)), before_download=before_download, force=force, ) return CacheManager.executor.submit(do_get_cover_art_filename) def get_song_details( self, song_id: int, before_download: Callable[[], None] = lambda: None, force: bool = False, ) -> Future: def do_get_song_details() -> Child: if not self.cache['song_details'].get(song_id) or force: before_download() song_details = self.server.get_song(song_id) with self.cache_lock: self.cache['song_details'][song_id] = song_details self.save_cache_info() return self.cache['song_details'][song_id] return CacheManager.executor.submit(do_get_song_details) def get_play_queue(self) -> Future: return CacheManager.executor.submit(self.server.get_play_queue) def get_song_filename_or_stream( self, song: Child, format=None, force_stream: bool = False, ) -> Tuple[str, bool]: abs_path = self.calculate_abs_path(song.path) if not abs_path.exists() or force_stream: return ( self.server.get_stream_url(song.id, format=format), True, ) return (str(abs_path), False) def get_cached_status(self, song: Child) -> SongCacheStatus: cache_path = self.calculate_abs_path(song.path) if cache_path.exists(): if cache_path in self.permanently_cached_paths: return SongCacheStatus.PERMANENTLY_CACHED else: return SongCacheStatus.CACHED elif cache_path in self.current_downloads: return SongCacheStatus.DOWNLOADING else: return SongCacheStatus.NOT_CACHED _instance: Optional[__CacheManagerInternal] = None def __init__(self, server_config: ServerConfiguration): raise Exception('Do not instantiate the CacheManager.') @classmethod def reset(cls, app_config, server_config): CacheManager._instance = CacheManager.__CacheManagerInternal( app_config, server_config)