Files
sublime-music/libremsonic/cache_manager.py
2019-08-07 23:13:47 -06:00

597 lines
22 KiB
Python

import os
import glob
import threading
import shutil
import json
import hashlib
from collections import defaultdict
from time import sleep
from concurrent.futures import ThreadPoolExecutor, Future
from enum import EnumMeta, Enum
from datetime import datetime
from pathlib import Path
from typing import (
Any,
List,
Optional,
Union,
Callable,
Set,
DefaultDict,
Tuple,
)
import requests
from libremsonic.config import AppConfiguration, ServerConfiguration
from libremsonic.server import Server
from libremsonic.server.api_object import APIObject
from libremsonic.server.api_objects import (
Playlist,
PlaylistWithSongs,
Child,
# Non-ID3 versions
Artist,
ArtistInfo,
Directory,
# ID3 versions
ArtistID3,
ArtistInfo2,
ArtistWithAlbumsID3,
AlbumWithSongsID3,
)
class Singleton(type):
    """
    Metaclass that forwards unknown class-level attribute lookups to the
    currently active cache instance.

    ``__getattr__`` on a metaclass is only consulted when normal attribute
    lookup on the class fails, so attributes defined directly on the class
    are never routed through here.
    """

    def __getattr__(cls, name):
        """
        Resolve ``name`` on ``CacheManager._instance``, falling back to that
        instance's ``server``.

        :param name: the attribute that was not found on the class itself.
        :returns: ``None`` when no instance has been created yet, otherwise
            the attribute from the instance or from its server.
        :raises AttributeError: if neither the instance nor its server has
            an attribute called ``name``.
        """
        # Read the instance once so that the checks below all look at the
        # same object.
        instance = CacheManager._instance
        if not instance:
            return None

        # If the cache has a function to do the thing we want, use it. If
        # not, then go directly to the server (this is useful for things
        # that just send data to the server.)
        if hasattr(instance, name):
            return getattr(instance, name)
        return getattr(instance.server, name)
# Cache state of a single song, as reported by ``get_cached_status``.
class SongCacheStatus(Enum):
# The song file is not in the cache directory and is not being downloaded.
NOT_CACHED = 0
# The song file exists in the cache directory.
CACHED = 1
# The song file exists and its path is in ``permanently_cached_paths``.
PERMANENTLY_CACHED = 2
# The song file does not exist yet but its path is in ``current_downloads``.
DOWNLOADING = 3
# Class-level facade for the cache. It is never instantiated (its __init__
# raises); attribute lookups that fail on the class are forwarded by the
# ``Singleton`` metaclass to the instance stored in ``_instance``.
class CacheManager(metaclass=Singleton):
# Thread pool (50 workers) on which every Future-returning cache operation
# is scheduled.
executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=50)
# Checked by the batch song download worker before it starts each song; when
# true, the worker returns without downloading.
should_exit: bool = False
class CacheEncoder(json.JSONEncoder):
    """
    JSON encoder used when writing the cache metadata file.

    It adds support for the non-JSON-native values stored in the cache.
    """

    def default(self, obj):
        """
        Convert ``obj`` into something the JSON encoder can serialise.

        * ``datetime`` becomes an integer number of milliseconds since the
          epoch.
        * ``set`` / ``frozenset`` become a list.
        * ``APIObject`` becomes a dict of its non-``None`` attributes.
        * Enum *classes* (instances of ``EnumMeta``) become ``null``.

        :raises TypeError: for any other unsupported type (raised by the
            base encoder).
        """
        # isinstance (rather than an exact type comparison) so that
        # subclasses of these types are serialised as well.
        if isinstance(obj, datetime):
            return int(obj.timestamp() * 1000)
        if isinstance(obj, (set, frozenset)):
            return list(obj)
        if isinstance(obj, APIObject):
            return {k: v for k, v in obj.__dict__.items() if v is not None}
        if isinstance(obj, EnumMeta):
            return None
        return super().default(obj)
