timeouts for ServerFuture
This commit is contained in:
parent
8859b5c09b
commit
3f5f770741
@ -3,10 +3,19 @@ All notable changes to this project will be documented in this file.
|
||||
This project adheres to [Semantic Versioning](http://semver.org/).
|
||||
|
||||
## unreleased (0.9.0)
|
||||
### Added
|
||||
- new ServerFuture tokio and futures based server, #61
|
||||
- UdpStream & TcpSteam to support stream of messages with src address
|
||||
- TimeoutStream to wrap TcpStreams to help guard against malicious clients
|
||||
|
||||
### Changed
|
||||
- Split Server and Client into separate crates, #43
|
||||
- Moved many integration tests to `tests` from `src`, #52
|
||||
|
||||
### Fixed
|
||||
- Flush TcpStream after fully sending Message
|
||||
- Recognize no bytes read as closed TcpStream
|
||||
|
||||
## 0.8.1
|
||||
### Fixed
|
||||
- Fix build on rustc 1.11, #66
|
||||
|
24
Cargo.lock
generated
24
Cargo.lock
generated
@ -6,7 +6,7 @@ dependencies = [
|
||||
"chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"docopt 0.6.86 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"error-chain 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@ -15,7 +15,7 @@ dependencies = [
|
||||
"rusqlite 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-core 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"toml 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"trust-dns 0.8.1",
|
||||
]
|
||||
@ -102,7 +102,7 @@ dependencies = [
|
||||
"lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"strsim 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"strsim 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -115,7 +115,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.1.3"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@ -424,7 +424,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "strsim"
|
||||
version = "0.5.1"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
@ -464,10 +464,10 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio-core"
|
||||
version = "0.1.0"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"futures 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"mio 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@ -490,7 +490,7 @@ dependencies = [
|
||||
"chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"data-encoding 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"error-chain 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@ -498,7 +498,7 @@ dependencies = [
|
||||
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-core 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"trust-dns-server 0.8.1",
|
||||
]
|
||||
|
||||
@ -553,7 +553,7 @@ dependencies = [
|
||||
"checksum dbghelp-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "97590ba53bcb8ac28279161ca943a924d1fd4a8fb3fa63302591647c4fc5b850"
|
||||
"checksum docopt 0.6.86 (registry+https://github.com/rust-lang/crates.io-index)" = "4a7ef30445607f6fc8720f0a0a2c7442284b629cf0d049286860fae23e71c4d9"
|
||||
"checksum error-chain 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "faa976b4fd2e4c2b2f3f486874b19e61944d3de3de8b61c9fcf835d583871bcc"
|
||||
"checksum futures 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dd89497091f8c5d3a65c6b4baf6d2f0731937a7c9217d2f89141b21437a9d96"
|
||||
"checksum futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0bad0a2ac64b227fdc10c254051ae5af542cf19c9328704fd4092f7914196897"
|
||||
"checksum gcc 0.3.38 (registry+https://github.com/rust-lang/crates.io-index)" = "553f11439bdefe755bf366b264820f1da70f3aaf3924e594b886beb9c831bcf5"
|
||||
"checksum gdi32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0912515a8ff24ba900422ecda800b52f4016a56251922d397c576bf92c690518"
|
||||
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
|
||||
@ -591,12 +591,12 @@ dependencies = [
|
||||
"checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac"
|
||||
"checksum slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d807fd58c4181bbabed77cb3b891ba9748241a552bcc5be698faaebefc54f46e"
|
||||
"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
|
||||
"checksum strsim 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "50c069df92e4b01425a8bf3576d5d417943a6a7272fbabaf5bd80b1aaa76442e"
|
||||
"checksum strsim 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "67f84c44fbb2f91db7fef94554e6b2ac05909c9c0b0bc23bb98d3a1aebfe7f7c"
|
||||
"checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6"
|
||||
"checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03"
|
||||
"checksum thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5"
|
||||
"checksum time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "3c7ec6d62a20df54e07ab3b78b9a3932972f4b7981de295563686849eb3989af"
|
||||
"checksum tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "659cbae6c954dee37352853816c6a52180e47feb70be73bbfeec6d58c4da4a71"
|
||||
"checksum tokio-core 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "06f40e15561569e24dab3dcf270c0bb950195b84dbed591dfb6591e28c9b9cff"
|
||||
"checksum toml 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)" = "0590d72182e50e879c4da3b11c6488dae18fccb1ae0c7a3eda18e16795844796"
|
||||
"checksum user32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4ef4711d107b21b410a3a974b1204d9accc8b10dad75d8324b5d755de1617d47"
|
||||
"checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f"
|
||||
|
@ -312,7 +312,6 @@ pub struct BasicClientHandle {
|
||||
|
||||
impl ClientHandle for BasicClientHandle {
|
||||
fn send(&self, message: Message) -> Box<Future<Item=Message, Error=ClientError>> {
|
||||
debug!("sending message");
|
||||
let (complete, oneshot) = futures::oneshot();
|
||||
|
||||
let oneshot = match self.message_sender.send((message, complete)) {
|
||||
|
@ -6,7 +6,7 @@
|
||||
// copied, modified, or distributed except according to those terms.
|
||||
|
||||
use std::mem;
|
||||
use std::net::SocketAddr;
|
||||
use std::net::{Shutdown, SocketAddr};
|
||||
use std::io;
|
||||
use std::io::{Read, Write};
|
||||
|
||||
@ -21,6 +21,7 @@ use ::BufStreamHandle;
|
||||
enum WriteTcpState {
|
||||
LenBytes{ pos: usize, length: [u8; 2], bytes: Vec<u8> },
|
||||
Bytes{ pos: usize, bytes: Vec<u8> },
|
||||
Flushing,
|
||||
}
|
||||
|
||||
enum ReadTcpState {
|
||||
@ -101,6 +102,9 @@ impl Stream for TcpStream {
|
||||
let wrote = try_nb!(self.socket.write(&bytes[*pos..]));
|
||||
*pos += wrote;
|
||||
},
|
||||
Some(WriteTcpState::Flushing) => {
|
||||
try_nb!(self.socket.flush());
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
|
||||
@ -120,9 +124,15 @@ impl Stream for TcpStream {
|
||||
if pos < bytes.len() {
|
||||
mem::replace(&mut self.send_state, Some(WriteTcpState::Bytes{ pos: pos, bytes: bytes }));
|
||||
} else {
|
||||
mem::replace(&mut self.send_state, None);
|
||||
// At this point we successfully delivered the entire message.
|
||||
// flush
|
||||
mem::replace(&mut self.send_state, Some(WriteTcpState::Flushing));
|
||||
}
|
||||
},
|
||||
Some(WriteTcpState::Flushing) => {
|
||||
// At this point we successfully delivered the entire message.
|
||||
mem::replace(&mut self.send_state, None);
|
||||
}
|
||||
None => (),
|
||||
};
|
||||
} else {
|
||||
@ -142,11 +152,13 @@ impl Stream for TcpStream {
|
||||
let len: [u8; 2] = [(buffer.len() >> 8 & 0xFF) as u8,
|
||||
(buffer.len() & 0xFF) as u8];
|
||||
|
||||
debug!("sending message len: {} to: {}", buffer.len(), dst);
|
||||
self.send_state = Some(WriteTcpState::LenBytes{ pos: 0, length: len, bytes: buffer });
|
||||
},
|
||||
// now we get to drop through to the receives...
|
||||
// TODO: should we also return None if there are no more messages to send?
|
||||
Async::NotReady | Async::Ready(None) => { debug!("no messages to send"); break },
|
||||
Async::NotReady => break,
|
||||
Async::Ready(None) => { debug!("no messages to send"); break },
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -160,10 +172,21 @@ impl Stream for TcpStream {
|
||||
// if Some(_) is returned, then that will be used as the next state.
|
||||
let new_state: Option<ReadTcpState> = match self.read_state {
|
||||
ReadTcpState::LenBytes { ref mut pos, ref mut bytes } => {
|
||||
debug!("in ReadTcpState::LenBytes: {}", pos);
|
||||
|
||||
// debug!("reading length {}", bytes.len());
|
||||
let read = try_nb!(self.socket.read(&mut bytes[*pos..]));
|
||||
if read == 0 {
|
||||
// the Stream was closed!
|
||||
debug!("zero bytes read, stream closed?");
|
||||
try!(self.socket.shutdown(Shutdown::Both));
|
||||
|
||||
if *pos == 0 {
|
||||
// Since this is the start of the next message, we have a clean end
|
||||
return Ok(Async::Ready(None))
|
||||
} else {
|
||||
return Err(io::Error::new(io::ErrorKind::BrokenPipe, "closed while reading length"));
|
||||
}
|
||||
}
|
||||
debug!("in ReadTcpState::LenBytes: {}", pos);
|
||||
*pos += read;
|
||||
|
||||
if *pos < bytes.len() {
|
||||
@ -180,8 +203,17 @@ impl Stream for TcpStream {
|
||||
}
|
||||
},
|
||||
ReadTcpState::Bytes { ref mut pos, ref mut bytes } => {
|
||||
debug!("in ReadTcpState::Bytes: {}", bytes.len());
|
||||
let read = try_nb!(self.socket.read(&mut bytes[*pos..]));
|
||||
if read == 0 {
|
||||
// the Stream was closed!
|
||||
debug!("zero bytes read for message, stream closed?");
|
||||
|
||||
// Since this is the start of the next message, we have a clean end
|
||||
try!(self.socket.shutdown(Shutdown::Both));
|
||||
return Err(io::Error::new(io::ErrorKind::BrokenPipe, "closed while reading message"));
|
||||
}
|
||||
|
||||
debug!("in ReadTcpState::Bytes: {}", bytes.len());
|
||||
*pos += read;
|
||||
|
||||
if *pos < bytes.len() {
|
||||
|
@ -36,7 +36,7 @@ impl RequestHandler for Catalog {
|
||||
///
|
||||
/// * `request` - the requested action to perform.
|
||||
fn handle_request(&self, request: &Message) -> Message {
|
||||
info!("id: {} type: {:?} op_code: {:?}", request.get_id(), request.get_message_type(), request.get_op_code());
|
||||
info!("request id: {} type: {:?} op_code: {:?}", request.get_id(), request.get_message_type(), request.get_op_code());
|
||||
debug!("request: {:?}", request);
|
||||
|
||||
let mut resp_edns_opt: Option<Edns> = None;
|
||||
|
@ -16,15 +16,15 @@
|
||||
|
||||
//! Configuration module for the server binary, `named`.
|
||||
|
||||
use std::io::Read;
|
||||
use std::fs::File;
|
||||
use std::io::Read;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
use std::time::Duration;
|
||||
|
||||
use log::LogLevel;
|
||||
use rustc_serialize::Decodable;
|
||||
|
||||
use toml::{Decoder, Value};
|
||||
|
||||
use trust_dns::error::ParseResult;
|
||||
@ -33,14 +33,16 @@ use trust_dns::rr::Name;
|
||||
use ::authority::ZoneType;
|
||||
use ::error::{ConfigErrorKind, ConfigResult, ConfigError};
|
||||
|
||||
static DEFAULT_PORT: u16 = 53;
|
||||
static DEFAULT_PATH: &'static str = "/var/named"; // TODO what about windows (do I care? ;)
|
||||
static DEFAULT_PORT: u16 = 53;
|
||||
static DEFAULT_TCP_REQUEST_TIMEOUT: u64 = 5;
|
||||
|
||||
#[derive(RustcDecodable, Debug)]
|
||||
pub struct Config {
|
||||
listen_addrs_ipv4: Vec<String>,
|
||||
listen_addrs_ipv6: Vec<String>,
|
||||
listen_port: Option<u16>,
|
||||
tcp_request_timeout: Option<u64>,
|
||||
log_level: Option<String>,
|
||||
directory: Option<String>,
|
||||
zones: Vec<ZoneConfig>,
|
||||
@ -58,6 +60,7 @@ impl Config {
|
||||
pub fn get_listen_addrs_ipv4(&self) -> Vec<Ipv4Addr> { self.listen_addrs_ipv4.iter().map(|s| s.parse().unwrap()).collect() }
|
||||
pub fn get_listen_addrs_ipv6(&self) -> Vec<Ipv6Addr> { self.listen_addrs_ipv6.iter().map(|s| s.parse().unwrap()).collect() }
|
||||
pub fn get_listen_port(&self) -> u16 { self.listen_port.unwrap_or(DEFAULT_PORT) }
|
||||
pub fn get_tcp_request_timeout(&self) -> Duration { Duration::from_secs(self.tcp_request_timeout.unwrap_or(DEFAULT_TCP_REQUEST_TIMEOUT)) }
|
||||
pub fn get_log_level(&self) -> LogLevel {
|
||||
if let Some(ref level_str) = self.log_level {
|
||||
match level_str as &str {
|
||||
|
@ -33,7 +33,6 @@
|
||||
extern crate chrono;
|
||||
extern crate docopt;
|
||||
#[macro_use] extern crate log;
|
||||
extern crate mio;
|
||||
extern crate openssl;
|
||||
extern crate rustc_serialize;
|
||||
extern crate trust_dns;
|
||||
@ -42,16 +41,13 @@ extern crate trust_dns_server;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::collections::BTreeMap;
|
||||
use std::net::{Ipv4Addr, IpAddr, SocketAddr};
|
||||
use std::net::ToSocketAddrs;
|
||||
use std::net::{Ipv4Addr, IpAddr, SocketAddr, TcpListener, ToSocketAddrs, UdpSocket};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::io::{Read, Write};
|
||||
|
||||
use chrono::{Duration};
|
||||
use docopt::Docopt;
|
||||
use log::LogLevel;
|
||||
use mio::tcp::TcpListener;
|
||||
use mio::udp::UdpSocket;
|
||||
use openssl::crypto::rsa::RSA;
|
||||
|
||||
use trust_dns::error::ParseResult;
|
||||
@ -63,8 +59,7 @@ use trust_dns::rr::dnssec::{Algorithm, Signer};
|
||||
|
||||
use trust_dns_server::authority::{Authority, Catalog, Journal, ZoneType};
|
||||
use trust_dns_server::config::{Config, ZoneConfig};
|
||||
#[allow(deprecated)]
|
||||
use trust_dns_server::server::Server;
|
||||
use trust_dns_server::server::ServerFuture;
|
||||
|
||||
// the Docopt usage string.
|
||||
// http://docopt.org
|
||||
@ -254,17 +249,19 @@ pub fn main() {
|
||||
let v6addr = config.get_listen_addrs_ipv6();
|
||||
let mut listen_addrs : Vec<IpAddr> = v4addr.into_iter().map(|x| IpAddr::V4(x)).chain(v6addr.into_iter().map(|x| IpAddr::V6(x))).collect();
|
||||
let listen_port: u16 = args.flag_port.unwrap_or(config.get_listen_port());
|
||||
let tcp_request_timeout = config.get_tcp_request_timeout();
|
||||
|
||||
if listen_addrs.len() == 0 {
|
||||
listen_addrs.push(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
|
||||
}
|
||||
let sockaddrs : Vec<SocketAddr> = listen_addrs.into_iter().flat_map(|x| (x, listen_port).to_socket_addrs().unwrap()).collect();
|
||||
let udp_sockets : Vec<UdpSocket> = sockaddrs.iter().map(|x| UdpSocket::bound(x).expect(&format!("could not bind to udp: {}", x))).collect();
|
||||
let udp_sockets : Vec<UdpSocket> = sockaddrs.iter().map(|x| UdpSocket::bind(x).expect(&format!("could not bind to udp: {}", x))).collect();
|
||||
let tcp_listeners : Vec<TcpListener> = sockaddrs.iter().map(|x| TcpListener::bind(x).expect(&format!("could not bind to tcp: {}", x))).collect();
|
||||
|
||||
// now, run the server, based on the config
|
||||
let mut server = Server::new(catalog);
|
||||
let mut server = ServerFuture::new(catalog).expect("error creating ServerFuture");
|
||||
|
||||
// load all the listeners
|
||||
for udp_socket in udp_sockets {
|
||||
info!("listening for UDP on {:?}", udp_socket);
|
||||
server.register_socket(udp_socket);
|
||||
@ -272,7 +269,7 @@ pub fn main() {
|
||||
|
||||
for tcp_listener in tcp_listeners {
|
||||
info!("listening for TCP on {:?}", tcp_listener);
|
||||
server.register_listener(tcp_listener);
|
||||
server.register_listener(tcp_listener, tcp_request_timeout);
|
||||
}
|
||||
|
||||
banner();
|
||||
@ -281,9 +278,6 @@ pub fn main() {
|
||||
error!("failed to listen: {}", e);
|
||||
}
|
||||
|
||||
//let mut server = Server::new((listen_addr, listen_port), catalog).unwrap();
|
||||
//server.listen().unwrap();
|
||||
|
||||
// we're exiting for some reason...
|
||||
info!("Trust-DNS {} stopping", trust_dns::version());
|
||||
}
|
||||
|
@ -25,6 +25,11 @@ pub struct RequestStream<S> {
|
||||
}
|
||||
|
||||
impl<S> RequestStream<S> {
|
||||
/// Creates a new RequestStream
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `stream` - Stream from which requests will be read
|
||||
/// * `stream_handle` - Handle to which responses will be posted
|
||||
pub fn new(stream: S, stream_handle: BufStreamHandle) -> Self {
|
||||
RequestStream{ stream: stream, stream_handle: stream_handle }
|
||||
}
|
||||
@ -34,6 +39,11 @@ impl<S> Stream for RequestStream<S> where S: Stream<Item=(Vec<u8>, SocketAddr),
|
||||
type Item = (Request, ResponseHandle);
|
||||
type Error = io::Error;
|
||||
|
||||
/// Polls the underlying Stream for readyness.
|
||||
///
|
||||
/// # Returns
|
||||
/// When `Async::Ready(Some(_))` is returned, it contains the deserialized request and a handle
|
||||
/// back to the underlying stream to which a response can be sent.
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
loop {
|
||||
match try_ready!(self.stream.poll()) {
|
||||
@ -46,6 +56,7 @@ impl<S> Stream for RequestStream<S> where S: Stream<Item=(Vec<u8>, SocketAddr),
|
||||
let mut decoder = BinDecoder::new(&buffer);
|
||||
match Message::read(&mut decoder) {
|
||||
Ok(message) => {
|
||||
debug!("received message: {}", message.get_id());
|
||||
let request = Request{ message: message, src: addr};
|
||||
let response_handle = ResponseHandle{ dst: addr, stream_handle: self.stream_handle.clone() };
|
||||
return Ok(Async::Ready(Some((request, response_handle))));
|
||||
@ -62,13 +73,17 @@ impl<S> Stream for RequestStream<S> where S: Stream<Item=(Vec<u8>, SocketAddr),
|
||||
}
|
||||
}
|
||||
|
||||
/// A handler for wraping a BufStreamHandle, which will properly serialize the message and add the
|
||||
/// associated destination.
|
||||
pub struct ResponseHandle {
|
||||
dst: SocketAddr,
|
||||
stream_handle: BufStreamHandle,
|
||||
}
|
||||
|
||||
impl ResponseHandle {
|
||||
/// Serializes and sends a message to to the wrapped handle
|
||||
pub fn send(&self, response: Message) -> io::Result<()> {
|
||||
debug!("sending message: {}", response.get_id());
|
||||
let mut buffer = Vec::with_capacity(512);
|
||||
let encode_result = {
|
||||
let mut encoder: BinEncoder = BinEncoder::new(&mut buffer);
|
||||
|
@ -28,6 +28,7 @@ pub struct ServerFuture {
|
||||
}
|
||||
|
||||
impl ServerFuture {
|
||||
/// Creates a new ServerFuture with the specified Catalog of Zones.
|
||||
pub fn new(catalog: Catalog) -> io::Result<ServerFuture> {
|
||||
Ok(ServerFuture {
|
||||
io_loop: try!(Core::new()),
|
||||
|
@ -13,15 +13,25 @@ pub struct TimeoutStream<S> {
|
||||
stream: S,
|
||||
reactor_handle: Handle,
|
||||
timeout_duration: Duration,
|
||||
timeout: Timeout,
|
||||
timeout: Option<Timeout>,
|
||||
}
|
||||
|
||||
impl<S> TimeoutStream<S> {
|
||||
pub fn new(stream: S, timeout_duration: Duration, reactor_handle: Handle) -> io::Result<Self> {
|
||||
// store a Timeout for this message before sending
|
||||
let timeout = try!(Timeout::new(timeout_duration, &reactor_handle));
|
||||
|
||||
let timeout = try!(Self::timeout(timeout_duration, &reactor_handle));
|
||||
|
||||
Ok(TimeoutStream{ stream: stream, reactor_handle: reactor_handle, timeout_duration: timeout_duration, timeout: timeout })
|
||||
}
|
||||
|
||||
fn timeout(timeout_duration: Duration, reactor_handle: &Handle) -> io::Result<Option<Timeout>> {
|
||||
if timeout_duration > Duration::from_millis(0) {
|
||||
Ok(Some(try!(Timeout::new(timeout_duration, reactor_handle))))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, I> Stream for TimeoutStream<S>
|
||||
@ -34,15 +44,20 @@ where S: Stream<Item=I, Error=io::Error> {
|
||||
match self.stream.poll() {
|
||||
r @ Ok(Async::Ready(_)) | r @ Err(_) => {
|
||||
// reset the timeout to wait for the next request...
|
||||
let timeout = try!(Timeout::new(self.timeout_duration, &self.reactor_handle));
|
||||
let timeout = try!(Self::timeout(self.timeout_duration, &self.reactor_handle));
|
||||
drop(mem::replace(&mut self.timeout, timeout));
|
||||
|
||||
return r
|
||||
},
|
||||
Ok(Async::NotReady) => {
|
||||
// otherwise poll the timeout
|
||||
match try_ready!(self.timeout.poll()) {
|
||||
() => return Err(io::Error::new(io::ErrorKind::TimedOut, format!("nothing ready in {:?}", self.timeout_duration))),
|
||||
if self.timeout.is_none() { return Ok(Async::NotReady) }
|
||||
|
||||
// otherwise check if the timeout has expired.
|
||||
match try_ready!(self.timeout.as_mut().unwrap().poll()) {
|
||||
() => {
|
||||
debug!("timeout on stream");
|
||||
return Err(io::Error::new(io::ErrorKind::TimedOut, format!("nothing ready in {:?}", self.timeout_duration)))
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ extern crate trust_dns_server;
|
||||
use std::env;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
use std::time::Duration;
|
||||
|
||||
use log::LogLevel;
|
||||
|
||||
@ -40,6 +41,7 @@ fn test_read_config() {
|
||||
assert_eq!(config.get_listen_port(), 53);
|
||||
assert_eq!(config.get_listen_addrs_ipv4(), vec![]);
|
||||
assert_eq!(config.get_listen_addrs_ipv6(), vec![]);
|
||||
assert_eq!(config.get_tcp_request_timeout(), Duration::from_secs(5));
|
||||
assert_eq!(config.get_log_level(), LogLevel::Info);
|
||||
assert_eq!(config.get_directory(), Path::new("/var/named"));
|
||||
assert_eq!(config.get_zones(), [
|
||||
@ -69,6 +71,9 @@ fn test_parse_toml() {
|
||||
let config: Config = "listen_addrs_ipv6 = [\"::0\", \"::1\"]".parse().unwrap();
|
||||
assert_eq!(config.get_listen_addrs_ipv6(), vec![Ipv6Addr::new(0,0,0,0,0,0,0,0), Ipv6Addr::new(0,0,0,0,0,0,0,1)]);
|
||||
|
||||
let config: Config = "tcp_request_timeout = 25".parse().unwrap();
|
||||
assert_eq!(config.get_tcp_request_timeout(), Duration::from_secs(25));
|
||||
|
||||
let config: Config = "log_level = \"Debug\"".parse().unwrap();
|
||||
assert_eq!(config.get_log_level(), LogLevel::Debug);
|
||||
|
||||
|
@ -25,6 +25,13 @@
|
||||
## listen_port: port on which to list
|
||||
# listen_port = 53
|
||||
|
||||
## tcp_request_timeout: TCP request timeout in seconds. Allows TCP connections
|
||||
## to timeout if there are no requests from a client in the specified amount of
|
||||
## time. This is not a socket level timeout, so trickles of data will not count,
|
||||
## a full request must be received for it to not count against the timeout.
|
||||
## Specifying a timeout of 0 will disable it.
|
||||
# tcp_request_timeout = 5
|
||||
|
||||
## log_level: Trace, Debug, Info, Warn, Error
|
||||
# log_level = "Info"
|
||||
|
||||
|
@ -1,3 +1,5 @@
|
||||
extern crate futures;
|
||||
extern crate log;
|
||||
extern crate trust_dns;
|
||||
extern crate tokio_core;
|
||||
|
||||
@ -28,6 +30,7 @@ fn named_test_harness<F, R>(toml: &str, test: F) where F: FnOnce(u16) -> R + Unw
|
||||
|
||||
let mut named = Command::new(&format!("{}/../target/debug/named", server_path))
|
||||
.stdout(Stdio::piped())
|
||||
//.arg("-d")
|
||||
.arg(&format!("--config={}/tests/named_test_configs/{}", server_path, toml))
|
||||
.arg(&format!("--zonedir={}/tests/named_test_configs", server_path))
|
||||
.arg(&format!("--port={}", test_port))
|
||||
@ -66,6 +69,7 @@ fn named_test_harness<F, R>(toml: &str, test: F) where F: FnOnce(u16) -> R + Unw
|
||||
for _ in 0..1000 {
|
||||
output.clear();
|
||||
named_out.read_line(&mut output).expect("could not read stdout");
|
||||
stdout().write(b"SRV: ").unwrap();
|
||||
stdout().write(output.as_bytes()).unwrap();
|
||||
if output.ends_with("awaiting connections...\n") { found = true; break }
|
||||
}
|
||||
@ -80,6 +84,7 @@ fn named_test_harness<F, R>(toml: &str, test: F) where F: FnOnce(u16) -> R + Unw
|
||||
while !succeeded.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
output.clear();
|
||||
named_out.read_line(&mut output).expect("could not read stdout");
|
||||
stdout().write(b"SRV: ").unwrap();
|
||||
stdout().write(output.as_bytes()).unwrap();
|
||||
}
|
||||
}).expect("no thread available");
|
||||
@ -98,13 +103,16 @@ fn named_test_harness<F, R>(toml: &str, test: F) where F: FnOnce(u16) -> R + Unw
|
||||
|
||||
// This only validates that a query to the server works, it shouldn't be used for more than this.
|
||||
// i.e. more complex checks live with the clients and authorities to validate deeper funcionality
|
||||
fn test_query(io_loop: &mut Core, client: BasicClientHandle) -> bool {
|
||||
fn query(io_loop: &mut Core, client: BasicClientHandle) -> bool {
|
||||
let name = domain::Name::with_labels(vec!["www".to_string(), "example".to_string(), "com".to_string()]);
|
||||
|
||||
println!("sending request");
|
||||
let response = io_loop.run(client.query(name.clone(), DNSClass::IN, RecordType::A));
|
||||
println!("got response: {}", response.is_ok());
|
||||
if response.is_err() { return false }
|
||||
let response = response.unwrap();
|
||||
|
||||
|
||||
let record = &response.get_answers()[0];
|
||||
|
||||
if let &RData::A(ref address) = record.get_rdata() {
|
||||
@ -116,14 +124,24 @@ fn test_query(io_loop: &mut Core, client: BasicClientHandle) -> bool {
|
||||
|
||||
#[test]
|
||||
fn test_example_toml_startup() {
|
||||
use trust_dns::logger;
|
||||
use log::LogLevel;
|
||||
logger::TrustDnsLogger::enable_logging(LogLevel::Debug);
|
||||
|
||||
named_test_harness("example.toml", |port| {
|
||||
let mut io_loop = Core::new().unwrap();
|
||||
let addr: SocketAddr = ("127.0.0.1", port).to_socket_addrs().unwrap().next().unwrap();
|
||||
let (stream, sender) = TcpClientStream::new(addr, io_loop.handle());
|
||||
let client = ClientFuture::new(stream, sender, io_loop.handle(), None);
|
||||
|
||||
assert!(test_query(&mut io_loop, client));
|
||||
assert!(true);
|
||||
assert!(query(&mut io_loop, client));
|
||||
|
||||
// just tests that multiple queries work
|
||||
let addr: SocketAddr = ("127.0.0.1", port).to_socket_addrs().unwrap().next().unwrap();
|
||||
let (stream, sender) = TcpClientStream::new(addr, io_loop.handle());
|
||||
let client = ClientFuture::new(stream, sender, io_loop.handle(), None);
|
||||
|
||||
assert!(query(&mut io_loop, client));
|
||||
})
|
||||
}
|
||||
|
||||
@ -136,22 +154,21 @@ fn test_ipv4_only_toml_startup() {
|
||||
let client = ClientFuture::new(stream, sender, io_loop.handle(), None);
|
||||
|
||||
// ipv4 should succeed
|
||||
assert!(test_query(&mut io_loop, client));
|
||||
assert!(query(&mut io_loop, client));
|
||||
|
||||
let addr: SocketAddr = ("::1", port).to_socket_addrs().unwrap().next().unwrap();
|
||||
let (stream, sender) = TcpClientStream::new(addr, io_loop.handle());
|
||||
let client = ClientFuture::new(stream, sender, io_loop.handle(), None);
|
||||
|
||||
// ipv6 should fail
|
||||
assert!(!test_query(&mut io_loop, client));
|
||||
|
||||
assert!(true);
|
||||
assert!(!query(&mut io_loop, client));
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: this is commented out b/c at least on macOS, ipv4 will route properly to ipv6 only
|
||||
// listeners over the [::ffff:127.0.0.1] interface
|
||||
//
|
||||
// #[ignore]
|
||||
// #[test]
|
||||
// fn test_ipv6_only_toml_startup() {
|
||||
// named_test_harness("ipv6_only.toml", |port| {
|
||||
@ -161,14 +178,14 @@ fn test_ipv4_only_toml_startup() {
|
||||
// let client = ClientFuture::new(stream, sender, io_loop.handle(), None);
|
||||
//
|
||||
// // ipv4 should fail
|
||||
// assert!(!test_query(&mut io_loop, client));
|
||||
// assert!(!query(&mut io_loop, client));
|
||||
//
|
||||
// let addr: SocketAddr = ("::1", port).to_socket_addrs().unwrap().next().unwrap();
|
||||
// let (stream, sender) = TcpClientStream::new(addr, io_loop.handle());
|
||||
// let client = ClientFuture::new(stream, sender, io_loop.handle(), None);
|
||||
//
|
||||
// // ipv6 should succeed
|
||||
// assert!(test_query(&mut io_loop, client));
|
||||
// assert!(query(&mut io_loop, client));
|
||||
//
|
||||
// assert!(true);
|
||||
// })
|
||||
@ -184,14 +201,14 @@ fn test_ipv4_and_ipv6_toml_startup() {
|
||||
let client = ClientFuture::new(stream, sender, io_loop.handle(), None);
|
||||
|
||||
// ipv4 should succeed
|
||||
assert!(test_query(&mut io_loop, client));
|
||||
assert!(query(&mut io_loop, client));
|
||||
|
||||
let addr: SocketAddr = ("::1", port).to_socket_addrs().unwrap().next().unwrap();
|
||||
let (stream, sender) = TcpClientStream::new(addr, io_loop.handle());
|
||||
let client = ClientFuture::new(stream, sender, io_loop.handle(), None);
|
||||
|
||||
// ipv6 should succeed
|
||||
assert!(test_query(&mut io_loop, client));
|
||||
assert!(query(&mut io_loop, client));
|
||||
|
||||
assert!(true);
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user