associate Time type with Connect trait

This commit is contained in:
Dirkjan Ochtman 2020-10-14 14:41:20 +02:00 committed by Benjamin Fry
parent 2764de2260
commit ab46adcc09
11 changed files with 52 additions and 55 deletions

View File

@ -26,8 +26,8 @@ use trust_dns_client::rr::*;
use trust_dns_client::tcp::*;
use trust_dns_client::udp::*;
use trust_dns_proto::error::*;
use trust_dns_proto::iocompat::AsyncIo02As03;
use trust_dns_proto::xfer::*;
use trust_dns_proto::{iocompat::AsyncIo02As03, TokioTime};
fn find_test_port() -> u16 {
let server = std::net::UdpSocket::bind(("0.0.0.0", 0)).unwrap();
@ -183,7 +183,7 @@ fn trust_dns_tcp_bench(b: &mut Bencher) {
.unwrap()
.next()
.unwrap();
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TcpStream>>::new(addr);
let mp = DnsMultiplexer::new(stream, sender, None::<Arc<Signer>>);
bench(b, mp);
@ -258,7 +258,7 @@ fn bind_tcp_bench(b: &mut Bencher) {
.unwrap()
.next()
.unwrap();
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TcpStream>>::new(addr);
let mp = DnsMultiplexer::new(stream, sender, None::<Arc<Signer>>);
bench(b, mp);

View File

@ -70,7 +70,7 @@ async fn standard_tcp_conn(
.unwrap()
.next()
.unwrap();
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
AsyncClient::new(stream, sender, None)
.await
.expect("new AsyncClient failed")

View File

@ -23,7 +23,6 @@ use trust_dns_client::op::ResponseCode;
use trust_dns_client::rr::*;
use trust_dns_client::tcp::TcpClientStream;
use trust_dns_client::udp::UdpClientStream;
use trust_dns_proto::TokioTime;
// TODO: Needed for when TLS tests are added back
// #[cfg(feature = "dns-over-openssl")]
@ -40,8 +39,7 @@ fn test_example_toml_startup() {
Ipv4Addr::new(127, 0, 0, 1).into(),
tcp_port.expect("no tcp_port"),
);
let (stream, sender) =
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
@ -54,8 +52,7 @@ fn test_example_toml_startup() {
Ipv4Addr::new(127, 0, 0, 1).into(),
tcp_port.expect("no tcp_port"),
);
let (stream, sender) =
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
@ -73,8 +70,7 @@ fn test_ipv4_only_toml_startup() {
Ipv4Addr::new(127, 0, 0, 1).into(),
tcp_port.expect("no tcp_port"),
);
let (stream, sender) =
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
@ -87,8 +83,7 @@ fn test_ipv4_only_toml_startup() {
Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into(),
tcp_port.expect("no tcp_port"),
);
let (stream, sender) =
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
assert!(io_loop.block_on(client).is_err());
@ -138,8 +133,7 @@ fn test_ipv4_and_ipv6_toml_startup() {
Ipv4Addr::new(127, 0, 0, 1).into(),
tcp_port.expect("no tcp_port"),
);
let (stream, sender) =
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
@ -151,8 +145,7 @@ fn test_ipv4_and_ipv6_toml_startup() {
Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into(),
tcp_port.expect("no tcp_port"),
);
let (stream, sender) =
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
trust_dns_proto::spawn_bg(&io_loop, bg);
@ -170,8 +163,7 @@ fn test_nodata_where_name_exists() {
Ipv4Addr::new(127, 0, 0, 1).into(),
tcp_port.expect("no tcp_port"),
);
let (stream, sender) =
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
trust_dns_proto::spawn_bg(&io_loop, bg);
@ -196,8 +188,7 @@ fn test_nxdomain_where_no_name_exists() {
Ipv4Addr::new(127, 0, 0, 1).into(),
tcp_port.expect("no tcp_port"),
);
let (stream, sender) =
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
trust_dns_proto::spawn_bg(&io_loop, bg);
@ -260,8 +251,7 @@ fn test_server_continues_on_bad_data_tcp() {
Ipv4Addr::new(127, 0, 0, 1).into(),
tcp_port.expect("no tcp_port"),
);
let (stream, sender) =
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
@ -281,8 +271,7 @@ fn test_server_continues_on_bad_data_tcp() {
Ipv4Addr::new(127, 0, 0, 1).into(),
tcp_port.expect("no tcp_port"),
);
let (stream, sender) =
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
trust_dns_proto::spawn_bg(&io_loop, bg);
@ -304,8 +293,7 @@ fn test_forward() {
Ipv4Addr::new(127, 0, 0, 1).into(),
tcp_port.expect("no tcp_port"),
);
let (stream, sender) =
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
@ -327,8 +315,7 @@ fn test_forward() {
Ipv4Addr::new(127, 0, 0, 1).into(),
tcp_port.expect("no tcp_port"),
);
let (stream, sender) =
TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");

View File

@ -14,6 +14,8 @@ use futures_io::{AsyncRead, AsyncWrite};
use trust_dns_resolver::proto::tcp::Connect;
use trust_dns_resolver::proto::udp::UdpSocket;
use crate::time::AsyncStdTime;
pub struct AsyncStdUdpSocket(async_std::net::UdpSocket);
#[async_trait]
@ -37,6 +39,8 @@ pub struct AsyncStdTcpStream(async_std::net::TcpStream);
#[async_trait]
impl Connect for AsyncStdTcpStream {
type Time = AsyncStdTime;
async fn connect(addr: SocketAddr) -> io::Result<Self> {
let stream = async_std::net::TcpStream::connect(addr).await?;
stream.set_nodelay(true)?;

View File

@ -18,7 +18,6 @@ use crate::error::*;
use crate::proto::iocompat::AsyncIo02As03;
use crate::proto::tcp::{TcpClientConnect, TcpClientStream};
use crate::proto::xfer::{DnsMultiplexer, DnsMultiplexerConnect};
use crate::proto::TokioTime;
use crate::rr::dnssec::Signer;
/// Tcp client connection
@ -70,9 +69,10 @@ impl ClientConnection for TcpClientConnection {
>;
fn new_stream(&self, signer: Option<Arc<Signer>>) -> Self::SenderFuture {
let (tcp_client_stream, handle) = TcpClientStream::<AsyncIo02As03<TcpStream>>::with_timeout::<
TokioTime,
>(self.name_server, self.timeout);
let (tcp_client_stream, handle) = TcpClientStream::<AsyncIo02As03<TcpStream>>::with_timeout(
self.name_server,
self.timeout,
);
DnsMultiplexer::new(tcp_client_stream, handle, signer)
}
}

View File

@ -24,7 +24,8 @@ use crate::error::ProtoError;
use crate::iocompat::AsyncIo02As03;
use crate::tcp::{Connect, TcpStream};
use crate::xfer::{DnsClientStream, SerialMessage};
use crate::Time;
#[cfg(feature = "tokio-runtime")]
use crate::TokioTime;
use crate::{BufDnsStreamHandle, DnsStreamHandle};
/// Tcp client stream
@ -44,13 +45,13 @@ impl<S: Connect> TcpClientStream<S> {
///
/// * `name_server` - the IP and Port of the DNS server to connect to
#[allow(clippy::new_ret_no_self)]
pub fn new<TE: 'static + Time>(
pub fn new(
name_server: SocketAddr,
) -> (
TcpClientConnect<S>,
Box<dyn DnsStreamHandle + 'static + Send>,
) {
Self::with_timeout::<TE>(name_server, Duration::from_secs(5))
Self::with_timeout(name_server, Duration::from_secs(5))
}
/// Constructs a new TcpStream for a client to the specified SocketAddr.
@ -59,14 +60,14 @@ impl<S: Connect> TcpClientStream<S> {
///
/// * `name_server` - the IP and Port of the DNS server to connect to
/// * `timeout` - connection timeout
pub fn with_timeout<TE: 'static + Time>(
pub fn with_timeout(
name_server: SocketAddr,
timeout: Duration,
) -> (
TcpClientConnect<S>,
Box<dyn DnsStreamHandle + 'static + Send>,
) {
let (stream_future, sender) = TcpStream::<S>::with_timeout::<TE>(name_server, timeout);
let (stream_future, sender) = TcpStream::<S>::with_timeout(name_server, timeout);
let new_future = Box::pin(
stream_future
@ -136,6 +137,8 @@ use tokio::net::TcpStream as TokioTcpStream;
#[cfg(feature = "tokio-runtime")]
#[async_trait]
impl Connect for AsyncIo02As03<TokioTcpStream> {
type Time = TokioTime;
async fn connect(addr: SocketAddr) -> io::Result<Self> {
super::tokio::connect(&addr).await.map(AsyncIo02As03)
}

View File

@ -30,6 +30,9 @@ pub trait Connect
where
Self: AsyncRead + AsyncWrite + Unpin + Send + Sync + Sized + 'static,
{
/// Timer type to use with this TCP stream type
type Time: Time;
/// connect to tcp
async fn connect(addr: SocketAddr) -> io::Result<Self>;
}
@ -116,7 +119,7 @@ impl<S: Connect> TcpStream<S> {
///
/// * `name_server` - the IP and Port of the DNS server to connect to
#[allow(clippy::new_ret_no_self, clippy::type_complexity)]
pub fn new<E, TE>(
pub fn new<E>(
name_server: SocketAddr,
) -> (
impl Future<Output = Result<TcpStream<S>, io::Error>> + Send,
@ -124,9 +127,8 @@ impl<S: Connect> TcpStream<S> {
)
where
E: FromProtoError,
TE: Time,
{
Self::with_timeout::<TE>(name_server, Duration::from_secs(5))
Self::with_timeout(name_server, Duration::from_secs(5))
}
/// Creates a new future of the eventually establish a IO stream connection or fail trying
@ -136,7 +138,7 @@ impl<S: Connect> TcpStream<S> {
/// * `name_server` - the IP and Port of the DNS server to connect to
/// * `timeout` - connection timeout
#[allow(clippy::type_complexity)]
pub fn with_timeout<TE: Time>(
pub fn with_timeout(
name_server: SocketAddr,
timeout: Duration,
) -> (
@ -146,18 +148,18 @@ impl<S: Connect> TcpStream<S> {
let (message_sender, outbound_messages) = BufStreamHandle::create();
// This set of futures collapses the next tcp socket into a stream which can be used for
// sending and receiving tcp packets.
let stream_fut = Self::connect::<TE>(name_server, timeout, outbound_messages);
let stream_fut = Self::connect(name_server, timeout, outbound_messages);
(stream_fut, message_sender)
}
async fn connect<TE: Time>(
async fn connect(
name_server: SocketAddr,
timeout: Duration,
outbound_messages: StreamReceiver,
) -> Result<TcpStream<S>, io::Error> {
let tcp = S::connect(name_server);
TE::timeout(timeout, tcp)
S::Time::timeout(timeout, tcp)
.map(move |tcp_stream: Result<Result<S, io::Error>, _>| {
tcp_stream
.and_then(|tcp_stream| tcp_stream)

View File

@ -92,7 +92,7 @@ pub fn tcp_stream_test<S: Connect, E: Executor, TE: Time>(server_addr: IpAddr, m
// the tests should run within 5 seconds... right?
// TODO: add timeout here, so that test never hangs...
// let timeout = Timeout::new(Duration::from_secs(5));
let (stream, mut sender) = TcpStream::<S>::new::<ProtoError, TE>(server_addr);
let (stream, mut sender) = TcpStream::<S>::new::<ProtoError>(server_addr);
let mut stream = exec.block_on(stream).expect("run failed to get stream");
@ -127,7 +127,7 @@ pub fn tcp_client_stream_test<S: Connect, E: Executor, TE: Time + 'static>(
// the tests should run within 5 seconds... right?
// TODO: add timeout here, so that test never hangs...
// let timeout = Timeout::new(Duration::from_secs(5));
let (stream, mut sender) = TcpClientStream::<S>::new::<TE>(server_addr);
let (stream, mut sender) = TcpClientStream::<S>::new(server_addr);
let mut stream = exec.block_on(stream).expect("run failed to get stream");

View File

@ -131,7 +131,7 @@ where
let timeout = options.timeout;
let (stream, handle) =
TcpClientStream::<R::Tcp>::with_timeout::<R::Timer>(socket_addr, timeout);
TcpClientStream::<R::Tcp>::with_timeout(socket_addr, timeout);
// TODO: need config for Signer...
let dns_conn = DnsMultiplexer::with_timeout(
stream,

View File

@ -82,7 +82,7 @@ fn test_query_udp_ipv6() {
fn test_query_tcp_ipv4() {
let mut io_loop = Runtime::new().unwrap();
let addr: SocketAddr = ("8.8.8.8", 53).to_socket_addrs().unwrap().next().unwrap();
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(stream, sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
trust_dns_proto::spawn_bg(&io_loop, bg);
@ -101,7 +101,7 @@ fn test_query_tcp_ipv6() {
.unwrap()
.next()
.unwrap();
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(stream, sender, None);
let (mut client, bg) = io_loop.block_on(client).expect("client failed to connect");
trust_dns_proto::spawn_bg(&io_loop, bg);
@ -928,9 +928,10 @@ fn test_timeout_query_tcp() {
.next()
.unwrap();
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::with_timeout::<
TokioTime,
>(addr, std::time::Duration::from_millis(1));
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::with_timeout(
addr,
std::time::Duration::from_millis(1),
);
let client = AsyncClient::with_timeout(
Box::new(stream),
sender,

View File

@ -15,9 +15,9 @@ use trust_dns_client::rr::Name;
use trust_dns_client::rr::{DNSClass, RData, RecordType};
use trust_dns_client::tcp::TcpClientStream;
use trust_dns_proto::iocompat::AsyncIo02As03;
use trust_dns_proto::udp::{UdpClientConnect, UdpClientStream};
use trust_dns_proto::DnssecDnsHandle;
use trust_dns_proto::{iocompat::AsyncIo02As03, TokioTime};
use trust_dns_server::authority::{Authority, Catalog};
use trust_dns_integration::authority::create_secure_example;
@ -303,7 +303,7 @@ where
let mut io_loop = Runtime::new().unwrap();
let addr: SocketAddr = ("8.8.8.8", 53).to_socket_addrs().unwrap().next().unwrap();
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new::<TokioTime>(addr);
let (stream, sender) = TcpClientStream::<AsyncIo02As03<TokioTcpStream>>::new(addr);
let client = AsyncClient::new(Box::new(stream), sender, None);
let (client, bg) = io_loop.block_on(client).expect("client failed to connect");
trust_dns_proto::spawn_bg(&io_loop, bg);