# The object that actually holds the cache state and talks to the server.
# NOTE: everything below is declared at class level, so the mutable values
# (locks, dict, sets, semaphore) are shared by every instance unless an
# instance rebinds the attribute.
class __CacheManagerInternal:
# Thread lock for preventing threads from overriding the state while
# it's being saved.
cache_lock = threading.Lock()
# In-memory cache, keyed by cache name (e.g. 'playlists', 'song_details').
# It is filled by load_cache_info and by the get_* methods, and written to
# disk by save_cache_info.
cache: DefaultDict[str, Any] = defaultdict(dict)
# Consulted by get_cached_status to tell CACHED from PERMANENTLY_CACHED.
permanently_cached_paths: Set[str] = set()
# The server instance.
server: Server
# When true, the ID3 (tag based) server endpoints and the '*_id3' cache
# names are used (see id3ify).
browse_by_tags: bool
# Guards membership changes of current_downloads in
# return_cached_or_download.
download_set_lock = threading.Lock()
# Absolute cache paths of the resources that are being downloaded right now.
current_downloads: Set[Path] = set()
# Acquired by batch_download_songs before each song download is submitted,
# which caps the number of songs in flight at 5.
# TODO make this configurable.
download_limiter_semaphore = threading.Semaphore(5)
def __init__(
        self,
        app_config: AppConfiguration,
        server_config: ServerConfiguration,
):
    """
    Create the cache for one server and load any cache metadata that was
    previously saved to disk.

    :param app_config: the application configuration; supplies the cache
        location and the ``browse_by_tags`` setting of the current server.
    :param server_config: connection details used to build the ``Server``.
    """
    # Give every instance its own mutable state. The class-level defaults
    # are single shared objects, so without this a new instance (created
    # for a different server) would start out with the previous
    # instance's cached data whenever load_cache_info returns early.
    self.cache = defaultdict(dict)
    self.permanently_cached_paths = set()
    self.current_downloads = set()

    self.app_config = app_config
    current_server = self.app_config.servers[
        self.app_config.current_server]
    self.browse_by_tags = current_server.browse_by_tags
    self.server = Server(
        name=server_config.name,
        hostname=server_config.server_address,
        username=server_config.username,
        password=server_config.password,
    )

    self.load_cache_info()
def load_cache_info(self):
    """
    Populate ``self.cache`` from the ``.cache_meta`` file in the cache
    directory.

    Does nothing if the file does not exist or does not contain valid
    JSON. Cache names that are missing from the file are set to an empty
    list or dict.
    """
    meta_path = self.calculate_abs_path('.cache_meta')
    if not meta_path.exists():
        return

    with open(meta_path, 'r') as meta_file:
        try:
            meta_json = json.load(meta_file)
        except json.decoder.JSONDecodeError:
            return

    # (cache name, API type used to rebuild each entry, container type)
    cache_layout = [
        # Playlists
        ('playlists', Playlist, list),
        ('playlist_details', PlaylistWithSongs, dict),
        ('song_details', Child, dict),

        # Non-ID3 caches
        ('albums', Child, list),
        ('album_details', Directory, dict),
        ('artists', Artist, list),
        ('artist_details', Directory, dict),
        ('artist_infos', ArtistInfo, dict),

        # ID3 caches
        ('albums_id3', AlbumWithSongsID3, list),
        ('album_details_id3', AlbumWithSongsID3, dict),
        ('artists_id3', ArtistID3, list),
        ('artist_details_id3', ArtistWithAlbumsID3, dict),
        ('artist_infos_id3', ArtistInfo2, dict),
    ]

    for cache_key, api_type, container in cache_layout:
        if container == list:
            raw_items = meta_json.get(cache_key, [])
            self.cache[cache_key] = [
                api_type.from_json(raw) for raw in raw_items
            ]
        elif container == dict:
            raw_mapping = meta_json.get(cache_key, {})
            self.cache[cache_key] = {
                key: api_type.from_json(raw)
                for key, raw in raw_mapping.items()
            }
def save_cache_info(self):
    """
    Write ``self.cache`` as JSON to the ``.cache_meta`` file in the cache
    directory, creating the directory if necessary.

    ``cache_lock`` is held for the whole operation. It is not reentrant,
    so callers must not hold it when calling this method.
    """
    os.makedirs(self.app_config.cache_location, exist_ok=True)

    cache_meta_file = self.calculate_abs_path('.cache_meta')
    # Take the lock *before* opening the file: opening for writing
    # truncates it, and doing that outside the lock would let a second
    # saver truncate the file while the first one is still writing.
    with self.cache_lock:
        # Serialise first so that a serialisation error does not leave a
        # truncated metadata file behind.
        serialized = json.dumps(
            self.cache,
            indent=2,
            cls=CacheManager.CacheEncoder,
        )
        with open(cache_meta_file, 'w') as f:
            f.write(serialized)
