nix-files/pkgs/additional/sane-scripts/src/sane-sync-music

412 lines
14 KiB
Python
Executable File

#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ps.unidecode ])" -p ffmpeg -p sox
# vim: set filetype=python :
import argparse
import concurrent.futures
import datetime
import logging
import multiprocessing
import os
import subprocess
from pathlib import Path
from unidecode import unidecode
# module-wide logger; `main` adjusts its level based on --verbose/--quiet
logger = logging.getLogger(__name__)

# audio formats, grouped by whether they discard information
LOSSLESS_FMTS = ['.flac', '.wav']
MAYBE_LOSSY_FMTS = [
    # WMA can be lossy or lossless
    '.wma',
]
LOSSY_FMTS = ['.aac', '.m4a', '.mp3', '.oga', '.ogg', '.opus']

# audio formats which are kept as-is when `--compat` is requested
COMPAT_AUDIO_FMTS = ['.flac', '.mp3', '.oga', '.ogg', '.opus']

# every extension which is treated as audio
AUDIO_FMTS = LOSSLESS_FMTS + MAYBE_LOSSY_FMTS + LOSSY_FMTS

# non-audio files which are still synchronized (copied without conversion)
IMAGE_FMTS = ['.bmp', '.gif', '.jpeg', '.jpg', '.png']
VIDEO_FMTS = ['.avi', '.mov', '.mp4', '.webm']
METADATA_FMTS = ['.inf', '.lyrics', '.nfo', '.pdf', '.toc', '.txt']
NON_AUDIO_FMTS = IMAGE_FMTS + VIDEO_FMTS + METADATA_FMTS

# extensions which are recognized, but deliberately never copied
IGNORE = ['.DS_Store', '.cue', '.log', '.m3u', '.nsf_']
def approx_eq(a: float, b: float, threshold: float) -> bool:
    '''
    report whether `a` and `b` differ by no more than `threshold` (inclusive).
    '''
    distance = abs(b - a)
    return distance <= threshold
def clean_name(path: Path) -> Path:
    '''
    rewrite `path` so that most filesystems/protocols can reliably store it:
    - every path component is transliterated with `unidecode`.
    - characters from a fixed blacklist (quotes, brackets, shell punctuation, slashes, ...) are dropped.
    - the extension of the final component is lowercased.
    '''
    forbidden = '"\'!@#$%^&*()[]{};:,<>?`~|\\/'
    cleaned = Path()
    for component in path.parts:
        transliterated = unidecode(component)
        kept = [ch for ch in transliterated if ch not in forbidden]
        cleaned = cleaned / ''.join(kept)
    lowered_ext = cleaned.suffix.lower()
    return cleaned.with_suffix(lowered_ext)
