Track outstanding work to optimize prefetching

commit d940e66708
parent 56630b02b3
Date:   2020-12-15 23:00:33 -08:00

@@ -99,10 +99,13 @@ pub struct LoaderCache {
     frame_sender: Sender<Frame>,
     loader: Arc<Loader>,
     workers: ThreadPool,
+    num_workers: u32,
+    busy_workers: u32,
 }

 impl LoaderCache {
     pub fn new(loader: Loader, size: usize) -> Self {
+        let num_workers = 16;
         let loader = Arc::new(loader);
         let (sender, receiver) = mpsc::channel();
@@ -111,7 +114,9 @@ impl LoaderCache {
             ready_frames: receiver,
             frame_sender: sender,
             loader,
-            workers: ThreadPoolBuilder::new().num_threads(8).build().unwrap(),
+            workers: ThreadPoolBuilder::new().num_threads(num_workers as _).build().unwrap(),
+            num_workers,
+            busy_workers: 0,
         }
     }
     pub fn load_first(&mut self) -> Arc<Frame> {
@@ -124,16 +129,19 @@ impl LoaderCache {
     }

     fn load_from_paths(&mut self, paths: Paths, rel: isize) -> Arc<Frame> {
         // Prefetch this path + a couple more based on access pattern
-        let paths_to_fetch = (0..5).into_iter()
+        self.tend_cache_nonblocking();
+        self.fetch_async(paths.path(), 0);
+        let paths_to_prefetch = (1..5).into_iter()
             .map(|i| paths.get_rel(i*rel))
             .dedup();
-        for path in paths_to_fetch {
-            self.fetch_async(path);
+        for path in paths_to_prefetch {
+            self.fetch_async(path, 2 /* leave some space in case we need something more urgent */);
         }
         self.get_or_tend(paths.path())
     }
     fn get_or_tend(&mut self, path: PathBuf) -> Arc<Frame> {
+        self.tend_cache_nonblocking();
         if let Some(cached) = self.cache.get(&path) {
             return cached.clone();
         }
@@ -141,9 +149,18 @@ impl LoaderCache {
         self.tend_cache(path)
     }

-    fn fetch_async(&self, path: PathBuf) {
+    /// If not cached, fetch the Frame in a background worker, but reserve at
+    /// least `reserved` workers (for higher priority work).
+    fn fetch_async(&mut self, path: PathBuf, reserved: u32) {
+        let cached = self.cache.get(&path).is_some();
+        let free_workers = self.busy_workers + reserved < self.num_workers;
+        if cached || !free_workers {
+            return;
+        }
+
         let sender = self.frame_sender.clone();
         let loader = self.loader.clone();
+        self.busy_workers += 1;
         self.workers.spawn(move || {
             let _ = sender.send(loader.load(&path));
         });
@@ -153,6 +170,7 @@ impl LoaderCache {
         loop {
             let frame = Arc::new(self.ready_frames.recv().unwrap());
             self.cache.put(frame.path.clone(), frame.clone());
+            self.busy_workers -= 1;
             if frame.path == path {
                 return frame;
             }
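
Below is a minimal, standalone sketch of the worker-reservation scheme this commit introduces, assuming the pool above is rayon's ThreadPool (the ThreadPoolBuilder/spawn calls in the diff match rayon's API). The Pool struct and the spawn_if_capacity/drain_nonblocking names are hypothetical stand-ins for illustration, not part of this codebase: a job is spawned only while busy_workers + reserved < num_workers, and finished results are drained without blocking so the counter is accurate before each scheduling decision, which is the role tend_cache_nonblocking plays in the diff.

    // Sketch only: `Pool`, `spawn_if_capacity`, and `drain_nonblocking`
    // are hypothetical names, and u64 jobs stand in for frame loads.
    use std::sync::mpsc::{self, Receiver, Sender};

    struct Pool {
        workers: rayon::ThreadPool,
        results: Receiver<u64>,
        sender: Sender<u64>,
        num_workers: u32,
        busy_workers: u32,
    }

    impl Pool {
        fn new(num_workers: u32) -> Self {
            let (sender, results) = mpsc::channel();
            Pool {
                workers: rayon::ThreadPoolBuilder::new()
                    .num_threads(num_workers as usize)
                    .build()
                    .unwrap(),
                results,
                sender,
                num_workers,
                busy_workers: 0,
            }
        }

        // Spawn `job` only if at least `reserved` workers stay free; the
        // same guard `fetch_async` gains in the diff.
        fn spawn_if_capacity(&mut self, job: u64, reserved: u32) {
            if self.busy_workers + reserved >= self.num_workers {
                return; // every non-reserved worker is busy; skip this job
            }
            let sender = self.sender.clone();
            self.busy_workers += 1;
            self.workers.spawn(move || {
                let _ = sender.send(job * 2); // stand-in for loader.load(&path)
            });
        }

        // Drain finished results without blocking, so `busy_workers`
        // reflects reality before the next scheduling decision.
        fn drain_nonblocking(&mut self) {
            while self.results.try_recv().is_ok() {
                self.busy_workers -= 1;
            }
        }
    }

    fn main() {
        let mut pool = Pool::new(4);
        pool.spawn_if_capacity(1, 0); // urgent fetch: may take the last free worker
        for job in 2..10 {
            pool.spawn_if_capacity(job, 2); // prefetch: always leave 2 workers free
        }
        std::thread::sleep(std::time::Duration::from_millis(50));
        pool.drain_nonblocking();
        println!("busy workers after drain: {}", pool.busy_workers);
    }

With 4 workers, the urgent request plus at most one prefetch get scheduled before the guard starts dropping prefetches; draining results lowers busy_workers again, which is why the diff calls tend_cache_nonblocking before deciding what to fetch.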