sane-sync-music: refactor to facilitate future tweaks

This commit is contained in:
Colin 2023-12-07 18:42:46 +00:00
parent 07c7050335
commit 9619c6d2e1

View File

@ -123,62 +123,73 @@ def clean_name(path: str) -> Path:
out_path /= part
return out_path
def target_name(source_name: str) -> Path:
    """Map a source file's name to the name it will have in the destination.

    The name is cleaned first; names whose suffix is listed in MAKE_MP3
    (they will be transcoded) get an extra '.mp3' appended, everything
    else passes through unchanged.
    """
    cleaned = clean_name(source_name)
    will_transcode = cleaned.suffix in MAKE_MP3
    return Path(f'{cleaned}.mp3') if will_transcode else cleaned
# Post-refactor entry point: bundles the Encoder with the source/destination
# roots so the individual sync steps can share them as instance state.
class Sync:
def __init__(self, encoder: Encoder, in_dir: str, out_dir: str):
# performs the actual convert/cp/remove/ensure_dir operations
self.encoder = encoder
# root of the source music library (string path)
self.in_dir = in_dir
# root of the sync destination (string path)
self.out_dir = out_dir
# NOTE(review): pre-refactor free function removed by this commit; the diff
# view interleaves its remaining lines with the new Sync methods further down.
def calculate_delta(in_dir: str, out_dir: str) -> tuple[set[Path], set[Path]]:
'''
Returns the set of dest files which need to be deleted, followed by the files to copy
'''
# every non-directory file under the source root, as a source-relative path
in_files = { p.relative_to(in_dir) for p in Path(in_dir).rglob("*") if not p.is_dir() }
logger.info(f'found {len(in_files)} files in source')
# every non-directory file already present under the destination root
existing_out_files = { p.relative_to(out_dir) for p in Path(out_dir).rglob("*") if not p.is_dir() }
logger.info(f'found {len(existing_out_files)} files in dest')
def target_name(self, source_name: str) -> Path:
    """Return the destination-relative name for *source_name*.

    Cleans the name first; files that will be transcoded (suffix in
    MAKE_MP3) additionally get '.mp3' appended to their cleaned name.
    """
    cleaned = clean_name(source_name)
    if cleaned.suffix not in MAKE_MP3:
        return cleaned
    return Path(str(cleaned) + '.mp3')
# NOTE(review): this line belongs to the OLD free-function calculate_delta;
# the diff view interleaves it with the new method that follows.
expected_out_files = { target_name(n) for n in in_files }
# Post-refactor replacement: same delta logic, roots now read from self.
def calculate_delta(self) -> tuple[set[Path], set[Path]]:
'''
Returns the set of dest files which need to be deleted, followed by the files to copy
'''
# every non-directory file under the source root, as a source-relative path
in_files = { p.relative_to(self.in_dir) for p in Path(self.in_dir).rglob("*") if not p.is_dir() }
logger.info(f'found {len(in_files)} files in source')
# every non-directory file already present under the destination root
existing_out_files = { p.relative_to(self.out_dir) for p in Path(self.out_dir).rglob("*") if not p.is_dir() }
logger.info(f'found {len(existing_out_files)} files in dest')
# NOTE(review): the diff view interleaves the second halves of the OLD and
# NEW calculate_delta here; labels mark which version each line belongs to.
# (old) dest files with no corresponding expected output are deleted
to_del = { f for f in existing_out_files if f not in expected_out_files }
logger.info(f'found {len(to_del)} files to delete')
# FIXME: files which exist but have changed (e.g fixed metadata) are incorrectly skipped
to_copy = { f for f in in_files if target_name(f) not in existing_out_files and f.suffix not in IGNORE }
logger.info(f'found {len(to_copy)} files to copy')
# (new) names the source files are expected to have after conversion
expected_out_files = { self.target_name(n) for n in in_files }
# (old) end of the pre-refactor free function
return to_del, to_copy
# (new) dest files not expected -> delete; source files absent from dest -> copy
to_del = { f for f in existing_out_files if f not in expected_out_files }
logger.info(f'found {len(to_del)} files to delete')
# FIXME: files which exist but have changed (e.g fixed metadata) are incorrectly skipped
to_copy = { f for f in in_files if self.target_name(f) not in existing_out_files and f.suffix not in IGNORE }
logger.info(f'found {len(to_copy)} files to copy')
def rm_dest_files(encoder: Encoder, out_dir: str, files: set[Path]) -> None:
    """Delete every destination-relative path in *files* from out_dir via the encoder."""
    root = Path(out_dir)
    for f in files:
        logger.info(f'removing {f} because it does not exist on host')
        encoder.remove(root / f)
