From e1224f78ca88b88ea809d64eaa87bbe3369144cc Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 15 Dec 2020 23:28:40 -0800 Subject: [PATCH] Track the specific outstanding work items to dedup work --- src/post.rs | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/src/post.rs b/src/post.rs index 487cf19..275714f 100644 --- a/src/post.rs +++ b/src/post.rs @@ -5,6 +5,7 @@ use crate::render::{ColorTermRenderer, Renderer as _, SerializedFrame}; use itertools::Itertools as _; use lru::LruCache; use rayon::{ThreadPool, ThreadPoolBuilder}; +use std::collections::HashSet; use std::fs::{DirEntry, File, read_dir}; use std::io::BufReader; use std::ops::Deref; @@ -100,7 +101,7 @@ pub struct LoaderCache { loader: Arc, workers: ThreadPool, num_workers: u32, - busy_workers: u32, + outstanding_work: HashSet<PathBuf>, } impl LoaderCache { @@ -116,7 +117,7 @@ impl LoaderCache { loader, workers: ThreadPoolBuilder::new().num_threads(num_workers as _).build().unwrap(), num_workers, - busy_workers: 0, + outstanding_work: HashSet::new(), } } pub fn load_first(&mut self) -> Arc<Frame> { @@ -128,15 +129,18 @@ impl LoaderCache { self.load_from_paths(paths, rel) } fn load_from_paths(&mut self, paths: Paths, rel: isize) -> Arc<Frame> { - // Prefetch this path + a couple more based on access pattern + // fill the cache with any ready data. Touch the desired path first though, + // to help it not get dropped here. 
+ let _ = self.cache.get(&paths.path()); self.tend_cache_nonblocking(); - self.fetch_async(paths.path(), 0); + // Prefetch this path + a couple more based on access pattern + self.fetch_async(paths.path(), 0); let paths_to_prefetch = (1..5).into_iter() .map(|i| paths.get_rel(i*rel)) .dedup(); for path in paths_to_prefetch { - self.fetch_async(path, 2 /* leave some space in case we need something more urgent */); + self.fetch_async(path, 4 /* leave some space in case we need something more urgent */); } self.get_or_tend(paths.path()) @@ -153,14 +157,15 @@ impl LoaderCache { /// least `reserved` workers (for higher priority work). fn fetch_async(&mut self, path: PathBuf, reserved: u32) { let cached = self.cache.get(&path).is_some(); - let free_workers = self.busy_workers + reserved < self.num_workers; - if cached || !free_workers { + let in_flight = self.outstanding_work.contains(&path); + let free_workers = self.unused_workers() > reserved; + if cached || in_flight || !free_workers { return; } let sender = self.frame_sender.clone(); let loader = self.loader.clone(); - self.busy_workers += 1; + self.outstanding_work.insert(path.clone()); self.workers.spawn(move || { let _ = sender.send(loader.load(&path)); }); @@ -168,9 +173,7 @@ impl LoaderCache { fn tend_cache(&mut self, path: PathBuf) -> Arc<Frame> { loop { - let frame = Arc::new(self.ready_frames.recv().unwrap()); - self.cache.put(frame.path.clone(), frame.clone()); - self.busy_workers -= 1; + let frame = self.receive(self.ready_frames.recv().unwrap()); if frame.path == path { return frame; } @@ -178,10 +181,19 @@ impl LoaderCache { fn tend_cache_nonblocking(&mut self) { while let Ok(frame) = self.ready_frames.try_recv() { - let frame = Arc::new(frame); - self.cache.put(frame.path.clone(), frame.clone()); + self.receive(frame); } } + fn receive(&mut self, frame: Frame) -> Arc<Frame> { + let frame = Arc::new(frame); + self.cache.put(frame.path.clone(), frame.clone()); + self.outstanding_work.remove(&frame.path); + 
frame + } + + fn unused_workers(&self) -> u32 { + self.num_workers - self.outstanding_work.len() as u32 + } } pub struct Viewer {