fdtd-coremem/crates/coremem/src/driver.rs

554 lines
20 KiB
Rust

use crate::diagnostics::SyncDiagnostics;
use crate::geom::{Coord, Index, Region};
use crate::mat;
use crate::meas::{self, AbstractMeasurement};
use crate::real::Real;
use crate::render::{self, MultiRenderer, Renderer};
use crate::sim::AbstractSim;
use crate::sim::units::{Frame, Time};
use crate::stim::{
DynStimuli,
Fields,
FieldMags,
ModulatedVectorField,
RenderedStimulus,
Stimulus,
StimuliVec,
TimeVarying,
VectorField,
};
use crate::worker::JobPool;
use coremem_cross::compound::list;
use coremem_cross::dim::DimSlice;
use coremem_cross::step::SimMeta;
use log::{info, trace};
use serde::{Deserialize, Serialize};
use std::cell::Cell;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Instant;
/// Ties a simulation together with the stimuli that feed it and the renderers and
/// measurements that observe it.
///
/// `S` is the simulation state and `Stim` the stimulus container (by default a
/// `DriverStimulusDynVec`). Every constructor in this file instantiates `R` as `S::Real`.
pub struct Driver<R, S, Stim=DriverStimulusDynVec<R>> {
    /// the simulation being advanced.
    state: S,
    /// collection of renderers; shared with the render workers.
    renderer: Arc<MultiRenderer<S>>,
    /// background workers which receive cloned states to render.
    render_pool: JobPool<S, ()>,
    /// measurements handed to the renderers alongside each rendered state.
    measurements: Vec<Arc<dyn AbstractMeasurement<S>>>,
    /// the stimuli, wrapped so that they can be evaluated on a background worker.
    stimuli: StimAccess<R, Stim>,
    /// frame at which the in-progress `step_until` call ends (`None` outside of such a call);
    /// used only to report progress.
    sim_end_time: Option<Frame>,
    /// diagnostics handle shared with the simulation, the stimuli and the render workers.
    diag: SyncDiagnostics,
    /// when the diagnostics were last logged; used to rate-limit that logging.
    last_diag_time: Instant,
}
impl<S: AbstractSim, Stim> Driver<S::Real, S, Stim> {
    /// Builds a driver around `state` which draws its stimuli from `stimuli`.
    ///
    /// A new diagnostics handle is created and shared with both the simulation and the
    /// stimulus wrapper. The time, meta, world-energy and world-power measurements are
    /// registered up front.
    pub fn new_with_stim(mut state: S, stimuli: Stim) -> Self {
        let diag = SyncDiagnostics::new();
        state.use_diagnostics(diag.clone());
        let stimuli = StimAccess::new(diag.clone(), stimuli);
        Self {
            state,
            renderer: Arc::new(MultiRenderer::new()),
            render_pool: JobPool::new(1),
            measurements: vec![
                Arc::new(meas::Time),
                Arc::new(meas::Meta),
                Arc::new(meas::Energy::world()),
                Arc::new(meas::Power::world()),
            ],
            stimuli,
            sim_end_time: None,
            diag,
            last_diag_time: Instant::now(),
        }
    }

    /// Registers an additional measurement.
    pub fn add_measurement<Meas: AbstractMeasurement<S> + 'static>(&mut self, m: Meas) {
        let shared: Arc<dyn AbstractMeasurement<S>> = Arc::new(m);
        self.measurements.push(shared);
    }

    /// Adds `s` to the stimulus container, for containers which can accept it.
    ///
    /// The simulation's current metadata is passed along to the container.
    pub fn add_stimulus<SNew>(&mut self, s: SNew)
    where
        Stim: Pushable<SNew>,
    {
        let meta = self.state.meta();
        self.stimuli.push(meta, s);
    }
}
impl<S: AbstractSim> Driver<S::Real, S, DriverStimulusDynVec<S::Real>> {
    /// Builds a driver whose stimuli live in a default-constructed `DriverStimulusDynVec`,
    /// i.e. a container of boxed stimuli.
    pub fn new(state: S) -> Self {
        let stimuli = DriverStimulusDynVec::default();
        Self::new_with_stim(state, stimuli)
    }
}
impl<S: AbstractSim> Driver<S::Real, S, list::Empty> {
    /// Builds a driver whose stimuli are tracked as a `list`, starting from the empty list.
    /// Grow the list with `with_add_stimulus`.
    pub fn new_list_stim(state: S) -> Self {
        let stimuli = list::Empty::default();
        Self::new_with_stim(state, stimuli)
    }
}
impl<S: AbstractSim, Stim> Driver<S::Real, S, Stim> {
    /// Appends `s` to a `list`-based stimulus container.
    ///
    /// Appending changes the type of the list, so this consumes the driver and returns a
    /// new one with the longer list; everything else is carried over unchanged.
    pub fn with_add_stimulus<E>(self, s: E) -> Driver<S::Real, S, list::Appended<Stim, E>>
    where
        Stim: list::Appendable<E>,
    {
        let Driver {
            state,
            renderer,
            render_pool,
            measurements,
            stimuli,
            sim_end_time,
            diag,
            last_diag_time,
        } = self;
        // recover the bare list from its wrapper, then grow it.
        let appended = stimuli.into_inner().append(s);
        Driver {
            state,
            renderer,
            render_pool,
            measurements,
            stimuli: StimAccess::new(diag.clone(), appended),
            sim_end_time,
            diag,
            last_diag_time,
        }
    }

