cleanup invalid FIXMEs and prepare for std::future to master

wip: review cleanup

clean up for merge
This commit is contained in:
Benjamin Fry 2019-10-14 15:58:50 -07:00
parent 749f8eccd0
commit f628efca01
26 changed files with 110 additions and 189 deletions

View File

@ -14,6 +14,7 @@ All notes should be prepended with the location of the change, e.g. `(proto)` or
- *breaking* (client) rebranded from `trust-dns` to `trust-dns-client`
- *breaking* (all) all internals updated to std::future and async/await (requires `Rust 1.40` minimum)
## 0.17.0 (Client/Server)
### Added

View File

@ -243,13 +243,6 @@ impl DnsRequestSender for HttpsClientStream {
Arc::clone(&self.name_server_name),
self.name_server,
)))
// HttpsSerialResponse(HttpsSerialResponseInner::StartSend {
// h2: self.h2.clone(),
// message,
// name_server_name: Arc::clone(&self.name_server_name),
// name_server: self.name_server,
// })
}
fn error_response(error: ProtoError) -> Self::DnsResponseFuture {
@ -503,27 +496,6 @@ pub struct HttpsClientResponse(
Pin<Box<dyn Future<Output = Result<DnsResponse, ProtoError>> + Send>>,
);
// impl HttpsClientResponse {
// /// creates a new future for the request
// ///
// /// # Arguments
// ///
// /// * `request` - Serialized message being sent
// /// * `message_id` - Id of the message that was encoded in the serial message
// fn new(request: SerialMessage, message_id: u16, timeout: Duration) -> Self {
// UdpResponse(Box::pin(Timeout::new(
// SingleUseUdpSocket::send_serial_message::<S>(request, message_id),
// timeout,
// )))
// }
// /// ad already completed future
// fn complete<F: Future<Output = Result<DnsResponse, ProtoError>> + Send + 'static>(f: F) -> Self {
// // TODO: this constructure isn't really necessary
// UdpResponse(Box::pin(Timeout::new(f, Duration::from_secs(5))))
// }
// }
impl Future for HttpsClientResponse {
type Output = Result<DnsResponse, ProtoError>;

View File

@ -169,8 +169,6 @@ mod tests {
let request = request::new("ns.example.com", len).unwrap();
let request = request.map(|()| stream);
// FIXME: generic stream impl is issue...
let from_post = message_from(Arc::new("ns.example.com".to_string()), request);
let bytes = match block_on(from_post) {
Ok(bytes) => bytes,

View File

@ -139,13 +139,7 @@ impl TlsStreamBuilder {
let ca_chain = self.ca_chain.clone();
let identity = self.identity;
let tcp_stream: Result<TokioTcpStream, _> = TokioTcpStream::connect(&name_server) /*.map_err(|e| {
io::Error::new(
io::ErrorKind::ConnectionRefused,
format!("tls error: {}", e),
)
})*/
.await;
let tcp_stream: Result<TokioTcpStream, _> = TokioTcpStream::connect(&name_server).await;
// TODO: for some reason the above wouldn't accept a ?
let tcp_stream = match tcp_stream {

View File

@ -242,8 +242,6 @@ impl TlsStreamBuilder {
}
};
// TODO: this clone can go way when the fn becomes
// This set of futures collapses the next tcp socket into a stream which can be used for
// sending and receiving tcp packets.
let stream = Box::pin(

View File

@ -301,27 +301,6 @@ impl From<Elapsed> for ProtoError {
}
}
// impl From<tokio_timer::Error<ProtoError>> for ProtoError {
// fn from(e: tokio_timer::Error<ProtoError>) -> Self {
// if e.is_elapsed() {
// return ProtoError::from(ProtoErrorKind::Timeout);
// }
// if e.is_inner() {
// return e.into_inner().expect("invalid state, not a ProtoError");
// }
// if e.is_timer() {
// return ProtoError::from(
// e.into_timer()
// .expect("invalid state, not a tokio_timer::Error"),
// );
// }
// ProtoError::from("unknown error with tokio_timer")
// }
// }
impl From<::url::ParseError> for ProtoError {
fn from(e: ::url::ParseError) -> ProtoError {
e.context(ProtoErrorKind::UrlParsing).into()

View File

@ -750,8 +750,6 @@ where
additional_count.1 |= count.1;
}
// FIXME: because this is destructive, we need to move message signing here... maybe it will work?
// this is a little hacky, but if we are Verifying a signature, i.e. the original Message
// then the SIG0 records should not be encoded and the edns record (if it exists) is already
// part of the additionals section.

View File

@ -121,7 +121,6 @@ impl<S: Connect + 'static> TcpStream<S> {
pub fn new<E>(
name_server: SocketAddr,
) -> (
//Box<dyn Future<Output = Result<TcpStream<S::Transport>, io::Error>> + Send>,
impl Future<Output = Result<TcpStream<S::Transport>, io::Error>> + Send,
BufStreamHandle,
)
@ -142,7 +141,6 @@ impl<S: Connect + 'static> TcpStream<S> {
name_server: SocketAddr,
timeout: Duration,
) -> (
//Box<dyn Future<Output = Result<TcpStream<S::Transport>, io::Error>> + Send>,
impl Future<Output = Result<TcpStream<S::Transport>, io::Error>> + Send,
BufStreamHandle,
) {

View File

@ -314,7 +314,6 @@ impl SingleUseUdpSocket {
message.id()
);
//SingleUseUdpSocket::AwaitResponse(msg.take(), socket.take(), msg_id)
continue;
}
}
@ -324,14 +323,14 @@ impl SingleUseUdpSocket {
"dropped malformed message waiting for id: {} err: {}",
msg_id, e
);
//SingleUseUdpSocket::AwaitResponse(msg.take(), socket.take(), msg_id)
continue;
}
}
}
}
// FIXME: this is unnecessary
// TODO: this is unnecessary
async fn errored(err: ProtoError) -> Result<DnsResponse, ProtoError> {
futures::future::err(err).await
}