# NOTE(review): closing return of the NEW Sync.calculate_delta, displaced to
# this position by the diff view's interleaving.
return to_del, to_copy
def copy_one(encoder: Encoder, source: Path, dest: Path) -> None:
    """Transfer one file: transcode MAKE_MP3 suffixes, raw-copy COPY_RAW ones.

    Any other suffix is skipped with a warning.  The destination directory
    is created first via the encoder.
    """
    encoder.ensure_dir(dest.parent)
    suffix = source.suffix
    if suffix in MAKE_MP3:
        logger.debug(f'converting {source} -> {dest}')
        encoder.convert(source, dest)
        return
    if suffix in COPY_RAW:
        logger.debug(f'copying {source} -> {dest}')
        encoder.cp(source, dest)
        return
    logger.warning(f"skipping {source} because I don't know what to do with that file type")
def rm_dest_files(self, files: set[Path]) -> None:
    """Delete every destination-relative path in *files* from self.out_dir."""
    root = Path(self.out_dir)
    for f in files:
        logger.info(f'removing {f} because it does not exist on host')
        self.encoder.remove(root / f)
def cp_src_files(encoder: Encoder, in_dir: Path, out_dir: Path, src_names: set[Path], jobs: int) -> None:
    """Copy/transcode each source-relative path in *src_names* into out_dir.

    Args:
        encoder: performs the convert/cp operations for each file.
        in_dir: source root the names are relative to.
        out_dir: destination root.
        src_names: source-relative paths to transfer.
        jobs: number of worker threads to use.
    """
    logger.info(f'using {jobs} jobs to copy {len(src_names)} files')
    with concurrent.futures.ThreadPoolExecutor(max_workers=jobs) as executor:
        futures = {
            executor.submit(copy_one, encoder, in_dir / n, out_dir / target_name(n)): n
            for n in src_names
        }
        # Surface worker errors instead of silently dropping the Futures;
        # keep going so one bad file does not abort the whole sync.
        for fut in concurrent.futures.as_completed(futures):
            try:
                fut.result()
            except Exception:
                logger.exception(f'failed to copy {futures[fut]}')
def copy_one(self, name: Path) -> None:
    """Transfer a single source-relative file into the destination tree.

    MAKE_MP3 suffixes are transcoded, COPY_RAW suffixes are copied as-is,
    and anything else is skipped with a warning.
    """
    source = self.in_dir / name
    dest = self.out_dir / self.target_name(name)
    self.encoder.ensure_dir(dest.parent)
    suffix = source.suffix
    if suffix in MAKE_MP3:
        logger.debug(f'converting {source} -> {dest}')
        self.encoder.convert(source, dest)
        return
    if suffix in COPY_RAW:
        logger.debug(f'copying {source} -> {dest}')
        self.encoder.cp(source, dest)
        return
    logger.warning(f"skipping {source} because I don't know what to do with that file type")
def cp_src_files(self, src_names: set[Path], jobs: int) -> None:
    """Copy/transcode each source-relative path in *src_names* using a thread pool.

    Args:
        src_names: source-relative paths to transfer into self.out_dir.
        jobs: number of worker threads to use.
    """
    logger.info(f'using {jobs} jobs to copy {len(src_names)} files')
    with concurrent.futures.ThreadPoolExecutor(max_workers=jobs) as executor:
        futures = {executor.submit(self.copy_one, n): n for n in src_names}
        # Check each result so worker exceptions are logged rather than
        # silently discarded with the Future; one failure doesn't stop the rest.
        for fut in concurrent.futures.as_completed(futures):
            try:
                fut.result()
            except Exception:
                logger.exception(f'failed to copy {futures[fut]}')
def sync_all(in_dir: str, out_dir: str, jobs: int = None, dry_run: bool = False) -> None:
    """Make out_dir mirror in_dir, transcoding/copying/deleting as needed.

    Args:
        in_dir: source music library root.
        out_dir: destination root to bring in sync with the source.
        jobs: worker-thread count; falls back to the CPU count when None or 0.
        dry_run: forwarded to Encoder — presumably suppresses real file
            operations; confirm against the Encoder implementation.
    """
    # NOTE(review): the diff view interleaved the pre- and post-refactor call
    # sequences here; only the post-refactor (Sync-based) sequence is kept.
    encoder = Encoder(dry_run=dry_run)
    sync = Sync(encoder, in_dir, out_dir)
    to_del, to_copy = sync.calculate_delta()
    sync.rm_dest_files(to_del)
    sync.cp_src_files(to_copy, jobs = jobs or multiprocessing.cpu_count())
def main() -> None:
logging.basicConfig()