    /// Replaces the stimulus container wholesale with `stimuli`, discarding the old one.
    /// Everything else is carried over unchanged.
    pub fn with_stimulus<NewStim>(self, stimuli: NewStim) -> Driver<S::Real, S, NewStim> {
        let Driver {
            state,
            renderer,
            render_pool,
            measurements,
            sim_end_time,
            diag,
            last_diag_time,
            ..
        } = self;
        Driver {
            state,
            renderer,
            render_pool,
            measurements,
            stimuli: StimAccess::new(diag.clone(), stimuli),
            sim_end_time,
            diag,
            last_diag_time,
        }
    }

    /// Switches to a default-constructed `DriverStimulusVec<T>`: a container for stimuli
    /// which are all of the one type `T`.
    pub fn with_concrete_stimulus<T>(self) -> Driver<S::Real, S, DriverStimulusVec<T>> {
        let stimuli = DriverStimulusVec::<T>::default();
        self.with_stimulus(stimuli)
    }

    /// Switches to a default-constructed `DriverStimulusModulated`: a container which
    /// evaluates each stimulus' vector field once, at the time the stimulus is added.
    pub fn with_modulated_stimulus<T>(self) -> Driver<S::Real, S, DriverStimulusModulated<S::Real, T>> {
        let stimuli = DriverStimulusModulated::<S::Real, T>::default();
        self.with_stimulus(stimuli)
    }
}
impl<R, S, Stim> Driver<R, S, Stim> {
    /// Sets how many consecutive simulation steps share a single evaluation of the stimuli.
    ///
    /// when we step the simulation N times, we do so with a constant stimulus over those
    /// N frames. lower-resolution quantization of stimuli lets us batch more step calls
    /// (critical to perf) but at the cost of precision.
    ///
    /// # Panics
    ///
    /// Panics if `steps` is zero. The stimulus schedule is computed with a remainder by
    /// this value, so zero could only ever fail later, mid-step, with a far less helpful
    /// "remainder with a divisor of zero" panic.
    pub fn set_steps_per_stimulus(&mut self, steps: u64) {
        assert!(steps != 0, "steps_per_stimulus must be at least 1");
        self.stimuli.steps_per_stimulus = steps;
    }
}
impl<S: AbstractSim, Stim> Driver<S::Real, S, Stim> {
    /// Fills `region` of the simulation with the material `mat`.
    pub fn fill_region<Reg: Region, M: Into<S::Material> + Clone>(&mut self, region: &Reg, mat: M) {
        self.state.fill_region(region, mat);
    }

    /// Asks the simulation whether `region` is filled with `mat`.
    pub fn test_region_filled<Reg: Region, M>(&mut self, region: &Reg, mat: M) -> bool
    where
        M: Into<S::Material> + Clone,
        S::Material: PartialEq,
    {
        self.state.test_region_filled(region, mat)
    }

    /// Size of the simulation, as reported by the simulation itself.
    pub fn size(&self) -> Index {
        self.state.size()
    }

    /// The simulation's timestep.
    pub fn timestep(&self) -> f32 {
        self.state.timestep()
    }

    /// The simulation's current time.
    pub fn time(&self) -> f32 {
        self.state.time()
    }