View File

@ -126,7 +126,6 @@ where
DnsRequestStreamHandle { sender }
}
// FIXME: does try send change the semantics this had before?
/// see [`futures::sync::mpsc::UnboundedSender`]
pub fn unbounded_send(
&self,

View File

@ -82,7 +82,7 @@ impl<H: DnsHandle + Unpin> Future for RetrySendFuture<H> {
}
self.remaining_attempts -= 1;
// FIXME: if the "sent" Message is part of the error result,
// TODO: if the "sent" Message is part of the error result,
// then we can just reuse it... and no clone necessary
let request = self.request.clone();
self.future = self.handle.send(request);

View File

@ -440,7 +440,7 @@ where
if let RecordType::DNSSEC(DNSSECRecordType::DNSKEY) = rrset.record_type {
if rrsigs.is_empty() {
debug!("unsigned key: {}, {:?}", rrset.name, rrset.record_type);
// FIXME: validate that this DNSKEY is stronger than the one lower in the chain,
// TODO: validate that this DNSKEY is stronger than the one lower in the chain,
// also, set the min algorithm to this algorithm to prevent downgrade attacks.
return verify_dnskey_rrset(handle.clone_with_context(), rrset);
}

View File

@ -53,6 +53,7 @@ dnssec = []
serde-config = ["serde", "trust-dns-proto/serde-config"]
# enables experimental the mDNS (multicast) feature
mdns = ["trust-dns-proto/mdns"]
[lib]

View File

@ -126,7 +126,7 @@ mod tests {
fn tests_dir() -> String {
let server_path = env::var("TDNS_SERVER_SRC_ROOT").unwrap_or_else(|_| ".".to_owned());
format! {"{}/../resolver/tests", server_path}
format!("{}/../resolver/tests", server_path)
}
#[test]

View File

@ -264,7 +264,8 @@ pub trait LookupObject: Send {
fn take_additionals(&mut self) -> Option<Box<dyn LookupObject>>;
}
struct EmptyLookup;
/// A lookup that returns no records
pub struct EmptyLookup;
impl LookupObject for EmptyLookup {
fn is_empty(&self) -> bool {

View File

@ -33,7 +33,7 @@ use trust_dns::rr::{LowerName, RecordType};
use crate::authority::{
AuthLookup, MessageRequest, MessageResponse, MessageResponseBuilder, ZoneType,
};
use crate::authority::{AuthorityObject, BoxedLookupFuture, LookupError, LookupObject};
use crate::authority::{AuthorityObject, BoxedLookupFuture, LookupError, LookupObject, EmptyLookup};
use crate::server::{Request, RequestHandler, ResponseHandler};
/// Set of authorities, zones, available to this server.
@ -169,14 +169,14 @@ impl RequestHandler for Catalog {
/// Future response to handle a request
#[must_use = "futures do nothing unless polled"]
pub enum HandleRequest {
LookupFuture(Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>),
LookupFuture(Pin<Box<dyn Future<Output = ()> + Send>>),
Result(io::Result<()>),
}
impl HandleRequest {
fn lookup<R: ResponseHandler + Unpin>(lookup_future: LookupFuture<R>) -> Self {
let lookup =
Box::pin(lookup_future) as Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>;
Box::pin(lookup_future) as Pin<Box<dyn Future<Output = ()> + Send>>;
HandleRequest::LookupFuture(lookup)
}
@ -187,15 +187,15 @@ impl HandleRequest {
impl Future for HandleRequest {
// TODO: return ()
type Output = Result<(), ()>;
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match *self {
HandleRequest::LookupFuture(ref mut lookup) => lookup.as_mut().poll(cx),
HandleRequest::Result(Ok(_)) => Poll::Ready(Ok(())),
HandleRequest::Result(Ok(_)) => Poll::Ready(()),
HandleRequest::Result(Err(ref res)) => {
error!("update failed: {}", res);
Poll::Ready(Ok(()))
Poll::Ready(())
}
}
}
@ -480,16 +480,14 @@ impl<R: ResponseHandler + Unpin> LookupFuture<R> {
}
impl<R: ResponseHandler + Unpin> Future for LookupFuture<R> {
// TODO: return ()
type Output = Result<(), ()>;
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
// See if the lookup had anything
match self.lookup.as_mut().map(|f| f.poll_unpin(cx)) {
Some(Poll::Pending) => return Poll::Pending,
Some(Poll::Ready(Ok(()))) => (),
Some(Poll::Ready(Err(()))) => error!("unexpected error result in LookupFuture"),
Some(Poll::Ready(())) => (),
None => (),
}
@ -501,7 +499,7 @@ impl<R: ResponseHandler + Unpin> Future for LookupFuture<R> {
q_a
} else {
// all lookups are complete, finish request
return Poll::Ready(Ok(()));
return Poll::Ready(());
};
let query = if let Some(query) = self.request.queries().get(query_idx) {
@ -669,13 +667,12 @@ impl<R: ResponseHandler> AuthorityLookup<R> {
}
impl<R: ResponseHandler> Future for AuthorityLookup<R> {
// TODO: return ()
type Output = Result<(), ()>;
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let (response_params, request_params, authority, state) = self.split();
let sections = ready!(state.poll(cx, request_params, response_params, authority))?;
let sections = ready!(state.poll(cx, request_params, response_params, authority));
let records = sections.answers;
let soa = sections.soa;
@ -707,7 +704,7 @@ impl<R: ResponseHandler> Future for AuthorityLookup<R> {
.map_err(|e| error!("error sending response: {}", e))
.ok();
Poll::Ready(Ok(()))
Poll::Ready(())
}
}
@ -732,7 +729,7 @@ impl AuthOrResolve {
request_params: &RequestParams,
response_params: &mut ResponseParams<R>,
authority: &Arc<RwLock<Box<dyn AuthorityObject>>>,
) -> Poll<Result<LookupSections, ()>> {
) -> Poll<LookupSections> {
match self {
AuthOrResolve::AuthorityLookupState(a) => {
a.poll(cx, request_params, response_params, authority)
@ -779,7 +776,7 @@ impl AuthorityLookupState {
request_params: &RequestParams,
response_params: &mut ResponseParams<R>,
authority: &Arc<RwLock<Box<dyn AuthorityObject>>>,
) -> Poll<Result<LookupSections, ()>> {
) -> Poll<LookupSections> {
loop {
*self = match self {
// In this state we await the records, on success we transition to getting
@ -940,7 +937,7 @@ impl AuthorityLookupState {
}),
};
return Poll::Ready(Ok(sections));
return Poll::Ready(sections);
}
}
}
@ -962,7 +959,7 @@ impl ResolveLookupState {
_request_params: &RequestParams,
response_params: &mut ResponseParams<R>,
_authority: &Arc<RwLock<Box<dyn AuthorityObject>>>,
) -> Poll<Result<LookupSections, ()>> {
) -> Poll<LookupSections> {
#[allow(clippy::never_loop)]
loop {
// TODO: way more states to consider.
@ -975,7 +972,11 @@ impl ResolveLookupState {
ResolveLookupState::Records { record_lookup } => {
let records = ready!(record_lookup
.map_err(|e| error!("error resolving: {}", e))
.poll_unpin(cx))?;
.map(|r: Result<_,()>| match r {
Ok(r) => r,
Err(()) => Box::new(EmptyLookup),
})
.poll_unpin(cx));
// need to clone the result codes...
response_params.response_header.set_authoritative(false);
@ -987,7 +988,7 @@ impl ResolveLookupState {
additionals: Box::new(AuthLookup::default()) as Box<dyn LookupObject>,
};
return Poll::Ready(Ok(sections));
return Poll::Ready(sections);
}
}
}

View File

@ -26,7 +26,7 @@ pub use self::auth_lookup::{
AnyRecords, AuthLookup, AuthLookupIter, LookupRecords, LookupRecordsIter,
};
pub use self::authority::Authority;
pub use self::authority_object::{AuthorityObject, BoxedLookupFuture, LookupObject};
pub use self::authority_object::{AuthorityObject, BoxedLookupFuture, LookupObject, EmptyLookup};
pub use self::catalog::Catalog;
pub use self::error::{LookupError, LookupResult};
pub use self::message_request::{MessageRequest, Queries, UpdateRequest};

View File

@ -46,8 +46,7 @@ use std::path::{Path, PathBuf};
use std::pin::Pin;
use clap::{Arg, ArgMatches};
use futures::executor::block_on;
use futures::{future, Future, TryFutureExt};
use futures::{future, Future};
use tokio::runtime::Runtime;
use tokio::runtime::TaskExecutor;
use tokio_net::tcp::TcpListener;
@ -487,6 +486,9 @@ fn config_tls(
zone_dir: &Path,
listen_addrs: &[IpAddr],
) {
use futures::TryFutureExt;
use futures::executor::block_on;
let tls_listen_port: u16 = args
.flag_tls_port
.unwrap_or_else(|| config.get_tls_listen_port());
@ -531,6 +533,9 @@ fn config_https(
zone_dir: &Path,
listen_addrs: &[IpAddr],
) {
use futures::TryFutureExt;
use futures::executor::block_on;
let https_listen_port: u16 = args
.flag_https_port
.unwrap_or_else(|| config.get_https_listen_port());

View File

@ -83,10 +83,7 @@ async fn handle_request<T>(
debug!("received message: {:?}", message);
if let Err(()) = server_future::handle_request(message, src_addr, handler, responder).await {
warn!("error handling request from {}", src_addr);
()
}
server_future::handle_request(message, src_addr, handler, responder).await
}
#[derive(Clone)]

View File

@ -25,7 +25,7 @@ pub struct Request {
/// Trait for handling incoming requests, and providing a message response.
pub trait RequestHandler: Send + Unpin + 'static {
/// A future for execution of the request
type ResponseFuture: Future<Output = Result<(), ()>> + Send + Unpin + 'static;
type ResponseFuture: Future<Output = ()> + Send + Unpin + 'static;
/// Determines what needs to happen given the type of request, i.e. Query or Update.
///

View File

@ -12,7 +12,7 @@ use std::sync::{Arc, Mutex};
use std::task::Context;
use std::time::Duration;
use futures::{future, Future, FutureExt, Poll, StreamExt, TryFutureExt, TryStreamExt};
use futures::{future, Future, FutureExt, Poll, StreamExt};
#[cfg(feature = "dns-over-rustls")]
use rustls::{Certificate, PrivateKey};
@ -214,7 +214,7 @@ impl<T: RequestHandler> ServerFuture<T> {
format!("tls error: {}", e),
)
})
.and_then(move |tls_stream| {
.map_ok(move |tls_stream| {
let (buf_stream, stream_handle) =
TlsStream::from_stream(tls_stream, src_addr);
let timeout_stream = TimeoutStream::new(buf_stream, timeout);
@ -247,14 +247,10 @@ impl<T: RequestHandler> ServerFuture<T> {
.map(|_: Result<(), ()>| ()),
);
future::ok(())
Ok(())
})
.await
}
// FIXME: need to map this error to Ok, otherwise this is a DOS potential
// .map_err(move |e| {
// debug!("error TLS handshake: {:?} error: {:?}", src_addr, e)
// })
})
.map_err(|e| panic!("error in inbound tls_stream: {}", e))
.map(|_: Result<(), ()>| ()),
@ -310,6 +306,7 @@ impl<T: RequestHandler> ServerFuture<T> {
timeout: Duration,
certificate_and_key: (Vec<Certificate>, PrivateKey),
) -> io::Result<()> {
use futures::{TryFutureExt, TryStreamExt};
use tokio_rustls::TlsAcceptor;
use trust_dns_rustls::{tls_from_stream, tls_server};
@ -340,54 +337,39 @@ impl<T: RequestHandler> ServerFuture<T> {
// take the created stream...
tls_acceptor
.accept(tcp_stream)
.and_then(move |tls_stream| {
.map_ok(move |tls_stream| {
let (buf_stream, stream_handle) = tls_from_stream(tls_stream, src_addr);
let timeout_stream = TimeoutStream::new(buf_stream, timeout);
//let request_stream = RequestStream::new(timeout_stream, stream_handle);
let handler = handler.clone();
// and spawn to the io_loop
tokio_executor::spawn(timeout_stream.for_each(move |message| {
message
.map(|message| {
Box::pin(self::handle_raw_request(
tokio_executor::spawn(
timeout_stream
.try_for_each(move |message| {
self::handle_raw_request(
message,
handler.clone(),
stream_handle.clone(),
))
as Pin<Box<dyn Future<Output = ()> + Send>>
)
.map(|_: ()| Ok(()))
})
.unwrap_or_else(|e| {
.map_err(move |e| {
debug!(
"error in TCP request_stream src: {:?} error: {}",
"error in TLS request_stream src {}: {}",
src_addr, e
);
Box::pin(future::ready(()))
as Pin<Box<dyn Future<Output = ()> + Send>>
)
})
}));
.map(|_: Result<(), ()>| ()),
);
future::ok(())
()
})
.map_err(|e| {
io::Error::new(
io::ErrorKind::ConnectionRefused,
format!("tls error: {}", e),
)
})
// .map(|result: Result<(),()>| ())
// FIXME: need to map this error to Ok, otherwise this is a DOS potential
// .map_err(move |e| {
// debug!("error HTTPS handshake: {:?} error: {:?}", src_addr, e)
// })
.map_err(move |e| debug!("error handling TLS stream {}: {}", src_addr, e))
.map(|_: Result<(), ()>| Ok(()))
})
.map_err(|_| panic!("error in inbound tls_stream"))
.map(|_: Result<(), ()>| ()), // .map(|r| {
// if let Err(e) = r {
// panic!("error in inbound https_stream: {}", e)
// }
// })
.map(|_: Result<(), ()>| ()),
);
Ok(())
@ -441,6 +423,7 @@ impl<T: RequestHandler> ServerFuture<T> {
certificate_and_key: (Vec<Certificate>, PrivateKey),
dns_hostname: String,
) -> io::Result<()> {
use futures::{TryFutureExt, TryStreamExt};
use tokio_rustls::TlsAcceptor;
use crate::server::https_handler::h2_handler;
@ -477,10 +460,11 @@ impl<T: RequestHandler> ServerFuture<T> {
// take the created stream...
tls_acceptor
.accept(tcp_stream)
.map_err(|e| warn!("tls error: {}", e))
.map_err(move |e| debug!("error handling HTTPS stream {}: {}", src_addr, e))
.and_then(move |tls_stream| {
h2_handler(handler, tls_stream, src_addr, dns_hostname).unit_error()
})
.map(|_: Result<(), ()>| Ok(()))
})
.map(|_: Result<(), ()>| ())
});
@ -544,19 +528,19 @@ pub(crate) fn handle_request<R: ResponseHandler, T: RequestHandler>(
}
#[must_use = "futures do nothing unless polled"]
pub(crate) enum HandleRawRequest<F: Future<Output = Result<(), ()>>> {
pub(crate) enum HandleRawRequest<F: Future<Output = ()>> {
HandleRequest(F),
Result(io::Error),
}
// TODO: output ()
impl<F: Future<Output = Result<(), ()>> + Unpin> Future for HandleRawRequest<F> {
impl<F: Future<Output = ()> + Unpin> Future for HandleRawRequest<F> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match *self {
HandleRawRequest::HandleRequest(ref mut f) => {
f.poll_unpin(cx).map(|_: Result<_, _>| ())
f.poll_unpin(cx)
}
HandleRawRequest::Result(ref res) => {
warn!("failed to handle message: {}", res);

View File

@ -140,8 +140,6 @@ pub fn create_example() -> InMemoryAuthority {
0,
);
// FIXME: THESE NEW CNAME RECORDS ARE CAUSING NSEC TO FAIL, WHY?
// www.example.com. 86400 IN AAAA 2606:2800:220:1:248:1893:25c8:1946
records.upsert(
Record::new()

View File

@ -24,7 +24,7 @@ use std::time::{Duration, Instant};
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::executor::block_on;
use futures::stream::{Fuse, Stream, StreamExt};
use futures::{future, Future, FutureExt, TryFutureExt};
use futures::{future, Future, FutureExt};
use tokio_timer::Delay;
use trust_dns::client::ClientConnection;
@ -84,7 +84,7 @@ impl TestResponseHandler {
TestResponseHandler { message_ready, buf }
}
fn into_inner(self) -> impl Future<Output = Result<Vec<u8>, ()>> {
fn into_inner(self) -> impl Future<Output = Vec<u8>> {
future::poll_fn(move |_| {
if self
.message_ready
@ -92,16 +92,16 @@ impl TestResponseHandler {
.is_ok()
{
let bytes: Vec<u8> = mem::replace(&mut self.buf.lock().unwrap(), vec![]);
Poll::Ready(Ok(bytes))
Poll::Ready(bytes)
} else {
Poll::Pending
}
})
}
pub fn into_message(self) -> impl Future<Output = Result<Message, ()>> {
pub fn into_message(self) -> impl Future<Output = Message> {
let bytes = self.into_inner();
bytes.map_ok(|b| {
bytes.map(|b| {
let mut decoder = BinDecoder::new(&b);
Message::read(&mut decoder).expect("could not decode message")
})
@ -156,12 +156,11 @@ impl Stream for TestClientStream {
.lock()
.unwrap()
.handle_request(request, response_handler.clone()),
)
.unwrap();
);
dbg!("catalog handled request");
let buf = block_on(response_handler.into_inner()).unwrap();
let buf = block_on(response_handler.into_inner());
dbg!("catalog responded");
Poll::Ready(Some(Ok(SerialMessage::new(buf, src_addr))))

View File

@ -138,8 +138,8 @@ fn test_catalog_lookup() {
let question_req = MessageRequest::from_bytes(&question_bytes).unwrap();
let response_handler = TestResponseHandler::new();
block_on(catalog.lookup(question_req, None, response_handler.clone())).unwrap();
let result = block_on(response_handler.into_message()).unwrap();
block_on(catalog.lookup(question_req, None, response_handler.clone()));
let result = block_on(response_handler.into_message());
assert_eq!(result.response_code(), ResponseCode::NoError);
assert_eq!(result.message_type(), MessageType::Response);
@ -179,8 +179,8 @@ fn test_catalog_lookup() {
let question_req = MessageRequest::from_bytes(&question_bytes).unwrap();
let response_handler = TestResponseHandler::new();
block_on(catalog.lookup(question_req, None, response_handler.clone())).unwrap();
let result = block_on(response_handler.into_message()).unwrap();
block_on(catalog.lookup(question_req, None, response_handler.clone()));
let result = block_on(response_handler.into_message());
assert_eq!(result.response_code(), ResponseCode::NoError);
assert_eq!(result.message_type(), MessageType::Response);
@ -216,8 +216,8 @@ fn test_catalog_nx_soa() {
let question_req = MessageRequest::from_bytes(&question_bytes).unwrap();
let response_handler = TestResponseHandler::new();
block_on(catalog.lookup(question_req, None, response_handler.clone())).unwrap();
let result = block_on(response_handler.into_message()).unwrap();
block_on(catalog.lookup(question_req, None, response_handler.clone()));
let result = block_on(response_handler.into_message());
assert_eq!(result.response_code(), ResponseCode::NXDomain);
assert_eq!(result.message_type(), MessageType::Response);
@ -278,8 +278,8 @@ fn test_axfr() {
let question_req = MessageRequest::from_bytes(&question_bytes).unwrap();
let response_handler = TestResponseHandler::new();
block_on(catalog.lookup(question_req, None, response_handler.clone())).expect("lookup failed");
let result = block_on(response_handler.into_message()).unwrap();
block_on(catalog.lookup(question_req, None, response_handler.clone()));
let result = block_on(response_handler.into_message());
let mut answers: Vec<Record> = result.answers().to_vec();
@ -395,8 +395,8 @@ fn test_axfr_refused() {
let question_req = MessageRequest::from_bytes(&question_bytes).unwrap();
let response_handler = TestResponseHandler::new();
block_on(catalog.lookup(question_req, None, response_handler.clone())).expect("lookup failed");
let result = block_on(response_handler.into_message()).unwrap();
block_on(catalog.lookup(question_req, None, response_handler.clone()));
let result = block_on(response_handler.into_message());
assert_eq!(result.response_code(), ResponseCode::Refused);
assert!(result.answers().is_empty());
@ -432,8 +432,8 @@ fn test_cname_additionals() {
let question_req = MessageRequest::from_bytes(&question_bytes).unwrap();
let response_handler = TestResponseHandler::new();
block_on(catalog.lookup(question_req, None, response_handler.clone())).unwrap();
let result = block_on(response_handler.into_message()).unwrap();
block_on(catalog.lookup(question_req, None, response_handler.clone()));
let result = block_on(response_handler.into_message());
assert_eq!(result.message_type(), MessageType::Response);
assert_eq!(result.response_code(), ResponseCode::NoError);
@ -476,8 +476,8 @@ fn test_multiple_cname_additionals() {
let question_req = MessageRequest::from_bytes(&question_bytes).unwrap();
let response_handler = TestResponseHandler::new();
block_on(catalog.lookup(question_req, None, response_handler.clone())).unwrap();
let result = block_on(response_handler.into_message()).unwrap();
block_on(catalog.lookup(question_req, None, response_handler.clone()));
let result = block_on(response_handler.into_message());
assert_eq!(result.message_type(), MessageType::Response);
assert_eq!(result.response_code(), ResponseCode::NoError);

View File

@ -23,7 +23,7 @@ use std::sync::{Arc, Mutex};
#[cfg(feature = "dnssec")]
use chrono::Duration;
use futures::{Future, TryFutureExt};
use futures::{Future, FutureExt, TryFutureExt};
use tokio::runtime::current_thread::Runtime;
use tokio_net::tcp::TcpStream as TokioTcpStream;
use tokio_net::udp::UdpSocket as TokioUdpSocket;
@ -60,8 +60,8 @@ fn test_query_nonet() {
let (bg, mut client) = ClientFuture::new(stream, Box::new(sender), None);
io_loop.spawn(bg);
io_loop.block_on(test_query(&mut client)).unwrap();
io_loop.block_on(test_query(&mut client)).unwrap();
io_loop.block_on(test_query(&mut client));
io_loop.block_on(test_query(&mut client));
}
#[test]
@ -73,8 +73,8 @@ fn test_query_udp_ipv4() {
io_loop.spawn(bg);
// TODO: timeouts on these requests so that the test doesn't hang
io_loop.block_on(test_query(&mut client)).unwrap();
io_loop.block_on(test_query(&mut client)).unwrap();
io_loop.block_on(test_query(&mut client));
io_loop.block_on(test_query(&mut client));
}
#[test]
@ -91,8 +91,8 @@ fn test_query_udp_ipv6() {
io_loop.spawn(bg);
// TODO: timeouts on these requests so that the test doesn't hang
io_loop.block_on(test_query(&mut client)).unwrap();
io_loop.block_on(test_query(&mut client)).unwrap();
io_loop.block_on(test_query(&mut client));
io_loop.block_on(test_query(&mut client));
}
#[test]
@ -104,8 +104,8 @@ fn test_query_tcp_ipv4() {
io_loop.spawn(bg);
// TODO: timeouts on these requests so that the test doesn't hang
io_loop.block_on(test_query(&mut client)).unwrap();
io_loop.block_on(test_query(&mut client)).unwrap();
io_loop.block_on(test_query(&mut client));
io_loop.block_on(test_query(&mut client));
}
#[test]
@ -122,8 +122,8 @@ fn test_query_tcp_ipv6() {
io_loop.spawn(bg);
// TODO: timeouts on these requests so that the test doesn't hang
io_loop.block_on(test_query(&mut client)).unwrap();
io_loop.block_on(test_query(&mut client)).unwrap();
io_loop.block_on(test_query(&mut client));
io_loop.block_on(test_query(&mut client));
}
#[test]
@ -150,12 +150,12 @@ fn test_query_https() {
io_loop.spawn(bg);
// TODO: timeouts on these requests so that the test doesn't hang
io_loop.block_on(test_query(&mut client)).unwrap();
io_loop.block_on(test_query(&mut client)).unwrap();
io_loop.block_on(test_query(&mut client));
io_loop.block_on(test_query(&mut client));
}
#[cfg(test)]
fn test_query<R>(client: &mut BasicClientHandle<R>) -> Pin<Box<dyn Future<Output = Result<(), ()>>>>
fn test_query<R>(client: &mut BasicClientHandle<R>) -> Pin<Box<dyn Future<Output = ()>>>
where
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
{
@ -184,9 +184,9 @@ where
panic!();
}
})
.map_err(|e| {
panic!("query failed: {}", e);
}),
.map(|r: Result<_,_>| {
r.expect("query failed")
})
)
}

View File

@ -283,13 +283,12 @@ fn server_thread_udp(udp_socket: UdpSocket, server_continue: Arc<AtomicBool>) {
let mut io_loop = Runtime::new().unwrap();
let server = ServerFuture::new(catalog);
io_loop
.block_on::<Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>>(Box::pin(future::lazy(
.block_on::<Pin<Box<dyn Future<Output = ()> + Send>>>(Box::pin(future::lazy(
|_| {
server.register_socket(udp_socket);
Ok(())
()
},
)))
.unwrap();
)));
while server_continue.load(Ordering::Relaxed) {
io_loop.block_on(tokio_timer::delay(
@ -315,7 +314,7 @@ fn server_thread_tcp(tcp_listener: TcpListener, server_continue: Arc<AtomicBool>
}
}
// FIXME: need a rustls option
// TODO: need a rustls option
#[cfg(all(feature = "dns-over-openssl", not(feature = "dns-over-rustls")))]
fn server_thread_tls(
tls_listener: TcpListener,