class TranscodePreferences:
def __init__(self, compress: bool, compat: bool):
self.compress = compress
self.compat = compat
def get_output(self, input_ext: str) -> str | None:
"""
for some source type (e.g. `.wav`), return the desired output type (e.g. `.mp3`).
returns `.null` to indicate the file shouldn't be copied.
returns `None` if i don't understand the source file.
"""
desired_output = None
if input_ext in AUDIO_FMTS:
desired_output = input_ext
if self.compress:
desired_output = self.get_compressed_audio_output(desired_output)
if self.compat:
desired_output = self.get_compat_audio_output(desired_output)
elif input_ext in IMAGE_FMTS:
desired_output = input_ext
elif input_ext in VIDEO_FMTS:
desired_output = input_ext
elif input_ext in METADATA_FMTS:
desired_output = input_ext
elif input_ext in IGNORE:
desired_output = ".null"
return desired_output
def desired_samplerate(self, input_samplerate: int | None) -> int | None:
samplerate_map = {
192000: 48000 if self.compress else 192000,
96000: 48000 if self.compress else 96000,
88200: 44100 if self.compress else 88200,
# preserve as-is
48000: 48000,
44100: 44100,
}
return samplerate_map.get(input_samplerate)
def get_compressed_audio_output(self, input_ext: str) -> str:
if input_ext in LOSSY_FMTS:
return input_ext
else:
return ".mp3"
def get_compat_audio_output(self, input_ext: str) -> str:
if input_ext in COMPAT_AUDIO_FMTS:
return input_ext
elif input_ext in LOSSLESS_FMTS:
return ".flac"
else:
return ".mp3"
class Encoder:
def __init__(self, prefs: TranscodePreferences, dry_run: bool = False):
self.prefs = prefs
self.dry_run = dry_run
def destructive(self, default_, f, *args, **kwargs):
if self.dry_run:
pretty_args = ", ".join(
[repr(a) for a in args]
+ [f"{k}={v!r}" for k, v in kwargs.items()]
)
logger.debug(f"[dry-run: not invoking]: {f.__name__}({pretty_args})")
return default_
else:
return f(*args, **kwargs)
def _check_output(self, args: list[str], quiet: bool = False) -> bytes:
res = subprocess.run(args, capture_output=True)
stderr = res.stderr.strip()
if stderr and not quiet:
logger.error(stderr)
res.check_returncode()
return res.stdout
def check_output(self, args: list[str], has_side_effect=True, **kwargs) -> str:
if has_side_effect:
return self.destructive(b'', self._check_output, args, **kwargs)
else:
return self._check_output(args, **kwargs)
def cp(self, source: Path, dest: Path) -> None:
logger.info(f'copying {source} -> {dest}')
self.check_output(['cp', str(source), str(dest)])
def ensure_dir(self, dir: Path) -> None:
self.destructive(None, os.makedirs, str(dir), exist_ok=True)
def remove(self, path: Path) -> None:
self.destructive(None, os.remove, path)
def convert(self, source: Path, dest: Path, target_samplerate: int | None) -> None:
assert dest.suffix == '.mp3', "conversion to a target other than mp3 not yet supported"
logger.info(f'converting {source} -> {dest}')
samplerate_flags = ['-ar', str(target_samplerate)] if target_samplerate else []
self.check_output([
'ffmpeg',
'-loglevel', 'warning',
'-y', # force overwrite
'-i', str(source),
'-codec:v', 'copy',
'-codec:a', 'libmp3lame',
'-qscale:a', '0'
] + samplerate_flags + [str(dest)])
def cp_or_convert(self, source: Path, dest: Path) -> None:
source_samplerate = None
if source.suffix.lower() not in NON_AUDIO_FMTS:
try:
source_samplerate = int(
self.check_output(
['soxi', '-r', str(source)],
has_side_effect=False,
quiet=True,
).decode("utf-8").strip()
)
except:
if source.suffix.lower() in ['.aac', '.m4a', '.wma']:
# sox is known to not support these formats
logging.debug(f'unsupported extension for samplerate: {source}')
else:
logging.warning(f'unable to obtain samplerate for {source}')
target_samplerate = self.prefs.desired_samplerate(source_samplerate)
if source_samplerate and not target_samplerate:
logging.warning(f'unable to map source sample rate: {source_samplerate}')
if source_samplerate != target_samplerate:
# resampling -> convert
self.convert(source, dest, target_samplerate)
elif source.suffix.lower() != dest.suffix:
# transcoding -> convert
self.convert(source, dest, target_samplerate)
else:
# neither resampling nor transcoding -> simple copy will suffice
self.cp(source, dest)
# in all these cases, on success, synchronize the `mtime` to be in agreement
st = os.stat(source)
mtime = st.st_mtime
atime = datetime.datetime.now().timestamp()
self.destructive(None, os.utime, str(dest), (atime, mtime))
class Sync:
def __init__(self, encoder: Encoder, in_dir: str, out_dir: str, force_copy: bool = False):
self.encoder = encoder
self.in_dir = in_dir
self.out_dir = out_dir
self.force_copy = force_copy
def target_name(self, source_name: Path) -> Path | None:
n = clean_name(source_name)
output_type = self.encoder.prefs.get_output(n.suffix)
if output_type is None:
logger.warning(f"skipping {source_name} because i don't recognize its filetype ({n.suffix})")
return None
if output_type == ".null":
return None
elif output_type == n.suffix:
return n
else:
return Path(str(n) + output_type)
def calculate_delta(self) -> tuple[set[Path], set[tuple[Path, Path]], set[tuple[Path, Path]]]:
'''
Returns, as a tuple:
- dest files which need to be deleted
- new files to copy (in-path/out-path pairs)
- existing files which need to be updated (in-path/out-path pairs)
all returned paths are relative to in_dir/out_dir.
'''
in_files = { p.relative_to(self.in_dir) for p in Path(self.in_dir).rglob("*") if not p.is_dir() }
logger.info(f'found {len(in_files)} files in source')
# create a map from source path to dest path
in_out_map = ((in_f, self.target_name(in_f)) for in_f in in_files)
in_out_map = dict((in_f, out_f) for (in_f, out_f) in in_out_map if out_f is not None)
logger.info(f'recognized {len(in_files)} source files as media')
existing_out_files = { p.relative_to(self.out_dir) for p in Path(self.out_dir).rglob("*") if not p.is_dir() }
logger.info(f'found {len(existing_out_files)} files in dest')
expected_out_files = in_out_map.values()
to_del = {
f for f in existing_out_files
if f not in expected_out_files
}
logger.info(f'found {len(to_del)} files to delete')
to_copy = {
(in_f, out_f) for (in_f, out_f) in in_out_map.items()
if out_f not in existing_out_files
}
logger.info(f'found {len(to_copy)} files to copy')
to_update = {
(in_f, out_f) for (in_f, out_f) in in_out_map.items()
if (in_f, out_f) not in to_copy and (self.force_copy or self.needs_update(in_f, out_f))
}
logger.info(f'found {len(to_update)} files to update')
return to_del, to_copy, to_update
def needs_update(self, src: Path, dest: Path) -> bool:
'''
files are relative to in_dir/out_dir
'''
src_stat = os.stat(self.in_dir / src)
dest_stat = os.stat(self.out_dir / dest)
return not approx_eq(src_stat.st_mtime, dest_stat.st_mtime, threshold=120.0)
def rm_dest_files(self, files: list[Path]) -> None:
'''
files are relative to out_dir
'''
for f in files:
logger.info(f'removing {f}')
f = Path(self.out_dir) / f
self.encoder.remove(f)
# if the directory is empty after removing this file, then remove the directory (and possibly prune its parents too)
if not os.listdir(f.parent):
os.removedirs(f.parent)
def copy_one(self, src_name: Path, dest_name: Path) -> None:
'''
path names are relative to in_dir/out_dir
'''
source = Path(self.in_dir) / src_name
dest = Path(self.out_dir) / dest_name
self.encoder.ensure_dir(dest.parent)
self.encoder.cp_or_convert(source, dest)
def try_invoke(self, f, *args) -> None:
"""
try to invoke `f` with the provided `args`, and log if it fails.
this overcomes the issue that background tasks which fail via Exception otherwise do so silently.
"""
try:
f(*args)
except Exception as e:
logger.error(f"task failed: {e}")
def cp_files(self, file_pairs: list[tuple[Path, Path]], jobs: int):
logger.info(f'using {jobs} jobs to copy {len(file_pairs)} files')
with concurrent.futures.ThreadPoolExecutor(max_workers=jobs) as executor:
for src_f, dest_f in file_pairs:
executor.submit(self.try_invoke, self.copy_one, src_f, dest_f)
def sync_all(
    in_dir: str,
    out_dir: str,
    compress: bool = False,
    compat: bool = False,
    force_copy: bool = False,
    dry_run: bool = False,
    jobs: int = None,
) -> None:
    '''
    synchronize `out_dir` with `in_dir`:
    first delete the destination files which are no longer wanted,
    then copy the new files followed by the files which need updating.
    `jobs` is the number of parallel workers; it defaults to the number of CPUs.
    '''
    encoder = Encoder(
        TranscodePreferences(compress=compress, compat=compat),
        dry_run=dry_run,
    )
    sync = Sync(encoder, in_dir, out_dir, force_copy=force_copy)

    stale, fresh, outdated = sync.calculate_delta()
    sync.rm_dest_files(sorted(stale))

    pending = sorted(fresh)
    pending.extend(sorted(outdated))
    worker_count = jobs or multiprocessing.cpu_count()
    sync.cp_files(pending, jobs=worker_count)
