refactor lookup types and cache

This commit is contained in:
Benjamin Fry 2017-09-03 00:41:56 -07:00
parent 967f25d53d
commit 9441aa7ff8
8 changed files with 673 additions and 597 deletions

View File

@ -3,6 +3,13 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).
## 0.11.3
### Added
- `lookup` to `ClientHandle`, simpler form with `Query`
- `query` to `Query` for ease of Query creation
## 0.11.2
(README.md documentation changes for crates.io)

View File

@ -39,9 +39,8 @@ pub trait ClientStreamHandle {
impl ClientStreamHandle for StreamHandle {
fn send(&mut self, buffer: Vec<u8>) -> io::Result<()> {
UnboundedSender::send(self, buffer).map_err(|_| {
io::Error::new(io::ErrorKind::Other,
"unknown")
})
io::Error::new(io::ErrorKind::Other, "unknown")
})
}
}
@ -74,16 +73,19 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> ClientFuture<S> {
/// the Stream will be spawned
/// * `stream_handle` - The handle for the `stream` on which bytes can be sent/received.
/// * `signer` - An optional signer for requests, needed for Updates with Sig0, otherwise not needed
pub fn new(stream: Box<Future<Item = S, Error = io::Error>>,
stream_handle: Box<ClientStreamHandle>,
loop_handle: &Handle,
signer: Option<Signer>)
-> BasicClientHandle {
Self::with_timeout(stream,
stream_handle,
loop_handle,
Duration::from_secs(5),
signer)
pub fn new(
stream: Box<Future<Item = S, Error = io::Error>>,
stream_handle: Box<ClientStreamHandle>,
loop_handle: &Handle,
signer: Option<Signer>,
) -> BasicClientHandle {
Self::with_timeout(
stream,
stream_handle,
loop_handle,
Duration::from_secs(5),
signer,
)
}
/// Spawns a new ClientFuture Stream.
@ -98,44 +100,41 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> ClientFuture<S> {
/// wait for a response before canceling the request.
/// * `stream_handle` - The handle for the `stream` on which bytes can be sent/received.
/// * `signer` - An optional signer for requests, needed for Updates with Sig0, otherwise not needed
pub fn with_timeout(stream: Box<Future<Item = S, Error = io::Error>>,
stream_handle: Box<ClientStreamHandle>,
loop_handle: &Handle,
timeout_duration: Duration,
signer: Option<Signer>)
-> BasicClientHandle {
pub fn with_timeout(
stream: Box<Future<Item = S, Error = io::Error>>,
stream_handle: Box<ClientStreamHandle>,
loop_handle: &Handle,
timeout_duration: Duration,
signer: Option<Signer>,
) -> BasicClientHandle {
let (sender, rx) = unbounded();
let loop_handle_clone = loop_handle.clone();
loop_handle
.spawn(stream
.then(move |res| match res {
Ok(stream) => {
ClientStreamOrError::Future(ClientFuture {
stream: stream,
reactor_handle:
loop_handle_clone,
timeout_duration:
timeout_duration,
stream_handle: stream_handle,
new_receiver: rx.fuse()
.peekable(),
active_requests:
HashMap::new(),
signer: signer,
})
}
Err(stream_error) => {
ClientStreamOrError::Errored(ClientStreamErrored {
error: stream_error,
new_receiver: rx.fuse()
.peekable(),
})
}
})
.map_err(|e: ClientError| {
error!("error in Client: {}", e);
}));
loop_handle.spawn(
stream
.then(move |res| match res {
Ok(stream) => {
ClientStreamOrError::Future(ClientFuture {
stream: stream,
reactor_handle: loop_handle_clone,
timeout_duration: timeout_duration,
stream_handle: stream_handle,
new_receiver: rx.fuse().peekable(),
active_requests: HashMap::new(),
signer: signer,
})
}
Err(stream_error) => {
ClientStreamOrError::Errored(ClientStreamErrored {
error: stream_error,
new_receiver: rx.fuse().peekable(),
})
}
})
.map_err(|e: ClientError| {
error!("error in Client: {}", e);
}),
);
BasicClientHandle { message_sender: sender }
}
@ -172,8 +171,9 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> ClientFuture<S> {
// then the otherside isn't really paying attention anyway)
// complete the request, it's failed...
req.send(Err(ClientErrorKind::Timeout.into()))
.expect("error notifying wait, possible future leak");
req.send(Err(ClientErrorKind::Timeout.into())).expect(
"error notifying wait, possible future leak",
);
}
}
}
@ -239,9 +239,9 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientFu
// TODO: it's too bad this happens here...
if let Err(e) = message.sign(signer, Utc::now().timestamp() as u32) {
warn!("could not sign message: {}", e);
complete
.send(Err(e.into()))
.expect("error notifying wait, possible future leak");
complete.send(Err(e.into())).expect(
"error notifying wait, possible future leak",
);
continue; // to the next message...
}
}
@ -252,9 +252,9 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientFu
Ok(timeout) => timeout,
Err(e) => {
warn!("could not create timer: {}", e);
complete
.send(Err(e.into()))
.expect("error notifying wait, possible future leak");
complete.send(Err(e.into())).expect(
"error notifying wait, possible future leak",
);
continue; // to the next message...
}
};
@ -266,15 +266,17 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientFu
try!(self.stream_handle.send(buffer));
// add to the map -after- the client send b/c we don't want to put it in the map if
// we ended up returning from the send.
self.active_requests
.insert(message.id(), (complete, timeout));
self.active_requests.insert(
message.id(),
(complete, timeout),
);
}
Err(e) => {
debug!("error message id: {} error: {}", query_id, e);
// complete with the error, don't add to the map of active requests
complete
.send(Err(e.into()))
.expect("error notifying wait, possible future leak");
complete.send(Err(e.into())).expect(
"error notifying wait, possible future leak",
);
}
}
}
@ -300,9 +302,9 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientFu
Ok(message) => {
match self.active_requests.remove(&message.id()) {
Some((complete, _)) => {
complete
.send(Ok(message))
.expect("error notifying wait, possible future leak")
complete.send(Ok(message)).expect(
"error notifying wait, possible future leak",
)
}
None => debug!("unexpected request_id: {}", message.id()),
}
@ -404,18 +406,20 @@ impl ClientHandle for BasicClientHandle {
Ok(()) => receiver,
Err(e) => {
let (complete, receiver) = oneshot::channel();
complete
.send(Err(e.into()))
.expect("error notifying wait, possible future leak");
complete.send(Err(e.into())).expect(
"error notifying wait, possible future leak",
);
receiver
}
};
// conver the oneshot into a Box of a Future message and error.
Box::new(receiver
.map_err(|c| ClientError::from(c))
.map(|result| result.into_future())
.flatten())
Box::new(
receiver
.map_err(|c| ClientError::from(c))
.map(|result| result.into_future())
.flatten(),
)
}
}
@ -433,25 +437,19 @@ pub trait ClientHandle: Clone {
/// A *classic* DNS query
///
/// *Note* As of now, this will not recurse on PTR or CNAME record responses, that is up to
/// the caller.
/// This is identical to `query`, but instead takes a `Query` object.
///
/// # Arguments
///
/// * `name` - the label to lookup
/// * `query_class` - most likely this should always be DNSClass::IN
/// * `query_type` - record type to lookup
fn query(&mut self,
name: domain::Name,
query_class: DNSClass,
query_type: RecordType)
-> Box<Future<Item = Message, Error = ClientError>> {
debug!("querying: {} {:?}", name, query_type);
/// * `query` - the query to lookup
fn lookup(&mut self, query: Query) -> Box<Future<Item = Message, Error = ClientError>> {
debug!("querying: {} {:?}", query.name(), query.query_type());
// build the message
let mut message: Message = Message::new();
let id: u16 = rand::random();
// TODO make recursion a parameter
message.add_query(query);
message
.set_id(id)
.set_message_type(MessageType::Query)
@ -465,17 +463,32 @@ pub trait ClientHandle: Clone {
edns.set_version(0);
}
// add the query
let mut query: Query = Query::new();
query
.set_name(name.clone())
.set_query_class(query_class)
.set_query_type(query_type);
message.add_query(query);
self.send(message)
}
/// A *classic* DNS query
///
/// *Note* As of now, this will not recurse on PTR or CNAME record responses, that is up to
/// the caller.
///
/// # Arguments
///
/// * `name` - the label to lookup
/// * `query_class` - most likely this should always be DNSClass::IN
/// * `query_type` - record type to lookup
fn query(
&mut self,
name: domain::Name,
query_class: DNSClass,
query_type: RecordType,
) -> Box<Future<Item = Message, Error = ClientError>> {
let mut query = Query::query(name, query_type);
query.set_query_class(query_class);
self.lookup(query)
}
/// Sends a NOTIFY message to the remote system
///
/// [RFC 1996](https://tools.ietf.org/html/rfc1996), DNS NOTIFY, August 1996
@ -536,13 +549,15 @@ pub trait ClientHandle: Clone {
/// * `query_class` - most likely this should always be DNSClass::IN
/// * `query_type` - record type which has been updated
/// * `rrset` - the new version of the record(s) being notified
fn notify<R>(&mut self,
name: domain::Name,
query_class: DNSClass,
query_type: RecordType,
rrset: Option<R>)
-> Box<Future<Item = Message, Error = ClientError>>
where R: IntoRecordSet
fn notify<R>(
&mut self,
name: domain::Name,
query_class: DNSClass,
query_type: RecordType,
rrset: Option<R>,
) -> Box<Future<Item = Message, Error = ClientError>>
where
R: IntoRecordSet,
{
debug!("notifying: {} {:?}", name, query_type);
@ -615,11 +630,13 @@ pub trait ClientHandle: Clone {
/// * `zone_origin` - the zone name to update, i.e. SOA name
///
/// The update must go to a zone authority (i.e. the server used in the ClientConnection)
fn create<R>(&mut self,
rrset: R,
zone_origin: domain::Name)
-> Box<Future<Item = Message, Error = ClientError>>
where R: IntoRecordSet
fn create<R>(
&mut self,
rrset: R,
zone_origin: domain::Name,
) -> Box<Future<Item = Message, Error = ClientError>>
where
R: IntoRecordSet,
{
// TODO: assert non-empty rrset?
let rrset = rrset.into_record_set();
@ -689,12 +706,14 @@ pub trait ClientHandle: Clone {
///
/// The update must go to a zone authority (i.e. the server used in the ClientConnection). If
/// the rrset does not exist and must_exist is false, then the RRSet will be created.
fn append<R>(&mut self,
rrset: R,
zone_origin: domain::Name,
must_exist: bool)
-> Box<Future<Item = Message, Error = ClientError>>
where R: IntoRecordSet
fn append<R>(
&mut self,
rrset: R,
zone_origin: domain::Name,
must_exist: bool,
) -> Box<Future<Item = Message, Error = ClientError>>
where
R: IntoRecordSet,
{
let rrset = rrset.into_record_set();
assert!(zone_origin.zone_of(rrset.name()));
@ -773,13 +792,15 @@ pub trait ClientHandle: Clone {
/// * `zone_origin` - the zone name to update, i.e. SOA name
///
/// The update must go to a zone authority (i.e. the server used in the ClientConnection).
fn compare_and_swap<C, N>(&mut self,
current: C,
new: N,
zone_origin: domain::Name)
-> Box<Future<Item = Message, Error = ClientError>>
where C: IntoRecordSet,
N: IntoRecordSet
fn compare_and_swap<C, N>(
&mut self,
current: C,
new: N,
zone_origin: domain::Name,
) -> Box<Future<Item = Message, Error = ClientError>>
where
C: IntoRecordSet,
N: IntoRecordSet,
{
let current = current.into_record_set();
let new = new.into_record_set();
@ -864,11 +885,13 @@ pub trait ClientHandle: Clone {
///
/// The update must go to a zone authority (i.e. the server used in the ClientConnection). If
/// the rrset does not exist and must_exist is false, then the RRSet will be deleted.
fn delete_by_rdata<R>(&mut self,
rrset: R,
zone_origin: domain::Name)
-> Box<Future<Item = Message, Error = ClientError>>
where R: IntoRecordSet
fn delete_by_rdata<R>(
&mut self,
rrset: R,
zone_origin: domain::Name,
) -> Box<Future<Item = Message, Error = ClientError>>
where
R: IntoRecordSet,
{
let mut rrset = rrset.into_record_set();
assert!(zone_origin.zone_of(rrset.name()));
@ -938,10 +961,11 @@ pub trait ClientHandle: Clone {
///
/// The update must go to a zone authority (i.e. the server used in the ClientConnection). If
/// the rrset does not exist and must_exist is false, then the RRSet will be deleted.
fn delete_rrset(&mut self,
mut record: Record,
zone_origin: domain::Name)
-> Box<Future<Item = Message, Error = ClientError>> {
fn delete_rrset(
&mut self,
mut record: Record,
zone_origin: domain::Name,
) -> Box<Future<Item = Message, Error = ClientError>> {
assert!(zone_origin.zone_of(record.name()));
// for updates, the query section is used for the zone
@ -1001,11 +1025,12 @@ pub trait ClientHandle: Clone {
/// The update must go to a zone authority (i.e. the server used in the ClientConnection). This
/// operation attempts to delete all resource record sets the the specified name reguardless of
/// the record type.
fn delete_all(&mut self,
name_of_records: domain::Name,
zone_origin: domain::Name,
dns_class: DNSClass)
-> Box<Future<Item = Message, Error = ClientError>> {
fn delete_all(
&mut self,
name_of_records: domain::Name,
zone_origin: domain::Name,
dns_class: DNSClass,
) -> Box<Future<Item = Message, Error = ClientError>> {
assert!(zone_origin.zone_of(&name_of_records));
// for updates, the query section is used for the zone

View File

@ -19,8 +19,8 @@
use rr::domain::Name;
use rr::record_type::RecordType;
use rr::dns_class::DNSClass;
use ::serialize::binary::*;
use ::error::*;
use serialize::binary::*;
use error::*;
/// Query struct for looking up resource records, basically a resource record without RDATA.
///
@ -63,6 +63,15 @@ impl Query {
}
}
/// Create a new query from name and type, class defaults to IN
pub fn query(name: Name, query_type: RecordType) -> Self {
Query {
name,
query_type,
query_class: DNSClass::IN,
}
}
/// replaces name with the new name
pub fn set_name(&mut self, name: Name) -> &mut Self {
self.name = name;

View File

@ -3,6 +3,16 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).
## 0.5.0
### Added
- Generic record type lookup
### Changed
- refactored lru cache and resolver to better cache other record types
## 0.4.0
### Removed

View File

@ -92,7 +92,6 @@
//! let mut resolver = ResolverFuture::new(ResolverConfig::default(), ResolverOpts::default(), &io_loop.handle());
//!
//! // Lookup the IP addresses associated with a name.
//! // NOTE: do not forget the final dot, as the resolver does not yet support search paths.
//! // This returns a future that will lookup the IP addresses, it must be run in the Core to
//! // to get the actual result.
//! let lookup_future = resolver.lookup_ip("www.example.com.");

View File

@ -6,22 +6,21 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
//! LookupHost result from a resolution with a Resolver
//! LookupIp result from a resolution of ipv4 and ipv6 records with a Resolver.
use std::error::Error;
use std::io;
use std::mem;
use std::net::IpAddr;
use std::slice::Iter;
use std::sync::{Arc, Mutex, TryLockError};
use std::time::Instant;
use std::sync::Arc;
use futures::{Async, future, Future, Poll, task};
use trust_dns::client::{ClientHandle, RetryClientHandle, SecureClientHandle};
use trust_dns::error::ClientError;
use trust_dns::op::Message;
use trust_dns::rr::{DNSClass, Name, RData, RecordType};
use trust_dns::op::{Message, Query};
use trust_dns::rr::{Name, RecordType};
use config::LookupIpStrategy;
use lru::DnsLru;
@ -80,12 +79,10 @@ pub type LookupIpFuture = InnerLookupIpFuture<LookupIpEither>;
#[doc(hidden)]
/// The Future returned from ResolverFuture when performing an A or AAAA lookup.
pub struct InnerLookupIpFuture<C: ClientHandle + 'static> {
client: C,
client_cache: DnsLru<C>,
names: Vec<Name>,
strategy: LookupIpStrategy,
future: Box<Future<Item = LookupIp, Error = io::Error>>,
// TODO: zero lock datastructure instead?
cache: Arc<Mutex<DnsLru>>,
}
impl<C: ClientHandle + 'static> InnerLookupIpFuture<C> {
@ -95,23 +92,20 @@ impl<C: ClientHandle + 'static> InnerLookupIpFuture<C> {
///
/// * `names` - a set of DNS names to attempt to resolve, they will be attempted in queue order, i.e. the first is `names.pop()`. Upon each failure, the next will be attempted.
/// * `strategy` - the lookup IP strategy to use
/// * `client` - connection to use for performing all lookups
/// * `cache` - a cache from which to attempt to load records prior to lookup
/// * `client_cache` - cache with a connection to use for performing all lookups
pub(crate) fn lookup(
mut names: Vec<Name>,
strategy: LookupIpStrategy,
client: &mut C,
cache: Arc<Mutex<DnsLru>>,
client_cache: &mut DnsLru<C>,
) -> Self {
let name = names.pop().expect("can not lookup IPs for no names");
let query = LookupIpState::lookup(name, strategy, client, cache.clone());
let query = strategic_lookup(name, strategy, client_cache);
InnerLookupIpFuture {
client: client.clone(),
client_cache: client_cache.clone(),
names,
strategy,
future: Box::new(query),
cache,
}
}
@ -121,8 +115,7 @@ impl<C: ClientHandle + 'static> InnerLookupIpFuture<C> {
) -> Poll<LookupIp, io::Error> {
let name = self.names.pop();
if let Some(name) = name {
let query =
LookupIpState::lookup(name, self.strategy, &mut self.client, self.cache.clone());
let query = strategic_lookup(name, self.strategy, &mut self.client_cache);
mem::replace(&mut self.future, Box::new(query));
// guarantee that we get scheduled for the next turn...
@ -133,16 +126,15 @@ impl<C: ClientHandle + 'static> InnerLookupIpFuture<C> {
}
}
pub(crate) fn error<E: Error>(client: C, error: E) -> Self {
pub(crate) fn error<E: Error>(client_cache: DnsLru<C>, error: E) -> Self {
return InnerLookupIpFuture {
// errors on names don't need to be cheap... i.e. this clone is unfortunate in this case.
client,
client_cache,
names: vec![],
strategy: LookupIpStrategy::default(),
future: Box::new(future::err(
io::Error::new(io::ErrorKind::Other, format!("{}", error)),
)),
cache: Arc::new(Mutex::new(DnsLru::new(0))),
};
}
}
@ -183,204 +175,12 @@ impl<C: ClientHandle + 'static> Future for InnerLookupIpFuture<C> {
// }
// }
struct FromCache {
name: Name,
strategy: LookupIpStrategy,
cache: Arc<Mutex<DnsLru>>,
}
impl Future for FromCache {
type Item = Option<LookupIp>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// first transition any polling that is needed (mutable refs...)
match self.cache.try_lock() {
Err(TryLockError::WouldBlock) => {
task::current().notify(); // yield
return Ok(Async::NotReady);
}
// TODO: need to figure out a way to recover from this.
// It requires unwrapping the poisoned error and recreating the Mutex at a higher layer...
Err(TryLockError::Poisoned(poison)) => Err(io::Error::new(
io::ErrorKind::Other,
format!("poisoned: {}", poison),
)),
Ok(mut lru) => {
return Ok(Async::Ready(lru.get(&self.name, Instant::now())));
}
}
}
}
struct InsertCache {
ips: Vec<(IpAddr, u32)>,
name: Name,
cache: Arc<Mutex<DnsLru>>,
}
impl Future for InsertCache {
type Item = LookupIp;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// first transition any polling that is needed (mutable refs...)
match self.cache.try_lock() {
Err(TryLockError::WouldBlock) => {
task::current().notify(); // yield
return Ok(Async::NotReady);
}
// TODO: need to figure out a way to recover from this.
// It requires unwrapping the poisoned error and recreating the Mutex at a higher layer...
Err(TryLockError::Poisoned(poison)) => Err(io::Error::new(
io::ErrorKind::Other,
format!("poisoned: {}", poison),
)),
Ok(mut lru) => {
// this will put this object into an inconsistent state, but no one should call poll again...
let name = mem::replace(&mut self.name, Name::root());
let ips = mem::replace(&mut self.ips, vec![]);
return Ok(Async::Ready(lru.insert(name, ips, Instant::now())));
}
}
}
}
enum LookupIpState<C: ClientHandle + 'static> {
/// In the FromCache state we evaluate cache entries for any results
FromCache(FromCache, C),
/// In the query state there is an active query that's been started, see Self::lookup()
Query(Box<Future<Item = Vec<(IpAddr, u32)>, Error = ClientError>>, Name, Arc<Mutex<DnsLru>>),
/// State of adding the item to the cache
InsertCache(InsertCache),
/// A state which should not occur
Error,
}
impl<C: ClientHandle + 'static> LookupIpState<C> {
pub(crate) fn lookup(
name: Name,
strategy: LookupIpStrategy,
client: &mut C,
cache: Arc<Mutex<DnsLru>>,
) -> LookupIpState<C> {
LookupIpState::FromCache(
FromCache {
name,
strategy,
cache,
},
client.clone(),
)
}
/// Query after a failed cache lookup
///
/// # Panics
///
/// This will panic if the current state is not FromCache.
fn query_after_cache(&mut self) {
let from_cache_state = mem::replace(self, LookupIpState::Error);
// TODO: with specialization, could we define a custom query only on the FromCache type?
match from_cache_state {
LookupIpState::FromCache(from_cache, mut client) => {
let query_future =
strategic_lookup(from_cache.name.clone(), from_cache.strategy, &mut client);
mem::replace(
self,
LookupIpState::Query(query_future, from_cache.name, from_cache.cache),
);
}
_ => panic!("bad state, expected FromCache"),
}
}
fn cache(&mut self, ips: Vec<(IpAddr, u32)>) {
// The error state, this query is complete...
let query_state = mem::replace(self, LookupIpState::Error);
match query_state {
LookupIpState::Query(_, name, cache) => {
mem::replace(
self,
LookupIpState::InsertCache(InsertCache { ips, name, cache }),
);
}
_ => panic!("bad state, expected Query"),
}
}
}
impl<C: ClientHandle + 'static> Future for LookupIpState<C> {
type Item = LookupIp;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// first transition any polling that is needed (mutable refs...)
let poll;
match *self {
LookupIpState::FromCache(ref mut from_cache, ..) => {
match from_cache.poll() {
// need to query since it wasn't in the cache
Ok(Async::Ready(None)) => (), // handled below
Ok(Async::Ready(Some(ips))) => return Ok(Async::Ready(ips)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(error) => return Err(error),
};
poll = Ok(Async::NotReady);
}
LookupIpState::Query(ref mut query, ..) => {
poll = query.poll().map_err(|e| e.into());
match poll {
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Ok(Async::Ready(_)) => (), // handled in next match
Err(e) => {
return Err(e);
}
}
}
LookupIpState::InsertCache(ref mut insert_cache) => {
return insert_cache.poll();
// match insert_cache.poll() {
// // need to query since it wasn't in the cache
// Ok(Async::Ready(ips)) => return Ok(Async::Ready(ips)),
// Ok(Async::NotReady) => return Ok(Async::NotReady),
// Err(error) => return Err(error),
// }
}
LookupIpState::Error => panic!("invalid error state"),
}
// getting here means there are Aync::Ready available.
match *self {
LookupIpState::FromCache(..) => self.query_after_cache(),
LookupIpState::Query(..) => {
match poll {
Ok(Async::Ready(ips)) => {
self.cache(ips.clone());
}
_ => panic!("should have returned earlier"),
}
}
_ => panic!("should have returned earlier"),
}
task::current().notify(); // yield
return Ok(Async::NotReady);
}
}
/// returns a new future for lookup
fn strategic_lookup<C: ClientHandle + 'static>(
name: Name,
strategy: LookupIpStrategy,
client: &mut C,
) -> Box<Future<Item = Vec<(IpAddr, u32)>, Error = ClientError>> {
client: &mut DnsLru<C>,
) -> Box<Future<Item = LookupIp, Error = io::Error>> {
match strategy {
LookupIpStrategy::Ipv4Only => ipv4_only(name, client),
LookupIpStrategy::Ipv6Only => ipv6_only(name, client),
@ -390,68 +190,52 @@ fn strategic_lookup<C: ClientHandle + 'static>(
}
}
fn map_message_to_ipaddr(mut message: Message) -> Vec<(IpAddr, u32)> {
message
.take_answers()
.iter()
.filter_map(|r| {
let ttl = r.ttl();
match *r.rdata() {
RData::A(ipaddr) => Some((IpAddr::V4(ipaddr), ttl)),
RData::AAAA(ipaddr) => Some((IpAddr::V6(ipaddr), ttl)),
_ => None,
}
})
.collect()
}
/// queries only for A records
fn ipv4_only<C: ClientHandle>(
fn ipv4_only<C: ClientHandle + 'static>(
name: Name,
client: &mut C,
) -> Box<Future<Item = Vec<(IpAddr, u32)>, Error = ClientError>> {
Box::new(client.query(name, DNSClass::IN, RecordType::A).map(
map_message_to_ipaddr,
))
client: &mut DnsLru<C>,
) -> Box<Future<Item = LookupIp, Error = io::Error>> {
client.lookup(Query::query(name, RecordType::A))
}
/// queries only for AAAA records
fn ipv6_only<C: ClientHandle>(
fn ipv6_only<C: ClientHandle + 'static>(
name: Name,
client: &mut C,
) -> Box<Future<Item = Vec<(IpAddr, u32)>, Error = ClientError>> {
Box::new(client.query(name, DNSClass::IN, RecordType::AAAA).map(
map_message_to_ipaddr,
))
client: &mut DnsLru<C>,
) -> Box<Future<Item = LookupIp, Error = io::Error>> {
client.lookup(Query::query(name, RecordType::AAAA))
}
/// queries only for A and AAAA in parallel
fn ipv4_and_ipv6<C: ClientHandle>(
fn ipv4_and_ipv6<C: ClientHandle + 'static>(
name: Name,
client: &mut C,
) -> Box<Future<Item = Vec<(IpAddr, u32)>, Error = ClientError>> {
client: &mut DnsLru<C>,
) -> Box<Future<Item = LookupIp, Error = io::Error>> {
Box::new(
client
.query(name.clone(), DNSClass::IN, RecordType::A)
.map(map_message_to_ipaddr)
.select(client.query(name, DNSClass::IN, RecordType::AAAA).map(
map_message_to_ipaddr,
))
.lookup(Query::query(name.clone(), RecordType::A))
.select(client.lookup(Query::query(name.clone(), RecordType::AAAA)))
.then(|sel_res| {
match sel_res {
// Some ips returned, get the other record result, or else just return record
Ok((mut ips, remaining_query)) => {
Ok((ips, remaining_query)) => {
Box::new(remaining_query.then(move |query_res| match query_res {
/// join AAAA and A results
Ok(mut rem_ips) => {
rem_ips.append(&mut ips);
future::ok(rem_ips)
Ok(rem_ips) => {
// TODO: create a LookupIp enum with the ability to chain these together
let len = rem_ips.ips.len() + ips.ips.len();
let mut ret_ips = Vec::with_capacity(len);
ret_ips.extend_from_slice(&*rem_ips.ips);
ret_ips.extend_from_slice(&*ips.ips);
future::ok(LookupIp::new(Arc::new(ret_ips)))
}
// One failed, just return the other
Err(_) => future::ok(ips),
})) as
// This cast is to resolve a comilation error, not sure of it's necessity
Box<Future<Item = Vec<(IpAddr, u32)>, Error = ClientError>>
Box<Future<Item = LookupIp, Error = io::Error>>
}
// One failed, just return the other
@ -464,55 +248,53 @@ fn ipv4_and_ipv6<C: ClientHandle>(
/// queries only for AAAA and on no results queries for A
fn ipv6_then_ipv4<C: ClientHandle + 'static>(
name: Name,
client: &mut C,
) -> Box<Future<Item = Vec<(IpAddr, u32)>, Error = ClientError>> {
client: &mut DnsLru<C>,
) -> Box<Future<Item = LookupIp, Error = io::Error>> {
rt_then_swap(name, client, RecordType::AAAA, RecordType::A)
}
/// queries only for A and on no results queries for AAAA
fn ipv4_then_ipv6<C: ClientHandle + 'static>(
name: Name,
client: &mut C,
) -> Box<Future<Item = Vec<(IpAddr, u32)>, Error = ClientError>> {
client: &mut DnsLru<C>,
) -> Box<Future<Item = LookupIp, Error = io::Error>> {
rt_then_swap(name, client, RecordType::A, RecordType::AAAA)
}
/// queries only for first_type and on no results queries for second_type
fn rt_then_swap<C: ClientHandle + 'static>(
name: Name,
client: &mut C,
client: &mut DnsLru<C>,
first_type: RecordType,
second_type: RecordType,
) -> Box<Future<Item = Vec<(IpAddr, u32)>, Error = ClientError>> {
) -> Box<Future<Item = LookupIp, Error = io::Error>> {
let mut or_client = client.clone();
Box::new(
client
.query(name.clone(), DNSClass::IN, first_type)
.map(map_message_to_ipaddr)
.lookup(Query::query(name.clone(), first_type))
.then(move |res| {
match res {
Ok(ips) => {
println!("ips");
if ips.is_empty() {
// FIXME: lookup should return an Option, Option::None should be cached for NxDomain
if ips.ips.is_empty() {
println!("ips, are empty");
// no ips returns, NXDomain or Otherwise, doesn't matter
Box::new(
or_client
.query(name.clone(), DNSClass::IN, second_type)
.map(map_message_to_ipaddr),
.lookup(Query::query(name.clone(), second_type)),
) as
Box<Future<Item = Vec<(IpAddr, u32)>, Error = ClientError>>
Box<Future<Item = LookupIp, Error = io::Error>>
} else {
Box::new(future::ok(ips)) as
Box<Future<Item = Vec<(IpAddr, u32)>, Error = ClientError>>
Box<Future<Item = LookupIp, Error = io::Error>>
}
}
Err(_) => {
Box::new(
or_client
.query(name.clone(), DNSClass::IN, second_type)
.map(map_message_to_ipaddr),
.lookup(Query::query(name.clone(), second_type))
)
}
}
@ -521,7 +303,7 @@ fn rt_then_swap<C: ClientHandle + 'static>(
}
#[cfg(test)]
mod tests {
pub mod tests {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::{Arc, Mutex};
@ -535,7 +317,7 @@ mod tests {
use super::*;
#[derive(Clone)]
struct MockClientHandle {
pub struct MockClientHandle {
messages: Arc<Mutex<Vec<ClientResult<Message>>>>,
}
@ -547,7 +329,7 @@ mod tests {
}
}
fn v4_message() -> ClientResult<Message> {
pub fn v4_message() -> ClientResult<Message> {
let mut message = Message::new();
message.insert_answers(vec![
Record::from_rdata(
@ -560,7 +342,7 @@ mod tests {
Ok(message)
}
fn v6_message() -> ClientResult<Message> {
pub fn v6_message() -> ClientResult<Message> {
let mut message = Message::new();
message.insert_answers(vec![
Record::from_rdata(
@ -573,26 +355,26 @@ mod tests {
Ok(message)
}
fn empty() -> ClientResult<Message> {
pub fn empty() -> ClientResult<Message> {
Ok(Message::new())
}
fn error() -> ClientResult<Message> {
pub fn error() -> ClientResult<Message> {
Err(ClientErrorKind::Io.into())
}
fn mock(messages: Vec<ClientResult<Message>>) -> MockClientHandle {
pub fn mock(messages: Vec<ClientResult<Message>>) -> MockClientHandle {
MockClientHandle { messages: Arc::new(Mutex::new(messages)) }
}
#[test]
fn test_ipv4_only_strategy() {
assert_eq!(
ipv4_only(Name::root(), &mut mock(vec![v4_message()]))
ipv4_only(Name::root(), &mut DnsLru::new(0, mock(vec![v4_message()])))
.wait()
.unwrap()
.into_iter()
.map(|(ip, _)| ip)
.iter()
.cloned()
.collect::<Vec<IpAddr>>(),
vec![Ipv4Addr::new(127, 0, 0, 1)]
);
@ -601,11 +383,11 @@ mod tests {
#[test]
fn test_ipv6_only_strategy() {
assert_eq!(
ipv6_only(Name::root(), &mut mock(vec![v6_message()]))
ipv6_only(Name::root(), &mut DnsLru::new(0, mock(vec![v6_message()])))
.wait()
.unwrap()
.into_iter()
.map(|(ip, _)| ip)
.iter()
.cloned()
.collect::<Vec<IpAddr>>(),
vec![Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)]
);
@ -615,11 +397,11 @@ mod tests {
fn test_ipv4_and_ipv6_strategy() {
// both succeed
assert_eq!(
ipv4_and_ipv6(Name::root(), &mut mock(vec![v4_message(), v6_message()]))
ipv4_and_ipv6(Name::root(), &mut DnsLru::new(0, mock(vec![v4_message(), v6_message()])))
.wait()
.unwrap()
.into_iter()
.map(|(ip, _)| ip)
.iter()
.cloned()
.collect::<Vec<IpAddr>>(),
vec![
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
@ -629,33 +411,33 @@ mod tests {
// only ipv4 available
assert_eq!(
ipv4_and_ipv6(Name::root(), &mut mock(vec![v4_message()]))
ipv4_and_ipv6(Name::root(), &mut DnsLru::new(0, mock(vec![v4_message()])))
.wait()
.unwrap()
.into_iter()
.map(|(ip, _)| ip)
.iter()
.cloned()
.collect::<Vec<IpAddr>>(),
vec![IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))]
);
// only ipv6 available
assert_eq!(
ipv4_and_ipv6(Name::root(), &mut mock(vec![v6_message()]))
ipv4_and_ipv6(Name::root(), &mut DnsLru::new(0, mock(vec![v6_message()])))
.wait()
.unwrap()
.into_iter()
.map(|(ip, _)| ip)
.iter()
.cloned()
.collect::<Vec<IpAddr>>(),
vec![IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1))]
);
// error, then only ipv6 available
assert_eq!(
ipv4_and_ipv6(Name::root(), &mut mock(vec![error(), v6_message()]))
ipv4_and_ipv6(Name::root(), &mut DnsLru::new(0, mock(vec![error(), v6_message()])))
.wait()
.unwrap()
.into_iter()
.map(|(ip, _)| ip)
.iter()
.cloned()
.collect::<Vec<IpAddr>>(),
vec![IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1))]
);
@ -665,33 +447,33 @@ mod tests {
fn test_ipv6_then_ipv4_strategy() {
// ipv6 first
assert_eq!(
ipv6_then_ipv4(Name::root(), &mut mock(vec![v6_message()]))
ipv6_then_ipv4(Name::root(), &mut DnsLru::new(0, mock(vec![v6_message()])))
.wait()
.unwrap()
.into_iter()
.map(|(ip, _)| ip)
.iter()
.cloned()
.collect::<Vec<IpAddr>>(),
vec![Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)]
);
// nothing then ipv4
assert_eq!(
ipv6_then_ipv4(Name::root(), &mut mock(vec![v4_message(), empty()]))
ipv6_then_ipv4(Name::root(), &mut DnsLru::new(0, mock(vec![v4_message(), empty()])))
.wait()
.unwrap()
.into_iter()
.map(|(ip, _)| ip)
.iter()
.cloned()
.collect::<Vec<IpAddr>>(),
vec![Ipv4Addr::new(127, 0, 0, 1)]
);
// ipv4 and error
assert_eq!(
ipv6_then_ipv4(Name::root(), &mut mock(vec![v4_message(), error()]))
ipv6_then_ipv4(Name::root(), &mut DnsLru::new(0, mock(vec![v4_message(), error()])))
.wait()
.unwrap()
.into_iter()
.map(|(ip, _)| ip)
.iter()
.cloned()
.collect::<Vec<IpAddr>>(),
vec![Ipv4Addr::new(127, 0, 0, 1)]
);
@ -701,105 +483,35 @@ mod tests {
fn test_ipv4_then_ipv6_strategy() {
// ipv6 first
assert_eq!(
ipv4_then_ipv6(Name::root(), &mut mock(vec![v4_message()]))
ipv4_then_ipv6(Name::root(), &mut DnsLru::new(0, mock(vec![v4_message()])))
.wait()
.unwrap()
.into_iter()
.map(|(ip, _)| ip)
.iter()
.cloned()
.collect::<Vec<IpAddr>>(),
vec![Ipv4Addr::new(127, 0, 0, 1)]
);
// nothing then ipv6
assert_eq!(
ipv4_then_ipv6(Name::root(), &mut mock(vec![v6_message(), empty()]))
ipv4_then_ipv6(Name::root(), &mut DnsLru::new(0, mock(vec![v6_message(), empty()])))
.wait()
.unwrap()
.into_iter()
.map(|(ip, _)| ip)
.iter()
.cloned()
.collect::<Vec<IpAddr>>(),
vec![Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)]
);
// error then ipv6
assert_eq!(
ipv4_then_ipv6(Name::root(), &mut mock(vec![v6_message(), error()]))
ipv4_then_ipv6(Name::root(), &mut DnsLru::new(0, mock(vec![v6_message(), error()])))
.wait()
.unwrap()
.into_iter()
.map(|(ip, _)| ip)
.iter()
.cloned()
.collect::<Vec<IpAddr>>(),
vec![Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)]
);
}
#[test]
fn test_empty_cache() {
let cache = Arc::new(Mutex::new(DnsLru::new(1)));
let mut client = mock(vec![empty()]);
let ips =
LookupIpState::lookup(Name::root(), LookupIpStrategy::Ipv4Only, &mut client, cache)
.wait()
.unwrap();
assert!(ips.iter().next().is_none());
}
#[test]
fn test_from_cache() {
let cache = Arc::new(Mutex::new(DnsLru::new(1)));
cache.lock().unwrap().insert(
Name::root(),
vec![
(IpAddr::from(Ipv4Addr::new(127, 0, 0, 1)), u32::max_value()),
],
Instant::now(),
);
let mut client = mock(vec![empty()]);
let ips =
LookupIpState::lookup(Name::root(), LookupIpStrategy::Ipv4Only, &mut client, cache)
.wait()
.unwrap();
assert_eq!(
ips.iter().cloned().collect::<Vec<IpAddr>>(),
vec![Ipv4Addr::new(127, 0, 0, 1)]
);
}
#[test]
fn test_no_cache_insert() {
let cache = Arc::new(Mutex::new(DnsLru::new(1)));
// first should come from client...
let mut client = mock(vec![v4_message()]);
let ips = LookupIpState::lookup(
Name::root(),
LookupIpStrategy::Ipv4Only,
&mut client,
cache.clone(),
).wait()
.unwrap();
assert_eq!(
ips.iter().cloned().collect::<Vec<IpAddr>>(),
vec![Ipv4Addr::new(127, 0, 0, 1)]
);
// next should come from cache...
let mut client = mock(vec![empty()]);
let ips =
LookupIpState::lookup(Name::root(), LookupIpStrategy::Ipv4Only, &mut client, cache)
.wait()
.unwrap();
assert_eq!(
ips.iter().cloned().collect::<Vec<IpAddr>>(),
vec![Ipv4Addr::new(127, 0, 0, 1)]
);
}
}

View File

@ -1,17 +1,25 @@
use std::io;
use std::mem;
use std::net::IpAddr;
use std::sync::Arc;
use std::sync::{Arc, Mutex, TryLockError};
use std::time::{Duration, Instant};
use futures::{Async, Future, Poll, task};
use trust_dns::client::ClientHandle;
use trust_dns::error::ClientError;
use trust_dns::op::{Query, Message};
use trust_dns::rr::RData;
use lookup_ip::LookupIp;
use lru_cache::LruCache;
use trust_dns::rr::Name;
/// Maximum TTL as defined in https://tools.ietf.org/html/rfc2181
const MAX_TTL: u32 = 2147483647_u32;
#[derive(Debug)]
struct LruValue {
// FIXME: change to RData
ips: LookupIp,
ttl_until: Instant,
}
@ -23,68 +31,307 @@ impl LruValue {
}
}
#[derive(Debug)]
pub(crate) struct DnsLru(LruCache<Name, LruValue>);
#[derive(Clone, Debug)]
pub(crate) struct DnsLru<C: ClientHandle> {
lru: Arc<Mutex<LruCache<Query, LruValue>>>,
client: C,
}
impl DnsLru {
pub(crate) fn new(max_size: usize) -> Self {
DnsLru(LruCache::new(max_size))
impl<C: ClientHandle + 'static> DnsLru<C> {
pub(crate) fn new(max_size: usize, client: C) -> Self {
DnsLru {
lru: Arc::new(Mutex::new(LruCache::new(max_size))),
client,
}
}
// TODO: need to consider NXDomain storage...
pub(crate) fn insert(
&mut self,
name: Name,
ips_and_ttl: Vec<(IpAddr, u32)>,
now: Instant,
) -> LookupIp {
let len = ips_and_ttl.len();
// collapse the values, we're going to take the Minimum TTL as the correct one
let (ips, ttl): (Vec<IpAddr>, u32) =
ips_and_ttl.into_iter().fold(
(Vec::with_capacity(len), MAX_TTL),
|(mut ips, mut min_ttl),
(ip, ttl)| {
ips.push(ip);
min_ttl = if ttl < min_ttl { ttl } else { min_ttl };
(ips, min_ttl)
},
);
pub fn lookup(&mut self, query: Query) -> Box<Future<Item = LookupIp, Error = io::Error>> {
Box::new(QueryState::lookup(
query,
&mut self.client,
self.lru.clone(),
))
}
}
let ttl = Duration::from_secs(ttl as u64);
let ttl_until = now + ttl;
// TODO: need to consider NXDomain storage...
fn insert(
lru: &mut LruCache<Query, LruValue>,
query: Query,
ips_and_ttl: Vec<(IpAddr, u32)>,
now: Instant,
) -> LookupIp {
let len = ips_and_ttl.len();
// collapse the values, we're going to take the Minimum TTL as the correct one
let (ips, ttl): (Vec<IpAddr>, u32) = ips_and_ttl.into_iter().fold(
(Vec::with_capacity(len), MAX_TTL),
|(mut ips, mut min_ttl),
(ip, ttl)| {
ips.push(ip);
min_ttl = if ttl < min_ttl { ttl } else { min_ttl };
(ips, min_ttl)
},
);
// insert into the LRU
let ips = LookupIp::new(Arc::new(ips));
self.0.insert(
name,
LruValue {
ips: ips.clone(),
ttl_until,
},
);
let ttl = Duration::from_secs(ttl as u64);
let ttl_until = now + ttl;
ips
// insert into the LRU
let ips = LookupIp::new(Arc::new(ips));
lru.insert(
query,
LruValue {
ips: ips.clone(),
ttl_until,
},
);
ips
}
/// This needs to be mut b/c it's an LRU, meaning the ordering of elements will potentially change on retrieval...
fn get(lru: &mut LruCache<Query, LruValue>, query: &Query, now: Instant) -> Option<LookupIp> {
let ips = lru.get_mut(query).and_then(
|value| if value.is_current(now) {
Some(value.ips.clone())
} else {
None
},
);
// in this case, we can preemtively remove out of data elements
// this assumes time is always moving forward, this would only not be true in contrived situations where now
// is not current time, like tests...
if ips.is_none() {
lru.remove(query);
}
/// This needs to be mut b/c it's an LRU, meaning the ordering of elements will potentially change on retrieval...
pub(crate) fn get(&mut self, name: &Name, now: Instant) -> Option<LookupIp> {
let ips = self.0.get_mut(name).and_then(
|value| if value.is_current(now) {
Some(value.ips.clone())
} else {
None
},
);
ips
}
// in this case, we can preemtively remove out of data elements
// this assumes time is always moving forward, this would only not be true in contrived situations where now
// is not current time, like tests...
if ips.is_none() {
self.0.remove(name);
struct FromCache {
query: Query,
cache: Arc<Mutex<LruCache<Query, LruValue>>>,
}
impl Future for FromCache {
type Item = Option<LookupIp>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// first transition any polling that is needed (mutable refs...)
match self.cache.try_lock() {
Err(TryLockError::WouldBlock) => {
task::current().notify(); // yield
return Ok(Async::NotReady);
}
// TODO: need to figure out a way to recover from this.
// It requires unwrapping the poisoned error and recreating the Mutex at a higher layer...
Err(TryLockError::Poisoned(poison)) => Err(io::Error::new(
io::ErrorKind::Other,
format!("poisoned: {}", poison),
)),
Ok(mut lru) => {
return Ok(Async::Ready(get(&mut lru, &self.query, Instant::now())));
}
}
}
}
struct QueryFuture {
message_future: Box<Future<Item = Message, Error = ClientError>>,
query: Query,
cache: Arc<Mutex<LruCache<Query, LruValue>>>,
}
impl Future for QueryFuture {
type Item = Vec<(IpAddr, u32)>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.message_future.poll() {
Ok(Async::Ready(mut message)) => {
let records = message
.take_answers()
.iter()
.filter_map(|r| {
// FIXME: need to store RData, not IP
let ttl = r.ttl();
match *r.rdata() {
RData::A(ipaddr) => Some((IpAddr::V4(ipaddr), ttl)),
RData::AAAA(ipaddr) => Some((IpAddr::V6(ipaddr), ttl)),
_ => None,
}
})
.collect();
Ok(Async::Ready(records))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err.into()),
}
}
}
struct InsertCache {
ips: Vec<(IpAddr, u32)>,
query: Query,
cache: Arc<Mutex<LruCache<Query, LruValue>>>,
}
impl Future for InsertCache {
type Item = LookupIp;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// first transition any polling that is needed (mutable refs...)
match self.cache.try_lock() {
Err(TryLockError::WouldBlock) => {
task::current().notify(); // yield
return Ok(Async::NotReady);
}
// TODO: need to figure out a way to recover from this.
// It requires unwrapping the poisoned error and recreating the Mutex at a higher layer...
Err(TryLockError::Poisoned(poison)) => Err(io::Error::new(
io::ErrorKind::Other,
format!("poisoned: {}", poison),
)),
Ok(mut lru) => {
// this will put this object into an inconsistent state, but no one should call poll again...
let query = mem::replace(&mut self.query, Query::new());
let ips = mem::replace(&mut self.ips, vec![]);
return Ok(Async::Ready(insert(&mut *lru, query, ips, Instant::now())));
}
}
}
}
enum QueryState<C: ClientHandle + 'static> {
/// In the FromCache state we evaluate cache entries for any results
FromCache(FromCache, C),
/// In the query state there is an active query that's been started, see Self::lookup()
Query(QueryFuture),
/// State of adding the item to the cache
InsertCache(InsertCache),
/// A state which should not occur
Error,
}
impl<C: ClientHandle + 'static> QueryState<C> {
pub(crate) fn lookup(
query: Query,
client: &mut C,
cache: Arc<Mutex<LruCache<Query, LruValue>>>,
) -> QueryState<C> {
QueryState::FromCache(FromCache { query, cache }, client.clone())
}
/// Query after a failed cache lookup
///
/// # Panics
///
/// This will panic if the current state is not FromCache.
fn query_after_cache(&mut self) {
let from_cache_state = mem::replace(self, QueryState::Error);
// TODO: with specialization, could we define a custom query only on the FromCache type?
match from_cache_state {
QueryState::FromCache(from_cache, mut client) => {
let query = from_cache.query;
let message_future = client.lookup(query.clone());
mem::replace(
self,
QueryState::Query(QueryFuture {
message_future,
query,
cache: from_cache.cache,
}),
);
}
_ => panic!("bad state, expected FromCache"),
}
}
fn cache(&mut self, ips: Vec<(IpAddr, u32)>) {
// The error state, this query is complete...
let query_state = mem::replace(self, QueryState::Error);
match query_state {
QueryState::Query(QueryFuture {
message_future: _,
query,
cache,
}) => {
mem::replace(
self,
QueryState::InsertCache(InsertCache { ips, query, cache }),
);
}
_ => panic!("bad state, expected Query"),
}
}
}
impl<C: ClientHandle + 'static> Future for QueryState<C> {
type Item = LookupIp;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// first transition any polling that is needed (mutable refs...)
let poll;
match *self {
QueryState::FromCache(ref mut from_cache, ..) => {
match from_cache.poll() {
// need to query since it wasn't in the cache
Ok(Async::Ready(None)) => (), // handled below
Ok(Async::Ready(Some(ips))) => return Ok(Async::Ready(ips)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(error) => return Err(error),
};
poll = Ok(Async::NotReady);
}
QueryState::Query(ref mut query, ..) => {
poll = query.poll().map_err(|e| e.into());
match poll {
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Ok(Async::Ready(_)) => (), // handled in next match
Err(e) => {
return Err(e);
}
}
}
QueryState::InsertCache(ref mut insert_cache) => {
return insert_cache.poll();
// match insert_cache.poll() {
// // need to query since it wasn't in the cache
// Ok(Async::Ready(ips)) => return Ok(Async::Ready(ips)),
// Ok(Async::NotReady) => return Ok(Async::NotReady),
// Err(error) => return Err(error),
// }
}
QueryState::Error => panic!("invalid error state"),
}
ips
// getting here means there are Aync::Ready available.
match *self {
QueryState::FromCache(..) => self.query_after_cache(),
QueryState::Query(..) => {
match poll {
Ok(Async::Ready(ips)) => {
self.cache(ips);
}
_ => panic!("should have returned earlier"),
}
}
_ => panic!("should have returned earlier"),
}
task::current().notify(); // yield
return Ok(Async::NotReady);
}
}
@ -94,7 +341,12 @@ mod tests {
use std::str::FromStr;
use std::time::*;
use lru_cache::LruCache;
use trust_dns::op::Query;
use trust_dns::rr::{Name, RecordType};
use super::*;
use lookup_ip::tests::*;
#[test]
fn test_is_current() {
@ -117,22 +369,22 @@ mod tests {
#[test]
fn test_insert() {
let now = Instant::now();
let name = Name::from_str("www.example.com.").unwrap();
let name = Query::query(Name::from_str("www.example.com.").unwrap(), RecordType::A);
let ips_ttl = vec![(IpAddr::from(Ipv4Addr::new(127, 0, 0, 1)), 1)];
let ips = vec![IpAddr::from(Ipv4Addr::new(127, 0, 0, 1))];
let mut lru = DnsLru::new(1);
let mut lru = LruCache::new(1);
let rc_ips = lru.insert(name.clone(), ips_ttl, now);
let rc_ips = insert(&mut lru, name.clone(), ips_ttl, now);
assert_eq!(*rc_ips.iter().next().unwrap(), ips[0]);
let rc_ips = lru.get(&name, now).unwrap();
let rc_ips = get(&mut lru, &name, now).unwrap();
assert_eq!(*rc_ips.iter().next().unwrap(), ips[0]);
}
#[test]
fn test_insert_ttl() {
let now = Instant::now();
let name = Name::from_str("www.example.com.").unwrap();
let name = Query::query(Name::from_str("www.example.com.").unwrap(), RecordType::A);
// TTL should be 1
let ips_ttl = vec![
(IpAddr::from(Ipv4Addr::new(127, 0, 0, 1)), 1),
@ -142,16 +394,80 @@ mod tests {
IpAddr::from(Ipv4Addr::new(127, 0, 0, 1)),
IpAddr::from(Ipv4Addr::new(127, 0, 0, 2)),
];
let mut lru = DnsLru::new(1);
let mut lru = LruCache::new(1);
lru.insert(name.clone(), ips_ttl, now);
insert(&mut lru, name.clone(), ips_ttl, now);
// still valid
let rc_ips = lru.get(&name, now + Duration::from_secs(1)).unwrap();
let rc_ips = get(&mut lru, &name, now + Duration::from_secs(1)).unwrap();
assert_eq!(*rc_ips.iter().next().unwrap(), ips[0]);
// 2 should be one too far
let rc_ips = lru.get(&name, now + Duration::from_secs(2));
let rc_ips = get(&mut lru, &name, now + Duration::from_secs(2));
assert!(rc_ips.is_none());
}
#[test]
fn test_empty_cache() {
let cache = Arc::new(Mutex::new(LruCache::new(1)));
let mut client = mock(vec![empty()]);
let ips = QueryState::lookup(Query::new(), &mut client, cache)
.wait()
.unwrap();
assert!(ips.iter().next().is_none());
}
#[test]
fn test_from_cache() {
let cache = Arc::new(Mutex::new(LruCache::new(1)));
insert(
&mut cache.lock().unwrap(),
Query::new(),
vec![
(IpAddr::from(Ipv4Addr::new(127, 0, 0, 1)), u32::max_value()),
],
Instant::now(),
);
let mut client = mock(vec![empty()]);
let ips = QueryState::lookup(Query::new(), &mut client, cache)
.wait()
.unwrap();
assert_eq!(
ips.iter().cloned().collect::<Vec<IpAddr>>(),
vec![Ipv4Addr::new(127, 0, 0, 1)]
);
}
#[test]
fn test_no_cache_insert() {
let cache = Arc::new(Mutex::new(LruCache::new(1)));
// first should come from client...
let mut client = mock(vec![v4_message()]);
let ips = QueryState::lookup(Query::new(), &mut client, cache.clone())
.wait()
.unwrap();
assert_eq!(
ips.iter().cloned().collect::<Vec<IpAddr>>(),
vec![Ipv4Addr::new(127, 0, 0, 1)]
);
// next should come from cache...
let mut client = mock(vec![empty()]);
let ips = QueryState::lookup(Query::new(), &mut client, cache)
.wait()
.unwrap();
assert_eq!(
ips.iter().cloned().collect::<Vec<IpAddr>>(),
vec![Ipv4Addr::new(127, 0, 0, 1)]
);
}
}

View File

@ -8,7 +8,6 @@
//! Structs for creating and using a ResolverFuture
use std::io;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use tokio_core::reactor::Handle;
use trust_dns::client::{RetryClientHandle, SecureClientHandle};
@ -24,19 +23,25 @@ use system_conf;
pub struct ResolverFuture {
config: ResolverConfig,
options: ResolverOpts,
pool: NameServerPool,
lru: Arc<Mutex<DnsLru>>,
client_cache: DnsLru<LookupIpEither>,
}
impl ResolverFuture {
/// Construct a new ResolverFuture with the associated Client.
pub fn new(config: ResolverConfig, options: ResolverOpts, reactor: &Handle) -> Self {
let pool = NameServerPool::from_config(&config, &options, reactor);
let either;
let client = RetryClientHandle::new(pool.clone(), options.attempts);
if options.validate {
either = LookupIpEither::Secure(SecureClientHandle::new(client));
} else {
either = LookupIpEither::Retry(client);
}
ResolverFuture {
config,
options,
pool,
lru: Arc::new(Mutex::new(DnsLru::new(options.cache_size))),
client_cache: DnsLru::new(options.cache_size, either),
}
}
@ -61,8 +66,7 @@ impl ResolverFuture {
let name = match Name::from_str(host) {
Ok(name) => name,
Err(err) => {
let client = RetryClientHandle::new(self.pool.clone(), self.options.attempts);
return InnerLookupIpFuture::error(LookupIpEither::Retry(client), err);
return InnerLookupIpFuture::error(self.client_cache.clone(), err);
}
};
@ -93,21 +97,10 @@ impl ResolverFuture {
names
};
// TODO: consider removing this clone?
// create the lookup
let mut either;
let client = RetryClientHandle::new(self.pool.clone(), self.options.attempts);
if self.options.validate {
either = LookupIpEither::Secure(SecureClientHandle::new(client));
} else {
either = LookupIpEither::Retry(client);
}
LookupIpFuture::lookup(
names,
self.options.ip_strategy,
&mut either,
self.lru.clone(),
&mut self.client_cache.clone(),
)
}
@ -116,6 +109,11 @@ impl ResolverFuture {
names.push(name);
}
}
// TODO: generic lookup
// pub fn lookup(&mut self, host: &str) -> Lookup {
// }
}