Files
fdtd-coremem/crates/post/src/lib.rs

284 lines
9.0 KiB
Rust

//! Post-processing tools: load serialized simulation frames from disk,
//! cache/prefetch them in the background, and render z slices to the terminal.
use coremem::meas::AbstractMeasurement;
use coremem::render::{ColorTermRenderer, Renderer as _, RenderConfig, SerializedFrame};
use coremem::sim::{GenericSim, StaticSim};
use itertools::Itertools as _;
use lru::LruCache;
use rayon::{ThreadPool, ThreadPoolBuilder};
use std::collections::HashSet;
use std::fs::{DirEntry, File, read_dir};
use std::io::BufReader;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::mpsc::{self, Receiver, Sender};
/// Simple string-backed error type shared by all post-processing operations.
#[derive(Clone, Debug)]
pub struct Error(String);

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

// Implementing the standard Error trait lets callers box/propagate this
// alongside other error types.
impl std::error::Error for Error {}

impl From<std::io::Error> for Error {
    /// Capture an I/O failure as its display message.
    fn from(e: std::io::Error) -> Self {
        Self(e.to_string())
    }
}
impl From<Box<bincode::ErrorKind>> for Error {
fn from(e: Box<bincode::ErrorKind>) -> Self {
Self(e.to_string())
}
}
pub type Result<T> = std::result::Result<T, Error>;
/// A single simulation frame loaded from disk: the deserialized snapshot plus
/// the path it was read from (used to locate neighboring frames).
pub struct Frame {
    // File this frame was deserialized from.
    path: PathBuf,
    // Decoded payload: simulation state plus its measurements.
    data: SerializedFrame<StaticSim>,
}
impl Frame {
    /// Measurements recorded alongside the simulation state.
    pub fn measurements(&self) -> &[Box<dyn AbstractMeasurement>] {
        &self.data.measurements[..]
    }
    /// Filesystem path this frame was loaded from.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }
}
// Deref straight to the simulation state so a `Frame` can be used anywhere a
// `StaticSim` is expected (e.g. `viewing.depth()` in `Viewer`).
impl Deref for Frame {
    type Target = StaticSim;
    fn deref(&self) -> &Self::Target {
        &self.data.state
    }
}
/// Loads serialized simulation frames from a directory, addressing them in
/// natural-sort (human) filename order.
#[derive(Default)]
pub struct Loader {
    // Directory containing one serialized frame per file.
    dir: PathBuf,
}
/// A sorted directory listing plus a cursor into it.
struct Paths {
    // All entries of the frame directory, sorted by natural path order.
    entries: Vec<DirEntry>,
    // Index of the "current" entry within `entries`.
    idx: usize,
}
impl Paths {
    /// Path of the currently selected entry.
    fn path(&self) -> PathBuf {
        self.entries[self.idx].path()
    }
    /// Path of the entry `rel` steps away from the current one, clamped to
    /// the first/last entry of the listing.
    fn get_rel(&self, rel: isize) -> PathBuf {
        let last = self.entries.len() as isize - 1;
        let target = (self.idx as isize + rel).max(0).min(last) as usize;
        self.entries[target].path()
    }
}
impl Loader {
    /// Create a loader that reads serialized frames from `dir`.
    pub fn new(dir: PathBuf) -> Self {
        Self { dir }
    }
    /// Locate the directory entry `rel` steps away (in natural-sort order)
    /// from the frame `cur`, clamped to the first/last entry.
    fn find_rel(&self, cur: &Frame, rel: isize) -> Paths {
        let entries = self.get_entries();
        // If `cur`'s file has disappeared since it was loaded, fall back to
        // its natural-sort insertion point instead of panicking.
        let pos = entries.binary_search_by(|e| {
            natord::compare(&e.path().to_string_lossy(), &cur.path.to_string_lossy())
        }).unwrap_or_else(|insertion_point| insertion_point);
        let new_pos = (pos as isize + rel)
            .max(0)
            .min(entries.len() as isize - 1) as usize;
        Paths {
            entries,
            idx: new_pos,
        }
    }
    /// Point at the first (natural-sort order) entry in the directory.
    fn find_first(&self) -> Paths {
        Paths {
            entries: self.get_entries(),
            idx: 0,
        }
    }
    /// Load the first frame in the directory.
    pub fn load_first(&self) -> Result<Frame> {
        self.load(&self.find_first().path())
    }
    /// Load the frame `rel` steps away from `cur`, clamped to the directory
    /// bounds (so stepping past either end re-loads the boundary frame).
    pub fn load_rel(&self, cur: &Frame, rel: isize) -> Result<Frame> {
        self.load(&self.find_rel(cur, rel).path())
    }
    /// Deserialize a single frame from `path`.
    fn load(&self, path: &Path) -> Result<Frame> {
        // Propagate open failures via `?` (`Error: From<std::io::Error>`)
        // instead of panicking on a missing or unreadable file.
        let mut reader = BufReader::new(File::open(path)?);
        // TODO: on failure, retry decoding as a few common concrete sim
        // types (e.g. SimState<f32>, SimState<f64>) and convert to static?
        let data = bincode::deserialize_from(&mut reader)?;
        Ok(Frame {
            path: path.into(),
            data,
        })
    }
    /// Return entries sorted by path (natural order, so `frame2 < frame10`)
    pub fn get_entries(&self) -> Vec<DirEntry> {
        // NOTE(review): still panics if the directory is unreadable or an
        // entry errors; the public `Vec<DirEntry>` return leaves no channel
        // to report it without an interface change.
        let mut entries: Vec<_> = read_dir(&self.dir).unwrap()
            .map(|entry| entry.unwrap())
            .collect();
        entries.sort_by(|a, b| {
            natord::compare(&a.path().to_string_lossy(), &b.path().to_string_lossy())
        });
        entries
    }
}
/// Owns a Loader and wraps it in a Lru cache along with a prefetcher to optimize loads
pub struct LoaderCache {
    // Most-recently-used frames keyed by file path. Failed loads are cached
    // too (as `Err`) rather than being retried on every access.
    cache: LruCache<PathBuf, Result<Arc<Frame>>>,
    /// Frames completed by a background worker but not yet moved into the cache
    ready_frames: Receiver<(PathBuf, Result<Frame>)>,
    /// Channel end used by workers to post new results
    frame_sender: Sender<(PathBuf, Result<Frame>)>,
    // Shared with each worker closure; hence the Arc.
    loader: Arc<Loader>,
    // Pool performing the actual disk loads off the calling thread.
    workers: ThreadPool,
    // Size of `workers`; used to compute how many are idle.
    num_workers: u32,
    /// Loads currently being performed by a background worker
    outstanding_work: HashSet<PathBuf>,
}
impl LoaderCache {
    /// Cache up to `size` frames, loading with `num_workers` background threads.
    pub fn new(loader: Loader, size: usize, num_workers: u32) -> Self {
        let loader = Arc::new(loader);
        let (sender, receiver) = mpsc::channel();
        Self {
            cache: LruCache::new(size),
            ready_frames: receiver,
            frame_sender: sender,
            loader,
            workers: ThreadPoolBuilder::new().num_threads(num_workers as _).build().unwrap(),
            num_workers,
            outstanding_work: HashSet::new(),
        }
    }
    /// Load the first frame (natural-sort order) in the directory.
    pub fn load_first(&mut self) -> Arc<Frame> {
        let paths = self.loader.find_first();
        // rel=1: assume the user will step forward, so prefetch ahead.
        self.load_from_paths(paths, 1)
    }
    /// Load the frame `rel` steps away from `cur`, prefetching further in the
    /// same direction.
    pub fn load_rel(&mut self, cur: &Frame, rel: isize) -> Arc<Frame> {
        let paths = self.loader.find_rel(cur, rel);
        self.load_from_paths(paths, rel)
    }
    /// Fetch the frame at `paths`' cursor, prefetching along direction `rel`.
    fn load_from_paths(&mut self, paths: Paths, rel: isize) -> Arc<Frame> {
        // Fill the cache with any ready data. Touch the desired path first though,
        // to help it not get dropped here.
        let _ = self.cache.get(&paths.path());
        self.tend_cache_nonblocking();
        // Fetch this path at top priority (reserve no workers), then prefetch
        // a few more frames along the access direction. `dedup` collapses the
        // duplicates that arise when `rel == 0` (all offsets coincide).
        self.fetch_async(paths.path(), 0);
        let paths_to_prefetch = (1..5)
            .map(|i| paths.get_rel(i*rel))
            .dedup();
        for path in paths_to_prefetch {
            self.fetch_async(path, 4 /* leave some space in case we need something more urgent */);
        }
        self.get_or_tend(paths.path())
    }
    /// If not cached, fetch the Frame in a background worker, but reserve at
    /// least `reserved` workers (for higher priority work).
    fn fetch_async(&mut self, path: PathBuf, reserved: u32) {
        // `get` (not `contains`) on purpose: it also refreshes LRU recency.
        let cached = self.cache.get(&path).is_some();
        let in_flight = self.outstanding_work.contains(&path);
        let free_workers = self.unused_workers() > reserved;
        if cached || in_flight || !free_workers {
            return;
        }
        let sender = self.frame_sender.clone();
        let loader = self.loader.clone();
        self.outstanding_work.insert(path.clone());
        self.workers.spawn(move || {
            let loaded = loader.load(&path);
            // The receiver may have been dropped on shutdown; ignore that.
            let _ = sender.send((path, loaded));
        });
    }
    /// Read new frames until the cache is populated with the desired path.
    /// May read 0 frames, if the path's already cached.
    fn get_or_tend(&mut self, path: PathBuf) -> Arc<Frame> {
        loop {
            if let Some(cached) = self.cache.get(&path) {
                return cached
                    .as_ref()
                    .expect("background worker failed to load frame")
                    .clone();
            }
            // Ensure the load is actually in flight: the initial fetch_async
            // may have been skipped when every worker was busy. Without this
            // re-request, `recv` below could block forever once the channel
            // drains with `path` never having been scheduled.
            self.fetch_async(path.clone(), 0);
            // `recv` can't fail: we always hold a live `frame_sender`.
            let _ = self.receive(self.ready_frames.recv().unwrap());
        }
    }
    /// Stream frames into the cache until the next frame would block.
    fn tend_cache_nonblocking(&mut self) {
        while let Ok(frame) = self.ready_frames.try_recv() {
            let _ = self.receive(frame);
        }
    }
    /// Move some `frame` which came from a worker into the cache, and perform
    /// all necessary accounting.
    fn receive(&mut self, (path, frame): (PathBuf, Result<Frame>)) -> Result<Arc<Frame>> {
        let frame = frame.map(Arc::new);
        self.cache.put(path.clone(), frame.clone());
        self.outstanding_work.remove(&path);
        frame
    }
    /// Number of idle worker threads.
    fn unused_workers(&self) -> u32 {
        // `fetch_async` never schedules a load unless a worker is free, so
        // `outstanding_work.len() <= num_workers` and this cannot underflow.
        self.num_workers - self.outstanding_work.len() as u32
    }
}
/// Interactive terminal viewer: steps through frames in time and z, rendering
/// one z slice of the current frame at a time.
pub struct Viewer {
    // Frame currently on screen.
    viewing: Arc<Frame>,
    // Index of the z slice currently on screen.
    z: u32,
    // Frame cache + prefetcher backing `navigate`.
    cache: LoaderCache,
    renderer: ColorTermRenderer,
    // Configuration the next render will use (mutable via `render_config()`).
    render_config: RenderConfig,
    // Configuration used by the most recent render, to detect changes.
    last_config: RenderConfig,
}
impl Viewer {
    /// Open the first frame in the loader's directory and start at the middle
    /// z slice with default render settings.
    pub fn new(loader: Loader) -> Self {
        let mut cache = LoaderCache::new(loader, 6, 6);
        let viewing = cache.load_first();
        let z = viewing.depth() / 2;
        Self {
            z,
            viewing,
            cache,
            renderer: Default::default(),
            render_config: Default::default(),
            last_config: Default::default(),
        }
    }
    /// Step `time_steps` frames through time and `z_steps` slices along z,
    /// then re-render. No-op if nothing about the view would change.
    pub fn navigate(&mut self, time_steps: isize, z_steps: i32) {
        let clamped_z = (self.z as i32)
            .saturating_add(z_steps)
            .max(0)
            .min(self.viewing.depth() as i32 - 1) as u32;
        let unchanged = time_steps == 0
            && clamped_z == self.z
            && self.render_config == self.last_config;
        if unchanged {
            return;
        }
        self.z = clamped_z;
        self.last_config = self.render_config;
        self.viewing = self.cache.load_rel(&self.viewing, time_steps);
        self.render();
    }
    /// Draw the current z slice of the current frame to the terminal.
    pub fn render(&self) {
        self.renderer.render_z_slice(
            &**self.viewing,
            self.z,
            &self.viewing.data.measurements,
            self.render_config,
        );
    }
    /// Mutable access to the render configuration; tweak it, then call
    /// `navigate` to apply (the change is detected via `last_config`).
    pub fn render_config(&mut self) -> &mut RenderConfig {
        &mut self.render_config
    }
}