add a JobPool type to better abstract over repeat asynchronous work

This commit is contained in:
2022-08-22 19:30:07 -07:00
parent 5fff872890
commit 284b7368ef
4 changed files with 174 additions and 0 deletions

25
Cargo.lock generated
View File

@@ -314,6 +314,7 @@ dependencies = [
"common_macros",
"coremem_cross",
"criterion",
"crossbeam",
"crossterm",
"csv",
"dashmap",
@@ -409,6 +410,20 @@ dependencies = [
"itertools",
]
[[package]]
name = "crossbeam"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.5"
@@ -444,6 +459,16 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd42583b04998a5363558e5f9291ee5a5ff6b49944332103f251e7479a82aa7"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.10"

View File

@@ -12,6 +12,7 @@ crate-type = ["lib"]
[dependencies]
bincode = "1.3" # MIT
common_macros = "0.1" # MIT or Apache 2.0
crossbeam = "0.8" # MIT or Apache 2.0
crossterm = "0.24" # MIT
csv = "1.1" # MIT or Unlicense
dashmap = "5.3" # MIT

View File

@@ -14,6 +14,7 @@ pub mod meas;
pub mod render;
pub mod sim;
pub mod stim;
pub mod worker;
pub use driver::*;
pub use sim::*;

View File

@@ -0,0 +1,147 @@
//! consumer/producer primitives
use crossbeam::channel::{self, Receiver, Sender};
pub struct JobPool<C, R, W> {
worker: Worker<C, R, W>,
// TODO: might want this to be ordinary Sender
command_chan: Sender<C>,
response_chan: Receiver<R>,
handles: Vec<std::thread::JoinHandle<()>>,
}
struct Worker<C, R, W> {
command_chan: Receiver<C>,
response_chan: Sender<R>,
work_fn: W,
}
impl<C, R, W: Clone> Clone for Worker<C, R, W> {
fn clone(&self) -> Self {
Self {
command_chan: self.command_chan.clone(),
response_chan: self.response_chan.clone(),
work_fn: self.work_fn.clone(),
}
}
}
impl<C: Send, R: Send, W: Fn(C) -> R> Worker<C, R, W> {
fn to_completion(self) {
for cmd in &self.command_chan {
let resp = (self.work_fn)(cmd);
let _ = self.response_chan.send(resp);
}
}
}
impl<C, R, W> JobPool<C, R, W> {
pub fn new(work_fn: W) -> Self {
let (cmd_send, cmd_recv) = channel::bounded(0);
let (resp_send, resp_recv) = channel::bounded(0);
let worker = Worker {
command_chan: cmd_recv,
response_chan: resp_send,
work_fn,
};
Self {
worker,
command_chan: cmd_send,
response_chan: resp_recv,
handles: vec![],
}
}
}
impl<C: Send + 'static, R: Send + 'static, W: Fn(C) -> R + Send + Clone + 'static> JobPool<C, R, W> {
pub fn spawn_workers(&mut self, n: u32) {
for _ in 0..n {
let worker = self.worker.clone();
self.handles.push(std::thread::spawn(move || {
worker.to_completion()
}));
}
}
}
impl<C, R, W> Drop for JobPool<C, R, W> {
fn drop(&mut self) {
// hang up the sender, to signal workers to exit.
(self.command_chan, _) = channel::bounded(0);
(_, self.response_chan) = channel::bounded(0);
for h in self.handles.drain(..) {
h.join().unwrap();
}
}
}
impl<C: Send + 'static, R, W> JobPool<C, R, W> {
pub fn send(&self, cmd: C) {
self.command_chan.send(cmd).unwrap();
}
}
impl<C, R: Send + 'static, W> JobPool<C, R, W> {
pub fn recv(&self) -> R {
self.response_chan.recv().unwrap()
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn lifecycle_no_workers() {
let _pool: JobPool<(), (), ()> = JobPool::new(());
}
#[test]
fn lifecycle_some_workers() {
let mut pool: JobPool<(), (), _> = JobPool::new(|_| ());
pool.spawn_workers(1);
pool.spawn_workers(2);
}
#[test]
fn single_worker() {
let mut pool: JobPool<u32, u32, _> = JobPool::new(|x| x*2);
pool.spawn_workers(1);
pool.send(5);
assert_eq!(pool.recv(), 10);
pool.send(4);
assert_eq!(pool.recv(), 8);
}
#[test]
fn multi_worker() {
use std::sync::{Arc, Mutex};
let mutex = Arc::new(Mutex::new(()));
let worker_mutex = mutex.clone();
let mut pool: JobPool<u32, u32, _> = JobPool::new(move |x| {
// wait until caller unlocks us
let _ = worker_mutex.lock().unwrap();
x*2
});
pool.spawn_workers(2);
pool.send(1);
assert_eq!(pool.recv(), 2);
{
let _lock = mutex.lock().unwrap();
pool.send(4);
pool.send(5); // shouldn't block
}
let mut replies = [pool.recv(), pool.recv()];
replies.sort();
assert_eq!(replies, [8, 10]);
}
#[test]
fn exit_with_unclaimed_responses() {
let mut pool: JobPool<u32, u32, _> = JobPool::new(|x| x*2);
pool.spawn_workers(2);
pool.send(5);
pool.send(6);
}
}