Finished fixing all flake8 typing errors

This commit is contained in:
Sumner Evans
2020-03-06 08:02:43 -07:00
parent 4b02083a5a
commit 4f559e346d
8 changed files with 166 additions and 114 deletions

View File

@@ -2,7 +2,8 @@ import logging
import math
import os
import random
from typing import Any, Callable, Dict, Iterable, List, Optional
from concurrent.futures import Future
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
import gi
gi.require_version('Gtk', '3.0')
@@ -24,7 +25,7 @@ class SublimeMusicApp(Gtk.Application):
super().__init__(application_id="com.sumnerevans.sublimemusic")
Notify.init('Sublime Music')
self.window = None
self.window: Optional[Gtk.Window] = None
self.state = ApplicationState()
self.state.config_file = config_file
@@ -210,13 +211,13 @@ class SublimeMusicApp(Gtk.Application):
def on_dbus_method_call(
self,
connection,
sender,
path,
interface,
method,
params,
invocation,
connection: Gio.DBusConnection,
sender: str,
path: str,
interface: str,
method: str,
params: GLib.Variant,
invocation: Gio.DBusMethodInvocation,
):
second_microsecond_conversion = 1000000
@@ -314,7 +315,7 @@ class SublimeMusicApp(Gtk.Application):
reverse=reverse_order,
)
def make_playlist_tuple(p: Playlist):
def make_playlist_tuple(p: Playlist) -> GLib.Variant:
cover_art_filename = CacheManager.get_cover_art_filename(
p.coverArt,
allow_download=False,
@@ -323,15 +324,15 @@ class SublimeMusicApp(Gtk.Application):
return GLib.Variant(
'(a(oss))', (
list(
map(
make_playlist_tuple,
playlists[index:(index + max_count)])), ))
[
make_playlist_tuple(p)
for p in playlists[index:(index + max_count)]
], ))
method_call_map = {
method_call_map: Dict[str, Dict[str, Any]] = {
'org.mpris.MediaPlayer2': {
'Raise': self.window.present,
'Quit': self.window.destroy,
'Raise': self.window and self.window.present,
'Quit': self.window and self.window.destroy,
},
'org.mpris.MediaPlayer2.Player': {
'Next': self.on_next_track,
@@ -352,34 +353,35 @@ class SublimeMusicApp(Gtk.Application):
'GetPlaylists': get_playlists,
},
}
method = method_call_map.get(interface, {}).get(method)
if method is None:
method_fn = method_call_map.get(interface, {}).get(method)
if method_fn is None:
logging.warning(
f'Unknown/unimplemented method: {interface}.{method}.')
invocation.return_value(method(*params) if callable(method) else None)
invocation.return_value(
method_fn(*params) if callable(method_fn) else None)
def on_dbus_set_property(
self,
connection,
sender,
path,
interface,
property_name,
value,
connection: Gio.DBusConnection,
sender: str,
path: str,
interface: str,
property_name: str,
value: GLib.Variant,
):
def change_loop(new_loop_status):
def change_loop(new_loop_status: GLib.Variant):
self.state.repeat_type = RepeatType.from_mpris_loop_status(
new_loop_status.get_string())
self.update_window()
def set_shuffle(new_val):
def set_shuffle(new_val: GLib.Variant):
if new_val.get_boolean() != self.state.shuffle_on:
self.on_shuffle_press(None, None)
def set_volume(new_val):
self.on_volume_change(None, value.get_double() * 100)
def set_volume(new_val: GLib.Variant):
self.on_volume_change(None, new_val.get_double() * 100)
setter_map = {
setter_map: Dict[str, Dict[str, Any]] = {
'org.mpris.MediaPlayer2.Player': {
'LoopStatus': change_loop,
'Rate': lambda _: None,
@@ -388,7 +390,7 @@ class SublimeMusicApp(Gtk.Application):
}
}
setter = setter_map.get(interface).get(property_name)
setter = setter_map.get(interface, {}).get(property_name)
if setter is None:
logging.warning('Set: Unknown property: {property_name}.')
return
@@ -612,7 +614,13 @@ class SublimeMusicApp(Gtk.Application):
self.state.current_tab = stack.get_visible_child_name()
self.update_window()
def on_song_clicked(self, win, song_index, song_queue, metadata):
def on_song_clicked(
self,
win: Any,
song_index: int,
song_queue: List[str],
metadata: Dict[str, Any],
):
# Reset the play queue so that we don't ever revert back to the
# previous one.
old_play_queue = song_queue.copy()
@@ -641,7 +649,7 @@ class SublimeMusicApp(Gtk.Application):
play_queue=song_queue,
)
def on_songs_removed(self, win, song_indexes_to_remove):
def on_songs_removed(self, win: Any, song_indexes_to_remove: List[int]):
self.state.play_queue = [
song_id for i, song_id in enumerate(self.state.play_queue)
if i not in song_indexes_to_remove
@@ -669,8 +677,8 @@ class SublimeMusicApp(Gtk.Application):
self.save_play_queue()
@dbus_propagate()
def on_song_scrub(self, _, scrub_value):
if not hasattr(self.state, 'current_song'):
def on_song_scrub(self, win: Any, scrub_value: float):
if not self.state.current_song or not self.window:
return
new_time = self.state.current_song.duration * (scrub_value / 100)
@@ -685,7 +693,7 @@ class SublimeMusicApp(Gtk.Application):
self.save_play_queue()
def on_device_update(self, _, device_uuid):
def on_device_update(self, win: Any, device_uuid: str):
if device_uuid == self.state.current_device:
return
self.state.current_device = device_uuid
@@ -709,24 +717,28 @@ class SublimeMusicApp(Gtk.Application):
self.dbus_manager.property_diff()
@dbus_propagate()
def on_mute_toggle(self, action, _):
def on_mute_toggle(self, *args):
self.state.is_muted = not self.state.is_muted
self.player.is_muted = self.state.is_muted
self.update_window()
@dbus_propagate()
def on_volume_change(self, _, value):
def on_volume_change(self, _: Any, value: float):
self.state.volume = value
self.player.volume = self.state.volume
self.update_window()
def on_window_key_press(self, window, event):
def on_window_key_press(
self,
window: Gtk.Window,
event: Gdk.EventKey,
) -> bool:
# Need to use bitwise & here to see if CTRL is pressed.
if (event.keyval == 102
and event.state & Gdk.ModifierType.CONTROL_MASK):
# Ctrl + F
window.search_entry.grab_focus()
return
return False
if window.search_entry.has_focus():
return False
@@ -742,7 +754,9 @@ class SublimeMusicApp(Gtk.Application):
action()
return True
def on_app_shutdown(self, app):
return False
def on_app_shutdown(self, app: 'SublimeMusicApp'):
Notify.uninit()
if self.state.config.server is None:
@@ -767,10 +781,12 @@ class SublimeMusicApp(Gtk.Application):
dialog.run()
dialog.destroy()
def update_window(self, force=False):
def update_window(self, force: bool = False):
if not self.window:
return
GLib.idle_add(lambda: self.window.update(self.state, force=force))
def update_play_state_from_server(self, prompt_confirm=False):
def update_play_state_from_server(self, prompt_confirm: bool = False):
# TODO (#129): need to make the up next list loading for the duration
# here if prompt_confirm is False.
was_playing = self.state.playing
@@ -778,7 +794,7 @@ class SublimeMusicApp(Gtk.Application):
self.state.playing = False
self.update_window()
def do_update(f):
def do_update(f: Future):
play_queue = f.result()
new_play_queue = [s.id for s in play_queue.entry]
new_current_song_id = str(play_queue.current)
@@ -835,9 +851,9 @@ class SublimeMusicApp(Gtk.Application):
def play_song(
self,
song_index: int,
reset=False,
old_play_queue=None,
play_queue=None,
reset: bool = False,
old_play_queue: List[str] = None,
play_queue: List[str] = None,
):
# Do this the old fashioned way so that we can have access to ``reset``
# in the callback.
@@ -869,13 +885,14 @@ class SublimeMusicApp(Gtk.Application):
song_notification.add_action(
'clicked',
'Open Sublime Music',
lambda *a: self.window.present(),
lambda *a: self.window.present()
if self.window else None,
)
song_notification.show()
def on_cover_art_download_complete(
cover_art_filename,
order_token,
cover_art_filename: str,
order_token: int,
):
if order_token != self.song_playing_order_token:
return
@@ -887,7 +904,8 @@ class SublimeMusicApp(Gtk.Application):
cover_art_filename, 70, 70, True))
song_notification.show()
def get_cover_art_filename(order_token):
def get_cover_art_filename(
order_token: int) -> Tuple[str, int]:
return (
CacheManager.get_cover_art_filename(
song.coverArt).result(),
@@ -906,8 +924,9 @@ class SublimeMusicApp(Gtk.Application):
'Unable to display notification. Is a notification '
'daemon running?')
def on_song_download_complete(song_id):
if self.state.current_song.id != song.id:
def on_song_download_complete(song_id: int):
if (self.state.current_song
and self.state.current_song.id != song.id):
return
# Switch to the local media if the player can hotswap (MPV can,