    /// Fills the outer `thickness` of the simulation with a graded conductor.
    ///
    /// For each boundary cell the simulation supplies a per-axis "boundary-ness" vector.
    /// Each component is cubed and scaled by `0.5 / timestep`, and the three components
    /// are summed into the single conductivity of an `IsomorphicConductor`.
    pub fn add_classical_boundary<C: Coord>(&mut self, thickness: C)
    where
        S::Material: From<mat::IsomorphicConductor<S::Real>>,
    {
        let timestep = self.state.timestep();
        self.state.fill_boundary_using(thickness, |boundary_ness| {
            let per_axis = boundary_ness.elem_pow(3.0) * (0.5 / timestep);
            let summed = per_axis.x() + per_axis.y() + per_axis.z();
            mat::IsomorphicConductor::new(summed.cast())
        });
    }
}
impl<S: AbstractSim + 'static, Stim> Driver<S::Real, S, Stim> {
    /// Registers `renderer` with the renderer collection and logs the registration.
    ///
    /// `name` is used only for the log line; `step_frequency` and `frame_limit` are
    /// forwarded untouched.
    fn add_renderer<Rend: Renderer<S> + 'static>(
        &mut self,
        renderer: Rend,
        name: &str,
        step_frequency: u64,
        frame_limit: Option<u64>,
    ) {
        info!("render to {} at f={} until f={:?}", name, step_frequency, frame_limit);
        self.renderer.push(renderer, step_frequency, frame_limit);
    }

    /// Adds a Y4M renderer writing to `output`.
    pub fn add_y4m_renderer<P: Into<PathBuf>>(&mut self, output: P, step_frequency: u64, frame_limit: Option<u64>) {
        let path: PathBuf = output.into();
        // take the display name before the path is moved into the renderer.
        let label = path.to_string_lossy().into_owned();
        let y4m = render::Y4MRenderer::new(path);
        self.add_renderer(y4m, &label, step_frequency, frame_limit);
    }

    /// Adds a renderer which draws to the terminal.
    pub fn add_term_renderer(&mut self, step_frequency: u64, frame_limit: Option<u64>) {
        let term = render::ColorTermRenderer;
        self.add_renderer(term, "terminal", step_frequency, frame_limit);
    }

    /// Adds a CSV renderer writing to `path`.
    pub fn add_csv_renderer(&mut self, path: &str, step_frequency: u64, frame_limit: Option<u64>) {
        let csv = render::CsvRenderer::new(path);
        self.add_renderer(csv, path, step_frequency, frame_limit);
    }
}
impl<S: AbstractSim + Serialize + 'static, Stim> Driver<S::Real, S, Stim> {
    /// Adds a renderer which serializes the simulation state.
    ///
    /// The renderer is given the template `<out_base>{step_no}.bc`, where `{step_no}` is
    /// passed through literally for the renderer to interpret.
    pub fn add_serializer_renderer(&mut self, out_base: &str, step_frequency: u64, frame_limit: Option<u64>) {
        let mut template = String::from(out_base);
        template.push_str("{step_no}.bc");
        let serializer = render::SerializerRenderer::new_generic(&template);
        self.add_renderer(serializer, &template, step_frequency, frame_limit);
    }
}
impl<S, Stim> Driver<S::Real, S, Stim>
where
    S: AbstractSim + Send + Sync + Serialize + for<'a> Deserialize<'a> + 'static
{
    /// Periodically snapshots the simulation state to `state_file`.
    ///
    /// Before registering the snapshot renderer, tries to load a previous snapshot from the
    /// same file. If that succeeds the loaded state replaces the current one (and is pointed
    /// at this driver's diagnostics) and `true` is returned; otherwise `false` is returned.
    pub fn add_state_file(&mut self, state_file: &str, snapshot_frequency: u64) -> bool {
        let ser = render::SerializerRenderer::new(state_file);
        let loaded = match ser.try_load() {
            Some(snapshot) => {
                self.state = snapshot.state;
                self.state.use_diagnostics(self.diag.clone());
                true
            }
            None => false,
        };
        self.add_renderer(ser, state_file, snapshot_frequency, None);
        loaded
    }
}
impl<S, Stim> Driver<S::Real, S, Stim>
where
S: AbstractSim + Clone + Default + Send + 'static,
Stim: DriverStimulus<S::Real> + Send + 'static,
{
/// Hands a clone of the current simulation state to the render workers.
///
/// The workers are spawned on first use (whenever the pool does not hold exactly 3).
/// Cloning the state is timed as "render prep"; tending the pool and sending the clone
/// is timed as "render blocked".
fn render(&mut self) {
let their_state = self.diag.instrument_render_prep(|| {
if self.render_pool.num_workers() != 3 {
let diag = self.diag.clone();
// TODO: these measurements will come to differ from the ones in the Driver,
// if the user calls `add_measurement`!
// (the workers capture a clone of the list as it is at spawn time.)
let measurements = self.measurements.clone();
let renderer = self.renderer.clone();
self.render_pool.spawn_workers(3, move |state| {
// unblock the main thread (this limits the number of renders in-flight at any time)
trace!("render begin");
diag.instrument_render_cpu_side(|| {
// the renderer takes a slice of plain references, so borrow out of each Arc.
let meas: Vec<&dyn AbstractMeasurement<S>> = measurements.iter().map(|m| &**m).collect();
renderer.render(&state, &*meas, Default::default());
});
trace!("render end");
});
}
self.state.clone()
});
// TODO: this instrumentation is not 100% accurate.
// - 'prep' and 'blocked' have effectively been folded together.
// - either delete 'prep', or change this block to use a `try_send` (prep) followed by a
// `send` (blocking)
self.diag.instrument_render_blocked(|| {
self.render_pool.tend();
self.render_pool.send(their_state);
});
}
/// Steps the simulation by up to `at_most` frames, under a single stimulus evaluation.
///
/// Fewer frames are stepped if a renderer or the stimulus schedule needs servicing sooner.
/// Before stepping, the current frame is rendered if any renderer has work for it.
/// Roughly every 5 seconds of wall-clock time a progress/diagnostics line is logged.
///
/// Panics if `at_most` is zero.
///
/// Return the number of steps actually stepped
fn step_at_most(&mut self, at_most: u32) -> u32 {
let diag = self.diag.clone();
diag.instrument_driver(move || {
assert!(at_most != 0);
let start_step = self.state.step_no();
if self.renderer.any_work_for_frame(start_step) {
self.render();
}
// maybe the renderer or stimulus needs servicing before the max frame the user asked for.
// step less than `at_most`, in that case.
let next_frame_for_user = start_step + at_most as u64;
let next_frame_to_render = self.renderer.next_frame_for_work(start_step);
let next_frame_for_stim = self.stimuli.next_frame_for_work(start_step);
// the earliest of the candidate frames wins; the renderer may have no candidate at all.
let step_to = [Some(next_frame_for_user), next_frame_to_render, Some(next_frame_for_stim)]
.into_iter()
.flatten()
.min()
.unwrap();
let steps_this_time = (step_to - start_step).try_into().unwrap();
let meta = self.state.meta();
// claim the stimulus for the frame we are at (this blocks if it is not ready yet)...
let stim = self.stimuli.get_for(meta, start_step);
// prefetch the next stimulus, in the background.
self.diag.instrument_stimuli_prep(|| {
self.stimuli.start_job(meta, step_to);
});
trace!("step begin");
self.diag.instrument_step(steps_this_time as u64, || {
self.state.step_multiple(steps_this_time, &stim);
});
trace!("step end");
// rate-limit the progress log to one line per 5 seconds.
if self.last_diag_time.elapsed().as_secs_f64() >= 5.0 {
self.last_diag_time = Instant::now();
let step = self.state.step_no();
let diagstr = self.diag.format();
let sim_time = self.state.time() as f64;
// a percentage is only shown while `step_until` has an end time set.
let percent_complete = self.sim_end_time.map(|t| {
format!("[{:.1}%] ", 100.0 * self.state.time() / *t.to_seconds(self.timestep()))
}).unwrap_or_default();
info!(
"{}t={:.2e} frame {:06} {}",
percent_complete, sim_time, step, diagstr
);
}
steps_this_time
})
}
/// Advances the simulation by exactly one frame.
pub fn step(&mut self) {
self.step_at_most(1);
}
/// Returns the number of timesteps needed to reach the end time
/// (zero if that time has already been reached or passed).
pub fn steps_until<T: Time>(&mut self, sim_end_time: T) -> u64 {
let sim_end_step = sim_end_time.to_frame(self.state.timestep());
let start_step = self.state.step_no();
sim_end_step.saturating_sub(start_step)
}
/// Steps the simulation until it reaches `sim_end_time`.
///
/// Stepping happens in batches of at most 1000 frames. If any stepping happened, one more
/// render is issued for the final state. The render workers are joined before returning.
pub fn step_until<T: Time>(&mut self, sim_end_time: T) {
let sim_end_time = sim_end_time.to_frame(self.state.timestep());
// remembered only for the duration of this call, for progress reporting.
self.sim_end_time = Some(sim_end_time);
let mut stepped = false;
while self.state.step_no() < *sim_end_time {
let steps_left = *sim_end_time - self.state.step_no();
// sanity limit: don't try to step too much at once else we may lock up the GPU/etc.
self.step_at_most(steps_left.min(1000) as u32);
stepped = true;
}
if stepped {
// render the final frame -- unless we already *have*
self.render();
}
self.render_pool.join_workers();
self.sim_end_time = None;
}
}
/// Either an owned `T` or a `T` borrowed for `'a`.
///
/// This plays the role of `Cow`, minus `Cow`'s requirement that `T` be `ToOwned` (Clone).
pub enum ValueOrRef<'a, T> {
    /// the value is held directly.
    Value(T),
    /// the value lives elsewhere.
    Ref(&'a T),
}

