diff --git a/crates/coremem/src/driver.rs b/crates/coremem/src/driver.rs index 7714b32..d3d7107 100644 --- a/crates/coremem/src/driver.rs +++ b/crates/coremem/src/driver.rs @@ -17,6 +17,7 @@ use crate::stim::{ TimeVarying, VectorField, }; +use crate::worker::JobPool; use coremem_cross::compound::list; use coremem_cross::dim::DimSlice; use coremem_cross::step::SimMeta; @@ -455,14 +456,7 @@ struct StimAccess { /// queue)? /// A.K.A. "can i safely do a blocking recv on response_channel". outstanding: Cell, - /// data sent from worker thread back to the Driver side. - /// XXX: Boxing isn't necessary, but doing so means much less memcopy'ing over the channel - /// (just one pointer, instead of N^3 bytes). better for perf. - response_channel: ( - SyncSender<(SimMeta, u64, Box>)>, - Receiver<(SimMeta, u64, Box>)>, - ), - worker: ThreadPool, + worker: JobPool<(SimMeta, u64), (SimMeta, u64, Box>)>, } impl StimAccess { @@ -472,8 +466,7 @@ impl StimAccess { steps_per_stimulus: 1, diag, outstanding: Cell::new(false), - response_channel: sync_channel(0), - worker: ThreadPool::new(1), + worker: JobPool::new(), } } fn into_inner(self) -> T { @@ -497,7 +490,7 @@ impl StimAccess { // block until job is complete and receive the result let completed = self.diag.instrument_stimuli_blocked(|| { - self.response_channel.1.recv().unwrap() + self.worker.recv() }); let (job_meta, job_step, rendered) = completed; self.outstanding.set(false); @@ -508,7 +501,7 @@ impl StimAccess { } impl + Send + 'static> StimAccess { - fn get_for(&self, meta: SimMeta, step: u64) -> Box> { + fn get_for(&mut self, meta: SimMeta, step: u64) -> Box> { // either claim the outstanding job (if it exists and matches)... self.maybe_wait_for_job(meta, step).unwrap_or_else(|| { // or start a job and wait for it to complete inline @@ -517,19 +510,23 @@ impl + Send + 'static> StimAccess { }) } // begin rendering the stimulus in the background - fn start_job(&self, meta: SimMeta, step: u64) { + fn start_job(&mut self, meta: SimMeta, step: u64) { // only one in-progress job allowed! assert!(!self.outstanding.get()); self.outstanding.set(true); - // these are cheap Arc clones - let diag = self.diag.clone(); - let stim = self.stim.clone(); - let response_handle = self.response_channel.0.clone(); + self.ensure_worker(); + self.worker.send((meta, step)); + } + fn ensure_worker(&mut self) { + if self.worker.num_workers() != 0 { + return; + } - self.worker.execute(move || { - trace!("eval_stimulus begin"); - let rendered = diag.instrument_stimuli(|| { + let stim = self.stim.clone(); + let diag = self.diag.clone(); + self.worker.spawn_worker(move |(meta, step)| { + let stim = diag.instrument_stimuli(|| { let stim = stim.lock().unwrap(); let opt = stim.optimized_for(meta, step); Box::new(opt.as_ref().rendered( @@ -542,10 +539,7 @@ impl + Send + 'static> StimAccess { //^ this 'into_owned' ought to be a no-op. //^ it would only ever be borrowed if we accidentally called `rendered` twice. }); - - // NB: this is blocking. - // it's important that we drop any locks before replying. - response_handle.send((meta, step, rendered)).unwrap(); + (meta, step, stim) }); } }