def save_file(self, absolute_path: Path, data: bytes):
    """
    Write ``data`` to ``absolute_path``, replacing any existing file.

    :param absolute_path: where to write; missing parent directories are
        created.
    :param data: the bytes to store.
    """
    # Make the necessary directories and write to file.
    absolute_path.parent.mkdir(parents=True, exist_ok=True)
    absolute_path.write_bytes(data)
def calculate_abs_path(self, *relative_paths):
    """
    Join ``relative_paths`` onto the configured cache location.

    :returns: a ``Path`` inside ``app_config.cache_location``.
    """
    cache_root = Path(self.app_config.cache_location)
    return cache_root.joinpath(*relative_paths)
def calculate_download_path(self, *relative_paths):
    """
    Determine where to temporarily put the file as it is downloading.

    The base directory is ``$XDG_CACHE_HOME`` when that is set to a
    non-empty value and ``~/.cache`` otherwise; ``libremsonic`` and
    ``relative_paths`` are appended to it.
    """
    base_dir = os.environ.get('XDG_CACHE_HOME')
    if not base_dir:
        base_dir = os.path.expanduser('~/.cache')
    return Path(base_dir).joinpath('libremsonic', *relative_paths)
def return_cached_or_download(
        self,
        relative_path: Union[Path, str],
        download_fn: Callable[[], bytes],
        before_download: Callable[[], None] = lambda: None,
        force: bool = False,
):
    """
    Return the absolute cache path of a resource, downloading it first if
    it is not cached yet.

    :param relative_path: the location of the resource relative to the
        cache directory.
    :param download_fn: called without arguments to obtain the resource's
        bytes.
    :param before_download: called once before a download (or a wait for
        somebody else's download) starts.
    :param force: download even if the file already exists.
    :returns: the absolute path, as a string.
    :raises Exception: whatever ``download_fn`` or the file operations
        raise; the resource is no longer marked as downloading afterwards.
    """
    abs_path = self.calculate_abs_path(relative_path)
    download_path = self.calculate_download_path(relative_path)

    if not abs_path.exists() or force:
        before_download()

        with self.download_set_lock:
            resource_downloading = abs_path in self.current_downloads
            self.current_downloads.add(abs_path)

        if resource_downloading:
            print(abs_path, 'already being downloaded.')
            # The resource is already being downloaded. Busy loop until
            # it has completed. Then, just return the path to the
            # resource.
            while abs_path in self.current_downloads:
                sleep(0.5)

            return str(abs_path)

        print(abs_path, 'not found. Downloading...')
        try:
            # Download into the temporary location first so that a
            # partially written file never appears in the cache.
            os.makedirs(download_path.parent, exist_ok=True)
            self.save_file(download_path, download_fn())

            # Move the file to its cache download location.
            os.makedirs(abs_path.parent, exist_ok=True)
            if download_path.exists():
                shutil.move(str(download_path), str(abs_path))
        finally:
            # Always unmark the resource, even when the download failed.
            # Otherwise every thread waiting in the loop above would spin
            # forever.
            with self.download_set_lock:
                self.current_downloads.discard(abs_path)

    print(abs_path, 'downloaded. Returning.')
    return str(abs_path)
def delete_cached(self, relative_path: Union[Path, str]):
    """
    Remove cached files.

    :param relative_path: The path to the cached element to delete,
        relative to the cache directory. Note that this can be a globbed
        path, in which case every match is removed.
    """
    pattern = str(self.calculate_abs_path(relative_path))
    for match in glob.glob(pattern):
        Path(match).unlink()
def get_playlists(
        self,
        before_download: Callable[[], None] = lambda: None,
        force: bool = False,
) -> Future:
    """
    Get the list of playlists, from the cache when possible.

    :param before_download: called right before the server is queried.
    :param force: query the server even if playlists are cached.
    :returns: a ``Future`` resolving to the list of playlists.
    """
    def fetch_playlists() -> List[Playlist]:
        cached = self.cache.get('playlists')
        if not cached or force:
            before_download()
            fresh = self.server.get_playlists().playlist
            with self.cache_lock:
                self.cache['playlists'] = fresh
            # Outside the lock: save_cache_info acquires it itself.
            self.save_cache_info()

        return self.cache['playlists']

    return CacheManager.executor.submit(fetch_playlists)