impl<'a, T> AsRef<T> for ValueOrRef<'a, T> {
    /// Borrows the underlying `T`, regardless of which variant holds it.
    fn as_ref(&self) -> &T {
        match *self {
            ValueOrRef::Value(ref owned) => owned,
            ValueOrRef::Ref(borrowed) => borrowed,
        }
    }
}
/// A stimulus container which can produce a `Stimulus` specialized for a given
/// simulation and step, before that stimulus is handed off to the simulation.
pub trait DriverStimulus<R: Real> {
    /// the specialized form of the stimulus.
    type Optimized: Stimulus<R>;

    /// Produces the specialized stimulus for the simulation described by `meta`, at `_step`.
    ///
    /// Implementations may either lend out something they already hold (`ValueOrRef::Ref`)
    /// or build a fresh value (`ValueOrRef::Value`).
    fn optimized_for<'a>(
        &'a self,
        meta: SimMeta<f32>,
        _step: u64,
    ) -> ValueOrRef<'a, Self::Optimized>;
}
/// A container to which an item of type `T` can be added.
pub trait Pushable<T> {
    /// Adds `t` to the container. `meta` describes the simulation the container serves,
    /// for implementations which process the item as it is added.
    fn push(&mut self, meta: SimMeta<f32>, t: T);
}
/// Stimulus container in which every stimulus has the same concrete type `T`.
pub struct DriverStimulusVec<T>(StimuliVec<T>);

