Abstract tokio (#960)
* [trust-dns-proto] Make tokio* optional 1. Move from tokio::io::{AsyncRead, AsyncWrite} to futures::io::{AsyncRead, AsyncWrite} and provide a helper struct Compat02As03 for the conversion. 2. Abstract tokio::time::{Delay, Timeout}. 3. Modify the other crates which are impacted by the above two changes. * [trust-dns-proto] make fn asyn for Time trait * [trust-dns-proto] collapse features Collapse the features tokio-io, tokio-time into tokio-time.
This commit is contained in:
parent
6c337c5ae4
commit
93cf1db18b
@ -33,6 +33,7 @@ use trust_dns_client::tcp::*;
|
||||
use trust_dns_client::udp::*;
|
||||
use trust_dns_proto::error::*;
|
||||
use trust_dns_proto::xfer::*;
|
||||
use trust_dns_proto::{iocompat::AsyncIo02As03, TokioTime};
|
||||
|
||||
fn find_test_port() -> u16 {
|
||||
let server = std::net::UdpSocket::bind(("0.0.0.0", 0)).unwrap();
|
||||
@ -189,7 +190,7 @@ fn trust_dns_tcp_bench(b: &mut Bencher) {
|
||||
.unwrap()
|
||||
.next()
|
||||
.unwrap();
|
||||
let (stream, sender) = TcpClientStream::<TcpStream>::new(addr);
|
||||
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TcpStream>>::new::<TokioTime>(addr);
|
||||
let mp = DnsMultiplexer::new(stream, sender, None::<Arc<Signer>>);
|
||||
bench(b, mp);
|
||||
|
||||
@ -264,7 +265,7 @@ fn bind_tcp_bench(b: &mut Bencher) {
|
||||
.unwrap()
|
||||
.next()
|
||||
.unwrap();
|
||||
let (stream, sender) = TcpClientStream::<TcpStream>::new(addr);
|
||||
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TcpStream>>::new::<TokioTime>(addr);
|
||||
let mp = DnsMultiplexer::new(stream, sender, None::<Arc<Signer>>);
|
||||
bench(b, mp);
|
||||
|
||||
|
@ -26,6 +26,7 @@ use trust_dns_client::proto::xfer::{
|
||||
};
|
||||
use trust_dns_client::proto::SecureDnsHandle;
|
||||
use trust_dns_client::rr::dnssec::*;
|
||||
use trust_dns_proto::{iocompat::AsyncIo02As03, TokioTime};
|
||||
|
||||
use server_harness::*;
|
||||
|
||||
@ -66,8 +67,9 @@ async fn standard_tcp_conn(
|
||||
) -> (
|
||||
AsyncClient<DnsMultiplexerSerialResponse>,
|
||||
DnsExchangeBackground<
|
||||
DnsMultiplexer<TcpClientStream<TokioTcpStream>, Signer>,
|
||||
DnsMultiplexer<TcpClientStream<AsyncIo02As03<TokioTcpStream>>, Signer>,
|
||||
DnsMultiplexerSerialResponse,
|
||||
TokioTime,
|
||||
>,
|
||||
) {
|
||||
let addr: SocketAddr = ("127.0.0.1", port)
|
||||
@ -75,7 +77,7 @@ async fn standard_tcp_conn(
|
||||
.unwrap()
|
||||
.next()
|
||||
.unwrap();
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
AsyncClient::new(stream, sender, None)
|
||||
.await
|
||||
.expect("new AsyncClient failed")
|
||||
|
@ -32,12 +32,14 @@ use trust_dns_client::op::ResponseCode;
|
||||
use trust_dns_client::rr::*;
|
||||
use trust_dns_client::tcp::TcpClientStream;
|
||||
use trust_dns_client::udp::UdpClientStream;
|
||||
use trust_dns_proto::TokioTime;
|
||||
|
||||
// TODO: Needed for when TLS tests are added back
|
||||
// #[cfg(feature = "dns-over-openssl")]
|
||||
// use trust_dns_openssl::TlsClientStreamBuilder;
|
||||
|
||||
use server_harness::{named_test_harness, query_a};
|
||||
use trust_dns_proto::iocompat::AsyncIo02As03;
|
||||
|
||||
#[test]
|
||||
fn test_example_toml_startup() {
|
||||
@ -47,7 +49,8 @@ fn test_example_toml_startup() {
|
||||
Ipv4Addr::new(127, 0, 0, 1).into(),
|
||||
tcp_port.expect("no tcp_port"),
|
||||
);
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
@ -60,7 +63,8 @@ fn test_example_toml_startup() {
|
||||
Ipv4Addr::new(127, 0, 0, 1).into(),
|
||||
tcp_port.expect("no tcp_port"),
|
||||
);
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
@ -78,7 +82,8 @@ fn test_ipv4_only_toml_startup() {
|
||||
Ipv4Addr::new(127, 0, 0, 1).into(),
|
||||
tcp_port.expect("no tcp_port"),
|
||||
);
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
@ -91,7 +96,8 @@ fn test_ipv4_only_toml_startup() {
|
||||
Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into(),
|
||||
tcp_port.expect("no tcp_port"),
|
||||
);
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
|
||||
assert!(io_loop.block_on(client).is_err());
|
||||
@ -141,7 +147,8 @@ fn test_ipv4_and_ipv6_toml_startup() {
|
||||
Ipv4Addr::new(127, 0, 0, 1).into(),
|
||||
tcp_port.expect("no tcp_port"),
|
||||
);
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
@ -153,7 +160,8 @@ fn test_ipv4_and_ipv6_toml_startup() {
|
||||
Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into(),
|
||||
tcp_port.expect("no tcp_port"),
|
||||
);
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
trust_dns_proto::spawn_bg(&io_loop, bg);
|
||||
@ -171,7 +179,8 @@ fn test_nodata_where_name_exists() {
|
||||
Ipv4Addr::new(127, 0, 0, 1).into(),
|
||||
tcp_port.expect("no tcp_port"),
|
||||
);
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
trust_dns_proto::spawn_bg(&io_loop, bg);
|
||||
@ -196,7 +205,8 @@ fn test_nxdomain_where_no_name_exists() {
|
||||
Ipv4Addr::new(127, 0, 0, 1).into(),
|
||||
tcp_port.expect("no tcp_port"),
|
||||
);
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
trust_dns_proto::spawn_bg(&io_loop, bg);
|
||||
@ -259,7 +269,8 @@ fn test_server_continues_on_bad_data_tcp() {
|
||||
Ipv4Addr::new(127, 0, 0, 1).into(),
|
||||
tcp_port.expect("no tcp_port"),
|
||||
);
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
@ -279,7 +290,8 @@ fn test_server_continues_on_bad_data_tcp() {
|
||||
Ipv4Addr::new(127, 0, 0, 1).into(),
|
||||
tcp_port.expect("no tcp_port"),
|
||||
);
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
trust_dns_proto::spawn_bg(&io_loop, bg);
|
||||
@ -301,7 +313,8 @@ fn test_forward() {
|
||||
Ipv4Addr::new(127, 0, 0, 1).into(),
|
||||
tcp_port.expect("no tcp_port"),
|
||||
);
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
@ -323,7 +336,8 @@ fn test_forward() {
|
||||
Ipv4Addr::new(127, 0, 0, 1).into(),
|
||||
tcp_port.expect("no tcp_port"),
|
||||
);
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
|
@ -16,6 +16,7 @@ use crate::proto::xfer::{
|
||||
DnsHandle, DnsMultiplexer, DnsMultiplexerConnect, DnsMultiplexerSerialResponse, DnsRequest,
|
||||
DnsRequestOptions, DnsRequestSender, DnsResponse, DnsStreamHandle,
|
||||
};
|
||||
use crate::proto::TokioTime;
|
||||
use futures::{ready, Future, FutureExt};
|
||||
use log::debug;
|
||||
use rand;
|
||||
@ -146,7 +147,7 @@ where
|
||||
|
||||
/// A future that resolves to an AsyncClient
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct AsyncClientConnect<F, S, R>(DnsExchangeConnect<F, S, R>)
|
||||
pub struct AsyncClientConnect<F, S, R>(DnsExchangeConnect<F, S, R, TokioTime>)
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static,
|
||||
@ -159,7 +160,7 @@ where
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
type Output = Result<(AsyncClient<R>, DnsExchangeBackground<S, R>), ProtoError>;
|
||||
type Output = Result<(AsyncClient<R>, DnsExchangeBackground<S, R, TokioTime>), ProtoError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let result = ready!(self.0.poll_unpin(cx));
|
||||
|
@ -18,6 +18,7 @@ use crate::proto::xfer::{
|
||||
DnsExchangeBackground, DnsHandle, DnsRequest, DnsRequestSender, DnsResponse,
|
||||
};
|
||||
use crate::proto::SecureDnsHandle;
|
||||
use crate::proto::TokioTime;
|
||||
|
||||
// FIXME: rename to AsyncDnsSecClient
|
||||
/// A DNSSEC Client implemented over futures-rs.
|
||||
@ -152,7 +153,8 @@ where
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
type Output = Result<(AsyncSecureClient<R>, DnsExchangeBackground<S, R>), ProtoError>;
|
||||
type Output =
|
||||
Result<(AsyncSecureClient<R>, DnsExchangeBackground<S, R, TokioTime>), ProtoError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let result = ready!(self.client_connect.poll_unpin(cx));
|
||||
|
@ -11,8 +11,10 @@ use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::proto::iocompat::AsyncIo02As03;
|
||||
use crate::proto::tcp::{TcpClientConnect, TcpClientStream};
|
||||
use crate::proto::xfer::{DnsMultiplexer, DnsMultiplexerConnect, DnsRequestSender};
|
||||
use crate::proto::TokioTime;
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
use crate::client::ClientConnection;
|
||||
@ -60,14 +62,18 @@ impl TcpClientConnection {
|
||||
}
|
||||
|
||||
impl ClientConnection for TcpClientConnection {
|
||||
type Sender = DnsMultiplexer<TcpClientStream<TcpStream>, Signer>;
|
||||
type Sender = DnsMultiplexer<TcpClientStream<AsyncIo02As03<TcpStream>>, Signer>;
|
||||
type Response = <Self::Sender as DnsRequestSender>::DnsResponseFuture;
|
||||
type SenderFuture =
|
||||
DnsMultiplexerConnect<TcpClientConnect<TcpStream>, TcpClientStream<TcpStream>, Signer>;
|
||||
type SenderFuture = DnsMultiplexerConnect<
|
||||
TcpClientConnect<AsyncIo02As03<TcpStream>>,
|
||||
TcpClientStream<AsyncIo02As03<TcpStream>>,
|
||||
Signer,
|
||||
>;
|
||||
|
||||
fn new_stream(&self, signer: Option<Arc<Signer>>) -> Self::SenderFuture {
|
||||
let (tcp_client_stream, handle) =
|
||||
TcpClientStream::<TcpStream>::with_timeout(self.name_server, self.timeout);
|
||||
let (tcp_client_stream, handle) = TcpClientStream::<AsyncIo02As03<TcpStream>>::with_timeout::<
|
||||
TokioTime,
|
||||
>(self.name_server, self.timeout);
|
||||
DnsMultiplexer::new(tcp_client_stream, handle, signer)
|
||||
}
|
||||
}
|
||||
|
@ -28,6 +28,7 @@ use webpki::DNSNameRef;
|
||||
|
||||
use trust_dns_proto::error::ProtoError;
|
||||
use trust_dns_proto::xfer::{DnsRequest, DnsRequestSender, DnsResponse, SerialMessage};
|
||||
use trust_dns_proto::Time;
|
||||
|
||||
const ALPN_H2: &[u8] = b"h2";
|
||||
|
||||
@ -223,7 +224,7 @@ impl DnsRequestSender for HttpsClientStream {
|
||||
/// (Unsupported Media Type) upon receiving a media type it is unable to
|
||||
/// process.
|
||||
/// ```
|
||||
fn send_message(
|
||||
fn send_message<TE: Time>(
|
||||
&mut self,
|
||||
mut message: DnsRequest,
|
||||
_cx: &mut Context,
|
||||
@ -249,7 +250,7 @@ impl DnsRequestSender for HttpsClientStream {
|
||||
)))
|
||||
}
|
||||
|
||||
fn error_response(error: ProtoError) -> Self::DnsResponseFuture {
|
||||
fn error_response<TE: Time>(error: ProtoError) -> Self::DnsResponseFuture {
|
||||
HttpsClientResponse(Box::pin(future::err(error)))
|
||||
}
|
||||
|
||||
@ -519,6 +520,7 @@ mod tests {
|
||||
|
||||
use trust_dns_proto::op::{Message, Query};
|
||||
use trust_dns_proto::rr::{Name, RData, RecordType};
|
||||
use trust_dns_proto::TokioTime;
|
||||
|
||||
use super::*;
|
||||
|
||||
@ -550,7 +552,9 @@ mod tests {
|
||||
let mut runtime = Runtime::new().expect("could not start runtime");
|
||||
let mut https = runtime.block_on(connect).expect("https connect failed");
|
||||
|
||||
let sending = runtime.block_on(future::lazy(|cx| https.send_message(request, cx)));
|
||||
let sending = runtime.block_on(future::lazy(|cx| {
|
||||
https.send_message::<TokioTime>(request, cx)
|
||||
}));
|
||||
let response: DnsResponse = runtime.block_on(sending).expect("send_message failed");
|
||||
|
||||
//assert_eq!(response.addr(), SocketAddr::from(([1, 1, 1, 1], 443)));
|
||||
|
@ -18,6 +18,7 @@ use tokio::net::TcpStream as TokioTcpStream;
|
||||
use tokio_tls::TlsStream as TokioTlsStream;
|
||||
|
||||
use trust_dns_proto::error::ProtoError;
|
||||
use trust_dns_proto::iocompat::AsyncIo02As03;
|
||||
use trust_dns_proto::tcp::TcpClientStream;
|
||||
use trust_dns_proto::xfer::BufDnsStreamHandle;
|
||||
|
||||
@ -26,7 +27,7 @@ use crate::TlsStreamBuilder;
|
||||
/// TlsClientStream secure DNS over TCP stream
|
||||
///
|
||||
/// See TlsClientStreamBuilder::new()
|
||||
pub type TlsClientStream = TcpClientStream<TokioTlsStream<TokioTcpStream>>;
|
||||
pub type TlsClientStream = TcpClientStream<AsyncIo02As03<TokioTlsStream<TokioTcpStream>>>;
|
||||
|
||||
/// Builder for TlsClientStream
|
||||
pub struct TlsClientStreamBuilder(TlsStreamBuilder);
|
||||
|
@ -18,11 +18,12 @@ use native_tls::{Certificate, Identity, TlsConnector};
|
||||
use tokio::net::TcpStream as TokioTcpStream;
|
||||
use tokio_tls::{TlsConnector as TokioTlsConnector, TlsStream as TokioTlsStream};
|
||||
|
||||
use trust_dns_proto::iocompat::AsyncIo02As03;
|
||||
use trust_dns_proto::tcp::TcpStream;
|
||||
use trust_dns_proto::xfer::{BufStreamHandle, SerialMessage};
|
||||
|
||||
/// A TlsStream counterpart to the TcpStream which embeds a secure TlsStream
|
||||
pub type TlsStream = TcpStream<TokioTlsStream<TokioTcpStream>>;
|
||||
pub type TlsStream = TcpStream<AsyncIo02As03<TokioTlsStream<TokioTcpStream>>>;
|
||||
|
||||
fn tls_new(certs: Vec<Certificate>, pkcs12: Option<Identity>) -> io::Result<TlsConnector> {
|
||||
let mut builder = TlsConnector::builder();
|
||||
@ -53,7 +54,8 @@ pub fn tls_from_stream(
|
||||
let (message_sender, outbound_messages) = unbounded();
|
||||
let message_sender = BufStreamHandle::new(message_sender);
|
||||
|
||||
let stream = TcpStream::from_stream_with_receiver(stream, peer_addr, outbound_messages);
|
||||
let stream =
|
||||
TcpStream::from_stream_with_receiver(AsyncIo02As03(stream), peer_addr, outbound_messages);
|
||||
|
||||
(stream, message_sender)
|
||||
}
|
||||
@ -170,7 +172,7 @@ impl TlsStreamBuilder {
|
||||
.await?;
|
||||
|
||||
Ok(TcpStream::from_stream_with_receiver(
|
||||
tls_connected,
|
||||
AsyncIo02As03(tls_connected),
|
||||
name_server,
|
||||
outbound_messages,
|
||||
))
|
||||
|
@ -18,13 +18,14 @@ use tokio::net::TcpStream as TokioTcpStream;
|
||||
use tokio_openssl::SslStream as TokioTlsStream;
|
||||
|
||||
use trust_dns_proto::error::ProtoError;
|
||||
use trust_dns_proto::iocompat::AsyncIo02As03;
|
||||
use trust_dns_proto::tcp::TcpClientStream;
|
||||
use trust_dns_proto::xfer::BufDnsStreamHandle;
|
||||
|
||||
use super::TlsStreamBuilder;
|
||||
|
||||
/// A Type definition for the TLS stream
|
||||
pub type TlsClientStream = TcpClientStream<TokioTlsStream<TokioTcpStream>>;
|
||||
pub type TlsClientStream = TcpClientStream<AsyncIo02As03<TokioTlsStream<TokioTcpStream>>>;
|
||||
|
||||
/// A Builder for the TlsClientStream
|
||||
pub struct TlsClientStreamBuilder(TlsStreamBuilder);
|
||||
|
@ -20,6 +20,7 @@ use openssl::x509::{X509Ref, X509};
|
||||
use tokio::net::TcpStream as TokioTcpStream;
|
||||
use tokio_openssl::{self, SslStream as TokioTlsStream};
|
||||
|
||||
use trust_dns_proto::iocompat::AsyncIo02As03;
|
||||
use trust_dns_proto::tcp::TcpStream;
|
||||
use trust_dns_proto::xfer::BufStreamHandle;
|
||||
|
||||
@ -56,7 +57,7 @@ impl TlsIdentityExt for SslContextBuilder {
|
||||
}
|
||||
|
||||
/// A TlsStream counterpart to the TcpStream which embeds a secure TlsStream
|
||||
pub type TlsStream = TcpStream<TokioTlsStream<TokioTcpStream>>;
|
||||
pub type TlsStream = TcpStream<AsyncIo02As03<TokioTlsStream<TokioTcpStream>>>;
|
||||
|
||||
fn new(certs: Vec<X509>, pkcs12: Option<ParsedPkcs12>) -> io::Result<SslConnector> {
|
||||
let mut tls = SslConnector::builder(SslMethod::tls()).map_err(|e| {
|
||||
@ -115,7 +116,7 @@ fn new(certs: Vec<X509>, pkcs12: Option<ParsedPkcs12>) -> io::Result<SslConnecto
|
||||
///
|
||||
/// This is intended for use with a TlsListener and Incoming connections
|
||||
pub fn tls_stream_from_existing_tls_stream(
|
||||
stream: TokioTlsStream<TokioTcpStream>,
|
||||
stream: AsyncIo02As03<TokioTlsStream<TokioTcpStream>>,
|
||||
peer_addr: SocketAddr,
|
||||
) -> (TlsStream, BufStreamHandle) {
|
||||
let (message_sender, outbound_messages) = unbounded();
|
||||
@ -247,7 +248,11 @@ impl TlsStreamBuilder {
|
||||
// sending and receiving tcp packets.
|
||||
let stream = Box::pin(
|
||||
connect_tls(tls_config, dns_name, name_server).map_ok(move |s| {
|
||||
TcpStream::from_stream_with_receiver(s, name_server, outbound_messages)
|
||||
TcpStream::from_stream_with_receiver(
|
||||
AsyncIo02As03(s),
|
||||
name_server,
|
||||
outbound_messages,
|
||||
)
|
||||
}),
|
||||
);
|
||||
|
||||
|
@ -40,7 +40,7 @@ dnssec-openssl = ["dnssec", "openssl"]
|
||||
dnssec-ring = ["dnssec", "ring"]
|
||||
dnssec = ["data-encoding"]
|
||||
testing = []
|
||||
tokio-runtime = [ "tokio/rt-core", "tokio/udp", "tokio/tcp" ]
|
||||
tokio-runtime = ["tokio/rt-core", "tokio/udp", "tokio/tcp", "tokio/time"]
|
||||
default = ["tokio-runtime"]
|
||||
|
||||
serde-config = ["serde"]
|
||||
@ -70,8 +70,9 @@ ring = { version = "0.16", optional = true, features = ["std"] }
|
||||
serde = { version = "1.0", optional = true }
|
||||
smallvec = "1.0"
|
||||
socket2 = { version = "0.3.10" }
|
||||
tokio = { version = "0.2.1", features = ["time"] }
|
||||
tokio = { version = "0.2.1", optional = true }
|
||||
url = "2.1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.7"
|
||||
tokio = { version = "0.2.1", features = ["rt-core", "time"] }
|
||||
|
@ -23,8 +23,6 @@ use openssl::error::ErrorStack as SslErrorStack;
|
||||
use ring::error::Unspecified;
|
||||
|
||||
use failure::{Backtrace, Context, Fail};
|
||||
use tokio::time::Elapsed;
|
||||
use tokio::time::Error as TimerError;
|
||||
|
||||
/// An alias for results returned by functions of this crate
|
||||
pub type ProtoResult<T> = ::std::result::Result<T, ProtoError>;
|
||||
@ -278,18 +276,6 @@ impl From<SslErrorStack> for ProtoError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TimerError> for ProtoError {
|
||||
fn from(e: TimerError) -> ProtoError {
|
||||
e.context(ProtoErrorKind::Timer).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Elapsed> for ProtoError {
|
||||
fn from(e: Elapsed) -> ProtoError {
|
||||
e.context(ProtoErrorKind::Timeout).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<::url::ParseError> for ProtoError {
|
||||
fn from(e: ::url::ParseError) -> ProtoError {
|
||||
e.context(ProtoErrorKind::UrlParsing).into()
|
||||
|
@ -16,14 +16,14 @@
|
||||
|
||||
//! Trust-DNS Protocol library
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
use std::future::Future;
|
||||
use async_trait::async_trait;
|
||||
use futures::Future;
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
#[cfg(feature = "tokio-runtime")]
|
||||
use std::marker::Send;
|
||||
use std::time::Duration;
|
||||
#[cfg(any(test, feature = "tokio-runtime"))]
|
||||
use tokio::runtime::Runtime;
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
#[cfg(feature = "tokio-runtime")]
|
||||
#[cfg(any(test, feature = "tokio-runtime"))]
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
macro_rules! try_ready_stream {
|
||||
@ -38,8 +38,7 @@ macro_rules! try_ready_stream {
|
||||
}
|
||||
|
||||
/// Spawn a background task, if it was present
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
#[cfg(feature = "tokio-runtime")]
|
||||
#[cfg(any(test, feature = "tokio-runtime"))]
|
||||
pub fn spawn_bg<F: Future<Output = R> + Send + 'static, R: Send + 'static>(
|
||||
runtime: &Runtime,
|
||||
background: F,
|
||||
@ -71,10 +70,50 @@ pub use crate::xfer::secure_dns_handle::SecureDnsHandle;
|
||||
#[doc(hidden)]
|
||||
pub use crate::xfer::{BufDnsStreamHandle, BufStreamHandle};
|
||||
|
||||
#[cfg(feature = "tokio-runtime")]
|
||||
#[doc(hidden)]
|
||||
pub mod iocompat {
|
||||
use std::io;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::io::{AsyncRead as AsyncRead02, AsyncWrite as AsyncWrite02};
|
||||
|
||||
/// Conversion from `tokio::io::{AsyncRead, AsyncWrite}` to `std::io::{AsyncRead, AsyncWrite}`
|
||||
pub struct AsyncIo02As03<T>(pub T);
|
||||
|
||||
impl<T> Unpin for AsyncIo02As03<T> {}
|
||||
impl<R: AsyncRead02 + Unpin> AsyncRead for AsyncIo02As03<R> {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut self.0).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: AsyncWrite02 + Unpin> AsyncWrite for AsyncIo02As03<W> {
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut self.0).poll_write(cx, buf)
|
||||
}
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.0).poll_flush(cx)
|
||||
}
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.0).poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Generic executor.
|
||||
// This trait is created to facilitate running the tests defined in the tests mod using different types of
|
||||
// executors. It's used in Fuchsia OS, please be mindful when update it.
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub trait Executor {
|
||||
/// Spawns a future object to run synchronously or asynchronously depending on the specific
|
||||
/// executor.
|
||||
@ -88,3 +127,39 @@ impl Executor for Runtime {
|
||||
self.block_on(future)
|
||||
}
|
||||
}
|
||||
|
||||
/// Generic Time for Delay and Timeout.
|
||||
// This trait is created to allow to use different types of time systems. It's used in Fuchsia OS, please be mindful when update it.
|
||||
#[async_trait]
|
||||
pub trait Time {
|
||||
/// Return a type that implements `Future` that will wait until the specified duration has
|
||||
/// elapsed.
|
||||
async fn delay_for(duration: Duration) -> ();
|
||||
|
||||
/// Return a type that implement `Future` to complete before the specified duration has elapsed.
|
||||
async fn timeout<F: 'static + Future + Send>(
|
||||
duration: Duration,
|
||||
future: F,
|
||||
) -> Result<F::Output, std::io::Error>;
|
||||
}
|
||||
|
||||
/// New type which is implemented using tokio::time::{Delay, Timeout}
|
||||
#[cfg(any(test, feature = "tokio-runtime"))]
|
||||
pub struct TokioTime;
|
||||
|
||||
#[cfg(any(test, feature = "tokio-runtime"))]
|
||||
#[async_trait]
|
||||
impl Time for TokioTime {
|
||||
async fn delay_for(duration: Duration) -> () {
|
||||
tokio::time::delay_for(duration).await
|
||||
}
|
||||
|
||||
async fn timeout<F: 'static + Future + Send>(
|
||||
duration: Duration,
|
||||
future: F,
|
||||
) -> Result<F::Output, std::io::Error> {
|
||||
tokio::time::timeout(duration, future)
|
||||
.await
|
||||
.map_err(move |_| std::io::Error::new(std::io::ErrorKind::TimedOut, "future timed out"))
|
||||
}
|
||||
}
|
||||
|
@ -15,13 +15,16 @@ use std::time::Duration;
|
||||
|
||||
#[cfg(feature = "tokio-runtime")]
|
||||
use async_trait::async_trait;
|
||||
use futures::io::{AsyncRead, AsyncWrite};
|
||||
use futures::{Future, Stream, StreamExt, TryFutureExt};
|
||||
use log::warn;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use crate::error::ProtoError;
|
||||
#[cfg(feature = "tokio-runtime")]
|
||||
use crate::iocompat::AsyncIo02As03;
|
||||
use crate::tcp::{Connect, TcpStream};
|
||||
use crate::xfer::{DnsClientStream, SerialMessage};
|
||||
use crate::Time;
|
||||
use crate::{BufDnsStreamHandle, DnsStreamHandle};
|
||||
|
||||
/// Tcp client stream
|
||||
@ -41,13 +44,13 @@ impl<S: Connect + 'static + Send> TcpClientStream<S> {
|
||||
///
|
||||
/// * `name_server` - the IP and Port of the DNS server to connect to
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new(
|
||||
pub fn new<TE: 'static + Time>(
|
||||
name_server: SocketAddr,
|
||||
) -> (
|
||||
TcpClientConnect<S::Transport>,
|
||||
Box<dyn DnsStreamHandle + 'static + Send>,
|
||||
) {
|
||||
Self::with_timeout(name_server, Duration::from_secs(5))
|
||||
Self::with_timeout::<TE>(name_server, Duration::from_secs(5))
|
||||
}
|
||||
|
||||
/// Constructs a new TcpStream for a client to the specified SocketAddr.
|
||||
@ -56,14 +59,14 @@ impl<S: Connect + 'static + Send> TcpClientStream<S> {
|
||||
///
|
||||
/// * `name_server` - the IP and Port of the DNS server to connect to
|
||||
/// * `timeout` - connection timeout
|
||||
pub fn with_timeout(
|
||||
pub fn with_timeout<TE: 'static + Time>(
|
||||
name_server: SocketAddr,
|
||||
timeout: Duration,
|
||||
) -> (
|
||||
TcpClientConnect<S::Transport>,
|
||||
Box<dyn DnsStreamHandle + 'static + Send>,
|
||||
) {
|
||||
let (stream_future, sender) = TcpStream::<S>::with_timeout(name_server, timeout);
|
||||
let (stream_future, sender) = TcpStream::<S>::with_timeout::<TE>(name_server, timeout);
|
||||
|
||||
let new_future = Box::pin(
|
||||
stream_future
|
||||
@ -132,27 +135,30 @@ use tokio::net::TcpStream as TokioTcpStream;
|
||||
|
||||
#[cfg(feature = "tokio-runtime")]
|
||||
#[async_trait]
|
||||
impl Connect for TokioTcpStream {
|
||||
type Transport = TokioTcpStream;
|
||||
impl Connect for AsyncIo02As03<TokioTcpStream> {
|
||||
type Transport = AsyncIo02As03<TokioTcpStream>;
|
||||
|
||||
async fn connect(addr: SocketAddr) -> io::Result<Self::Transport> {
|
||||
TokioTcpStream::connect(&addr).await
|
||||
TokioTcpStream::connect(&addr).await.map(AsyncIo02As03)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(feature = "tokio-runtime")]
|
||||
mod tests {
|
||||
use super::AsyncIo02As03;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
use std::net::Ipv6Addr;
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
use tokio::{net::TcpStream as TokioTcpStream, runtime::Runtime};
|
||||
use tokio::net::TcpStream as TokioTcpStream;
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
use crate::tests::tcp_client_stream_test;
|
||||
use crate::TokioTime;
|
||||
#[test]
|
||||
fn test_tcp_stream_ipv4() {
|
||||
let io_loop = Runtime::new().expect("failed to create tokio runtime");
|
||||
tcp_client_stream_test::<TokioTcpStream, Runtime>(
|
||||
tcp_client_stream_test::<AsyncIo02As03<TokioTcpStream>, Runtime, TokioTime>(
|
||||
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
||||
io_loop,
|
||||
)
|
||||
@ -162,7 +168,7 @@ mod tests {
|
||||
#[cfg(not(target_os = "linux"))] // ignored until Travis-CI fixes IPv6
|
||||
fn test_tcp_stream_ipv6() {
|
||||
let io_loop = Runtime::new().expect("failed to create tokio runtime");
|
||||
tcp_client_stream_test::<TokioTcpStream, Runtime>(
|
||||
tcp_client_stream_test::<AsyncIo02As03<TokioTcpStream>, Runtime, TokioTime>(
|
||||
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
|
||||
io_loop,
|
||||
)
|
||||
|
@ -17,11 +17,12 @@ use std::time::Duration;
|
||||
use async_trait::async_trait;
|
||||
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
|
||||
use futures::stream::{Fuse, Peekable, Stream, StreamExt};
|
||||
use futures::{ready, Future, FutureExt, TryFutureExt};
|
||||
use futures::{self, ready, Future, FutureExt};
|
||||
use log::debug;
|
||||
|
||||
use crate::error::*;
|
||||
use crate::xfer::{BufStreamHandle, SerialMessage};
|
||||
use crate::Time;
|
||||
|
||||
/// Trait for TCP connection
|
||||
#[async_trait]
|
||||
@ -30,7 +31,7 @@ where
|
||||
Self: Sized,
|
||||
{
|
||||
/// TcpSteam
|
||||
type Transport: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send;
|
||||
type Transport: futures::io::AsyncRead + futures::io::AsyncWrite + Send;
|
||||
|
||||
/// connect to tcp
|
||||
async fn connect(addr: SocketAddr) -> io::Result<Self::Transport>;
|
||||
@ -118,7 +119,7 @@ impl<S: Connect + 'static> TcpStream<S> {
|
||||
///
|
||||
/// * `name_server` - the IP and Port of the DNS server to connect to
|
||||
#[allow(clippy::new_ret_no_self, clippy::type_complexity)]
|
||||
pub fn new<E>(
|
||||
pub fn new<E, TE>(
|
||||
name_server: SocketAddr,
|
||||
) -> (
|
||||
impl Future<Output = Result<TcpStream<S::Transport>, io::Error>> + Send,
|
||||
@ -126,8 +127,9 @@ impl<S: Connect + 'static> TcpStream<S> {
|
||||
)
|
||||
where
|
||||
E: FromProtoError,
|
||||
TE: Time,
|
||||
{
|
||||
Self::with_timeout(name_server, Duration::from_secs(5))
|
||||
Self::with_timeout::<TE>(name_server, Duration::from_secs(5))
|
||||
}
|
||||
|
||||
/// Creates a new future of the eventually establish a IO stream connection or fail trying
|
||||
@ -137,7 +139,7 @@ impl<S: Connect + 'static> TcpStream<S> {
|
||||
/// * `name_server` - the IP and Port of the DNS server to connect to
|
||||
/// * `timeout` - connection timeout
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn with_timeout(
|
||||
pub fn with_timeout<TE: Time>(
|
||||
name_server: SocketAddr,
|
||||
timeout: Duration,
|
||||
) -> (
|
||||
@ -148,25 +150,18 @@ impl<S: Connect + 'static> TcpStream<S> {
|
||||
let message_sender = BufStreamHandle::new(message_sender);
|
||||
// This set of futures collapses the next tcp socket into a stream which can be used for
|
||||
// sending and receiving tcp packets.
|
||||
let stream_fut = Self::connect(name_server, timeout, outbound_messages);
|
||||
let stream_fut = Self::connect::<TE>(name_server, timeout, outbound_messages);
|
||||
|
||||
(stream_fut, message_sender)
|
||||
}
|
||||
|
||||
async fn connect(
|
||||
async fn connect<TE: Time>(
|
||||
name_server: SocketAddr,
|
||||
timeout: Duration,
|
||||
outbound_messages: UnboundedReceiver<SerialMessage>,
|
||||
) -> Result<TcpStream<S::Transport>, io::Error> {
|
||||
let tcp = S::connect(name_server);
|
||||
tokio::time::timeout(timeout, tcp)
|
||||
.map_err(move |_| {
|
||||
debug!("timed out connecting to: {}", name_server);
|
||||
io::Error::new(
|
||||
io::ErrorKind::TimedOut,
|
||||
format!("timed out connecting to: {}", name_server),
|
||||
)
|
||||
})
|
||||
TE::timeout(timeout, tcp)
|
||||
.map(
|
||||
move |tcp_stream: Result<Result<S::Transport, io::Error>, _>| {
|
||||
tcp_stream
|
||||
@ -190,7 +185,7 @@ impl<S: Connect + 'static> TcpStream<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite> TcpStream<S> {
|
||||
impl<S: futures::io::AsyncRead + futures::io::AsyncWrite> TcpStream<S> {
|
||||
/// Initializes a TcpStream.
|
||||
///
|
||||
/// This is intended for use with a TcpListener and Incoming.
|
||||
@ -227,7 +222,7 @@ impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite> TcpStream<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin> Stream for TcpStream<S> {
|
||||
impl<S: futures::io::AsyncRead + futures::io::AsyncWrite + Unpin> Stream for TcpStream<S> {
|
||||
type Item = io::Result<SerialMessage>;
|
||||
|
||||
#[allow(clippy::cognitive_complexity)]
|
||||
@ -450,20 +445,27 @@ mod tests {
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
use std::net::Ipv6Addr;
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
use tokio::{net::TcpStream as TokioTcpStream, runtime::Runtime};
|
||||
use tokio::net::TcpStream as TokioTcpStream;
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
use crate::iocompat::AsyncIo02As03;
|
||||
use crate::TokioTime;
|
||||
|
||||
use crate::tests::tcp_stream_test;
|
||||
#[test]
|
||||
fn test_tcp_stream_ipv4() {
|
||||
let io_loop = Runtime::new().expect("failed to create tokio runtime");
|
||||
tcp_stream_test::<TokioTcpStream, Runtime>(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), io_loop)
|
||||
tcp_stream_test::<AsyncIo02As03<TokioTcpStream>, Runtime, TokioTime>(
|
||||
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
||||
io_loop,
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(not(target_os = "linux"))] // ignored until Travis-CI fixes IPv6
|
||||
fn test_tcp_stream_ipv6() {
|
||||
let io_loop = Runtime::new().expect("failed to create tokio runtime");
|
||||
tcp_stream_test::<TokioTcpStream, Runtime>(
|
||||
tcp_stream_test::<AsyncIo02As03<TokioTcpStream>, Runtime, TokioTime>(
|
||||
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
|
||||
io_loop,
|
||||
)
|
||||
|
@ -2,13 +2,13 @@ use std::io::{Read, Write};
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::{atomic::AtomicBool, Arc};
|
||||
|
||||
use futures::io::{AsyncRead, AsyncWrite};
|
||||
use futures::stream::StreamExt;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use crate::error::ProtoError;
|
||||
use crate::tcp::{Connect, TcpClientStream, TcpStream};
|
||||
use crate::xfer::SerialMessage;
|
||||
use crate::Executor;
|
||||
use crate::{Executor, Time};
|
||||
|
||||
const TEST_BYTES: &[u8; 8] = b"DEADBEEF";
|
||||
const TEST_BYTES_LEN: usize = 8;
|
||||
@ -84,8 +84,10 @@ fn tcp_server_setup(
|
||||
}
|
||||
|
||||
/// Test tcp_stream.
|
||||
pub fn tcp_stream_test<S: Connect + 'static, E: Executor>(server_addr: IpAddr, mut exec: E)
|
||||
where
|
||||
pub fn tcp_stream_test<S: Connect + 'static, E: Executor, TE: Time>(
|
||||
server_addr: IpAddr,
|
||||
mut exec: E,
|
||||
) where
|
||||
<S as Connect>::Transport: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
let (succeeded, server_handle, server_addr) =
|
||||
@ -96,7 +98,7 @@ where
|
||||
// the tests should run within 5 seconds... right?
|
||||
// TODO: add timeout here, so that test never hangs...
|
||||
// let timeout = Timeout::new(Duration::from_secs(5));
|
||||
let (stream, sender) = TcpStream::<S>::new::<ProtoError>(server_addr);
|
||||
let (stream, sender) = TcpStream::<S>::new::<ProtoError, TE>(server_addr);
|
||||
|
||||
let mut stream = exec.block_on(stream).expect("run failed to get stream");
|
||||
|
||||
@ -119,7 +121,7 @@ where
|
||||
}
|
||||
|
||||
/// Test tcp_client_stream.
|
||||
pub fn tcp_client_stream_test<S: Connect + Send + 'static, E: Executor>(
|
||||
pub fn tcp_client_stream_test<S: Connect + Send + 'static, E: Executor, TE: Time + 'static>(
|
||||
server_addr: IpAddr,
|
||||
mut exec: E,
|
||||
) where
|
||||
@ -133,7 +135,7 @@ pub fn tcp_client_stream_test<S: Connect + Send + 'static, E: Executor>(
|
||||
// the tests should run within 5 seconds... right?
|
||||
// TODO: add timeout here, so that test never hangs...
|
||||
// let timeout = Timeout::new(Duration::from_secs(5));
|
||||
let (stream, mut sender) = TcpClientStream::<S>::new(server_addr);
|
||||
let (stream, mut sender) = TcpClientStream::<S>::new::<TE>(server_addr);
|
||||
|
||||
let mut stream = exec.block_on(stream).expect("run failed to get stream");
|
||||
|
||||
|
@ -4,7 +4,7 @@ use futures::stream::StreamExt;
|
||||
use log::debug;
|
||||
|
||||
use crate::udp::{UdpClientStream, UdpSocket, UdpStream};
|
||||
use crate::Executor;
|
||||
use crate::{Executor, Time};
|
||||
|
||||
/// Test next random udpsocket.
|
||||
pub fn next_random_socket_test<S: UdpSocket + Send + 'static, E: Executor>(mut exec: E) {
|
||||
@ -109,7 +109,7 @@ pub fn udp_stream_test<S: UdpSocket + Send + 'static, E: Executor>(
|
||||
|
||||
/// Test udp_client_stream.
|
||||
#[allow(clippy::print_stdout)]
|
||||
pub fn udp_client_stream_test<S: UdpSocket + Send + 'static, E: Executor>(
|
||||
pub fn udp_client_stream_test<S: UdpSocket + Send + 'static, E: Executor, TE: Time>(
|
||||
server_addr: IpAddr,
|
||||
mut exec: E,
|
||||
) {
|
||||
@ -207,7 +207,7 @@ pub fn udp_client_stream_test<S: UdpSocket + Send + 'static, E: Executor>(
|
||||
for i in 0..send_recv_times {
|
||||
// test once
|
||||
let response_future = exec.block_on(future::lazy(|cx| {
|
||||
stream.send_message(DnsRequest::new(query.clone(), Default::default()), cx)
|
||||
stream.send_message::<TE>(DnsRequest::new(query.clone(), Default::default()), cx)
|
||||
}));
|
||||
println!("client sending request {}", i);
|
||||
let response = match exec.block_on(response_future) {
|
||||
|
@ -7,6 +7,7 @@
|
||||
|
||||
use std::borrow::Borrow;
|
||||
use std::fmt::{self, Display};
|
||||
use std::io;
|
||||
use std::marker::PhantomData;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
@ -16,13 +17,13 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use futures::{Future, Stream};
|
||||
use log::{debug, warn};
|
||||
use tokio::time::Elapsed;
|
||||
|
||||
use crate::error::ProtoError;
|
||||
use crate::op::message::NoopMessageFinalizer;
|
||||
use crate::op::{MessageFinalizer, OpCode};
|
||||
use crate::udp::udp_stream::{NextRandomUdpSocket, UdpSocket};
|
||||
use crate::xfer::{DnsRequest, DnsRequestSender, DnsResponse, SerialMessage};
|
||||
use crate::Time;
|
||||
|
||||
/// A UDP client stream of DNS binary packets
|
||||
///
|
||||
@ -109,7 +110,7 @@ impl<S: UdpSocket + Send + 'static, MF: MessageFinalizer> DnsRequestSender
|
||||
{
|
||||
type DnsResponseFuture = UdpResponse;
|
||||
|
||||
fn send_message(
|
||||
fn send_message<TE: Time>(
|
||||
&mut self,
|
||||
mut message: DnsRequest,
|
||||
_cx: &mut Context,
|
||||
@ -130,7 +131,7 @@ impl<S: UdpSocket + Send + 'static, MF: MessageFinalizer> DnsRequestSender
|
||||
Err(err) => {
|
||||
let err: ProtoError = err;
|
||||
|
||||
return UdpResponse::complete(SingleUseUdpSocket::errored(err));
|
||||
return UdpResponse::complete::<_, TE>(SingleUseUdpSocket::errored(err));
|
||||
}
|
||||
};
|
||||
|
||||
@ -142,7 +143,7 @@ impl<S: UdpSocket + Send + 'static, MF: MessageFinalizer> DnsRequestSender
|
||||
if let Some(ref signer) = self.signer {
|
||||
if let Err(e) = message.finalize::<MF>(signer.borrow(), now) {
|
||||
debug!("could not sign message: {}", e);
|
||||
return UdpResponse::complete(SingleUseUdpSocket::errored(e));
|
||||
return UdpResponse::complete::<_, TE>(SingleUseUdpSocket::errored(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -150,18 +151,18 @@ impl<S: UdpSocket + Send + 'static, MF: MessageFinalizer> DnsRequestSender
|
||||
let bytes = match message.to_vec() {
|
||||
Ok(bytes) => bytes,
|
||||
Err(err) => {
|
||||
return UdpResponse::complete(SingleUseUdpSocket::errored(err));
|
||||
return UdpResponse::complete::<_, TE>(SingleUseUdpSocket::errored(err));
|
||||
}
|
||||
};
|
||||
|
||||
let message_id = message.id();
|
||||
let message = SerialMessage::new(bytes, self.name_server);
|
||||
|
||||
UdpResponse::new::<S>(message, message_id, self.timeout)
|
||||
UdpResponse::new::<S, TE>(message, message_id, self.timeout)
|
||||
}
|
||||
|
||||
fn error_response(err: ProtoError) -> Self::DnsResponseFuture {
|
||||
UdpResponse::complete(SingleUseUdpSocket::errored(err))
|
||||
fn error_response<TE: Time>(err: ProtoError) -> Self::DnsResponseFuture {
|
||||
UdpResponse::complete::<_, TE>(SingleUseUdpSocket::errored(err))
|
||||
}
|
||||
|
||||
fn shutdown(&mut self) {
|
||||
@ -190,7 +191,7 @@ impl<S: Send, MF: MessageFinalizer> Stream for UdpClientStream<S, MF> {
|
||||
/// A future that resolves to
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub struct UdpResponse(
|
||||
Pin<Box<dyn Future<Output = Result<Result<DnsResponse, ProtoError>, Elapsed>> + Send>>,
|
||||
Pin<Box<dyn Future<Output = Result<Result<DnsResponse, ProtoError>, io::Error>> + Send>>,
|
||||
);
|
||||
|
||||
impl UdpResponse {
|
||||
@ -200,23 +201,29 @@ impl UdpResponse {
|
||||
///
|
||||
/// * `request` - Serialized message being sent
|
||||
/// * `message_id` - Id of the message that was encoded in the serial message
|
||||
fn new<S: UdpSocket + Send + Unpin + 'static>(
|
||||
fn new<S: UdpSocket + Send + Unpin + 'static, T: Time>(
|
||||
request: SerialMessage,
|
||||
message_id: u16,
|
||||
timeout: Duration,
|
||||
) -> Self {
|
||||
UdpResponse(Box::pin(tokio::time::timeout(
|
||||
UdpResponse(T::timeout::<
|
||||
Pin<Box<dyn Future<Output = Result<DnsResponse, ProtoError>> + Send>>,
|
||||
>(
|
||||
timeout,
|
||||
SingleUseUdpSocket::send_serial_message::<S>(request, message_id),
|
||||
)))
|
||||
Box::pin(SingleUseUdpSocket::send_serial_message::<S>(
|
||||
request, message_id,
|
||||
)),
|
||||
))
|
||||
}
|
||||
|
||||
/// ad already completed future
|
||||
fn complete<F: Future<Output = Result<DnsResponse, ProtoError>> + Send + 'static>(
|
||||
fn complete<F: Future<Output = Result<DnsResponse, ProtoError>> + Send + 'static, T: Time>(
|
||||
f: F,
|
||||
) -> Self {
|
||||
// TODO: this constructure isn't really necessary
|
||||
UdpResponse(Box::pin(tokio::time::timeout(Duration::from_secs(5), f)))
|
||||
UdpResponse(T::timeout::<
|
||||
Pin<Box<dyn Future<Output = Result<DnsResponse, ProtoError>> + Send>>,
|
||||
>(Duration::from_secs(5), Box::pin(f)))
|
||||
}
|
||||
}
|
||||
|
||||
@ -346,8 +353,8 @@ impl SingleUseUdpSocket {
|
||||
#[cfg(feature = "tokio-runtime")]
|
||||
mod tests {
|
||||
#![allow(clippy::dbg_macro, clippy::print_stdout)]
|
||||
|
||||
use crate::tests::udp_client_stream_test;
|
||||
use crate::TokioTime;
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
use std::net::Ipv6Addr;
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
@ -356,7 +363,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_udp_client_stream_ipv4() {
|
||||
let io_loop = Runtime::new().expect("failed to create tokio runtime");
|
||||
udp_client_stream_test::<TokioUdpSocket, Runtime>(
|
||||
udp_client_stream_test::<TokioUdpSocket, Runtime, TokioTime>(
|
||||
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
||||
io_loop,
|
||||
)
|
||||
@ -366,7 +373,7 @@ mod tests {
|
||||
#[cfg(not(target_os = "linux"))] // ignored until Travis-CI fixes IPv6
|
||||
fn test_udp_client_stream_ipv6() {
|
||||
let io_loop = Runtime::new().expect("failed to create tokio runtime");
|
||||
udp_client_stream_test::<TokioUdpSocket, Runtime>(
|
||||
udp_client_stream_test::<TokioUdpSocket, Runtime, TokioTime>(
|
||||
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
|
||||
io_loop,
|
||||
)
|
||||
|
@ -7,6 +7,7 @@
|
||||
|
||||
//! This module contains all the types for demuxing DNS oriented streams.
|
||||
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
@ -22,6 +23,7 @@ use crate::xfer::{
|
||||
BufDnsRequestStreamHandle, DnsRequest, DnsRequestSender, DnsRequestStreamHandle, DnsResponse,
|
||||
OneshotDnsRequest,
|
||||
};
|
||||
use crate::Time;
|
||||
|
||||
/// This is a generic Exchange implemented over multiplexed DNS connection providers.
|
||||
///
|
||||
@ -45,7 +47,7 @@ where
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `stream` - the established IO stream for communication
|
||||
pub fn from_stream<S>(stream: S) -> (Self, DnsExchangeBackground<S, R>)
|
||||
pub fn from_stream<S, TE>(stream: S) -> (Self, DnsExchangeBackground<S, R, TE>)
|
||||
where
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
{
|
||||
@ -56,17 +58,18 @@ where
|
||||
}
|
||||
|
||||
/// Wraps a stream where a sender and receiver have already been established
|
||||
pub fn from_stream_with_receiver<S>(
|
||||
pub fn from_stream_with_receiver<S, TE>(
|
||||
stream: S,
|
||||
receiver: UnboundedReceiver<OneshotDnsRequest<R>>,
|
||||
sender: DnsRequestStreamHandle<R>,
|
||||
) -> (Self, DnsExchangeBackground<S, R>)
|
||||
) -> (Self, DnsExchangeBackground<S, R, TE>)
|
||||
where
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
{
|
||||
let background = DnsExchangeBackground {
|
||||
io_stream: stream,
|
||||
outbound_messages: receiver.peekable(),
|
||||
marker: PhantomData,
|
||||
};
|
||||
|
||||
let sender = BufDnsRequestStreamHandle::new(sender);
|
||||
@ -77,10 +80,11 @@ where
|
||||
/// Returns a future, which itself wraps a future which is awaiting connection.
|
||||
///
|
||||
/// The connect_future should be lazy.
|
||||
pub fn connect<F, S>(connect_future: F) -> DnsExchangeConnect<F, S, R>
|
||||
pub fn connect<F, S, TE>(connect_future: F) -> DnsExchangeConnect<F, S, R, TE>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
TE: Time + Unpin,
|
||||
{
|
||||
let (message_sender, outbound_messages) = unbounded();
|
||||
let message_sender = DnsRequestStreamHandle::<R>::new(message_sender);
|
||||
@ -140,16 +144,17 @@ where
|
||||
///
|
||||
/// It must be spawned before any DNS messages are sent.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct DnsExchangeBackground<S, R>
|
||||
pub struct DnsExchangeBackground<S, R, TE>
|
||||
where
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
io_stream: S,
|
||||
outbound_messages: Peekable<UnboundedReceiver<OneshotDnsRequest<R>>>,
|
||||
marker: PhantomData<TE>,
|
||||
}
|
||||
|
||||
impl<S, R> DnsExchangeBackground<S, R>
|
||||
impl<S, R, TE> DnsExchangeBackground<S, R, TE>
|
||||
where
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
@ -164,10 +169,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, R> Future for DnsExchangeBackground<S, R>
|
||||
impl<S, R, TE> Future for DnsExchangeBackground<S, R, TE>
|
||||
where
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
TE: Time + Unpin,
|
||||
{
|
||||
type Output = Result<(), ProtoError>;
|
||||
|
||||
@ -213,7 +219,9 @@ where
|
||||
// if there is no peer, this connection should die...
|
||||
let (dns_request, serial_response): (DnsRequest, _) = dns_request.unwrap();
|
||||
|
||||
match serial_response.send_response(io_stream.send_message(dns_request, cx)) {
|
||||
match serial_response
|
||||
.send_response(io_stream.send_message::<TE>(dns_request, cx))
|
||||
{
|
||||
Ok(()) => (),
|
||||
Err(_) => {
|
||||
warn!("failed to associate send_message response to the sender");
|
||||
@ -247,17 +255,19 @@ where
|
||||
/// The future will return a tuple of the DnsExchange (for sending messages) and a background
|
||||
/// for running the background tasks. The background is optional as only one thread should run
|
||||
/// the background. If returned, it must be spawned before any dns requests will function.
|
||||
pub struct DnsExchangeConnect<F, S, R>(DnsExchangeConnectInner<F, S, R>)
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin;
|
||||
|
||||
impl<F, S, R> DnsExchangeConnect<F, S, R>
|
||||
pub struct DnsExchangeConnect<F, S, R, TE>(DnsExchangeConnectInner<F, S, R, TE>)
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
TE: Time + Unpin;
|
||||
|
||||
impl<F, S, R, TE> DnsExchangeConnect<F, S, R, TE>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
TE: Time + Unpin,
|
||||
{
|
||||
fn connect(
|
||||
connect_future: F,
|
||||
@ -273,24 +283,26 @@ where
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
impl<F, S, R> Future for DnsExchangeConnect<F, S, R>
|
||||
impl<F, S, R, TE> Future for DnsExchangeConnect<F, S, R, TE>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
TE: Time + Unpin,
|
||||
{
|
||||
type Output = Result<(DnsExchange<R>, DnsExchangeBackground<S, R>), ProtoError>;
|
||||
type Output = Result<(DnsExchange<R>, DnsExchangeBackground<S, R, TE>), ProtoError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
self.0.poll_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
enum DnsExchangeConnectInner<F, S, R>
|
||||
enum DnsExchangeConnectInner<F, S, R, TE>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
TE: Time + Unpin,
|
||||
{
|
||||
Connecting {
|
||||
connect_future: F,
|
||||
@ -299,7 +311,7 @@ where
|
||||
},
|
||||
Connected {
|
||||
exchange: DnsExchange<R>,
|
||||
background: Option<DnsExchangeBackground<S, R>>,
|
||||
background: Option<DnsExchangeBackground<S, R, TE>>,
|
||||
},
|
||||
FailAll {
|
||||
error: ProtoError,
|
||||
@ -308,13 +320,14 @@ where
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
impl<F, S, R> Future for DnsExchangeConnectInner<F, S, R>
|
||||
impl<F, S, R, TE> Future for DnsExchangeConnectInner<F, S, R, TE>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
TE: Time + Unpin,
|
||||
{
|
||||
type Output = Result<(DnsExchange<R>, DnsExchangeBackground<S, R>), ProtoError>;
|
||||
type Output = Result<(DnsExchange<R>, DnsExchangeBackground<S, R, TE>), ProtoError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
loop {
|
||||
@ -372,7 +385,7 @@ where
|
||||
Poll::Ready(opt) => opt,
|
||||
Poll::Pending => return Poll::Pending,
|
||||
} {
|
||||
let response = S::error_response(error.clone());
|
||||
let response = S::error_response::<TE>(error.clone());
|
||||
// ignoring errors... best effort send...
|
||||
outbound_message.unwrap().1.send_response(response).ok();
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ use std::borrow::Borrow;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::{self, Display};
|
||||
use std::marker::Unpin;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
@ -23,7 +24,6 @@ use log::{debug, warn};
|
||||
use rand;
|
||||
use rand::distributions::{Distribution, Standard};
|
||||
use smallvec::SmallVec;
|
||||
use tokio::{self, time::Delay};
|
||||
|
||||
use crate::error::*;
|
||||
use crate::op::{Message, MessageFinalizer, OpCode};
|
||||
@ -32,6 +32,7 @@ use crate::xfer::{
|
||||
SerialMessage,
|
||||
};
|
||||
use crate::DnsStreamHandle;
|
||||
use crate::Time;
|
||||
|
||||
const QOS_MAX_RECEIVE_MSGS: usize = 100; // max number of messages to receive from the UDP socket
|
||||
|
||||
@ -45,7 +46,7 @@ struct ActiveRequest {
|
||||
// expecting more than one response
|
||||
// TODO: change the completion above to a Stream, and don't hold messages...
|
||||
responses: SmallVec<[Message; 1]>,
|
||||
timeout: Delay,
|
||||
timeout: Box<dyn Future<Output = ()> + Send + Unpin>,
|
||||
}
|
||||
|
||||
impl ActiveRequest {
|
||||
@ -53,7 +54,7 @@ impl ActiveRequest {
|
||||
completion: oneshot::Sender<Result<DnsResponse, ProtoError>>,
|
||||
request_id: u16,
|
||||
request_options: DnsRequestOptions,
|
||||
timeout: Delay,
|
||||
timeout: Box<dyn Future<Output = ()> + Send + Unpin>,
|
||||
) -> Self {
|
||||
ActiveRequest {
|
||||
completion,
|
||||
@ -308,7 +309,11 @@ where
|
||||
{
|
||||
type DnsResponseFuture = DnsMultiplexerSerialResponse;
|
||||
|
||||
fn send_message(&mut self, request: DnsRequest, cx: &mut Context) -> Self::DnsResponseFuture {
|
||||
fn send_message<TE: Time>(
|
||||
&mut self,
|
||||
request: DnsRequest,
|
||||
cx: &mut Context,
|
||||
) -> Self::DnsResponseFuture {
|
||||
if self.is_shutdown {
|
||||
panic!("can not send messages after stream is shutdown")
|
||||
}
|
||||
@ -350,12 +355,13 @@ where
|
||||
}
|
||||
|
||||
// store a Timeout for this message before sending
|
||||
let timeout = tokio::time::delay_for(self.timeout_duration);
|
||||
let timeout = TE::delay_for(self.timeout_duration);
|
||||
|
||||
let (complete, receiver) = oneshot::channel();
|
||||
|
||||
// send the message
|
||||
let active_request = ActiveRequest::new(complete, request.id(), request_options, timeout);
|
||||
let active_request =
|
||||
ActiveRequest::new(complete, request.id(), request_options, Box::new(timeout));
|
||||
|
||||
match request.to_vec() {
|
||||
Ok(buffer) => {
|
||||
@ -385,7 +391,7 @@ where
|
||||
DnsMultiplexerSerialResponseInner::Completion(receiver).into()
|
||||
}
|
||||
|
||||
fn error_response(error: ProtoError) -> Self::DnsResponseFuture {
|
||||
fn error_response<TE: Time>(error: ProtoError) -> Self::DnsResponseFuture {
|
||||
DnsMultiplexerSerialResponseInner::Err(Some(error)).into()
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@ use futures::{ready, Future, Stream};
|
||||
use log::{debug, warn};
|
||||
|
||||
use crate::error::*;
|
||||
use crate::Time;
|
||||
|
||||
mod dns_exchange;
|
||||
pub mod dns_handle;
|
||||
@ -161,10 +162,14 @@ pub trait DnsRequestSender: Stream<Item = Result<(), ProtoError>> + Send + Unpin
|
||||
/// # Return
|
||||
///
|
||||
/// A future which will resolve to a SerialMessage response
|
||||
fn send_message(&mut self, message: DnsRequest, cx: &mut Context) -> Self::DnsResponseFuture;
|
||||
fn send_message<TE: Time>(
|
||||
&mut self,
|
||||
message: DnsRequest,
|
||||
cx: &mut Context,
|
||||
) -> Self::DnsResponseFuture;
|
||||
|
||||
/// Constructs an error response
|
||||
fn error_response(error: ProtoError) -> Self::DnsResponseFuture;
|
||||
fn error_response<TE: Time>(error: ProtoError) -> Self::DnsResponseFuture;
|
||||
|
||||
/// Allows the upstream user to inform the underling stream that it should shutdown.
|
||||
///
|
||||
|
@ -6,6 +6,7 @@ use std::net::SocketAddr;
|
||||
use crate::tls::CLIENT_CONFIG;
|
||||
|
||||
use proto::xfer::{DnsExchange, DnsExchangeConnect};
|
||||
use proto::TokioTime;
|
||||
use trust_dns_https::{
|
||||
HttpsClientConnect, HttpsClientResponse, HttpsClientStream, HttpsClientStreamBuilder,
|
||||
};
|
||||
@ -17,7 +18,7 @@ pub(crate) fn new_https_stream(
|
||||
socket_addr: SocketAddr,
|
||||
dns_name: String,
|
||||
client_config: Option<TlsClientConfig>,
|
||||
) -> DnsExchangeConnect<HttpsClientConnect, HttpsClientStream, HttpsClientResponse> {
|
||||
) -> DnsExchangeConnect<HttpsClientConnect, HttpsClientStream, HttpsClientResponse, TokioTime> {
|
||||
let client_config = client_config.map_or_else(
|
||||
|| CLIENT_CONFIG.clone(),
|
||||
|TlsClientConfig(client_config)| client_config,
|
||||
|
@ -25,6 +25,7 @@ use tokio_tls::TlsStream as TokioTlsStream;
|
||||
|
||||
use proto;
|
||||
use proto::error::ProtoError;
|
||||
use proto::iocompat::AsyncIo02As03;
|
||||
#[cfg(feature = "mdns")]
|
||||
use proto::multicast::{MdnsClientConnect, MdnsClientStream, MdnsQueryType};
|
||||
use proto::op::NoopMessageFinalizer;
|
||||
@ -34,6 +35,7 @@ use proto::xfer::{
|
||||
DnsExchange, DnsExchangeBackground, DnsExchangeConnect, DnsExchangeSend, DnsHandle,
|
||||
DnsMultiplexer, DnsMultiplexerConnect, DnsMultiplexerSerialResponse, DnsRequest, DnsResponse,
|
||||
};
|
||||
use proto::TokioTime;
|
||||
|
||||
#[cfg(feature = "dns-over-https")]
|
||||
use trust_dns_https::{self, HttpsClientConnect, HttpsClientResponse, HttpsClientStream};
|
||||
@ -98,7 +100,10 @@ impl ConnectionProvider for TokioConnectionProvider {
|
||||
let timeout = options.timeout;
|
||||
|
||||
let (stream, handle) =
|
||||
TcpClientStream::<TokioTcpStream>::with_timeout(socket_addr, timeout);
|
||||
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::with_timeout::<TokioTime>(
|
||||
socket_addr,
|
||||
timeout,
|
||||
);
|
||||
// TODO: need config for Signer...
|
||||
let dns_conn = DnsMultiplexer::with_timeout(
|
||||
stream,
|
||||
@ -180,17 +185,19 @@ pub(crate) enum ConnectionConnect {
|
||||
UdpClientConnect<TokioUdpSocket>,
|
||||
UdpClientStream<TokioUdpSocket>,
|
||||
UdpResponse,
|
||||
TokioTime,
|
||||
>,
|
||||
),
|
||||
Tcp(
|
||||
DnsExchangeConnect<
|
||||
DnsMultiplexerConnect<
|
||||
TcpClientConnect<TokioTcpStream>,
|
||||
TcpClientStream<TokioTcpStream>,
|
||||
TcpClientConnect<AsyncIo02As03<TokioTcpStream>>,
|
||||
TcpClientStream<AsyncIo02As03<TokioTcpStream>>,
|
||||
NoopMessageFinalizer,
|
||||
>,
|
||||
DnsMultiplexer<TcpClientStream<TokioTcpStream>, NoopMessageFinalizer>,
|
||||
DnsMultiplexer<TcpClientStream<AsyncIo02As03<TokioTcpStream>>, NoopMessageFinalizer>,
|
||||
DnsMultiplexerSerialResponse,
|
||||
TokioTime,
|
||||
>,
|
||||
),
|
||||
#[cfg(feature = "dns-over-tls")]
|
||||
@ -201,28 +208,35 @@ pub(crate) enum ConnectionConnect {
|
||||
Box<
|
||||
dyn futures::Future<
|
||||
Output = Result<
|
||||
TcpClientStream<TokioTlsStream<TokioTcpStream>>,
|
||||
TcpClientStream<AsyncIo02As03<TokioTlsStream<TokioTcpStream>>>,
|
||||
ProtoError,
|
||||
>,
|
||||
> + Send
|
||||
+ 'static,
|
||||
>,
|
||||
>,
|
||||
TcpClientStream<TokioTlsStream<TokioTcpStream>>,
|
||||
TcpClientStream<AsyncIo02As03<TokioTlsStream<TokioTcpStream>>>,
|
||||
NoopMessageFinalizer,
|
||||
>,
|
||||
DnsMultiplexer<
|
||||
TcpClientStream<AsyncIo02As03<TokioTlsStream<TokioTcpStream>>>,
|
||||
NoopMessageFinalizer,
|
||||
>,
|
||||
DnsMultiplexer<TcpClientStream<TokioTlsStream<TokioTcpStream>>, NoopMessageFinalizer>,
|
||||
DnsMultiplexerSerialResponse,
|
||||
TokioTime,
|
||||
>,
|
||||
),
|
||||
#[cfg(feature = "dns-over-https")]
|
||||
Https(DnsExchangeConnect<HttpsClientConnect, HttpsClientStream, HttpsClientResponse>),
|
||||
Https(
|
||||
DnsExchangeConnect<HttpsClientConnect, HttpsClientStream, HttpsClientResponse, TokioTime>,
|
||||
),
|
||||
#[cfg(feature = "mdns")]
|
||||
Mdns(
|
||||
DnsExchangeConnect<
|
||||
DnsMultiplexerConnect<MdnsClientConnect, MdnsClientStream, NoopMessageFinalizer>,
|
||||
DnsMultiplexer<MdnsClientStream, NoopMessageFinalizer>,
|
||||
DnsMultiplexerSerialResponse,
|
||||
TokioTime,
|
||||
>,
|
||||
),
|
||||
}
|
||||
@ -394,27 +408,33 @@ impl Future for ConnectionBackground {
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub(crate) enum ConnectionBackgroundInner {
|
||||
Udp(DnsExchangeBackground<UdpClientStream<TokioUdpSocket>, UdpResponse>),
|
||||
Udp(DnsExchangeBackground<UdpClientStream<TokioUdpSocket>, UdpResponse, TokioTime>),
|
||||
Tcp(
|
||||
DnsExchangeBackground<
|
||||
DnsMultiplexer<TcpClientStream<TokioTcpStream>, NoopMessageFinalizer>,
|
||||
DnsMultiplexer<TcpClientStream<AsyncIo02As03<TokioTcpStream>>, NoopMessageFinalizer>,
|
||||
DnsMultiplexerSerialResponse,
|
||||
TokioTime,
|
||||
>,
|
||||
),
|
||||
#[cfg(feature = "dns-over-tls")]
|
||||
Tls(
|
||||
DnsExchangeBackground<
|
||||
DnsMultiplexer<TcpClientStream<TokioTlsStream<TokioTcpStream>>, NoopMessageFinalizer>,
|
||||
DnsMultiplexer<
|
||||
TcpClientStream<AsyncIo02As03<TokioTlsStream<TokioTcpStream>>>,
|
||||
NoopMessageFinalizer,
|
||||
>,
|
||||
DnsMultiplexerSerialResponse,
|
||||
TokioTime,
|
||||
>,
|
||||
),
|
||||
#[cfg(feature = "dns-over-https")]
|
||||
Https(DnsExchangeBackground<HttpsClientStream, HttpsClientResponse>),
|
||||
Https(DnsExchangeBackground<HttpsClientStream, HttpsClientResponse, TokioTime>),
|
||||
#[cfg(feature = "mdns")]
|
||||
Mdns(
|
||||
DnsExchangeBackground<
|
||||
DnsMultiplexer<MdnsClientStream, NoopMessageFinalizer>,
|
||||
DnsMultiplexerSerialResponse,
|
||||
TokioTime,
|
||||
>,
|
||||
),
|
||||
}
|
||||
|
@ -16,13 +16,15 @@ use rustls::ClientConfig;
|
||||
use tokio::net::TcpStream as TokioTcpStream;
|
||||
|
||||
use trust_dns_proto::error::ProtoError;
|
||||
use trust_dns_proto::iocompat::AsyncIo02As03;
|
||||
use trust_dns_proto::tcp::TcpClientStream;
|
||||
use trust_dns_proto::xfer::BufDnsStreamHandle;
|
||||
|
||||
use crate::tls_stream::tls_connect;
|
||||
|
||||
/// Type of TlsClientStream used with Rustls
|
||||
pub type TlsClientStream = TcpClientStream<tokio_rustls::client::TlsStream<TokioTcpStream>>;
|
||||
pub type TlsClientStream =
|
||||
TcpClientStream<AsyncIo02As03<tokio_rustls::client::TlsStream<TokioTcpStream>>>;
|
||||
|
||||
/// Creates a new TlsStream to the specified name_server
|
||||
///
|
||||
|
@ -20,6 +20,7 @@ use tokio::net::TcpStream as TokioTcpStream;
|
||||
use tokio_rustls::TlsConnector;
|
||||
use webpki::{DNSName, DNSNameRef};
|
||||
|
||||
use trust_dns_proto::iocompat::AsyncIo02As03;
|
||||
use trust_dns_proto::tcp::TcpStream;
|
||||
use trust_dns_proto::xfer::{BufStreamHandle, SerialMessage};
|
||||
|
||||
@ -35,7 +36,7 @@ pub type TlsStream<S> = TcpStream<S>;
|
||||
/// Initializes a TlsStream with an existing tokio_tls::TlsStream.
|
||||
///
|
||||
/// This is intended for use with a TlsListener and Incoming connections
|
||||
pub fn tls_from_stream<S: tokio::io::AsyncRead + tokio::io::AsyncWrite>(
|
||||
pub fn tls_from_stream<S: futures::io::AsyncRead + futures::io::AsyncWrite>(
|
||||
stream: S,
|
||||
peer_addr: SocketAddr,
|
||||
) -> (TlsStream<S>, BufStreamHandle) {
|
||||
@ -78,7 +79,12 @@ pub fn tls_connect(
|
||||
dns_name: String,
|
||||
client_config: Arc<ClientConfig>,
|
||||
) -> (
|
||||
Pin<Box<dyn Future<Output = Result<TlsStream<TokioTlsClientStream>, io::Error>> + Send>>,
|
||||
Pin<
|
||||
Box<
|
||||
dyn Future<Output = Result<TlsStream<AsyncIo02As03<TokioTlsClientStream>>, io::Error>>
|
||||
+ Send,
|
||||
>,
|
||||
>,
|
||||
BufStreamHandle,
|
||||
) {
|
||||
let (message_sender, outbound_messages) = unbounded();
|
||||
@ -104,7 +110,7 @@ async fn connect_tls(
|
||||
name_server: SocketAddr,
|
||||
dns_name: String,
|
||||
outbound_messages: UnboundedReceiver<SerialMessage>,
|
||||
) -> io::Result<TcpStream<TokioTlsClientStream>> {
|
||||
) -> io::Result<TcpStream<AsyncIo02As03<TokioTlsClientStream>>> {
|
||||
let tcp = TokioTcpStream::connect(&name_server).await?;
|
||||
|
||||
let dns_name = DNSNameRef::try_from_ascii_str(&dns_name)
|
||||
@ -122,7 +128,7 @@ async fn connect_tls(
|
||||
.await?;
|
||||
|
||||
Ok(TcpStream::from_stream_with_receiver(
|
||||
s,
|
||||
AsyncIo02As03(s),
|
||||
name_server,
|
||||
outbound_messages,
|
||||
))
|
||||
|
@ -21,6 +21,7 @@ use tokio::runtime::Runtime;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use proto::error::ProtoError;
|
||||
use proto::iocompat::AsyncIo02As03;
|
||||
use proto::op::Edns;
|
||||
use proto::serialize::binary::{BinDecodable, BinDecoder};
|
||||
use proto::tcp::TcpStream;
|
||||
@ -141,7 +142,8 @@ impl<T: RequestHandler> ServerFuture<T> {
|
||||
let src_addr = tcp_stream.peer_addr().unwrap();
|
||||
debug!("accepted request from: {}", src_addr);
|
||||
// take the created stream...
|
||||
let (buf_stream, stream_handle) = TcpStream::from_stream(tcp_stream, src_addr);
|
||||
let (buf_stream, stream_handle) =
|
||||
TcpStream::from_stream(AsyncIo02As03(tcp_stream), src_addr);
|
||||
let mut timeout_stream = TimeoutStream::new(buf_stream, timeout);
|
||||
//let request_stream = RequestStream::new(timeout_stream, stream_handle);
|
||||
|
||||
@ -257,7 +259,8 @@ impl<T: RequestHandler> ServerFuture<T> {
|
||||
}
|
||||
};
|
||||
debug!("accepted TLS request from: {}", src_addr);
|
||||
let (buf_stream, stream_handle) = TlsStream::from_stream(tls_stream, src_addr);
|
||||
let (buf_stream, stream_handle) =
|
||||
TlsStream::from_stream(AsyncIo02As03(tls_stream), src_addr);
|
||||
let mut timeout_stream = TimeoutStream::new(buf_stream, timeout);
|
||||
while let Some(message) = timeout_stream.next().await {
|
||||
let message = match message {
|
||||
@ -380,7 +383,7 @@ impl<T: RequestHandler> ServerFuture<T> {
|
||||
let tls_stream = tls_acceptor.accept(tcp_stream).await;
|
||||
|
||||
let tls_stream = match tls_stream {
|
||||
Ok(tls_stream) => tls_stream,
|
||||
Ok(tls_stream) => AsyncIo02As03(tls_stream),
|
||||
Err(e) => {
|
||||
debug!("tls handshake src: {} error: {}", src_addr, e);
|
||||
return;
|
||||
|
@ -42,6 +42,7 @@ use trust_dns_proto::xfer::DnsResponse;
|
||||
use trust_dns_proto::xfer::{
|
||||
DnsExchangeBackground, DnsMultiplexer, DnsMultiplexerSerialResponse, DnsStreamHandle,
|
||||
};
|
||||
use trust_dns_proto::{iocompat::AsyncIo02As03, TokioTime};
|
||||
use trust_dns_server::authority::{Authority, Catalog};
|
||||
|
||||
use trust_dns_integration::authority::create_example;
|
||||
@ -102,7 +103,7 @@ fn test_query_udp_ipv6() {
|
||||
fn test_query_tcp_ipv4() {
|
||||
let mut io_loop = Runtime::new().unwrap();
|
||||
let addr: SocketAddr = ("8.8.8.8", 53).to_socket_addrs().unwrap().next().unwrap();
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(stream, sender, None);
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
trust_dns_proto::spawn_bg(&io_loop, bg);
|
||||
@ -121,7 +122,7 @@ fn test_query_tcp_ipv6() {
|
||||
.unwrap()
|
||||
.next()
|
||||
.unwrap();
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(stream, sender, None);
|
||||
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
trust_dns_proto::spawn_bg(&io_loop, bg);
|
||||
@ -231,6 +232,7 @@ async fn create_sig0_ready_client() -> (
|
||||
DnsExchangeBackground<
|
||||
DnsMultiplexer<TestClientStream, Signer, Box<dyn DnsStreamHandle>>,
|
||||
DnsMultiplexerSerialResponse,
|
||||
TokioTime,
|
||||
>,
|
||||
),
|
||||
Name,
|
||||
@ -948,8 +950,9 @@ fn test_timeout_query_tcp() {
|
||||
.next()
|
||||
.unwrap();
|
||||
|
||||
let (stream, sender) =
|
||||
TcpClientStream::<TokioTcpStream>::with_timeout(addr, std::time::Duration::from_millis(1));
|
||||
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::with_timeout::<
|
||||
TokioTime,
|
||||
>(addr, std::time::Duration::from_millis(1));
|
||||
let client = AsyncClient::with_timeout(
|
||||
Box::new(stream),
|
||||
sender,
|
||||
|
@ -14,6 +14,7 @@ use tokio::runtime::Runtime;
|
||||
use trust_dns_proto::op::{NoopMessageFinalizer, Query};
|
||||
use trust_dns_proto::rr::{DNSClass, Name, RData, Record, RecordType};
|
||||
use trust_dns_proto::xfer::{DnsExchange, DnsMultiplexer};
|
||||
use trust_dns_proto::TokioTime;
|
||||
use trust_dns_resolver::config::LookupIpStrategy;
|
||||
use trust_dns_resolver::lookup::{Lookup, LookupFuture};
|
||||
use trust_dns_resolver::lookup_ip::LookupIpFuture;
|
||||
@ -35,7 +36,7 @@ fn test_lookup() {
|
||||
let mut io_loop = Runtime::new().unwrap();
|
||||
let (stream, sender) = TestClientStream::new(Arc::new(Mutex::new(catalog)));
|
||||
let dns_conn = DnsMultiplexer::new(stream, Box::new(sender), NoopMessageFinalizer::new());
|
||||
let client = DnsExchange::connect(dns_conn);
|
||||
let client = DnsExchange::connect::<_, _, TokioTime>(dns_conn);
|
||||
|
||||
let (client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
trust_dns_proto::spawn_bg(&io_loop, bg);
|
||||
@ -64,7 +65,7 @@ fn test_lookup_hosts() {
|
||||
let (stream, sender) = TestClientStream::new(Arc::new(Mutex::new(catalog)));
|
||||
let dns_conn = DnsMultiplexer::new(stream, Box::new(sender), NoopMessageFinalizer::new());
|
||||
|
||||
let client = DnsExchange::connect(dns_conn);
|
||||
let client = DnsExchange::connect::<_, _, TokioTime>(dns_conn);
|
||||
let (client, bg) = io_loop.block_on(client).expect("client connect failed");
|
||||
trust_dns_proto::spawn_bg(&io_loop, bg);
|
||||
|
||||
@ -122,7 +123,7 @@ fn test_lookup_ipv4_like() {
|
||||
let (stream, sender) = TestClientStream::new(Arc::new(Mutex::new(catalog)));
|
||||
let dns_conn = DnsMultiplexer::new(stream, Box::new(sender), NoopMessageFinalizer::new());
|
||||
|
||||
let client = DnsExchange::connect(dns_conn);
|
||||
let client = DnsExchange::connect::<_, _, TokioTime>(dns_conn);
|
||||
let (client, bg) = io_loop.block_on(client).expect("client connect failed");
|
||||
trust_dns_proto::spawn_bg(&io_loop, bg);
|
||||
|
||||
@ -152,7 +153,7 @@ fn test_lookup_ipv4_like_fall_through() {
|
||||
let (stream, sender) = TestClientStream::new(Arc::new(Mutex::new(catalog)));
|
||||
let dns_conn = DnsMultiplexer::new(stream, Box::new(sender), NoopMessageFinalizer::new());
|
||||
|
||||
let client = DnsExchange::connect(dns_conn);
|
||||
let client = DnsExchange::connect::<_, _, TokioTime>(dns_conn);
|
||||
let (client, bg) = io_loop.block_on(client).expect("client connect failed");
|
||||
trust_dns_proto::spawn_bg(&io_loop, bg);
|
||||
|
||||
|
@ -25,6 +25,7 @@ use trust_dns_client::tcp::TcpClientStream;
|
||||
use trust_dns_proto::udp::{UdpClientConnect, UdpClientStream, UdpResponse};
|
||||
use trust_dns_proto::xfer::DnsMultiplexerSerialResponse;
|
||||
use trust_dns_proto::SecureDnsHandle;
|
||||
use trust_dns_proto::{iocompat::AsyncIo02As03, TokioTime};
|
||||
use trust_dns_server::authority::{Authority, Catalog};
|
||||
|
||||
use trust_dns_integration::authority::create_secure_example;
|
||||
@ -311,7 +312,7 @@ where
|
||||
|
||||
let mut io_loop = Runtime::new().unwrap();
|
||||
let addr: SocketAddr = ("8.8.8.8", 53).to_socket_addrs().unwrap().next().unwrap();
|
||||
let (stream, sender) = TcpClientStream::<TokioTcpStream>::new(addr);
|
||||
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
|
||||
let client = AsyncClient::new(Box::new(stream), sender, None);
|
||||
let (client, bg) = io_loop.block_on(client).expect("client failed to connect");
|
||||
trust_dns_proto::spawn_bg(&io_loop, bg);
|
||||
|
Loading…
Reference in New Issue
Block a user