rename dns_future to dns_multiplexer

Benjamin Fry 2018-07-08 22:52:19 -07:00
parent 289e75649c
commit 3f2df6f4b3
4 changed files with 5 additions and 256 deletions
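The rename is a pure module-path move: DnsMultiplexer keeps its name and relocates from xfer::dns_future to xfer::dns_multiplexer, as the hunks below show. A minimal sketch of the resulting import change for downstream code (the trust_dns_proto crate root is assumed here; only the xfer paths appear in this diff):

// Before this commit (crate root trust_dns_proto assumed):
// use trust_dns_proto::xfer::dns_future::DnsMultiplexer;

// After this commit, the same type comes from the renamed module:
use trust_dns_proto::xfer::dns_multiplexer::DnsMultiplexer;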

View File

@ -55,11 +55,11 @@ pub mod tcp;
pub mod udp;
pub mod xfer;
#[doc(hidden)]
pub use xfer::dns_future::DnsMultiplexer;
#[doc(hidden)]
pub use xfer::dns_handle::{BasicDnsHandle, DnsHandle, DnsStreamHandle, StreamHandle};
#[doc(hidden)]
pub use xfer::dns_multiplexer::DnsMultiplexer;
#[doc(hidden)]
pub use xfer::retry_dns_handle::RetryDnsHandle;
#[doc(hidden)]
#[cfg(feature = "dnssec")]

View File

@ -454,20 +454,6 @@ where
}
}
// // Clean shutdown happens when all pending requests are done and the
// // incoming channel has been closed (e.g. you'll never receive another
// // request). Errors will return early...
// let done = match self.new_receiver.peek() {
// Ok(Async::Ready(None)) => true,
// Ok(_) => false,
// Err(_) => return Err(ProtoErrorKind::NoError.into()),
// };
// // The
// if self.active_requests.is_empty() && done {
// return Ok(Async::Ready(None)); // we are done
// }
// If still active, then if the qos (for _ in 0..100 loop) limit
// was hit then "yield". This'll make sure that the future is
// woken up immediately on the next turn of the event loop.
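The comment above describes the futures 0.1 cooperative-yield idiom this driver relies on: when the per-poll receive budget (the 0..100 QoS loop) is exhausted with work still pending, the future notifies its own task and returns NotReady, so the event loop polls it again on the next turn instead of letting one future monopolize the executor. A self-contained sketch of that idiom, with illustrative names not taken from this diff:

extern crate futures;

use futures::{task, Async, Future, Poll};

// Illustrative budget mirroring the 0..100 QoS loop in the real driver.
const QOS_MAX_RECEIVE_MSGS: usize = 100;

// Stand-in driver: `pending` models messages still queued on a stream.
struct BudgetedDriver {
    pending: usize,
}

impl Future for BudgetedDriver {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        let mut received = 0;
        // Drain at most QOS_MAX_RECEIVE_MSGS messages per poll() call.
        while received < QOS_MAX_RECEIVE_MSGS && self.pending > 0 {
            self.pending -= 1;
            received += 1;
        }
        if self.pending == 0 {
            return Ok(Async::Ready(())); // clean shutdown: nothing left
        }
        // Budget hit with work remaining: "yield" by re-notifying this
        // task before returning NotReady, which guarantees another poll
        // on the next turn of the event loop.
        task::current().notify();
        Ok(Async::NotReady)
    }
}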
@ -533,240 +519,3 @@ impl Future for DnsMultiplexerSerialResponseInner {
}
}
}
// debug!("got message from receiver");
// // we have a new message to send
// match self.next_random_query_id() {
// Async::Ready(id) => Some(id),
// Async::NotReady => break,
// }
// }
// Ok(Async::Ready(None)) => {
// // We want to pop the nones off in the poll, to get rid of them.
// None
// }
// Ok(Async::NotReady) => {
// // we must break in the NotReady case as well, we don't want there to ever be a case where
// // a message could arrive between peek and poll... i.e. a race condition where query_id
// // would have been gotten
// break;
// }
// Err(()) => {
// warn!("receiver was shutdown?");
// break;
// }
// };
// // finally pop the receiver
// match self.new_receiver.poll() {
// Ok(Async::Ready(Some((request, complete)))) => {
// let mut request: DnsRequest = request;
// // if there was a message, and the above succession was successful,
// // register the new message; if not, do not register, and set the complete to error.
// // getting a random query id, this mitigates potential cache poisoning.
// let query_id = query_id.expect("query_id should have been set above");
// request.set_id(query_id);
// let now = SystemTime::now()
// .duration_since(UNIX_EPOCH)
// .map_err(|_| {
// ProtoErrorKind::Message("Current time is before the Unix epoch.").into()
// })?
// .as_secs();
// let now = now as u32; // XXX: truncates u64 to u32.
// // update messages need to be signed.
// if let OpCode::Update = request.op_code() {
// if let Some(ref signer) = self.signer {
// if let Err(e) = request.finalize::<MF>(signer.borrow(), now) {
// warn!("could not sign message: {}", e);
// ignore_send(complete.send(Err(e.into())));
// continue; // to the next message...
// }
// }
// }
// // store a Timeout for this message before sending
// let mut timeout = Delay::new(Instant::now() + self.timeout_duration);
// // make sure to register interest in the Timeout
// match timeout.poll() {
// Ok(Async::Ready(_)) => {
// warn!("timeout fired before sending message!: {}", query_id);
// ignore_send(
// complete
// .send(Err(E::from(ProtoError::from(ProtoErrorKind::Timeout)))),
// );
// continue; // to the next message
// }
// Ok(Async::NotReady) => (), // this is the expected state...
// Err(e) => {
// error!("could not register interest in Timeout: {}", e);
// ignore_send(complete.send(Err(E::from(e.into()))));
// continue; // to the next message
// }
// }
// // send the message
// let active_request = ActiveRequest::new(
// complete,
// request.id(),
// request.options().clone(),
// timeout,
// );
// match request.unwrap().to_vec() {
// Ok(buffer) => {
// debug!("sending message id: {}", active_request.request_id());
// let serial_message =
// SerialMessage::new(buffer, self.stream.name_server_addr());
// self.stream_handle.send(serial_message)?;
// // add to the map -after- the client send b/c we don't want to put it in the map if
// // we ended up returning from the send.
// self.active_requests
// .insert(active_request.request_id(), active_request);
// }
// Err(e) => {
// debug!(
// "error message id: {} error: {}",
// active_request.request_id(),
// e
// );
// // complete with the error, don't add to the map of active requests
// active_request.complete_with_error(e);
// }
// }
// }
// Ok(_) => break,
// Err(()) => {
// warn!("receiver was shutdown?");
// break;
// }
// }
// }
// // Collect all inbound requests, max 100 at a time for QoS
// // by having a max we will guarantee that the client can't be DOSed in this loop
// // TODO: make the QoS configurable
// let mut messages_received = 0;
// for i in 0..QOS_MAX_RECEIVE_MSGS {
// match self.stream.poll().map_err(|e| E::from(e.into()))? {
// Async::Ready(Some(buffer)) => {
// messages_received = i;
// // deserialize or log decode_error
// match buffer.to_message() {
// Ok(message) => match self.active_requests.entry(message.id()) {
// Entry::Occupied(mut request_entry) => {
// // first add the response to the active_requests responses
// let complete = {
// let mut active_request = request_entry.get_mut();
// active_request.add_response(message);
// // determine if this is complete
// !active_request.request_options().expects_multiple_responses
// };
// // now check if the request is complete
// if complete {
// let mut active_request = request_entry.remove();
// active_request.complete();
// }
// }
// Entry::Vacant(..) => debug!("unexpected request_id: {}", message.id()),
// },
// // TODO: return src address for diagnostics
// Err(e) => debug!("error decoding message: {}", e),
// }
// }
// Async::Ready(None) | Async::NotReady => break,
// }
// }
// // Clean shutdown happens when all pending requests are done and the
// // incoming channel has been closed (e.g. you'll never receive another
// // request). Errors will return early...
// let done = match self.new_receiver.peek() {
// Ok(Async::Ready(None)) => true,
// Ok(_) => false,
// Err(_) => return Err(E::from(ProtoErrorKind::NoError.into())),
// };
// if self.active_requests.is_empty() && done {
// return Ok(().into()); // we are done
// }
// // If still active, then if the qos (for _ in 0..100 loop) limit
// // was hit then "yield". This'll make sure that the future is
// // woken up immediately on the next turn of the event loop.
// if messages_received == QOS_MAX_RECEIVE_MSGS {
// task::current().notify();
// }
// // Finally, return not ready to keep the 'driver task' alive.
// Ok(Async::NotReady)
// }
// }
// /// Always returns the specified io::Error to the remote Sender
// struct ClientStreamErrored<E>
// where
// E: FromProtoError,
// {
// error: E,
// new_receiver: Peekable<
// StreamFuse<UnboundedReceiver<(DnsRequest, oneshot::Sender<Result<DnsResponse, E>>)>>,
// >,
// }
// impl<E> Future for ClientStreamErrored<E>
// where
// E: FromProtoError,
// {
// type Item = ();
// type Error = E;
// fn poll(&mut self) -> Poll<(), Self::Error> {
// match self.new_receiver.poll() {
// Ok(Async::Ready(Some((_, complete)))) => {
// // TODO: this error never seems to make it, the receiver closes early...
// ignore_send(complete.send(Err(self.error.clone())));
// task::current().notify();
// Ok(Async::NotReady)
// }
// Ok(Async::Ready(None)) => Ok(Async::Ready(())),
// _ => Err(E::from(ProtoErrorKind::NoError.into())),
// }
// }
// }
// enum ClientStreamOrError<S, E, MF, D = Box<DnsStreamHandle<Error = E>>>
// where
// D: Send + 'static,
// S: DnsClientStream + 'static,
// E: FromProtoError + Send,
// MF: MessageFinalizer + Send + Sync + 'static,
// {
// Future(DnsMultiplexer<S, E, MF, D>),
// Errored(ClientStreamErrored<E>),
// }
// impl<S, E, MF> Future for ClientStreamOrError<S, E, MF, Box<DnsStreamHandle<Error = E>>>
// where
// S: DnsClientStream + 'static,
// E: FromProtoError + Send + 'static,
// MF: MessageFinalizer + Send + Sync + 'static,
// {
// type Item = ();
// type Error = E;
// fn poll(&mut self) -> Poll<(), Self::Error> {
// match *self {
// ClientStreamOrError::Future(ref mut f) => Future::poll(f),
// ClientStreamOrError::Errored(ref mut e) => e.poll(),
// }
// }
// }
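The removed ClientStreamOrError enum above is the classic "either future" dispatch shape: one enum, one poll() that forwards to whichever variant is live, so callers see a single concrete future type whether the connection succeeded or errored at construction. A minimal generic sketch under futures 0.1 (EitherFuture is an illustrative name; futures::future::Either in the library itself captures the same idea):

extern crate futures;

use futures::{Future, Poll};

// Two alternative futures behind one concrete type.
enum EitherFuture<A, B> {
    Left(A),
    Right(B),
}

impl<A, B> Future for EitherFuture<A, B>
where
    A: Future,
    B: Future<Item = A::Item, Error = A::Error>,
{
    type Item = A::Item;
    type Error = A::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Forward poll() to whichever future this value actually holds.
        match *self {
            EitherFuture::Left(ref mut f) => f.poll(),
            EitherFuture::Right(ref mut f) => f.poll(),
        }
    }
}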

View File

@ -15,8 +15,8 @@ use futures::{Future, Poll, Stream};
use op::Message;
mod dns_exchange;
pub mod dns_future;
pub mod dns_handle;
pub mod dns_multiplexer;
pub mod dns_request;
pub mod dns_response;
pub mod retry_dns_handle;
@ -25,8 +25,8 @@ pub mod secure_dns_handle;
mod serial_message;
pub use self::dns_exchange::{DnsExchange, DnsExchangeConnect};
pub use self::dns_future::{DnsMultiplexer, DnsMultiplexerSerialResponse};
pub use self::dns_handle::{BasicDnsHandle, DnsHandle, DnsStreamHandle, StreamHandle};
pub use self::dns_multiplexer::{DnsMultiplexer, DnsMultiplexerSerialResponse};
pub use self::dns_request::{DnsRequest, DnsRequestOptions};
pub use self::dns_response::DnsResponse;
pub use self::retry_dns_handle::RetryDnsHandle;

View File

@ -39,7 +39,7 @@ mkdir -p target
export TDNS_SERVER_SRC_ROOT=./server
export COVERALLS_PARALLEL=true
SRC_PATHS=client/src,native-tls/src,openssl/src,proto/src,resolver/src,rustls/src,server/src
SRC_PATHS=client/src,native-tls/src,openssl/src,proto/src,https/src,resolver/src,rustls/src,server/src
EXCLUDE_PATHS=client/src/error,proto/src/error.rs,server/src/error,compatibility-tests/src/lib.rs
for i in target/debug/deps/trust_dns*-* target/debug/deps/*_tests-* ; do