def get_playlist(
        self,
        playlist_id: int,
        before_download: Callable[[], None] = lambda: None,
        force: bool = False,
) -> Future:
    """
    Get the details (including songs) of one playlist, from the cache when
    possible.

    :param playlist_id: the ID of the playlist.
    :param before_download: called right before the server is queried.
    :param force: query the server even if the playlist is cached, and
        delete the playlist's cached cover art files.
    :returns: a ``Future`` resolving to the playlist details.
    """
    def do_get_playlist() -> PlaylistWithSongs:
        cache_name = 'playlist_details'
        # ``dict.get`` does not trigger the defaultdict factory, so an
        # explicit empty default is needed; without it a cache that has
        # no 'playlist_details' entry yet makes the ``in`` test raise a
        # TypeError.
        if playlist_id not in self.cache.get(cache_name, {}) or force:
            before_download()
            playlist = self.server.get_playlist(playlist_id)
            with self.cache_lock:
                self.cache[cache_name][playlist_id] = playlist

                # Playlists have the song details, so save those too.
                for song in (playlist.entry or []):
                    self.cache['song_details'][song.id] = song

            # Outside the lock: save_cache_info acquires it itself.
            self.save_cache_info()

        playlist_details = self.cache[cache_name][playlist_id]

        # Invalidate the cached photo if we are forcing a retrieval
        # from the server.
        if force:
            self.delete_cached(
                f'cover_art/{playlist_details.coverArt}_*')
        return playlist_details

    return CacheManager.executor.submit(do_get_playlist)
def update_playlist(self, playlist_id, *args, **kwargs):
    """
    Update a playlist on the server and drop its cached details so that
    the next ``get_playlist`` call fetches them again.

    :param playlist_id: the ID of the playlist to update.
    :param args: passed through to ``server.update_playlist``.
    :param kwargs: passed through to ``server.update_playlist``.
    """
    self.server.update_playlist(playlist_id, *args, **kwargs)
    with self.cache_lock:
        # pop with a default: the playlist may never have been cached, and
        # that must not turn a successful server update into a KeyError.
        self.cache['playlist_details'].pop(playlist_id, None)
def get_artists(
        self,
        before_download: Callable[[], None] = lambda: None,
        force: bool = False,
) -> Future:
    """
    Get the flat list of all artists, from the cache when possible.

    :param before_download: called right before the server is queried.
    :param force: query the server even if artists are cached.
    :returns: a ``Future`` resolving to the list of artists.
    """
    def fetch_artists() -> List[Union[Artist, ArtistID3]]:
        cache_name = self.id3ify('artists')
        if self.browse_by_tags:
            fetch = self.server.get_artists
        else:
            fetch = self.server.get_indexes

        if not self.cache.get(cache_name) or force:
            before_download()
            response = fetch()

            # The server groups the artists by index; flatten them.
            flattened: List[Union[Artist, ArtistID3]] = [
                artist
                for index in response.index
                for artist in index.artist
            ]

            with self.cache_lock:
                self.cache[cache_name] = flattened
            # Outside the lock: save_cache_info acquires it itself.
            self.save_cache_info()

        return self.cache[cache_name]

    return CacheManager.executor.submit(fetch_artists)
def get_artist(
        self,
        artist_id,
        before_download: Callable[[], None] = lambda: None,
        force: bool = False,
) -> Future:
    """
    Get the details of one artist, from the cache when possible.

    :param artist_id: the ID of the artist.
    :param before_download: called right before the server is queried.
    :param force: query the server even if the artist is cached.
    :returns: a ``Future`` resolving to the artist details.
    """
    def fetch_artist() -> Union[ArtistWithAlbumsID3, Child]:
        # TODO: implement the non-ID3 version
        cache_name = self.id3ify('artist_details')
        if self.browse_by_tags:
            fetch = self.server.get_artist
        else:
            fetch = self.server.get_music_directory

        known_artists = self.cache.get(cache_name, {})
        if artist_id not in known_artists or force:
            before_download()
            details = fetch(artist_id)

            with self.cache_lock:
                self.cache[cache_name][artist_id] = details
            # Outside the lock: save_cache_info acquires it itself.
            self.save_cache_info()

        return self.cache[cache_name][artist_id]

    return CacheManager.executor.submit(fetch_artist)
