cleaned up Server impl to ease adding new connection handlers

This commit is contained in:
Benjamin Fry 2016-07-10 22:04:34 -07:00
parent ba8eee06d3
commit 26b44483b0
2 changed files with 279 additions and 183 deletions

View File

@ -6,6 +6,9 @@ This project adheres to [Semantic Versioning](http://semver.org/).
### Fixed
- Randomized ports for client connections and message ids, #23
### Changed
- Cleaned up the Server implementation to isolate connection handlers
## 0.7.0 2016-06-20
### Added
- Added recovery from journal to named startup

View File

@ -19,7 +19,7 @@ use std::io;
use std::sync::Arc;
use std::cell::Cell;
use mio::{Token, EventLoop, Handler, EventSet, PollOpt};
use mio::{Token, Evented, EventLoop, Handler, EventSet, PollOpt};
use mio::tcp::{TcpListener, TcpStream};
use mio::udp::UdpSocket;
@ -31,13 +31,7 @@ use ::udp::{UdpHandler, UdpState};
// TODO, might be cool to store buffers for later usage...
pub struct Server {
udp_sockets: HashMap<Token, UdpSocket>,
// for each udp_socket, there is a set of udp_responses. The token is the same as the one
// registered in udp_sockets above, and the vector is the set of addresses for which we have
// responses.
udp_requests: HashMap<Token, VecDeque<UdpHandler>>,
tcp_sockets: HashMap<Token, TcpListener>,
tcp_handlers: HashMap<Token, TcpHandler>,
handlers: HashMap<Token, DnsHandlerType>,
next_token: Cell<usize>,
catalog: Arc<Catalog>, // should the catalog just be static?
}
@ -45,10 +39,7 @@ pub struct Server {
impl Server {
pub fn new(catalog: Catalog) -> Server {
Server {
udp_sockets: HashMap::new(),
udp_requests: HashMap::new(),
tcp_sockets: HashMap::new(),
tcp_handlers: HashMap::new(),
handlers: HashMap::new(),
next_token: Cell::new(0),
catalog: Arc::new(catalog),
}
@ -58,9 +49,7 @@ impl Server {
for _ in 0..100 {
self.next_token.set(self.next_token.get()+1);
let token: Token = Token(self.next_token.get());
if self.tcp_sockets.contains_key(&token) { continue }
else if self.tcp_handlers.contains_key(&token) { continue }
else if self.udp_sockets.contains_key(&token) { continue }
if self.handlers.contains_key(&token) { continue }
// ok, safe to use
return token;
@ -72,14 +61,14 @@ impl Server {
/// register a UDP socket. Should be bound before calling this.
pub fn register_socket(&mut self, socket: UdpSocket) {
let token = self.next_token();
self.udp_sockets.insert(token, socket);
self.handlers.insert(token, DnsHandlerType::UdpSocket((socket, VecDeque::new())));
}
/// register a TcpListener to the Server. This should already be bound to either an IPv6 or an
/// IPv4 address.
pub fn register_listener(&mut self, listener: TcpListener) {
let token = self.next_token();
self.tcp_sockets.insert(token, listener);
self.handlers.insert(token, DnsHandlerType::TcpListener(listener));
}
/// TODO how to do threads? should we do a bunch of listener threads and then query threads?
@ -90,8 +79,13 @@ impl Server {
let mut event_loop: EventLoop<Self> = try!(EventLoop::new());
// registering these on non-writable events, since these are the listeners.
for (ref token, ref socket) in &self.udp_sockets { try!(event_loop.register(*socket, **token, !EventSet::writable(), PollOpt::all())); }
for (ref token, ref socket) in &self.tcp_sockets { try!(event_loop.register(*socket, **token, !EventSet::writable(), PollOpt::all())); }
for (token, handler) in self.handlers.iter() {
match *handler {
DnsHandlerType::UdpSocket(ref handler) => try!(event_loop.register(handler.get_socket(), *token, !EventSet::writable(), PollOpt::all())),
DnsHandlerType::TcpListener(ref handler) => try!(event_loop.register(handler.get_socket(), *token, !EventSet::writable(), PollOpt::all())),
DnsHandlerType::TcpHandler(_) => panic!("tcp handlers should not have been registered yet"),
}
}
try!(event_loop.run(self));
@ -140,175 +134,279 @@ impl Server {
}
}
// each dns handler type is used for managing client requests.
enum DnsHandlerType {
// the deque represents responses that need to be sent back to the client
// as of now this these are local requests, but in the future these will be resolver based
// responses, which will have some time between resolve and response
UdpSocket((UdpSocket, VecDeque<UdpHandler>)),
// Inbound TCP connections
TcpListener(TcpListener),
// Handlers for the TCP connections
TcpHandler(TcpHandler),
}
/// Handler for DNS requests
trait DnsHandler {
/// Called when the Evented of the Handler is woken up on activity
///
/// # Arguments
/// * `events` - the set of events that that woke this socket up
/// * `catalog` - the local catalog for lookups
///
/// # Return
///
/// Returns a tuple of the next event_set for this handler, and/or a new handler to add to the
/// the event_loop. If the first of the tuple is None, self will be removed from the event_loop.
/// If the second is None, nothing will happen, otherwise the new handler will be added to the
/// event_loop.
fn handle(&mut self, events: EventSet, catalog: &Arc<Catalog>) -> (Option<EventSet>, Option<(DnsHandlerType, EventSet)>);
/// returns the Evented which self wraps.
fn get_socket(&self) -> &Evented;
}
impl DnsHandler for DnsHandlerType {
fn handle(&mut self, events: EventSet, catalog: &Arc<Catalog>) -> (Option<EventSet>, Option<(DnsHandlerType, EventSet)>) {
match *self {
DnsHandlerType::UdpSocket(ref mut udp_handler) => udp_handler.handle(events, catalog),
DnsHandlerType::TcpListener(ref mut tcp_listener) => tcp_listener.handle(events, catalog),
DnsHandlerType::TcpHandler(ref mut tcp_handler) => tcp_handler.handle(events, catalog),
}
}
fn get_socket(&self) -> &Evented {
match *self {
DnsHandlerType::UdpSocket(ref udp_handler) => udp_handler.get_socket() as &Evented,
DnsHandlerType::TcpListener(ref tcp_listener) => tcp_listener.get_socket() as &Evented,
DnsHandlerType::TcpHandler(ref tcp_handler) => tcp_handler.get_socket() as &Evented,
}
}
}
impl DnsHandler for TcpListener {
fn handle(&mut self, events: EventSet, _: &Arc<Catalog>) -> (Option<EventSet>, Option<(DnsHandlerType, EventSet)>) {
if events.is_error() { panic!("unexpected error state on: {:?}", self) }
else if events.is_hup() { panic!("listening socket hungup: {:?}", self) }
else if events.is_readable() || events.is_writable() {
// there's a new connection coming in
// give it a new token and insert the stream on the eventlistener
// then store in the map for reference when dealing with new streams
for _ in 0..100 { // loop a max of 100 times, don't want to starve the responses.
match self.accept() {
Ok(Some((stream, addr))) => {
info!("new tcp connection from: {}", addr);
return (Some(EventSet::all()), Some((DnsHandlerType::TcpHandler(TcpHandler::new_server_handler(stream)),
!EventSet::writable())))
},
Ok(None) => {
return (Some(EventSet::all()), None)
},
Err(e) => panic!("unexpected error accepting: {}", e),
}
}
}
// this should never happen
return (Some(EventSet::all()), None)
}
fn get_socket(&self) -> &Evented {
return self as &Evented
}
}
impl DnsHandler for (UdpSocket, VecDeque<UdpHandler>) {
fn handle(&mut self, events: EventSet, catalog: &Arc<Catalog>) -> (Option<EventSet>, Option<(DnsHandlerType, EventSet)>) {
let ref socket = self.0;
let ref mut requests = self.1;
if events.is_error() {
panic!("unexpected socket error: {:?}", socket)
} else if events.is_hup() {
panic!("unexpected socket hangup: {:?}", socket)
} else {
let mut next_event: EventSet = EventSet::all();
// process the responses before the requests...
if events.is_writable() {
let mut remove: Vec<usize> = Vec::new();
// send all the data for the incomplete requests
for (i, req) in requests.iter().enumerate() {
match req.handle_message (&socket, events) {
Ok(UdpState::Done) => {
// complete, remove
remove.push(i);
},
Ok(..) => {
// Noop, request not complete
},
Err(ref e) if io::ErrorKind::WouldBlock == e.kind() => {
// this is expected with the connection would block
// noop
},
Err(e) => {
// shutdown the connection, remove it.
warn!("error writing socket: {:?} error: {}", socket, e);
// TODO: do we need to shutdown the stream?
remove.push(i);
}
}
}
// remove the complete requests
for i in remove {
requests.remove(i);
// TODO might want to compress the list here, as it could become a leak after a large
// set of requests.
}
if requests.is_empty() {
next_event = !EventSet::writable();
}
}
// now process the incoming requests
if events.is_readable() {
// collect new requests
// TODO: could a ton of inbound requests starve the server
for _ in 0..100 {
if let Some(handler) = UdpHandler::new_server(&socket, catalog.clone()) {
// this is a new request for a UDP transaction
// let the handler read, etc.
requests.push_back(handler);
next_event = EventSet::all();
} else {
break
}
}
}
return (Some(next_event), None)
}
return (Some(EventSet::all()), None)
}
fn get_socket(&self) -> &Evented {
return &self.0 as &Evented
}
}
impl DnsHandler for TcpHandler {
fn handle(&mut self, events: EventSet, catalog: &Arc<Catalog>) -> (Option<EventSet>, Option<(DnsHandlerType, EventSet)>) {
if events.is_error() {
warn!("closing, error from: {:?}", self.get_stream());
// TODO: do we need to shutdown the stream?
return (None, None);
} else if events.is_hup() {
info!("client hungup: {:?}", self.get_stream());
// TODO: do we need to shutdown the stream?
return (None, None);
} else if events.is_readable() || events.is_writable() {
let mut process_resquest = false;
// the handler will deal with the rest of the connection, we need to check the return value
// for an error with wouldblock, this means that the handler couldn't complete the request.
match self.handle_message(events) {
Ok(TcpState::Done) => {
// reset, the client will close the connection according to the spec
self.reset();
debug!("TcpState::Done");
},
Ok(TcpState::WillWriteLength) => {
// this means that we have gotten through recieving a packet
process_resquest = true;
debug!("TcpState::WillWriteLength");
}
Ok(..) => {
// registering the event to only wake up on the correct event
// this reduces looping on states like writable that can remain set for a long time
debug!("reregistering for next call: {:?}", self.get_events());
},
Err(ref e) if io::ErrorKind::WouldBlock == e.kind() => {
// this is expected with the connection would block
// noop
},
Err(e) => {
// shutdown the connection, remove it.
warn!("connection: {:?} shutdown on error: {}", self.get_stream(), e);
// TODO: do we need to shutdown the stream?
return (None, None);
}
}
// need to process the response
if process_resquest {
let response = Server::process_request(self.get_buffer(), self.get_stream(), catalog.as_ref());
if Server::encode_message(response, self.get_buffer_mut()).is_err() {
warn!("could not encode message to: {:?}", self.get_stream());
return (None, None)
}
}
}
debug!("reregistering for next call: {:?}", self.get_events());
return (Some(self.get_events()), None)
}
fn get_socket(&self) -> &Evented {
return self.get_stream() as &Evented
}
}
impl Handler for Server {
type Timeout = Token; // Timeouts are registered with tokens.
type Message = ();
fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
let mut remove: Option<RemoveFrom> = None;
let mut remove_token: Option<Token> = None;
let mut add_handler: Option<(DnsHandlerType, EventSet)> = None;
if let Some(socket) = self.udp_sockets.get(&token) {
if events.is_error() {
panic!("unexpected socket error: {:?}", socket)
} else if events.is_hup() {
panic!("unexpected socket hangup: {:?}", socket)
// The token should always exist
if let Some(mut handler) = self.handlers.get_mut(&token) {
// the handler will perform the lookup or other actions.
// if none is returned for event_set_opt, the handler will be revmoed
let (event_set_opt, add) = handler.handle(events, &self.catalog);
// this represents a new handler to watch
add_handler = add;
// given the new event_set option, reregister the socket
if let Some(event_set) = event_set_opt {
let socket: &Evented = handler.get_socket();
if let Err(err) = event_loop.reregister(socket, token, event_set, PollOpt::all()) {
// removing the socket in case of an error
warn!("cound not reregister {:?}: {}", token, err);
remove_token = Some(token);
}
} else {
// process the responses before the requests...
if events.is_writable() {
// send out our queued up responses
if let Some(reqs) = self.udp_requests.get_mut(&token) {
let mut remove: Vec<usize> = Vec::new();
// send all the data for the incomplete requests
for (i, req) in reqs.iter().enumerate() {
match req.handle_message (&socket, events) {
Ok(UdpState::Done) => {
// complete, remove
remove.push(i);
},
Ok(..) => {
// Noop, request not complete
},
Err(ref e) if io::ErrorKind::WouldBlock == e.kind() => {
// this is expected with the connection would block
// noop
},
Err(e) => {
// shutdown the connection, remove it.
warn!("error writing socket: {:?} error: {}", socket, e);
// TODO: do we need to shutdown the stream?
remove.push(i);
}
}
}
// remove the complete requests
for i in remove {
reqs.remove(i);
// TODO might want to compress the list here, as it could become a leak after a large
// set of requests.
}
if reqs.is_empty() {
// theres nothing left you write, go back to just reading...
if let Err(e) = event_loop.reregister(socket, token, !EventSet::writable(), PollOpt::all()) {
error!("could not reregister socket: {:?} error: {}", socket, e);
}
}
}
}
// now process the incoming requests
if events.is_readable() {
// collect new requests
// TODO: could a ton of inbound requests starve the server
while let Some(handler) = UdpHandler::new_server(socket, self.catalog.clone()) {
// this is a new request for a UDP transaction
// let the handler read, etc.
self.udp_requests.entry(token).or_insert(VecDeque::new()).push_back(handler);
// reregeister the UDP socket for writes
if let Err(e) = event_loop.reregister(socket, token, EventSet::all(), PollOpt::all()) {
error!("could not reregister socket: {:?} error: {}", socket, e);
}
}
}
}
} else if let Some(ref socket) = self.tcp_sockets.get(&token) {
if events.is_error() { panic!("unexpected error state on: {:?}", socket) }
else if events.is_hup() { panic!("listening socket hungup: {:?}", socket) }
else if events.is_readable() || events.is_writable() {
// there's a new connection coming in
// give it a new token and insert the stream on the eventlistener
// then store in the map for reference when dealing with new streams
loop {
match socket.accept() {
Ok(Some((stream, addr))) => {
let token = self.next_token();
// initially we want readable sockets...
match event_loop.register(&stream, token, !EventSet::writable(), PollOpt::level()) {
Err(e) => error!("could not register stream: {:?} cause: {}", stream, e),
Ok(()) => {
info!("accepted tcp connection from: {:?} on {:?}", addr, stream.local_addr().ok());
self.tcp_handlers.insert(token, TcpHandler::new_server_handler(stream));
}
}
},
Ok(None) => return,
Err(e) => panic!("unexpected error accepting: {}", e),
}
}
}
} else if let Some(ref mut handler) = self.tcp_handlers.get_mut(&token) {
if events.is_error() {
warn!("closing, error from: {:?}", handler.get_stream());
// TODO: do we need to shutdown the stream?
remove = Some(RemoveFrom::TcpHandlers(token));
} else if events.is_hup() {
info!("client hungup: {:?}", handler.get_stream());
// TODO: do we need to shutdown the stream?
remove = Some(RemoveFrom::TcpHandlers(token));
} else if events.is_readable() || events.is_writable() {
let mut process_resquest = false;
// the handler will deal with the rest of the connection, we need to check the return value
// for an error with wouldblock, this means that the handler couldn't complete the request.
match handler.handle_message(events) {
Ok(TcpState::Done) => {
// reset, the client will close the connection according to the spec
handler.reset();
debug!("TcpState::Done");
},
Ok(TcpState::WillWriteLength) => {
// this means that we have gotten through recieving a packet
process_resquest = true;
debug!("TcpState::WillWriteLength");
}
Ok(..) => {
// registering the event to only wake up on the correct event
// this reduces looping on states like writable that can remain set for a long time
//if let Err(e) = event_loop.reregister(handler.get_stream(), token, handler.get_events(), PollOpt::level()) {
debug!("reregistering for next call: {:?}", handler.get_events());
if let Err(e) = event_loop.reregister(handler.get_stream(), token, handler.get_events(), PollOpt::all()) {
error!("could not reregister stream: {:?} cause: {}", handler.get_stream(), e);
remove = Some(RemoveFrom::TcpHandlers(token));
}
},
Err(ref e) if io::ErrorKind::WouldBlock == e.kind() => {
// this is expected with the connection would block
// noop
},
Err(e) => {
// shutdown the connection, remove it.
warn!("connection: {:?} shutdown on error: {}", handler.get_stream(), e);
// TODO: do we need to shutdown the stream?
remove = Some(RemoveFrom::TcpHandlers(token));
}
}
// need to process the response
if process_resquest {
let response = Self::process_request(handler.get_buffer(), handler.get_stream(), self.catalog.as_ref());
if Self::encode_message(response, handler.get_buffer_mut()).is_err() {
warn!("could not encode message to: {:?}", handler.get_stream());
remove = Some(RemoveFrom::TcpHandlers(token))
}
debug!("reregistering for next call: {:?}", handler.get_events());
if let Err(e) = event_loop.reregister(handler.get_stream(), token, handler.get_events(), PollOpt::all()) {
error!("could not reregister stream: {:?} cause: {}", handler.get_stream(), e);
remove = Some(RemoveFrom::TcpHandlers(token));
}
}
remove_token = Some(token);
}
}
// check if we need to remove something
match remove {
Some(RemoveFrom::TcpHandlers(t)) => { self.tcp_handlers.remove(&t); },
//Some(RemoveFrom::UdpRequests(t)) => { self.udp_requests.remove(&t); },
None => (),
// unregister the token
remove_token.and_then(|token| {
self.handlers.remove(&token)
}).and_then(|handler| {
event_loop.deregister(handler.get_socket()).unwrap_or_else(|e| debug!("error deregistering: {}", e));
Some(())
});
// need to register a new handler if there was one.
if let Some((handler, event_set)) = add_handler {
let register_res: io::Result<Token> = {
let socket: &Evented = handler.get_socket();
let next_token = self.next_token();
event_loop.register(socket, next_token, event_set, PollOpt::all()).map(|_| next_token)
};
match register_res {
Ok(token) => { self.handlers.insert(token, handler); },
Err(err) => warn!("error registering handler: {}", err),
}
}
}
@ -319,11 +417,6 @@ impl Handler for Server {
}
}
enum RemoveFrom {
TcpHandlers(Token),
//UdpRequests(Token),
}
#[cfg(test)]
mod server_tests {
use std::thread;