impl<T> Default for DriverStimulusVec<T> {
    /// Wraps a default-constructed `StimuliVec`.
    fn default() -> Self {
        DriverStimulusVec(Default::default())
    }
}

impl<R: Real, T: Stimulus<R>> DriverStimulus<R> for DriverStimulusVec<T> {
    type Optimized = StimuliVec<T>;

    /// No specialization is performed: the stored stimuli are lent out as they are.
    fn optimized_for<'a>(
        &'a self,
        _meta: SimMeta<f32>,
        _step: u64,
    ) -> ValueOrRef<'a, Self::Optimized> {
        let Self(stimuli) = self;
        ValueOrRef::Ref(stimuli)
    }
}

impl<T> Pushable<T> for DriverStimulusVec<T> {
    /// Adds `t` to the collection; the metadata is not needed.
    fn push(&mut self, _meta: SimMeta<f32>, t: T) {
        let Self(stimuli) = self;
        stimuli.push(t);
    }
}
/// Stimulus container which boxes each stimulus, so that stimuli of different concrete
/// types can be stored side by side.
#[derive(Default)]
pub struct DriverStimulusDynVec<R>(DynStimuli<R>);

impl<R: Real> DriverStimulus<R> for DriverStimulusDynVec<R> {
    type Optimized = DynStimuli<R>;

    /// No specialization is performed: the stored stimuli are lent out as they are.
    fn optimized_for<'a>(
        &'a self,
        _meta: SimMeta<f32>,
        _step: u64,
    ) -> ValueOrRef<'a, Self::Optimized> {
        let Self(stimuli) = self;
        ValueOrRef::Ref(stimuli)
    }
}

