Old client to future (#87)
* restructured SyncClient * original client now updated to use the ClientFuture
This commit is contained in:
parent
aeb484a49f
commit
1851904ea4
18
CHANGELOG.md
18
CHANGELOG.md
@ -2,7 +2,23 @@
|
||||
All notable changes to this project will be documented in this file.
|
||||
This project adheres to [Semantic Versioning](http://semver.org/).
|
||||
|
||||
## 0.9.4 (in progress)
|
||||
## 0.10.0 (in progress)
|
||||
### Changed
|
||||
- *Important* Possible breaking API change, the original Client has been renamed
|
||||
In an attempt to reduce the overhead of managing the project. The original
|
||||
Client has now been revamped to essentially be a synchronous Client over the
|
||||
ClientFuture implementation. The ClientFuture has proven to be a more stable
|
||||
and reliable implementation. It was attempted to make the move seamless,
|
||||
but two new types were introduced, `SyncClient` and `SecureSyncClient`, which
|
||||
are both synchronous implementations of the old Client function interfaces.
|
||||
Please read those docs on those new types and the Client trait.
|
||||
|
||||
### Removed
|
||||
- *Important* The original Server implementation was removed entirely. Please
|
||||
use the ServerFuture implementation from now on. Sorry for the inconvenience,
|
||||
but this is necessary to make sure that the software remains at a high quality
|
||||
and there is no easy way to migrate the original Server to use ServerFuture.
|
||||
|
||||
### Added
|
||||
- support for ECDSAP256SHA256, ECDSAP384SHA384 and ED25519 (client and server)
|
||||
- additional config options for keys to named, see `tests/named_test_configs/example.toml`
|
||||
|
41
Cargo.lock
generated
41
Cargo.lock
generated
@ -9,7 +9,6 @@ dependencies = [
|
||||
"futures 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"openssl 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rusqlite 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@ -61,11 +60,6 @@ name = "bitflags"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
version = "0.1.0"
|
||||
@ -210,22 +204,6 @@ dependencies = [
|
||||
"toml 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"miow 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"nix 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"time 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "0.6.2"
|
||||
@ -265,15 +243,6 @@ dependencies = [
|
||||
"ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"bitflags 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.7.0"
|
||||
@ -425,11 +394,6 @@ name = "semver"
|
||||
version = "0.1.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.3.0"
|
||||
@ -504,7 +468,6 @@ dependencies = [
|
||||
"futures 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"openssl 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"ring 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@ -563,7 +526,6 @@ dependencies = [
|
||||
"checksum backtrace-sys 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3602e8d8c43336088a8505fa55cae2b3884a9be29440863a11528a42f46f6bb7"
|
||||
"checksum bitflags 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8dead7461c1127cf637931a1e50934eb6eee8bff2f74433ac7909e9afcee04a3"
|
||||
"checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d"
|
||||
"checksum bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c129aff112dcc562970abb69e2508b40850dd24c274761bb50fb8a0067ba6c27"
|
||||
"checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c"
|
||||
"checksum chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)" = "9213f7cd7c27e95c2b57c49f0e69b1ea65b27138da84a170133fd21b07659c00"
|
||||
"checksum data-encoding 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f13f03d68d1906eb3222536f5756953e30de4dff4417e5d428414fb8edb26723"
|
||||
@ -584,11 +546,9 @@ dependencies = [
|
||||
"checksum lru-cache 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "656fa4dfcb02bcf1063c592ba3ff6a5303ee1f2afe98c8a889e8b1a77c6dfdb7"
|
||||
"checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20"
|
||||
"checksum metadeps 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "829fffe7ea1d747e23f64be972991bc516b2f1ac2ae4a3b33d8bea150c410151"
|
||||
"checksum mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a637d1ca14eacae06296a008fa7ad955347e34efcb5891cfd8ba05491a37907e"
|
||||
"checksum mio 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5b493dc9fd96bd2077f2117f178172b0765db4dfda3ea4d8000401e6d65d3e80"
|
||||
"checksum miow 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3e690c5df6b2f60acd45d56378981e827ff8295562fc8d34f573deb267a59cd1"
|
||||
"checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2"
|
||||
"checksum nix 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bfb3ddedaa14746434a02041940495bf11325c22f6d36125d3bdd56090d50a79"
|
||||
"checksum nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a0d95c5fa8b641c10ad0b8887454ebaafa3c92b5cd5350f8fc693adafd178e7b"
|
||||
"checksum num 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)" = "bde7c03b09e7c6a301ee81f6ddf66d7a28ec305699e3d3b056d2fc56470e3120"
|
||||
"checksum num-integer 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)" = "fb24d9bfb3f222010df27995441ded1e954f8f69cd35021f6bef02ca9552fb92"
|
||||
@ -608,7 +568,6 @@ dependencies = [
|
||||
"checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084"
|
||||
"checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d"
|
||||
"checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac"
|
||||
"checksum slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d807fd58c4181bbabed77cb3b891ba9748241a552bcc5be698faaebefc54f46e"
|
||||
"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
|
||||
"checksum strsim 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "67f84c44fbb2f91db7fef94554e6b2ac05909c9c0b0bc23bb98d3a1aebfe7f7c"
|
||||
"checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03"
|
||||
|
11
README.md
11
README.md
@ -218,17 +218,6 @@ It is a default feature, so default-features will need to be set to false (this
|
||||
trust-dns = { version = "*", default-features = false, features = ["openssl"] }
|
||||
```
|
||||
|
||||
- mio_client
|
||||
|
||||
Also a default feature, this exables the old deprecated MIO based client. This will remove an independent dependency requirement on the MIO library (there is an implicit dependency on MIO via the tokio-rs library, that will not be removed). Disabling this feature will only compile in the futures-rs based client. The below example line will disable all default features and enable mio_client, remove `"mio_client"` to remove the direct dependency on MIO.
|
||||
|
||||
```
|
||||
[dependencies]
|
||||
...
|
||||
trust-dns = { version = "*", default-features = false, features = ["mio_client"] }
|
||||
```
|
||||
|
||||
|
||||
# FAQ
|
||||
|
||||
- Why are you building another DNS server?
|
||||
|
@ -37,8 +37,7 @@ license = "MIT/Apache-2.0"
|
||||
build = "build.rs"
|
||||
|
||||
[features]
|
||||
default = ["openssl", "mio_client", "ring"]
|
||||
mio_client = ["mio"]
|
||||
default = ["openssl", "ring"]
|
||||
|
||||
[lib]
|
||||
name = "trust_dns"
|
||||
@ -52,10 +51,8 @@ error-chain = "0.1.12"
|
||||
futures = "^0.1.6"
|
||||
lazy_static = "^0.2.1"
|
||||
log = "^0.3.5"
|
||||
mio = { version = "^0.5.1", optional = true }
|
||||
openssl = { version = "^0.9.5", optional = true }
|
||||
rand = "^0.3"
|
||||
# ring = { version = "^0.6", optional = true, features = ["rsa_signing"] }
|
||||
ring = { version = "^0.6", optional = true }
|
||||
rustc-serialize = "^0.3.18"
|
||||
time = "^0.1"
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -14,17 +14,16 @@
|
||||
|
||||
//! Trait for client connections
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::io;
|
||||
|
||||
use ::error::*;
|
||||
use futures::Future;
|
||||
use tokio_core::reactor::Core;
|
||||
|
||||
use ::client::ClientStreamHandle;
|
||||
|
||||
/// Trait for client connections
|
||||
pub trait ClientConnection: Sized+Debug {
|
||||
/// Sends a serialized message to via this connection, returning the serialized response.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `bytes` - the serialized Message
|
||||
fn send(&mut self, bytes: Vec<u8>) -> ClientResult<Vec<u8>>;
|
||||
// TODO: split connect, send and read...
|
||||
pub trait ClientConnection: Sized {
|
||||
type MessageStream;
|
||||
|
||||
fn unwrap(self) -> (Core, Box<Future<Item=Self::MessageStream, Error=io::Error>>, Box<ClientStreamHandle>);
|
||||
}
|
||||
|
@ -796,7 +796,6 @@ pub trait ClientHandle: Clone {
|
||||
///
|
||||
/// * `record` - The name, class and record_type will be used to match and delete the RecordSet
|
||||
/// * `zone_origin` - the zone name to update, i.e. SOA name
|
||||
/// * `signer` - the signer, with private key, to use to sign the request
|
||||
///
|
||||
/// The update must go to a zone authority (i.e. the server used in the ClientConnection). If
|
||||
/// the rrset does not exist and must_exist is false, then the RRSet will be deleted.
|
||||
@ -853,7 +852,6 @@ pub trait ClientHandle: Clone {
|
||||
/// * `name_of_records` - the name of all the record sets to delete
|
||||
/// * `zone_origin` - the zone name to update, i.e. SOA name
|
||||
/// * `dns_class` - the class of the SOA
|
||||
/// * `signer` - the signer, with private key, to use to sign the request
|
||||
///
|
||||
/// The update must go to a zone authority (i.e. the server used in the ClientConnection). This
|
||||
/// operation attempts to delete all resource record sets the the specified name reguardless of
|
||||
|
@ -16,9 +16,7 @@
|
||||
|
||||
//! DNS Client associated classes for performing queries and other operations.
|
||||
|
||||
#[cfg(feature = "mio_client")]
|
||||
mod client;
|
||||
#[cfg(feature = "mio_client")]
|
||||
mod client_connection;
|
||||
mod client_future;
|
||||
mod memoize_client_handle;
|
||||
@ -27,9 +25,7 @@ mod retry_client_handle;
|
||||
mod secure_client_handle;
|
||||
|
||||
#[allow(deprecated)]
|
||||
#[cfg(feature = "mio_client")]
|
||||
pub use self::client::Client;
|
||||
#[cfg(feature = "mio_client")]
|
||||
pub use self::client::{Client, SecureSyncClient, SyncClient};
|
||||
pub use self::client_connection::ClientConnection;
|
||||
pub use self::client_future::{ClientFuture, BasicClientHandle, ClientHandle, StreamHandle, ClientStreamHandle};
|
||||
pub use self::memoize_client_handle::MemoizeClientHandle;
|
||||
|
@ -33,8 +33,6 @@ extern crate data_encoding;
|
||||
#[macro_use] extern crate futures;
|
||||
#[macro_use] extern crate lazy_static;
|
||||
#[macro_use] extern crate log;
|
||||
#[cfg(feature = "mio_client")]
|
||||
extern crate mio;
|
||||
#[cfg(feature = "openssl")]
|
||||
extern crate openssl;
|
||||
extern crate rand;
|
||||
|
@ -16,18 +16,13 @@
|
||||
|
||||
//! TCP protocol related components for DNS.
|
||||
|
||||
#[cfg(feature = "mio_client")]
|
||||
mod handler;
|
||||
#[cfg(feature = "mio_client")]
|
||||
// mod handler;
|
||||
mod tcp_client_connection;
|
||||
mod tcp_client_stream;
|
||||
mod tcp_stream;
|
||||
|
||||
#[cfg(feature = "mio_client")]
|
||||
pub use self::handler::TcpHandler;
|
||||
#[cfg(feature = "mio_client")]
|
||||
pub use self::handler::TcpState;
|
||||
#[cfg(feature = "mio_client")]
|
||||
// pub use self::handler::TcpHandler;
|
||||
// pub use self::handler::TcpState;
|
||||
pub use self::tcp_client_connection::TcpClientConnection;
|
||||
pub use self::tcp_client_stream::TcpClientStream;
|
||||
pub use self::tcp_stream::TcpStream;
|
||||
|
@ -16,23 +16,19 @@
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::io;
|
||||
use std::mem;
|
||||
use std::fmt;
|
||||
|
||||
use mio::tcp::TcpStream;
|
||||
use mio::{Token, EventLoop, Handler, EventSet, PollOpt}; // not * b/c don't want confusion with std::net
|
||||
use futures::Future;
|
||||
use tokio_core::reactor::Core;
|
||||
|
||||
use ::error::*;
|
||||
use ::client::ClientConnection;
|
||||
use ::tcp::{TcpHandler, TcpState};
|
||||
|
||||
const RESPONSE: Token = Token(0);
|
||||
use ::client::{ClientConnection, ClientStreamHandle};
|
||||
use ::tcp::TcpClientStream;
|
||||
|
||||
/// TCP based DNS client
|
||||
pub struct TcpClientConnection {
|
||||
handler: Option<TcpHandler>,
|
||||
event_loop: EventLoop<ClientHandler>,
|
||||
error: Option<ClientError>,
|
||||
io_loop: Core,
|
||||
tcp_client_stream: Box<Future<Item=TcpClientStream, Error=io::Error>>,
|
||||
client_stream_handle: Box<ClientStreamHandle>,
|
||||
}
|
||||
|
||||
impl TcpClientConnection {
|
||||
@ -45,124 +41,17 @@ impl TcpClientConnection {
|
||||
///
|
||||
/// * `name_server` - address of the name server to use for queries
|
||||
pub fn new(name_server: SocketAddr) -> ClientResult<Self> {
|
||||
// TODO: randomize local port binding issue #23
|
||||
// probably not necessary for TCP...
|
||||
debug!("connecting to {:?}", name_server);
|
||||
let stream = try!(TcpStream::connect(&name_server));
|
||||
let io_loop = try!(Core::new());
|
||||
let (tcp_client_stream, handle) = TcpClientStream::new(name_server, io_loop.handle());
|
||||
|
||||
let mut event_loop: EventLoop<ClientHandler> = try!(EventLoop::new());
|
||||
// TODO make the timeout configurable, 5 seconds is the dig default
|
||||
// TODO the error is private to mio, which makes this awkward...
|
||||
if event_loop.timeout_ms((), 5000).is_err() { return Err(ClientErrorKind::Message("error setting timer").into()) };
|
||||
// TODO: Linux requires a register before a reregister, reregister is needed b/c of OSX later
|
||||
// ideally this would not be added to the event loop until the client connection request.
|
||||
try!(event_loop.register(&stream, RESPONSE, EventSet::all(), PollOpt::all()));
|
||||
|
||||
Ok(TcpClientConnection{ handler: Some(TcpHandler::new_client_handler(stream)), event_loop: event_loop, error: None })
|
||||
Ok(TcpClientConnection{ io_loop: io_loop, tcp_client_stream: tcp_client_stream, client_stream_handle: handle })
|
||||
}
|
||||
}
|
||||
|
||||
impl ClientConnection for TcpClientConnection {
|
||||
fn send(&mut self, buffer: Vec<u8> ) -> ClientResult<Vec<u8>> {
|
||||
self.error = None;
|
||||
// TODO: b/c of OSX this needs to be a reregister (since deregister is not working)
|
||||
// ideally it should be a register with the later deregister...
|
||||
try!(self.event_loop.reregister(self.handler.as_ref().expect("never none 70").get_stream(), RESPONSE, EventSet::all(), PollOpt::all()));
|
||||
// this is the request message, needs to be set each time
|
||||
// TODO: it would be cool to reuse this buffer.
|
||||
let mut handler = mem::replace(&mut self.handler, None).expect("never none 73");
|
||||
handler.set_buffer(buffer);
|
||||
let mut client_handler = ClientHandler{ handler: handler, error: None };
|
||||
let result = self.event_loop.run(&mut client_handler);
|
||||
self.handler = Some(client_handler.handler);
|
||||
type MessageStream = TcpClientStream;
|
||||
|
||||
try!(result);
|
||||
|
||||
if self.error.is_some() { return Err(mem::replace(&mut self.error, None).unwrap()) }
|
||||
Ok(self.handler.as_mut().expect("never none 82").remove_buffer())
|
||||
//debug!("client deregistering");
|
||||
// TODO: when this line is added OSX starts failing, but we should have it...
|
||||
// try!(self.event_loop.deregister(&response.stream));
|
||||
fn unwrap(self) -> (Core, Box<Future<Item=Self::MessageStream, Error=io::Error>>, Box<ClientStreamHandle>) {
|
||||
(self.io_loop, self.tcp_client_stream, self.client_stream_handle)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for TcpClientConnection {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "TcpClientConnection: {:?}", self.handler.as_ref().expect("never none 91").get_stream())
|
||||
}
|
||||
}
|
||||
|
||||
struct ClientHandler {
|
||||
pub handler: TcpHandler,
|
||||
pub error: Option<ClientError>,
|
||||
}
|
||||
|
||||
// TODO: this should be merged with the server handler
|
||||
impl Handler for ClientHandler {
|
||||
type Timeout = ();
|
||||
type Message = ();
|
||||
|
||||
fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
|
||||
match token {
|
||||
RESPONSE => {
|
||||
if events.is_error() {
|
||||
warn!("closing, error from: {:?}", self.handler.get_stream());
|
||||
// TODO: do we need to shutdown the stream?
|
||||
event_loop.shutdown();
|
||||
} else if events.is_hup() {
|
||||
info!("client hungup: {:?}", self.handler.get_stream());
|
||||
// TODO: do we need to shutdown the stream?
|
||||
//remove = Some(RemoveFrom::TcpHandlers(token));
|
||||
event_loop.shutdown();
|
||||
} else if events.is_readable() || events.is_writable() {
|
||||
// the handler will deal with the rest of the connection, we need to check the return value
|
||||
// for an error with wouldblock, this means that the handler couldn't complete the request.
|
||||
match self.handler.handle_message(events) {
|
||||
Ok(TcpState::Done) => {
|
||||
// the shutdown will stop the event_loop run to return the requester
|
||||
self.handler.reset();
|
||||
event_loop.shutdown();
|
||||
},
|
||||
Ok(..) => {
|
||||
// registering the event to only wake up on the correct event
|
||||
// this reduces looping on states like writable that can remain set for a long time
|
||||
//if let Err(e) = event_loop.reregister(handler.get_stream(), token, handler.get_events(), PollOpt::level()) {
|
||||
debug!("reregistering for next call: {:?}", self.handler.get_events());
|
||||
if let Err(e) = event_loop.reregister(self.handler.get_stream(), token, self.handler.get_events(), PollOpt::all()) {
|
||||
error!("could not reregister stream: {:?} cause: {}", self.handler.get_stream(), e);
|
||||
// TODO: need to return an error here
|
||||
//remove = Some(RemoveFrom::TcpHandlers(token));
|
||||
}
|
||||
},
|
||||
Err(ref e) if io::ErrorKind::WouldBlock == e.kind() => {
|
||||
debug!("WouldBlock, reregistering for next call: {:?}", self.handler.get_events());
|
||||
if let Err(e) = event_loop.reregister(self.handler.get_stream(), token, self.handler.get_events(), PollOpt::all()) {
|
||||
error!("could not reregister stream: {:?} cause: {}", self.handler.get_stream(), e);
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
// shutdown the connection, remove it.
|
||||
warn!("connection: {:?} shutdown on error: {}", self.handler.get_stream(), e);
|
||||
// TODO: do we need to shutdown the stream?
|
||||
//remove = Some(RemoveFrom::TcpHandlers(token));
|
||||
// TODO: need to return an error here
|
||||
//self.error = Some(e);
|
||||
event_loop.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
error!("unrecognized token: {:?}", token);
|
||||
self.error = Some(ClientErrorKind::Message("no data was received from the remote").into());
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, _: ()) {
|
||||
self.error = Some(ClientErrorKind::Message("timed out awaiting response from server(s)").into());
|
||||
event_loop.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: should test this independently of the client code
|
||||
|
@ -334,7 +334,7 @@ fn tcp_client_stream_test(server_addr: IpAddr) {
|
||||
// the tests should run within 5 seconds... right?
|
||||
// TODO: add timeout here, so that test never hangs...
|
||||
// let timeout = Timeout::new(Duration::from_secs(5), &io_loop.handle());
|
||||
let (stream, mut sender) = TcpStream::new(server_addr, io_loop.handle());
|
||||
let (stream, sender) = TcpStream::new(server_addr, io_loop.handle());
|
||||
|
||||
let mut stream: TcpStream = io_loop.run(stream).ok().expect("run failed to get stream");
|
||||
|
||||
|
@ -16,18 +16,13 @@
|
||||
|
||||
//! UDP protocol related components for DNS.
|
||||
|
||||
#[cfg(feature = "mio_client")]
|
||||
mod handler;
|
||||
#[cfg(feature = "mio_client")]
|
||||
// mod handler;
|
||||
mod udp_client_connection;
|
||||
mod udp_client_stream;
|
||||
mod udp_stream;
|
||||
|
||||
#[cfg(feature = "mio_client")]
|
||||
pub use self::handler::UdpHandler;
|
||||
#[cfg(feature = "mio_client")]
|
||||
pub use self::handler::UdpState;
|
||||
#[cfg(feature = "mio_client")]
|
||||
// pub use self::handler::UdpHandler;
|
||||
// pub use self::handler::UdpState;
|
||||
pub use self::udp_client_connection::UdpClientConnection;
|
||||
pub use self::udp_client_stream::UdpClientStream;
|
||||
pub use self::udp_stream::UdpStream;
|
||||
|
@ -14,45 +14,24 @@
|
||||
|
||||
//! UDP based DNS client
|
||||
|
||||
use std::mem;
|
||||
use std::net::{SocketAddr, ToSocketAddrs};
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use mio::udp::UdpSocket;
|
||||
use mio::{Token, EventLoop, Handler, EventSet, PollOpt}; // not * b/c don't want confusion with std::net
|
||||
use rand::Rng;
|
||||
use rand;
|
||||
use futures::Future;
|
||||
use tokio_core::reactor::Core;
|
||||
|
||||
use ::error::*;
|
||||
use client::ClientConnection;
|
||||
|
||||
const RESPONSE: Token = Token(0);
|
||||
use ::client::{ClientConnection, ClientStreamHandle};
|
||||
use ::udp::UdpClientStream;
|
||||
|
||||
/// UDP based DNS client
|
||||
pub struct UdpClientConnection {
|
||||
name_server: SocketAddr,
|
||||
socket: Option<UdpSocket>,
|
||||
event_loop: EventLoop<Response>,
|
||||
io_loop: Core,
|
||||
udp_client_stream: Box<Future<Item=UdpClientStream, Error=io::Error>>,
|
||||
client_stream_handle: Box<ClientStreamHandle>,
|
||||
}
|
||||
|
||||
impl UdpClientConnection {
|
||||
fn next_bound_local_address() -> ClientResult<UdpSocket> {
|
||||
let mut rand = rand::thread_rng();
|
||||
|
||||
let mut error = Err(ClientErrorKind::Message("could not bind address in 10 tries").into());
|
||||
for _ in 0..10 {
|
||||
let zero_addr = ("0.0.0.0", rand.gen_range(1025_u16, u16::max_value())).to_socket_addrs().expect("could not parse 0.0.0.0 address").
|
||||
next().expect("no addresses parsed from 0.0.0.0");
|
||||
|
||||
match UdpSocket::bound(&zero_addr) {
|
||||
Ok(socket) => return Ok(socket),
|
||||
Err(err) => error = Err(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
error
|
||||
}
|
||||
|
||||
/// Creates a new client connection.
|
||||
///
|
||||
/// *Note* this has side affects of binding the socket to 0.0.0.0 and starting the listening
|
||||
@ -62,127 +41,17 @@ impl UdpClientConnection {
|
||||
///
|
||||
/// * `name_server` - address of the name server to use for queries
|
||||
pub fn new(name_server: SocketAddr) -> ClientResult<Self> {
|
||||
// TODO: allow the bind address to be specified...
|
||||
// client binds to all addresses... this shouldn't ever fail
|
||||
let socket = try!(Self::next_bound_local_address());
|
||||
let mut event_loop: EventLoop<Response> = try!(EventLoop::new());
|
||||
// TODO make the timeout configurable, 5 seconds is the dig default
|
||||
// TODO the error is private to mio, which makes this awkward...
|
||||
if event_loop.timeout_ms((), 5000).is_err() { return Err(ClientErrorKind::Message("error setting timer").into()) };
|
||||
// TODO: Linux requires a register before a reregister, reregister is needed b/c of OSX later
|
||||
// ideally this would not be added to the event loop until the client connection request.
|
||||
try!(event_loop.register(&socket, RESPONSE, EventSet::readable(), PollOpt::all()));
|
||||
let io_loop = try!(Core::new());
|
||||
let (udp_client_stream, handle) = UdpClientStream::new(name_server, io_loop.handle());
|
||||
|
||||
debug!("client event_loop created");
|
||||
|
||||
Ok(UdpClientConnection{name_server: name_server, socket: Some(socket), event_loop: event_loop})
|
||||
Ok(UdpClientConnection{ io_loop: io_loop, udp_client_stream: udp_client_stream, client_stream_handle: handle })
|
||||
}
|
||||
}
|
||||
|
||||
impl ClientConnection for UdpClientConnection {
|
||||
fn send(&mut self, buffer: Vec<u8>) -> ClientResult<Vec<u8>> {
|
||||
debug!("client reregistering");
|
||||
// TODO: b/c of OSX this needs to be a reregister (since deregister is not working)
|
||||
try!(self.event_loop.reregister(self.socket.as_ref().expect("never none 86"), RESPONSE, EventSet::readable(), PollOpt::all()));
|
||||
debug!("client sending");
|
||||
try!(self.socket.as_ref().expect("never none 88").send_to(&buffer, &self.name_server));
|
||||
debug!("client sent data");
|
||||
type MessageStream = UdpClientStream;
|
||||
|
||||
// get the response to return
|
||||
let mut response: Response = Response::new(mem::replace(&mut self.socket, None).expect("never none 92"));
|
||||
|
||||
// run_once should be enough, if something else nepharious hits the socket, what?
|
||||
try!(self.event_loop.run(&mut response));
|
||||
debug!("client event_loop running");
|
||||
|
||||
|
||||
if response.error.is_some() { return Err(response.error.unwrap()) }
|
||||
if response.buf.is_none() { return Err(ClientErrorKind::Message("no data was received from the remote").into()) }
|
||||
let result = Ok(response.buf.unwrap());
|
||||
//debug!("client deregistering");
|
||||
// TODO: when this line is added OSX starts failing, but we should have it...
|
||||
// try!(self.event_loop.deregister(&response.socket));
|
||||
self.socket = Some(response.socket);
|
||||
result
|
||||
fn unwrap(self) -> (Core, Box<Future<Item=Self::MessageStream, Error=io::Error>>, Box<ClientStreamHandle>) {
|
||||
(self.io_loop, self.udp_client_stream, self.client_stream_handle)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for UdpClientConnection {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "UdpClientConnection ns: {:?} socket: {:?}", self.name_server, self.socket)
|
||||
}
|
||||
}
|
||||
|
||||
struct Response {
|
||||
pub buf: Option<Vec<u8>>,
|
||||
pub addr: Option<SocketAddr>,
|
||||
pub error: Option<ClientError>,
|
||||
pub socket: UdpSocket,
|
||||
}
|
||||
|
||||
impl Response {
|
||||
pub fn new(socket: UdpSocket) -> Self {
|
||||
Response{ buf: None, addr: None, error: None, socket: socket }
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: this should be merged with the server handler
|
||||
impl Handler for Response {
|
||||
type Timeout = ();
|
||||
type Message = ();
|
||||
|
||||
fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
|
||||
match token {
|
||||
RESPONSE => {
|
||||
if !events.is_readable() {
|
||||
debug!("got woken up, but not readable: {:?}", token);
|
||||
return
|
||||
}
|
||||
|
||||
let mut buf: [u8; 4096] = [0u8; 4096];
|
||||
|
||||
let recv_result = self.socket.recv_from(&mut buf);
|
||||
if recv_result.is_err() {
|
||||
// debug b/c we're returning the error explicitly
|
||||
debug!("could not recv_from on {:?}: {:?}", self.socket, recv_result);
|
||||
self.error = Some(recv_result.unwrap_err().into());
|
||||
return
|
||||
}
|
||||
|
||||
if recv_result.as_ref().unwrap().is_none() {
|
||||
// debug b/c we're returning the error explicitly
|
||||
debug!("no return address on recv_from: {:?}", self.socket);
|
||||
self.error = Some(ClientErrorKind::Message("no address received in response").into());
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: ignore if not from the IP that we requested
|
||||
let (length, addr) = recv_result.unwrap().unwrap();
|
||||
debug!("bytes: {:?} from: {:?}", length, addr);
|
||||
self.addr = Some(addr);
|
||||
|
||||
if length == 0 {
|
||||
debug!("0 bytes recieved from: {}", addr);
|
||||
return
|
||||
}
|
||||
|
||||
// we got our response, shutdown.
|
||||
event_loop.shutdown();
|
||||
|
||||
// set our data
|
||||
self.buf = Some(buf.iter().take(length).cloned().collect());
|
||||
},
|
||||
_ => {
|
||||
error!("unrecognized token: {:?}", token);
|
||||
self.error = Some(ClientErrorKind::Message("no data was received from the remote").into());
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, _: ()) {
|
||||
self.error = Some(ClientErrorKind::Message("timed out awaiting response from server(s)").into());
|
||||
event_loop.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: should test this independently of the client code
|
||||
|
@ -261,7 +261,7 @@ fn udp_stream_test(server_addr: std::net::IpAddr) {
|
||||
};
|
||||
|
||||
let socket = std::net::UdpSocket::bind(client_addr).expect("could not create socket"); // some random address...
|
||||
let (mut stream, mut sender) = UdpStream::with_bound(socket, io_loop.handle());
|
||||
let (mut stream, sender) = UdpStream::with_bound(socket, io_loop.handle());
|
||||
//let mut stream: UdpStream = io_loop.run(stream).ok().unwrap();
|
||||
|
||||
for _ in 0..send_recv_times {
|
||||
|
@ -55,7 +55,6 @@ error-chain = "0.1.12"
|
||||
futures = "^0.1"
|
||||
lazy_static = "^0.2.1"
|
||||
log = "^0.3.5"
|
||||
mio = "^0.5.1"
|
||||
openssl = "^0.9.5"
|
||||
rand = "^0.3"
|
||||
rustc-serialize = "^0.3.18"
|
||||
|
@ -30,7 +30,6 @@ extern crate chrono;
|
||||
#[macro_use] extern crate error_chain;
|
||||
#[macro_use] extern crate futures;
|
||||
#[macro_use] extern crate log;
|
||||
extern crate mio;
|
||||
extern crate openssl;
|
||||
extern crate rusqlite;
|
||||
extern crate rustc_serialize;
|
||||
@ -44,8 +43,6 @@ pub mod config;
|
||||
pub mod error;
|
||||
pub mod server;
|
||||
|
||||
#[allow(deprecated)]
|
||||
pub use self::server::Server;
|
||||
pub use self::server::ServerFuture;
|
||||
|
||||
/// this exposes a version function which gives access to the access
|
||||
|
@ -17,14 +17,11 @@
|
||||
//! `Server` component for hosting a domain name servers operations.
|
||||
|
||||
mod request_stream;
|
||||
mod server;
|
||||
mod server_future;
|
||||
mod timeout_stream;
|
||||
|
||||
pub use self::request_stream::Request;
|
||||
pub use self::request_stream::RequestStream;
|
||||
pub use self::request_stream::ResponseHandle;
|
||||
#[allow(deprecated)]
|
||||
pub use self::server::Server;
|
||||
pub use self::server_future::ServerFuture;
|
||||
pub use self::timeout_stream::TimeoutStream;
|
||||
|
@ -1,424 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2015 Benjamin Fry <benjaminfry@me.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use std::cell::Cell;
|
||||
|
||||
use mio::{Token, Evented, EventLoop, Handler, EventSet, PollOpt};
|
||||
use mio::tcp::{TcpListener, TcpStream};
|
||||
use mio::udp::UdpSocket;
|
||||
|
||||
use trust_dns::op::{Message, OpCode, ResponseCode, RequestHandler};
|
||||
use trust_dns::serialize::binary::{BinDecoder, BinEncoder, BinSerializable};
|
||||
use trust_dns::tcp::{TcpHandler, TcpState};
|
||||
use trust_dns::udp::{UdpHandler, UdpState};
|
||||
|
||||
use ::authority::Catalog;
|
||||
|
||||
// TODO, might be cool to store buffers for later usage...
|
||||
#[deprecated = "will be removed post 0.9.x"]
|
||||
pub struct Server {
|
||||
handlers: HashMap<Token, DnsHandlerType>,
|
||||
next_token: Cell<usize>,
|
||||
catalog: Arc<Catalog>, // should the catalog just be static?
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
impl Server {
|
||||
pub fn new(catalog: Catalog) -> Server {
|
||||
Server {
|
||||
handlers: HashMap::new(),
|
||||
next_token: Cell::new(0),
|
||||
catalog: Arc::new(catalog),
|
||||
}
|
||||
}
|
||||
|
||||
fn next_token(&self) -> Token {
|
||||
for _ in 0..100 {
|
||||
self.next_token.set(self.next_token.get()+1);
|
||||
let token: Token = Token(self.next_token.get());
|
||||
if self.handlers.contains_key(&token) { continue }
|
||||
|
||||
// ok, safe to use
|
||||
return token;
|
||||
}
|
||||
|
||||
panic!("tried to get the next token 100 times, failed");
|
||||
}
|
||||
|
||||
/// register a UDP socket. Should be bound before calling this.
|
||||
pub fn register_socket(&mut self, socket: UdpSocket) {
|
||||
let token = self.next_token();
|
||||
self.handlers.insert(token, DnsHandlerType::UdpSocket((socket, VecDeque::new())));
|
||||
}
|
||||
|
||||
/// register a TcpListener to the Server. This should already be bound to either an IPv6 or an
|
||||
/// IPv4 address.
|
||||
pub fn register_listener(&mut self, listener: TcpListener) {
|
||||
let token = self.next_token();
|
||||
self.handlers.insert(token, DnsHandlerType::TcpListener(listener));
|
||||
}
|
||||
|
||||
/// TODO how to do threads? should we do a bunch of listener threads and then query threads?
|
||||
/// Ideally the processing would be n-threads for recieving, which hand off to m-threads for
|
||||
/// request handling. It would generally be the case that n <= m.
|
||||
pub fn listen(&mut self) -> io::Result<()> {
|
||||
info!("Server starting up");
|
||||
let mut event_loop: EventLoop<Self> = try!(EventLoop::new());
|
||||
|
||||
// registering these on non-writable events, since these are the listeners.
|
||||
for (token, handler) in self.handlers.iter() {
|
||||
match *handler {
|
||||
DnsHandlerType::UdpSocket(ref handler) => try!(event_loop.register(handler.get_socket(), *token, !EventSet::writable(), PollOpt::all())),
|
||||
DnsHandlerType::TcpListener(ref handler) => try!(event_loop.register(handler.get_socket(), *token, !EventSet::writable(), PollOpt::all())),
|
||||
DnsHandlerType::TcpHandler(_) => panic!("tcp handlers should not have been registered yet"),
|
||||
}
|
||||
}
|
||||
|
||||
try!(event_loop.run(self));
|
||||
|
||||
Err(io::Error::new(io::ErrorKind::Interrupted, "Server stopping due to interruption"))
|
||||
}
|
||||
|
||||
/// given a set of bytes, decode and process the request, producing a response to send
|
||||
fn process_request(bytes: &[u8], stream: &TcpStream, catalog: &Catalog) -> Message {
|
||||
let mut decoder = BinDecoder::new(bytes);
|
||||
let request = Message::read(&mut decoder);
|
||||
|
||||
match request {
|
||||
Err(ref decode_error) => {
|
||||
warn!("unable to decode request from client: {:?}: {}", stream, decode_error);
|
||||
Message::error_msg(0/* id is in the message... */, OpCode::Query/* right default? */, ResponseCode::FormErr)
|
||||
},
|
||||
Ok(ref req) => catalog.handle_request(req),
|
||||
}
|
||||
}
|
||||
|
||||
/// encodes a message to the specified buffer
|
||||
fn encode_message(response: Message, buffer: &mut Vec<u8>) -> io::Result<()> {
|
||||
// all responses need these fields set:
|
||||
buffer.clear();
|
||||
let encode_result = {
|
||||
let mut encoder: BinEncoder = BinEncoder::new(buffer);
|
||||
response.emit(&mut encoder)
|
||||
};
|
||||
|
||||
if let Err(encode_error) = encode_result {
|
||||
error!("error encoding response to client: {}", encode_error);
|
||||
let err_msg = Message::error_msg(response.get_id(), response.get_op_code(), ResponseCode::ServFail);
|
||||
|
||||
buffer.clear();
|
||||
let mut encoder: BinEncoder = BinEncoder::new(buffer);
|
||||
err_msg.emit(&mut encoder).unwrap(); // this is a coding error if it fails
|
||||
}
|
||||
|
||||
// ready to write to the other side, double check that our buffer is legit first.
|
||||
if buffer.len() > u16::max_value() as usize() {
|
||||
error!("too many bytes to write for u16, {}", buffer.len());
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidData, "did not write the length"));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// each dns handler type is used for managing client requests.
|
||||
enum DnsHandlerType {
|
||||
// the deque represents responses that need to be sent back to the client
|
||||
// as of now this these are local requests, but in the future these will be resolver based
|
||||
// responses, which will have some time between resolve and response
|
||||
UdpSocket((UdpSocket, VecDeque<UdpHandler>)),
|
||||
// Inbound TCP connections
|
||||
TcpListener(TcpListener),
|
||||
// Handlers for the TCP connections
|
||||
TcpHandler(TcpHandler),
|
||||
}
|
||||
|
||||
/// Handler for DNS requests
|
||||
trait DnsHandler {
|
||||
/// Called when the Evented of the Handler is woken up on activity
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `events` - the set of events that that woke this socket up
|
||||
/// * `catalog` - the local catalog for lookups
|
||||
///
|
||||
/// # Return
|
||||
///
|
||||
/// Returns a tuple of the next event_set for this handler, and/or a new handler to add to the
|
||||
/// the event_loop. If the first of the tuple is None, self will be removed from the event_loop.
|
||||
/// If the second is None, nothing will happen, otherwise the new handler will be added to the
|
||||
/// event_loop.
|
||||
fn handle(&mut self, events: EventSet, catalog: &Arc<Catalog>) -> (Option<EventSet>, Option<(DnsHandlerType, EventSet)>);
|
||||
|
||||
/// returns the Evented which self wraps.
|
||||
fn get_socket(&self) -> &Evented;
|
||||
}
|
||||
|
||||
impl DnsHandler for DnsHandlerType {
|
||||
fn handle(&mut self, events: EventSet, catalog: &Arc<Catalog>) -> (Option<EventSet>, Option<(DnsHandlerType, EventSet)>) {
|
||||
match *self {
|
||||
DnsHandlerType::UdpSocket(ref mut udp_handler) => udp_handler.handle(events, catalog),
|
||||
DnsHandlerType::TcpListener(ref mut tcp_listener) => tcp_listener.handle(events, catalog),
|
||||
DnsHandlerType::TcpHandler(ref mut tcp_handler) => tcp_handler.handle(events, catalog),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_socket(&self) -> &Evented {
|
||||
match *self {
|
||||
DnsHandlerType::UdpSocket(ref udp_handler) => udp_handler.get_socket() as &Evented,
|
||||
DnsHandlerType::TcpListener(ref tcp_listener) => tcp_listener.get_socket() as &Evented,
|
||||
DnsHandlerType::TcpHandler(ref tcp_handler) => tcp_handler.get_socket() as &Evented,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DnsHandler for TcpListener {
|
||||
fn handle(&mut self, events: EventSet, _: &Arc<Catalog>) -> (Option<EventSet>, Option<(DnsHandlerType, EventSet)>) {
|
||||
if events.is_error() { panic!("unexpected error state on: {:?}", self) }
|
||||
else if events.is_hup() { panic!("listening socket hungup: {:?}", self) }
|
||||
else if events.is_readable() || events.is_writable() {
|
||||
// there's a new connection coming in
|
||||
// give it a new token and insert the stream on the eventlistener
|
||||
// then store in the map for reference when dealing with new streams
|
||||
for _ in 0..100 { // loop a max of 100 times, don't want to starve the responses.
|
||||
match self.accept() {
|
||||
Ok(Some((stream, addr))) => {
|
||||
info!("new tcp connection from: {}", addr);
|
||||
return (Some(EventSet::all()), Some((DnsHandlerType::TcpHandler(TcpHandler::new_server_handler(stream)),
|
||||
!EventSet::writable())))
|
||||
},
|
||||
Ok(None) => {
|
||||
return (Some(EventSet::all()), None)
|
||||
},
|
||||
Err(e) => panic!("unexpected error accepting: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// this should never happen
|
||||
return (Some(EventSet::all()), None)
|
||||
}
|
||||
|
||||
fn get_socket(&self) -> &Evented {
|
||||
return self as &Evented
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unreachable_code)] // this has been deprecated
|
||||
impl DnsHandler for (UdpSocket, VecDeque<UdpHandler>) {
|
||||
fn handle(&mut self, events: EventSet, catalog: &Arc<Catalog>) -> (Option<EventSet>, Option<(DnsHandlerType, EventSet)>) {
|
||||
let ref socket = self.0;
|
||||
let ref mut requests = self.1;
|
||||
|
||||
if events.is_error() {
|
||||
panic!("unexpected socket error: {:?}", socket)
|
||||
} else if events.is_hup() {
|
||||
panic!("unexpected socket hangup: {:?}", socket)
|
||||
} else {
|
||||
let mut next_event: EventSet = EventSet::all();
|
||||
|
||||
// process the responses before the requests...
|
||||
if events.is_writable() {
|
||||
let mut remove: Vec<usize> = Vec::new();
|
||||
|
||||
// send all the data for the incomplete requests
|
||||
for (i, req) in requests.iter().enumerate() {
|
||||
match req.handle_message (&socket, events) {
|
||||
Ok(UdpState::Done) => {
|
||||
// complete, remove
|
||||
remove.push(i);
|
||||
},
|
||||
Ok(..) => {
|
||||
// Noop, request not complete
|
||||
},
|
||||
Err(ref e) if io::ErrorKind::WouldBlock == e.kind() => {
|
||||
// this is expected with the connection would block
|
||||
// noop
|
||||
},
|
||||
Err(e) => {
|
||||
// shutdown the connection, remove it.
|
||||
warn!("error writing socket: {:?} error: {}", socket, e);
|
||||
// TODO: do we need to shutdown the stream?
|
||||
remove.push(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// remove the complete requests
|
||||
for i in remove {
|
||||
requests.remove(i);
|
||||
// TODO might want to compress the list here, as it could become a leak after a large
|
||||
// set of requests.
|
||||
}
|
||||
|
||||
if requests.is_empty() {
|
||||
next_event = !EventSet::writable();
|
||||
}
|
||||
}
|
||||
|
||||
// now process the incoming requests
|
||||
if events.is_readable() {
|
||||
// collect new requests
|
||||
// TODO: could a ton of inbound requests starve the server
|
||||
for _ in 0..100 {
|
||||
if let Some(handler) = UdpHandler::new_server(&socket, catalog.clone()) {
|
||||
// this is a new request for a UDP transaction
|
||||
// let the handler read, etc.
|
||||
requests.push_back(handler);
|
||||
|
||||
next_event = EventSet::all();
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return (Some(next_event), None)
|
||||
}
|
||||
|
||||
return (Some(EventSet::all()), None)
|
||||
}
|
||||
|
||||
fn get_socket(&self) -> &Evented {
|
||||
return &self.0 as &Evented
|
||||
}
|
||||
}
|
||||
|
||||
impl DnsHandler for TcpHandler {
|
||||
#[allow(deprecated)]
|
||||
fn handle(&mut self, events: EventSet, catalog: &Arc<Catalog>) -> (Option<EventSet>, Option<(DnsHandlerType, EventSet)>) {
|
||||
if events.is_error() {
|
||||
warn!("closing, error from: {:?}", self.get_stream());
|
||||
// TODO: do we need to shutdown the stream?
|
||||
return (None, None);
|
||||
} else if events.is_hup() {
|
||||
info!("client hungup: {:?}", self.get_stream());
|
||||
// TODO: do we need to shutdown the stream?
|
||||
return (None, None);
|
||||
} else if events.is_readable() || events.is_writable() {
|
||||
let mut process_resquest = false;
|
||||
// the handler will deal with the rest of the connection, we need to check the return value
|
||||
// for an error with wouldblock, this means that the handler couldn't complete the request.
|
||||
match self.handle_message(events) {
|
||||
Ok(TcpState::Done) => {
|
||||
// reset, the client will close the connection according to the spec
|
||||
self.reset();
|
||||
debug!("TcpState::Done");
|
||||
},
|
||||
Ok(TcpState::WillWriteLength) => {
|
||||
// this means that we have gotten through recieving a packet
|
||||
process_resquest = true;
|
||||
debug!("TcpState::WillWriteLength");
|
||||
}
|
||||
Ok(..) => {
|
||||
// registering the event to only wake up on the correct event
|
||||
// this reduces looping on states like writable that can remain set for a long time
|
||||
debug!("reregistering for next call: {:?}", self.get_events());
|
||||
},
|
||||
Err(ref e) if io::ErrorKind::WouldBlock == e.kind() => {
|
||||
// this is expected with the connection would block
|
||||
// noop
|
||||
},
|
||||
Err(e) => {
|
||||
// shutdown the connection, remove it.
|
||||
warn!("connection: {:?} shutdown on error: {}", self.get_stream(), e);
|
||||
// TODO: do we need to shutdown the stream?
|
||||
return (None, None);
|
||||
}
|
||||
}
|
||||
|
||||
// need to process the response
|
||||
if process_resquest {
|
||||
let response = Server::process_request(self.get_buffer(), self.get_stream(), catalog.as_ref());
|
||||
if Server::encode_message(response, self.get_buffer_mut()).is_err() {
|
||||
warn!("could not encode message to: {:?}", self.get_stream());
|
||||
return (None, None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!("reregistering for next call: {:?}", self.get_events());
|
||||
return (Some(self.get_events()), None)
|
||||
}
|
||||
|
||||
fn get_socket(&self) -> &Evented {
|
||||
return self.get_stream() as &Evented
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
impl Handler for Server {
|
||||
type Timeout = Token; // Timeouts are registered with tokens.
|
||||
type Message = ();
|
||||
|
||||
fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
|
||||
let mut remove_token: Option<Token> = None;
|
||||
let mut add_handler: Option<(DnsHandlerType, EventSet)> = None;
|
||||
|
||||
// The token should always exist
|
||||
if let Some(mut handler) = self.handlers.get_mut(&token) {
|
||||
// the handler will perform the lookup or other actions.
|
||||
// if none is returned for event_set_opt, the handler will be revmoed
|
||||
let (event_set_opt, add) = handler.handle(events, &self.catalog);
|
||||
|
||||
// this represents a new handler to watch
|
||||
add_handler = add;
|
||||
|
||||
// given the new event_set option, reregister the socket
|
||||
if let Some(event_set) = event_set_opt {
|
||||
let socket: &Evented = handler.get_socket();
|
||||
if let Err(err) = event_loop.reregister(socket, token, event_set, PollOpt::all()) {
|
||||
// removing the socket in case of an error
|
||||
warn!("cound not reregister {:?}: {}", token, err);
|
||||
remove_token = Some(token);
|
||||
}
|
||||
} else {
|
||||
remove_token = Some(token);
|
||||
}
|
||||
}
|
||||
|
||||
// unregister the token
|
||||
remove_token.and_then(|token| {
|
||||
self.handlers.remove(&token)
|
||||
}).and_then(|handler| {
|
||||
event_loop.deregister(handler.get_socket()).unwrap_or_else(|e| debug!("error deregistering: {}", e));
|
||||
Some(())
|
||||
});
|
||||
|
||||
// need to register a new handler if there was one.
|
||||
if let Some((handler, event_set)) = add_handler {
|
||||
let register_res: io::Result<Token> = {
|
||||
let socket: &Evented = handler.get_socket();
|
||||
|
||||
let next_token = self.next_token();
|
||||
event_loop.register(socket, next_token, event_set, PollOpt::all()).map(|_| next_token)
|
||||
};
|
||||
|
||||
match register_res {
|
||||
Ok(token) => { self.handlers.insert(token, handler); },
|
||||
Err(err) => warn!("error registering handler: {}", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) {
|
||||
warn!("server interrupted, shutting down");
|
||||
event_loop.shutdown();
|
||||
// self.error = Some(Err(io::Error::new(io::ErrorKind::Interrupted, format!("interrupted"))));
|
||||
}
|
||||
}
|
@ -1,61 +1,50 @@
|
||||
extern crate chrono;
|
||||
extern crate futures;
|
||||
extern crate openssl;
|
||||
extern crate tokio_core;
|
||||
extern crate trust_dns;
|
||||
extern crate trust_dns_server;
|
||||
|
||||
use std::io;
|
||||
use std::net::*;
|
||||
use std::fmt;
|
||||
|
||||
use chrono::Duration;
|
||||
use futures::Future;
|
||||
use openssl::rsa::Rsa;
|
||||
use tokio_core::reactor::Core;
|
||||
|
||||
#[allow(deprecated)]
|
||||
use trust_dns::client::{Client, ClientConnection};
|
||||
use trust_dns::error::*;
|
||||
use trust_dns::client::{Client, ClientConnection, ClientStreamHandle, SecureSyncClient, SyncClient};
|
||||
use trust_dns::op::*;
|
||||
use trust_dns::rr::{DNSClass, Record, RecordType, domain, RData};
|
||||
use trust_dns::rr::dnssec::{Algorithm, KeyPair, Signer, TrustAnchor};
|
||||
use trust_dns::rr::rdata::*;
|
||||
use trust_dns::serialize::binary::{BinDecoder, BinEncoder, BinSerializable};
|
||||
use trust_dns::tcp::TcpClientConnection;
|
||||
use trust_dns::udp::UdpClientConnection;
|
||||
|
||||
use trust_dns_server::authority::Catalog;
|
||||
|
||||
mod common;
|
||||
use common::TestClientStream;
|
||||
use common::authority::{create_example, create_secure_example};
|
||||
|
||||
pub struct TestClientConnection<'a> {
|
||||
catalog: &'a Catalog
|
||||
pub struct TestClientConnection {
|
||||
catalog: Catalog
|
||||
}
|
||||
|
||||
impl<'a> TestClientConnection<'a> {
|
||||
pub fn new(catalog: &'a Catalog) -> TestClientConnection<'a> {
|
||||
impl TestClientConnection {
|
||||
pub fn new(catalog: Catalog) -> TestClientConnection {
|
||||
TestClientConnection { catalog: catalog }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ClientConnection for TestClientConnection<'a> {
|
||||
fn send(&mut self, bytes: Vec<u8>) -> ClientResult<Vec<u8>> {
|
||||
let mut decoder = BinDecoder::new(&bytes);
|
||||
impl ClientConnection for TestClientConnection {
|
||||
type MessageStream = TestClientStream;
|
||||
|
||||
let message = try!(Message::read(&mut decoder));
|
||||
let response = self.catalog.handle_request(&message);
|
||||
|
||||
let mut buf = Vec::with_capacity(512);
|
||||
{
|
||||
let mut encoder = BinEncoder::new(&mut buf);
|
||||
try!(response.emit(&mut encoder));
|
||||
}
|
||||
|
||||
Ok(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> fmt::Debug for TestClientConnection<'a> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "TestClientConnection catalog")
|
||||
fn unwrap(self) -> (Core, Box<Future<Item=Self::MessageStream, Error=io::Error>>, Box<ClientStreamHandle>) {
|
||||
let io_loop = Core::new().unwrap();
|
||||
let (stream, handle) = TestClientStream::new(self.catalog);
|
||||
(io_loop, stream, handle)
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,7 +55,7 @@ fn test_query_nonet() {
|
||||
let mut catalog = Catalog::new();
|
||||
catalog.upsert(authority.get_origin().clone(), authority);
|
||||
|
||||
let client = Client::new(TestClientConnection::new(&catalog));
|
||||
let client = SyncClient::new(TestClientConnection::new(catalog));
|
||||
|
||||
test_query(client);
|
||||
}
|
||||
@ -77,7 +66,7 @@ fn test_query_nonet() {
|
||||
fn test_query_udp() {
|
||||
let addr: SocketAddr = ("8.8.8.8",53).to_socket_addrs().unwrap().next().unwrap();
|
||||
let conn = UdpClientConnection::new(addr).unwrap();
|
||||
let client = Client::new(conn);
|
||||
let client = SyncClient::new(conn);
|
||||
|
||||
test_query(client);
|
||||
}
|
||||
@ -88,13 +77,13 @@ fn test_query_udp() {
|
||||
fn test_query_tcp() {
|
||||
let addr: SocketAddr = ("8.8.8.8",53).to_socket_addrs().unwrap().next().unwrap();
|
||||
let conn = TcpClientConnection::new(addr).unwrap();
|
||||
let client = Client::new(conn);
|
||||
let client = SyncClient::new(conn);
|
||||
|
||||
test_query(client);
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
fn test_query<C: ClientConnection>(client: Client<C>) {
|
||||
fn test_query(client: SyncClient) {
|
||||
use std::cmp::Ordering;
|
||||
let name = domain::Name::with_labels(vec!["WWW".to_string(), "example".to_string(), "com".to_string()]);
|
||||
|
||||
@ -136,7 +125,7 @@ fn test_secure_query_example_nonet() {
|
||||
let mut catalog = Catalog::new();
|
||||
catalog.upsert(authority.get_origin().clone(), authority);
|
||||
|
||||
let client = Client::with_trust_anchor(TestClientConnection::new(&catalog), trust_anchor);
|
||||
let client = SecureSyncClient::new(TestClientConnection::new(catalog)).trust_anchor(trust_anchor).build();
|
||||
|
||||
test_secure_query_example(client);
|
||||
}
|
||||
@ -147,7 +136,7 @@ fn test_secure_query_example_nonet() {
|
||||
fn test_secure_query_example_udp() {
|
||||
let addr: SocketAddr = ("8.8.8.8",53).to_socket_addrs().unwrap().next().unwrap();
|
||||
let conn = UdpClientConnection::new(addr).unwrap();
|
||||
let client = Client::new(conn);
|
||||
let client = SecureSyncClient::new(conn).build();
|
||||
|
||||
test_secure_query_example(client);
|
||||
}
|
||||
@ -158,13 +147,13 @@ fn test_secure_query_example_udp() {
|
||||
fn test_secure_query_example_tcp() {
|
||||
let addr: SocketAddr = ("8.8.8.8",53).to_socket_addrs().unwrap().next().unwrap();
|
||||
let conn = TcpClientConnection::new(addr).unwrap();
|
||||
let client = Client::new(conn);
|
||||
let client = SecureSyncClient::new(conn).build();
|
||||
|
||||
test_secure_query_example(client);
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
fn test_secure_query_example<C: ClientConnection>(client: Client<C>) {
|
||||
fn test_secure_query_example(client: SecureSyncClient) {
|
||||
let name = domain::Name::with_labels(vec!["www".to_string(), "example".to_string(), "com".to_string()]);
|
||||
let response = client.secure_query(&name, DNSClass::IN, RecordType::A);
|
||||
|
||||
@ -191,7 +180,7 @@ fn test_secure_query_example<C: ClientConnection>(client: Client<C>) {
|
||||
#[ignore]
|
||||
#[allow(deprecated)]
|
||||
fn test_dnssec_rollernet_td_udp() {
|
||||
let c = Client::new(UdpClientConnection::new("8.8.8.8:53".parse().unwrap()).unwrap());
|
||||
let c = SecureSyncClient::new(UdpClientConnection::new("8.8.8.8:53".parse().unwrap()).unwrap()).build();
|
||||
c.secure_query(
|
||||
&domain::Name::parse("rollernet.us.", None).unwrap(),
|
||||
DNSClass::IN,
|
||||
@ -203,7 +192,7 @@ fn test_dnssec_rollernet_td_udp() {
|
||||
#[ignore]
|
||||
#[allow(deprecated)]
|
||||
fn test_dnssec_rollernet_td_tcp() {
|
||||
let c = Client::new(TcpClientConnection::new("8.8.8.8:53".parse().unwrap()).unwrap());
|
||||
let c = SecureSyncClient::new(TcpClientConnection::new("8.8.8.8:53".parse().unwrap()).unwrap()).build();
|
||||
c.secure_query(
|
||||
&domain::Name::parse("rollernet.us.", None).unwrap(),
|
||||
DNSClass::IN,
|
||||
@ -215,7 +204,7 @@ fn test_dnssec_rollernet_td_tcp() {
|
||||
#[ignore]
|
||||
#[allow(deprecated)]
|
||||
fn test_dnssec_rollernet_td_tcp_mixed_case() {
|
||||
let c = Client::new(TcpClientConnection::new("8.8.8.8:53".parse().unwrap()).unwrap());
|
||||
let c = SecureSyncClient::new(TcpClientConnection::new("8.8.8.8:53".parse().unwrap()).unwrap()).build();
|
||||
c.secure_query(
|
||||
&domain::Name::parse("RollErnet.Us.", None).unwrap(),
|
||||
DNSClass::IN,
|
||||
@ -241,9 +230,8 @@ fn test_nsec_query_example_nonet() {
|
||||
let mut catalog = Catalog::new();
|
||||
catalog.upsert(authority.get_origin().clone(), authority);
|
||||
|
||||
let client = Client::with_trust_anchor(TestClientConnection::new(&catalog), trust_anchor);
|
||||
|
||||
test_nsec_query_example(client);
|
||||
let client = SecureSyncClient::new(TestClientConnection::new(catalog)).trust_anchor(trust_anchor).build();
|
||||
test_nsec_query_example::<TestClientConnection>(client);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -252,8 +240,8 @@ fn test_nsec_query_example_nonet() {
|
||||
fn test_nsec_query_example_udp() {
|
||||
let addr: SocketAddr = ("8.8.8.8",53).to_socket_addrs().unwrap().next().unwrap();
|
||||
let conn = UdpClientConnection::new(addr).unwrap();
|
||||
let client = Client::new(conn);
|
||||
test_nsec_query_example(client);
|
||||
let client = SecureSyncClient::new(conn).build();
|
||||
test_nsec_query_example::<UdpClientConnection>(client);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -262,12 +250,12 @@ fn test_nsec_query_example_udp() {
|
||||
fn test_nsec_query_example_tcp() {
|
||||
let addr: SocketAddr = ("8.8.8.8",53).to_socket_addrs().unwrap().next().unwrap();
|
||||
let conn = TcpClientConnection::new(addr).unwrap();
|
||||
let client = Client::new(conn);
|
||||
test_nsec_query_example(client);
|
||||
let client = SecureSyncClient::new(conn).build();
|
||||
test_nsec_query_example::<TcpClientConnection>(client);
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
fn test_nsec_query_example<C: ClientConnection>(client: Client<C>) {
|
||||
fn test_nsec_query_example<C: ClientConnection>(client: SecureSyncClient) {
|
||||
let name = domain::Name::with_labels(vec!["none".to_string(), "example".to_string(), "com".to_string()]);
|
||||
|
||||
let response = client.secure_query(&name, DNSClass::IN, RecordType::A);
|
||||
@ -286,7 +274,7 @@ fn test_nsec_query_type() {
|
||||
|
||||
let addr: SocketAddr = ("8.8.8.8",53).to_socket_addrs().unwrap().next().unwrap();
|
||||
let conn = TcpClientConnection::new(addr).unwrap();
|
||||
let client = Client::new(conn);
|
||||
let client = SecureSyncClient::new(conn).build();
|
||||
|
||||
let response = client.secure_query(&name, DNSClass::IN, RecordType::NS);
|
||||
assert!(response.is_ok(), "query failed: {}", response.unwrap_err());
|
||||
@ -333,7 +321,7 @@ fn test_nsec_query_type() {
|
||||
// }
|
||||
|
||||
#[allow(deprecated)]
|
||||
fn create_sig0_ready_client<'a>(catalog: &'a mut Catalog) -> (Client<TestClientConnection<'a>>, Signer, domain::Name) {
|
||||
fn create_sig0_ready_client(mut catalog: Catalog) -> (SyncClient, domain::Name) {
|
||||
let mut authority = create_example();
|
||||
authority.set_allow_update(true);
|
||||
let origin = authority.get_origin().clone();
|
||||
@ -353,15 +341,15 @@ fn create_sig0_ready_client<'a>(catalog: &'a mut Catalog) -> (Client<TestClientC
|
||||
authority.upsert(auth_key, 0);
|
||||
|
||||
catalog.upsert(authority.get_origin().clone(), authority);
|
||||
let client = Client::new(TestClientConnection::new(catalog));
|
||||
let client = SyncClient::with_signer(TestClientConnection::new(catalog), signer);
|
||||
|
||||
(client, signer, origin)
|
||||
(client, origin)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create() {
|
||||
let mut catalog = Catalog::new();
|
||||
let (client, signer, origin) = create_sig0_ready_client(&mut catalog);
|
||||
let catalog = Catalog::new();
|
||||
let (client, origin) = create_sig0_ready_client(catalog);
|
||||
|
||||
// create a record
|
||||
let mut record = Record::with(domain::Name::with_labels(vec!["new".to_string(), "example".to_string(), "com".to_string()]),
|
||||
@ -370,7 +358,7 @@ fn test_create() {
|
||||
record.rdata(RData::A(Ipv4Addr::new(100,10,100,10)));
|
||||
|
||||
|
||||
let result = client.create(record.clone(), origin.clone(), &signer).expect("create failed");
|
||||
let result = client.create(record.clone(), origin.clone()).expect("create failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
let result = client.query(record.get_name(), record.get_dns_class(), record.get_rr_type()).expect("query failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
@ -379,22 +367,22 @@ fn test_create() {
|
||||
|
||||
// trying to create again should error
|
||||
// TODO: it would be cool to make this
|
||||
let result = client.create(record.clone(), origin.clone(), &signer).expect("create failed");
|
||||
let result = client.create(record.clone(), origin.clone()).expect("create failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::YXRRSet);
|
||||
|
||||
// will fail if already set and not the same value.
|
||||
let mut record = record.clone();
|
||||
record.rdata(RData::A(Ipv4Addr::new(101,11,101,11)));
|
||||
|
||||
let result = client.create(record.clone(), origin.clone(), &signer).expect("create failed");
|
||||
let result = client.create(record.clone(), origin.clone()).expect("create failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::YXRRSet);
|
||||
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_append() {
|
||||
let mut catalog = Catalog::new();
|
||||
let (client, signer, origin) = create_sig0_ready_client(&mut catalog);
|
||||
let catalog = Catalog::new();
|
||||
let (client, origin) = create_sig0_ready_client(catalog);
|
||||
|
||||
// append a record
|
||||
let mut record = Record::with(domain::Name::with_labels(vec!["new".to_string(), "example".to_string(), "com".to_string()]),
|
||||
@ -403,11 +391,11 @@ fn test_append() {
|
||||
record.rdata(RData::A(Ipv4Addr::new(100,10,100,10)));
|
||||
|
||||
// first check the must_exist option
|
||||
let result = client.append(record.clone(), origin.clone(), true, &signer).expect("append failed");
|
||||
let result = client.append(record.clone(), origin.clone(), true).expect("append failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NXRRSet);
|
||||
|
||||
// next append to a non-existent RRset
|
||||
let result = client.append(record.clone(), origin.clone(), false, &signer).expect("append failed");
|
||||
let result = client.append(record.clone(), origin.clone(), false).expect("append failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
// verify record contents
|
||||
@ -420,7 +408,7 @@ fn test_append() {
|
||||
let mut record = record.clone();
|
||||
record.rdata(RData::A(Ipv4Addr::new(101,11,101,11)));
|
||||
|
||||
let result = client.append(record.clone(), origin.clone(), true, &signer).expect("create failed");
|
||||
let result = client.append(record.clone(), origin.clone(), true).expect("create failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
let result = client.query(record.get_name(), record.get_dns_class(), record.get_rr_type()).expect("query failed");
|
||||
@ -431,7 +419,7 @@ fn test_append() {
|
||||
assert!(result.get_answers().iter().any(|rr| if let &RData::A(ref ip) = rr.get_rdata() { *ip == Ipv4Addr::new(101,11,101,11) } else { false }));
|
||||
|
||||
// show that appending the same thing again is ok, but doesn't add any records
|
||||
let result = client.append(record.clone(), origin.clone(), true, &signer).expect("create failed");
|
||||
let result = client.append(record.clone(), origin.clone(), true).expect("create failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
let result = client.query(record.get_name(), record.get_dns_class(), record.get_rr_type()).expect("query failed");
|
||||
@ -441,8 +429,8 @@ fn test_append() {
|
||||
|
||||
#[test]
|
||||
fn test_compare_and_swap() {
|
||||
let mut catalog = Catalog::new();
|
||||
let (client, signer, origin) = create_sig0_ready_client(&mut catalog);
|
||||
let catalog = Catalog::new();
|
||||
let (client, origin) = create_sig0_ready_client(catalog);
|
||||
|
||||
// create a record
|
||||
let mut record = Record::with(domain::Name::with_labels(vec!["new".to_string(), "example".to_string(), "com".to_string()]),
|
||||
@ -450,14 +438,14 @@ fn test_compare_and_swap() {
|
||||
Duration::minutes(5).num_seconds() as u32);
|
||||
record.rdata(RData::A(Ipv4Addr::new(100,10,100,10)));
|
||||
|
||||
let result = client.create(record.clone(), origin.clone(), &signer).expect("create failed");
|
||||
let result = client.create(record.clone(), origin.clone()).expect("create failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
let current = record;
|
||||
let mut new = current.clone();
|
||||
new.rdata(RData::A(Ipv4Addr::new(101,11,101,11)));
|
||||
|
||||
let result = client.compare_and_swap(current.clone(), new.clone(), origin.clone(), &signer).expect("compare_and_swap failed");
|
||||
let result = client.compare_and_swap(current.clone(), new.clone(), origin.clone()).expect("compare_and_swap failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
let result = client.query(new.get_name(), new.get_dns_class(), new.get_rr_type()).expect("query failed");
|
||||
@ -469,7 +457,7 @@ fn test_compare_and_swap() {
|
||||
let mut new = new;
|
||||
new.rdata(RData::A(Ipv4Addr::new(102,12,102,12)));
|
||||
|
||||
let result = client.compare_and_swap(current, new.clone(), origin.clone(), &signer).expect("compare_and_swap failed");
|
||||
let result = client.compare_and_swap(current, new.clone(), origin.clone()).expect("compare_and_swap failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NXRRSet);
|
||||
|
||||
let result = client.query(new.get_name(), new.get_dns_class(), new.get_rr_type()).expect("query failed");
|
||||
@ -480,8 +468,8 @@ fn test_compare_and_swap() {
|
||||
|
||||
#[test]
|
||||
fn test_delete_by_rdata() {
|
||||
let mut catalog = Catalog::new();
|
||||
let (client, signer, origin) = create_sig0_ready_client(&mut catalog);
|
||||
let catalog = Catalog::new();
|
||||
let (client, origin) = create_sig0_ready_client(catalog);
|
||||
|
||||
// append a record
|
||||
let mut record = Record::with(domain::Name::with_labels(vec!["new".to_string(), "example".to_string(), "com".to_string()]),
|
||||
@ -490,20 +478,20 @@ fn test_delete_by_rdata() {
|
||||
record.rdata(RData::A(Ipv4Addr::new(100,10,100,10)));
|
||||
|
||||
// first check the must_exist option
|
||||
let result = client.delete_by_rdata(record.clone(), origin.clone(), &signer).expect("delete failed");
|
||||
let result = client.delete_by_rdata(record.clone(), origin.clone()).expect("delete failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
// next create to a non-existent RRset
|
||||
let result = client.create(record.clone(), origin.clone(), &signer).expect("create failed");
|
||||
let result = client.create(record.clone(), origin.clone()).expect("create failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
let mut record = record.clone();
|
||||
record.rdata(RData::A(Ipv4Addr::new(101,11,101,11)));
|
||||
let result = client.append(record.clone(), origin.clone(), true, &signer).expect("create failed");
|
||||
let result = client.append(record.clone(), origin.clone(), true).expect("create failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
// verify record contents
|
||||
let result = client.delete_by_rdata(record.clone(), origin.clone(), &signer).expect("delete failed");
|
||||
let result = client.delete_by_rdata(record.clone(), origin.clone()).expect("delete failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
let result = client.query(record.get_name(), record.get_dns_class(), record.get_rr_type()).expect("query failed");
|
||||
@ -514,8 +502,8 @@ fn test_delete_by_rdata() {
|
||||
|
||||
#[test]
|
||||
fn test_delete_rrset() {
|
||||
let mut catalog = Catalog::new();
|
||||
let (client, signer, origin) = create_sig0_ready_client(&mut catalog);
|
||||
let catalog = Catalog::new();
|
||||
let (client, origin) = create_sig0_ready_client(catalog);
|
||||
|
||||
// append a record
|
||||
let mut record = Record::with(domain::Name::with_labels(vec!["new".to_string(), "example".to_string(), "com".to_string()]),
|
||||
@ -524,20 +512,20 @@ fn test_delete_rrset() {
|
||||
record.rdata(RData::A(Ipv4Addr::new(100,10,100,10)));
|
||||
|
||||
// first check the must_exist option
|
||||
let result = client.delete_rrset(record.clone(), origin.clone(), &signer).expect("delete failed");
|
||||
let result = client.delete_rrset(record.clone(), origin.clone()).expect("delete failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
// next create to a non-existent RRset
|
||||
let result = client.create(record.clone(), origin.clone(), &signer).expect("create failed");
|
||||
let result = client.create(record.clone(), origin.clone()).expect("create failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
let mut record = record.clone();
|
||||
record.rdata(RData::A(Ipv4Addr::new(101,11,101,11)));
|
||||
let result = client.append(record.clone(), origin.clone(), true, &signer).expect("create failed");
|
||||
let result = client.append(record.clone(), origin.clone(), true).expect("create failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
// verify record contents
|
||||
let result = client.delete_rrset(record.clone(), origin.clone(), &signer).expect("delete failed");
|
||||
let result = client.delete_rrset(record.clone(), origin.clone()).expect("delete failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
let result = client.query(record.get_name(), record.get_dns_class(), record.get_rr_type()).expect("query failed");
|
||||
@ -547,8 +535,8 @@ fn test_delete_rrset() {
|
||||
|
||||
#[test]
|
||||
fn test_delete_all() {
|
||||
let mut catalog = Catalog::new();
|
||||
let (client, signer, origin) = create_sig0_ready_client(&mut catalog);
|
||||
let catalog = Catalog::new();
|
||||
let (client, origin) = create_sig0_ready_client(catalog);
|
||||
|
||||
// append a record
|
||||
let mut record = Record::with(domain::Name::with_labels(vec!["new".to_string(), "example".to_string(), "com".to_string()]),
|
||||
@ -557,21 +545,21 @@ fn test_delete_all() {
|
||||
record.rdata(RData::A(Ipv4Addr::new(100,10,100,10)));
|
||||
|
||||
// first check the must_exist option
|
||||
let result = client.delete_all(record.get_name().clone(), origin.clone(), DNSClass::IN, &signer).expect("delete failed");
|
||||
let result = client.delete_all(record.get_name().clone(), origin.clone(), DNSClass::IN).expect("delete failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
// next create to a non-existent RRset
|
||||
let result = client.create(record.clone(), origin.clone(), &signer).expect("create failed");
|
||||
let result = client.create(record.clone(), origin.clone()).expect("create failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
let mut record = record.clone();
|
||||
record.rr_type(RecordType::AAAA);
|
||||
record.rdata(RData::AAAA(Ipv6Addr::new(1, 2, 3, 4, 5, 6, 7, 8)));
|
||||
let result = client.create(record.clone(), origin.clone(), &signer).expect("create failed");
|
||||
let result = client.create(record.clone(), origin.clone()).expect("create failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
// verify record contents
|
||||
let result = client.delete_all(record.get_name().clone(), origin.clone(), DNSClass::IN, &signer).expect("delete failed");
|
||||
let result = client.delete_all(record.get_name().clone(), origin.clone(), DNSClass::IN).expect("delete failed");
|
||||
assert_eq!(result.get_response_code(), ResponseCode::NoError);
|
||||
|
||||
let result = client.query(record.get_name(), record.get_dns_class(), RecordType::A).expect("query failed");
|
||||
|
@ -1,14 +1,16 @@
|
||||
extern crate chrono;
|
||||
extern crate futures;
|
||||
extern crate mio;
|
||||
extern crate openssl;
|
||||
extern crate trust_dns;
|
||||
extern crate trust_dns_server;
|
||||
|
||||
use std::io;
|
||||
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket, TcpListener};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::stream::Stream;
|
||||
|
||||
use trust_dns::client::*;
|
||||
use trust_dns::op::*;
|
||||
use trust_dns::rr::*;
|
||||
@ -31,8 +33,7 @@ fn test_server_www_udp() {
|
||||
|
||||
thread::Builder::new().name("test_server:udp:server".to_string()).spawn(move || server_thread_udp(udp_socket)).unwrap();
|
||||
|
||||
let client_conn = UdpClientConnection::new(ipaddr).unwrap();
|
||||
let client_thread = thread::Builder::new().name("test_server:udp:client".to_string()).spawn(move || client_thread_www(client_conn)).unwrap();
|
||||
let client_thread = thread::Builder::new().name("test_server:udp:client".to_string()).spawn(move || client_thread_www(lazy_udp_client(ipaddr))).unwrap();
|
||||
|
||||
let client_result = client_thread.join();
|
||||
// let server_result = server_thread.join();
|
||||
@ -51,8 +52,7 @@ fn test_server_www_tcp() {
|
||||
|
||||
thread::Builder::new().name("test_server:tcp:server".to_string()).spawn(move || server_thread_tcp(tcp_listener)).unwrap();
|
||||
|
||||
let client_conn = TcpClientConnection::new(ipaddr).unwrap();
|
||||
let client_thread = thread::Builder::new().name("test_server:tcp:client".to_string()).spawn(move || client_thread_www(client_conn)).unwrap();
|
||||
let client_thread = thread::Builder::new().name("test_server:tcp:client".to_string()).spawn(move || client_thread_www(lazy_tcp_client(ipaddr))).unwrap();
|
||||
|
||||
let client_result = client_thread.join();
|
||||
// let server_result = server_thread.join();
|
||||
@ -61,11 +61,19 @@ fn test_server_www_tcp() {
|
||||
// assert!(server_result.is_ok(), "server failed: {:?}", server_result);
|
||||
}
|
||||
|
||||
fn lazy_udp_client(ipaddr: SocketAddr) -> UdpClientConnection {
|
||||
UdpClientConnection::new(ipaddr).unwrap()
|
||||
}
|
||||
|
||||
fn lazy_tcp_client(ipaddr: SocketAddr) -> TcpClientConnection {
|
||||
TcpClientConnection::new(ipaddr).unwrap()
|
||||
}
|
||||
|
||||
#[allow(deprecated)] // TODO: for now...
|
||||
fn client_thread_www<C: ClientConnection>(conn: C) {
|
||||
fn client_thread_www<C: ClientConnection>(conn: C)
|
||||
where C::MessageStream: Stream<Item=Vec<u8>, Error=io::Error> + 'static {
|
||||
let name = Name::with_labels(vec!["www".to_string(), "example".to_string(), "com".to_string()]);
|
||||
println!("about to query server: {:?}", conn);
|
||||
let client = Client::new(conn);
|
||||
let client = SyncClient::new(conn);
|
||||
|
||||
let response = client.query(&name, DNSClass::IN, RecordType::A).expect("error querying");
|
||||
|
||||
|
@ -1,128 +0,0 @@
|
||||
extern crate chrono;
|
||||
extern crate futures;
|
||||
extern crate mio;
|
||||
extern crate openssl;
|
||||
extern crate trust_dns;
|
||||
extern crate trust_dns_server;
|
||||
|
||||
use std::thread;
|
||||
use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
|
||||
|
||||
use mio::tcp::TcpListener;
|
||||
use mio::udp::UdpSocket;
|
||||
|
||||
#[allow(deprecated)]
|
||||
use trust_dns::client::*;
|
||||
use trust_dns::tcp::TcpClientConnection;
|
||||
use trust_dns::udp::UdpClientConnection;
|
||||
use trust_dns::op::*;
|
||||
use trust_dns::rr::*;
|
||||
#[allow(deprecated)]
|
||||
use trust_dns_server::Server;
|
||||
use trust_dns_server::authority::*;
|
||||
|
||||
mod common;
|
||||
use common::authority::create_example;
|
||||
|
||||
#[test]
|
||||
fn test_server_www_udp() {
|
||||
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127,0,0,1), 0));
|
||||
let udp_socket = UdpSocket::bound(&addr).unwrap();
|
||||
|
||||
let ipaddr = udp_socket.local_addr().unwrap();
|
||||
println!("udp_socket on port: {}", ipaddr);
|
||||
|
||||
thread::Builder::new().name("test_server:udp:server".to_string()).spawn(move || server_thread_udp(udp_socket)).unwrap();
|
||||
|
||||
let client_conn = UdpClientConnection::new(ipaddr).unwrap();
|
||||
let client_thread = thread::Builder::new().name("test_server:udp:client".to_string()).spawn(move || client_thread_www(client_conn)).unwrap();
|
||||
|
||||
let client_result = client_thread.join();
|
||||
// let server_result = server_thread.join();
|
||||
|
||||
assert!(client_result.is_ok(), "client failed: {:?}", client_result);
|
||||
// assert!(server_result.is_ok(), "server failed: {:?}", server_result);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_server_www_tcp() {
|
||||
use mio::tcp::TcpListener;
|
||||
|
||||
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127,0,0,1), 0));
|
||||
let tcp_listener = TcpListener::bind(&addr).unwrap();
|
||||
|
||||
let ipaddr = tcp_listener.local_addr().unwrap();
|
||||
println!("tcp_listner on port: {}", ipaddr);
|
||||
|
||||
thread::Builder::new().name("test_server:tcp:server".to_string()).spawn(move || server_thread_tcp(tcp_listener)).unwrap();
|
||||
|
||||
let client_conn = TcpClientConnection::new(ipaddr).unwrap();
|
||||
let client_thread = thread::Builder::new().name("test_server:tcp:client".to_string()).spawn(move || client_thread_www(client_conn)).unwrap();
|
||||
|
||||
let client_result = client_thread.join();
|
||||
// let server_result = server_thread.join();
|
||||
|
||||
assert!(client_result.is_ok(), "client failed: {:?}", client_result);
|
||||
// assert!(server_result.is_ok(), "server failed: {:?}", server_result);
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
#[allow(dead_code)]
|
||||
fn client_thread_www<C: ClientConnection>(conn: C) {
|
||||
let name = Name::with_labels(vec!["www".to_string(), "example".to_string(), "com".to_string()]);
|
||||
println!("about to query server: {:?}", conn);
|
||||
let client = Client::new(conn);
|
||||
|
||||
let response = client.query(&name, DNSClass::IN, RecordType::A).expect("error querying");
|
||||
|
||||
assert!(response.get_response_code() == ResponseCode::NoError, "got an error: {:?}", response.get_response_code());
|
||||
|
||||
let record = &response.get_answers()[0];
|
||||
assert_eq!(record.get_name(), &name);
|
||||
assert_eq!(record.get_rr_type(), RecordType::A);
|
||||
assert_eq!(record.get_dns_class(), DNSClass::IN);
|
||||
|
||||
if let &RData::A(ref address) = record.get_rdata() {
|
||||
assert_eq!(address, &Ipv4Addr::new(93,184,216,34))
|
||||
} else {
|
||||
assert!(false);
|
||||
}
|
||||
|
||||
let mut ns: Vec<_> = response.get_name_servers().to_vec();
|
||||
ns.sort();
|
||||
|
||||
assert_eq!(ns.len(), 2);
|
||||
assert_eq!(ns.first().unwrap().get_rr_type(), RecordType::NS);
|
||||
assert_eq!(ns.first().unwrap().get_rdata(), &RData::NS(Name::parse("a.iana-servers.net.", None).unwrap()) );
|
||||
assert_eq!(ns.last().unwrap().get_rr_type(), RecordType::NS);
|
||||
assert_eq!(ns.last().unwrap().get_rdata(), &RData::NS(Name::parse("b.iana-servers.net.", None).unwrap()) );
|
||||
}
|
||||
|
||||
fn new_catalog() -> Catalog {
|
||||
let example = create_example();
|
||||
let origin = example.get_origin().clone();
|
||||
|
||||
let mut catalog: Catalog = Catalog::new();
|
||||
catalog.upsert(origin.clone(), example);
|
||||
catalog
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
fn server_thread_udp(udp_socket: UdpSocket) {
|
||||
let catalog = new_catalog();
|
||||
|
||||
let mut server = Server::new(catalog);
|
||||
server.register_socket(udp_socket);
|
||||
|
||||
server.listen().unwrap();
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
fn server_thread_tcp(tcp_listener: TcpListener) {
|
||||
let catalog = new_catalog();
|
||||
let mut server = Server::new(catalog);
|
||||
server.register_listener(tcp_listener);
|
||||
|
||||
server.listen().unwrap();
|
||||
}
|
Loading…
Reference in New Issue
Block a user