def get_artist_info(
        self,
        artist_id,
        before_download: Callable[[], None] = lambda: None,
        force: bool = False,
) -> Future:
    """
    Get the extra information about one artist, from the cache when
    possible.

    :param artist_id: the ID of the artist.
    :param before_download: called right before the server is queried.
    :param force: query the server even if the info is cached.
    :returns: a ``Future`` resolving to the artist info. The future fails
        with ``KeyError`` when the server returns nothing and no info is
        cached.
    """
    def fetch_artist_info() -> Union[ArtistInfo, ArtistInfo2]:
        cache_name = self.id3ify('artist_infos')
        if self.browse_by_tags:
            fetch = self.server.get_artist_info2
        else:
            fetch = self.server.get_artist_info

        known_infos = self.cache.get(cache_name, {})
        if artist_id not in known_infos or force:
            before_download()
            info = fetch(id=artist_id)

            # Only a truthy response is stored and persisted.
            if info:
                with self.cache_lock:
                    self.cache[cache_name][artist_id] = info
                # Outside the lock: save_cache_info acquires it itself.
                self.save_cache_info()

        return self.cache[cache_name][artist_id]

    return CacheManager.executor.submit(fetch_artist_info)
def get_artist_artwork(
        self,
        artist: Union[Artist, ArtistID3],
        before_download: Callable[[], None] = lambda: None,
        force: bool = False,
) -> Future:
    """
    Get the filename of an image for ``artist``.

    The image URL from the artist info is used unless it is the LastFM
    placeholder image; in that case the server's cover art for the artist
    (or for the first child of a ``Directory``) is used when available.

    :param artist: the artist to get artwork for.
    :param before_download: passed to ``return_cached_or_download`` when
        the image is fetched from its URL.
    :param force: download the image from its URL even if it is cached.
    :returns: a ``Future`` resolving to the path of the image file.
    """
    def resolve_artwork_filename() -> str:
        info = CacheManager.get_artist_info(artist.id).result()
        image_url = ''.join(info.largeImageUrl)

        placeholder_url = ('https://lastfm-img2.akamaized.net'
                           '/i/u/300x300/2a96cbd8b46e442fc41c2b86b821562f.png')
        is_placeholder = image_url == placeholder_url

        # If it is the placeholder LastFM URL, then just use the cover
        # art filename given by the server.
        if is_placeholder and isinstance(artist, ArtistWithAlbumsID3):
            cover_future = CacheManager.get_cover_art_filename(
                artist.coverArt, size=300)
            return cover_future.result()

        if (is_placeholder and isinstance(artist, Directory)
                and len(artist.child) > 0):
            # Retrieve the first album's cover art
            cover_future = CacheManager.get_cover_art_filename(
                artist.child[0].coverArt, size=300)
            return cover_future.result()

        # Cache the image under a name derived from its URL.
        digest = hashlib.md5(image_url.encode('utf-8')).hexdigest()
        return self.return_cached_or_download(
            f'cover_art/artist.{digest}',
            lambda: requests.get(image_url).content,
            before_download=before_download,
            force=force,
        )

    return CacheManager.executor.submit(resolve_artwork_filename)
def get_albums(
        self,
        type_: str,
        before_download: Callable[[], None] = lambda: None,
        force: bool = False,
) -> Future:
    """
    Get a list of albums, from the cache when possible.

    :param type_: the kind of album list, passed to the server as is.
    :param before_download: called right before the server is queried.
    :param force: query the server even if albums are cached.
    :returns: a ``Future`` resolving to the list of albums.
    """
    def fetch_albums() -> List[Child]:
        # NOTE(review): the cache name does not include type_, so a cached
        # list is returned for any type_ unless force is set.
        cache_name = self.id3ify('albums')
        if self.browse_by_tags:
            fetch = self.server.get_album_list2
        else:
            fetch = self.server.get_album_list

        if not self.cache.get(cache_name) or force:
            before_download()
            response = fetch(type_)

            with self.cache_lock:
                self.cache[cache_name] = response.album
            # Outside the lock: save_cache_info acquires it itself.
            self.save_cache_info()

        return self.cache[cache_name]

    return CacheManager.executor.submit(fetch_albums)