def main() -> None:
    '''
    command-line entry point: parse the arguments, configure logging, and run the sync.
    '''
    logging.basicConfig()
    logger.setLevel(logging.INFO)

    cli = argparse.ArgumentParser(description="synchronize music from one directory to another, possibly compressing it")

    def add_switch(flag: str, description: str) -> None:
        # every boolean option is a plain on/off switch
        cli.add_argument(flag, action='store_true', help=description)

    cli.add_argument("src", help="source directory")
    cli.add_argument("dest", help="destination directory")
    add_switch("--compress", "compress audio files (to mp3)")
    add_switch("--compat", "convert poorly supported file formats to better-supported formats (e.g. avoid wma)")
    cli.add_argument("--jobs", help="number of cores to compress music with (default: all CPU cores)", default=None, type=int)
    add_switch("--dry-run", "don't actually run any commands")
    add_switch("--verbose", "more logging")
    add_switch("--quiet", "less logging")
    add_switch("--force-copy", "copy over files that already exist (in case metadata needs updating)")

    opts = cli.parse_args()

    # applied in this order, so --quiet wins if both flags are given
    for enabled, level in ((opts.verbose, logging.DEBUG), (opts.quiet, logging.WARN)):
        if enabled:
            logger.setLevel(level)

    sync_all(
        opts.src,
        opts.dest,
        compress=opts.compress,
        compat=opts.compat,
        force_copy=opts.force_copy,
        dry_run=opts.dry_run,
        jobs=opts.jobs,
    )


if __name__ == '__main__':
    main()