no more references to Signer/MessageSigner in proto
This commit is contained in:
parent
33ab18a8a2
commit
e8257f7ebd
@ -18,6 +18,7 @@ This project adheres to [Semantic Versioning](http://semver.org/).
|
||||
- Large refactoring of internal APIs to more cleanly support \*ring\* and OpenSSL features (@briansmith)
|
||||
- `ClientHandle::send` moved to `trust_dns_proto::DnsHandle::send` (internal API)
|
||||
- Many interfaces moved from `ClientStreamHandle` to `trust_dns_proto::DnsStreamHandle`
|
||||
- `Message::sign` has been renamed and change to the more general method `Message::finalize`
|
||||
|
||||
### Fixed
|
||||
|
||||
|
@ -33,9 +33,9 @@ test_script:
|
||||
- cargo test --manifest-path client/Cargo.toml --all-features
|
||||
- cargo test --manifest-path client/Cargo.toml --no-default-features
|
||||
- cargo test --manifest-path client/Cargo.toml
|
||||
- cargo test --manifest-path client/Cargo.toml --no-default-features --features=openssl
|
||||
- cargo test --manifest-path client/Cargo.toml --no-default-features --features=dnssec-openssl
|
||||
- cargo test --manifest-path client/Cargo.toml --no-default-features
|
||||
- cargo test --manifest-path client/Cargo.toml --no-default-features --features=ring
|
||||
- cargo test --manifest-path client/Cargo.toml --no-default-features --features=dnssec-ring
|
||||
|
||||
- cargo test --manifest-path rustls/Cargo.toml
|
||||
- cargo test --manifest-path openssl/Cargo.toml
|
||||
|
@ -6,6 +6,7 @@
|
||||
// copied, modified, or distributed except according to those terms.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::marker::PhantomData;
|
||||
use std::io;
|
||||
use std::time::Duration;
|
||||
|
||||
@ -52,16 +53,17 @@ impl ClientStreamHandle for StreamHandle {
|
||||
/// implementations.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct ClientFuture<S: Stream<Item = Vec<u8>, Error = io::Error>> {
|
||||
stream: S,
|
||||
reactor_handle: Handle,
|
||||
timeout_duration: Duration,
|
||||
// TODO genericize and remove this Box
|
||||
stream_handle: Box<ClientStreamHandle>,
|
||||
new_receiver:
|
||||
Peekable<StreamFuse<UnboundedReceiver<(Message, Complete<ClientResult<Message>>)>>>,
|
||||
active_requests: HashMap<u16, (Complete<ClientResult<Message>>, Timeout)>,
|
||||
// TODO: Maybe make a typed version of ClientFuture for Updates?
|
||||
signer: Option<Signer>,
|
||||
phantom: PhantomData<S>,
|
||||
// stream: S,
|
||||
// reactor_handle: Handle,
|
||||
// timeout_duration: Duration,
|
||||
// // TODO genericize and remove this Box
|
||||
// stream_handle: Box<ClientStreamHandle>,
|
||||
// new_receiver:
|
||||
// Peekable<StreamFuse<UnboundedReceiver<(Message, Complete<ClientResult<Message>>)>>>,
|
||||
// active_requests: HashMap<u16, (Complete<ClientResult<Message>>, Timeout)>,
|
||||
// // TODO: Maybe make a typed version of ClientFuture for Updates?
|
||||
// signer: Option<Signer>,
|
||||
}
|
||||
|
||||
impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> ClientFuture<S> {
|
||||
@ -103,16 +105,21 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> ClientFuture<S> {
|
||||
/// * `timeout_duration` - All requests may fail due to lack of response, this is the time to
|
||||
/// wait for a response before canceling the request.
|
||||
/// * `stream_handle` - The handle for the `stream` on which bytes can be sent/received.
|
||||
/// * `signer` - An optional signer for requests, needed for Updates with Sig0, otherwise not needed
|
||||
/// * `finalizer` - An optional signer for requests, needed for Updates with Sig0, otherwise not needed
|
||||
pub fn with_timeout(
|
||||
stream: Box<Future<Item = S, Error = io::Error>>,
|
||||
stream_handle: Box<DnsStreamHandle>,
|
||||
loop_handle: &Handle,
|
||||
timeout_duration: Duration,
|
||||
signer: Option<Signer>,
|
||||
finalizer: Option<Signer>,
|
||||
) -> BasicClientHandle {
|
||||
let dns_future =
|
||||
DnsFuture::with_timeout(stream, stream_handle, loop_handle, timeout_duration, signer);
|
||||
let dns_future_handle = DnsFuture::with_timeout(
|
||||
stream,
|
||||
stream_handle,
|
||||
loop_handle,
|
||||
timeout_duration,
|
||||
finalizer,
|
||||
);
|
||||
|
||||
// let (sender, rx) = unbounded();
|
||||
|
||||
@ -143,256 +150,256 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> ClientFuture<S> {
|
||||
// }),
|
||||
// );
|
||||
|
||||
BasicClientHandle { message_sender: dns_future }
|
||||
BasicClientHandle { message_sender: dns_future_handle }
|
||||
}
|
||||
|
||||
/// loop over active_requests and remove cancelled requests
|
||||
/// this should free up space if we already had 4096 active requests
|
||||
fn drop_cancelled(&mut self) {
|
||||
// TODO: should we have a timeout here? or always expect the caller to do this?
|
||||
let mut canceled = HashSet::new();
|
||||
for (&id, &mut (ref mut req, ref mut timeout)) in self.active_requests.iter_mut() {
|
||||
if let Ok(Async::Ready(())) = req.poll_cancel() {
|
||||
canceled.insert(id);
|
||||
}
|
||||
// /// loop over active_requests and remove cancelled requests
|
||||
// /// this should free up space if we already had 4096 active requests
|
||||
// fn drop_cancelled(&mut self) {
|
||||
// // TODO: should we have a timeout here? or always expect the caller to do this?
|
||||
// let mut canceled = HashSet::new();
|
||||
// for (&id, &mut (ref mut req, ref mut timeout)) in self.active_requests.iter_mut() {
|
||||
// if let Ok(Async::Ready(())) = req.poll_cancel() {
|
||||
// canceled.insert(id);
|
||||
// }
|
||||
|
||||
// check for timeouts...
|
||||
match timeout.poll() {
|
||||
Ok(Async::Ready(_)) => {
|
||||
warn!("request timeout: {}", id);
|
||||
canceled.insert(id);
|
||||
}
|
||||
Ok(Async::NotReady) => (),
|
||||
Err(e) => {
|
||||
error!("unexpected error from timeout: {}", e);
|
||||
canceled.insert(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
// // check for timeouts...
|
||||
// match timeout.poll() {
|
||||
// Ok(Async::Ready(_)) => {
|
||||
// warn!("request timeout: {}", id);
|
||||
// canceled.insert(id);
|
||||
// }
|
||||
// Ok(Async::NotReady) => (),
|
||||
// Err(e) => {
|
||||
// error!("unexpected error from timeout: {}", e);
|
||||
// canceled.insert(id);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// drop all the canceled requests
|
||||
for id in canceled {
|
||||
if let Some((req, _)) = self.active_requests.remove(&id) {
|
||||
// TODO, perhaps there is a different reason timeout? but there shouldn't be...
|
||||
// being lazy and always returning timeout in this case (if it was canceled then the
|
||||
// then the otherside isn't really paying attention anyway)
|
||||
// // drop all the canceled requests
|
||||
// for id in canceled {
|
||||
// if let Some((req, _)) = self.active_requests.remove(&id) {
|
||||
// // TODO, perhaps there is a different reason timeout? but there shouldn't be...
|
||||
// // being lazy and always returning timeout in this case (if it was canceled then the
|
||||
// // then the otherside isn't really paying attention anyway)
|
||||
|
||||
// complete the request, it's failed...
|
||||
req.send(Err(ClientErrorKind::Timeout.into())).expect(
|
||||
"error notifying wait, possible future leak",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
// // complete the request, it's failed...
|
||||
// req.send(Err(ClientErrorKind::Timeout.into())).expect(
|
||||
// "error notifying wait, possible future leak",
|
||||
// );
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
/// creates random query_id, validates against all active queries
|
||||
fn next_random_query_id(&self) -> Async<u16> {
|
||||
let mut rand = rand::thread_rng();
|
||||
// /// creates random query_id, validates against all active queries
|
||||
// fn next_random_query_id(&self) -> Async<u16> {
|
||||
// let mut rand = rand::thread_rng();
|
||||
|
||||
for _ in 0..100 {
|
||||
let id = rand.gen_range(0_u16, u16::max_value());
|
||||
// for _ in 0..100 {
|
||||
// let id = rand.gen_range(0_u16, u16::max_value());
|
||||
|
||||
if !self.active_requests.contains_key(&id) {
|
||||
return Async::Ready(id);
|
||||
}
|
||||
}
|
||||
// if !self.active_requests.contains_key(&id) {
|
||||
// return Async::Ready(id);
|
||||
// }
|
||||
// }
|
||||
|
||||
warn!("could not get next random query id, delaying");
|
||||
task::current().notify();
|
||||
Async::NotReady
|
||||
}
|
||||
// warn!("could not get next random query id, delaying");
|
||||
// task::current().notify();
|
||||
// Async::NotReady
|
||||
// }
|
||||
}
|
||||
|
||||
impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientFuture<S> {
|
||||
type Item = ();
|
||||
type Error = ClientError;
|
||||
// impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientFuture<S> {
|
||||
// type Item = ();
|
||||
// type Error = ClientError;
|
||||
|
||||
fn poll(&mut self) -> Poll<(), Self::Error> {
|
||||
self.drop_cancelled();
|
||||
// fn poll(&mut self) -> Poll<(), Self::Error> {
|
||||
// self.drop_cancelled();
|
||||
|
||||
// loop over new_receiver for all outbound requests
|
||||
loop {
|
||||
// get next query_id
|
||||
let query_id: Option<u16> = match self.new_receiver.peek() {
|
||||
Ok(Async::Ready(Some(_))) => {
|
||||
debug!("got message from receiver");
|
||||
// // loop over new_receiver for all outbound requests
|
||||
// loop {
|
||||
// // get next query_id
|
||||
// let query_id: Option<u16> = match self.new_receiver.peek() {
|
||||
// Ok(Async::Ready(Some(_))) => {
|
||||
// debug!("got message from receiver");
|
||||
|
||||
// we have a new message to send
|
||||
match self.next_random_query_id() {
|
||||
Async::Ready(id) => Some(id),
|
||||
Async::NotReady => break,
|
||||
}
|
||||
}
|
||||
Ok(_) => None,
|
||||
Err(()) => {
|
||||
warn!("receiver was shutdown?");
|
||||
break
|
||||
}
|
||||
};
|
||||
// // we have a new message to send
|
||||
// match self.next_random_query_id() {
|
||||
// Async::Ready(id) => Some(id),
|
||||
// Async::NotReady => break,
|
||||
// }
|
||||
// }
|
||||
// Ok(_) => None,
|
||||
// Err(()) => {
|
||||
// warn!("receiver was shutdown?");
|
||||
// break
|
||||
// }
|
||||
// };
|
||||
|
||||
// finally pop the reciever
|
||||
match self.new_receiver.poll() {
|
||||
Ok(Async::Ready(Some((mut message, complete)))) => {
|
||||
// if there was a message, and the above succesion was succesful,
|
||||
// register the new message, if not do not register, and set the complete to error.
|
||||
// getting a random query id, this mitigates potential cache poisoning.
|
||||
// TODO: for SIG0 we can't change the message id after signing.
|
||||
let query_id = query_id.expect("query_id should have been set above");
|
||||
message.set_id(query_id);
|
||||
// // finally pop the reciever
|
||||
// match self.new_receiver.poll() {
|
||||
// Ok(Async::Ready(Some((mut message, complete)))) => {
|
||||
// // if there was a message, and the above succesion was succesful,
|
||||
// // register the new message, if not do not register, and set the complete to error.
|
||||
// // getting a random query id, this mitigates potential cache poisoning.
|
||||
// // TODO: for SIG0 we can't change the message id after signing.
|
||||
// let query_id = query_id.expect("query_id should have been set above");
|
||||
// message.set_id(query_id);
|
||||
|
||||
// update messages need to be signed.
|
||||
if let OpCode::Update = message.op_code() {
|
||||
if let Some(ref signer) = self.signer {
|
||||
// TODO: it's too bad this happens here...
|
||||
if let Err(e) = message.sign(signer, Utc::now().timestamp() as u32) {
|
||||
warn!("could not sign message: {}", e);
|
||||
complete.send(Err(e.into())).expect(
|
||||
"error notifying wait, possible future leak",
|
||||
);
|
||||
continue; // to the next message...
|
||||
}
|
||||
}
|
||||
}
|
||||
// // update messages need to be signed.
|
||||
// if let OpCode::Update = message.op_code() {
|
||||
// if let Some(ref signer) = self.signer {
|
||||
// // TODO: it's too bad this happens here...
|
||||
// if let Err(e) = message.sign(signer, Utc::now().timestamp() as u32) {
|
||||
// warn!("could not sign message: {}", e);
|
||||
// complete.send(Err(e.into())).expect(
|
||||
// "error notifying wait, possible future leak",
|
||||
// );
|
||||
// continue; // to the next message...
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// store a Timeout for this message before sending
|
||||
let timeout = match Timeout::new(self.timeout_duration, &self.reactor_handle) {
|
||||
Ok(timeout) => timeout,
|
||||
Err(e) => {
|
||||
warn!("could not create timer: {}", e);
|
||||
complete.send(Err(e.into())).expect(
|
||||
"error notifying wait, possible future leak",
|
||||
);
|
||||
continue; // to the next message...
|
||||
}
|
||||
};
|
||||
// // store a Timeout for this message before sending
|
||||
// let timeout = match Timeout::new(self.timeout_duration, &self.reactor_handle) {
|
||||
// Ok(timeout) => timeout,
|
||||
// Err(e) => {
|
||||
// warn!("could not create timer: {}", e);
|
||||
// complete.send(Err(e.into())).expect(
|
||||
// "error notifying wait, possible future leak",
|
||||
// );
|
||||
// continue; // to the next message...
|
||||
// }
|
||||
// };
|
||||
|
||||
// send the message
|
||||
match message.to_vec() {
|
||||
Ok(buffer) => {
|
||||
debug!("sending message id: {}", query_id);
|
||||
try!(self.stream_handle.send(buffer));
|
||||
// add to the map -after- the client send b/c we don't want to put it in the map if
|
||||
// we ended up returning from the send.
|
||||
self.active_requests.insert(
|
||||
message.id(),
|
||||
(complete, timeout),
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("error message id: {} error: {}", query_id, e);
|
||||
// complete with the error, don't add to the map of active requests
|
||||
complete.send(Err(e.into())).expect(
|
||||
"error notifying wait, possible future leak",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => break,
|
||||
Err(()) => {
|
||||
warn!("receiver was shutdown?");
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
// // send the message
|
||||
// match message.to_vec() {
|
||||
// Ok(buffer) => {
|
||||
// debug!("sending message id: {}", query_id);
|
||||
// try!(self.stream_handle.send(buffer));
|
||||
// // add to the map -after- the client send b/c we don't want to put it in the map if
|
||||
// // we ended up returning from the send.
|
||||
// self.active_requests.insert(
|
||||
// message.id(),
|
||||
// (complete, timeout),
|
||||
// );
|
||||
// }
|
||||
// Err(e) => {
|
||||
// debug!("error message id: {} error: {}", query_id, e);
|
||||
// // complete with the error, don't add to the map of active requests
|
||||
// complete.send(Err(e.into())).expect(
|
||||
// "error notifying wait, possible future leak",
|
||||
// );
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// Ok(_) => break,
|
||||
// Err(()) => {
|
||||
// warn!("receiver was shutdown?");
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// Collect all inbound requests, max 100 at a time for QoS
|
||||
// by having a max we will guarantee that the client can't be DOSed in this loop
|
||||
// TODO: make the QoS configurable
|
||||
let mut messages_received = 0;
|
||||
for i in 0..QOS_MAX_RECEIVE_MSGS {
|
||||
match try!(self.stream.poll()) {
|
||||
Async::Ready(Some(buffer)) => {
|
||||
messages_received = i;
|
||||
// // Collect all inbound requests, max 100 at a time for QoS
|
||||
// // by having a max we will guarantee that the client can't be DOSed in this loop
|
||||
// // TODO: make the QoS configurable
|
||||
// let mut messages_received = 0;
|
||||
// for i in 0..QOS_MAX_RECEIVE_MSGS {
|
||||
// match try!(self.stream.poll()) {
|
||||
// Async::Ready(Some(buffer)) => {
|
||||
// messages_received = i;
|
||||
|
||||
// deserialize or log decode_error
|
||||
match Message::from_vec(&buffer) {
|
||||
Ok(message) => {
|
||||
match self.active_requests.remove(&message.id()) {
|
||||
Some((complete, _)) => {
|
||||
complete.send(Ok(message)).expect(
|
||||
"error notifying wait, possible future leak",
|
||||
)
|
||||
}
|
||||
None => debug!("unexpected request_id: {}", message.id()),
|
||||
}
|
||||
}
|
||||
// TODO: return src address for diagnostics
|
||||
Err(e) => debug!("error decoding message: {}", e),
|
||||
}
|
||||
// // deserialize or log decode_error
|
||||
// match Message::from_vec(&buffer) {
|
||||
// Ok(message) => {
|
||||
// match self.active_requests.remove(&message.id()) {
|
||||
// Some((complete, _)) => {
|
||||
// complete.send(Ok(message)).expect(
|
||||
// "error notifying wait, possible future leak",
|
||||
// )
|
||||
// }
|
||||
// None => debug!("unexpected request_id: {}", message.id()),
|
||||
// }
|
||||
// }
|
||||
// // TODO: return src address for diagnostics
|
||||
// Err(e) => debug!("error decoding message: {}", e),
|
||||
// }
|
||||
|
||||
}
|
||||
Async::Ready(None) |
|
||||
Async::NotReady => break,
|
||||
}
|
||||
}
|
||||
// }
|
||||
// Async::Ready(None) |
|
||||
// Async::NotReady => break,
|
||||
// }
|
||||
// }
|
||||
|
||||
// Clean shutdown happens when all pending requests are done and the
|
||||
// incoming channel has been closed (e.g. you'll never receive another
|
||||
// request). try! will early return the error...
|
||||
let done = if let Async::Ready(None) = try!(self.new_receiver.peek()) {
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
if self.active_requests.is_empty() && done {
|
||||
return Ok(().into()); // we are done
|
||||
}
|
||||
// // Clean shutdown happens when all pending requests are done and the
|
||||
// // incoming channel has been closed (e.g. you'll never receive another
|
||||
// // request). try! will early return the error...
|
||||
// let done = if let Async::Ready(None) = try!(self.new_receiver.peek()) {
|
||||
// true
|
||||
// } else {
|
||||
// false
|
||||
// };
|
||||
// if self.active_requests.is_empty() && done {
|
||||
// return Ok(().into()); // we are done
|
||||
// }
|
||||
|
||||
// If still active, then if the qos (for _ in 0..100 loop) limit
|
||||
// was hit then "yield". This'll make sure that the future is
|
||||
// woken up immediately on the next turn of the event loop.
|
||||
if messages_received == QOS_MAX_RECEIVE_MSGS {
|
||||
task::current().notify();
|
||||
}
|
||||
// // If still active, then if the qos (for _ in 0..100 loop) limit
|
||||
// // was hit then "yield". This'll make sure that the future is
|
||||
// // woken up immediately on the next turn of the event loop.
|
||||
// if messages_received == QOS_MAX_RECEIVE_MSGS {
|
||||
// task::current().notify();
|
||||
// }
|
||||
|
||||
// Finally, return not ready to keep the 'driver task' alive.
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
}
|
||||
// // Finally, return not ready to keep the 'driver task' alive.
|
||||
// return Ok(Async::NotReady);
|
||||
// }
|
||||
// }
|
||||
|
||||
/// Always returns the specified io::Error to the remote Sender
|
||||
struct ClientStreamErrored {
|
||||
error: io::Error,
|
||||
new_receiver:
|
||||
Peekable<StreamFuse<UnboundedReceiver<(Message, Complete<ClientResult<Message>>)>>>,
|
||||
}
|
||||
// /// Always returns the specified io::Error to the remote Sender
|
||||
// struct ClientStreamErrored {
|
||||
// error: io::Error,
|
||||
// new_receiver:
|
||||
// Peekable<StreamFuse<UnboundedReceiver<(Message, Complete<ClientResult<Message>>)>>>,
|
||||
// }
|
||||
|
||||
impl Future for ClientStreamErrored {
|
||||
type Item = ();
|
||||
type Error = ClientError;
|
||||
// impl Future for ClientStreamErrored {
|
||||
// type Item = ();
|
||||
// type Error = ClientError;
|
||||
|
||||
fn poll(&mut self) -> Poll<(), Self::Error> {
|
||||
match self.new_receiver.poll() {
|
||||
Ok(Async::Ready(Some((_, complete)))) => {
|
||||
complete
|
||||
.send(Err(ClientError::from(&self.error).clone()))
|
||||
.expect("error notifying wait, possible future leak");
|
||||
// fn poll(&mut self) -> Poll<(), Self::Error> {
|
||||
// match self.new_receiver.poll() {
|
||||
// Ok(Async::Ready(Some((_, complete)))) => {
|
||||
// complete
|
||||
// .send(Err(ClientError::from(&self.error).clone()))
|
||||
// .expect("error notifying wait, possible future leak");
|
||||
|
||||
task::current().notify();
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
|
||||
_ => return Err(ClientErrorKind::NoError.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
// task::current().notify();
|
||||
// return Ok(Async::NotReady);
|
||||
// }
|
||||
// Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
|
||||
// _ => return Err(ClientErrorKind::NoError.into()),
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
enum ClientStreamOrError<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> {
|
||||
Future(ClientFuture<S>),
|
||||
Errored(ClientStreamErrored),
|
||||
}
|
||||
// enum ClientStreamOrError<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> {
|
||||
// Future(ClientFuture<S>),
|
||||
// Errored(ClientStreamErrored),
|
||||
// }
|
||||
|
||||
impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientStreamOrError<S> {
|
||||
type Item = ();
|
||||
type Error = ClientError;
|
||||
// impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientStreamOrError<S> {
|
||||
// type Item = ();
|
||||
// type Error = ClientError;
|
||||
|
||||
fn poll(&mut self) -> Poll<(), Self::Error> {
|
||||
match *self {
|
||||
ClientStreamOrError::Future(ref mut f) => f.poll(),
|
||||
ClientStreamOrError::Errored(ref mut e) => e.poll(),
|
||||
}
|
||||
}
|
||||
}
|
||||
// fn poll(&mut self) -> Poll<(), Self::Error> {
|
||||
// match *self {
|
||||
// ClientStreamOrError::Future(ref mut f) => f.poll(),
|
||||
// ClientStreamOrError::Errored(ref mut e) => e.poll(),
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
|
||||
/// Root ClientHandle implementaton returned by ClientFuture
|
||||
@ -400,6 +407,7 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientSt
|
||||
/// This can be used directly to perform queries. See `trust_dns::client::SecureClientHandle` for
|
||||
/// a DNSSEc chain validator.
|
||||
#[derive(Clone)]
|
||||
#[deprecated(note = "See [`BasicDnsHandle`]")]
|
||||
pub struct BasicClientHandle {
|
||||
message_sender: BasicDnsHandle,
|
||||
}
|
||||
|
@ -1,21 +1,15 @@
|
||||
/*
|
||||
* Copyright (C) 2015 Benjamin Fry <benjaminfry@me.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
// Copyright 2015-2017 Benjamin Fry <benjaminfry@me.com>
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
|
||||
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
|
||||
// http://opensource.org/licenses/MIT>, at your option. This file may not be
|
||||
// copied, modified, or distributed except according to those terms.
|
||||
|
||||
//! Operations to send with a `Client` or server, e.g. `Query`, `Message`, or `UpdateMessage` can
|
||||
//! be used to gether to either query or update resource records sets.
|
||||
|
||||
pub use trust_dns_proto::op::{Edns, Header, Message, MessageType, UpdateMessage, OpCode, Query,
|
||||
mod update_message;
|
||||
|
||||
pub use self::update_message::UpdateMessage;
|
||||
pub use trust_dns_proto::op::{Edns, Header, Message, MessageFinalizer, MessageType, OpCode, Query,
|
||||
ResponseCode};
|
||||
|
155
client/src/op/update_message.rs
Normal file
155
client/src/op/update_message.rs
Normal file
@ -0,0 +1,155 @@
|
||||
// Copyright 2015-2017 Benjamin Fry <benjaminfry@me.com>
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
|
||||
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
|
||||
// http://opensource.org/licenses/MIT>, at your option. This file may not be
|
||||
// copied, modified, or distributed except according to those terms.
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::mem;
|
||||
|
||||
use trust_dns_proto::error::ProtoResult;
|
||||
|
||||
use error::*;
|
||||
use rr::{Record, RecordType};
|
||||
#[cfg(feature = "openssl")]
|
||||
use rr::{DNSClass, Name, RData};
|
||||
#[cfg(feature = "openssl")]
|
||||
use rr::rdata::SIG;
|
||||
use serialize::binary::{BinEncoder, BinDecoder, BinSerializable, EncodeMode};
|
||||
use super::{Message, MessageFinalizer, MessageType, Header, Query, Edns, OpCode, ResponseCode};
|
||||
|
||||
/// To reduce errors in using the Message struct as an Update, this will do the call throughs
|
||||
/// to properly do that.
|
||||
///
|
||||
/// Generally rather than constructin this by hand, see the update methods on `Client`
|
||||
pub trait UpdateMessage: Debug {
|
||||
/// see `Header::id`
|
||||
fn id(&self) -> u16;
|
||||
|
||||
/// Adds the zone section, i.e. name.example.com would be example.com
|
||||
fn add_zone(&mut self, query: Query);
|
||||
|
||||
/// Add the pre-requisite records
|
||||
///
|
||||
/// These must exist, or not, for the Update request to go through.
|
||||
fn add_pre_requisite(&mut self, record: Record);
|
||||
|
||||
/// Add all pre-requisites to the UpdateMessage
|
||||
#[deprecated = "will be removed post 0.9.x"]
|
||||
fn add_all_pre_requisites(&mut self, vector: &[&Record]);
|
||||
|
||||
/// Add all the Records from the Iterator to the pre-reqisites section
|
||||
fn add_pre_requisites<R, I>(&mut self, records: R)
|
||||
where
|
||||
R: IntoIterator<Item = Record, IntoIter = I>,
|
||||
I: Iterator<Item = Record>;
|
||||
|
||||
/// Add the Record to be updated
|
||||
fn add_update(&mut self, record: Record);
|
||||
|
||||
/// Add the set of Records to be updated
|
||||
#[deprecated = "will be removed post 0.9.x"]
|
||||
fn add_all_updates(&mut self, vector: &[&Record]);
|
||||
|
||||
/// Add the Records from the Iterator to the updates section
|
||||
fn add_updates<R, I>(&mut self, records: R)
|
||||
where
|
||||
R: IntoIterator<Item = Record, IntoIter = I>,
|
||||
I: Iterator<Item = Record>;
|
||||
|
||||
/// Add Records to the additional Section of hte UpdateMessage
|
||||
fn add_additional(&mut self, record: Record);
|
||||
|
||||
/// Returns the Zones to be updated, generally should only be one.
|
||||
fn zones(&self) -> &[Query];
|
||||
|
||||
/// Returns the pre-requisites
|
||||
fn prerequisites(&self) -> &[Record];
|
||||
|
||||
/// Returns the records to be updated
|
||||
fn updates(&self) -> &[Record];
|
||||
|
||||
/// Returns the additonal records
|
||||
fn additionals(&self) -> &[Record];
|
||||
|
||||
/// This is used to authenticate update messages.
|
||||
///
|
||||
/// see `Message::sig0()` for more information.
|
||||
fn sig0(&self) -> &[Record];
|
||||
|
||||
/// Finalize the message prior to sending.
|
||||
///
|
||||
/// Subsequent to calling this, the Message should not change.
|
||||
fn finalize<MF: MessageFinalizer>(
|
||||
&mut self,
|
||||
finalizer: &MF,
|
||||
inception_time: u32,
|
||||
) -> ProtoResult<()>;
|
||||
}
|
||||
|
||||
/// to reduce errors in using the Message struct as an Update, this will do the call throughs
|
||||
/// to properly do that.
|
||||
impl UpdateMessage for Message {
|
||||
fn id(&self) -> u16 {
|
||||
self.id()
|
||||
}
|
||||
fn add_zone(&mut self, query: Query) {
|
||||
self.add_query(query);
|
||||
}
|
||||
fn add_pre_requisite(&mut self, record: Record) {
|
||||
self.add_answer(record);
|
||||
}
|
||||
fn add_all_pre_requisites(&mut self, vector: &[&Record]) {
|
||||
self.add_answers(vector.into_iter().map(|r| (*r).clone()));
|
||||
}
|
||||
fn add_pre_requisites<R, I>(&mut self, records: R)
|
||||
where
|
||||
R: IntoIterator<Item = Record, IntoIter = I>,
|
||||
I: Iterator<Item = Record>,
|
||||
{
|
||||
self.add_answers(records);
|
||||
}
|
||||
fn add_update(&mut self, record: Record) {
|
||||
self.add_name_server(record);
|
||||
}
|
||||
fn add_all_updates(&mut self, vector: &[&Record]) {
|
||||
self.add_name_servers(vector.into_iter().map(|r| (*r).clone()));
|
||||
}
|
||||
fn add_updates<R, I>(&mut self, records: R)
|
||||
where
|
||||
R: IntoIterator<Item = Record, IntoIter = I>,
|
||||
I: Iterator<Item = Record>,
|
||||
{
|
||||
self.add_name_servers(records);
|
||||
}
|
||||
fn add_additional(&mut self, record: Record) {
|
||||
self.add_additional(record);
|
||||
}
|
||||
|
||||
fn zones(&self) -> &[Query] {
|
||||
self.queries()
|
||||
}
|
||||
fn prerequisites(&self) -> &[Record] {
|
||||
self.answers()
|
||||
}
|
||||
fn updates(&self) -> &[Record] {
|
||||
self.name_servers()
|
||||
}
|
||||
fn additionals(&self) -> &[Record] {
|
||||
self.additionals()
|
||||
}
|
||||
|
||||
fn sig0(&self) -> &[Record] {
|
||||
self.sig0()
|
||||
}
|
||||
|
||||
// TODO: where's the 'right' spot for this function
|
||||
fn finalize<MF: MessageFinalizer>(
|
||||
&mut self,
|
||||
finalizer: &MF,
|
||||
inception_time: u32,
|
||||
) -> ProtoResult<()> {
|
||||
self.finalize(finalizer, inception_time)
|
||||
}
|
||||
}
|
@ -28,7 +28,6 @@ pub use self::dnssec::DigestType;
|
||||
#[cfg(any(feature = "openssl", feature = "ring"))]
|
||||
pub use self::key_format::KeyFormat;
|
||||
pub use self::keypair::KeyPair;
|
||||
pub use self::dnssec::MessageSigner;
|
||||
pub use self::dnssec::Nsec3HashAlgorithm;
|
||||
pub use self::dnssec::PublicKey;
|
||||
pub use self::dnssec::PublicKeyBuf;
|
||||
|
@ -18,10 +18,10 @@
|
||||
#[cfg(any(feature = "openssl", feature = "ring"))]
|
||||
use chrono::Duration;
|
||||
use trust_dns_proto::error::{ProtoResult, ProtoErrorKind};
|
||||
use trust_dns_proto::rr::dnssec::{tbs, TBS, MessageSigner};
|
||||
use trust_dns_proto::rr::dnssec::{tbs, TBS};
|
||||
|
||||
use op::Message;
|
||||
use rr::Name;
|
||||
use op::{Message, MessageFinalizer};
|
||||
use rr::{DNSClass, Name, Record, RecordType};
|
||||
#[cfg(any(feature = "openssl", feature = "ring"))]
|
||||
use rr::RData;
|
||||
#[cfg(any(feature = "openssl", feature = "ring"))]
|
||||
@ -33,17 +33,6 @@ use rr::rdata::{DNSKEY, KEY};
|
||||
#[cfg(any(feature = "openssl", feature = "ring"))]
|
||||
use serialize::binary::BinEncoder;
|
||||
|
||||
/// FIXME: get docs from Signer in Client
|
||||
// pub trait MessageSigner {
|
||||
// /// FIXME: get docs from Signer in Client
|
||||
// fn algorithm(&self) -> Algorithm;
|
||||
// /// FIXME: get docs from Signer in Client
|
||||
// fn calculate_key_tag(&self) -> ProtoResult<u16>;
|
||||
// /// FIXME: get docs from Signer in Client
|
||||
// fn signer_name(&self) -> &Name;
|
||||
// /// FIXME: get docs from Signer in Client
|
||||
// fn sign_message(&self, message: &Message, pre_sig0: &SIG) -> ProtoResult<Vec<u8>>;
|
||||
// }
|
||||
/// Use for performing signing and validation of DNSSec based components.
|
||||
///
|
||||
/// TODO: warning this struct and it's impl are under high volatility, expect breaking changes
|
||||
@ -383,10 +372,7 @@ impl Signer {
|
||||
ProtoErrorKind::Msg(format!("signing error: {}", e)).into()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "openssl", feature = "ring"))]
|
||||
impl MessageSigner for Signer {
|
||||
/// Returns the algorithm this Signer will use to either sign or validate a signature
|
||||
fn algorithm(&self) -> Algorithm {
|
||||
self.algorithm
|
||||
@ -522,6 +508,54 @@ impl MessageSigner for Signer {
|
||||
}
|
||||
}
|
||||
|
||||
impl MessageFinalizer for Signer {
|
||||
fn finalize_message(&self, message: &Message, current_time: u32) -> ProtoResult<Vec<Record>> {
|
||||
debug!("signing message: {:?}", message);
|
||||
let key_tag: u16 = try!(self.calculate_key_tag());
|
||||
|
||||
// this is based on RFCs 2535, 2931 and 3007
|
||||
|
||||
// 'For all SIG(0) RRs, the owner name, class, TTL, and original TTL, are
|
||||
// meaningless.' - 2931
|
||||
let mut sig0 = Record::new();
|
||||
|
||||
// The TTL fields SHOULD be zero
|
||||
sig0.set_ttl(0);
|
||||
|
||||
// The CLASS field SHOULD be ANY
|
||||
sig0.set_dns_class(DNSClass::ANY);
|
||||
|
||||
// The owner name SHOULD be root (a single zero octet).
|
||||
sig0.set_name(Name::root());
|
||||
let num_labels = sig0.name().num_labels();
|
||||
|
||||
let expiration_time: u32 = current_time + (5 * 60); // +5 minutes in seconds
|
||||
|
||||
sig0.set_rr_type(RecordType::SIG);
|
||||
let pre_sig0 = SIG::new(
|
||||
// type covered in SIG(0) is 0 which is what makes this SIG0 vs a standard SIG
|
||||
RecordType::NULL,
|
||||
self.algorithm(),
|
||||
num_labels,
|
||||
// see above, original_ttl is meaningless, The TTL fields SHOULD be zero
|
||||
0,
|
||||
// recommended time is +5 minutes from now, to prevent timing attacks, 2 is probably good
|
||||
expiration_time,
|
||||
// current time, this should be UTC
|
||||
// unsigned numbers of seconds since the start of 1 January 1970, GMT
|
||||
current_time,
|
||||
key_tag,
|
||||
// can probably get rid of this clone if the owndership is correct
|
||||
self.signer_name().clone(),
|
||||
Vec::new(),
|
||||
);
|
||||
let signature: Vec<u8> = try!(self.sign_message(message, &pre_sig0));
|
||||
sig0.set_rdata(RData::SIG(pre_sig0.set_sig(signature)));
|
||||
|
||||
Ok(vec![sig0])
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(any(feature = "openssl", feature = "ring")))]
|
||||
impl MessageSigner for Signer {
|
||||
/// Always panics!
|
||||
@ -552,7 +586,7 @@ mod tests {
|
||||
use rr::{DNSClass, Name, Record, RecordType};
|
||||
use rr::rdata::SIG;
|
||||
use rr::rdata::key::KeyUsage;
|
||||
use rr::dnssec::{PublicKey, PublicKeyEnum, Verifier};
|
||||
use rr::dnssec::*;
|
||||
use op::{Message, Query, UpdateMessage};
|
||||
|
||||
pub use super::*;
|
||||
@ -600,7 +634,7 @@ mod tests {
|
||||
|
||||
// now test that the sig0 record works correctly.
|
||||
assert!(question.sig0().is_empty());
|
||||
question.sign(&signer, 0).expect("should have signed");
|
||||
question.finalize(&signer, 0).expect("should have signed");
|
||||
assert!(!question.sig0().is_empty());
|
||||
|
||||
let sig = signer.sign_message(&question, &pre_sig0);
|
||||
@ -755,4 +789,107 @@ MC0CAQACBQC+L6pNAgMBAAECBQCYj0ZNAgMA9CsCAwDHZwICeEUCAnE/AgMA3u0=
|
||||
|
||||
assert_eq!(key_tag, 28551);
|
||||
}
|
||||
|
||||
// TODO: these tests technically came from TBS in trust_dns_proto
|
||||
#[cfg(feature = "openssl")]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
extern crate openssl;
|
||||
use self::openssl::rsa::Rsa;
|
||||
|
||||
use rr::*;
|
||||
use rr::rdata::SIG;
|
||||
use rr::dnssec::*;
|
||||
use rr::dnssec::tbs::*;
|
||||
|
||||
#[test]
|
||||
fn test_rrset_tbs() {
|
||||
let rsa = Rsa::generate(2048).unwrap();
|
||||
let key = KeyPair::from_rsa(rsa).unwrap();
|
||||
let sig0key = key.to_sig0key(Algorithm::RSASHA256).unwrap();
|
||||
let signer = Signer::sig0(sig0key, key, Name::root());
|
||||
|
||||
let origin: Name = Name::parse("example.com.", None).unwrap();
|
||||
let rrsig = Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::SIG(SIG::new(
|
||||
RecordType::NS,
|
||||
Algorithm::RSASHA256,
|
||||
origin.num_labels(),
|
||||
86400,
|
||||
5,
|
||||
0,
|
||||
signer.calculate_key_tag().unwrap(),
|
||||
origin.clone(),
|
||||
vec![],
|
||||
)))
|
||||
.clone();
|
||||
let rrset = vec![
|
||||
Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::NS(Name::parse("a.iana-servers.net.", None).unwrap()))
|
||||
.clone(),
|
||||
Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::NS(Name::parse("b.iana-servers.net.", None).unwrap()))
|
||||
.clone(),
|
||||
];
|
||||
|
||||
let tbs = rrset_tbs_with_rrsig(&rrsig, &rrset).unwrap();
|
||||
assert!(!tbs.as_ref().is_empty());
|
||||
|
||||
let rrset = vec![
|
||||
Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::CNAME)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::CNAME(
|
||||
Name::parse("a.iana-servers.net.", None).unwrap(),
|
||||
))
|
||||
.clone(), // different type
|
||||
Record::new()
|
||||
.set_name(Name::parse("www.example.com.", None).unwrap())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::NS(Name::parse("a.iana-servers.net.", None).unwrap()))
|
||||
.clone(), // different name
|
||||
Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::CH)
|
||||
.set_rdata(RData::NS(Name::parse("a.iana-servers.net.", None).unwrap()))
|
||||
.clone(), // different class
|
||||
Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::NS(Name::parse("a.iana-servers.net.", None).unwrap()))
|
||||
.clone(),
|
||||
Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::NS(Name::parse("b.iana-servers.net.", None).unwrap()))
|
||||
.clone(),
|
||||
];
|
||||
|
||||
let filtered_tbs = rrset_tbs_with_rrsig(&rrsig, &rrset).unwrap();
|
||||
assert!(!filtered_tbs.as_ref().is_empty());
|
||||
assert_eq!(tbs.as_ref(), filtered_tbs.as_ref());
|
||||
}
|
||||
}
|
||||
}
|
@ -20,8 +20,7 @@ use rand;
|
||||
use tokio_core::reactor::{Handle, Timeout};
|
||||
|
||||
use error::*;
|
||||
use op::{Message, MessageType, OpCode, Query, UpdateMessage};
|
||||
use rr::dnssec::MessageSigner;
|
||||
use op::{Message, MessageType, MessageFinalizer, OpCode, Query};
|
||||
use rr::{domain, DNSClass, IntoRecordSet, RData, Record, RecordType};
|
||||
use rr::rdata::NULL;
|
||||
|
||||
@ -49,7 +48,7 @@ impl DnsStreamHandle for StreamHandle {
|
||||
/// This Client is generic and capable of wrapping UDP, TCP, and other underlying DNS protocol
|
||||
/// implementations.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct DnsFuture<S: Stream<Item = Vec<u8>, Error = io::Error>, MS: MessageSigner> {
|
||||
pub struct DnsFuture<S: Stream<Item = Vec<u8>, Error = io::Error>, MF: MessageFinalizer> {
|
||||
stream: S,
|
||||
reactor_handle: Handle,
|
||||
timeout_duration: Duration,
|
||||
@ -58,13 +57,13 @@ pub struct DnsFuture<S: Stream<Item = Vec<u8>, Error = io::Error>, MS: MessageSi
|
||||
new_receiver:
|
||||
Peekable<StreamFuse<UnboundedReceiver<(Message, Complete<ProtoResult<Message>>)>>>,
|
||||
active_requests: HashMap<u16, (Complete<ProtoResult<Message>>, Timeout)>,
|
||||
signer: Option<MS>,
|
||||
signer: Option<MF>,
|
||||
}
|
||||
|
||||
impl<S, MS> DnsFuture<S, MS>
|
||||
impl<S, MF> DnsFuture<S, MF>
|
||||
where
|
||||
S: Stream<Item = Vec<u8>, Error = io::Error> + 'static,
|
||||
MS: MessageSigner + 'static,
|
||||
MF: MessageFinalizer + 'static,
|
||||
{
|
||||
/// Spawns a new DnsFuture Stream. This uses a default timeout of 5 seconds for all requests.
|
||||
///
|
||||
@ -80,7 +79,7 @@ where
|
||||
stream: Box<Future<Item = S, Error = io::Error>>,
|
||||
stream_handle: Box<DnsStreamHandle>,
|
||||
loop_handle: &Handle,
|
||||
signer: Option<MS>,
|
||||
signer: Option<MF>,
|
||||
) -> BasicDnsHandle {
|
||||
Self::with_timeout(
|
||||
stream,
|
||||
@ -108,7 +107,7 @@ where
|
||||
stream_handle: Box<DnsStreamHandle>,
|
||||
loop_handle: &Handle,
|
||||
timeout_duration: Duration,
|
||||
signer: Option<MS>,
|
||||
signer: Option<MF>,
|
||||
) -> BasicDnsHandle {
|
||||
let (sender, rx) = unbounded();
|
||||
|
||||
@ -204,10 +203,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, MS> Future for DnsFuture<S, MS>
|
||||
impl<S, MF> Future for DnsFuture<S, MF>
|
||||
where
|
||||
S: Stream<Item = Vec<u8>, Error = io::Error> + 'static,
|
||||
MS: MessageSigner + 'static,
|
||||
MF: MessageFinalizer + 'static,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ProtoError;
|
||||
@ -241,15 +240,17 @@ where
|
||||
// if there was a message, and the above succesion was succesful,
|
||||
// register the new message, if not do not register, and set the complete to error.
|
||||
// getting a random query id, this mitigates potential cache poisoning.
|
||||
// TODO: for SIG0 we can't change the message id after signing.
|
||||
let query_id = query_id.expect("query_id should have been set above");
|
||||
message.set_id(query_id);
|
||||
|
||||
// update messages need to be signed.
|
||||
if let OpCode::Update = message.op_code() {
|
||||
if let Some(ref signer) = self.signer {
|
||||
// TODO: it's too bad this happens here...
|
||||
if let Err(e) = message.sign(signer, Utc::now().timestamp() as u32) {
|
||||
if let Err(e) = message.finalize(
|
||||
signer,
|
||||
Utc::now().timestamp() as u32,
|
||||
)
|
||||
{
|
||||
warn!("could not sign message: {}", e);
|
||||
complete.send(Err(e.into())).expect(
|
||||
"error notifying wait, possible future leak",
|
||||
@ -384,15 +385,19 @@ impl Future for ClientStreamErrored {
|
||||
}
|
||||
}
|
||||
|
||||
enum ClientStreamOrError<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static, MS: MessageSigner + 'static> {
|
||||
Future(DnsFuture<S, MS>),
|
||||
enum ClientStreamOrError<S, MF>
|
||||
where
|
||||
S: Stream<Item = Vec<u8>, Error = io::Error> + 'static,
|
||||
MF: MessageFinalizer + 'static,
|
||||
{
|
||||
Future(DnsFuture<S, MF>),
|
||||
Errored(ClientStreamErrored),
|
||||
}
|
||||
|
||||
impl<S, MS> Future for ClientStreamOrError<S, MS>
|
||||
impl<S, MF> Future for ClientStreamOrError<S, MF>
|
||||
where
|
||||
S: Stream<Item = Vec<u8>, Error = io::Error> + 'static,
|
||||
MS: MessageSigner + 'static,
|
||||
MF: MessageFinalizer + 'static,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ProtoError;
|
||||
|
@ -11,6 +11,7 @@
|
||||
//! TRust-DNS Protocol library
|
||||
|
||||
extern crate chrono;
|
||||
extern crate data_encoding;
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
#[macro_use]
|
||||
|
@ -21,7 +21,6 @@ use std::mem;
|
||||
|
||||
use error::*;
|
||||
use rr::{Record, RecordType};
|
||||
use rr::dnssec::MessageSigner;
|
||||
#[cfg(feature = "openssl")]
|
||||
use rr::{DNSClass, Name, RData};
|
||||
#[cfg(feature = "openssl")]
|
||||
@ -505,7 +504,7 @@ impl Message {
|
||||
/// # Return value
|
||||
///
|
||||
/// The sig0, i.e. signed record, for verifying the sending and package integrity
|
||||
fn sig0(&self) -> &[Record] {
|
||||
pub fn sig0(&self) -> &[Record] {
|
||||
&self.sig0
|
||||
}
|
||||
|
||||
@ -629,192 +628,47 @@ impl Message {
|
||||
Ok(buffer)
|
||||
}
|
||||
|
||||
/// Sign the message, i.e. add a SIG0 record to this Message.
|
||||
/// Finalize the message prior to sending.
|
||||
///
|
||||
/// Subsequent to calling this, the Message should not change.
|
||||
#[cfg(feature = "openssl")]
|
||||
pub fn sign<S: MessageSigner>(&mut self, signer: &S, inception_time: u32) -> ProtoResult<()> {
|
||||
debug!("signing message: {:?}", self);
|
||||
let key_tag: u16 = try!(signer.calculate_key_tag());
|
||||
pub fn finalize<MF: MessageFinalizer>(
|
||||
&mut self,
|
||||
finalizer: &MF,
|
||||
inception_time: u32,
|
||||
) -> ProtoResult<()> {
|
||||
debug!("finalizing message: {:?}", self);
|
||||
let finals: Vec<Record> = finalizer.finalize_message(self, inception_time)?;
|
||||
|
||||
// this is based on RFCs 2535, 2931 and 3007
|
||||
// append all records to message
|
||||
for fin in finals {
|
||||
match fin.rr_type() {
|
||||
// SIG0's are special, and come at the very end of the message
|
||||
RecordType::SIG => self.add_sig0(fin),
|
||||
_ => self.add_additional(fin),
|
||||
};
|
||||
}
|
||||
|
||||
// 'For all SIG(0) RRs, the owner name, class, TTL, and original TTL, are
|
||||
// meaningless.' - 2931
|
||||
let mut sig0 = Record::new();
|
||||
|
||||
// The TTL fields SHOULD be zero
|
||||
sig0.set_ttl(0);
|
||||
|
||||
// The CLASS field SHOULD be ANY
|
||||
sig0.set_dns_class(DNSClass::ANY);
|
||||
|
||||
// The owner name SHOULD be root (a single zero octet).
|
||||
sig0.set_name(Name::root());
|
||||
let num_labels = sig0.name().num_labels();
|
||||
|
||||
let expiration_time: u32 = inception_time + (5 * 60); // +5 minutes in seconds
|
||||
|
||||
sig0.set_rr_type(RecordType::SIG);
|
||||
let pre_sig0 = SIG::new(
|
||||
// type covered in SIG(0) is 0 which is what makes this SIG0 vs a standard SIG
|
||||
RecordType::NULL,
|
||||
signer.algorithm(),
|
||||
num_labels,
|
||||
// see above, original_ttl is meaningless, The TTL fields SHOULD be zero
|
||||
0,
|
||||
// recommended time is +5 minutes from now, to prevent timing attacks, 2 is probably good
|
||||
expiration_time,
|
||||
// current time, this should be UTC
|
||||
// unsigned numbers of seconds since the start of 1 January 1970, GMT
|
||||
inception_time,
|
||||
key_tag,
|
||||
// can probably get rid of this clone if the owndership is correct
|
||||
signer.signer_name().clone(),
|
||||
Vec::new(),
|
||||
);
|
||||
let signature: Vec<u8> = try!(signer.sign_message(self, &pre_sig0));
|
||||
sig0.set_rdata(RData::SIG(pre_sig0.set_sig(signature)));
|
||||
|
||||
debug!("sig0: {:?}", sig0);
|
||||
|
||||
self.add_sig0(sig0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Always returns an error; enable OpenSSL for signing support
|
||||
#[cfg(not(feature = "openssl"))]
|
||||
pub fn sign<S: MessageSigner>(&mut self, _: &S, _: u32) -> ProtoResult<()> {
|
||||
Err(
|
||||
ProtoErrorKind::Message("openssl feature not enabled").into(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// To reduce errors in using the Message struct as an Update, this will do the call throughs
|
||||
/// to properly do that.
|
||||
/// A trait for performing final ammendments to a Message before it is sent.
|
||||
///
|
||||
/// Generally rather than constructin this by hand, see the update methods on `Client`
|
||||
pub trait UpdateMessage: Debug {
|
||||
/// see `Header::id`
|
||||
fn id(&self) -> u16;
|
||||
|
||||
/// Adds the zone section, i.e. name.example.com would be example.com
|
||||
fn add_zone(&mut self, query: Query);
|
||||
|
||||
/// Add the pre-requisite records
|
||||
/// An example of this is a SIG0 signer, which needs the final form of the message,
|
||||
/// but then needs to attach additional data to the body of the message.
|
||||
pub trait MessageFinalizer {
|
||||
/// The message taken in should be processed and then return [`Record`]s which should be
|
||||
/// appended to the additional section of the message.
|
||||
///
|
||||
/// These must exist, or not, for the Update request to go through.
|
||||
fn add_pre_requisite(&mut self, record: Record);
|
||||
|
||||
/// Add all pre-requisites to the UpdateMessage
|
||||
#[deprecated = "will be removed post 0.9.x"]
|
||||
fn add_all_pre_requisites(&mut self, vector: &[&Record]);
|
||||
|
||||
/// Add all the Records from the Iterator to the pre-reqisites section
|
||||
fn add_pre_requisites<R, I>(&mut self, records: R)
|
||||
where
|
||||
R: IntoIterator<Item = Record, IntoIter = I>,
|
||||
I: Iterator<Item = Record>;
|
||||
|
||||
/// Add the Record to be updated
|
||||
fn add_update(&mut self, record: Record);
|
||||
|
||||
/// Add the set of Records to be updated
|
||||
#[deprecated = "will be removed post 0.9.x"]
|
||||
fn add_all_updates(&mut self, vector: &[&Record]);
|
||||
|
||||
/// Add the Records from the Iterator to the updates section
|
||||
fn add_updates<R, I>(&mut self, records: R)
|
||||
where
|
||||
R: IntoIterator<Item = Record, IntoIter = I>,
|
||||
I: Iterator<Item = Record>;
|
||||
|
||||
/// Add Records to the additional Section of hte UpdateMessage
|
||||
fn add_additional(&mut self, record: Record);
|
||||
|
||||
/// Returns the Zones to be updated, generally should only be one.
|
||||
fn zones(&self) -> &[Query];
|
||||
|
||||
/// Returns the pre-requisites
|
||||
fn prerequisites(&self) -> &[Record];
|
||||
|
||||
/// Returns the records to be updated
|
||||
fn updates(&self) -> &[Record];
|
||||
|
||||
/// Returns the additonal records
|
||||
fn additionals(&self) -> &[Record];
|
||||
|
||||
/// This is used to authenticate update messages.
|
||||
/// # Arguments
|
||||
///
|
||||
/// see `Message::sig0()` for more information.
|
||||
fn sig0(&self) -> &[Record];
|
||||
|
||||
/// Signs the UpdateMessage, used to validate the authenticity and authorization of UpdateMessage
|
||||
fn sign<S: MessageSigner>(&mut self, signer: &S, inception_time: u32) -> ProtoResult<()>;
|
||||
}
|
||||
|
||||
/// to reduce errors in using the Message struct as an Update, this will do the call throughs
|
||||
/// to properly do that.
|
||||
impl UpdateMessage for Message {
|
||||
fn id(&self) -> u16 {
|
||||
self.id()
|
||||
}
|
||||
fn add_zone(&mut self, query: Query) {
|
||||
self.add_query(query);
|
||||
}
|
||||
fn add_pre_requisite(&mut self, record: Record) {
|
||||
self.add_answer(record);
|
||||
}
|
||||
fn add_all_pre_requisites(&mut self, vector: &[&Record]) {
|
||||
self.add_answers(vector.into_iter().map(|r| (*r).clone()));
|
||||
}
|
||||
fn add_pre_requisites<R, I>(&mut self, records: R)
|
||||
where
|
||||
R: IntoIterator<Item = Record, IntoIter = I>,
|
||||
I: Iterator<Item = Record>,
|
||||
{
|
||||
self.add_answers(records);
|
||||
}
|
||||
fn add_update(&mut self, record: Record) {
|
||||
self.add_name_server(record);
|
||||
}
|
||||
fn add_all_updates(&mut self, vector: &[&Record]) {
|
||||
self.add_name_servers(vector.into_iter().map(|r| (*r).clone()));
|
||||
}
|
||||
fn add_updates<R, I>(&mut self, records: R)
|
||||
where
|
||||
R: IntoIterator<Item = Record, IntoIter = I>,
|
||||
I: Iterator<Item = Record>,
|
||||
{
|
||||
self.add_name_servers(records);
|
||||
}
|
||||
fn add_additional(&mut self, record: Record) {
|
||||
self.add_additional(record);
|
||||
}
|
||||
|
||||
fn zones(&self) -> &[Query] {
|
||||
self.queries()
|
||||
}
|
||||
fn prerequisites(&self) -> &[Record] {
|
||||
self.answers()
|
||||
}
|
||||
fn updates(&self) -> &[Record] {
|
||||
self.name_servers()
|
||||
}
|
||||
fn additionals(&self) -> &[Record] {
|
||||
self.additionals()
|
||||
}
|
||||
|
||||
fn sig0(&self) -> &[Record] {
|
||||
self.sig0()
|
||||
}
|
||||
|
||||
// TODO: where's the 'right' spot for this function
|
||||
|
||||
fn sign<S: MessageSigner>(&mut self, signer: &S, inception_time: u32) -> ProtoResult<()> {
|
||||
Message::sign(self, signer, inception_time)
|
||||
}
|
||||
/// * `message` - messge to process
|
||||
/// * `current_time` - the current time as specified by the system, it's not recommended to read the current time as that makes testing complicated.
|
||||
///
|
||||
/// # Return
|
||||
///
|
||||
/// A vector to append to the additionals section of the message, sorted in the order as they should appear in the message.
|
||||
fn finalize_message(&self, message: &Message, current_time: u32) -> ProtoResult<Vec<Record>>;
|
||||
}
|
||||
|
||||
impl BinSerializable<Message> for Message {
|
||||
|
@ -27,7 +27,7 @@ pub mod response_code;
|
||||
pub use self::edns::Edns;
|
||||
pub use self::header::Header;
|
||||
pub use self::header::MessageType;
|
||||
pub use self::message::{Message, UpdateMessage};
|
||||
pub use self::message::{Message, MessageFinalizer};
|
||||
pub use self::op_code::OpCode;
|
||||
pub use self::query::Query;
|
||||
pub use self::response_code::ResponseCode;
|
||||
|
@ -24,7 +24,6 @@ mod nsec3;
|
||||
pub mod public_key;
|
||||
#[cfg(any(feature = "openssl", feature = "ring"))]
|
||||
mod rsa_public_key;
|
||||
mod signer;
|
||||
mod supported_algorithm;
|
||||
mod trust_anchor;
|
||||
pub mod tbs;
|
||||
@ -37,7 +36,6 @@ pub use self::public_key::PublicKey;
|
||||
pub use self::public_key::PublicKeyBuf;
|
||||
pub use self::public_key::PublicKeyEnum;
|
||||
pub use self::supported_algorithm::SupportedAlgorithms;
|
||||
pub use self::signer::MessageSigner;
|
||||
pub use self::tbs::TBS;
|
||||
pub use self::trust_anchor::TrustAnchor;
|
||||
pub use self::verifier::Verifier;
|
||||
|
@ -1,24 +0,0 @@
|
||||
// Copyright 2015-2017 Benjamin Fry <benjaminfry@me.com>
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
|
||||
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
|
||||
// http://opensource.org/licenses/MIT>, at your option. This file may not be
|
||||
// copied, modified, or distributed except according to those terms.
|
||||
|
||||
use error::*;
|
||||
use op::Message;
|
||||
use rr::Name;
|
||||
use rr::dnssec::Algorithm;
|
||||
use rr::rdata::SIG;
|
||||
|
||||
/// FIXME: get docs from Signer in Client
|
||||
pub trait MessageSigner {
|
||||
/// FIXME: get docs from Signer in Client
|
||||
fn algorithm(&self) -> Algorithm;
|
||||
/// FIXME: get docs from Signer in Client
|
||||
fn calculate_key_tag(&self) -> ProtoResult<u16>;
|
||||
/// FIXME: get docs from Signer in Client
|
||||
fn signer_name(&self) -> &Name;
|
||||
/// FIXME: get docs from Signer in Client
|
||||
fn sign_message(&self, message: &Message, pre_sig0: &SIG) -> ProtoResult<Vec<u8>>;
|
||||
}
|
@ -297,106 +297,3 @@ pub fn determine_name(name: &Name, num_labels: u8) -> Option<Name> {
|
||||
// TODO: this should be an error
|
||||
None
|
||||
}
|
||||
|
||||
#[cfg(feature = "openssl")]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
extern crate openssl;
|
||||
use self::openssl::rsa::Rsa;
|
||||
|
||||
use rr::{Name, RecordType};
|
||||
use rr::rdata::SIG;
|
||||
use rr::dnssec::{KeyPair, Signer};
|
||||
|
||||
pub use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_rrset_tbs() {
|
||||
let rsa = Rsa::generate(2048).unwrap();
|
||||
let key = KeyPair::from_rsa(rsa).unwrap();
|
||||
let sig0key = key.to_sig0key(Algorithm::RSASHA256).unwrap();
|
||||
let signer = Signer::sig0(sig0key, key, Name::root());
|
||||
|
||||
let origin: Name = Name::parse("example.com.", None).unwrap();
|
||||
let rrsig = Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::SIG(SIG::new(
|
||||
RecordType::NS,
|
||||
Algorithm::RSASHA256,
|
||||
origin.num_labels(),
|
||||
86400,
|
||||
5,
|
||||
0,
|
||||
signer.calculate_key_tag().unwrap(),
|
||||
origin.clone(),
|
||||
vec![],
|
||||
)))
|
||||
.clone();
|
||||
let rrset = vec![
|
||||
Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::NS(Name::parse("a.iana-servers.net.", None).unwrap()))
|
||||
.clone(),
|
||||
Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::NS(Name::parse("b.iana-servers.net.", None).unwrap()))
|
||||
.clone(),
|
||||
];
|
||||
|
||||
let tbs = rrset_tbs_with_rrsig(&rrsig, &rrset).unwrap();
|
||||
assert!(!tbs.0.is_empty());
|
||||
|
||||
let rrset = vec![
|
||||
Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::CNAME)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::CNAME(
|
||||
Name::parse("a.iana-servers.net.", None).unwrap(),
|
||||
))
|
||||
.clone(), // different type
|
||||
Record::new()
|
||||
.set_name(Name::parse("www.example.com.", None).unwrap())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::NS(Name::parse("a.iana-servers.net.", None).unwrap()))
|
||||
.clone(), // different name
|
||||
Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::CH)
|
||||
.set_rdata(RData::NS(Name::parse("a.iana-servers.net.", None).unwrap()))
|
||||
.clone(), // different class
|
||||
Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::NS(Name::parse("a.iana-servers.net.", None).unwrap()))
|
||||
.clone(),
|
||||
Record::new()
|
||||
.set_name(origin.clone())
|
||||
.set_ttl(86400)
|
||||
.set_rr_type(RecordType::NS)
|
||||
.set_dns_class(DNSClass::IN)
|
||||
.set_rdata(RData::NS(Name::parse("b.iana-servers.net.", None).unwrap()))
|
||||
.clone(),
|
||||
];
|
||||
|
||||
let filtered_tbs = rrset_tbs_with_rrsig(&rrsig, &rrset).unwrap();
|
||||
assert!(!filtered_tbs.0.is_empty());
|
||||
assert_eq!(tbs.as_ref(), filtered_tbs.as_ref());
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user