def get_album(
        self,
        album_id,
        before_download: Callable[[], None] = lambda: None,
        force: bool = False,
) -> Future:
    """
    Get the details of one album, from the cache when possible.

    :param album_id: the ID of the album.
    :param before_download: called right before the server is queried.
    :param force: query the server even if the album is cached.
    :returns: a ``Future`` resolving to the album details.
    """
    def fetch_album() -> Union[AlbumWithSongsID3, Child]:
        # TODO: implement the non-ID3 version
        cache_name = self.id3ify('album_details')
        if self.browse_by_tags:
            fetch = self.server.get_album
        else:
            fetch = self.server.get_music_directory

        known_albums = self.cache.get(cache_name, {})
        if album_id not in known_albums or force:
            before_download()
            details = fetch(album_id)

            with self.cache_lock:
                self.cache[cache_name][album_id] = details
            # Outside the lock: save_cache_info acquires it itself.
            self.save_cache_info()

        return self.cache[cache_name][album_id]

    return CacheManager.executor.submit(fetch_album)
def batch_delete_cached_songs(
        self,
        song_ids: List[int],
        on_song_delete: Callable[[], None],
) -> Future:
    """
    Delete the cached files of the given songs.

    :param song_ids: the IDs of the songs whose files should be removed.
    :param on_song_delete: called once per song after its details have
        been resolved and its file (if any) has been removed.
    :returns: a ``Future`` that completes once a deletion callback has
        been attached to every song's details lookup.
    """
    def remove_song_file(details_future):
        # Runs when the song details are available; they carry the path
        # of the song relative to the cache directory.
        song_path = self.calculate_abs_path(details_future.result().path)
        if song_path.exists():
            song_path.unlink()

        on_song_delete()

    def schedule_deletions():
        for song_id in song_ids:
            details_future = CacheManager.get_song_details(song_id)
            details_future.add_done_callback(remove_song_file)

    return CacheManager.executor.submit(schedule_deletions)
def batch_download_songs(
        self,
        song_ids: List[int],
        before_download: Callable[[], None],
        on_song_download_complete: Callable[[int], None],
) -> Future:
    """
    Download the given songs into the cache, a limited number at a time.

    :param song_ids: the IDs of the songs to download.
    :param before_download: passed to ``return_cached_or_download`` for
        every song.
    :param on_song_download_complete: called with the song ID after a song
        has been downloaded (or was already cached). It is not called for
        songs that were skipped or whose download failed.
    :returns: a ``Future`` that completes once every song download has
        been submitted to the executor.
    """
    # TODO handle application close somehow. I think we will need to
    # raise some sort of an exception, not sure.
    def do_download_song(song_id):
        try:
            if CacheManager.should_exit:
                return

            # Do the actual download.
            song = CacheManager.get_song_details(song_id).result()
            self.return_cached_or_download(
                song.path,
                lambda: self.server.download(song.id),
                before_download=before_download,
            )
        finally:
            # The slot was acquired by do_batch_download_songs. Release it
            # on every exit path (early exit and failures included);
            # otherwise each skipped or failed song permanently uses up
            # one slot and the batch stalls once they are all gone.
            self.download_limiter_semaphore.release()

        on_song_download_complete(song_id)

    def do_batch_download_songs():
        # Update the set in place and under its lock, instead of
        # rebinding it, so that a concurrent add from
        # return_cached_or_download cannot be lost.
        # NOTE(review): these are song IDs, while the rest of this class
        # stores and looks up Paths in current_downloads, and nothing
        # removes the IDs again. Confirm what is intended here.
        with self.download_set_lock:
            self.current_downloads.update(song_ids)

        for song_id in song_ids:
            # Blocks while the maximum number of downloads is in flight.
            self.download_limiter_semaphore.acquire()
            CacheManager.executor.submit(do_download_song, song_id)

    return CacheManager.executor.submit(do_batch_download_songs)
def get_cover_art_filename(
        self,
        id: str,
        before_download: Callable[[], None] = lambda: None,
        size: Union[str, int] = 200,
        force: bool = False,
) -> Future:
    """
    Get the filename of a cover art image, downloading it if necessary.

    :param id: the cover art ID.
    :param before_download: called right before the image is downloaded.
    :param size: the requested size; it is part of the cache filename and
        is sent to the server as a string.
    :param force: download the image even if it is cached.
    :returns: a ``Future`` resolving to the path of the image file.
    """
    def resolve_cover_art_filename() -> str:
        cache_key = f'cover_art/{id}_{size}'

        def download_cover_art():
            return self.server.get_cover_art(id, str(size))

        return self.return_cached_or_download(
            cache_key,
            download_cover_art,
            before_download=before_download,
            force=force,
        )

    return CacheManager.executor.submit(resolve_cover_art_filename)
