Improved resource download logic to avoid duplicate downloads
This commit is contained in:
@@ -34,27 +34,6 @@ class LibremsonicApp(Gtk.Application):
|
|||||||
|
|
||||||
self.connect('shutdown', self.on_app_shutdown)
|
self.connect('shutdown', self.on_app_shutdown)
|
||||||
|
|
||||||
self.last_play_queue_update = 0
|
|
||||||
|
|
||||||
def time_observer(value):
|
|
||||||
self.state.song_progress = value
|
|
||||||
GLib.idle_add(
|
|
||||||
self.window.player_controls.update_scrubber,
|
|
||||||
self.state.song_progress,
|
|
||||||
self.state.current_song.duration,
|
|
||||||
)
|
|
||||||
if not value:
|
|
||||||
self.last_play_queue_update = 0
|
|
||||||
elif self.last_play_queue_update + 15 <= value:
|
|
||||||
self.save_play_queue()
|
|
||||||
|
|
||||||
def on_track_end():
|
|
||||||
GLib.idle_add(self.on_next_track)
|
|
||||||
|
|
||||||
self.mpv_player = MPVPlayer(time_observer, on_track_end)
|
|
||||||
self.chromecast_player = ChromecastPlayer(time_observer, on_track_end)
|
|
||||||
self.player = self.mpv_player
|
|
||||||
|
|
||||||
# Handle command line option parsing.
|
# Handle command line option parsing.
|
||||||
def do_command_line(self, command_line):
|
def do_command_line(self, command_line):
|
||||||
options = command_line.get_options_dict()
|
options = command_line.get_options_dict()
|
||||||
@@ -135,6 +114,35 @@ class LibremsonicApp(Gtk.Application):
|
|||||||
# it exists.
|
# it exists.
|
||||||
self.state.load()
|
self.state.load()
|
||||||
|
|
||||||
|
self.last_play_queue_update = 0
|
||||||
|
|
||||||
|
def time_observer(value):
|
||||||
|
self.state.song_progress = value
|
||||||
|
GLib.idle_add(
|
||||||
|
self.window.player_controls.update_scrubber,
|
||||||
|
self.state.song_progress,
|
||||||
|
self.state.current_song.duration,
|
||||||
|
)
|
||||||
|
if not value:
|
||||||
|
self.last_play_queue_update = 0
|
||||||
|
elif self.last_play_queue_update + 15 <= value:
|
||||||
|
self.save_play_queue()
|
||||||
|
|
||||||
|
def on_track_end():
|
||||||
|
GLib.idle_add(self.on_next_track)
|
||||||
|
|
||||||
|
self.mpv_player = MPVPlayer(
|
||||||
|
time_observer,
|
||||||
|
on_track_end,
|
||||||
|
self.state.config,
|
||||||
|
)
|
||||||
|
self.chromecast_player = ChromecastPlayer(
|
||||||
|
time_observer,
|
||||||
|
on_track_end,
|
||||||
|
self.state.config,
|
||||||
|
)
|
||||||
|
self.player = self.mpv_player
|
||||||
|
|
||||||
# If there is no current server, show the dialog to select a server.
|
# If there is no current server, show the dialog to select a server.
|
||||||
if (self.state.config.current_server is None
|
if (self.state.config.current_server is None
|
||||||
or self.state.config.current_server < 0):
|
or self.state.config.current_server < 0):
|
||||||
@@ -288,6 +296,7 @@ class LibremsonicApp(Gtk.Application):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
def on_app_shutdown(self, app):
|
def on_app_shutdown(self, app):
|
||||||
|
CacheManager.should_exit = True
|
||||||
self.player.pause()
|
self.player.pause()
|
||||||
self.state.save()
|
self.state.save()
|
||||||
self.save_play_queue()
|
self.save_play_queue()
|
||||||
|
@@ -4,7 +4,8 @@ import threading
|
|||||||
import shutil
|
import shutil
|
||||||
import json
|
import json
|
||||||
import hashlib
|
import hashlib
|
||||||
from collections import defaultdict, namedtuple
|
from collections import defaultdict
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
from concurrent.futures import ThreadPoolExecutor, Future
|
from concurrent.futures import ThreadPoolExecutor, Future
|
||||||
from enum import EnumMeta, Enum
|
from enum import EnumMeta, Enum
|
||||||
@@ -68,6 +69,7 @@ class SongCacheStatus(Enum):
|
|||||||
|
|
||||||
class CacheManager(metaclass=Singleton):
|
class CacheManager(metaclass=Singleton):
|
||||||
executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=50)
|
executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=50)
|
||||||
|
should_exit: bool = False
|
||||||
|
|
||||||
class CacheEncoder(json.JSONEncoder):
|
class CacheEncoder(json.JSONEncoder):
|
||||||
def default(self, obj):
|
def default(self, obj):
|
||||||
@@ -94,7 +96,6 @@ class CacheManager(metaclass=Singleton):
|
|||||||
server: Server
|
server: Server
|
||||||
browse_by_tags: bool
|
browse_by_tags: bool
|
||||||
|
|
||||||
download_move_lock = threading.Lock()
|
|
||||||
download_set_lock = threading.Lock()
|
download_set_lock = threading.Lock()
|
||||||
current_downloads: Set[Path] = set()
|
current_downloads: Set[Path] = set()
|
||||||
|
|
||||||
@@ -201,25 +202,39 @@ class CacheManager(metaclass=Singleton):
|
|||||||
abs_path = self.calculate_abs_path(relative_path)
|
abs_path = self.calculate_abs_path(relative_path)
|
||||||
download_path = self.calculate_download_path(relative_path)
|
download_path = self.calculate_download_path(relative_path)
|
||||||
if not abs_path.exists() or force:
|
if not abs_path.exists() or force:
|
||||||
print(abs_path, 'not found. Downloading...')
|
resource_downloading = False
|
||||||
|
|
||||||
with self.download_set_lock:
|
with self.download_set_lock:
|
||||||
|
if abs_path in self.current_downloads:
|
||||||
|
resource_downloading = True
|
||||||
|
|
||||||
self.current_downloads.add(abs_path)
|
self.current_downloads.add(abs_path)
|
||||||
|
|
||||||
os.makedirs(download_path.parent, exist_ok=True)
|
if resource_downloading:
|
||||||
before_download()
|
print(abs_path, 'already being downloaded.')
|
||||||
self.save_file(download_path, download_fn())
|
# The resource is already being downloaded. Busy loop until
|
||||||
|
# it has completed. Then, just return the path to the
|
||||||
|
# resource.
|
||||||
|
while abs_path in self.current_downloads:
|
||||||
|
sleep(0.5)
|
||||||
|
|
||||||
# Move the file to its cache download location. We need a lock
|
return str(abs_path)
|
||||||
# here just in case we fired two downloads of the same asset
|
|
||||||
# for some reason.
|
else:
|
||||||
with self.download_move_lock:
|
print(abs_path, 'not found. Downloading...')
|
||||||
|
|
||||||
|
os.makedirs(download_path.parent, exist_ok=True)
|
||||||
|
before_download()
|
||||||
|
self.save_file(download_path, download_fn())
|
||||||
|
|
||||||
|
# Move the file to its cache download location.
|
||||||
os.makedirs(abs_path.parent, exist_ok=True)
|
os.makedirs(abs_path.parent, exist_ok=True)
|
||||||
if download_path.exists():
|
if download_path.exists():
|
||||||
shutil.move(download_path, abs_path)
|
shutil.move(download_path, abs_path)
|
||||||
|
|
||||||
with self.download_set_lock:
|
with self.download_set_lock:
|
||||||
self.current_downloads.discard(abs_path)
|
self.current_downloads.discard(abs_path)
|
||||||
|
|
||||||
|
print(abs_path, 'downloaded. Returning.')
|
||||||
|
|
||||||
return str(abs_path)
|
return str(abs_path)
|
||||||
|
|
||||||
@@ -447,6 +462,9 @@ class CacheManager(metaclass=Singleton):
|
|||||||
# TODO handle application close somehow. I think we will need to
|
# TODO handle application close somehow. I think we will need to
|
||||||
# raise some sort of an exception, not sure.
|
# raise some sort of an exception, not sure.
|
||||||
def do_download_song(song_id):
|
def do_download_song(song_id):
|
||||||
|
if CacheManager.should_exit:
|
||||||
|
return
|
||||||
|
|
||||||
# Do the actual download.
|
# Do the actual download.
|
||||||
song_details_future = CacheManager.get_song_details(song_id)
|
song_details_future = CacheManager.get_song_details(song_id)
|
||||||
song = song_details_future.result()
|
song = song_details_future.result()
|
||||||
|
@@ -80,7 +80,7 @@ class Server:
|
|||||||
|
|
||||||
def _get(self, url, **params):
|
def _get(self, url, **params):
|
||||||
params = {**self._get_params(), **params}
|
params = {**self._get_params(), **params}
|
||||||
print(f'[START] post: {url}')
|
print(f'[START] get: {url}')
|
||||||
|
|
||||||
# Deal with datetime parameters (convert to milliseconds since 1970)
|
# Deal with datetime parameters (convert to milliseconds since 1970)
|
||||||
for k, v in params.items():
|
for k, v in params.items():
|
||||||
@@ -92,7 +92,7 @@ class Server:
|
|||||||
if result.status_code != 200:
|
if result.status_code != 200:
|
||||||
raise Exception(f'Fail! {result.status_code}')
|
raise Exception(f'Fail! {result.status_code}')
|
||||||
|
|
||||||
print(f'[FINISH] post: {url}')
|
print(f'[FINISH] get: {url}')
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def _get_json(
|
def _get_json(
|
||||||
@@ -101,8 +101,8 @@ class Server:
|
|||||||
**params: Union[None, str, datetime, int, List[int]],
|
**params: Union[None, str, datetime, int, List[int]],
|
||||||
) -> Response:
|
) -> Response:
|
||||||
"""
|
"""
|
||||||
Make a post to a *Sonic REST API. Handle all types of errors including
|
Make a get request to a *Sonic REST API. Handle all types of errors
|
||||||
*Sonic ``<error>`` responses.
|
including *Sonic ``<error>`` responses.
|
||||||
|
|
||||||
:returns: a Response containing all of the data of the
|
:returns: a Response containing all of the data of the
|
||||||
response, deserialized
|
response, deserialized
|
||||||
|
@@ -10,16 +10,20 @@ from concurrent.futures import ThreadPoolExecutor, Future
|
|||||||
import pychromecast
|
import pychromecast
|
||||||
import mpv
|
import mpv
|
||||||
|
|
||||||
|
from libremsonic.config import AppConfiguration
|
||||||
|
|
||||||
|
|
||||||
class Player:
|
class Player:
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
on_timepos_change: Callable[[float], None],
|
on_timepos_change: Callable[[float], None],
|
||||||
on_track_end: Callable[[], None],
|
on_track_end: Callable[[], None],
|
||||||
|
config: AppConfiguration,
|
||||||
):
|
):
|
||||||
self.on_timepos_change = on_timepos_change
|
self.on_timepos_change = on_timepos_change
|
||||||
self.on_track_end = on_track_end
|
self.on_track_end = on_track_end
|
||||||
self._song_loaded = False
|
self._song_loaded = False
|
||||||
|
self.config = config
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def playing(self):
|
def playing(self):
|
||||||
@@ -146,8 +150,9 @@ class ChromecastPlayer(Player):
|
|||||||
media_status_listener = MediaStatusListener()
|
media_status_listener = MediaStatusListener()
|
||||||
|
|
||||||
class ServerThread(threading.Thread):
|
class ServerThread(threading.Thread):
|
||||||
def __init__(self, port, directory):
|
def __init__(self, host, port, directory):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.directory = directory
|
self.directory = directory
|
||||||
|
|
||||||
@@ -160,7 +165,7 @@ class ChromecastPlayer(Player):
|
|||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.server = HTTPServer(
|
self.server = HTTPServer(
|
||||||
('0.0.0.0', self.port),
|
(self.host, self.port),
|
||||||
self.generate_handler(self.directory),
|
self.generate_handler(self.directory),
|
||||||
)
|
)
|
||||||
# TODO figure out how to make this stop when the app closes.
|
# TODO figure out how to make this stop when the app closes.
|
||||||
@@ -190,14 +195,15 @@ class ChromecastPlayer(Player):
|
|||||||
|
|
||||||
# Set host_ip
|
# Set host_ip
|
||||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
s.connect(("8.8.8.8", 80))
|
s.connect(('8.8.8.8', 80))
|
||||||
self.host_ip = s.getsockname()[0]
|
self.host_ip = s.getsockname()[0]
|
||||||
s.close()
|
s.close()
|
||||||
|
|
||||||
# TODO make this come from the app config
|
# TODO make the port come from the app config
|
||||||
self.server_thread = ChromecastPlayer.ServerThread(
|
self.server_thread = ChromecastPlayer.ServerThread(
|
||||||
|
'0.0.0.0',
|
||||||
8080,
|
8080,
|
||||||
'/home/sumner/.local/share/libremsonic',
|
self.config.cache_location,
|
||||||
)
|
)
|
||||||
self.server_thread.daemon = True
|
self.server_thread.daemon = True
|
||||||
self.server_thread.start()
|
self.server_thread.start()
|
||||||
|
Reference in New Issue
Block a user