update to tokio 0.2 release (#932)

* update to tokio 0.2 release

* update to h2 0.2.0

* fix compilation errors

* server await JoinHandles from spawn

* fix integration tests using different Runtimes

* cleanup named test output

* fix openssl tests

* cleanup code from review

* refactor server_future for JoinHandle, also fix Udp deadlock
This commit is contained in:
Benjamin Fry 2019-12-06 00:00:00 -08:00 committed by GitHub
parent 22a274714f
commit 3eaf3043d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
85 changed files with 1144 additions and 1521 deletions

974
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -11,3 +11,6 @@ members = ["crates/client",
"tests/integration-tests",
"bin",
"util"]
[patch.crates-io]
typed-headers = { git = "https://github.com/o01eg/typed-headers", branch = "update-dependencies" }

View File

@ -73,12 +73,10 @@ path = "src/named.rs"
[dependencies]
chrono = "0.4"
clap = "2.33"
futures-preview = "0.3.0-alpha"
futures = "0.3.0"
log = "0.4.8"
rustls = { version = "0.16", optional = true }
tokio = "0.2.0-alpha"
tokio-executor = "0.2.0-alpha"
tokio-net = "0.2.0-alpha"
tokio = { version = "0.2.1", features = ["rt-core", "rt-threaded", "time"] }
trust-dns-client= { version = "0.18.0-alpha", path = "../crates/client" }
trust-dns-openssl = { version = "0.18.0-alpha", path = "../crates/openssl", optional = true }
trust-dns-proto = { version = "0.18.0-alpha", path = "../crates/proto" }
@ -90,4 +88,4 @@ env_logger = "0.7"
native-tls = "0.2"
trust-dns-native-tls = { version = "0.18.0-alpha", path = "../crates/native-tls" }
trust-dns-https = { version = "0.18.0-alpha", path = "../crates/https" }
webpki-roots = { version = "^0.18" }
webpki-roots = { version = "0.18" }

View File

@ -3,7 +3,6 @@
extern crate futures;
extern crate test;
extern crate tokio;
extern crate tokio_net;
extern crate trust_dns_client;
extern crate trust_dns_proto;
@ -22,9 +21,9 @@ use std::time::Duration;
use futures::Future;
use test::Bencher;
use tokio::runtime::current_thread::Runtime;
use tokio_net::tcp::TcpStream;
use tokio_net::udp::UdpSocket;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
use tokio::runtime::Runtime;
use trust_dns_client::client::*;
use trust_dns_client::op::*;

View File

@ -34,8 +34,6 @@ extern crate log;
#[cfg(feature = "dns-over-rustls")]
extern crate rustls;
extern crate tokio;
extern crate tokio_executor;
extern crate tokio_net;
extern crate trust_dns_client;
#[cfg(feature = "dns-over-openssl")]
extern crate trust_dns_openssl;
@ -45,14 +43,11 @@ extern crate trust_dns_server;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use clap::{Arg, ArgMatches};
use futures::{future, Future};
use tokio::runtime::Runtime;
use tokio::runtime::TaskExecutor;
use tokio_net::tcp::TcpListener;
use tokio_net::udp::UdpSocket;
use tokio::net::TcpListener;
use tokio::net::UdpSocket;
use tokio::runtime::{self, Runtime};
#[cfg(feature = "dnssec")]
use trust_dns_client::rr::rdata::key::KeyUsage;
@ -74,7 +69,7 @@ use trust_dns_server::store::StoreConfig;
fn load_zone(
zone_dir: &Path,
zone_config: &ZoneConfig,
executor: &TaskExecutor,
runtime: &Runtime,
) -> Result<Box<dyn AuthorityObject>, String> {
debug!("loading zone with config: {:#?}", zone_config);
@ -125,7 +120,7 @@ fn load_zone(
Some(StoreConfig::Forward(ref config)) => {
let (forwarder, bg) = ForwardAuthority::try_from_config(zone_name, zone_type, config)?;
executor.spawn(bg);
runtime.spawn(bg);
Box::new(forwarder)
}
#[cfg(feature = "sqlite")]
@ -282,7 +277,7 @@ impl<'a> From<ArgMatches<'a>> for Args {
/// Main method for running the named server.
///
/// `Note`: Tries to avoid panics, in favor of always starting.
pub fn main() {
fn main() {
let args = app_from_crate!()
.arg(
Arg::with_name(QUIET_ARG)
@ -362,8 +357,14 @@ pub fn main() {
.map(PathBuf::from)
.unwrap_or_else(|| directory_config.clone());
let io_loop = Runtime::new().expect("error when creating tokio Runtime");
let executor = io_loop.executor();
// TODO: allow for num threads configured...
let mut runtime = runtime::Builder::new()
.enable_all()
.threaded_scheduler()
.num_threads(4)
.thread_name("trust-dns-server-runtime")
.build()
.expect("failed to initialize Tokio Runtime");
let mut catalog: Catalog = Catalog::new();
// configure our server based on the config_path
for zone in config.get_zones() {
@ -371,7 +372,7 @@ pub fn main() {
.get_zone()
.unwrap_or_else(|_| panic!("bad zone name in {:?}", config_path));
match load_zone(&zone_dir, zone, &executor) {
match load_zone(&zone_dir, zone, &runtime) {
Ok(authority) => catalog.upsert(zone_name.into(), authority),
Err(error) => panic!("could not load zone {}: {}", zone_name, error),
}
@ -396,87 +397,85 @@ pub fn main() {
.iter()
.flat_map(|x| (*x, listen_port).to_socket_addrs().unwrap())
.collect();
let udp_sockets: Vec<UdpSocket> = sockaddrs
.iter()
.map(|x| {
io_loop
.block_on(UdpSocket::bind(x))
.unwrap_or_else(|_| panic!("could not bind to udp: {}", x))
})
.collect();
let tcp_listeners: Vec<TcpListener> = sockaddrs
.iter()
.map(|x| {
io_loop
.block_on(TcpListener::bind(x))
.unwrap_or_else(|_| panic!("could not bind to tcp: {}", x))
})
.collect();
// now, run the server, based on the config
#[cfg_attr(not(feature = "dns-over-tls"), allow(unused_mut))]
let mut server = ServerFuture::new(catalog);
let server_future: Pin<Box<dyn Future<Output = ()> + Send>> =
Box::pin(future::lazy(move |_| {
// load all the listeners
for udp_socket in udp_sockets {
info!("listening for UDP on {:?}", udp_socket);
server.register_socket(udp_socket);
}
// load all the listeners
for udp_socket in &sockaddrs {
info!("listening for UDP on {:?}", udp_socket);
let udp_socket = runtime
.block_on(UdpSocket::bind(udp_socket))
.unwrap_or_else(|_| panic!("could not bind to udp: {}", udp_socket));
server.register_socket(udp_socket, &runtime);
}
// and TCP as necessary
for tcp_listener in tcp_listeners {
info!("listening for TCP on {:?}", tcp_listener);
server
.register_listener(tcp_listener, tcp_request_timeout)
.expect("could not register TCP listener");
}
// and TCP as necessary
for tcp_listener in &sockaddrs {
info!("listening for TCP on {:?}", tcp_listener);
let tcp_listener = runtime
.block_on(TcpListener::bind(tcp_listener))
.unwrap_or_else(|_| panic!("could not bind to tcp: {}", tcp_listener));
server
.register_listener(tcp_listener, tcp_request_timeout, &runtime)
.expect("could not register TCP listener");
}
let tls_cert_config = config.get_tls_cert();
let tls_cert_config = config.get_tls_cert();
// and TLS as necessary
// TODO: we should add some more control from configs to enable/disable TLS/HTTPS
if let Some(_tls_cert_config) = tls_cert_config {
// setup TLS listeners
// TODO: support rustls
#[cfg(feature = "dns-over-tls")]
config_tls(
&args,
&mut server,
&config,
_tls_cert_config,
&zone_dir,
&listen_addrs,
);
// and TLS as necessary
// TODO: we should add some more control from configs to enable/disable TLS/HTTPS
if let Some(_tls_cert_config) = tls_cert_config {
// setup TLS listeners
#[cfg(feature = "dns-over-tls")]
config_tls(
&args,
&mut server,
&config,
_tls_cert_config,
&zone_dir,
&listen_addrs,
&mut runtime,
);
// setup HTTPS listeners
#[cfg(feature = "dns-over-https")]
config_https(
&args,
&mut server,
&config,
_tls_cert_config,
&zone_dir,
&listen_addrs,
);
}
// setup HTTPS listeners
#[cfg(feature = "dns-over-https")]
config_https(
&args,
&mut server,
&config,
_tls_cert_config,
&zone_dir,
&listen_addrs,
&mut runtime,
);
}
// config complete, starting!
banner();
info!("awaiting connections...");
// config complete, starting!
banner();
info!("awaiting connections...");
// TODO: how to do threads? should we do a bunch of listener threads and then query threads?
// Ideally the processing would be n-threads for receiving, which hand off to m-threads for
// request handling. It would generally be the case that n <= m.
info!("Server starting up");
}));
// TODO: how to do threads? should we do a bunch of listener threads and then query threads?
// Ideally the processing would be n-threads for receiving, which hand off to m-threads for
// request handling. It would generally be the case that n <= m.
info!("Server starting up");
match runtime.block_on(server.block_until_done()) {
Ok(()) => {
// we're exiting for some reason...
info!("Trust-DNS {} stopping", trust_dns_client::version());
}
Err(e) => {
let error_msg = format!(
"Trust-DNS {} has encountered an error: {}",
trust_dns_client::version(),
e
);
io_loop.spawn(server_future);
io_loop.shutdown_on_idle();
// we're exiting for some reason...
info!("Trust-DNS {} stopping", trust_dns_client::version());
error!("{}", error_msg);
panic!(error_msg);
}
};
}
#[cfg(feature = "dns-over-tls")]
@ -487,8 +486,8 @@ fn config_tls(
tls_cert_config: &TlsCertConfig,
zone_dir: &Path,
listen_addrs: &[IpAddr],
runtime: &mut Runtime,
) {
use futures::executor::block_on;
use futures::TryFutureExt;
let tls_listen_port: u16 = args
@ -498,19 +497,12 @@ fn config_tls(
.iter()
.flat_map(|x| (*x, tls_listen_port).to_socket_addrs().unwrap())
.collect();
let tls_listeners: Vec<TcpListener> = tls_sockaddrs
.iter()
.map(|x| {
block_on(
TcpListener::bind(x).unwrap_or_else(|_| panic!("could not bind to tls: {}", x)),
)
})
.collect();
if tls_listeners.is_empty() {
if tls_sockaddrs.is_empty() {
warn!("a tls certificate was specified, but no TLS addresses configured to listen on");
}
for tls_listener in tls_listeners {
for tls_listener in &tls_sockaddrs {
info!(
"loading cert for DNS over TLS: {:?}",
tls_cert_config.get_path()
@ -520,8 +512,17 @@ fn config_tls(
.expect("error loading tls certificate file");
info!("listening for TLS on {:?}", tls_listener);
let tls_listener = runtime.block_on(
TcpListener::bind(tls_listener)
.unwrap_or_else(|_| panic!("could not bind to tls: {}", tls_listener)),
);
server
.register_tls_listener(tls_listener, config.get_tcp_request_timeout(), tls_cert)
.register_tls_listener(
tls_listener,
config.get_tcp_request_timeout(),
tls_cert,
&runtime,
)
.expect("could not register TLS listener");
}
}
@ -534,8 +535,8 @@ fn config_https(
tls_cert_config: &TlsCertConfig,
zone_dir: &Path,
listen_addrs: &[IpAddr],
runtime: &mut Runtime,
) {
use futures::executor::block_on;
use futures::TryFutureExt;
let https_listen_port: u16 = args
@ -545,19 +546,12 @@ fn config_https(
.iter()
.flat_map(|x| (*x, https_listen_port).to_socket_addrs().unwrap())
.collect();
let https_listeners: Vec<TcpListener> = https_sockaddrs
.iter()
.map(|x| {
block_on(
TcpListener::bind(x).unwrap_or_else(|_| panic!("could not bind to tls: {}", x)),
)
})
.collect();
if https_listeners.is_empty() {
if https_sockaddrs.is_empty() {
warn!("a tls certificate was specified, but no HTTPS addresses configured to listen on");
}
for https_listener in https_listeners {
for https_listener in &https_sockaddrs {
info!(
"loading cert for DNS over TLS named {} from {:?}",
tls_cert_config.get_endpoint_name(),
@ -568,12 +562,18 @@ fn config_https(
.expect("error loading tls certificate file");
info!("listening for HTTPS on {:?}", https_listener);
let https_listener = runtime.block_on(
TcpListener::bind(https_listener)
.unwrap_or_else(|_| panic!("could not bind to tls: {}", https_listener)),
);
server
.register_https_listener(
https_listener,
config.get_tcp_request_timeout(),
tls_cert,
tls_cert_config.get_endpoint_name().to_string(),
runtime,
)
.expect("could not register TLS listener");
}

View File

@ -29,7 +29,7 @@ use std::net::*;
use std::sync::Arc;
use rustls::{Certificate, ClientConfig, ProtocolVersion, RootCertStore};
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use trust_dns_client::client::*;
use trust_dns_https::HttpsClientStreamBuilder;
@ -37,8 +37,8 @@ use server_harness::{named_test_harness, query_a};
#[test]
fn test_example_https_toml_startup() {
extern crate env_logger;
env_logger::try_init().ok();
// extern crate env_logger;
// env_logger::try_init().ok();
const ALPN_H2: &[u8] = b"h2";

View File

@ -30,7 +30,7 @@ use std::io::*;
use std::net::*;
use native_tls::Certificate;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use trust_dns_client::client::*;
use trust_dns_native_tls::TlsClientStreamBuilder;

View File

@ -29,7 +29,7 @@ use std::sync::Arc;
use rustls::Certificate;
use rustls::ClientConfig;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use trust_dns_client::client::*;
use trust_dns_rustls::tls_client_connect;

View File

@ -5,7 +5,6 @@ extern crate futures;
#[macro_use]
extern crate log;
extern crate tokio;
extern crate tokio_net;
extern crate trust_dns_client;
extern crate trust_dns_proto;
@ -18,8 +17,8 @@ use std::net::*;
use std::path::Path;
use futures::Future;
use tokio::runtime::current_thread::Runtime;
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio::net::TcpStream as TokioTcpStream;
use tokio::runtime::Runtime;
use trust_dns_client::client::*;
use trust_dns_client::proto::error::ProtoError;

View File

@ -6,12 +6,10 @@
// copied, modified, or distributed except according to those terms.
extern crate chrono;
extern crate env_logger;
extern crate futures;
#[macro_use]
extern crate log;
extern crate tokio;
extern crate tokio_net;
extern crate trust_dns_client;
extern crate trust_dns_proto;
extern crate trust_dns_server;
@ -25,9 +23,9 @@ use std::io::Write;
use std::net::*;
use std::str::FromStr;
use tokio::runtime::current_thread::Runtime;
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio_net::udp::UdpSocket as TokioUdpSocket;
use tokio::net::TcpStream as TokioTcpStream;
use tokio::net::UdpSocket as TokioUdpSocket;
use tokio::runtime::Runtime;
use trust_dns_client::client::*;
use trust_dns_client::op::ResponseCode;
@ -253,7 +251,7 @@ fn test_server_continues_on_bad_data_tcp() {
fn test_forward() {
use server_harness::query_message;
env_logger::init();
//env_logger::init();
named_test_harness("example_forwarder.toml", |port, _, _| {
let mut io_loop = Runtime::new().unwrap();

View File

@ -12,7 +12,7 @@ use std::thread;
use std::time::*;
use futures::Future;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use trust_dns_client::client::*;
use trust_dns_client::proto::error::ProtoError;
@ -23,6 +23,16 @@ use trust_dns_client::rr::*;
use self::mut_message_client::MutMessageHandle;
fn collect_and_print<R: BufRead>(read: &mut R, output: &mut String) {
output.clear();
read.read_line(output).expect("could not read stdio");
if !output.is_empty() {
// uncomment for debugging
// println!("SRV: {}", output.trim_end());
}
}
/// Spins up a Server and handles shutting it down after running the test
#[allow(dead_code)]
pub fn named_test_harness<F, R>(toml: &str, test: F)
@ -113,25 +123,18 @@ where
while Instant::now() < wait_for_start_until {
{
assert!(
named
.lock()
.unwrap()
.try_wait()
.expect("failed to check status of named")
.is_none(),
"named has already exited"
);
if let Some(ret_code) = named
.lock()
.unwrap()
.try_wait()
.expect("failed to check status of named")
{
panic!("named has already exited with code: {}", ret_code);
}
}
output.clear();
named_out
.read_line(&mut output)
.expect("could not read stdout");
if !output.is_empty() {
// uncomment for debugging
// println!("SRV: {}", output.trim_end());
}
collect_and_print(&mut named_out, &mut output);
if output.contains("awaiting connections...") {
found = true;
break;
@ -149,13 +152,16 @@ where
.spawn(move || {
let succeeded = succeeded_clone;
while !succeeded.load(atomic::Ordering::Relaxed) {
output.clear();
named_out
.read_line(&mut output)
.expect("could not read stdout");
if !output.is_empty() {
// uncomment for debugging
// println!("SRV: {}", output.trim_end());
collect_and_print(&mut named_out, &mut output);
if let Some(_ret_code) = named
.lock()
.unwrap()
.try_wait()
.expect("failed to check status of named")
{
// uncomment for debugging:
// println!("named exited with code: {}", _ret_code);
}
}
})

View File

@ -65,24 +65,22 @@ name = "trust_dns_client"
path = "src/lib.rs"
[dependencies]
chrono = "^0.4"
chrono = "0.4"
data-encoding = "2.1.0"
failure = "0.1"
futures-preview = "0.3.0-alpha"
lazy_static = "^1.0"
log = "^0.4.8"
openssl = { version = "^0.10", features = ["v102", "v110"], optional = true }
futures = "0.3.0"
lazy_static = "1.0"
log = "0.4.8"
openssl = { version = "0.10", features = ["v102", "v110"], optional = true }
radix_trie = "0.1.6"
rand = "0.7"
ring = { version = "0.16", optional = true, features = ["std"]}
rustls = { version = "0.16", optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
# TODO: make this optional
tokio = "0.2.0-alpha"
tokio-net = "0.2.0-alpha"
tokio = { version = "0.2.1", features = ["rt-core"] }
trust-dns-https = {version = "0.18.0-alpha", path = "../https", optional = true }
trust-dns-proto = {version = "0.18.0-alpha", path = "../proto", features = ["dnssec"]}
webpki = { version = "0.21", optional = true }
[dev-dependencies]
openssl = { version = "^0.10", features = ["v102", "v110"], optional = false }
openssl = { version = "0.10", features = ["v102", "v110"], optional = false }

View File

@ -15,7 +15,7 @@
use std::sync::Arc;
use futures::Future;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use trust_dns_proto::{
error::ProtoError,
@ -83,9 +83,8 @@ pub trait Client {
) -> ClientResult<DnsResponse> {
let mut reactor = Runtime::new()?;
let (bg, mut client) = self.new_future();
reactor
.spawn(bg)
.block_on(client.query(name.clone(), query_class, query_type))
reactor.spawn(bg);
reactor.block_on(client.query(name.clone(), query_class, query_type))
}
/// Sends a NOTIFY message to the remote system
@ -108,9 +107,8 @@ pub trait Client {
{
let mut reactor = Runtime::new()?;
let (bg, mut client) = self.new_future();
reactor
.spawn(bg)
.block_on(client.notify(name, query_class, query_type, rrset))
reactor.spawn(bg);
reactor.block_on(client.notify(name, query_class, query_type, rrset))
}
/// Sends a record to create on the server, this will fail if the record exists (atomicity
@ -152,9 +150,8 @@ pub trait Client {
{
let mut reactor = Runtime::new()?;
let (bg, mut client) = self.new_future();
reactor
.spawn(bg)
.block_on(client.create(rrset, zone_origin))
reactor.spawn(bg);
reactor.block_on(client.create(rrset, zone_origin))
}
/// Appends a record to an existing rrset, optionally require the rrset to exist (atomicity
@ -197,9 +194,8 @@ pub trait Client {
{
let mut reactor = Runtime::new()?;
let (bg, mut client) = self.new_future();
reactor
.spawn(bg)
.block_on(client.append(rrset, zone_origin, must_exist))
reactor.spawn(bg);
reactor.block_on(client.append(rrset, zone_origin, must_exist))
}
/// Compares and if it matches, swaps it for the new value (atomicity depends on the server)
@ -255,9 +251,8 @@ pub trait Client {
{
let mut reactor = Runtime::new()?;
let (bg, mut client) = self.new_future();
reactor
.spawn(bg)
.block_on(client.compare_and_swap(current, new, zone_origin))
reactor.spawn(bg);
reactor.block_on(client.compare_and_swap(current, new, zone_origin))
}
/// Deletes a record (by rdata) from an rrset, optionally require the rrset to exist.
@ -301,9 +296,8 @@ pub trait Client {
{
let mut reactor = Runtime::new()?;
let (bg, mut client) = self.new_future();
reactor
.spawn(bg)
.block_on(client.delete_by_rdata(record, zone_origin))
reactor.spawn(bg);
reactor.block_on(client.delete_by_rdata(record, zone_origin))
}
/// Deletes an entire rrset, optionally require the rrset to exist.
@ -344,9 +338,8 @@ pub trait Client {
fn delete_rrset(&self, record: Record, zone_origin: Name) -> ClientResult<DnsResponse> {
let mut reactor = Runtime::new()?;
let (bg, mut client) = self.new_future();
reactor
.spawn(bg)
.block_on(client.delete_rrset(record, zone_origin))
reactor.spawn(bg);
reactor.block_on(client.delete_rrset(record, zone_origin))
}
/// Deletes all records at the specified name
@ -381,9 +374,8 @@ pub trait Client {
) -> ClientResult<DnsResponse> {
let mut reactor = Runtime::new()?;
let (bg, mut client) = self.new_future();
reactor
.spawn(bg)
.block_on(client.delete_all(name_of_records, zone_origin, dns_class))
reactor.spawn(bg);
reactor.block_on(client.delete_all(name_of_records, zone_origin, dns_class))
}
}
@ -505,9 +497,8 @@ where
) -> ClientResult<DnsResponse> {
let mut reactor = Runtime::new()?;
let (bg, mut client) = self.new_future();
reactor
.spawn(bg)
.block_on(client.query(query_name.clone(), query_class, query_type))
reactor.spawn(bg);
reactor.block_on(client.query(query_name.clone(), query_class, query_type))
}
}

View File

@ -7,7 +7,7 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::{Context, Poll};
use std::time::Duration;
use crate::proto::error::ProtoError;
@ -16,7 +16,7 @@ use crate::proto::xfer::{
DnsMultiplexer, DnsMultiplexerConnect, DnsMultiplexerSerialResponse, DnsRequest,
DnsRequestOptions, DnsRequestSender, DnsResponse, DnsStreamHandle, OneshotDnsResponseReceiver,
};
use futures::{ready, Future, FutureExt, Poll};
use futures::{ready, Future, FutureExt};
use log::{debug, warn};
use rand;

View File

@ -7,10 +7,10 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::{Context, Poll};
use futures::lock::Mutex;
use futures::{future::Fuse, Future, FutureExt, Poll};
use futures::{future::Fuse, Future, FutureExt};
#[allow(clippy::type_complexity)]
pub struct RcFuture<F: Future>

View File

@ -209,8 +209,8 @@
//! ```rust
//! use std::net::{Ipv4Addr, SocketAddr};
//! use std::str::FromStr;
//! use tokio_net::udp::UdpSocket;
//! use tokio::runtime::current_thread::Runtime;
//! use tokio::net::UdpSocket;
//! use tokio::runtime::Runtime;
//!
//! use trust_dns_client::udp::UdpClientStream;
//! use trust_dns_client::client::{Client, ClientFuture, ClientHandle};

View File

@ -13,7 +13,7 @@ use std::time::Duration;
use crate::proto::tcp::{TcpClientConnect, TcpClientStream};
use crate::proto::xfer::{DnsMultiplexer, DnsMultiplexerConnect, DnsRequestSender};
use tokio_net::tcp::TcpStream;
use tokio::net::TcpStream;
use crate::client::ClientConnection;
use crate::error::*;

View File

@ -18,7 +18,7 @@ use crate::client::ClientConnection;
use crate::error::*;
use crate::rr::dnssec::Signer;
use tokio_net::udp::UdpSocket;
use tokio::net::UdpSocket;
/// UDP based DNS Client connection
///

View File

@ -45,17 +45,16 @@ name = "trust_dns_https"
path = "src/lib.rs"
[dependencies]
bytes = "0.4"
bytes = "0.5"
data-encoding = "2.1.0"
failure = "0.1"
futures-preview = "0.3.0-alpha"
h2 = { version = "0.2.0-alpha.2", features = ["unstable-stream"] }
http = "0.1"
futures = "0.3.0"
h2 = { version = "0.2.0", features = ["stream"] }
http = "0.2"
log = "0.4"
rustls = "0.16"
tokio-executor = "0.2.0-alpha"
tokio-rustls = "0.12.0-alpha.8"
tokio-net = "0.2.0-alpha"
tokio = { version = "0.2.1", features = ["tcp", "io-util", "rt-core"] }
tokio-rustls = "0.12.0"
# disables default features, i.e. openssl...
trust-dns-proto = { version = "0.18.0-alpha", path = "../proto", features = ["tokio-compat"], default-features = false }
trust-dns-rustls = { version = "0.18.0-alpha", path = "../rustls", default-features = false }
@ -64,5 +63,4 @@ webpki-roots = { version = "0.18" }
webpki = "0.21"
[dev-dependencies]
env_logger = "^0.7"
tokio = "0.2.0-alpha"
env_logger = "0.7"

View File

@ -12,16 +12,16 @@ use std::net::SocketAddr;
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures::{future, Future, FutureExt, Poll, Stream, TryFutureExt};
use bytes::{Bytes, BytesMut};
use futures::{future, Future, FutureExt, Stream, TryFutureExt};
use h2;
use h2::client::{Connection, SendRequest};
use http::{self, header};
use rustls::ClientConfig;
use tokio_executor;
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio;
use tokio::net::TcpStream as TokioTcpStream;
use tokio_rustls::{client::TlsStream as TokioTlsClientStream, Connect, TlsConnector};
use typed_headers::{ContentLength, HeaderMapExt};
use webpki::DNSNameRef;
@ -69,7 +69,7 @@ impl HttpsClientStream {
// build up the http request
let bytes = Bytes::from(message.bytes());
let bytes = BytesMut::from(message.bytes());
let request = crate::request::new(&name_server_name, bytes.len());
let request =
@ -83,7 +83,7 @@ impl HttpsClientStream {
.map_err(|err| ProtoError::from(format!("h2 send_request error: {}", err)))?;
send_stream
.send_data(bytes, true)
.send_data(bytes.freeze(), true)
.map_err(|e| ProtoError::from(format!("h2 send_data error: {}", e)))?;
let mut response_stream = response_future
@ -103,7 +103,7 @@ impl HttpsClientStream {
// max(512) says make sure it is at least 512 bytes, and min 4096 says it is at most 4k
// just a little protection from malicious actors.
let mut response_bytes =
Bytes::with_capacity(content_length.unwrap_or(512).max(512).min(4096));
BytesMut::with_capacity(content_length.unwrap_or(512).max(512).min(4096));
while let Some(partial_bytes) = response_stream.body_mut().data().await {
let partial_bytes =
@ -466,7 +466,7 @@ impl Future for HttpsClientConnectState {
// TODO: hand this back for others to run rather than spawning here?
debug!("h2 connection established to: {}", name_server);
tokio_executor::spawn(
tokio::spawn(
connection
.map_err(|e| warn!("h2 connection failed: {}", e))
.map(|_: Result<(), ()>| ()),
@ -513,8 +513,8 @@ mod tests {
use std::net::{Ipv4Addr, SocketAddr};
use std::str::FromStr;
use self::tokio::runtime::current_thread;
use rustls::{ClientConfig, ProtocolVersion, RootCertStore};
use tokio::runtime::Runtime;
use webpki_roots;
use trust_dns_proto::op::{Message, Query};
@ -524,7 +524,7 @@ mod tests {
#[test]
fn test_https_cloudflare() {
self::env_logger::try_init().ok();
//self::env_logger::try_init().ok();
let cloudflare = SocketAddr::from(([1, 1, 1, 1], 443));
let mut request = Message::new();
@ -547,7 +547,7 @@ mod tests {
let connect = https_builder.build(cloudflare, "cloudflare-dns.com".to_string());
// tokio runtime stuff...
let mut runtime = current_thread::Runtime::new().expect("could not start runtime");
let mut runtime = Runtime::new().expect("could not start runtime");
let mut https = runtime.block_on(connect).expect("https connect failed");
let sending = runtime.block_on(future::lazy(|cx| https.send_message(request, cx)));

View File

@ -11,7 +11,7 @@ use std::borrow::Borrow;
use std::fmt::Debug;
use std::sync::Arc;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use futures::{Stream, StreamExt};
use h2;
use http::{Method, Request};
@ -26,7 +26,7 @@ use crate::HttpsError;
pub async fn message_from<R>(
this_server_name: Arc<String>,
request: Request<R>,
) -> Result<Bytes, HttpsError>
) -> Result<BytesMut, HttpsError>
where
R: Stream<Item = Result<Bytes, h2::Error>> + 'static + Send + Debug + Unpin,
{
@ -61,15 +61,15 @@ where
pub(crate) async fn message_from_post<R>(
mut request_stream: R,
length: Option<usize>,
) -> Result<Bytes, HttpsError>
) -> Result<BytesMut, HttpsError>
where
R: Stream<Item = Result<Bytes, h2::Error>> + 'static + Send + Debug + Unpin,
{
let mut bytes = Bytes::with_capacity(length.unwrap_or(0).min(512).max(4096));
let mut bytes = BytesMut::with_capacity(length.unwrap_or(0).min(512).max(4096));
loop {
match request_stream.next().await {
Some(Ok(frame)) => bytes.extend_from_slice(&frame.slice_from(0)),
Some(Ok(mut frame)) => bytes.extend_from_slice(&frame.split_off(0)),
Some(Err(err)) => return Err(err.into()),
None => {
return if let Some(length) = length {

View File

@ -23,8 +23,7 @@ extern crate http;
extern crate log;
extern crate failure;
extern crate rustls;
extern crate tokio_executor;
extern crate tokio_net;
extern crate tokio;
extern crate tokio_rustls;
extern crate trust_dns_proto;
extern crate trust_dns_rustls;

View File

@ -92,12 +92,12 @@ pub fn verify<T>(name_server: &str, request: &Request<T>) -> HttpsResult<()> {
}
// we only accept HTTPS
if Some(&uri::Scheme::HTTPS) != uri.scheme_part() {
if Some(&uri::Scheme::HTTPS) != uri.scheme() {
return Err("must be HTTPS scheme".into());
}
// the authority must match our nameserver name
if let Some(authority) = uri.authority_part() {
if let Some(authority) = uri.authority() {
if authority.host() != name_server {
return Err("incorrect authority".into());
}

View File

@ -42,9 +42,8 @@ use crate::HttpsResult;
/// code 406, [RFC7231] Section 6.5.6), and so on.
/// ```
pub fn new(message_len: usize) -> HttpsResult<Response<()>> {
let mut response = Response::builder();
response.status(StatusCode::OK);
response.version(Version::HTTP_2);
let response = Response::builder();
let response = response.status(StatusCode::OK).version(Version::HTTP_2);
let mut response = response
.body(())
.map_err(|e| ProtoError::from(format!("invalid response: {}", e)))?;

View File

@ -45,16 +45,15 @@ name = "trust_dns_native_tls"
path = "src/lib.rs"
[dependencies]
futures-preview = "0.3.0-alpha"
futures = "0.3.0"
native-tls = "0.2"
tokio-net = "0.2.0-alpha"
tokio-tls = "0.3.0-alpha"
tokio = "0.2.1"
tokio-tls = "0.3.0"
# disables default features, i.e. openssl...
trust-dns-proto = { version = "0.18.0-alpha", path = "../proto", features = ["tokio-compat"], default-features = false }
[dev-dependencies]
tokio = "0.2.0-alpha"
## Commented out until MTLS support is complete
# [target.'cfg(target_os = "linux")'.dependencies]
# openssl = { version = "^0.10", features = ["v102", "v110"] }
# openssl = { version = "0.10", features = ["v102", "v110"] }

View File

@ -24,9 +24,7 @@
extern crate futures;
extern crate native_tls;
#[cfg(test)]
extern crate tokio;
extern crate tokio_net;
extern crate tokio_tls;
extern crate trust_dns_proto;

View File

@ -5,7 +5,7 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
#![allow(clippy::dbg_macro, clippy::print_stdout)]
#![allow(unused_imports, clippy::dbg_macro, clippy::print_stdout)]
use std;
use std::env;
@ -22,7 +22,7 @@ use std::{thread, time};
use futures::StreamExt;
use native_tls;
use native_tls::{Certificate, TlsAcceptor};
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use trust_dns_proto::xfer::SerialMessage;
@ -35,6 +35,7 @@ use crate::{TlsStream, TlsStreamBuilder};
// but not 3?
// #[cfg(not(target_os = "linux"))]
#[test]
#[cfg(not(target_os = "macos"))] // certificates are failing on macOS now...
fn test_tls_client_stream_ipv4() {
tls_client_stream_test(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), false)
}
@ -42,13 +43,15 @@ fn test_tls_client_stream_ipv4() {
// FIXME: mtls is disabled at the moment, it causes a hang on Linux, and is currently not supported on macOS
#[cfg(feature = "mtls")]
#[test]
#[cfg(not(target_os = "macos"))] // ignored until Travis-CI fixes IPv6
#[cfg(not(target_os = "macos"))]
fn test_tls_client_stream_ipv4_mtls() {
tls_client_stream_test(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), true)
}
#[test]
#[cfg(not(target_os = "linux"))] // ignored until Travis-CI fixes IPv6
#[cfg(not(target_os = "macos"))] // certificates are failing on macOS now
fn test_tls_client_stream_ipv6() {
tls_client_stream_test(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), false)
}

View File

@ -14,7 +14,7 @@ use futures::{Future, TryFutureExt};
use native_tls::Certificate;
#[cfg(feature = "mtls")]
use native_tls::Pkcs12;
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio::net::TcpStream as TokioTcpStream;
use tokio_tls::TlsStream as TokioTlsStream;
use trust_dns_proto::error::ProtoError;

View File

@ -15,7 +15,7 @@ use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::{Future, TryFutureExt};
use native_tls::Protocol::Tlsv12;
use native_tls::{Certificate, Identity, TlsConnector};
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio::net::TcpStream as TokioTcpStream;
use tokio_tls::{TlsConnector as TokioTlsConnector, TlsStream as TokioTlsStream};
use trust_dns_proto::tcp::TcpStream;

View File

@ -43,12 +43,12 @@ name = "trust_dns_openssl"
path = "src/lib.rs"
[dependencies]
futures-preview = "0.3.0-alpha"
futures = "0.3.0"
openssl = { version = "0.10", features = ["v102", "v110"] }
tokio-openssl = "0.4.0-alpha"
tokio-net = "0.2.0-alpha"
tokio-openssl = "0.4.0"
tokio = "0.2.1"
trust-dns-proto = { version = "0.18.0-alpha", path = "../proto", features = ["openssl"] }
[dev-dependencies]
openssl = { version = "0.10", features = ["v102", "v110"] }
tokio = "0.2.0-alpha"
tokio = "0.2.1"

View File

@ -25,7 +25,7 @@
extern crate futures;
extern crate openssl;
extern crate tokio_net;
extern crate tokio;
extern crate tokio_openssl;
extern crate trust_dns_proto;

View File

@ -14,7 +14,7 @@ use futures::{Future, TryFutureExt};
#[cfg(feature = "mtls")]
use openssl::pkcs12::Pkcs12;
use openssl::x509::X509;
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio::net::TcpStream as TokioTcpStream;
use tokio_openssl::SslStream as TokioTlsStream;
use trust_dns_proto::error::ProtoError;

View File

@ -17,7 +17,7 @@ use openssl::ssl::{ConnectConfiguration, SslConnector, SslContextBuilder, SslMet
use openssl::stack::Stack;
use openssl::x509::store::X509StoreBuilder;
use openssl::x509::{X509Ref, X509};
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio::net::TcpStream as TokioTcpStream;
use tokio_openssl::{self, SslStream as TokioTlsStream};
use trust_dns_proto::tcp::TcpStream;

View File

@ -26,7 +26,7 @@ use openssl::pkey::*;
use openssl::ssl::*;
use openssl::x509::store::X509StoreBuilder;
use openssl::x509::*;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use openssl::asn1::*;
use openssl::bn::*;

View File

@ -39,7 +39,7 @@ codecov = { repository = "bluejekyll/trust-dns", branch = "master", service = "g
dnssec-openssl = ["dnssec", "openssl"]
dnssec-ring = ["dnssec", "ring"]
dnssec = ["data-encoding"]
tokio-compat = ["tokio-net"]
tokio-compat = ["tokio"]
default = ["tokio-compat"]
serde-config = ["serde"]
@ -59,23 +59,18 @@ async-trait = "0.1.18"
data-encoding = { version = "2.1.0", optional = true }
enum-as-inner = "0.3"
failure = "0.1"
futures-preview = "0.3.0-alpha"
idna = "^0.2.0"
lazy_static = "^1.0"
log = "^0.4.8"
openssl = { version = "^0.10", features = ["v102", "v110"], optional = true }
futures = "0.3.0"
idna = "0.2.0"
lazy_static = "1.0"
log = "0.4.8"
openssl = { version = "0.10", features = ["v102", "v110"], optional = true }
rand = "0.7"
ring = { version = "0.16", optional = true, features = ["std"] }
serde = { version = "1.0", optional = true }
smallvec = "^1.0"
socket2 = { version = "^0.3.10" }
tokio-executor = "0.2.0-alpha"
tokio-io = "0.2.0-alpha"
tokio-sync = "0.2.0-alpha"
tokio-net = { version = "0.2.0-alpha", optional = true }
tokio-timer = "0.3.0-alpha"
smallvec = "1.0"
socket2 = { version = "0.3.10" }
tokio = { version = "0.2.1", features = ["time", "udp", "tcp"], optional = true }
url = "2.1.0"
[dev-dependencies]
env_logger = "^0.7"
tokio = "0.2.0-alpha"
env_logger = "0.7"

View File

@ -23,9 +23,8 @@ use openssl::error::ErrorStack as SslErrorStack;
use ring::error::Unspecified;
use failure::{Backtrace, Context, Fail};
use tokio_executor::SpawnError;
use tokio_timer::timeout::Elapsed;
use tokio_timer::Error as TimerError;
use tokio::time::Elapsed;
use tokio::time::Error as TimerError;
/// An alias for results returned by functions of this crate
pub type ProtoResult<T> = ::std::result::Result<T, ProtoError>;
@ -168,10 +167,6 @@ pub enum ProtoErrorKind {
#[fail(display = "ring error")]
Ring,
/// Tokio Spawn Error
#[fail(display = "tokio spawn error")]
SpawnError,
/// An ssl error
#[fail(display = "ssl error")]
SSL,
@ -277,12 +272,6 @@ impl From<Unspecified> for ProtoError {
}
}
impl From<SpawnError> for ProtoError {
fn from(e: SpawnError) -> ProtoError {
e.context(ProtoErrorKind::SpawnError).into()
}
}
impl From<SslErrorStack> for ProtoError {
fn from(e: SslErrorStack) -> ProtoError {
e.context(ProtoErrorKind::SSL).into()
@ -409,7 +398,6 @@ impl Clone for ProtoErrorKind {
Io => Io,
Poisoned => Poisoned,
Ring => Ring,
SpawnError => SpawnError,
SSL => SSL,
Timeout => Timeout,
Timer => Timer,

View File

@ -8,10 +8,10 @@
use std::fmt::{self, Display};
use std::net::{Ipv4Addr, SocketAddr};
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use futures::stream::{StreamExt, TryStreamExt};
use futures::{Future, FutureExt, Poll, Stream, TryFutureExt};
use futures::{Future, FutureExt, Stream, TryFutureExt};
use crate::error::ProtoError;
use crate::multicast::mdns_stream::{MDNS_IPV4, MDNS_IPV6};

View File

@ -10,21 +10,20 @@ use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::{Context, Poll};
use futures::channel::mpsc::unbounded;
use futures::future;
use futures::lock::Mutex;
use futures::ready;
use futures::stream::{Stream, StreamExt};
use futures::{Future, FutureExt, Poll, TryFutureExt};
use futures::{Future, FutureExt, TryFutureExt};
use lazy_static::lazy_static;
use log::{debug, trace};
use rand;
use rand::distributions::{uniform::Uniform, Distribution};
use socket2::{self, Socket};
use tokio_net::driver::Handle;
use tokio_net::udp::UdpSocket;
use tokio::net::UdpSocket;
use crate::multicast::MdnsQueryType;
use crate::udp::UdpStream;
@ -141,13 +140,10 @@ impl MdnsStream {
// This set of futures collapses the next udp socket into a stream which can be used for
// sending and receiving udp packets.
let stream = {
let handle = Handle::default();
let handle_clone = handle.clone();
Box::new(
next_socket
.map(move |socket| match socket {
Ok(Some(socket)) => Ok(Some(UdpSocket::from_std(socket, &handle)?)),
Ok(Some(socket)) => Ok(Some(UdpSocket::from_std(socket)?)),
Ok(None) => Ok(None),
Err(err) => Err(err),
})
@ -156,8 +152,7 @@ impl MdnsStream {
socket.map(|socket| UdpStream::from_parts(socket, outbound_messages));
let multicast: Option<_> = multicast_socket.map(|multicast_socket| {
Arc::new(Mutex::new(
UdpSocket::from_std(multicast_socket, &handle_clone)
.expect("bad handle?"),
UdpSocket::from_std(multicast_socket).expect("bad handle?"),
))
});
@ -431,8 +426,7 @@ pub mod tests {
use super::*;
use futures::future::Either;
use tokio::runtime::current_thread::Runtime;
use tokio_timer;
use tokio::runtime;
// TODO: is there a better way?
const BASE_TEST_PORT: u16 = 5379;
@ -450,7 +444,7 @@ pub mod tests {
// use env_logger;
// env_logger::init();
let mut io_loop = Runtime::new().unwrap();
let mut io_loop = runtime::Runtime::new().unwrap();
let (stream, _) = MdnsStream::new(
SocketAddr::new(*TEST_MDNS_IPV4, BASE_TEST_PORT),
MdnsQueryType::OneShot,
@ -481,7 +475,7 @@ pub mod tests {
// as there are probably unexpected responses coming on the standard addresses
fn one_shot_mdns_test(mdns_addr: SocketAddr) {
use std::time::{Duration, Instant};
use std::time::Duration;
let client_done = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
@ -493,12 +487,11 @@ pub mod tests {
let server_handle = std::thread::Builder::new()
.name("test_one_shot_mdns:server".to_string())
.spawn(move || {
let mut server_loop = Runtime::new().unwrap();
let mut timeout = future::lazy(|_| {
tokio_timer::delay(Instant::now() + Duration::from_millis(100))
})
.flatten()
.boxed();
let mut server_loop = runtime::Runtime::new().unwrap();
let mut timeout =
future::lazy(|_| tokio::time::delay_for(Duration::from_millis(100)))
.flatten()
.boxed();
// TTLs are 0 so that multicast test packets never leave the test host...
// FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases...
@ -548,7 +541,7 @@ pub mod tests {
Either::Right(((), buffer_and_addr_stream_tmp)) => {
server_stream = buffer_and_addr_stream_tmp;
timeout = future::lazy(|_| {
tokio_timer::delay(Instant::now() + Duration::from_millis(100))
tokio::time::delay_for(Duration::from_millis(100))
})
.flatten()
.boxed();
@ -556,24 +549,21 @@ pub mod tests {
}
// let the server turn for a bit... send the message
server_loop.block_on(tokio_timer::delay(
Instant::now() + Duration::from_millis(100),
));
server_loop.block_on(tokio::time::delay_for(Duration::from_millis(100)));
}
})
.unwrap();
// setup the client, which is going to run on the testing thread...
let mut io_loop = Runtime::new().unwrap();
let mut io_loop = runtime::Runtime::new().unwrap();
// FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases...
let (stream, sender) =
MdnsStream::new(mdns_addr, MdnsQueryType::OneShot, Some(1), None, Some(5));
let mut stream = io_loop.block_on(stream).ok().unwrap().into_future();
let mut timeout =
future::lazy(|_| tokio_timer::delay(Instant::now() + Duration::from_millis(100)))
.flatten()
.boxed();
let mut timeout = future::lazy(|_| tokio::time::delay_for(Duration::from_millis(100)))
.flatten()
.boxed();
let mut successes = 0;
for _ in 0..send_recv_times {
@ -602,11 +592,9 @@ pub mod tests {
}
Either::Right(((), buffer_and_addr_stream_tmp)) => {
stream = buffer_and_addr_stream_tmp;
timeout = future::lazy(|_| {
tokio_timer::delay(Instant::now() + Duration::from_millis(100))
})
.flatten()
.boxed();
timeout = future::lazy(|_| tokio::time::delay_for(Duration::from_millis(100)))
.flatten()
.boxed();
}
}
}
@ -639,7 +627,7 @@ pub mod tests {
// as there are probably unexpected responses coming on the standard addresses
fn passive_mdns_test(mdns_query_type: MdnsQueryType, mdns_addr: SocketAddr) {
use std::time::{Duration, Instant};
use std::time::Duration;
let server_got_packet = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
@ -651,12 +639,11 @@ pub mod tests {
let _server_handle = std::thread::Builder::new()
.name("test_one_shot_mdns:server".to_string())
.spawn(move || {
let mut io_loop = Runtime::new().unwrap();
let mut timeout = future::lazy(|_| {
tokio_timer::delay(Instant::now() + Duration::from_millis(100))
})
.flatten()
.boxed();
let mut io_loop = runtime::Runtime::new().unwrap();
let mut timeout =
future::lazy(|_| tokio::time::delay_for(Duration::from_millis(100)))
.flatten()
.boxed();
// TTLs are 0 so that multicast test packets never leave the test host...
// FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases...
@ -691,7 +678,7 @@ pub mod tests {
Either::Right(((), buffer_and_addr_stream_tmp)) => {
server_stream = buffer_and_addr_stream_tmp;
timeout = future::lazy(|_| {
tokio_timer::delay(Instant::now() + Duration::from_millis(100))
tokio::time::delay_for(Duration::from_millis(100))
})
.flatten()
.boxed();
@ -699,23 +686,20 @@ pub mod tests {
}
// let the server turn for a bit... send the message
io_loop.block_on(tokio_timer::delay(
Instant::now() + Duration::from_millis(100),
));
io_loop.block_on(tokio::time::delay_for(Duration::from_millis(100)));
}
})
.unwrap();
// setup the client, which is going to run on the testing thread...
let mut io_loop = Runtime::new().unwrap();
let mut io_loop = runtime::Runtime::new().unwrap();
// FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases...
let (stream, sender) =
MdnsStream::new(mdns_addr, MdnsQueryType::OneShot, Some(1), None, Some(5));
let mut stream = io_loop.block_on(stream).ok().unwrap().into_future();
let mut timeout =
future::lazy(|_| tokio_timer::delay(Instant::now() + Duration::from_millis(100)))
.flatten()
.boxed();
let mut timeout = future::lazy(|_| tokio::time::delay_for(Duration::from_millis(100)))
.flatten()
.boxed();
for _ in 0..send_recv_times {
// test once
@ -741,11 +725,9 @@ pub mod tests {
}
Either::Right(((), buffer_and_addr_stream_tmp)) => {
stream = buffer_and_addr_stream_tmp;
timeout = future::lazy(|_| {
tokio_timer::delay(Instant::now() + Duration::from_millis(100))
})
.flatten()
.boxed();
timeout = future::lazy(|_| tokio::time::delay_for(Duration::from_millis(100)))
.flatten()
.boxed();
}
}
}

View File

@ -9,13 +9,13 @@ use std::fmt::{self, Display};
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use std::time::Duration;
use async_trait::async_trait;
use futures::{Future, Poll, Stream, StreamExt, TryFutureExt};
use futures::{Future, Stream, StreamExt, TryFutureExt};
use log::warn;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite};
use crate::error::ProtoError;
use crate::tcp::{Connect, TcpStream};
@ -126,7 +126,7 @@ impl<S> Future for TcpClientConnect<S> {
}
#[cfg(feature = "tokio-compat")]
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio::net::TcpStream as TokioTcpStream;
#[cfg(feature = "tokio-compat")]
#[async_trait]
@ -167,7 +167,7 @@ const TEST_BYTES_LEN: usize = 8;
#[cfg(test)]
fn tcp_client_stream_test(server_addr: IpAddr) {
use std::io::{Read, Write};
use tokio::runtime::current_thread::Runtime;
use tokio::runtime;
let succeeded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let succeeded_clone = succeeded.clone();
@ -235,7 +235,7 @@ fn tcp_client_stream_test(server_addr: IpAddr) {
.unwrap();
// setup the client, which is going to run on the testing thread...
let mut io_loop = Runtime::new().unwrap();
let mut io_loop = runtime::Runtime::new().unwrap();
// the tests should run within 5 seconds... right?
// TODO: add timeout here, so that test never hangs...

View File

@ -11,15 +11,14 @@ use std::io;
use std::mem;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use std::time::Duration;
use async_trait::async_trait;
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::stream::{Fuse, Peekable, Stream, StreamExt};
use futures::{ready, Future, FutureExt, Poll, TryFutureExt};
use futures::{ready, Future, FutureExt, TryFutureExt};
use log::debug;
use tokio_timer::Timeout;
use crate::error::*;
use crate::xfer::{BufStreamHandle, SerialMessage};
@ -31,7 +30,7 @@ where
Self: Sized,
{
/// TcpSteam
type Transport: tokio_io::AsyncRead + tokio_io::AsyncWrite + Send;
type Transport: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send;
/// connect to tcp
async fn connect(addr: &SocketAddr) -> io::Result<Self::Transport>;
@ -160,7 +159,7 @@ impl<S: Connect + 'static> TcpStream<S> {
outbound_messages: UnboundedReceiver<SerialMessage>,
) -> Result<TcpStream<S::Transport>, io::Error> {
let tcp = S::connect(&name_server);
Timeout::new(tcp, timeout)
tokio::time::timeout(timeout, tcp)
.map_err(move |_| {
debug!("timed out connecting to: {}", name_server);
io::Error::new(
@ -191,7 +190,7 @@ impl<S: Connect + 'static> TcpStream<S> {
}
}
impl<S: tokio_io::AsyncRead + tokio_io::AsyncWrite> TcpStream<S> {
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite> TcpStream<S> {
/// Initializes a TcpStream.
///
/// This is intended for use with a TcpListener and Incoming.
@ -228,7 +227,7 @@ impl<S: tokio_io::AsyncRead + tokio_io::AsyncWrite> TcpStream<S> {
}
}
impl<S: tokio_io::AsyncRead + tokio_io::AsyncWrite + Unpin> Stream for TcpStream<S> {
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin> Stream for TcpStream<S> {
type Item = io::Result<SerialMessage>;
#[allow(clippy::cognitive_complexity)]
@ -451,7 +450,7 @@ mod tests {
use std::net::Ipv6Addr;
use std::net::{IpAddr, Ipv4Addr};
use tokio_net::tcp;
use tokio;
use super::*;
@ -478,7 +477,7 @@ mod tests {
#[cfg(test)]
fn tcp_client_stream_test(server_addr: IpAddr) {
use std::io::{Read, Write};
use tokio::runtime::current_thread::Runtime;
use tokio::runtime;
let succeeded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let succeeded_clone = succeeded.clone();
@ -546,12 +545,12 @@ mod tests {
.unwrap();
// setup the client, which is going to run on the testing thread...
let mut io_loop = Runtime::new().unwrap();
let mut io_loop = runtime::Runtime::new().unwrap();
// the tests should run within 5 seconds... right?
// TODO: add timeout here, so that test never hangs...
// let timeout = Timeout::new(Duration::from_secs(5));
let (stream, sender) = TcpStream::<tcp::TcpStream>::new::<ProtoError>(server_addr);
let (stream, sender) = TcpStream::<tokio::net::TcpStream>::new::<ProtoError>(server_addr);
let mut stream = io_loop.block_on(stream).expect("run failed to get stream");

View File

@ -11,12 +11,12 @@ use std::marker::PhantomData;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{Future, Poll, Stream};
use futures::{Future, Stream};
use log::{debug, warn};
use tokio_timer::timeout::{Elapsed, Timeout};
use tokio::time::Elapsed;
use crate::error::ProtoError;
use crate::op::message::NoopMessageFinalizer;
@ -205,9 +205,9 @@ impl UdpResponse {
message_id: u16,
timeout: Duration,
) -> Self {
UdpResponse(Box::pin(Timeout::new(
SingleUseUdpSocket::send_serial_message::<S>(request, message_id),
UdpResponse(Box::pin(tokio::time::timeout(
timeout,
SingleUseUdpSocket::send_serial_message::<S>(request, message_id),
)))
}
@ -216,7 +216,7 @@ impl UdpResponse {
f: F,
) -> Self {
// TODO: this constructure isn't really necessary
UdpResponse(Box::pin(Timeout::new(f, Duration::from_secs(5))))
UdpResponse(Box::pin(tokio::time::timeout(Duration::from_secs(5), f)))
}
}
@ -350,7 +350,7 @@ mod tests {
#[cfg(not(target_os = "linux"))]
use std::net::Ipv6Addr;
use std::net::{IpAddr, Ipv4Addr};
use tokio_net::udp;
use tokio;
use super::*;
use crate::op::Message;
@ -371,7 +371,7 @@ mod tests {
use crate::rr::rdata::NULL;
use crate::rr::{Name, RData, Record, RecordType};
use std::str::FromStr;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime;
// use env_logger;
// env_logger::try_init().ok();
@ -449,13 +449,14 @@ mod tests {
.unwrap();
// setup the client, which is going to run on the testing thread...
let mut io_loop = Runtime::new().unwrap();
let mut io_loop = runtime::Runtime::new().unwrap();
// the tests should run within 5 seconds... right?
// TODO: add timeout here, so that test never hangs...
// let timeout = Timeout::new(Duration::from_secs(5));
let stream = UdpClientStream::with_timeout(server_addr, Duration::from_millis(500));
let mut stream: UdpClientStream<udp::UdpSocket> = io_loop.block_on(stream).ok().unwrap();
let mut stream: UdpClientStream<tokio::net::UdpSocket> =
io_loop.block_on(stream).ok().unwrap();
let mut worked_once = false;
for i in 0..send_recv_times {

View File

@ -9,14 +9,12 @@ use std::io;
use std::marker::PhantomData;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::{Context, Poll};
use async_trait::async_trait;
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::lock::Mutex;
use futures::stream::{Fuse, Peekable, Stream, StreamExt};
use futures::{ready, Future, Poll, TryFutureExt};
use futures::{ready, Future, FutureExt, TryFutureExt};
use log::debug;
use rand;
use rand::distributions::{uniform::Uniform, Distribution};
@ -41,10 +39,8 @@ where
/// A UDP stream of DNS binary packets
#[must_use = "futures do nothing unless polled"]
pub struct UdpStream<S: Send> {
socket: Arc<Mutex<S>>,
sending: Option<Pin<Box<dyn Future<Output = io::Result<usize>> + Send>>>,
socket: S,
outbound_messages: Peekable<Fuse<UnboundedReceiver<SerialMessage>>>,
receiving: Option<Pin<Box<dyn Future<Output = io::Result<SerialMessage>> + Send>>>,
}
impl<S: UdpSocket + Send + 'static> UdpStream<S> {
@ -78,10 +74,8 @@ impl<S: UdpSocket + Send + 'static> UdpStream<S> {
// This set of futures collapses the next udp socket into a stream which can be used for
// sending and receiving udp packets.
let stream = Box::new(next_socket.map_ok(move |socket| UdpStream {
socket: Arc::new(Mutex::new(socket)),
sending: None,
socket,
outbound_messages: outbound_messages.fuse().peekable(),
receiving: None,
}));
(stream, message_sender)
@ -105,10 +99,8 @@ impl<S: UdpSocket + Send + 'static> UdpStream<S> {
let message_sender = BufStreamHandle::new(message_sender);
let stream = UdpStream {
socket: Arc::new(Mutex::new(socket)),
sending: None,
socket,
outbound_messages: outbound_messages.fuse().peekable(),
receiving: None,
};
(stream, message_sender)
@ -120,10 +112,8 @@ impl<S: UdpSocket + Send + 'static> UdpStream<S> {
outbound_messages: UnboundedReceiver<SerialMessage>,
) -> Self {
UdpStream {
socket: Arc::new(Mutex::new(socket)),
sending: None,
socket,
outbound_messages: outbound_messages.fuse().peekable(),
receiving: None,
}
}
}
@ -133,17 +123,10 @@ impl<S: Send> UdpStream<S> {
fn pollable_split(
&mut self,
) -> (
&mut Arc<Mutex<S>>,
&mut Option<Pin<Box<dyn Future<Output = io::Result<usize>> + Send>>>,
&mut S,
&mut Peekable<Fuse<UnboundedReceiver<SerialMessage>>>,
&mut Option<Pin<Box<dyn Future<Output = io::Result<SerialMessage>> + Send>>>,
) {
(
&mut self.socket,
&mut self.sending,
&mut self.outbound_messages,
&mut self.receiving,
)
(&mut self.socket, &mut self.outbound_messages)
}
}
@ -151,76 +134,44 @@ impl<S: UdpSocket + Send + 'static> Stream for UdpStream<S> {
type Item = Result<SerialMessage, io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let (socket, sending, outbound_messages, receiving) = self.pollable_split();
let (socket, outbound_messages) = self.pollable_split();
let mut socket = Pin::new(socket);
let mut outbound_messages = Pin::new(outbound_messages);
// this will not accept incoming data while there is data to send
// makes this self throttling.
loop {
// if there's something currently sending, send it
if let Some(ref mut sending) = sending {
ready!(sending.as_mut().poll(cx))?;
}
*sending = None;
// first try to send
match outbound_messages.as_mut().poll_next(cx) {
match outbound_messages.as_mut().poll_peek(cx) {
Poll::Ready(Some(message)) => {
let socket = Arc::clone(socket);
let sending_fut = async {
let message = message;
let socket = socket;
let mut socket = socket.lock().await;
let addr = &message.addr();
socket.send_to(message.bytes(), addr).await
};
let addr = &message.addr();
// will return if the socket will block
*sending = Some(Box::pin(sending_fut));
// this wiil return if not ready,
// meaning that sending will be prefered over receiving...
// TODO: shouldn't this return the error to send to the sender?
ready!(socket.send_to(message.bytes(), addr).poll_unpin(cx))?;
}
// now we get to drop through to the receives...
// TODO: should we also return None if there are no more messages to send?
Poll::Pending | Poll::Ready(None) => break,
}
// message sent, need to pop the message
assert!(outbound_messages.as_mut().poll_next(cx).is_ready());
}
// For QoS, this will only accept one message and output that
// receive all inbound messages
// TODO: this should match edns settings
loop {
let msg = if let Some(receiving) = receiving {
// TODO: should we drop this packet if it's not from the same src as dest?
let msg = ready!(receiving.as_mut().poll(cx))?;
let mut buf = [0u8; 2048];
let (len, src) = ready!(socket.recv_from(&mut buf).poll_unpin(cx))?;
Some(Poll::Ready(Some(Ok(msg))))
} else {
None
};
*receiving = None;
if let Some(msg) = msg {
return msg;
}
let socket = Arc::clone(socket);
let receive_future = async {
let socket = socket;
let mut buf = [0u8; 2048];
let mut socket = socket.lock().await;
let (len, src) = socket.recv_from(&mut buf).await?;
Ok(SerialMessage::new(
buf.iter().take(len).cloned().collect(),
src,
))
};
*receiving = Some(Box::pin(receive_future));
}
Poll::Ready(Some(Ok(SerialMessage::new(
buf.iter().take(len).cloned().collect(),
src,
))))
}
}
@ -289,10 +240,10 @@ impl<S: UdpSocket> Future for NextRandomUdpSocket<S> {
#[test]
fn test_next_random_socket() {
use tokio::runtime::current_thread::Runtime;
use tokio::{self, runtime};
let mut io_loop = Runtime::new().unwrap();
let (stream, _) = UdpStream::<udp::UdpSocket>::new(SocketAddr::new(
let mut io_loop = runtime::Runtime::new().unwrap();
let (stream, _) = UdpStream::<tokio::net::UdpSocket>::new(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
52,
));
@ -314,13 +265,13 @@ fn test_udp_stream_ipv6() {
udp_stream_test(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)))
}
#[cfg(feature = "tokio-compat")]
use tokio_net::udp;
use tokio::net;
#[cfg(feature = "tokio-compat")]
#[async_trait]
impl UdpSocket for udp::UdpSocket {
impl UdpSocket for net::UdpSocket {
async fn bind(addr: &SocketAddr) -> io::Result<Self> {
udp::UdpSocket::bind(addr).await
net::UdpSocket::bind(addr).await
}
async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
@ -334,7 +285,7 @@ impl UdpSocket for udp::UdpSocket {
#[cfg(test)]
fn udp_stream_test(server_addr: IpAddr) {
use tokio::runtime::current_thread::Runtime;
use tokio::runtime;
use std::net::ToSocketAddrs;
let succeeded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
@ -388,7 +339,7 @@ fn udp_stream_test(server_addr: IpAddr) {
.unwrap();
// setup the client, which is going to run on the testing thread...
let mut io_loop = Runtime::new().unwrap();
let mut io_loop = runtime::Runtime::new().unwrap();
// the tests should run within 5 seconds... right?
// TODO: add timeout here, so that test never hangs...
@ -398,11 +349,11 @@ fn udp_stream_test(server_addr: IpAddr) {
};
let socket = io_loop
.block_on(udp::UdpSocket::bind(
.block_on(net::UdpSocket::bind(
&client_addr.to_socket_addrs().unwrap().next().unwrap(),
))
.expect("could not create socket"); // some random address...
let (mut stream, sender) = UdpStream::<udp::UdpSocket>::with_bound(socket);
let (mut stream, sender) = UdpStream::<net::UdpSocket>::with_bound(socket);
//let mut stream: UdpStream = io_loop.block_on(stream).ok().unwrap();
for _ in 0..send_recv_times {

View File

@ -8,11 +8,11 @@
//! This module contains all the types for demuxing DNS oriented streams.
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::stream::{Peekable, Stream, StreamExt};
use futures::{Future, FutureExt, Poll};
use futures::{Future, FutureExt};
use log::{debug, warn};
use crate::error::*;

View File

@ -13,17 +13,17 @@ use std::collections::HashMap;
use std::fmt::{self, Display};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::channel::oneshot;
use futures::stream::{Stream, StreamExt};
use futures::{ready, Future, FutureExt, Poll};
use futures::{ready, Future, FutureExt};
use log::{debug, warn};
use rand;
use rand::distributions::{Distribution, Standard};
use smallvec::SmallVec;
use tokio_timer::Delay;
use tokio::{self, time::Delay};
use crate::error::*;
use crate::op::{Message, MessageFinalizer, OpCode};
@ -350,7 +350,7 @@ where
}
// store a Timeout for this message before sending
let timeout = tokio_timer::delay(Instant::now() + self.timeout_duration);
let timeout = tokio::time::delay_for(self.timeout_duration);
let (complete, receiver) = oneshot::channel();

View File

@ -7,11 +7,11 @@
use std::fmt::{Debug, Display};
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use futures::channel::mpsc::{TrySendError, UnboundedSender};
use futures::channel::oneshot::{self, Receiver, Sender};
use futures::{ready, Future, Poll, Stream};
use futures::{ready, Future, Stream};
use log::{debug, warn};
use crate::error::*;

View File

@ -8,9 +8,9 @@
//! `RetryDnsHandle` allows for DnsQueries to be reattempted on failure
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use futures::{Future, FutureExt, Poll};
use futures::{Future, FutureExt};
use crate::error::ProtoError;
use crate::xfer::{DnsRequest, DnsResponse};

View File

@ -36,7 +36,7 @@ appveyor = { repository = "bluejekyll/trust-dns", branch = "master", service = "
codecov = { repository = "bluejekyll/trust-dns", branch = "master", service = "github" }
[features]
default = ["tokio"]
default = ["tokio-compat"]
dns-over-native-tls = ["dns-over-tls", "trust-dns-native-tls"]
# DNS over TLS with OpenSSL currently needs a good way to set default CAs, use rustls or native-tls
dns-over-openssl = ["dns-over-tls", "trust-dns-openssl"]
@ -56,35 +56,33 @@ serde-config = ["serde", "trust-dns-proto/serde-config"]
# enables experimental the mDNS (multicast) feature
mdns = ["trust-dns-proto/mdns"]
tokio-compat = ["tokio", "trust-dns-proto/tokio-compat"]
[lib]
name = "trust_dns_resolver"
path = "src/lib.rs"
[dependencies]
cfg-if = "0.1"
cfg-if = "0.1.9"
failure = "0.1"
futures-preview = "0.3.0-alpha"
lazy_static = "^1.0"
log = "^0.4.8"
lru-cache = "^0.1.2"
futures = "0.3.0"
lazy_static = "1.0"
log = "0.4.8"
lru-cache = "0.1.2"
resolv-conf = { version = "0.6.0", features = ["system"] }
rustls = {version = "0.16", optional = true}
serde = { version = "1.0", features = ["derive"], optional = true }
smallvec = "^1.0"
tokio = { version = "0.2.0-alpha", optional = true }
tokio-executor = "0.2.0-alpha"
tokio-net = "0.2.0-alpha"
smallvec = "1.0"
tokio = { version = "0.2.1", features = ["rt-core"], optional = true }
trust-dns-https = { version = "0.18.0-alpha", path = "../https", optional = true }
trust-dns-native-tls = { version = "0.18.0-alpha", path = "../native-tls", optional = true }
trust-dns-openssl = { version = "0.18.0-alpha", path = "../openssl", optional = true }
trust-dns-proto = { version = "0.18.0-alpha", path = "../proto" }
trust-dns-rustls = { version = "0.18.0-alpha", path = "../rustls", optional = true }
webpki-roots = { version = "^0.18", optional = true }
webpki-roots = { version = "0.18", optional = true }
[target.'cfg(windows)'.dependencies]
ipconfig = { version = "^0.2.0" }
ipconfig = { version = "0.2.0" }
[dev-dependencies]
env_logger = "^0.7"
tokio-io = "0.2.0-alpha"
tokio = "0.2.0-alpha"
env_logger = "0.7"

View File

@ -4,7 +4,6 @@
extern crate lazy_static;
extern crate futures;
extern crate tokio;
extern crate tokio_io;
extern crate trust_dns_resolver;
use std::io;
@ -37,7 +36,7 @@ lazy_static! {
// This thread will manage the actual resolution runtime
thread::spawn(move || {
// A runtime for this new thread
let mut runtime = tokio::runtime::current_thread::Runtime::new().expect("failed to launch Runtime");
let mut runtime = tokio::runtime::Runtime::new().expect("failed to launch Runtime");
// our platform independent future, result, see next blocks
let (resolver, bg) = {
@ -127,8 +126,7 @@ fn main() {
.iter()
.map(|name| {
let join = thread::spawn(move || {
let mut runtime = tokio::runtime::current_thread::Runtime::new()
.expect("failed to launch Runtime");
let mut runtime = tokio::runtime::Runtime::new().expect("failed to launch Runtime");
runtime.block_on(resolve(*name, 443))
});

View File

@ -15,8 +15,7 @@ fn main() {
env_logger::init();
// Set up the standard tokio runtime (multithreaded by default).
let runtime = Runtime::new().expect("Failed to create runtime");
let mut runtime = Runtime::new().expect("Failed to create runtime");
let (resolver, bg) = {
// To make this independent, if targeting macOS, BSD, Linux, or Windows, we can use the system's configuration:
#[cfg(any(unix, windows))]
@ -59,7 +58,4 @@ fn main() {
// Drop the resolver, which means that the runtime will become idle.
drop(resolver);
// Once we have finished using the runtime, we can ask it to shut down when it's done (this blocks).
runtime.shutdown_on_idle();
}

View File

@ -7,10 +7,10 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::{Context, Poll};
use futures::lock::Mutex;
use futures::{channel::mpsc, future, ready, Future, FutureExt, Poll, StreamExt};
use futures::{channel::mpsc, future, ready, Future, FutureExt, StreamExt};
#[cfg(feature = "dnssec")]
use proto::SecureDnsHandle;
use proto::{

View File

@ -10,14 +10,14 @@ use std::fmt;
use std::net::IpAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::{Context, Poll};
use futures::{
self,
channel::{mpsc, oneshot},
future,
lock::Mutex,
Future, FutureExt, Poll, TryFutureExt,
Future, FutureExt, TryFutureExt,
};
use proto::error::ProtoResult;
use proto::rr::domain::TryParseIp;
@ -389,7 +389,7 @@ mod tests {
use std::net::*;
use std::str::FromStr;
use self::tokio::runtime::current_thread::Runtime;
use self::tokio::runtime::Runtime;
use proto::xfer::DnsRequest;
use crate::config::{LookupIpStrategy, NameServerConfig};
@ -420,7 +420,7 @@ mod tests {
}
fn lookup_test(config: ResolverConfig) {
env_logger::try_init().ok();
//env_logger::try_init().ok();
let mut io_loop = Runtime::new().unwrap();
let (resolver, bg) = AsyncResolver::new(config, ResolverOpts::default());
@ -462,7 +462,7 @@ mod tests {
#[test]
fn test_ip_lookup() {
env_logger::try_init().ok();
//env_logger::try_init().ok();
let mut io_loop = Runtime::new().unwrap();
let (resolver, bg) = AsyncResolver::new(ResolverConfig::default(), ResolverOpts::default());
@ -762,7 +762,7 @@ mod tests {
#[test]
fn test_domain_search() {
env_logger::try_init().ok();
//env_logger::try_init().ok();
// domain is good now, should be combined with the name to form www.example.com
let domain = Name::from_str("example.com.").unwrap();
@ -801,7 +801,7 @@ mod tests {
#[test]
fn test_search_list() {
env_logger::try_init().ok();
//env_logger::try_init().ok();
let domain = Name::from_str("incorrect.example.com.").unwrap();
let search = vec![

View File

@ -143,7 +143,7 @@ mod tests {
#![allow(clippy::dbg_macro, clippy::print_stdout)]
use std::str::FromStr;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use crate::config::*;

View File

@ -49,13 +49,13 @@ mod tests {
extern crate env_logger;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use crate::config::{ResolverConfig, ResolverOpts};
use crate::AsyncResolver;
fn https_test(config: ResolverConfig) {
env_logger::try_init().ok();
//env_logger::try_init().ok();
let mut io_loop = Runtime::new().unwrap();
let (resolver, bg) = AsyncResolver::new(config, ResolverOpts::default());

View File

@ -93,7 +93,7 @@
//! # extern crate trust_dns_resolver;
//! # fn main() {
//! use std::net::*;
//! use tokio::runtime::current_thread::Runtime;
//! use tokio::runtime::Runtime;
//! use trust_dns_resolver::AsyncResolver;
//! use trust_dns_resolver::config::*;
//!
@ -205,10 +205,7 @@ extern crate resolv_conf;
#[macro_use]
extern crate serde;
extern crate smallvec;
#[cfg(feature = "tokio")]
extern crate tokio;
extern crate tokio_executor;
extern crate tokio_net;
#[cfg(feature = "dns-over-https")]
extern crate trust_dns_https;
#[cfg(feature = "dns-over-native-tls")]

View File

@ -12,11 +12,11 @@ use std::net::{Ipv4Addr, Ipv6Addr};
use std::pin::Pin;
use std::slice::Iter;
use std::sync::Arc;
use std::task::Context;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::vec::IntoIter;
use futures::{future, Future, FutureExt, Poll};
use futures::{future, Future, FutureExt};
use proto::error::ProtoError;
use proto::op::Query;

View File

@ -12,11 +12,11 @@
use std::net::IpAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::{Context, Poll};
use std::time::Instant;
use failure::Fail;
use futures::{future, future::Either, Future, FutureExt, Poll};
use futures::{future, future::Either, Future, FutureExt};
use proto::op::Query;
use proto::rr::{Name, RData, Record, RecordType};

View File

@ -8,13 +8,13 @@
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Context;
use std::task::{Context, Poll};
use std::time::Duration;
use futures::{Future, FutureExt, Poll, TryFutureExt};
use tokio_executor::{DefaultExecutor, Executor};
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio_net::udp::UdpSocket as TokioUdpSocket;
use futures::{Future, FutureExt, TryFutureExt};
use tokio;
use tokio::net::TcpStream as TokioTcpStream;
use tokio::net::UdpSocket as TokioUdpSocket;
use proto;
#[cfg(feature = "mdns")]
@ -152,7 +152,7 @@ impl ConnectionHandleConnect {
.map(|_| ());
let handle = BufDnsRequestStreamHandle::new(handle);
DefaultExecutor::current().spawn(stream.boxed())?;
tokio::spawn(stream.boxed());
Ok(ConnectionHandleConnected::Udp(handle))
}
Tcp {
@ -178,7 +178,7 @@ impl ConnectionHandleConnect {
.map(|_| ());
let handle = BufDnsRequestStreamHandle::new(handle);
DefaultExecutor::current().spawn(stream.boxed())?;
tokio::spawn(stream.boxed());
Ok(ConnectionHandleConnected::Tcp(handle))
}
#[cfg(feature = "dns-over-tls")]
@ -211,7 +211,7 @@ impl ConnectionHandleConnect {
.map(|_| ());
let handle = BufDnsRequestStreamHandle::new(handle);
DefaultExecutor::current().spawn(Box::pin(stream))?;
tokio::spawn(Box::pin(stream));
Ok(ConnectionHandleConnected::Tcp(handle))
}
#[cfg(feature = "dns-over-https")]
@ -232,7 +232,7 @@ impl ConnectionHandleConnect {
})
.map(|_| ());
DefaultExecutor::current().spawn(Box::pin(stream))?;
tokio::spawn(Box::pin(stream));
Ok(ConnectionHandleConnected::Https(handle))
}
#[cfg(feature = "mdns")]
@ -259,7 +259,7 @@ impl ConnectionHandleConnect {
.map(|_| ());
let handle = BufDnsRequestStreamHandle::new(handle);
DefaultExecutor::current().spawn(Box::pin(stream))?;
tokio::spawn(Box::pin(stream));
Ok(ConnectionHandleConnected::Tcp(handle))
}
}

View File

@ -245,7 +245,7 @@ mod tests {
use std::time::Duration;
use futures::{future, FutureExt};
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use proto::op::{Query, ResponseCode};
use proto::rr::{Name, RecordType};
@ -256,7 +256,7 @@ mod tests {
#[test]
fn test_name_server() {
env_logger::try_init().ok();
//env_logger::try_init().ok();
let config = NameServerConfig {
socket_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 53),

View File

@ -7,10 +7,10 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::{Context, Poll};
use futures::lock::Mutex;
use futures::{future, Future, Poll, TryFutureExt};
use futures::{future, Future, TryFutureExt};
use smallvec::SmallVec;
use proto::error::ProtoError;
@ -336,7 +336,7 @@ mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use proto::op::Query;
use proto::rr::{Name, RecordType};

View File

@ -73,10 +73,7 @@ impl Resolver {
///
/// A new `Resolver` or an error if there was an error with the configuration.
pub fn new(config: ResolverConfig, options: ResolverOpts) -> io::Result<Self> {
let mut builder = runtime::Builder::new();
builder.core_threads(1);
let runtime = builder.build()?;
let runtime = runtime::Runtime::new()?;
let (async_resolver, bg) = AsyncResolver::new(config, options);
runtime.spawn(bg);

View File

@ -29,13 +29,13 @@ mod tests {
extern crate env_logger;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use crate::config::{ResolverConfig, ResolverOpts};
use crate::AsyncResolver;
fn tls_test(config: ResolverConfig) {
env_logger::try_init().ok();
//env_logger::try_init().ok();
let mut io_loop = Runtime::new().unwrap();
let (resolver, bg) = AsyncResolver::new(config, ResolverOpts::default());

View File

@ -45,16 +45,14 @@ name = "trust_dns_rustls"
path = "src/lib.rs"
[dependencies]
futures-preview = "0.3.0-alpha"
futures = "0.3.0"
log = "0.4.8"
rustls = "0.16"
tokio-io = "0.2.0-alpha"
tokio-rustls = { version = "0.12.0-alpha.8", features = ["early-data"] }
tokio-net = "0.2.0-alpha"
tokio = { version = "0.2.1", features = ["tcp", "io-util"] }
tokio-rustls = { version = "0.12.0", features = ["early-data"] }
# disables default features, i.e. openssl...
trust-dns-proto = { version = "0.18.0-alpha", path = "../proto", features = ["tokio-compat"], default-features = false }
webpki = "0.21"
[dev-dependencies]
openssl = { version = "0.10", features = ["v102", "v110"] }
tokio = "0.2.0-alpha"

View File

@ -25,10 +25,7 @@
extern crate futures;
extern crate rustls;
#[cfg(test)]
extern crate tokio;
extern crate tokio_io;
extern crate tokio_net;
extern crate tokio_rustls;
extern crate trust_dns_proto;
extern crate webpki;

View File

@ -29,7 +29,7 @@ use self::openssl::x509::*;
use futures::StreamExt;
use rustls::Certificate;
use rustls::ClientConfig;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use trust_dns_proto::xfer::SerialMessage;

View File

@ -13,7 +13,7 @@ use std::sync::Arc;
use futures::{Future, TryFutureExt};
use rustls::ClientConfig;
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio::net::TcpStream as TokioTcpStream;
use trust_dns_proto::error::ProtoError;
use trust_dns_proto::tcp::TcpClientStream;

View File

@ -15,8 +15,8 @@ use std::sync::Arc;
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::{Future, TryFutureExt};
use rustls::ClientConfig;
use tokio_io;
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio;
use tokio::net::TcpStream as TokioTcpStream;
use tokio_rustls::TlsConnector;
use webpki::{DNSName, DNSNameRef};
@ -35,7 +35,7 @@ pub type TlsStream<S> = TcpStream<S>;
/// Initializes a TlsStream with an existing tokio_tls::TlsStream.
///
/// This is intended for use with a TlsListener and Incoming connections
pub fn tls_from_stream<S: tokio_io::AsyncRead + tokio_io::AsyncWrite>(
pub fn tls_from_stream<S: tokio::io::AsyncRead + tokio::io::AsyncWrite>(
stream: S,
peer_addr: SocketAddr,
) -> (TlsStream<S>, BufStreamHandle) {

View File

@ -70,27 +70,23 @@ name = "trust_dns_server"
path = "src/lib.rs"
[dependencies]
bytes = "0.4.9"
bytes = "0.5"
chrono = "0.4"
enum-as-inner = "0.3"
env_logger = "0.7"
failure = "0.1"
futures-preview = "0.3.0-alpha"
h2 = { version = "0.2.0-alpha", optional = true }
http = { version = "0.1", optional = true }
futures = "0.3.0"
h2 = { version = "0.2.0", features = ["stream"], optional = true }
http = { version = "0.2", optional = true }
log = "0.4.8"
openssl = { version = "0.10", features = ["v102", "v110"], optional = true }
rusqlite = { version = "0.20.0", features = ["bundled"], optional = true }
rustls = { version = "0.16", optional = true }
serde = { version = "1.0.102", features = ["derive"] }
time = "0.1"
tokio = "0.2.0-alpha"
tokio-executor = "0.2.0-alpha"
tokio-io = "0.2.0-alpha"
tokio-net = "0.2.0-alpha"
tokio-openssl = { version = "0.4.0-alpha", optional = true }
tokio-rustls = { version = "0.12.0-alpha.8", optional = true }
tokio-timer = "0.3.0-alpha"
tokio = { version = "0.2.1", features = ["stream", "tcp", "udp"] }
tokio-openssl = { version = "0.4.0", optional = true }
tokio-rustls = { version = "0.12.0", optional = true }
toml = "0.5"
trust-dns-client= { version = "0.18.0-alpha", path = "../client" }
trust-dns-https = { version = "0.18.0-alpha", path = "../https", optional = true }

View File

@ -8,9 +8,9 @@
//! All authority related types
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use futures::{future, Future, Poll, TryFutureExt};
use futures::{future, Future, TryFutureExt};
use trust_dns_client::op::LowerQuery;
use trust_dns_client::proto::rr::dnssec::rdata::key::KEY;

View File

@ -21,9 +21,9 @@ use std::collections::HashMap;
use std::io;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::Context;
use std::task::{Context, Poll};
use futures::{ready, Future, FutureExt, Poll, TryFutureExt};
use futures::{ready, Future, FutureExt, TryFutureExt};
use trust_dns_client::op::{Edns, Header, LowerQuery, MessageType, OpCode, ResponseCode};
use trust_dns_client::rr::dnssec::{Algorithm, SupportedAlgorithms};

View File

@ -55,14 +55,10 @@ extern crate openssl;
extern crate rustls;
extern crate time;
extern crate tokio;
extern crate tokio_executor;
extern crate tokio_io;
extern crate tokio_net;
#[cfg(feature = "dns-over-openssl")]
extern crate tokio_openssl;
#[cfg(feature = "dns-over-rustls")]
extern crate tokio_rustls;
extern crate tokio_timer;
extern crate toml;
extern crate trust_dns_client;
#[cfg(feature = "dns-over-https")]

View File

@ -9,10 +9,10 @@ use std::io;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use h2::server;
use proto::serialize::binary::BinDecodable;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite};
use trust_dns_https::https_server;
use crate::authority::{MessageRequest, MessageResponse};
@ -66,7 +66,7 @@ pub async fn h2_handler<T, I>(
}
async fn handle_request<T>(
bytes: Bytes,
bytes: BytesMut,
src_addr: SocketAddr,
handler: Arc<Mutex<T>>,
responder: HttpsResponseHandle,
@ -87,7 +87,7 @@ async fn handle_request<T>(
}
#[derive(Clone)]
struct HttpsResponseHandle(Arc<Mutex<::h2::server::SendResponse<::bytes::Bytes>>>);
struct HttpsResponseHandle(Arc<Mutex<::h2::server::SendResponse<Bytes>>>);
impl ResponseHandler for HttpsResponseHandle {
fn send_response(&self, response: MessageResponse) -> io::Result<()> {

View File

@ -9,17 +9,18 @@ use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Context;
use std::task::{Context, Poll};
use std::time::Duration;
use futures::{future, Future, FutureExt, Poll, StreamExt};
use futures::{future, Future, FutureExt, StreamExt};
#[cfg(feature = "dns-over-rustls")]
use rustls::{Certificate, PrivateKey};
use tokio_executor;
use tokio_net::driver::Handle;
use tokio_net::{tcp, udp};
use tokio::net;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use proto::error::ProtoError;
use proto::op::Edns;
use proto::serialize::binary::{BinDecodable, BinDecoder};
use proto::tcp::TcpStream;
@ -37,6 +38,7 @@ use crate::server::{Request, RequestHandler, ResponseHandle, ResponseHandler, Ti
/// A Futures based implementation of a DNS server
pub struct ServerFuture<T: RequestHandler> {
handler: Arc<Mutex<T>>,
joins: Vec<JoinHandle<Result<(), ProtoError>>>,
}
impl<T: RequestHandler> ServerFuture<T> {
@ -44,38 +46,54 @@ impl<T: RequestHandler> ServerFuture<T> {
pub fn new(handler: T) -> ServerFuture<T> {
ServerFuture {
handler: Arc::new(Mutex::new(handler)),
joins: vec![],
}
}
/// Register a UDP socket. Should be bound before calling this function.
pub fn register_socket(&self, socket: udp::UdpSocket) {
debug!("registered udp: {:?}", socket);
pub fn register_socket(&mut self, socket: net::UdpSocket, runtime: &Runtime) {
debug!("registering udp: {:?}", socket);
let spawner = runtime.handle().clone();
// create the new UdpStream
let (buf_stream, stream_handle) = UdpStream::with_bound(socket);
let (mut buf_stream, stream_handle) = UdpStream::with_bound(socket);
//let request_stream = RequestStream::new(buf_stream, stream_handle);
let handler = self.handler.clone();
// this spawns a ForEach future which handles all the requests into a Handler.
tokio_executor::spawn(
buf_stream
.map(|r: Result<_, _>| match r {
Ok(t) => t,
Err(e) => panic!("error in UDP request_stream handler: {}", e),
})
// TODO: use for_each_concurrent
.for_each(move |message| {
let src_addr = message.addr();
debug!("received udp request from: {}", src_addr);
self::handle_raw_request(message, handler.clone(), stream_handle.clone())
}),
);
let join_handle = runtime.spawn(async move {
while let Some(message) = buf_stream.next().await {
let message = match message {
Err(e) => {
warn!("error receiving message on udp_socket: {}", e);
continue;
}
Ok(message) => message,
};
let src_addr = message.addr();
debug!("received udp request from: {}", src_addr);
let handler = handler.clone();
let stream_handle = stream_handle.clone();
spawner.spawn(async move {
self::handle_raw_request(message, handler, stream_handle).await;
});
}
// TODO: let's consider capturing all the initial configuration details so that the socket could be recreated...
Err(ProtoError::from("unexpected close of UDP socket"))
});
self.joins.push(join_handle);
}
/// Register a UDP socket. Should be bound before calling this function.
pub fn register_socket_std(&self, socket: std::net::UdpSocket) {
pub fn register_socket_std(&mut self, socket: std::net::UdpSocket, runtime: &Runtime) {
self.register_socket(
udp::UdpSocket::from_std(socket, &Handle::default()).expect("bad handle?"),
net::UdpSocket::from_std(socket).expect("bad handle?"),
runtime,
)
}
@ -92,54 +110,65 @@ impl<T: RequestHandler> ServerFuture<T> {
/// possible to create long-lived queries, but these should be from trusted sources
/// only, this would require some type of whitelisting.
pub fn register_listener(
&self,
listener: tcp::TcpListener,
&mut self,
listener: net::TcpListener,
timeout: Duration,
runtime: &Runtime,
) -> io::Result<()> {
debug!("register tcp: {:?}", listener);
let spawner = runtime.handle().clone();
let handler = self.handler.clone();
debug!("registered tcp: {:?}", listener);
// for each incoming request...
tokio_executor::spawn(listener.incoming().for_each(move |tcp_stream| {
let tcp_stream = match tcp_stream {
Ok(t) => t,
Err(e) => {
debug!("error receiving TCP request_stream error: {}", e);
let join = runtime.spawn(async move {
let mut listener = listener;
let mut incoming = listener.incoming();
return future::ready(());
}
};
while let Some(tcp_stream) = incoming.next().await {
let tcp_stream = match tcp_stream {
Ok(t) => t,
Err(e) => {
debug!("error receiving TCP tcp_stream error: {}", e);
continue;
}
};
let src_addr = tcp_stream.peer_addr().unwrap();
debug!("accepted request from: {}", src_addr);
// take the created stream...
let (buf_stream, stream_handle) = TcpStream::from_stream(tcp_stream, src_addr);
let timeout_stream = TimeoutStream::new(buf_stream, timeout);
//let request_stream = RequestStream::new(timeout_stream, stream_handle);
let handler = handler.clone();
let handler = handler.clone();
// and spawn to the io_loop
tokio_executor::spawn(timeout_stream.for_each(move |message| {
message
.map(|message| {
Box::pin(self::handle_raw_request(
message,
handler.clone(),
stream_handle.clone(),
)) as Pin<Box<dyn Future<Output = ()> + Send>>
})
.unwrap_or_else(|e| {
debug!(
"error in TCP request_stream src: {:?} error: {}",
src_addr, e
);
Box::pin(future::ready(())) as Pin<Box<dyn Future<Output = ()> + Send>>
})
}));
// and spawn to the io_loop
spawner.spawn(async move {
let src_addr = tcp_stream.peer_addr().unwrap();
debug!("accepted request from: {}", src_addr);
// take the created stream...
let (buf_stream, stream_handle) = TcpStream::from_stream(tcp_stream, src_addr);
let mut timeout_stream = TimeoutStream::new(buf_stream, timeout);
//let request_stream = RequestStream::new(timeout_stream, stream_handle);
future::ready(())
}));
while let Some(message) = timeout_stream.next().await {
let message = match message {
Ok(message) => message,
Err(e) => {
debug!(
"error in TCP request_stream src: {} error: {}",
src_addr, e
);
// we're going to bail on this connection...
return;
}
};
// we don't spawn here to limit clients from getting too many resources
self::handle_raw_request(message, handler.clone(), stream_handle.clone())
.await;
}
});
}
Err(ProtoError::from("unexpected close of TCP socket"))
});
self.joins.push(join);
Ok(())
}
@ -156,14 +185,12 @@ impl<T: RequestHandler> ServerFuture<T> {
/// possible to create long-lived queries, but these should be from trusted sources
/// only, this would require some type of whitelisting.
pub fn register_listener_std(
&self,
&mut self,
listener: std::net::TcpListener,
timeout: Duration,
runtime: &Runtime,
) -> io::Result<()> {
self.register_listener(
tcp::TcpListener::from_std(listener, &Handle::default())?,
timeout,
)
self.register_listener(net::TcpListener::from_std(listener)?, timeout, runtime)
}
/// Register a TlsListener to the Server. The TlsListener should already be bound to either an
@ -181,81 +208,81 @@ impl<T: RequestHandler> ServerFuture<T> {
/// * `pkcs12` - certificate used to announce to clients
#[cfg(all(feature = "dns-over-openssl", not(feature = "dns-over-rustls")))]
pub fn register_tls_listener(
&self,
listener: tcp::TcpListener,
&mut self,
listener: net::TcpListener,
timeout: Duration,
certificate_and_key: ((X509, Option<Stack<X509>>), PKey<Private>),
runtime: &Runtime,
) -> io::Result<()> {
use futures::{TryFutureExt, TryStreamExt};
use trust_dns_openssl::{tls_server, TlsStream};
let ((cert, chain), key) = certificate_and_key;
let spawner = runtime.handle().clone();
let handler = self.handler.clone();
debug!("registered tcp: {:?}", listener);
let tls_acceptor = Box::pin(tls_server::new_acceptor(cert, chain, key)?);
// for each incoming request...
tokio_executor::spawn(
listener
.incoming()
.try_for_each(move |tcp_stream| {
let join = runtime.spawn(async move {
let mut listener = listener;
let mut incoming = listener.incoming();
while let Some(tcp_stream) = incoming.next().await {
let tcp_stream = match tcp_stream {
Ok(t) => t,
Err(e) => {
debug!("error receiving TLS tcp_stream error: {}", e);
continue;
}
};
let handler = handler.clone();
let tls_acceptor = tls_acceptor.clone();
// kick out to a different task immediately, let them do the TLS handshake
spawner.spawn(async move {
let src_addr = tcp_stream.peer_addr().unwrap();
debug!("accepted request from: {}", src_addr);
let handler = handler.clone();
// TODO: can this clone be gotten rid of? isn't this what Pin is for?
let tls_acceptor = tls_acceptor.clone();
debug!("starting TLS request from: {}", src_addr);
async move {
// take the created stream...
tokio_openssl::accept(&*tls_acceptor, tcp_stream)
.map_err(|e| {
io::Error::new(
io::ErrorKind::ConnectionRefused,
format!("tls error: {}", e),
)
})
.map_ok(move |tls_stream| {
let (buf_stream, stream_handle) =
TlsStream::from_stream(tls_stream, src_addr);
let timeout_stream = TimeoutStream::new(buf_stream, timeout);
//let request_stream = RequestStream::new(timeout_stream, stream_handle);
let handler = handler.clone();
// perform the TLS
let tls_stream = tokio_openssl::accept(&*tls_acceptor, tcp_stream).await;
// and spawn to the io_loop
tokio_executor::spawn(
timeout_stream
.map_err(move |e| {
debug!(
"error in TLS request_stream src: {:?} error: {}",
src_addr, e
)
})
.try_for_each(move |message| {
self::handle_raw_request(
message,
handler.clone(),
stream_handle.clone(),
)
.map(|_: ()| Ok(()))
})
.map_err(move |_| {
debug!(
"error in TLS request_stream src: {:?}",
src_addr
)
})
.map(|_: Result<(), ()>| ()),
let tls_stream = match tls_stream {
Ok(tls_stream) => tls_stream,
Err(e) => {
debug!("tls handshake src: {} error: {}", src_addr, e);
return ();
}
};
debug!("accepted TLS request from: {}", src_addr);
let (buf_stream, stream_handle) = TlsStream::from_stream(tls_stream, src_addr);
let mut timeout_stream = TimeoutStream::new(buf_stream, timeout);
while let Some(message) = timeout_stream.next().await {
let message = match message {
Ok(message) => message,
Err(e) => {
debug!(
"error in TLS request_stream src: {:?} error: {}",
src_addr, e
);
()
})
.await
// kill this connection
return ();
}
};
self::handle_raw_request(message, handler.clone(), stream_handle.clone())
.await;
}
})
.map_err(|e| panic!("error in inbound tls_stream: {}", e))
.map(|_: Result<(), ()>| ()),
);
});
}
Err(ProtoError::from("unexpected close of TLS socket"))
});
self.joins.push(join);
Ok(())
}
@ -275,15 +302,17 @@ impl<T: RequestHandler> ServerFuture<T> {
/// * `pkcs12` - certificate used to announce to clients
#[cfg(all(feature = "dns-over-openssl", not(feature = "dns-over-rustls")))]
pub fn register_tls_listener_std(
&self,
&mut self,
listener: std::net::TcpListener,
timeout: Duration,
certificate_and_key: ((X509, Option<Stack<X509>>), PKey<Private>),
runtime: &Runtime,
) -> io::Result<()> {
self.register_tls_listener(
tcp::TcpListener::from_std(listener, &Handle::default())?,
net::TcpListener::from_std(listener)?,
timeout,
certificate_and_key,
runtime,
)
}
@ -302,15 +331,16 @@ impl<T: RequestHandler> ServerFuture<T> {
/// * `pkcs12` - certificate used to announce to clients
#[cfg(feature = "dns-over-rustls")]
pub fn register_tls_listener(
&self,
listener: tcp::TcpListener,
&mut self,
listener: net::TcpListener,
timeout: Duration,
certificate_and_key: (Vec<Certificate>, PrivateKey),
runtime: &Runtime,
) -> io::Result<()> {
use futures::{TryFutureExt, TryStreamExt};
use tokio_rustls::TlsAcceptor;
use trust_dns_rustls::{tls_from_stream, tls_server};
let spawner = runtime.handle().clone();
let handler = self.handler.clone();
debug!("registered tcp: {:?}", listener);
@ -325,51 +355,64 @@ impl<T: RequestHandler> ServerFuture<T> {
let tls_acceptor = TlsAcceptor::from(Arc::new(tls_acceptor));
// for each incoming request...
tokio_executor::spawn(
listener
.incoming()
// TODO: use for_each_concurrent
.try_for_each(move |tcp_stream| {
let join = runtime.spawn(async move {
let mut listener = listener;
let mut incoming = listener.incoming();
while let Some(tcp_stream) = incoming.next().await {
let tcp_stream = match tcp_stream {
Ok(t) => t,
Err(e) => {
debug!("error receiving TLS tcp_stream error: {}", e);
continue;
}
};
let handler = handler.clone();
let tls_acceptor = tls_acceptor.clone();
// kick out to a different task immediately, let them do the TLS handshake
spawner.spawn(async move {
let src_addr = tcp_stream.peer_addr().unwrap();
debug!("accepted request from: {}", src_addr);
let handler = handler.clone();
debug!("starting TLS request from: {}", src_addr);
// TODO: need to consider timeout of total connect...
// take the created stream...
tls_acceptor
.accept(tcp_stream)
.map_ok(move |tls_stream| {
let (buf_stream, stream_handle) = tls_from_stream(tls_stream, src_addr);
let timeout_stream = TimeoutStream::new(buf_stream, timeout);
//let request_stream = RequestStream::new(timeout_stream, stream_handle);
let handler = handler.clone();
// perform the TLS
let tls_stream = tls_acceptor.accept(tcp_stream).await;
// and spawn to the io_loop
tokio_executor::spawn(
timeout_stream
.try_for_each(move |message| {
self::handle_raw_request(
message,
handler.clone(),
stream_handle.clone(),
)
.map(|_: ()| Ok(()))
})
.map_err(move |e| {
debug!(
"error in TLS request_stream src {}: {}",
src_addr, e
)
})
.map(|_: Result<(), ()>| ()),
);
})
.map_err(move |e| debug!("error handling TLS stream {}: {}", src_addr, e))
.map(|_: Result<(), ()>| Ok(()))
})
.map_err(|_| panic!("error in inbound tls_stream"))
.map(|_: Result<(), ()>| ()),
);
let tls_stream = match tls_stream {
Ok(tls_stream) => tls_stream,
Err(e) => {
debug!("tls handshake src: {} error: {}", src_addr, e);
return;
}
};
debug!("accepted TLS request from: {}", src_addr);
let (buf_stream, stream_handle) = tls_from_stream(tls_stream, src_addr);
let mut timeout_stream = TimeoutStream::new(buf_stream, timeout);
while let Some(message) = timeout_stream.next().await {
let message = match message {
Ok(message) => message,
Err(e) => {
debug!(
"error in TLS request_stream src: {:?} error: {}",
src_addr, e
);
// kill this connection
return;
}
};
self::handle_raw_request(message, handler.clone(), stream_handle.clone())
.await;
}
});
}
Err(ProtoError::from("unexpected close of TLS socket"))
});
self.joins.push(join);
Ok(())
}
@ -396,6 +439,7 @@ impl<T: RequestHandler> ServerFuture<T> {
listener: tcp::TcpListener,
timeout: Duration,
pkcs12: ParsedPkcs12,
runtime: &Runtime,
) -> io::Result<()> {
unimplemented!("openssl based `dns-over-https` not yet supported. see the `dns-over-https-rustls` feature")
}
@ -415,19 +459,20 @@ impl<T: RequestHandler> ServerFuture<T> {
/// * `pkcs12` - certificate used to announce to clients
#[cfg(feature = "dns-over-https-rustls")]
pub fn register_https_listener(
&self,
listener: tcp::TcpListener,
&mut self,
listener: net::TcpListener,
// TODO: need to set a timeout between requests.
_timeout: Duration,
certificate_and_key: (Vec<Certificate>, PrivateKey),
dns_hostname: String,
runtime: &Runtime,
) -> io::Result<()> {
use futures::{TryFutureExt, TryStreamExt};
use tokio_rustls::TlsAcceptor;
use crate::server::https_handler::h2_handler;
use trust_dns_rustls::tls_server;
let spawner = runtime.handle().clone();
let dns_hostname = Arc::new(dns_hostname);
let handler = self.handler.clone();
debug!("registered tcp: {:?}", listener);
@ -443,33 +488,59 @@ impl<T: RequestHandler> ServerFuture<T> {
// for each incoming request...
let dns_hostname = dns_hostname;
tokio_executor::spawn({
let join = runtime.spawn(async move {
let mut listener = listener;
let mut incoming = listener.incoming();
let dns_hostname = dns_hostname;
listener
.incoming()
.map_err(|e| warn!("error in inbound https_stream: {}", e))
.try_for_each(move |tcp_stream| {
while let Some(tcp_stream) = incoming.next().await {
let tcp_stream = match tcp_stream {
Ok(t) => t,
Err(e) => {
debug!("error receiving HTTPS tcp_stream error: {}", e);
continue;
}
};
let handler = handler.clone();
let tls_acceptor = tls_acceptor.clone();
let dns_hostname = dns_hostname.clone();
spawner.spawn(async move {
let src_addr = tcp_stream.peer_addr().unwrap();
debug!("accepted request from: {}", src_addr);
let handler = handler.clone();
let dns_hostname = dns_hostname.clone();
debug!("starting HTTPS request from: {}", src_addr);
// TODO: need to consider timeout of total connect...
// take the created stream...
tls_acceptor
.accept(tcp_stream)
.map_err(move |e| debug!("error handling HTTPS stream {}: {}", src_addr, e))
.and_then(move |tls_stream| {
h2_handler(handler, tls_stream, src_addr, dns_hostname).unit_error()
})
.map(|_: Result<(), ()>| Ok(()))
})
.map(|_: Result<(), ()>| ())
let tls_stream = tls_acceptor.accept(tcp_stream).await;
let tls_stream = match tls_stream {
Ok(tls_stream) => tls_stream,
Err(e) => {
debug!("https handshake src: {} error: {}", src_addr, e);
return;
}
};
debug!("accepted HTTPS request from: {}", src_addr);
h2_handler(handler, tls_stream, src_addr, dns_hostname).await;
});
}
Err(ProtoError::from("unexpected close of HTTPS socket"))
});
self.joins.push(join);
Ok(())
}
/// This will run until all background tasks of the trust_dns_server end.
pub async fn block_until_done(self) -> Result<(), ProtoError> {
let (result, _, _) = future::select_all(self.joins).await;
result.map_err(|e| ProtoError::from(format!("Internal error in spawn: {}", e)))?
}
}
pub(crate) fn handle_raw_request<T: RequestHandler>(
@ -532,7 +603,6 @@ pub(crate) enum HandleRawRequest<F: Future<Output = ()>> {
Result(io::Error),
}
// TODO: output ()
impl<F: Future<Output = ()> + Unpin> Future for HandleRawRequest<F> {
type Output = ();

View File

@ -1,11 +1,11 @@
use std::io;
use std::mem;
use std::pin::Pin;
use std::task::Context;
use std::time::{Duration, Instant};
use std::task::{Context, Poll};
use std::time::Duration;
use futures::{FutureExt, Poll, Stream, StreamExt};
use tokio_timer::Delay;
use futures::{FutureExt, Stream, StreamExt};
use tokio::time::Delay;
/// This wraps the underlying Stream in a timeout.
///
@ -25,19 +25,16 @@ impl<S> TimeoutStream<S> {
/// * `timeout_duration` - timeout between each request, once exceed the connection is killed
/// * `reactor_handle` - reactor used for registering new timeouts
pub fn new(stream: S, timeout_duration: Duration) -> Self {
// store a Timeout for this message before sending
let timeout = Self::timeout(timeout_duration);
TimeoutStream {
stream,
timeout_duration,
timeout,
timeout: None,
}
}
fn timeout(timeout_duration: Duration) -> Option<Delay> {
if timeout_duration > Duration::from_millis(0) {
Some(tokio_timer::delay(Instant::now() + timeout_duration))
Some(tokio::time::delay_for(timeout_duration))
} else {
None
}
@ -52,6 +49,12 @@ where
// somehow insert a timeout here...
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// if the timer isn't set, set one now
if self.timeout.is_none() {
let timeout = Self::timeout(self.timeout_duration);
mem::replace(&mut self.timeout, timeout);
}
match self.stream.poll_next_unpin(cx) {
r @ Poll::Ready(_) => {
// reset the timeout to wait for the next request...

View File

@ -6,9 +6,9 @@
// copied, modified, or distributed except according to those terms.
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use futures::{Future, FutureExt, Poll};
use futures::{Future, FutureExt};
use trust_dns_client::op::LowerQuery;
use trust_dns_client::op::ResponseCode;

View File

@ -5,13 +5,12 @@ extern crate trust_dns_server;
use std::io;
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use std::time::Duration;
#[allow(deprecated)]
use futures::stream::{iter, Stream, StreamExt, TryStreamExt};
use futures::Poll;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use trust_dns_server::server::TimeoutStream;

View File

@ -45,10 +45,10 @@ name = "trust_dns_compatibility"
path = "src/lib.rs"
[dependencies]
chrono = "^0.4"
chrono = "0.4"
data-encoding = "2.1.0"
env_logger = "0.7"
futures-preview = "0.3.0-alpha"
futures = "0.3.0"
openssl = { version = "0.10", features = ["v102", "v110"] }
rand = "0.7"
trust-dns-client= { version = "0.18.0-alpha", path="../../crates/client", features = ["dnssec-openssl"] }

View File

@ -71,14 +71,12 @@ chrono = "0.4"
env_logger = "0.7"
lazy_static = "1.0"
log = "0.4.8"
futures-preview = "0.3.0-alpha"
openssl = { version = "^0.10", features = ["v102", "v110"] }
futures = "0.3.0"
openssl = { version = "0.10", features = ["v102", "v110"] }
rand = "0.7"
rusqlite = { version = "0.20.0", features = ["bundled"] }
rustls = { version = "0.16" }
tokio = "0.2.0-alpha"
tokio-net = "0.2.0-alpha"
tokio-timer = "0.3.0-alpha"
tokio = { version = "0.2.1", features = ["time", "rt-core"] }
trust-dns-client= { version = "0.18.0-alpha", path = "../../crates/client" }
trust-dns-https = { version = "0.18.0-alpha", path = "../../crates/https" }
trust-dns-openssl = { version = "0.18.0-alpha", path = "../../crates/openssl" }

View File

@ -6,7 +6,6 @@ extern crate futures;
extern crate openssl;
extern crate rustls;
extern crate tokio;
extern crate tokio_timer;
extern crate trust_dns_client;
extern crate trust_dns_proto;
extern crate trust_dns_rustls;
@ -20,13 +19,11 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::executor::block_on;
use futures::stream::{Fuse, Stream, StreamExt};
use futures::{future, Future, FutureExt};
use tokio_timer::Delay;
use tokio::time::{Delay, Duration, Instant};
use trust_dns_client::client::ClientConnection;
use trust_dns_client::error::ClientResult;
@ -139,6 +136,8 @@ impl Stream for TestClientStream {
type Item = Result<SerialMessage, ProtoError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use futures::executor::block_on;
match self.outbound_messages.next().poll_unpin(cx) {
// already handled above, here to make sure the poll() pops the next message
Poll::Ready(Some(bytes)) => {
@ -151,7 +150,6 @@ impl Stream for TestClientStream {
src: src_addr,
};
dbg!("catalog handling request");
let response_handler = TestResponseHandler::new();
block_on(
self.catalog
@ -160,10 +158,7 @@ impl Stream for TestClientStream {
.handle_request(request, response_handler.clone()),
);
dbg!("catalog handled request");
let buf = block_on(response_handler.into_inner());
dbg!("catalog responded");
Poll::Ready(Some(Ok(SerialMessage::new(buf, src_addr))))
}
@ -201,9 +196,11 @@ impl NeverReturnsClientStream {
let (message_sender, outbound_messages) = unbounded();
let message_sender = StreamHandle::new(message_sender);
let stream = Box::pin(future::ok(NeverReturnsClientStream {
timeout: tokio_timer::delay(Instant::now() + Duration::from_secs(1)),
outbound_messages: outbound_messages.fuse(),
let stream = Box::pin(future::lazy(|_| {
Ok(NeverReturnsClientStream {
timeout: tokio::time::delay_for(Duration::from_secs(1)),
outbound_messages: outbound_messages.fuse(),
})
}));
(stream, message_sender)
@ -226,8 +223,6 @@ impl Stream for NeverReturnsClientStream {
type Item = Result<SerialMessage, ProtoError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
println!("still not returning");
// poll the timer forever...
if let Poll::Pending = self.timeout.poll_unpin(cx) {
return Poll::Pending;

View File

@ -6,7 +6,6 @@ extern crate openssl;
#[cfg(feature = "dns-over-https-rustls")]
extern crate rustls;
extern crate tokio;
extern crate tokio_net;
extern crate trust_dns_client;
#[cfg(feature = "dns-over-https")]
extern crate trust_dns_https;
@ -24,9 +23,9 @@ use std::sync::{Arc, Mutex};
#[cfg(feature = "dnssec")]
use chrono::Duration;
use futures::{Future, FutureExt, TryFutureExt};
use tokio::runtime::current_thread::Runtime;
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio_net::udp::UdpSocket as TokioUdpSocket;
use tokio::net::TcpStream as TokioTcpStream;
use tokio::net::UdpSocket as TokioUdpSocket;
use tokio::runtime::Runtime;
use trust_dns_client::client::{BasicClientHandle, ClientFuture, ClientHandle};
use trust_dns_client::error::ClientErrorKind;
@ -899,8 +898,8 @@ where
#[test]
fn test_timeout_query_nonet() {
env_logger::try_init().ok();
let mut io_loop = Runtime::new().unwrap();
//env_logger::try_init().ok();
let io_loop = Runtime::new().expect("failed to create Tokio Runtime");
let (stream, sender) = NeverReturnsClientStream::new();
let (bg, client) = ClientFuture::with_timeout(
stream,
@ -915,8 +914,8 @@ fn test_timeout_query_nonet() {
#[test]
fn test_timeout_query_udp() {
env_logger::try_init().ok();
let mut io_loop = Runtime::new().unwrap();
//env_logger::try_init().ok();
let io_loop = Runtime::new().unwrap();
// this is a test network, it should NOT be in use
let addr: SocketAddr = ("203.0.113.0", 53)
@ -934,8 +933,8 @@ fn test_timeout_query_udp() {
#[test]
fn test_timeout_query_tcp() {
env_logger::try_init().ok();
let mut io_loop = Runtime::new().unwrap();
//env_logger::try_init().ok();
let io_loop = Runtime::new().unwrap();
// this is a test network, it should NOT be in use
let addr: SocketAddr = ("203.0.113.0", 53)

View File

@ -201,7 +201,7 @@ where
#[test]
fn test_timeout_query_nonet() {
env_logger::try_init().ok();
// env_logger::try_init().ok();
// TODO: need to add timeout length to SyncClient
let client = SyncClient::new(NeverReturnsClientConnection::new().unwrap());
test_timeout_query(client);

View File

@ -10,7 +10,7 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};
use futures::{FutureExt, TryFutureExt};
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use trust_dns_proto::op::{NoopMessageFinalizer, Query};
use trust_dns_proto::rr::{DNSClass, Name, RData, Record, RecordType};

View File

@ -7,7 +7,6 @@ extern crate lazy_static;
extern crate log;
extern crate openssl;
extern crate tokio;
extern crate tokio_timer;
extern crate trust_dns_client;
extern crate trust_dns_integration;
extern crate trust_dns_proto;
@ -17,11 +16,11 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::time::Duration;
use futures::future::Either;
use futures::{future, StreamExt};
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use trust_dns_client::client::{ClientFuture, ClientHandle};
use trust_dns_client::multicast::MdnsQueryType;
@ -54,7 +53,7 @@ fn mdns_responsder(
let mut io_loop = Runtime::new().unwrap();
// a max time for the test to run
let mut timeout = tokio_timer::delay(Instant::now() + Duration::from_millis(100));
let mut timeout = tokio::time::delay_for(Duration::from_millis(100));
// TODO: ipv6 if is hardcoded, need a different strategy
let (mdns_stream, mdns_handle) = MdnsStream::new(
@ -97,7 +96,7 @@ fn mdns_responsder(
}
Either::Right(((), data_src_stream_tmp)) => {
stream = data_src_stream_tmp;
timeout = tokio_timer::delay(Instant::now() + Duration::from_millis(100));
timeout = tokio::time::delay_for(Duration::from_millis(100));
}
}
}
@ -128,9 +127,8 @@ fn test_query_mdns_ipv4() {
// A PTR request is the DNS-SD method for doing a directory listing...
let name = Name::from_ascii("_dns._udp.local.").unwrap();
let message = io_loop
.spawn(bg)
.block_on(client.query(name, DNSClass::IN, RecordType::PTR));
io_loop.spawn(bg);
let message = io_loop.block_on(client.query(name, DNSClass::IN, RecordType::PTR));
client_done.store(true, Ordering::Relaxed);
@ -152,9 +150,8 @@ fn test_query_mdns_ipv6() {
// A PTR request is the DNS-SD method for doing a directory listing...
let name = Name::from_ascii("_dns._udp.local.").unwrap();
let message = io_loop
.spawn(bg)
.block_on(client.query(name, DNSClass::IN, RecordType::PTR));
io_loop.spawn(bg);
let message = io_loop.block_on(client.query(name, DNSClass::IN, RecordType::PTR));
client_done.store(true, Ordering::Relaxed);

View File

@ -15,7 +15,7 @@ use std::sync::{
use std::task::Poll;
use futures::{future, Future};
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use trust_dns_client::op::Query;
use trust_dns_client::rr::{Name, RecordType};

View File

@ -2,7 +2,6 @@
extern crate futures;
extern crate tokio;
extern crate tokio_net;
extern crate trust_dns_client;
extern crate trust_dns_integration;
extern crate trust_dns_proto;
@ -12,9 +11,9 @@ use std::net::*;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use tokio::runtime::current_thread::Runtime;
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio_net::udp::UdpSocket as TokioUdpSocket;
use tokio::net::TcpStream as TokioTcpStream;
use tokio::net::UdpSocket as TokioUdpSocket;
use tokio::runtime::Runtime;
use trust_dns_client::client::{
BasicClientHandle, ClientFuture, ClientHandle, MemoizeClientHandle, SecureClientHandle,
@ -241,7 +240,7 @@ where
let mut catalog = Catalog::new();
catalog.upsert(authority.origin().clone(), Box::new(authority));
let mut io_loop = Runtime::new().unwrap();
let io_loop = Runtime::new().unwrap();
let (stream, sender) = TestClientStream::new(Arc::new(Mutex::new(catalog)));
let (bg, client) = ClientFuture::new(stream, Box::new(sender), None);
let client = MemoizeClientHandle::new(client);
@ -274,7 +273,7 @@ where
})
.unwrap();
let mut io_loop = Runtime::new().unwrap();
let io_loop = Runtime::new().unwrap();
let addr: SocketAddr = ("8.8.8.8", 53).to_socket_addrs().unwrap().next().unwrap();
let stream: UdpClientConnect<TokioUdpSocket> = UdpClientStream::new(addr);
let (bg, client) = ClientFuture::connect(stream);
@ -311,7 +310,7 @@ where
})
.unwrap();
let mut io_loop = Runtime::new().unwrap();
let io_loop = Runtime::new().unwrap();
let addr: SocketAddr = ("8.8.8.8", 53).to_socket_addrs().unwrap().next().unwrap();
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
let (bg, client) = ClientFuture::new(Box::new(stream), sender, None);

View File

@ -2,8 +2,6 @@ extern crate futures;
extern crate openssl;
extern crate rustls;
extern crate tokio;
extern crate tokio_net;
extern crate tokio_timer;
extern crate trust_dns_client;
extern crate trust_dns_integration;
extern crate trust_dns_openssl;
@ -11,20 +9,17 @@ extern crate trust_dns_proto;
extern crate trust_dns_rustls;
extern crate trust_dns_server;
use std::io;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use std::time::Duration;
use futures::executor::block_on;
use futures::{future, Future};
use tokio::runtime::current_thread::Runtime;
use tokio_net::tcp::TcpListener;
use tokio_net::udp::UdpSocket;
use futures::{future, Future, FutureExt};
use tokio::net::TcpListener;
use tokio::net::UdpSocket;
use tokio::runtime::Runtime;
use trust_dns_client::client::*;
use trust_dns_client::op::*;
@ -44,8 +39,9 @@ use trust_dns_integration::tls_client_connection::TlsClientConnection;
#[test]
fn test_server_www_udp() {
let mut runtime = Runtime::new().expect("failed to create Tokio Runtime");
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0));
let udp_socket = block_on(UdpSocket::bind(&addr)).unwrap();
let udp_socket = runtime.block_on(UdpSocket::bind(&addr)).unwrap();
let ipaddr = udp_socket.local_addr().unwrap();
println!("udp_socket on port: {}", ipaddr);
@ -54,7 +50,7 @@ fn test_server_www_udp() {
let server_thread = thread::Builder::new()
.name("test_server:udp:server".to_string())
.spawn(move || server_thread_udp(udp_socket, server_continue2))
.spawn(move || server_thread_udp(runtime, udp_socket, server_continue2))
.unwrap();
let client_thread = thread::Builder::new()
@ -71,8 +67,9 @@ fn test_server_www_udp() {
#[test]
fn test_server_www_tcp() {
let mut runtime = Runtime::new().expect("failed to create Tokio Runtime");
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0));
let tcp_listener = block_on(TcpListener::bind(&addr)).unwrap();
let tcp_listener = runtime.block_on(TcpListener::bind(&addr)).unwrap();
let ipaddr = tcp_listener.local_addr().unwrap();
println!("tcp_listner on port: {}", ipaddr);
@ -81,7 +78,7 @@ fn test_server_www_tcp() {
let server_thread = thread::Builder::new()
.name("test_server:tcp:server".to_string())
.spawn(move || server_thread_tcp(tcp_listener, server_continue2))
.spawn(move || server_thread_tcp(runtime, tcp_listener, server_continue2))
.unwrap();
let client_thread = thread::Builder::new()
@ -98,10 +95,9 @@ fn test_server_www_tcp() {
#[test]
fn test_server_unknown_type() {
use futures::executor::block_on;
let mut runtime = Runtime::new().expect("failed to create Tokio Runtime");
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0));
let udp_socket = block_on(UdpSocket::bind(&addr)).unwrap();
let udp_socket = runtime.block_on(UdpSocket::bind(&addr)).unwrap();
let ipaddr = udp_socket.local_addr().unwrap();
println!("udp_socket on port: {}", ipaddr);
@ -110,7 +106,7 @@ fn test_server_unknown_type() {
let server_thread = thread::Builder::new()
.name("test_server:udp:server".to_string())
.spawn(move || server_thread_udp(udp_socket, server_continue2))
.spawn(move || server_thread_udp(runtime, udp_socket, server_continue2))
.unwrap();
let conn = UdpClientConnection::new(ipaddr).unwrap();
@ -162,7 +158,6 @@ fn read_file(path: &str) -> Vec<u8> {
#[test]
fn test_server_www_tls() {
use futures::executor::block_on;
use std::env;
let dns_name = "ns.example.com";
@ -175,8 +170,9 @@ fn test_server_www_tls() {
let pkcs12_der = read_file(&format!("{}/../../tests/test-data/cert.p12", server_path));
// Server address
let mut runtime = Runtime::new().expect("failed to create Tokio Runtime");
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0));
let tcp_listener = block_on(TcpListener::bind(&addr)).unwrap();
let tcp_listener = runtime.block_on(TcpListener::bind(&addr)).unwrap();
let ipaddr = tcp_listener.local_addr().unwrap();
println!("tcp_listner on port: {}", ipaddr);
@ -185,7 +181,7 @@ fn test_server_www_tls() {
let server_thread = thread::Builder::new()
.name("test_server:tls:server".to_string())
.spawn(move || server_thread_tls(tcp_listener, server_continue2, pkcs12_der))
.spawn(move || server_thread_tls(tcp_listener, server_continue2, pkcs12_der, runtime))
.unwrap();
let client_thread = thread::Builder::new()
@ -277,36 +273,40 @@ fn new_catalog() -> Catalog {
catalog
}
fn server_thread_udp(udp_socket: UdpSocket, server_continue: Arc<AtomicBool>) {
fn server_thread_udp(
mut io_loop: Runtime,
udp_socket: UdpSocket,
server_continue: Arc<AtomicBool>,
) {
let catalog = new_catalog();
let mut io_loop = Runtime::new().unwrap();
let server = ServerFuture::new(catalog);
io_loop.block_on::<Pin<Box<dyn Future<Output = ()> + Send>>>(Box::pin(future::lazy(|_| {
server.register_socket(udp_socket);
})));
let mut server = ServerFuture::new(catalog);
server.register_socket(udp_socket, &io_loop);
while server_continue.load(Ordering::Relaxed) {
io_loop.block_on(tokio_timer::delay(
Instant::now() + Duration::from_millis(10),
));
io_loop.block_on(
future::lazy(|_| tokio::time::delay_for(Duration::from_millis(10))).flatten(),
);
}
drop(io_loop);
}
fn server_thread_tcp(tcp_listener: TcpListener, server_continue: Arc<AtomicBool>) {
fn server_thread_tcp(
mut io_loop: Runtime,
tcp_listener: TcpListener,
server_continue: Arc<AtomicBool>,
) {
let catalog = new_catalog();
let mut io_loop = Runtime::new().unwrap();
let server = ServerFuture::new(catalog);
io_loop
.block_on::<Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>>>(Box::pin(
future::lazy(|_| server.register_listener(tcp_listener, Duration::from_secs(30))),
))
.expect("tcp registration failed");
let mut server = ServerFuture::new(catalog);
server
.register_listener(tcp_listener, Duration::from_secs(30), &io_loop)
.expect("failed to register tcp");
while server_continue.load(Ordering::Relaxed) {
io_loop.block_on(tokio_timer::delay(
Instant::now() + Duration::from_millis(10),
));
io_loop.block_on(
future::lazy(|_| tokio::time::delay_for(Duration::from_millis(10))).flatten(),
);
}
}
@ -316,34 +316,24 @@ fn server_thread_tls(
tls_listener: TcpListener,
server_continue: Arc<AtomicBool>,
pkcs12_der: Vec<u8>,
mut io_loop: Runtime,
) {
use futures::FutureExt;
use openssl::pkcs12::Pkcs12;
let catalog = new_catalog();
let mut io_loop = Runtime::new().unwrap();
let server = ServerFuture::new(catalog);
io_loop
.block_on(
future::lazy(|_| {
let pkcs12 = Pkcs12::from_der(&pkcs12_der)
.expect("bad pkcs12 der")
.parse("mypass")
.expect("Pkcs12::from_der");
let pkcs12 = ((pkcs12.cert, pkcs12.chain), pkcs12.pkey);
future::ready(server.register_tls_listener(
tls_listener,
Duration::from_secs(30),
pkcs12,
))
})
.flatten(),
)
.expect("tcp registration failed");
let mut server = ServerFuture::new(catalog);
let pkcs12 = Pkcs12::from_der(&pkcs12_der)
.expect("bad pkcs12 der")
.parse("mypass")
.expect("Pkcs12::from_der");
let pkcs12 = ((pkcs12.cert, pkcs12.chain), pkcs12.pkey);
server
.register_tls_listener(tls_listener, Duration::from_secs(30), pkcs12, &io_loop)
.expect("failed to register TLS");
while server_continue.load(Ordering::Relaxed) {
io_loop.block_on(tokio_timer::delay(
Instant::now() + Duration::from_millis(10),
));
io_loop.block_on(
future::lazy(|_| tokio::time::delay_for(Duration::from_millis(10))).flatten(),
);
}
}