Track the specific outstanding work items to dedup work
This commit is contained in:
38
src/post.rs
38
src/post.rs
@@ -5,6 +5,7 @@ use crate::render::{ColorTermRenderer, Renderer as _, SerializedFrame};
|
|||||||
use itertools::Itertools as _;
|
use itertools::Itertools as _;
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
use rayon::{ThreadPool, ThreadPoolBuilder};
|
use rayon::{ThreadPool, ThreadPoolBuilder};
|
||||||
|
use std::collections::HashSet;
|
||||||
use std::fs::{DirEntry, File, read_dir};
|
use std::fs::{DirEntry, File, read_dir};
|
||||||
use std::io::BufReader;
|
use std::io::BufReader;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
@@ -100,7 +101,7 @@ pub struct LoaderCache {
|
|||||||
loader: Arc<Loader>,
|
loader: Arc<Loader>,
|
||||||
workers: ThreadPool,
|
workers: ThreadPool,
|
||||||
num_workers: u32,
|
num_workers: u32,
|
||||||
busy_workers: u32,
|
outstanding_work: HashSet<PathBuf>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LoaderCache {
|
impl LoaderCache {
|
||||||
@@ -116,7 +117,7 @@ impl LoaderCache {
|
|||||||
loader,
|
loader,
|
||||||
workers: ThreadPoolBuilder::new().num_threads(num_workers as _).build().unwrap(),
|
workers: ThreadPoolBuilder::new().num_threads(num_workers as _).build().unwrap(),
|
||||||
num_workers,
|
num_workers,
|
||||||
busy_workers: 0,
|
outstanding_work: HashSet::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn load_first(&mut self) -> Arc<Frame> {
|
pub fn load_first(&mut self) -> Arc<Frame> {
|
||||||
@@ -128,15 +129,18 @@ impl LoaderCache {
|
|||||||
self.load_from_paths(paths, rel)
|
self.load_from_paths(paths, rel)
|
||||||
}
|
}
|
||||||
fn load_from_paths(&mut self, paths: Paths, rel: isize) -> Arc<Frame> {
|
fn load_from_paths(&mut self, paths: Paths, rel: isize) -> Arc<Frame> {
|
||||||
// Prefetch this path + a couple more based on access pattern
|
// fill the cache with any ready data. Touch the desired path first though,
|
||||||
|
// to help it not get dropped here.
|
||||||
|
let _ = self.cache.get(&paths.path());
|
||||||
self.tend_cache_nonblocking();
|
self.tend_cache_nonblocking();
|
||||||
self.fetch_async(paths.path(), 0);
|
|
||||||
|
|
||||||
|
// Prefetch this path + a couple more based on access pattern
|
||||||
|
self.fetch_async(paths.path(), 0);
|
||||||
let paths_to_prefetch = (1..5).into_iter()
|
let paths_to_prefetch = (1..5).into_iter()
|
||||||
.map(|i| paths.get_rel(i*rel))
|
.map(|i| paths.get_rel(i*rel))
|
||||||
.dedup();
|
.dedup();
|
||||||
for path in paths_to_prefetch {
|
for path in paths_to_prefetch {
|
||||||
self.fetch_async(path, 2 /* leave some space in case we need something more urgent */);
|
self.fetch_async(path, 4 /* leave some space in case we need something more urgent */);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.get_or_tend(paths.path())
|
self.get_or_tend(paths.path())
|
||||||
@@ -153,14 +157,15 @@ impl LoaderCache {
|
|||||||
/// least `reserved` workers (for higher priority work).
|
/// least `reserved` workers (for higher priority work).
|
||||||
fn fetch_async(&mut self, path: PathBuf, reserved: u32) {
|
fn fetch_async(&mut self, path: PathBuf, reserved: u32) {
|
||||||
let cached = self.cache.get(&path).is_some();
|
let cached = self.cache.get(&path).is_some();
|
||||||
let free_workers = self.busy_workers + reserved < self.num_workers;
|
let in_flight = self.outstanding_work.contains(&path);
|
||||||
if cached || !free_workers {
|
let free_workers = self.unused_workers() > reserved;
|
||||||
|
if cached || in_flight || !free_workers {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let sender = self.frame_sender.clone();
|
let sender = self.frame_sender.clone();
|
||||||
let loader = self.loader.clone();
|
let loader = self.loader.clone();
|
||||||
self.busy_workers += 1;
|
self.outstanding_work.insert(path.clone());
|
||||||
self.workers.spawn(move || {
|
self.workers.spawn(move || {
|
||||||
let _ = sender.send(loader.load(&path));
|
let _ = sender.send(loader.load(&path));
|
||||||
});
|
});
|
||||||
@@ -168,9 +173,7 @@ impl LoaderCache {
|
|||||||
|
|
||||||
fn tend_cache(&mut self, path: PathBuf) -> Arc<Frame> {
|
fn tend_cache(&mut self, path: PathBuf) -> Arc<Frame> {
|
||||||
loop {
|
loop {
|
||||||
let frame = Arc::new(self.ready_frames.recv().unwrap());
|
let frame = self.receive(self.ready_frames.recv().unwrap());
|
||||||
self.cache.put(frame.path.clone(), frame.clone());
|
|
||||||
self.busy_workers -= 1;
|
|
||||||
if frame.path == path {
|
if frame.path == path {
|
||||||
return frame;
|
return frame;
|
||||||
}
|
}
|
||||||
@@ -178,10 +181,19 @@ impl LoaderCache {
|
|||||||
}
|
}
|
||||||
fn tend_cache_nonblocking(&mut self) {
|
fn tend_cache_nonblocking(&mut self) {
|
||||||
while let Ok(frame) = self.ready_frames.try_recv() {
|
while let Ok(frame) = self.ready_frames.try_recv() {
|
||||||
let frame = Arc::new(frame);
|
self.receive(frame);
|
||||||
self.cache.put(frame.path.clone(), frame.clone());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fn receive(&mut self, frame: Frame) -> Arc<Frame> {
|
||||||
|
let frame = Arc::new(frame);
|
||||||
|
self.cache.put(frame.path.clone(), frame.clone());
|
||||||
|
self.outstanding_work.remove(&frame.path.clone());
|
||||||
|
frame
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unused_workers(&self) -> u32 {
|
||||||
|
self.num_workers - self.outstanding_work.len() as u32
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Viewer {
|
pub struct Viewer {
|
||||||
|
Reference in New Issue
Block a user