Driver: stimulus: use our own JobPool abstraction
this one leverages crossbeam. it does appear to schedule jobs faster than the rust-threadpool. curiously, the `recv` impl seems to be slower. maybe that's because of the (inadvertent, unecessary) switch from mpsc to mppc. worth trying to just insert a buffer.
This commit is contained in:
@@ -17,6 +17,7 @@ use crate::stim::{
|
|||||||
TimeVarying,
|
TimeVarying,
|
||||||
VectorField,
|
VectorField,
|
||||||
};
|
};
|
||||||
|
use crate::worker::JobPool;
|
||||||
use coremem_cross::compound::list;
|
use coremem_cross::compound::list;
|
||||||
use coremem_cross::dim::DimSlice;
|
use coremem_cross::dim::DimSlice;
|
||||||
use coremem_cross::step::SimMeta;
|
use coremem_cross::step::SimMeta;
|
||||||
@@ -455,14 +456,7 @@ struct StimAccess<R, T> {
|
|||||||
/// queue)?
|
/// queue)?
|
||||||
/// A.K.A. "can i safely do a blocking recv on response_channel".
|
/// A.K.A. "can i safely do a blocking recv on response_channel".
|
||||||
outstanding: Cell<bool>,
|
outstanding: Cell<bool>,
|
||||||
/// data sent from worker thread back to the Driver side.
|
worker: JobPool<(SimMeta<f32>, u64), (SimMeta<f32>, u64, Box<RenderedStimulus<R>>)>,
|
||||||
/// XXX: Boxing isn't necessary, but doing so means much less memcopy'ing over the channel
|
|
||||||
/// (just one pointer, instead of N^3 bytes). better for perf.
|
|
||||||
response_channel: (
|
|
||||||
SyncSender<(SimMeta<f32>, u64, Box<RenderedStimulus<R>>)>,
|
|
||||||
Receiver<(SimMeta<f32>, u64, Box<RenderedStimulus<R>>)>,
|
|
||||||
),
|
|
||||||
worker: ThreadPool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R, T> StimAccess<R, T> {
|
impl<R, T> StimAccess<R, T> {
|
||||||
@@ -472,8 +466,7 @@ impl<R, T> StimAccess<R, T> {
|
|||||||
steps_per_stimulus: 1,
|
steps_per_stimulus: 1,
|
||||||
diag,
|
diag,
|
||||||
outstanding: Cell::new(false),
|
outstanding: Cell::new(false),
|
||||||
response_channel: sync_channel(0),
|
worker: JobPool::new(),
|
||||||
worker: ThreadPool::new(1),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn into_inner(self) -> T {
|
fn into_inner(self) -> T {
|
||||||
@@ -497,7 +490,7 @@ impl<R, T> StimAccess<R, T> {
|
|||||||
|
|
||||||
// block until job is complete and receive the result
|
// block until job is complete and receive the result
|
||||||
let completed = self.diag.instrument_stimuli_blocked(|| {
|
let completed = self.diag.instrument_stimuli_blocked(|| {
|
||||||
self.response_channel.1.recv().unwrap()
|
self.worker.recv()
|
||||||
});
|
});
|
||||||
let (job_meta, job_step, rendered) = completed;
|
let (job_meta, job_step, rendered) = completed;
|
||||||
self.outstanding.set(false);
|
self.outstanding.set(false);
|
||||||
@@ -508,7 +501,7 @@ impl<R, T> StimAccess<R, T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Real, T: DriverStimulus<R> + Send + 'static> StimAccess<R, T> {
|
impl<R: Real, T: DriverStimulus<R> + Send + 'static> StimAccess<R, T> {
|
||||||
fn get_for(&self, meta: SimMeta<f32>, step: u64) -> Box<RenderedStimulus<R>> {
|
fn get_for(&mut self, meta: SimMeta<f32>, step: u64) -> Box<RenderedStimulus<R>> {
|
||||||
// either claim the outstanding job (if it exists and matches)...
|
// either claim the outstanding job (if it exists and matches)...
|
||||||
self.maybe_wait_for_job(meta, step).unwrap_or_else(|| {
|
self.maybe_wait_for_job(meta, step).unwrap_or_else(|| {
|
||||||
// or start a job and wait for it to complete inline
|
// or start a job and wait for it to complete inline
|
||||||
@@ -517,19 +510,23 @@ impl<R: Real, T: DriverStimulus<R> + Send + 'static> StimAccess<R, T> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
// begin rendering the stimulus in the background
|
// begin rendering the stimulus in the background
|
||||||
fn start_job(&self, meta: SimMeta<f32>, step: u64) {
|
fn start_job(&mut self, meta: SimMeta<f32>, step: u64) {
|
||||||
// only one in-progress job allowed!
|
// only one in-progress job allowed!
|
||||||
assert!(!self.outstanding.get());
|
assert!(!self.outstanding.get());
|
||||||
self.outstanding.set(true);
|
self.outstanding.set(true);
|
||||||
|
|
||||||
// these are cheap Arc clones
|
self.ensure_worker();
|
||||||
let diag = self.diag.clone();
|
self.worker.send((meta, step));
|
||||||
let stim = self.stim.clone();
|
}
|
||||||
let response_handle = self.response_channel.0.clone();
|
fn ensure_worker(&mut self) {
|
||||||
|
if self.worker.num_workers() != 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
self.worker.execute(move || {
|
let stim = self.stim.clone();
|
||||||
trace!("eval_stimulus begin");
|
let diag = self.diag.clone();
|
||||||
let rendered = diag.instrument_stimuli(|| {
|
self.worker.spawn_worker(move |(meta, step)| {
|
||||||
|
let stim = diag.instrument_stimuli(|| {
|
||||||
let stim = stim.lock().unwrap();
|
let stim = stim.lock().unwrap();
|
||||||
let opt = stim.optimized_for(meta, step);
|
let opt = stim.optimized_for(meta, step);
|
||||||
Box::new(opt.as_ref().rendered(
|
Box::new(opt.as_ref().rendered(
|
||||||
@@ -542,10 +539,7 @@ impl<R: Real, T: DriverStimulus<R> + Send + 'static> StimAccess<R, T> {
|
|||||||
//^ this 'into_owned' ought to be a no-op.
|
//^ this 'into_owned' ought to be a no-op.
|
||||||
//^ it would only ever be borrowed if we accidentally called `rendered` twice.
|
//^ it would only ever be borrowed if we accidentally called `rendered` twice.
|
||||||
});
|
});
|
||||||
|
(meta, step, stim)
|
||||||
// NB: this is blocking.
|
|
||||||
// it's important that we drop any locks before replying.
|
|
||||||
response_handle.send((meta, step, rendered)).unwrap();
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user