impl<R: Real, T: Stimulus<R> + Send + 'static> Pushable<T> for DriverStimulusDynVec<R> {
    /// Boxes `t` and adds it to the collection; the metadata is not needed.
    fn push(&mut self, _meta: SimMeta<f32>, t: T) {
        let boxed = Box::new(t);
        self.0.push(boxed);
    }
}
/// Stimulus container which evaluates each stimulus' vector field _only once_, when the
/// stimulus is added, and thereafter stores the evaluated field next to its modulation `T`.
pub struct DriverStimulusModulated<R, T>(StimuliVec<ModulatedStaticField<R, T>>);

impl<R: Real, T> Default for DriverStimulusModulated<R, T> {
    /// Wraps a default-constructed `StimuliVec`.
    fn default() -> Self {
        DriverStimulusModulated(Default::default())
    }
}
impl<R: Real, V: VectorField<R>, T> Pushable<ModulatedVectorField<V, T>> for DriverStimulusModulated<R, T> {
    /// Evaluates the vector field of `stim` at every location of the simulation described
    /// by `meta`, and stores the result together with the stimulus' modulation.
    fn push(&mut self, meta: SimMeta<f32>, stim: ModulatedVectorField<V, T>) {
        let (field, modulation) = stim.into_inner();
        let dim = meta.dim();

        // one default-initialized entry per location, to be overwritten below.
        let mut cells = Vec::new();
        cells.resize_with(dim.product_sum_usize(), Fields::default);

        let mut baked = DimSlice::new(dim, cells);
        let writable: DimSlice<&mut [_]> = baked.as_mut();
        for (loc, cell) in writable.enumerated() {
            *cell = field.at(meta.feature_size().cast(), loc.into());
        }

        self.0.push(ModulatedStaticField::new(baked, modulation));
    }
}
impl<R: Real, T: TimeVarying<R>> DriverStimulus<R> for DriverStimulusModulated<R, T> {
    type Optimized = StimuliVec<ModulatedStaticField<R, FieldMags<R>>>;

    /// Samples every stored modulation at the time of `step` (`time_step * step`) and pairs
    /// the sample with a copy of the corresponding pre-evaluated field.
    fn optimized_for<'a>(
        &'a self,
        meta: SimMeta<f32>,
        step: u64,
    ) -> ValueOrRef<'a, Self::Optimized> {
        let t_sec = meta.time_step().cast::<R>() * R::from_primitive(step);
        let sampled = self
            .0
            .iter()
            .map(|entry| {
                // TODO: remove this costly clone!
                let fields = (*entry.fields()).clone();
                let mags = entry.modulation().at(t_sec);
                ModulatedVectorField::new(fields, mags)
            })
            .collect();
        ValueOrRef::Value(StimuliVec::from_vec(sampled))
    }
}
/// a Stimulus where the field has been pre-calculated:
/// a `ModulatedVectorField` whose vector field is a `DimSlice` of stored `Fields<R>` values
/// and whose modulation is `T`.
pub type ModulatedStaticField<R, T> = ModulatedVectorField<DimSlice<Vec<Fields<R>>>, T>;
/// Wraps a stimulus container so that it can be evaluated ("rendered") on a background
/// worker.
///
/// The caller can request evaluation for a specific step and either block on the result
/// straight away, or come back and ask for that same step later, by which time the worker
/// has hopefully finished. At most one request is in flight at any time.
struct StimAccess<R, T> {
    /// the wrapped container; shared with the worker once one has been spawned.
    stim: Arc<Mutex<T>>,
    /// how many consecutive steps share one evaluation of the stimulus.
    steps_per_stimulus: u64,
    /// diagnostics handle, used to time evaluation and time spent blocked on it.
    diag: SyncDiagnostics,
    /// whether a request has been sent to the worker whose response has not been received
    /// yet (the worker may still be busy, or the response may already be waiting).
    /// A.K.A. "can i safely do a blocking recv on the worker".
    outstanding: Cell<bool>,
    /// the background worker: takes `(meta, step)` requests and answers with the same
    /// `(meta, step)` plus the stimulus rendered for them.
    worker: JobPool<(SimMeta<f32>, u64), (SimMeta<f32>, u64, RenderedStimulus<R>)>,
}
impl<R, T> StimAccess<R, T> {
    /// Wraps `stim`. No worker is spawned until the first job is started, and the stimulus
    /// is initially re-evaluated on every step.
    fn new(diag: SyncDiagnostics, stim: T) -> Self {
        Self {
            stim: Arc::new(Mutex::new(stim)),
            steps_per_stimulus: 1,
            diag,
            outstanding: Cell::new(false),
            worker: JobPool::new(1),
        }
    }