def get_song_details(
        self,
        song_id: int,
        before_download: Callable[[], None] = lambda: None,
        force: bool = False,
) -> Future:
    """
    Get the details of one song, from the cache when possible.

    :param song_id: the ID of the song.
    :param before_download: called right before the server is queried.
    :param force: query the server even if the song is cached.
    :returns: a ``Future`` resolving to the song details.
    """
    def fetch_song_details() -> Child:
        cached = self.cache['song_details'].get(song_id)
        if not cached or force:
            before_download()
            fresh = self.server.get_song(song_id)
            with self.cache_lock:
                self.cache['song_details'][song_id] = fresh
            # Outside the lock: save_cache_info acquires it itself.
            self.save_cache_info()

        return self.cache['song_details'][song_id]

    return CacheManager.executor.submit(fetch_song_details)
def get_play_queue(self) -> Future:
    """
    Fetch the play queue from the server; nothing is cached.

    :returns: a ``Future`` resolving to the result of
        ``server.get_play_queue``.
    """
    fetch_queue = self.server.get_play_queue
    return CacheManager.executor.submit(fetch_queue)
def get_song_filename_or_stream(
        self,
        song: Child,
        format=None,
        force_stream: bool = False,
) -> Tuple[str, bool]:
    """
    Decide where a song should be played from.

    :param song: the song to play.
    :param format: passed to ``server.get_stream_url`` when streaming.
    :param force_stream: stream even if the song file is cached.
    :returns: ``(location, is_stream)``: the cached file's path and
        ``False`` when the song is cached and streaming is not forced,
        otherwise the stream URL and ``True``.
    """
    local_path = self.calculate_abs_path(song.path)
    if local_path.exists() and not force_stream:
        return (str(local_path), False)

    stream_url = self.server.get_stream_url(song.id, format=format)
    return (stream_url, True)
def get_cached_status(self, song: Child) -> SongCacheStatus:
    """
    Report whether ``song`` is cached, being downloaded, or neither.

    :param song: the song to check; its ``path`` is resolved against the
        cache directory.
    :returns: ``PERMANENTLY_CACHED`` or ``CACHED`` when the file exists,
        ``DOWNLOADING`` when it does not exist but is in
        ``current_downloads``, ``NOT_CACHED`` otherwise.
    """
    cache_path = self.calculate_abs_path(song.path)
    if cache_path.exists():
        # permanently_cached_paths is declared as a set of strings, and a
        # Path never compares equal to a str. Check the string form as
        # well as the Path itself so that either representation matches.
        permanent = self.permanently_cached_paths
        if cache_path in permanent or str(cache_path) in permanent:
            return SongCacheStatus.PERMANENTLY_CACHED
        return SongCacheStatus.CACHED

    if cache_path in self.current_downloads:
        return SongCacheStatus.DOWNLOADING
    return SongCacheStatus.NOT_CACHED
def id3ify(self, cache_type):
    """
    Return the cache name to use for ``cache_type``.

    ``'_id3'`` is appended when ``browse_by_tags`` is set; otherwise the
    name is returned unchanged.
    """
    suffix = '_id3' if self.browse_by_tags else ''
    return cache_type + suffix
# The cache object that class-level lookups are forwarded to by the
# Singleton metaclass. None until reset() has been called.
_instance: Optional[__CacheManagerInternal] = None
def __init__(self, server_config: ServerConfiguration):
    """
    Always fails: the manager is used through its class, never through
    instances. Use ``reset`` to (re)create the underlying cache.

    :raises Exception: unconditionally.
    """
    message = 'Do not instantiate the CacheManager.'
    raise Exception(message)
@classmethod
def reset(cls, app_config, server_config):
    """
    Replace the active cache with a new one built from the given
    configuration.

    :param app_config: the application configuration.
    :param server_config: the configuration of the server to connect to.
    """
    fresh_cache = CacheManager.__CacheManagerInternal(
        app_config,
        server_config,
    )
    CacheManager._instance = fresh_cache