upgrade trust-dns-server to Tokio 0.3

This commit is contained in:
Benjamin Fry 2020-11-10 11:24:15 -08:00
parent 402a14abce
commit 8fbb05a6f7
7 changed files with 56 additions and 141 deletions

120
Cargo.lock generated
View File

@ -614,26 +614,6 @@ dependencies = [
"web-sys",
]
[[package]]
name = "h2"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e4728fd124914ad25e99e3d15a9361a879f6620f63cb56bbb08f95abb97a535"
dependencies = [
"bytes 0.5.6",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap",
"slab",
"tokio 0.2.22",
"tokio-util 0.3.1",
"tracing",
"tracing-futures",
]
[[package]]
name = "h2"
version = "0.3.0"
@ -647,8 +627,8 @@ dependencies = [
"http",
"indexmap",
"slab",
"tokio 0.3.3",
"tokio-util 0.4.0",
"tokio",
"tokio-util",
"tracing",
"tracing-futures",
]
@ -1563,18 +1543,6 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "238ce071d267c5710f9d31451efec16c5ee22de34df17cc05e56cbc92e967117"
[[package]]
name = "tokio"
version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd"
dependencies = [
"bytes 0.5.6",
"futures-core",
"memchr",
"pin-project-lite",
]
[[package]]
name = "tokio"
version = "0.3.3"
@ -1612,17 +1580,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "501c8252b73bd01379aaae1521523c2629ff1bc6ea46c29e0baff515cee60f1b"
dependencies = [
"native-tls",
"tokio 0.3.3",
]
[[package]]
name = "tokio-openssl"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c4b08c5f4208e699ede3df2520aca2e82401b2de33f45e96696a074480be594"
dependencies = [
"openssl",
"tokio 0.2.22",
"tokio",
]
[[package]]
@ -1633,19 +1591,7 @@ checksum = "d01e5cc2d3a154fa310982d0affaec8140d6476805422265b2f648eb813f937f"
dependencies = [
"openssl",
"openssl-sys",
"tokio 0.3.3",
]
[[package]]
name = "tokio-rustls"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e12831b255bcfa39dc0436b01e19fea231a37db570686c06ee72c423479f889a"
dependencies = [
"futures-core",
"rustls",
"tokio 0.2.22",
"webpki",
"tokio",
]
[[package]]
@ -1655,24 +1601,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ba3f045b4a15afc543ffe6e02bae7ecf372e89a0f9f8c4b15b13f0ceed4f105"
dependencies = [
"rustls",
"tokio 0.3.3",
"tokio",
"webpki",
]
[[package]]
name = "tokio-util"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499"
dependencies = [
"bytes 0.5.6",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio 0.2.22",
]
[[package]]
name = "tokio-util"
version = "0.4.0"
@ -1684,7 +1616,7 @@ dependencies = [
"futures-sink",
"log",
"pin-project-lite",
"tokio 0.3.3",
"tokio",
]
[[package]]
@ -1739,7 +1671,7 @@ dependencies = [
"native-tls",
"regex",
"rustls",
"tokio 0.3.3",
"tokio",
"trust-dns-client",
"trust-dns-https",
"trust-dns-native-tls",
@ -1770,7 +1702,7 @@ dependencies = [
"rustls",
"serde",
"thiserror",
"tokio 0.3.3",
"tokio",
"trust-dns-https",
"trust-dns-proto",
"webpki",
@ -1799,13 +1731,13 @@ dependencies = [
"env_logger",
"futures",
"futures-util",
"h2 0.3.0",
"h2",
"http",
"log",
"rustls",
"thiserror",
"tokio 0.3.3",
"tokio-rustls 0.20.0",
"tokio",
"tokio-rustls",
"trust-dns-proto",
"trust-dns-rustls",
"webpki",
@ -1825,7 +1757,7 @@ dependencies = [
"rand",
"rusqlite",
"rustls",
"tokio 0.3.3",
"tokio",
"trust-dns-client",
"trust-dns-https",
"trust-dns-openssl",
@ -1843,7 +1775,7 @@ dependencies = [
"futures-channel",
"futures-util",
"native-tls",
"tokio 0.3.3",
"tokio",
"tokio-native-tls",
"trust-dns-proto",
]
@ -1855,8 +1787,8 @@ dependencies = [
"futures-channel",
"futures-util",
"openssl",
"tokio 0.3.3",
"tokio-openssl 0.5.0",
"tokio",
"tokio-openssl",
"trust-dns-proto",
]
@ -1886,7 +1818,7 @@ dependencies = [
"smallvec",
"socket2",
"thiserror",
"tokio 0.3.3",
"tokio",
"url",
"wasm-bindgen",
]
@ -1909,10 +1841,10 @@ dependencies = [
"serde",
"smallvec",
"thiserror",
"tokio 0.3.3",
"tokio",
"tokio-native-tls",
"tokio-openssl 0.5.0",
"tokio-rustls 0.20.0",
"tokio-openssl",
"tokio-rustls",
"trust-dns-https",
"trust-dns-native-tls",
"trust-dns-openssl",
@ -1931,8 +1863,8 @@ dependencies = [
"log",
"openssl",
"rustls",
"tokio 0.3.3",
"tokio-rustls 0.20.0",
"tokio",
"tokio-rustls",
"trust-dns-proto",
"webpki",
]
@ -1948,7 +1880,7 @@ dependencies = [
"env_logger",
"futures-executor",
"futures-util",
"h2 0.2.7",
"h2",
"http",
"log",
"openssl",
@ -1956,9 +1888,9 @@ dependencies = [
"rustls",
"serde",
"thiserror",
"tokio 0.3.3",
"tokio-openssl 0.4.0",
"tokio-rustls 0.14.1",
"tokio",
"tokio-openssl",
"tokio-rustls",
"toml",
"trust-dns-client",
"trust-dns-https",
@ -1979,7 +1911,7 @@ dependencies = [
"log",
"openssl",
"structopt",
"tokio 0.3.3",
"tokio",
"trust-dns-client",
"trust-dns-proto",
"trust-dns-resolver",

View File

@ -78,7 +78,7 @@ enum-as-inner = "0.3"
env_logger = "0.8"
futures-executor = { version = "0.3.5", default-features = false, features = ["std"] }
futures-util = { version = "0.3.5", default-features = false, features = ["std"] }
h2 = { version = "0.2.6", features = ["stream"], optional = true }
h2 = { version = "0.3.0", features = ["stream"], optional = true }
http = { version = "0.2", optional = true }
log = "0.4"
openssl = { version = "0.10", features = ["v102", "v110"], optional = true }
@ -87,8 +87,8 @@ rustls = { version = "0.18", optional = true }
serde = { version = "1.0.114", features = ["derive"] }
thiserror = "1.0.20"
tokio = { version = "0.3.0", features = ["stream", "net"] }
tokio-openssl = { version = "0.4.0", optional = true }
tokio-rustls = { version = "0.14", optional = true }
tokio-openssl = { version = "0.5.0", optional = true }
tokio-rustls = { version = "0.20", optional = true }
toml = "0.5"
trust-dns-client= { version = "0.20.0-alpha.3", path = "../client" }
trust-dns-https = { version = "0.20.0-alpha.3", path = "../https", optional = true }

View File

@ -17,12 +17,11 @@ use log::{debug, info, warn};
#[cfg(feature = "dns-over-rustls")]
use rustls::{Certificate, PrivateKey};
use tokio::net;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use crate::authority::MessageRequest;
use crate::proto::error::ProtoError;
use crate::proto::iocompat::AsyncIo02As03;
use crate::proto::iocompat::AsyncIoTokioAsStd;
use crate::proto::op::Edns;
use crate::proto::serialize::binary::{BinDecodable, BinDecoder};
use crate::proto::tcp::TcpStream;
@ -54,17 +53,13 @@ impl<T: RequestHandler> ServerFuture<T> {
pub fn register_socket(&mut self, socket: net::UdpSocket) {
debug!("registering udp: {:?}", socket);
let spawner = Handle::current();
// create the new UdpStream
let (mut buf_stream, stream_handle) = UdpStream::with_bound(socket);
//let request_stream = RequestStream::new(buf_stream, stream_handle);
let handler = self.handler.clone();
// this spawns a ForEach future which handles all the requests into a Handler.
let join_handle = spawner.spawn({
let spawner = spawner.clone();
let join_handle = tokio::spawn({
async move {
while let Some(message) = buf_stream.next().await {
let message = match message {
@ -80,7 +75,7 @@ impl<T: RequestHandler> ServerFuture<T> {
let handler = handler.clone();
let stream_handle = stream_handle.clone();
spawner.spawn(async move {
tokio::spawn(async move {
self::handle_raw_request(message, handler, stream_handle).await;
});
}
@ -114,18 +109,14 @@ impl<T: RequestHandler> ServerFuture<T> {
pub fn register_listener(&mut self, listener: net::TcpListener, timeout: Duration) {
debug!("register tcp: {:?}", listener);
let spawner = Handle::current();
let handler = self.handler.clone();
// for each incoming request...
let join = spawner.spawn({
let spawner = spawner.clone();
let join = tokio::spawn({
async move {
let mut listener = listener;
let mut incoming = listener.incoming();
while let Some(tcp_stream) = incoming.next().await {
while let Some(tcp_stream) = listener.next().await {
let tcp_stream = match tcp_stream {
Ok(t) => t,
Err(e) => {
@ -137,12 +128,12 @@ impl<T: RequestHandler> ServerFuture<T> {
let handler = handler.clone();
// and spawn to the io_loop
spawner.spawn(async move {
tokio::spawn(async move {
let src_addr = tcp_stream.peer_addr().unwrap();
debug!("accepted request from: {}", src_addr);
// take the created stream...
let (buf_stream, stream_handle) =
TcpStream::from_stream(AsyncIo02As03(tcp_stream), src_addr);
TcpStream::from_stream(AsyncIoTokioAsStd(tcp_stream), src_addr);
let mut timeout_stream = TimeoutStream::new(buf_stream, timeout);
//let request_stream = RequestStream::new(timeout_stream, stream_handle);
@ -265,7 +256,7 @@ impl<T: RequestHandler> ServerFuture<T> {
};
debug!("accepted TLS request from: {}", src_addr);
let (buf_stream, stream_handle) =
TlsStream::from_stream(AsyncIo02As03(tls_stream), src_addr);
TlsStream::from_stream(AsyncIoTokioAsStd(tls_stream), src_addr);
let mut timeout_stream = TimeoutStream::new(buf_stream, timeout);
while let Some(message) = timeout_stream.next().await {
let message = match message {
@ -350,7 +341,6 @@ impl<T: RequestHandler> ServerFuture<T> {
use tokio_rustls::TlsAcceptor;
use trust_dns_rustls::{tls_from_stream, tls_server};
let spawner = Handle::current();
let handler = self.handler.clone();
debug!("registered tcp: {:?}", listener);
@ -365,14 +355,11 @@ impl<T: RequestHandler> ServerFuture<T> {
let tls_acceptor = TlsAcceptor::from(Arc::new(tls_acceptor));
// for each incoming request...
let join = spawner.spawn({
let spawner = spawner.clone();
let join = tokio::spawn({
async move {
let mut listener = listener;
let mut incoming = listener.incoming();
while let Some(tcp_stream) = incoming.next().await {
while let Some(tcp_stream) = listener.next().await {
let tcp_stream = match tcp_stream {
Ok(t) => t,
Err(e) => {
@ -385,7 +372,7 @@ impl<T: RequestHandler> ServerFuture<T> {
let tls_acceptor = tls_acceptor.clone();
// kick out to a different task immediately, let them do the TLS handshake
spawner.spawn(async move {
tokio::spawn(async move {
let src_addr = tcp_stream.peer_addr().unwrap();
debug!("starting TLS request from: {}", src_addr);
@ -393,7 +380,7 @@ impl<T: RequestHandler> ServerFuture<T> {
let tls_stream = tls_acceptor.accept(tcp_stream).await;
let tls_stream = match tls_stream {
Ok(tls_stream) => AsyncIo02As03(tls_stream),
Ok(tls_stream) => AsyncIoTokioAsStd(tls_stream),
Err(e) => {
debug!("tls handshake src: {} error: {}", src_addr, e);
return;
@ -488,7 +475,6 @@ impl<T: RequestHandler> ServerFuture<T> {
use crate::server::https_handler::h2_handler;
use trust_dns_rustls::tls_server;
let spawner = Handle::current();
let dns_hostname: Arc<str> = Arc::from(dns_hostname);
let handler = self.handler.clone();
debug!("registered tcp: {:?}", listener);
@ -504,16 +490,13 @@ impl<T: RequestHandler> ServerFuture<T> {
// for each incoming request...
let dns_hostname = dns_hostname;
let join = spawner.spawn({
let spawner = spawner.clone();
let join = tokio::spawn({
async move {
let mut listener = listener;
let mut incoming = listener.incoming();
let dns_hostname = dns_hostname;
while let Some(tcp_stream) = incoming.next().await {
while let Some(tcp_stream) = listener.next().await {
let tcp_stream = match tcp_stream {
Ok(t) => t,
Err(e) => {
@ -526,7 +509,7 @@ impl<T: RequestHandler> ServerFuture<T> {
let tls_acceptor = tls_acceptor.clone();
let dns_hostname = dns_hostname.clone();
spawner.spawn(async move {
tokio::spawn(async move {
let src_addr = tcp_stream.peer_addr().unwrap();
debug!("starting HTTPS request from: {}", src_addr);

View File

@ -7,7 +7,7 @@ use std::time::Duration;
use futures_util::stream::{Stream, StreamExt};
use futures_util::FutureExt;
use log::{debug, warn};
use tokio::time::Delay;
use tokio::time::Sleep;
/// This wraps the underlying Stream in a timeout.
///
@ -15,7 +15,7 @@ use tokio::time::Delay;
pub struct TimeoutStream<S> {
stream: S,
timeout_duration: Duration,
timeout: Option<Delay>,
timeout: Option<Sleep>,
}
impl<S> TimeoutStream<S> {
@ -34,9 +34,9 @@ impl<S> TimeoutStream<S> {
}
}
fn timeout(timeout_duration: Duration) -> Option<Delay> {
fn timeout(timeout_duration: Duration) -> Option<Sleep> {
if timeout_duration > Duration::from_millis(0) {
Some(tokio::time::delay_for(timeout_duration))
Some(tokio::time::sleep(timeout_duration))
} else {
None
}

View File

@ -12,7 +12,6 @@ use std::task::{Context, Poll};
use futures_util::{future, FutureExt};
use log::info;
use tokio::runtime::Handle;
use crate::client::op::LowerQuery;
use crate::client::op::ResponseCode;
@ -21,7 +20,7 @@ use crate::client::rr::{LowerName, Name, Record, RecordType};
use crate::resolver::config::ResolverConfig;
use crate::resolver::error::ResolveError;
use crate::resolver::lookup::Lookup as ResolverLookup;
use crate::resolver::TokioAsyncResolver;
use crate::resolver::{TokioAsyncResolver, TokioHandle};
use crate::authority::{
Authority, LookupError, LookupObject, MessageRequest, UpdateResult, ZoneType,
@ -40,7 +39,7 @@ impl ForwardAuthority {
/// TODO: change this name to create or something
#[allow(clippy::new_without_default)]
#[doc(hidden)]
pub async fn new(runtime: Handle) -> Result<Self, String> {
pub async fn new(runtime: TokioHandle) -> Result<Self, String> {
let resolver = TokioAsyncResolver::from_system_conf(runtime)
.map_err(|e| format!("error constructing new Resolver: {}", e))?;
@ -55,7 +54,7 @@ impl ForwardAuthority {
origin: Name,
_zone_type: ZoneType,
config: &ForwardConfig,
runtime: Handle,
runtime: TokioHandle,
) -> Result<Self, String> {
info!("loading forwarder config: {}", origin);

View File

@ -7,14 +7,15 @@ use std::str::FromStr;
use tokio::runtime::Runtime;
use trust_dns_client::rr::{Name, RecordType};
use trust_dns_resolver::TokioHandle;
use trust_dns_server::authority::{Authority, LookupObject};
use trust_dns_server::store::forwarder::ForwardAuthority;
#[ignore]
#[test]
fn test_lookup() {
let mut runtime = Runtime::new().expect("failed to create Tokio Runtime");
let forwarder = ForwardAuthority::new(runtime.handle().clone());
let runtime = Runtime::new().expect("failed to create Tokio Runtime");
let forwarder = ForwardAuthority::new(TokioHandle);
let forwarder = runtime
.block_on(forwarder)
.expect("failed to create forwarder");

View File

@ -13,7 +13,7 @@ fn test_no_timeout() {
#[allow(deprecated)]
let sequence =
iter(vec![Ok(1), Err("error"), Ok(2)]).map_err(|e| io::Error::new(io::ErrorKind::Other, e));
let mut core = Runtime::new().expect("could not get core");
let core = Runtime::new().expect("could not get core");
let timeout_stream = TimeoutStream::new(sequence, Duration::from_secs(360));
@ -43,7 +43,7 @@ impl Stream for NeverStream {
#[test]
fn test_timeout() {
let mut core = Runtime::new().expect("could not get core");
let core = Runtime::new().expect("could not get core");
let timeout_stream = TimeoutStream::new(NeverStream {}, Duration::from_millis(1));
assert!(core