    /// Unwraps the stimulus container, shutting down the background worker.
    ///
    /// # Panics
    ///
    /// Panics if the container is still shared after the worker has been joined, or if the
    /// mutex guarding it was poisoned.
    fn into_inner(mut self) -> T {
        // drain the in-flight job (if any), so that the worker is idle...
        let _ = self.maybe_wait_for_job(Default::default(), 0);
        // ...then shut the worker down. its closure owns a clone of `self.stim`, so until
        // the worker is gone the `Arc` below can never be unique.
        self.worker.join_workers();
        // with the worker joined, there should be no outstanding handles on the arc.
        Arc::try_unwrap(self.stim)
            .ok()
            .expect("stimulus is still shared after joining the stimulus worker")
            .into_inner()
            .unwrap()
    }

    /// Returns the first step strictly after `after` at which the stimulus is due for
    /// re-evaluation, i.e. the next multiple of `steps_per_stimulus`.
    fn next_frame_for_work(&self, after: u64) -> u64 {
        let f = after + self.steps_per_stimulus;
        f - f % self.steps_per_stimulus
    }

    /// used internally.
    /// waits for an outstanding job (if any).
    /// if the response matches the request, return the response,
    /// else discard the response.
    fn maybe_wait_for_job(&self, meta: SimMeta<f32>, step: u64) -> Option<RenderedStimulus<R>> {
        if !self.outstanding.get() {
            return None;
        }
        // block until job is complete and receive the result
        let completed = self.diag.instrument_stimuli_blocked(|| {
            self.worker.recv()
        });
        let (job_meta, job_step, rendered) = completed;
        self.outstanding.set(false);

        Some(rendered)
            .filter(|_| (job_meta, job_step) == (meta, step))
    }
}
impl<R: Real, T: DriverStimulus<R> + Send + 'static> StimAccess<R, T> {
fn get_for(&mut self, meta: SimMeta<f32>, step: u64) -> RenderedStimulus<R> {
// either claim the outstanding job (if it exists and matches)...
self.maybe_wait_for_job(meta, step).unwrap_or_else(|| {
// or start a job and wait for it to complete inline
self.start_job(meta, step);
self.maybe_wait_for_job(meta, step).unwrap()
})
}
// begin rendering the stimulus in the background
fn start_job(&mut self, meta: SimMeta<f32>, step: u64) {
// only one in-progress job allowed!
assert!(!self.outstanding.get());
self.outstanding.set(true);
self.ensure_worker();
self.worker.send((meta, step));
}
fn ensure_worker(&mut self) {
if self.worker.num_workers() != 0 {
return;
}
let stim = self.stim.clone();
let diag = self.diag.clone();
self.worker.spawn_worker(move |(meta, step)| {
let stim = diag.instrument_stimuli(|| {
let stim = stim.lock().unwrap();
let opt = stim.optimized_for(meta, step);
opt.as_ref().rendered(
meta.time_step().cast(),
// TODO: convert this to an integer
meta.time_step().cast::<R>() * R::from_primitive(step),
meta.feature_size().cast(),
meta.dim()
).into_owned()
//^ this 'into_owned' ought to be a no-op.
//^ it would only ever be borrowed if we accidentally called `rendered` twice.
});
(meta, step, stim)
});
}
}
impl<R, S, T: Pushable<S>> Pushable<S> for StimAccess<R, T> {
    /// Adds `t` to the wrapped container.
    ///
    /// Any outstanding background job is waited for and its result thrown away first,
    /// because that result was rendered from the stimuli as they were before this push.
    fn push(&mut self, meta: SimMeta<f32>, t: S) {
        // invalidate any outstanding jobs (because the stimulus will have changed)
        let _ = self.maybe_wait_for_job(Default::default(), 0);

        let mut container = self.stim.lock().unwrap();
        container.push(meta, t);
    }
}