Fixes for the mio 0.5.0 on Linux

This commit is contained in:
Benjamin Fry 2016-04-07 23:46:25 -07:00
parent 351fddb5fa
commit 46f4d7c3b1
6 changed files with 213 additions and 158 deletions

View File

@ -2,6 +2,14 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).
## 0.5.3 2016-04-07
### Fixed
- [Linux TCP server mio issues](https://github.com/bluejekyll/trust-dns/issues/9)
### Changed
- combined the TCP client and server handlers
- reusing buffer in TCP handler between send and receive (performance)
## 0.5.2 2016-04-04
### Changed
- updated chrono to 0.2.21

3
docker-ftest.sh Executable file
View File

@ -0,0 +1,3 @@
#!/bin/sh
docker run -a STDERR -a STDOUT --rm -v ${PWD}:/src fnichol/rust:1.7.0 cargo test "$@"

View File

@ -20,12 +20,14 @@ use std::sync::Arc;
use std::cell::Cell;
use mio::{Token, EventLoop, Handler, EventSet, PollOpt, TryAccept};
use mio::tcp::{TcpListener};
use mio::tcp::{TcpListener, TcpStream};
use mio::udp::UdpSocket;
use ::udp::{UdpHandler, UdpState};
use ::tcp::{TcpHandler, TcpState};
use ::authority::Catalog;
use ::op::{Message, OpCode, ResponseCode};
use ::serialize::binary::{BinDecoder, BinEncoder, BinSerializable};
use ::tcp::{TcpHandler, TcpState};
use ::udp::{UdpHandler, UdpState};
// TODO, might be cool to store buffers for later usage...
pub struct Server {
@ -88,13 +90,54 @@ impl Server {
let mut event_loop: EventLoop<Self> = try!(EventLoop::new());
// registering these on non-writable events, since these are the listeners.
for (ref token, ref socket) in &self.udp_sockets { try!(event_loop.register(*socket, **token, !EventSet::writable(), PollOpt::level())); }
for (ref token, ref socket) in &self.tcp_sockets { try!(event_loop.register(*socket, **token, !EventSet::writable(), PollOpt::level())); }
for (ref token, ref socket) in &self.udp_sockets { try!(event_loop.register(*socket, **token, !EventSet::writable(), PollOpt::all())); }
for (ref token, ref socket) in &self.tcp_sockets { try!(event_loop.register(*socket, **token, !EventSet::writable(), PollOpt::all())); }
try!(event_loop.run(self));
Err(io::Error::new(io::ErrorKind::Interrupted, "Server stopping due to interruption"))
}
/// given a set of bytes, decode and process the request, producing a response to send
fn process_request(bytes: &[u8], stream: &TcpStream, catalog: &Catalog) -> Message {
let mut decoder = BinDecoder::new(bytes);
let request = Message::read(&mut decoder);
match request {
Err(ref decode_error) => {
warn!("unable to decode request from client: {:?}: {}", stream, decode_error);
Catalog::error_msg(0/* id is in the message... */, OpCode::Query/* right default? */, ResponseCode::FormErr)
},
Ok(ref req) => catalog.handle_request(req),
}
}
/// encodes a message to the specified buffer
fn encode_message(response: Message, buffer: &mut Vec<u8>) -> io::Result<()> {
// all responses need these fields set:
buffer.clear();
let encode_result = {
let mut encoder: BinEncoder = BinEncoder::new(buffer);
response.emit(&mut encoder)
};
if let Err(encode_error) = encode_result {
error!("error encoding response to client: {}", encode_error);
let err_msg = Catalog::error_msg(response.get_id(), response.get_op_code(), ResponseCode::ServFail);
buffer.clear();
let mut encoder: BinEncoder = BinEncoder::new(buffer);
err_msg.emit(&mut encoder).unwrap(); // this is a coding error if it fails
}
// ready to write to the other side, double check that our buffer is legit first.
if buffer.len() > u16::max_value() as usize() {
error!("too many bytes to write for u16, {}", buffer.len());
return Err(io::Error::new(io::ErrorKind::InvalidData, "did not write the length"));
}
Ok(())
}
}
impl Handler for Server {
@ -148,7 +191,7 @@ impl Handler for Server {
if reqs.is_empty() {
// theres nothing left you write, go back to just reading...
if let Err(e) = event_loop.reregister(socket, token, !EventSet::writable(), PollOpt::level()) {
if let Err(e) = event_loop.reregister(socket, token, !EventSet::writable(), PollOpt::all()) {
error!("could not reregister socket: {:?} error: {}", socket, e);
}
}
@ -164,7 +207,7 @@ impl Handler for Server {
self.udp_requests.entry(token).or_insert(VecDeque::new()).push_back(handler);
// reregeister the UDP socket for writes
if let Err(e) = event_loop.reregister(socket, token, EventSet::all(), PollOpt::level()) {
if let Err(e) = event_loop.reregister(socket, token, EventSet::all(), PollOpt::all()) {
error!("could not reregister socket: {:?} error: {}", socket, e);
}
} else {
@ -207,19 +250,28 @@ impl Handler for Server {
// TODO: do we need to shutdown the stream?
remove = Some(RemoveFrom::TcpHandlers(token));
} else if events.is_readable() || events.is_writable() {
let mut process_resquest = false;
// the handler will deal with the rest of the connection, we need to check the return value
// for an error with wouldblock, this means that the handler couldn't complete the request.
match handler.handle_message(events) {
Ok(TcpState::Done) => {
// reset, the client will close the connection according to the spec
handler.reset();
debug!("TcpState::Done");
},
Ok(TcpState::WillWriteLength) => {
// this means that we have gotten through recieving a packet
process_resquest = true;
debug!("TcpState::WillWriteLength");
}
Ok(..) => {
// registering the event to only wake up on the correct event
// this reduces looping on states like writable that can remain set for a long time
if let Err(e) = event_loop.reregister(handler.get_stream(), token, handler.get_events(), PollOpt::level()) {
error!("could not reregister stream: {:?} cause: {}", handler.get_stream(), e);
remove = Some(RemoveFrom::TcpHandlers(token));
//if let Err(e) = event_loop.reregister(handler.get_stream(), token, handler.get_events(), PollOpt::level()) {
debug!("reregistering for next call: {:?}", handler.get_events());
if let Err(e) = event_loop.reregister(handler.get_stream(), token, handler.get_events(), PollOpt::all()) {
error!("could not reregister stream: {:?} cause: {}", handler.get_stream(), e);
remove = Some(RemoveFrom::TcpHandlers(token));
}
},
Err(ref e) if io::ErrorKind::WouldBlock == e.kind() => {
@ -233,6 +285,21 @@ impl Handler for Server {
remove = Some(RemoveFrom::TcpHandlers(token));
}
}
// need to process the response
if process_resquest {
let response = Self::process_request(handler.get_buffer(), handler.get_stream(), self.catalog.as_ref());
if Self::encode_message(response, handler.get_buffer_mut()).is_err() {
warn!("could not encode message to: {:?}", handler.get_stream());
remove = Some(RemoveFrom::TcpHandlers(token))
}
debug!("reregistering for next call: {:?}", handler.get_events());
if let Err(e) = event_loop.reregister(handler.get_stream(), token, handler.get_events(), PollOpt::all()) {
error!("could not reregister stream: {:?} cause: {}", handler.get_stream(), e);
remove = Some(RemoveFrom::TcpHandlers(token));
}
}
}
}
@ -272,6 +339,10 @@ mod server_tests {
#[test]
fn test_server_www_udp() {
// use log::LogLevel;
// use ::logger;
// logger::TrustDnsLogger::enable_logging(LogLevel::Debug);
let example = create_example();
let origin = example.get_origin().clone();
@ -304,6 +375,10 @@ mod server_tests {
fn test_server_www_tcp() {
use mio::tcp::TcpListener;
// use log::LogLevel;
// use ::logger;
// logger::TrustDnsLogger::enable_logging(LogLevel::Debug);
let example = create_example();
let origin = example.get_origin().clone();
@ -337,7 +412,7 @@ mod server_tests {
println!("about to query server: {:?}", conn);
let client = Client::new(conn);
let response = client.query(&name, DNSClass::IN, RecordType::A).unwrap();
let response = client.query(&name, DNSClass::IN, RecordType::A).expect("error querying");
assert!(response.get_response_code() == ResponseCode::NoError, "got an error: {:?}", response.get_response_code());

View File

@ -15,6 +15,7 @@
*/
use std::io;
use std::io::{Write, Read};
use std::mem;
use std::sync::Arc;
use mio::tcp::TcpStream;
@ -30,27 +31,21 @@ pub struct TcpHandler {
state: TcpState, // current state of the handler and stream, i.e. are we reading from the client? or writing back to it?
buffer: Vec<u8>, // current location and buffer we are reading into or writing from
stream: TcpStream,
catalog: Arc<Catalog>,
catalog: Option<Arc<Catalog>>,
}
impl TcpHandler {
/// initializes this handler with the intention to write first
pub fn new_client_handler(message: Message, stream: TcpStream, catalog: Arc<Catalog>) -> Self {
let mut bytes: Vec<u8> = Vec::with_capacity(512);
{
let mut encoder: BinEncoder = BinEncoder::new(&mut bytes);
message.emit(&mut encoder).unwrap(); // coding error if this panics (i think?)
}
Self::new(TcpType::Client, TcpState::WillWriteLength, bytes, stream, catalog)
pub fn new_client_handler(stream: TcpStream, catalog: Option<Arc<Catalog>>) -> Self {
Self::new(TcpType::Client, TcpState::WillWriteLength, vec![], stream, catalog)
}
/// initializes this handler with the intention to read first
pub fn new_server_handler(stream: TcpStream, catalog: Arc<Catalog>) -> Self {
Self::new(TcpType::Server, TcpState::WillReadLength, Vec::with_capacity(512), stream, catalog)
Self::new(TcpType::Server, TcpState::WillReadLength, Vec::with_capacity(512), stream, Some(catalog))
}
fn new(tcp_type: TcpType, state: TcpState, buffer: Vec<u8>, stream: TcpStream, catalog: Arc<Catalog>) -> Self {
fn new(tcp_type: TcpType, state: TcpState, buffer: Vec<u8>, stream: TcpStream, catalog: Option<Arc<Catalog>>) -> Self {
TcpHandler{ tcp_type: tcp_type, state: state, buffer: buffer, stream: stream, catalog: catalog }
}
@ -62,6 +57,25 @@ impl TcpHandler {
Self::get_events_recurse(self.state, self.tcp_type)
}
pub fn set_buffer(&mut self, buffer: Vec<u8>) {
self.buffer = buffer;
}
pub fn remove_buffer(&mut self) -> Vec<u8> {
debug!("buffer: {}", self.buffer.len());
let buf = mem::replace(&mut self.buffer, vec![]);
debug!("buf: {}", buf.len());
return buf;
}
pub fn get_buffer(&self) -> &[u8] {
&self.buffer
}
pub fn get_buffer_mut(&mut self) -> &mut Vec<u8> {
&mut self.buffer
}
#[inline(always)]
fn get_events_recurse(state: TcpState, tcp_type: TcpType) -> EventSet {
match state {
@ -90,8 +104,8 @@ impl TcpHandler {
}
let length = (len_bytes[0] as u16) << 8 & 0xFF00 | len_bytes[1] as u16 & 0x00FF;
self.buffer = Vec::with_capacity(length as usize);
self.buffer.reserve(length as usize);
self.buffer.clear();
TcpState::WillRead{ length: length } // TODO clean up state change with param...
} else {
return Ok(self.state); // wrong socket state...
@ -105,41 +119,6 @@ impl TcpHandler {
// if we got all the bits...
if self.buffer.len() == length as usize {
let response: Message = {
let mut decoder = BinDecoder::new(&self.buffer);
let request = Message::read(&mut decoder);
match request {
Err(ref decode_error) => {
warn!("unable to decode request from client: {:?}: {}", self.stream, decode_error);
Catalog::error_msg(0/* id is in the message... */, OpCode::Query/* right default? */, ResponseCode::FormErr)
},
Ok(ref req) => self.catalog.handle_request(req),
}
};
// all responses need these fields set:
self.buffer.clear();
let encode_result = {
let mut encoder: BinEncoder = BinEncoder::new(&mut self.buffer);
response.emit(&mut encoder)
};
if let Err(encode_error) = encode_result {
error!("error encoding response to client: {}", encode_error);
let err_msg = Catalog::error_msg(response.get_id(), response.get_op_code(), ResponseCode::ServFail);
self.buffer.clear();
let mut encoder: BinEncoder = BinEncoder::new(&mut self.buffer);
err_msg.emit(&mut encoder).unwrap(); // this is a coding error if it fails
}
// ready to write to the other side, double check that our buffer is legit first.
if self.buffer.len() > u16::max_value() as usize() {
error!("too many bytes to write for u16, {}", self.buffer.len());
return Err(io::Error::new(io::ErrorKind::InvalidData, "did not write the length"));
}
self.state.next_state(self.tcp_type)
} else {
// still waiting on some more
@ -155,7 +134,6 @@ impl TcpHandler {
let wrote: usize = try!(self.stream.write(&len));
if wrote != 2 {
debug!("did not write all len_bytes expected: 2 got: {:?} bytes from: {:?}", wrote, self.stream);
return Err(io::Error::new(io::ErrorKind::InvalidData, "did not write the length"));
}
@ -184,16 +162,15 @@ impl TcpHandler {
/// resets the state of the handler to perform more requests if desired.
/// clears the buffers and sets the state back to the initial state
pub fn reset(&mut self) {
self.buffer.clear();
self.state = TcpState::initial_state(self.tcp_type);
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TcpState {
WillReadLength,
WillReadLength, // waiting to recieve data
WillRead{ length: u16 }, // length of the message to read
WillWriteLength,
WillWriteLength, // beginning of a response to the other side
WillWrite, // length of the message to write
Done,
}

View File

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::net::SocketAddr;
use std::io;
use std::io::{Write, Read};
use std::mem;
use std::fmt;
@ -21,13 +22,15 @@ use mio::{Token, EventLoop, Handler, EventSet, PollOpt}; // not * b/c don't want
use ::error::*;
use ::serialize::binary::*;
use client::ClientConnection;
use ::client::ClientConnection;
use ::tcp::{TcpHandler, TcpState};
const RESPONSE: Token = Token(0);
pub struct TcpClientConnection {
socket: Option<TcpStream>,
event_loop: EventLoop<Response>,
handler: Option<TcpHandler>,
event_loop: EventLoop<ClientHandler>,
error: Option<ClientError>,
}
impl TcpClientConnection {
@ -35,129 +38,104 @@ impl TcpClientConnection {
debug!("connecting to {:?}", name_server);
let stream = try!(TcpStream::connect(&name_server));
let mut event_loop: EventLoop<Response> = try!(EventLoop::new());
let mut event_loop: EventLoop<ClientHandler> = try!(EventLoop::new());
// TODO make the timeout configurable, 5 seconds is the dig default
// TODO the error is private to mio, which makes this awkward...
if event_loop.timeout_ms((), 5000).is_err() { return Err(ClientError::TimerError) };
// TODO: Linux requires a register before a reregister, reregister is needed b/c of OSX later
// ideally this would not be added to the event loop until the client connection request.
try!(event_loop.register(&stream, RESPONSE, EventSet::all(), PollOpt::all()));
Ok(TcpClientConnection{ socket: Some(stream), event_loop: event_loop })
Ok(TcpClientConnection{ handler: Some(TcpHandler::new_client_handler(stream, None)), event_loop: event_loop, error: None })
}
}
impl ClientConnection for TcpClientConnection {
fn send(&mut self, buffer: Vec<u8> ) -> ClientResult<Vec<u8>> {
self.error = None;
// TODO: b/c of OSX this needs to be a reregister (since deregister is not working)
// ideally it should be a register with the later deregister...
try!(self.event_loop.reregister(self.handler.as_ref().expect("never none").get_stream(), RESPONSE, EventSet::all(), PollOpt::all()));
// this is the request message, needs to be set each time
// TODO: it would be cool to reuse this buffer.
let mut handler = mem::replace(&mut self.handler, None).expect("never none");
handler.set_buffer(buffer);
let mut client_handler = ClientHandler{ handler: handler, error: None };
let result = self.event_loop.run(&mut client_handler);
self.handler = Some(client_handler.handler);
try!(self.event_loop.reregister(self.socket.as_ref().expect("never none"), RESPONSE, EventSet::all(), PollOpt::all()));
let mut response: Response = Response::new(buffer, mem::replace(&mut self.socket, None).expect("Only one user at a time"));
try!(self.event_loop.run(&mut response));
try!(result);
if response.error.is_some() { return Err(response.error.unwrap()) }
if response.buf.is_none() { return Err(ClientError::NoDataReceived) }
let result = Ok(response.buf.unwrap());
self.socket = Some(response.stream);
result
if self.error.is_some() { return Err(mem::replace(&mut self.error, None).unwrap()) }
Ok(self.handler.as_mut().expect("never none").remove_buffer())
//debug!("client deregistering");
// TODO: when this line is added OSX starts failing, but we should have it...
// try!(self.event_loop.deregister(&response.stream));
}
}
impl fmt::Debug for TcpClientConnection {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "TcpClientConnection: {:?}", self.socket)
write!(f, "TcpClientConnection: {:?}", self.handler.as_ref().expect("never none").get_stream())
}
}
struct Response {
pub state: ClientState,
pub message: Vec<u8>,
pub buf: Option<Vec<u8>>,
struct ClientHandler {
pub handler: TcpHandler,
pub error: Option<ClientError>,
pub stream: TcpStream,
}
enum ClientState {
WillWrite,
//WillRead,
}
impl Response {
pub fn new(message: Vec<u8>, stream: TcpStream) -> Self {
Response{ state: ClientState::WillWrite, message: message, buf: None, error: None, stream: stream }
}
}
// TODO: this should be merged with the server handler
impl Handler for Response {
impl Handler for ClientHandler {
type Timeout = ();
type Message = ();
fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
match token {
RESPONSE => {
if events.is_writable() {
let len: [u8; 2] = [(self.message.len() >> 8 & 0xFF) as u8, (self.message.len() & 0xFF) as u8];
self.error = self.stream.write_all(&len).and_then(|_|self.stream.write_all(&self.message)).err().map(|o|o.into());
if self.error.is_some() { return }
self.error = self.stream.flush().err().map(|o|o.into());
debug!("wrote {} bytes to {:?}", self.message.len(), self.stream.peer_addr());
} else if events.is_readable() {
// assuming we will always be able to read two bytes.
let mut len_bytes: [u8;2] = [0u8;2];
{
let stream: &mut TcpStream = &mut self.stream;
match stream.take(2).read(&mut len_bytes) {
Ok(len) if len != 2 => {
debug!("did not read all len_bytes expected: 2 got: {:?} bytes from: {:?}", len_bytes, stream);
self.error = Some(ClientError::NotAllBytesReceived{received: len, expect: 2});
return
},
Err(e) => {
self.error = Some(e.into());
return
},
Ok(_) => (),
if events.is_error() {
warn!("closing, error from: {:?}", self.handler);
// TODO: do we need to shutdown the stream?
event_loop.shutdown();
} else if events.is_hup() {
info!("client hungup: {:?}", self.handler);
// TODO: do we need to shutdown the stream?
//remove = Some(RemoveFrom::TcpHandlers(token));
event_loop.shutdown();
} else if events.is_readable() || events.is_writable() {
// the handler will deal with the rest of the connection, we need to check the return value
// for an error with wouldblock, this means that the handler couldn't complete the request.
match self.handler.handle_message(events) {
Ok(TcpState::Done) => {
// the shutdown will stop the event_loop run to return the requester
self.handler.reset();
event_loop.shutdown();
},
Ok(..) => {
// registering the event to only wake up on the correct event
// this reduces looping on states like writable that can remain set for a long time
//if let Err(e) = event_loop.reregister(handler.get_stream(), token, handler.get_events(), PollOpt::level()) {
debug!("reregistering for next call: {:?}", self.handler.get_events());
if let Err(e) = event_loop.reregister(self.handler.get_stream(), token, self.handler.get_events(), PollOpt::all()) {
error!("could not reregister stream: {:?} cause: {}", self.handler.get_stream(), e);
// TODO: need to return an error here
//remove = Some(RemoveFrom::TcpHandlers(token));
}
},
Err(ref e) if io::ErrorKind::WouldBlock == e.kind() => {
// this is expected with the connection would block
// noop
},
Err(e) => {
// shutdown the connection, remove it.
warn!("connection: {:?} shutdown on error: {}", self.handler, e);
// TODO: do we need to shutdown the stream?
//remove = Some(RemoveFrom::TcpHandlers(token));
// TODO: need to return an error here
//self.error = Some(e);
event_loop.shutdown();
}
}
let len: u16 = (len_bytes[0] as u16) << 8 & 0xFF00 | len_bytes[1] as u16 & 0x00FF;
debug!("reading {:?} bytes from: {:?}", len, self.stream.peer_addr());
// use a cursor here, and seek to the write spot on each read...
let mut buf = Vec::with_capacity(len as usize);
{
let stream: &mut TcpStream = &mut self.stream;
match stream.take(len as u64).read_to_end(&mut buf) {
Ok(got) if got != len as usize => {
debug!("did not read all bytes got: {} expected: {} bytes from: {:?}", got, len, stream.peer_addr());
self.error = Some(ClientError::NotAllBytesReceived{received: got, expect: len as usize});
return
},
Err(e) => {
self.error = Some(e.into());
return
},
Ok(_) => (),
}
}
// we got our response, shutdown.
event_loop.shutdown();
debug!("read {:?} bytes from: {:?}", buf.len(), self.stream);
// set our data
self.buf = Some(buf);
// TODO, perhaps parse the response in here, so that the client could ignore messages with the
// wrong serial number.
} else if events.is_error() || events.is_hup() {
debug!("an error occured, connection shutdown early: {:?}", token);
self.error = Some(ClientError::NoDataReceived);
event_loop.shutdown();
} else {
debug!("got woken up, but not readable or writable: {:?}", token);
return
}
},
_ => {

View File

@ -40,6 +40,11 @@ impl UdpClientConnection {
// TODO make the timeout configurable, 5 seconds is the dig default
// TODO the error is private to mio, which makes this awkward...
if event_loop.timeout_ms((), 5000).is_err() { return Err(ClientError::TimerError) };
// TODO: Linux requires a register before a reregister, reregister is needed b/c of OSX later
// ideally this would not be added to the event loop until the client connection request.
try!(event_loop.register(&socket, RESPONSE, EventSet::readable(), PollOpt::all()));
debug!("client event_loop created");
Ok(UdpClientConnection{name_server: name_server, socket: Some(socket), event_loop: event_loop})
}
@ -47,17 +52,26 @@ impl UdpClientConnection {
impl ClientConnection for UdpClientConnection {
fn send(&mut self, buffer: Vec<u8>) -> ClientResult<Vec<u8>> {
debug!("client reregistering");
// TODO: b/c of OSX this needs to be a reregister (since deregister is not working)
try!(self.event_loop.reregister(self.socket.as_ref().expect("never none"), RESPONSE, EventSet::readable(), PollOpt::all()));
debug!("client sending");
try!(self.socket.as_ref().expect("never none").send_to(&buffer, &self.name_server));
debug!("client sent data");
let mut response: Response = Response::new(mem::replace(&mut self.socket, None).expect("never none"));
// run_once should be enough, if something else nepharious hits the socket, what?
try!(self.event_loop.run(&mut response));
debug!("client event_loop running");
if response.error.is_some() { return Err(response.error.unwrap()) }
if response.buf.is_none() { return Err(ClientError::NoDataReceived) }
let result = Ok(response.buf.unwrap());
//debug!("client deregistering");
// TODO: when this line is added OSX starts failing, but we should have it...
// try!(self.event_loop.deregister(&response.socket));
self.socket = Some(response.socket);
result
}