use crate::diagnostics::SyncDiagnostics; use crate::geom::{Coord, Index, Region}; use crate::mat; use crate::meas::{self, AbstractMeasurement}; use crate::real::Real; use crate::render::{self, MultiRenderer, Renderer}; use crate::sim::AbstractSim; use crate::sim::units::{Frame, Time}; use crate::stim::{ DynStimuli, Fields, FieldMags, ModulatedVectorField, RenderedStimulus, Stimulus, StimuliVec, TimeVarying, VectorField, }; use crate::worker::JobPool; use coremem_cross::compound::list; use coremem_cross::dim::DimSlice; use coremem_cross::step::SimMeta; use log::{info, trace}; use serde::{Deserialize, Serialize}; use std::cell::Cell; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::time::Instant; pub struct Driver> { state: S, renderer: Arc>, render_pool: JobPool, measurements: Vec>>, stimuli: StimAccess, /// simulation end time sim_end_time: Option, diag: SyncDiagnostics, last_diag_time: Instant, } impl Driver { pub fn new_with_stim(mut state: S, stimuli: Stim) -> Self { let diag = SyncDiagnostics::new(); state.use_diagnostics(diag.clone()); Self { state, renderer: Arc::new(MultiRenderer::new()), render_pool: JobPool::new(1), measurements: vec![ Arc::new(meas::Time), Arc::new(meas::Meta), Arc::new(meas::Energy::world()), Arc::new(meas::Power::world()), ], stimuli: StimAccess::new(diag.clone(), stimuli), sim_end_time: None, diag, last_diag_time: Instant::now(), } } pub fn add_measurement + 'static>(&mut self, m: Meas) { self.measurements.push(Arc::new(m)); } pub fn add_stimulus(&mut self, s: SNew) where Stim: Pushable { self.stimuli.push(self.state.meta(), s) } } impl Driver> { pub fn new(state: S) -> Self { Self::new_with_stim(state, DriverStimulusDynVec::default()) } } impl Driver { pub fn new_list_stim(state: S) -> Self { Self::new_with_stim(state, list::Empty::default()) } } impl Driver { /// add a stimulus onto a list of non-monomorphized stimuli. /// this necessarily must return a new Self. /// (well, with enough tuning we could actually Box just the first reference... pub fn with_add_stimulus(self, s: E) -> Driver> where Stim: list::Appendable { Driver { state: self.state, renderer: self.renderer, render_pool: self.render_pool, measurements: self.measurements, stimuli: StimAccess::new(self.diag.clone(), self.stimuli.into_inner().append(s)), sim_end_time: self.sim_end_time, diag: self.diag, last_diag_time: self.last_diag_time, } } pub fn with_stimulus(self, stimuli: NewStim) -> Driver { Driver { state: self.state, renderer: self.renderer, render_pool: self.render_pool, measurements: self.measurements, stimuli: StimAccess::new(self.diag.clone(), stimuli), sim_end_time: self.sim_end_time, diag: self.diag, last_diag_time: self.last_diag_time, } } pub fn with_concrete_stimulus(self) -> Driver> { self.with_stimulus(DriverStimulusVec::::default()) } pub fn with_modulated_stimulus(self) -> Driver> { self.with_stimulus(DriverStimulusModulated::::default()) } } impl Driver { /// when we step the simulation N times, we do so with a constant stimulus over those N frames. /// lower-resolution quantization of stimuli lets us batch more step calls (critical to perf) /// but at the cost of precision. pub fn set_steps_per_stimulus(&mut self, steps: u64) { self.stimuli.steps_per_stimulus = steps; } } impl Driver { pub fn fill_region + Clone>(&mut self, region: &Reg, mat: M) { self.state.fill_region(region, mat); } pub fn test_region_filled(&mut self, region: &Reg, mat: M) -> bool where M: Into + Clone, S::Material: PartialEq { self.state.test_region_filled(region, mat) } pub fn size(&self) -> Index { self.state.size() } pub fn timestep(&self) -> f32 { self.state.timestep() } pub fn time(&self) -> f32 { self.state.time() } pub fn add_classical_boundary(&mut self, thickness: C) where S::Material: From> { let timestep = self.state.timestep(); self.state.fill_boundary_using(thickness, |boundary_ness| { let b = boundary_ness.elem_pow(3.0); let cond = b * (0.5 / timestep); let iso_cond = cond.x() + cond.y() + cond.z(); let iso_conductor = mat::IsomorphicConductor::new(iso_cond.cast()); iso_conductor }); } } impl Driver { fn add_renderer + 'static>( &mut self, renderer: Rend, name: &str, step_frequency: u64, frame_limit: Option ) { info!("render to {} at f={} until f={:?}", name, step_frequency, frame_limit); self.renderer.push(renderer, step_frequency, frame_limit); } pub fn add_y4m_renderer>(&mut self, output: P, step_frequency: u64, frame_limit: Option) { let output = output.into(); let name = output.to_string_lossy().into_owned(); self.add_renderer(render::Y4MRenderer::new(output), &*name, step_frequency, frame_limit); } pub fn add_term_renderer(&mut self, step_frequency: u64, frame_limit: Option) { self.add_renderer(render::ColorTermRenderer, "terminal", step_frequency, frame_limit); } pub fn add_csv_renderer(&mut self, path: &str, step_frequency: u64, frame_limit: Option) { self.add_renderer(render::CsvRenderer::new(path), path, step_frequency, frame_limit); } } impl Driver { pub fn add_serializer_renderer(&mut self, out_base: &str, step_frequency: u64, frame_limit: Option) { let fmt_str = format!("{out_base}{{step_no}}.bc", out_base=out_base); self.add_renderer(render::SerializerRenderer::new_generic(&*fmt_str), &*fmt_str, step_frequency, frame_limit); } } impl Driver where S: AbstractSim + Send + Sync + Serialize + for<'a> Deserialize<'a> + 'static { /// instruct the driver to periodically save the simulation state to the provided path. /// also attempts to load an existing state file, returning `true` on success. pub fn add_state_file(&mut self, state_file: &str, snapshot_frequency: u64) -> bool { let ser = render::SerializerRenderer::new(state_file); let loaded = ser.try_load().map(|s| { self.state = s.state; self.state.use_diagnostics(self.diag.clone()); }).is_some(); self.add_renderer(ser, state_file, snapshot_frequency, None); loaded } } impl Driver where S: AbstractSim + Clone + Default + Send + 'static, Stim: DriverStimulus + Send + 'static, { fn render(&mut self) { let their_state = self.diag.instrument_render_prep(|| { if self.render_pool.num_workers() != 3 { let diag = self.diag.clone(); // TODO: these measurements will come to differ from the ones in the Driver, // if the user calls `add_measurement`! let measurements = self.measurements.clone(); let renderer = self.renderer.clone(); self.render_pool.spawn_workers(3, move |state| { // unblock the main thread (this limits the number of renders in-flight at any time trace!("render begin"); diag.instrument_render_cpu_side(|| { let meas: Vec<&dyn AbstractMeasurement> = measurements.iter().map(|m| &**m).collect(); renderer.render(&state, &*meas, Default::default()); }); trace!("render end"); }); } self.state.clone() }); // TODO: this instrumentation is not 100% accurate. // - 'prep' and 'blocked' have effectively been folded together. // - either delete 'prep', or change this block to use a `try_send` (prep) followed by a // `send` (blocking) self.diag.instrument_render_blocked(|| { self.render_pool.tend(); self.render_pool.send(their_state); }); } /// Return the number of steps actually stepped fn step_at_most(&mut self, at_most: u32) -> u32 { let diag = self.diag.clone(); diag.instrument_driver(move || { assert!(at_most != 0); let start_step = self.state.step_no(); if self.renderer.any_work_for_frame(start_step) { self.render(); } // maybe the renderer or stimulus needs servicing before the max frame the user asked for. // step less than `at_most`, in that case. let next_frame_for_user = start_step + at_most as u64; let next_frame_to_render = self.renderer.next_frame_for_work(start_step); let next_frame_for_stim = self.stimuli.next_frame_for_work(start_step); let step_to = [Some(next_frame_for_user), next_frame_to_render, Some(next_frame_for_stim)] .into_iter() .flatten() .min() .unwrap(); let steps_this_time = (step_to - start_step).try_into().unwrap(); let meta = self.state.meta(); let stim = self.stimuli.get_for(meta, start_step); // prefetch the next stimulus, in the background. self.diag.instrument_stimuli_prep(|| { self.stimuli.start_job(meta, step_to); }); trace!("step begin"); self.diag.instrument_step(steps_this_time as u64, || { self.state.step_multiple(steps_this_time, &stim); }); trace!("step end"); if self.last_diag_time.elapsed().as_secs_f64() >= 5.0 { self.last_diag_time = Instant::now(); let step = self.state.step_no(); let diagstr = self.diag.format(); let sim_time = self.state.time() as f64; let percent_complete = self.sim_end_time.map(|t| { format!("[{:.1}%] ", 100.0 * self.state.time() / *t.to_seconds(self.timestep())) }).unwrap_or_default(); info!( "{}t={:.2e} frame {:06} {}", percent_complete, sim_time, step, diagstr ); } steps_this_time }) } pub fn step(&mut self) { self.step_at_most(1); } /// Returns the number of timesteps needed to reach the end time pub fn steps_until(&mut self, sim_end_time: T) -> u64 { let sim_end_step = sim_end_time.to_frame(self.state.timestep()); let start_step = self.state.step_no(); sim_end_step.saturating_sub(start_step) } pub fn step_until(&mut self, sim_end_time: T) { let sim_end_time = sim_end_time.to_frame(self.state.timestep()); self.sim_end_time = Some(sim_end_time); let mut stepped = false; while self.state.step_no() < *sim_end_time { let steps_left = *sim_end_time - self.state.step_no(); // sanity limit: don't try to step too much at once else we may lock up the GPU/etc. self.step_at_most(steps_left.min(1000) as u32); stepped = true; } if stepped { // render the final frame -- unless we already *have* self.render(); } self.render_pool.join_workers(); self.sim_end_time = None; } } // this is effectively `Cow`, but without the `ToOwned` (Clone) requirement pub enum ValueOrRef<'a, T> { Value(T), Ref(&'a T), } impl<'a, T> AsRef for ValueOrRef<'a, T> { fn as_ref(&self) -> &T { match self { ValueOrRef::Value(x) => &x, ValueOrRef::Ref(x) => x, } } } /// gives an opportunity to optimize a Stimulus for a specific setting /// before passing it off to the simulation. pub trait DriverStimulus { type Optimized: Stimulus; fn optimized_for<'a>( &'a self, meta: SimMeta, _step: u64 ) -> ValueOrRef<'a, Self::Optimized>; } pub trait Pushable { fn push(&mut self, meta: SimMeta, t: T); } pub struct DriverStimulusVec(StimuliVec); impl Default for DriverStimulusVec { fn default() -> Self { Self(Default::default()) } } impl> DriverStimulus for DriverStimulusVec { type Optimized = StimuliVec; fn optimized_for<'a>( &'a self, _meta: SimMeta, _step: u64 ) -> ValueOrRef<'a, Self::Optimized> { ValueOrRef::Ref(&self.0) } } impl Pushable for DriverStimulusVec { fn push(&mut self, _meta: SimMeta, t: T) { self.0.push(t) } } #[derive(Default)] pub struct DriverStimulusDynVec(DynStimuli); impl DriverStimulus for DriverStimulusDynVec { type Optimized = DynStimuli; fn optimized_for<'a>( &'a self, _meta: SimMeta, _step: u64 ) -> ValueOrRef<'a, Self::Optimized> { ValueOrRef::Ref(&self.0) } } impl + Send + 'static> Pushable for DriverStimulusDynVec { fn push(&mut self, _meta: SimMeta, t: T) { self.0.push(Box::new(t)) } } /// optimized stimulus which will evaluate the vector fields _only once_ pub struct DriverStimulusModulated(StimuliVec>); impl Default for DriverStimulusModulated { fn default() -> Self { Self(Default::default()) } } impl, T> Pushable> for DriverStimulusModulated { fn push(&mut self, meta: SimMeta, stim: ModulatedVectorField) { let (vfield, timef) = stim.into_inner(); let dim = meta.dim(); let mut storage = Vec::new(); storage.resize_with(dim.product_sum_usize(), Fields::default); let mut view = DimSlice::new(dim, storage); let mut_view: DimSlice<&mut [_]> = view.as_mut(); for (loc, value) in mut_view.enumerated() { *value = vfield.at(meta.feature_size().cast(), loc.into()); } self.0.push(ModulatedStaticField::new(view, timef)) } } impl> DriverStimulus for DriverStimulusModulated { type Optimized = StimuliVec>>; fn optimized_for<'a>( &'a self, meta: SimMeta, step: u64, ) -> ValueOrRef<'a, Self::Optimized> { let t_sec = meta.time_step().cast::() * R::from_primitive(step); let opt = self.0.iter().map(|modulated| ModulatedVectorField::new( // TODO: remove this costly clone! (*modulated.fields()).clone(), modulated.modulation().at(t_sec), )).collect(); ValueOrRef::Value(StimuliVec::from_vec(opt)) } } /// a Stimulus where the field has been pre-calculated pub type ModulatedStaticField = ModulatedVectorField>>, T>; /// wraps a Stimulus to help provide async functionality on top of it. /// the caller can request evaluation at a specific time, and either block on that or /// come back and re-request that time later, expecting that it's been evaluated in the background. struct StimAccess { stim: Arc>, steps_per_stimulus: u64, diag: SyncDiagnostics, /// is the background thread doing work (or, has it completed work and placed it on the return /// queue)? /// A.K.A. "can i safely do a blocking recv on response_channel". outstanding: Cell, worker: JobPool<(SimMeta, u64), (SimMeta, u64, RenderedStimulus)>, } impl StimAccess { fn new(diag: SyncDiagnostics, stim: T) -> Self { Self { stim: Arc::new(Mutex::new(stim)), steps_per_stimulus: 1, diag, outstanding: Cell::new(false), worker: JobPool::new(1), } } fn into_inner(self) -> T { let _ = self.maybe_wait_for_job(Default::default(), 0); // with the worker joined, there should be no outstanding handles on the arc. Arc::try_unwrap(self.stim).ok().unwrap().into_inner().unwrap() } fn next_frame_for_work(&self, after: u64) -> u64 { let f = after + self.steps_per_stimulus; f - f % self.steps_per_stimulus } /// used internally. /// waits for an outstanding job (if any). /// if the response matches the request, return the response, /// else discard the response. fn maybe_wait_for_job(&self, meta: SimMeta, step: u64) -> Option> { if !self.outstanding.get() { return None; } // block until job is complete and receive the result let completed = self.diag.instrument_stimuli_blocked(|| { self.worker.recv() }); let (job_meta, job_step, rendered) = completed; self.outstanding.set(false); Some(rendered) .filter(|_| (job_meta, job_step) == (meta, step)) } } impl + Send + 'static> StimAccess { fn get_for(&mut self, meta: SimMeta, step: u64) -> RenderedStimulus { // either claim the outstanding job (if it exists and matches)... self.maybe_wait_for_job(meta, step).unwrap_or_else(|| { // or start a job and wait for it to complete inline self.start_job(meta, step); self.maybe_wait_for_job(meta, step).unwrap() }) } // begin rendering the stimulus in the background fn start_job(&mut self, meta: SimMeta, step: u64) { // only one in-progress job allowed! assert!(!self.outstanding.get()); self.outstanding.set(true); self.ensure_worker(); self.worker.send((meta, step)); } fn ensure_worker(&mut self) { if self.worker.num_workers() != 0 { return; } let stim = self.stim.clone(); let diag = self.diag.clone(); self.worker.spawn_worker(move |(meta, step)| { let stim = diag.instrument_stimuli(|| { let stim = stim.lock().unwrap(); let opt = stim.optimized_for(meta, step); opt.as_ref().rendered( meta.time_step().cast(), // TODO: convert this to an integer meta.time_step().cast::() * R::from_primitive(step), meta.feature_size().cast(), meta.dim() ).into_owned() //^ this 'into_owned' ought to be a no-op. //^ it would only ever be borrowed if we accidentally called `rendered` twice. }); (meta, step, stim) }); } } impl> Pushable for StimAccess { fn push(&mut self, meta: SimMeta, t: S) { // invalidate any outstanding jobs (because the stimulus will have changed) let _ = self.maybe_wait_for_job(Default::default(), 0); self.stim.lock().unwrap().push(meta, t) } }