remove response future type parameter where possible
This commit is contained in:
parent
7d9b186121
commit
3ba3937f2d
@ -119,11 +119,10 @@ fn trust_dns_process() -> (NamedProcess, u16) {
|
||||
}
|
||||
|
||||
/// Runs the bench tesk using the specified client
|
||||
fn bench<F, S, R>(b: &mut Bencher, stream: F)
|
||||
fn bench<F, S>(b: &mut Bencher, stream: F)
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R>,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender,
|
||||
{
|
||||
let mut io_loop = Runtime::new().unwrap();
|
||||
let client = AsyncClient::connect(stream);
|
||||
|
@ -21,9 +21,7 @@ use tokio::runtime::Runtime;
|
||||
|
||||
use trust_dns_client::client::*;
|
||||
use trust_dns_client::proto::tcp::TcpClientStream;
|
||||
use trust_dns_client::proto::xfer::{
|
||||
DnsExchangeBackground, DnsMultiplexer, DnsMultiplexerSerialResponse,
|
||||
};
|
||||
use trust_dns_client::proto::xfer::{DnsExchangeBackground, DnsMultiplexer};
|
||||
use trust_dns_client::proto::DnssecDnsHandle;
|
||||
use trust_dns_client::rr::dnssec::*;
|
||||
use trust_dns_proto::{iocompat::AsyncIo02As03, TokioTime};
|
||||
@ -65,10 +63,9 @@ fn trust_anchor(public_key_path: &Path, format: KeyFormat, algorithm: Algorithm)
|
||||
async fn standard_tcp_conn(
|
||||
port: u16,
|
||||
) -> (
|
||||
AsyncClient<DnsMultiplexerSerialResponse>,
|
||||
AsyncClient,
|
||||
DnsExchangeBackground<
|
||||
DnsMultiplexer<TcpClientStream<AsyncIo02As03<TokioTcpStream>>, Signer>,
|
||||
DnsMultiplexerSerialResponse,
|
||||
TokioTime,
|
||||
>,
|
||||
) {
|
||||
|
@ -11,12 +11,10 @@ use std::sync::*;
|
||||
use std::thread;
|
||||
use std::time::*;
|
||||
|
||||
use futures::Future;
|
||||
use regex::Regex;
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
use trust_dns_client::client::*;
|
||||
use trust_dns_client::proto::error::ProtoError;
|
||||
use trust_dns_client::proto::xfer::DnsResponse;
|
||||
use trust_dns_client::rr::dnssec::*;
|
||||
use trust_dns_client::rr::rdata::{DNSSECRData, DNSSECRecordType};
|
||||
@ -225,14 +223,12 @@ pub fn query_a<C: ClientHandle>(io_loop: &mut Runtime, client: &mut C) {
|
||||
// This only validates that a query to the server works, it shouldn't be used for more than this.
|
||||
// i.e. more complex checks live with the clients and authorities to validate deeper functionality
|
||||
#[allow(dead_code)]
|
||||
pub fn query_all_dnssec<R>(
|
||||
pub fn query_all_dnssec(
|
||||
io_loop: &mut Runtime,
|
||||
client: AsyncClient<R>,
|
||||
client: AsyncClient,
|
||||
algorithm: Algorithm,
|
||||
with_rfc6975: bool,
|
||||
) where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin,
|
||||
{
|
||||
) {
|
||||
let name = Name::from_str("example.com.").unwrap();
|
||||
let mut client = MutMessageHandle::new(client);
|
||||
client.dnssec_ok = true;
|
||||
@ -285,23 +281,19 @@ pub fn query_all_dnssec<R>(
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn query_all_dnssec_with_rfc6975<R>(
|
||||
pub fn query_all_dnssec_with_rfc6975(
|
||||
io_loop: &mut Runtime,
|
||||
client: AsyncClient<R>,
|
||||
client: AsyncClient,
|
||||
algorithm: Algorithm,
|
||||
) where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin,
|
||||
{
|
||||
) {
|
||||
query_all_dnssec(io_loop, client, algorithm, true)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn query_all_dnssec_wo_rfc6975<R>(
|
||||
pub fn query_all_dnssec_wo_rfc6975(
|
||||
io_loop: &mut Runtime,
|
||||
client: AsyncClient<R>,
|
||||
client: AsyncClient,
|
||||
algorithm: Algorithm,
|
||||
) where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin,
|
||||
{
|
||||
) {
|
||||
query_all_dnssec(io_loop, client, algorithm, false)
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ use crate::proto::error::ProtoError;
|
||||
use crate::proto::xfer::{
|
||||
DnsClientStream, DnsExchange, DnsExchangeBackground, DnsExchangeConnect, DnsExchangeSend,
|
||||
DnsHandle, DnsMultiplexer, DnsMultiplexerConnect, DnsRequest, DnsRequestOptions,
|
||||
DnsRequestSender, DnsResponse, DnsResponseFuture, DnsStreamHandle,
|
||||
DnsRequestSender, DnsResponse, DnsStreamHandle,
|
||||
};
|
||||
use crate::proto::TokioTime;
|
||||
use futures::{ready, Future, FutureExt};
|
||||
@ -33,20 +33,17 @@ pub const MAX_PAYLOAD_LEN: u16 = 1500 - 40 - 8; // 1500 (general MTU) - 40 (ipv6
|
||||
///
|
||||
/// This Client is generic and capable of wrapping UDP, TCP, and other underlying DNS protocol
|
||||
/// implementations.
|
||||
pub type ClientFuture<R> = AsyncClient<R>;
|
||||
pub type ClientFuture = AsyncClient;
|
||||
|
||||
/// A DNS Client implemented over futures-rs.
|
||||
///
|
||||
/// This Client is generic and capable of wrapping UDP, TCP, and other underlying DNS protocol
|
||||
/// implementations.
|
||||
pub struct AsyncClient<R>
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
exchange: DnsExchange<R>,
|
||||
pub struct AsyncClient {
|
||||
exchange: DnsExchange,
|
||||
}
|
||||
|
||||
impl AsyncClient<DnsResponseFuture> {
|
||||
impl AsyncClient {
|
||||
/// Spawns a new AsyncClient Stream. This uses a default timeout of 5 seconds for all requests.
|
||||
///
|
||||
/// # Arguments
|
||||
@ -60,11 +57,7 @@ impl AsyncClient<DnsResponseFuture> {
|
||||
stream: F,
|
||||
stream_handle: Box<dyn DnsStreamHandle>,
|
||||
signer: Option<Arc<Signer>>,
|
||||
) -> AsyncClientConnect<
|
||||
DnsMultiplexerConnect<F, S, Signer>,
|
||||
DnsMultiplexer<S, Signer>,
|
||||
DnsResponseFuture,
|
||||
>
|
||||
) -> AsyncClientConnect<DnsMultiplexerConnect<F, S, Signer>, DnsMultiplexer<S, Signer>>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + Send + Unpin + 'static,
|
||||
S: DnsClientStream + Unpin + 'static,
|
||||
@ -87,24 +80,17 @@ impl AsyncClient<DnsResponseFuture> {
|
||||
stream_handle: Box<dyn DnsStreamHandle>,
|
||||
timeout_duration: Duration,
|
||||
signer: Option<Arc<Signer>>,
|
||||
) -> AsyncClientConnect<
|
||||
DnsMultiplexerConnect<F, S, Signer>,
|
||||
DnsMultiplexer<S, Signer>,
|
||||
DnsResponseFuture,
|
||||
>
|
||||
) -> AsyncClientConnect<DnsMultiplexerConnect<F, S, Signer>, DnsMultiplexer<S, Signer>>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsClientStream + Unpin + 'static,
|
||||
{
|
||||
let mp = DnsMultiplexer::with_timeout(stream, stream_handle, timeout_duration, signer);
|
||||
Self::connect(mp).into()
|
||||
Self::connect(mp)
|
||||
}
|
||||
}
|
||||
|
||||
impl<R> AsyncClient<R>
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
impl AsyncClient {
|
||||
/// Returns a future, which itself wraps a future which is awaiting connection.
|
||||
///
|
||||
/// The connect_future should be lazy.
|
||||
@ -114,19 +100,16 @@ where
|
||||
/// This returns a tuple of Self a handle to send dns messages and an optional background.
|
||||
/// The background task must be run on an executor before handle is used, if it is Some.
|
||||
/// If it is None, then another thread has already run the background.
|
||||
pub fn connect<F, S>(connect_future: F) -> AsyncClientConnect<F, S, R>
|
||||
pub fn connect<F, S>(connect_future: F) -> AsyncClientConnect<F, S>
|
||||
where
|
||||
S: DnsRequestSender<DnsResponseFuture = R>,
|
||||
S: DnsRequestSender,
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
AsyncClientConnect(DnsExchange::connect(connect_future))
|
||||
}
|
||||
}
|
||||
|
||||
impl<R> Clone for AsyncClient<R>
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
impl Clone for AsyncClient {
|
||||
fn clone(&self) -> Self {
|
||||
AsyncClient {
|
||||
exchange: self.exchange.clone(),
|
||||
@ -134,11 +117,8 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<Resp> DnsHandle for AsyncClient<Resp>
|
||||
where
|
||||
Resp: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
type Response = DnsExchangeSend<Resp>;
|
||||
impl DnsHandle for AsyncClient {
|
||||
type Response = DnsExchangeSend;
|
||||
type Error = ProtoError;
|
||||
|
||||
fn send<R: Into<DnsRequest> + Unpin + Send + 'static>(&mut self, request: R) -> Self::Response {
|
||||
@ -148,20 +128,18 @@ where
|
||||
|
||||
/// A future that resolves to an AsyncClient
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct AsyncClientConnect<F, S, R>(DnsExchangeConnect<F, S, R, TokioTime>)
|
||||
pub struct AsyncClientConnect<F, S>(DnsExchangeConnect<F, S, TokioTime>)
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin;
|
||||
S: DnsRequestSender + 'static;
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
impl<F, S, R> Future for AsyncClientConnect<F, S, R>
|
||||
impl<F, S> Future for AsyncClientConnect<F, S>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static + Send + Unpin,
|
||||
{
|
||||
type Output = Result<(AsyncClient<R>, DnsExchangeBackground<S, R, TokioTime>), ProtoError>;
|
||||
type Output = Result<(AsyncClient, DnsExchangeBackground<S, TokioTime>), ProtoError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let result = ready!(self.0.poll_unpin(cx));
|
||||
|
@ -24,22 +24,16 @@ use crate::proto::TokioTime;
|
||||
///
|
||||
/// This Client is generic and capable of wrapping UDP, TCP, and other underlying DNS protocol
|
||||
/// implementations.
|
||||
pub struct AsyncDnssecClient<R>
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
client: DnssecDnsHandle<AsyncClient<R>>,
|
||||
pub struct AsyncDnssecClient {
|
||||
client: DnssecDnsHandle<AsyncClient>,
|
||||
}
|
||||
|
||||
impl<R> AsyncDnssecClient<R>
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
impl AsyncDnssecClient {
|
||||
/// Returns a DNSSEC verifying client with a TrustAnchor that can be replaced
|
||||
pub fn builder<F, S>(connect_future: F) -> AsyncSecureClientBuilder<F, S, R>
|
||||
pub fn builder<F, S>(connect_future: F) -> AsyncSecureClientBuilder<F, S>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static,
|
||||
S: DnsRequestSender + 'static,
|
||||
{
|
||||
let client_connect = AsyncClient::connect(connect_future);
|
||||
AsyncSecureClientBuilder {
|
||||
@ -49,25 +43,22 @@ where
|
||||
}
|
||||
|
||||
/// Returns a DNSSEC verifying client with the default TrustAnchor
|
||||
pub fn connect<F, S>(connect_future: F) -> AsyncSecureClientConnect<F, S, R>
|
||||
pub fn connect<F, S>(connect_future: F) -> AsyncSecureClientConnect<F, S>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static,
|
||||
S: DnsRequestSender + 'static,
|
||||
{
|
||||
Self::builder(connect_future).build()
|
||||
}
|
||||
|
||||
fn from_client(client: AsyncClient<R>, trust_anchor: TrustAnchor) -> Self {
|
||||
fn from_client(client: AsyncClient, trust_anchor: TrustAnchor) -> Self {
|
||||
Self {
|
||||
client: DnssecDnsHandle::with_trust_anchor(client, trust_anchor),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<R> Clone for AsyncDnssecClient<R>
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
impl Clone for AsyncDnssecClient {
|
||||
fn clone(&self) -> Self {
|
||||
AsyncDnssecClient {
|
||||
client: self.client.clone(),
|
||||
@ -75,10 +66,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<Resp> DnsHandle for AsyncDnssecClient<Resp>
|
||||
where
|
||||
Resp: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
impl DnsHandle for AsyncDnssecClient {
|
||||
type Response =
|
||||
Pin<Box<(dyn Future<Output = Result<DnsResponse, ProtoError>> + Send + 'static)>>;
|
||||
type Error = ProtoError;
|
||||
@ -90,22 +78,20 @@ where
|
||||
|
||||
/// A builder to allow a custom trust to be used for validating all signed records
|
||||
#[cfg(feature = "dnssec")]
|
||||
pub struct AsyncSecureClientBuilder<F, S, R>
|
||||
pub struct AsyncSecureClientBuilder<F, S>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static,
|
||||
{
|
||||
client_connect: AsyncClientConnect<F, S, R>,
|
||||
client_connect: AsyncClientConnect<F, S>,
|
||||
trust_anchor: Option<TrustAnchor>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "dnssec")]
|
||||
impl<F, S, R> AsyncSecureClientBuilder<F, S, R>
|
||||
impl<F, S> AsyncSecureClientBuilder<F, S>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static,
|
||||
{
|
||||
/// This variant allows for the trust_anchor to be replaced
|
||||
///
|
||||
@ -119,7 +105,7 @@ where
|
||||
}
|
||||
|
||||
/// Construct the new client connect
|
||||
pub fn build(mut self) -> AsyncSecureClientConnect<F, S, R> {
|
||||
pub fn build(mut self) -> AsyncSecureClientConnect<F, S> {
|
||||
let trust_anchor = if let Some(trust_anchor) = self.trust_anchor.take() {
|
||||
trust_anchor
|
||||
} else {
|
||||
@ -135,25 +121,22 @@ where
|
||||
|
||||
/// A future which will resolve to a AsyncDnssecClient
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct AsyncSecureClientConnect<F, S, R>
|
||||
pub struct AsyncSecureClientConnect<F, S>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static,
|
||||
{
|
||||
client_connect: AsyncClientConnect<F, S, R>,
|
||||
client_connect: AsyncClientConnect<F, S>,
|
||||
trust_anchor: Option<TrustAnchor>,
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
impl<F, S, R> Future for AsyncSecureClientConnect<F, S, R>
|
||||
impl<F, S> Future for AsyncSecureClientConnect<F, S>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static + Send + Unpin,
|
||||
{
|
||||
type Output =
|
||||
Result<(AsyncDnssecClient<R>, DnsExchangeBackground<S, R, TokioTime>), ProtoError>;
|
||||
type Output = Result<(AsyncDnssecClient, DnsExchangeBackground<S, TokioTime>), ProtoError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let result = ready!(self.client_connect.poll_unpin(cx));
|
||||
|
@ -432,8 +432,8 @@ impl<CC: ClientConnection> SyncClient<CC> {
|
||||
}
|
||||
|
||||
impl<CC: ClientConnection> Client for SyncClient<CC> {
|
||||
type Response = DnsExchangeSend<CC::Response>;
|
||||
type Handle = AsyncClient<CC::Response>;
|
||||
type Response = DnsExchangeSend;
|
||||
type Handle = AsyncClient;
|
||||
|
||||
fn new_future(&self) -> NewFutureObj<Self::Handle> {
|
||||
let stream = self.conn.new_stream(self.signer.clone());
|
||||
@ -478,7 +478,7 @@ impl<CC: ClientConnection> SyncDnssecClient<CC> {
|
||||
impl<CC: ClientConnection> Client for SyncDnssecClient<CC> {
|
||||
type Response =
|
||||
Pin<Box<(dyn Future<Output = Result<DnsResponse, ProtoError>> + Send + 'static)>>;
|
||||
type Handle = AsyncDnssecClient<CC::Response>;
|
||||
type Handle = AsyncDnssecClient;
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn new_future(&self) -> NewFutureObj<Self::Handle> {
|
||||
|
@ -17,19 +17,14 @@ use std::sync::Arc;
|
||||
|
||||
use futures::Future;
|
||||
|
||||
use trust_dns_proto::{
|
||||
error::ProtoError,
|
||||
xfer::{DnsRequestSender, DnsResponse},
|
||||
};
|
||||
use trust_dns_proto::{error::ProtoError, xfer::DnsRequestSender};
|
||||
|
||||
use crate::rr::dnssec::Signer;
|
||||
|
||||
/// Trait for client connections
|
||||
pub trait ClientConnection: 'static + Sized + Send + Sync + Unpin {
|
||||
/// The associated DNS RequestSender type.
|
||||
type Sender: DnsRequestSender<DnsResponseFuture = Self::Response>;
|
||||
/// Response type of the RequestSender
|
||||
type Response: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin;
|
||||
type Sender: DnsRequestSender;
|
||||
/// A future that resolves to the RequestSender
|
||||
type SenderFuture: Future<Output = Result<Self::Sender, ProtoError>> + 'static + Send + Unpin;
|
||||
|
||||
|
@ -14,7 +14,6 @@ use std::sync::Arc;
|
||||
use rustls::{Certificate, ClientConfig};
|
||||
use trust_dns_https::{HttpsClientConnect, HttpsClientStream, HttpsClientStreamBuilder};
|
||||
use trust_dns_proto::tcp::Connect;
|
||||
use trust_dns_proto::xfer::DnsRequestSender;
|
||||
|
||||
use crate::client::ClientConnection;
|
||||
use crate::rr::dnssec::Signer;
|
||||
@ -50,7 +49,6 @@ where
|
||||
T: Connect + 'static + Unpin + Sync + Send,
|
||||
{
|
||||
type Sender = HttpsClientStream;
|
||||
type Response = <Self::Sender as DnsRequestSender>::DnsResponseFuture;
|
||||
type SenderFuture = HttpsClientConnect<T>;
|
||||
|
||||
fn new_stream(
|
||||
|
@ -12,7 +12,7 @@ use std::sync::Arc;
|
||||
|
||||
use crate::proto::{
|
||||
multicast::{MdnsClientConnect, MdnsClientStream, MdnsQueryType, MDNS_IPV4, MDNS_IPV6},
|
||||
xfer::{DnsMultiplexer, DnsMultiplexerConnect, DnsRequestSender},
|
||||
xfer::{DnsMultiplexer, DnsMultiplexerConnect},
|
||||
};
|
||||
|
||||
use crate::client::ClientConnection;
|
||||
@ -53,7 +53,6 @@ impl MdnsClientConnection {
|
||||
|
||||
impl ClientConnection for MdnsClientConnection {
|
||||
type Sender = DnsMultiplexer<MdnsClientStream, Signer>;
|
||||
type Response = <Self::Sender as DnsRequestSender>::DnsResponseFuture;
|
||||
type SenderFuture = DnsMultiplexerConnect<MdnsClientConnect, MdnsClientStream, Signer>;
|
||||
|
||||
fn new_stream(&self, signer: Option<Arc<Signer>>) -> Self::SenderFuture {
|
||||
|
@ -17,7 +17,7 @@ use crate::client::ClientConnection;
|
||||
use crate::error::*;
|
||||
use crate::proto::iocompat::AsyncIo02As03;
|
||||
use crate::proto::tcp::{TcpClientConnect, TcpClientStream};
|
||||
use crate::proto::xfer::{DnsMultiplexer, DnsMultiplexerConnect, DnsRequestSender};
|
||||
use crate::proto::xfer::{DnsMultiplexer, DnsMultiplexerConnect};
|
||||
use crate::proto::TokioTime;
|
||||
use crate::rr::dnssec::Signer;
|
||||
|
||||
@ -63,7 +63,6 @@ impl TcpClientConnection {
|
||||
|
||||
impl ClientConnection for TcpClientConnection {
|
||||
type Sender = DnsMultiplexer<TcpClientStream<AsyncIo02As03<TcpStream>>, Signer>;
|
||||
type Response = <Self::Sender as DnsRequestSender>::DnsResponseFuture;
|
||||
type SenderFuture = DnsMultiplexerConnect<
|
||||
TcpClientConnect<AsyncIo02As03<TcpStream>>,
|
||||
TcpClientStream<AsyncIo02As03<TcpStream>>,
|
||||
|
@ -12,7 +12,6 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::proto::udp::{UdpClientConnect, UdpClientStream};
|
||||
use crate::proto::xfer::DnsRequestSender;
|
||||
|
||||
use crate::client::ClientConnection;
|
||||
use crate::error::*;
|
||||
@ -50,7 +49,6 @@ impl UdpClientConnection {
|
||||
|
||||
impl ClientConnection for UdpClientConnection {
|
||||
type Sender = UdpClientStream<UdpSocket, Signer>;
|
||||
type Response = <Self::Sender as DnsRequestSender>::DnsResponseFuture;
|
||||
type SenderFuture = UdpClientConnect<UdpSocket, Signer>;
|
||||
|
||||
fn new_stream(&self, signer: Option<Arc<Signer>>) -> Self::SenderFuture {
|
||||
|
@ -180,8 +180,6 @@ impl HttpsClientStream {
|
||||
}
|
||||
|
||||
impl DnsRequestSender for HttpsClientStream {
|
||||
type DnsResponseFuture = DnsResponseFuture;
|
||||
|
||||
/// This indicates that the HTTP message was successfully sent, and we now have the response.RecvStream
|
||||
///
|
||||
/// If the request fails, this will return the error, and it should be assumed that the Stream portion of
|
||||
@ -233,7 +231,7 @@ impl DnsRequestSender for HttpsClientStream {
|
||||
&mut self,
|
||||
mut message: DnsRequest,
|
||||
_cx: &mut Context,
|
||||
) -> Self::DnsResponseFuture {
|
||||
) -> DnsResponseFuture {
|
||||
if self.is_shutdown {
|
||||
panic!("can not send messages after stream is shutdown")
|
||||
}
|
||||
@ -256,7 +254,7 @@ impl DnsRequestSender for HttpsClientStream {
|
||||
.into()
|
||||
}
|
||||
|
||||
fn error_response<TE: Time>(error: ProtoError) -> Self::DnsResponseFuture {
|
||||
fn error_response<TE: Time>(error: ProtoError) -> DnsResponseFuture {
|
||||
Box::pin(future::err(error)).into()
|
||||
}
|
||||
|
||||
|
@ -108,13 +108,11 @@ fn random_query_id() -> u16 {
|
||||
impl<S: UdpSocket + Send + 'static, MF: MessageFinalizer> DnsRequestSender
|
||||
for UdpClientStream<S, MF>
|
||||
{
|
||||
type DnsResponseFuture = DnsResponseFuture;
|
||||
|
||||
fn send_message<TE: Time>(
|
||||
&mut self,
|
||||
mut message: DnsRequest,
|
||||
_cx: &mut Context,
|
||||
) -> Self::DnsResponseFuture {
|
||||
) -> DnsResponseFuture {
|
||||
if self.is_shutdown {
|
||||
panic!("can not send messages after stream is shutdown")
|
||||
}
|
||||
@ -161,7 +159,7 @@ impl<S: UdpSocket + Send + 'static, MF: MessageFinalizer> DnsRequestSender
|
||||
UdpResponse::new::<S, TE>(message, message_id, self.timeout).into()
|
||||
}
|
||||
|
||||
fn error_response<TE: Time>(err: ProtoError) -> Self::DnsResponseFuture {
|
||||
fn error_response<TE: Time>(err: ProtoError) -> DnsResponseFuture {
|
||||
UdpResponse::complete::<_, TE>(SingleUseUdpSocket::errored(err)).into()
|
||||
}
|
||||
|
||||
|
@ -29,17 +29,11 @@ use crate::Time;
|
||||
///
|
||||
/// The underlying `DnsRequestSender` is expected to multiplex any I/O connections. DnsExchange assumes that the underlying stream is responsible for this.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct DnsExchange<R>
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
sender: BufDnsRequestStreamHandle<R>,
|
||||
pub struct DnsExchange {
|
||||
sender: BufDnsRequestStreamHandle,
|
||||
}
|
||||
|
||||
impl<R> DnsExchange<R>
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
impl DnsExchange {
|
||||
/// Initializes a TcpStream with an existing tcp::TcpStream.
|
||||
///
|
||||
/// This is intended for use with a TcpListener and Incoming.
|
||||
@ -47,12 +41,12 @@ where
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `stream` - the established IO stream for communication
|
||||
pub fn from_stream<S, TE>(stream: S) -> (Self, DnsExchangeBackground<S, R, TE>)
|
||||
pub fn from_stream<S, TE>(stream: S) -> (Self, DnsExchangeBackground<S, TE>)
|
||||
where
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static + Send + Unpin,
|
||||
{
|
||||
let (message_sender, outbound_messages) = unbounded();
|
||||
let message_sender = DnsRequestStreamHandle::<R>::new(message_sender);
|
||||
let message_sender = DnsRequestStreamHandle::new(message_sender);
|
||||
|
||||
Self::from_stream_with_receiver(stream, outbound_messages, message_sender)
|
||||
}
|
||||
@ -60,11 +54,11 @@ where
|
||||
/// Wraps a stream where a sender and receiver have already been established
|
||||
pub fn from_stream_with_receiver<S, TE>(
|
||||
stream: S,
|
||||
receiver: UnboundedReceiver<OneshotDnsRequest<R>>,
|
||||
sender: DnsRequestStreamHandle<R>,
|
||||
) -> (Self, DnsExchangeBackground<S, R, TE>)
|
||||
receiver: UnboundedReceiver<OneshotDnsRequest>,
|
||||
sender: DnsRequestStreamHandle,
|
||||
) -> (Self, DnsExchangeBackground<S, TE>)
|
||||
where
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static + Send + Unpin,
|
||||
{
|
||||
let background = DnsExchangeBackground {
|
||||
io_stream: stream,
|
||||
@ -80,23 +74,20 @@ where
|
||||
/// Returns a future, which itself wraps a future which is awaiting connection.
|
||||
///
|
||||
/// The connect_future should be lazy.
|
||||
pub fn connect<F, S, TE>(connect_future: F) -> DnsExchangeConnect<F, S, R, TE>
|
||||
pub fn connect<F, S, TE>(connect_future: F) -> DnsExchangeConnect<F, S, TE>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static + Send + Unpin,
|
||||
TE: Time + Unpin,
|
||||
{
|
||||
let (message_sender, outbound_messages) = unbounded();
|
||||
let message_sender = DnsRequestStreamHandle::<R>::new(message_sender);
|
||||
let message_sender = DnsRequestStreamHandle::new(message_sender);
|
||||
|
||||
DnsExchangeConnect::connect(connect_future, outbound_messages, message_sender)
|
||||
}
|
||||
}
|
||||
|
||||
impl<R> Clone for DnsExchange<R>
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
impl Clone for DnsExchange {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
sender: self.sender.clone(),
|
||||
@ -104,11 +95,8 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<Resp> DnsHandle for DnsExchange<Resp>
|
||||
where
|
||||
Resp: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
type Response = DnsExchangeSend<Resp>;
|
||||
impl DnsHandle for DnsExchange {
|
||||
type Response = DnsExchangeSend;
|
||||
type Error = ProtoError;
|
||||
|
||||
fn send<R: Into<DnsRequest> + Unpin + Send + 'static>(&mut self, request: R) -> Self::Response {
|
||||
@ -121,18 +109,12 @@ where
|
||||
|
||||
/// A Future that will resolve to a Response after sending the request
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct DnsExchangeSend<R>
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
result: OneshotDnsResponseReceiver<R>,
|
||||
_sender: BufDnsRequestStreamHandle<R>,
|
||||
pub struct DnsExchangeSend {
|
||||
result: OneshotDnsResponseReceiver,
|
||||
_sender: BufDnsRequestStreamHandle,
|
||||
}
|
||||
|
||||
impl<R> Future for DnsExchangeSend<R>
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
impl Future for DnsExchangeSend {
|
||||
type Output = Result<DnsResponse, ProtoError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
@ -145,35 +127,27 @@ where
|
||||
///
|
||||
/// It must be spawned before any DNS messages are sent.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct DnsExchangeBackground<S, R, TE>
|
||||
pub struct DnsExchangeBackground<S, TE>
|
||||
where
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static + Send + Unpin,
|
||||
{
|
||||
io_stream: S,
|
||||
outbound_messages: Peekable<UnboundedReceiver<OneshotDnsRequest<R>>>,
|
||||
outbound_messages: Peekable<UnboundedReceiver<OneshotDnsRequest>>,
|
||||
marker: PhantomData<TE>,
|
||||
}
|
||||
|
||||
impl<S, R, TE> DnsExchangeBackground<S, R, TE>
|
||||
impl<S, TE> DnsExchangeBackground<S, TE>
|
||||
where
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static + Send + Unpin,
|
||||
{
|
||||
fn pollable_split(
|
||||
&mut self,
|
||||
) -> (
|
||||
&mut S,
|
||||
&mut Peekable<UnboundedReceiver<OneshotDnsRequest<R>>>,
|
||||
) {
|
||||
fn pollable_split(&mut self) -> (&mut S, &mut Peekable<UnboundedReceiver<OneshotDnsRequest>>) {
|
||||
(&mut self.io_stream, &mut self.outbound_messages)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, R, TE> Future for DnsExchangeBackground<S, R, TE>
|
||||
impl<S, TE> Future for DnsExchangeBackground<S, TE>
|
||||
where
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static + Send + Unpin,
|
||||
TE: Time + Unpin,
|
||||
{
|
||||
type Output = Result<(), ProtoError>;
|
||||
@ -256,24 +230,22 @@ where
|
||||
/// The future will return a tuple of the DnsExchange (for sending messages) and a background
|
||||
/// for running the background tasks. The background is optional as only one thread should run
|
||||
/// the background. If returned, it must be spawned before any dns requests will function.
|
||||
pub struct DnsExchangeConnect<F, S, R, TE>(DnsExchangeConnectInner<F, S, R, TE>)
|
||||
pub struct DnsExchangeConnect<F, S, TE>(DnsExchangeConnectInner<F, S, TE>)
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static,
|
||||
TE: Time + Unpin;
|
||||
|
||||
impl<F, S, R, TE> DnsExchangeConnect<F, S, R, TE>
|
||||
impl<F, S, TE> DnsExchangeConnect<F, S, TE>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static,
|
||||
TE: Time + Unpin,
|
||||
{
|
||||
fn connect(
|
||||
connect_future: F,
|
||||
outbound_messages: UnboundedReceiver<OneshotDnsRequest<R>>,
|
||||
sender: DnsRequestStreamHandle<R>,
|
||||
outbound_messages: UnboundedReceiver<OneshotDnsRequest>,
|
||||
sender: DnsRequestStreamHandle,
|
||||
) -> Self {
|
||||
DnsExchangeConnect(DnsExchangeConnectInner::Connecting {
|
||||
connect_future,
|
||||
@ -284,51 +256,48 @@ where
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
impl<F, S, R, TE> Future for DnsExchangeConnect<F, S, R, TE>
|
||||
impl<F, S, TE> Future for DnsExchangeConnect<F, S, TE>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static + Send + Unpin,
|
||||
TE: Time + Unpin,
|
||||
{
|
||||
type Output = Result<(DnsExchange<R>, DnsExchangeBackground<S, R, TE>), ProtoError>;
|
||||
type Output = Result<(DnsExchange, DnsExchangeBackground<S, TE>), ProtoError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
self.0.poll_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
enum DnsExchangeConnectInner<F, S, R, TE>
|
||||
enum DnsExchangeConnectInner<F, S, TE>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static + Send,
|
||||
TE: Time + Unpin,
|
||||
{
|
||||
Connecting {
|
||||
connect_future: F,
|
||||
outbound_messages: Option<UnboundedReceiver<OneshotDnsRequest<R>>>,
|
||||
sender: Option<DnsRequestStreamHandle<R>>,
|
||||
outbound_messages: Option<UnboundedReceiver<OneshotDnsRequest>>,
|
||||
sender: Option<DnsRequestStreamHandle>,
|
||||
},
|
||||
Connected {
|
||||
exchange: DnsExchange<R>,
|
||||
background: Option<DnsExchangeBackground<S, R, TE>>,
|
||||
exchange: DnsExchange,
|
||||
background: Option<DnsExchangeBackground<S, TE>>,
|
||||
},
|
||||
FailAll {
|
||||
error: ProtoError,
|
||||
outbound_messages: UnboundedReceiver<OneshotDnsRequest<R>>,
|
||||
outbound_messages: UnboundedReceiver<OneshotDnsRequest>,
|
||||
},
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
impl<F, S, R, TE> Future for DnsExchangeConnectInner<F, S, R, TE>
|
||||
impl<F, S, TE> Future for DnsExchangeConnectInner<F, S, TE>
|
||||
where
|
||||
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender<DnsResponseFuture = R> + 'static + Send + Unpin,
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
S: DnsRequestSender + 'static + Send + Unpin,
|
||||
TE: Time + Unpin,
|
||||
{
|
||||
type Output = Result<(DnsExchange<R>, DnsExchangeBackground<S, R, TE>), ProtoError>;
|
||||
type Output = Result<(DnsExchange, DnsExchangeBackground<S, TE>), ProtoError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
loop {
|
||||
|
@ -307,13 +307,11 @@ where
|
||||
S: DnsClientStream + Unpin + 'static,
|
||||
MF: MessageFinalizer + Send + Sync + 'static,
|
||||
{
|
||||
type DnsResponseFuture = DnsResponseFuture;
|
||||
|
||||
fn send_message<TE: Time>(
|
||||
&mut self,
|
||||
request: DnsRequest,
|
||||
cx: &mut Context,
|
||||
) -> Self::DnsResponseFuture {
|
||||
) -> DnsResponseFuture {
|
||||
if self.is_shutdown {
|
||||
panic!("can not send messages after stream is shutdown")
|
||||
}
|
||||
@ -391,7 +389,7 @@ where
|
||||
DnsMultiplexerSerialResponseInner::Completion(receiver).into()
|
||||
}
|
||||
|
||||
fn error_response<TE: Time>(error: ProtoError) -> Self::DnsResponseFuture {
|
||||
fn error_response<TE: Time>(error: ProtoError) -> DnsResponseFuture {
|
||||
DnsMultiplexerSerialResponseInner::Err(Some(error)).into()
|
||||
}
|
||||
|
||||
|
@ -10,7 +10,7 @@ use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures_channel::mpsc::{TrySendError, UnboundedSender};
|
||||
use futures_channel::oneshot::{self, Receiver, Sender};
|
||||
use futures_channel::oneshot;
|
||||
use futures_util::future::Future;
|
||||
use futures_util::ready;
|
||||
use futures_util::stream::Stream;
|
||||
@ -111,54 +111,32 @@ impl DnsStreamHandle for BufDnsStreamHandle {
|
||||
|
||||
// TODO: expose the Sink trait for this?
|
||||
/// A sender to which serialized DNS Messages can be sent
|
||||
pub struct DnsRequestStreamHandle<F>
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
|
||||
{
|
||||
sender: UnboundedSender<OneshotDnsRequest<F>>,
|
||||
#[derive(Clone)]
|
||||
pub struct DnsRequestStreamHandle {
|
||||
sender: UnboundedSender<OneshotDnsRequest>,
|
||||
}
|
||||
|
||||
impl<F> DnsRequestStreamHandle<F>
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
|
||||
{
|
||||
impl DnsRequestStreamHandle {
|
||||
/// Constructs a new BufStreamHandle with the associated ProtoError
|
||||
pub fn new(sender: UnboundedSender<OneshotDnsRequest<F>>) -> Self {
|
||||
pub fn new(sender: UnboundedSender<OneshotDnsRequest>) -> Self {
|
||||
DnsRequestStreamHandle { sender }
|
||||
}
|
||||
|
||||
/// see [`futures::sync::mpsc::UnboundedSender`]
|
||||
pub fn unbounded_send(
|
||||
&self,
|
||||
msg: OneshotDnsRequest<F>,
|
||||
) -> Result<(), TrySendError<OneshotDnsRequest<F>>> {
|
||||
msg: OneshotDnsRequest,
|
||||
) -> Result<(), TrySendError<OneshotDnsRequest>> {
|
||||
self.sender.unbounded_send(msg)
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> Clone for DnsRequestStreamHandle<F>
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
DnsRequestStreamHandle {
|
||||
sender: self.sender.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Types that implement this are capable of sending a serialized DNS message on a stream
|
||||
///
|
||||
/// The underlying Stream implementation should yield `Some(())` whenever it is ready to send a message,
|
||||
/// NotReady, if it is not ready to send a message, and `Err` or `None` in the case that the stream is
|
||||
/// done, and should be shutdown.
|
||||
pub trait DnsRequestSender: Stream<Item = Result<(), ProtoError>> + Send + Unpin + 'static {
|
||||
/// A future that resolves to a response serial message
|
||||
type DnsResponseFuture: Future<Output = Result<DnsResponse, ProtoError>>
|
||||
+ 'static
|
||||
+ Send
|
||||
+ Unpin;
|
||||
|
||||
/// Send a message, and return a future of the response
|
||||
///
|
||||
/// # Return
|
||||
@ -168,10 +146,10 @@ pub trait DnsRequestSender: Stream<Item = Result<(), ProtoError>> + Send + Unpin
|
||||
&mut self,
|
||||
message: DnsRequest,
|
||||
cx: &mut Context,
|
||||
) -> Self::DnsResponseFuture;
|
||||
) -> DnsResponseFuture;
|
||||
|
||||
/// Constructs an error response
|
||||
fn error_response<TE: Time>(error: ProtoError) -> Self::DnsResponseFuture;
|
||||
fn error_response<TE: Time>(error: ProtoError) -> DnsResponseFuture;
|
||||
|
||||
/// Allows the upstream user to inform the underling stream that it should shutdown.
|
||||
///
|
||||
@ -183,34 +161,18 @@ pub trait DnsRequestSender: Stream<Item = Result<(), ProtoError>> + Send + Unpin
|
||||
}
|
||||
|
||||
/// Used for associating a name_server to a DnsRequestStreamHandle
|
||||
pub struct BufDnsRequestStreamHandle<F>
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
|
||||
{
|
||||
sender: DnsRequestStreamHandle<F>,
|
||||
#[derive(Clone)]
|
||||
pub struct BufDnsRequestStreamHandle {
|
||||
sender: DnsRequestStreamHandle,
|
||||
}
|
||||
|
||||
impl<F> BufDnsRequestStreamHandle<F>
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
|
||||
{
|
||||
impl BufDnsRequestStreamHandle {
|
||||
/// Construct a new BufDnsRequestStreamHandle
|
||||
pub fn new(sender: DnsRequestStreamHandle<F>) -> Self {
|
||||
pub fn new(sender: DnsRequestStreamHandle) -> Self {
|
||||
BufDnsRequestStreamHandle { sender }
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> Clone for BufDnsRequestStreamHandle<F>
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
BufDnsRequestStreamHandle {
|
||||
sender: self.sender.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! try_oneshot {
|
||||
($expr:expr) => {{
|
||||
use std::result::Result;
|
||||
@ -227,11 +189,8 @@ macro_rules! try_oneshot {
|
||||
};
|
||||
}
|
||||
|
||||
impl<F> DnsHandle for BufDnsRequestStreamHandle<F>
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin + 'static,
|
||||
{
|
||||
type Response = OneshotDnsResponseReceiver<F>;
|
||||
impl DnsHandle for BufDnsRequestStreamHandle {
|
||||
type Response = OneshotDnsResponseReceiver;
|
||||
type Error = ProtoError;
|
||||
|
||||
fn send<R: Into<DnsRequest>>(&mut self, request: R) -> Self::Response {
|
||||
@ -250,19 +209,15 @@ where
|
||||
|
||||
// TODO: this future should return the origin message in the response on errors
|
||||
/// A OneshotDnsRequest creates a channel for a response to message
|
||||
pub struct OneshotDnsRequest<F>
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
|
||||
{
|
||||
pub struct OneshotDnsRequest {
|
||||
dns_request: DnsRequest,
|
||||
sender_for_response: Sender<F>,
|
||||
sender_for_response: oneshot::Sender<DnsResponseFuture>,
|
||||
}
|
||||
|
||||
impl<F> OneshotDnsRequest<F>
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
|
||||
{
|
||||
fn oneshot(dns_request: DnsRequest) -> (OneshotDnsRequest<F>, oneshot::Receiver<F>) {
|
||||
impl OneshotDnsRequest {
|
||||
fn oneshot(
|
||||
dns_request: DnsRequest,
|
||||
) -> (OneshotDnsRequest, oneshot::Receiver<DnsResponseFuture>) {
|
||||
let (sender_for_response, receiver) = oneshot::channel();
|
||||
|
||||
(
|
||||
@ -274,7 +229,7 @@ where
|
||||
)
|
||||
}
|
||||
|
||||
fn unwrap(self) -> (DnsRequest, OneshotDnsResponse<F>) {
|
||||
fn unwrap(self) -> (DnsRequest, OneshotDnsResponse) {
|
||||
(
|
||||
self.dns_request,
|
||||
OneshotDnsResponse(self.sender_for_response),
|
||||
@ -282,37 +237,26 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
struct OneshotDnsResponse<F>(oneshot::Sender<F>)
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send;
|
||||
struct OneshotDnsResponse(oneshot::Sender<DnsResponseFuture>);
|
||||
|
||||
impl<F> OneshotDnsResponse<F>
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
|
||||
{
|
||||
fn send_response(self, serial_response: F) -> Result<(), F> {
|
||||
impl OneshotDnsResponse {
|
||||
fn send_response(self, serial_response: DnsResponseFuture) -> Result<(), DnsResponseFuture> {
|
||||
self.0.send(serial_response)
|
||||
}
|
||||
}
|
||||
|
||||
/// A Future that wraps a oneshot::Receiver and resolves to the final value
|
||||
pub enum OneshotDnsResponseReceiver<F>
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin,
|
||||
{
|
||||
pub enum OneshotDnsResponseReceiver {
|
||||
/// The receiver
|
||||
Receiver(Receiver<F>),
|
||||
Receiver(oneshot::Receiver<DnsResponseFuture>),
|
||||
/// The future once received
|
||||
Received(F),
|
||||
Received(DnsResponseFuture),
|
||||
/// Error during the send operation
|
||||
Err(Option<ProtoError>),
|
||||
}
|
||||
|
||||
impl<F> Future for OneshotDnsResponseReceiver<F>
|
||||
where
|
||||
F: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin,
|
||||
{
|
||||
type Output = <F as Future>::Output;
|
||||
impl Future for OneshotDnsResponseReceiver {
|
||||
type Output = Result<DnsResponse, ProtoError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
loop {
|
||||
|
@ -6,7 +6,7 @@ use std::net::SocketAddr;
|
||||
use crate::name_server::RuntimeProvider;
|
||||
use crate::tls::CLIENT_CONFIG;
|
||||
|
||||
use proto::xfer::{DnsExchange, DnsExchangeConnect, DnsResponseFuture};
|
||||
use proto::xfer::{DnsExchange, DnsExchangeConnect};
|
||||
use proto::TokioTime;
|
||||
use trust_dns_https::{HttpsClientConnect, HttpsClientStream, HttpsClientStreamBuilder};
|
||||
|
||||
@ -17,7 +17,7 @@ pub(crate) fn new_https_stream<R>(
|
||||
socket_addr: SocketAddr,
|
||||
dns_name: String,
|
||||
client_config: Option<TlsClientConfig>,
|
||||
) -> DnsExchangeConnect<HttpsClientConnect<R::Tcp>, HttpsClientStream, DnsResponseFuture, TokioTime>
|
||||
) -> DnsExchangeConnect<HttpsClientConnect<R::Tcp>, HttpsClientStream, TokioTime>
|
||||
where
|
||||
R: RuntimeProvider,
|
||||
{
|
||||
|
@ -36,9 +36,7 @@ use proto::multicast::{MdnsClientConnect, MdnsClientStream, MdnsQueryType};
|
||||
use proto::op::NoopMessageFinalizer;
|
||||
|
||||
use proto::udp::UdpClientStream;
|
||||
use proto::xfer::{
|
||||
DnsExchange, DnsExchangeSend, DnsHandle, DnsRequest, DnsResponse, DnsResponseFuture,
|
||||
};
|
||||
use proto::xfer::{DnsExchange, DnsExchangeSend, DnsHandle, DnsRequest, DnsResponse};
|
||||
|
||||
use proto::xfer::DnsMultiplexer;
|
||||
|
||||
@ -210,14 +208,7 @@ pub(crate) enum ConnectionConnect<R: RuntimeProvider>
|
||||
where
|
||||
<<R as RuntimeProvider>::Tcp as Connect>::Transport: Unpin,
|
||||
{
|
||||
Udp(
|
||||
DnsExchangeConnect<
|
||||
UdpClientConnect<R::Udp>,
|
||||
UdpClientStream<R::Udp>,
|
||||
DnsResponseFuture,
|
||||
R::Timer,
|
||||
>,
|
||||
),
|
||||
Udp(DnsExchangeConnect<UdpClientConnect<R::Udp>, UdpClientStream<R::Udp>, R::Timer>),
|
||||
Tcp(
|
||||
DnsExchangeConnect<
|
||||
DnsMultiplexerConnect<
|
||||
@ -229,7 +220,6 @@ where
|
||||
TcpClientStream<<<R as RuntimeProvider>::Tcp as Connect>::Transport>,
|
||||
NoopMessageFinalizer,
|
||||
>,
|
||||
DnsResponseFuture,
|
||||
R::Timer,
|
||||
>,
|
||||
),
|
||||
@ -255,25 +245,16 @@ where
|
||||
TcpClientStream<AsyncIo02As03<TokioTlsStream<TokioTcpStream>>>,
|
||||
NoopMessageFinalizer,
|
||||
>,
|
||||
DnsResponseFuture,
|
||||
TokioTime,
|
||||
>,
|
||||
),
|
||||
#[cfg(feature = "dns-over-https")]
|
||||
Https(
|
||||
DnsExchangeConnect<
|
||||
HttpsClientConnect<R::Tcp>,
|
||||
HttpsClientStream,
|
||||
DnsResponseFuture,
|
||||
TokioTime,
|
||||
>,
|
||||
),
|
||||
Https(DnsExchangeConnect<HttpsClientConnect<R::Tcp>, HttpsClientStream, TokioTime>),
|
||||
#[cfg(feature = "mdns")]
|
||||
Mdns(
|
||||
DnsExchangeConnect<
|
||||
DnsMultiplexerConnect<MdnsClientConnect, MdnsClientStream, NoopMessageFinalizer>,
|
||||
DnsMultiplexer<MdnsClientStream, NoopMessageFinalizer>,
|
||||
DnsResponseFuture,
|
||||
TokioTime,
|
||||
>,
|
||||
),
|
||||
@ -346,14 +327,14 @@ impl DnsHandle for GenericConnection {
|
||||
/// A representation of an established connection
|
||||
#[derive(Clone)]
|
||||
enum ConnectionConnected {
|
||||
Udp(DnsExchange<DnsResponseFuture>),
|
||||
Tcp(DnsExchange<DnsResponseFuture>),
|
||||
Udp(DnsExchange),
|
||||
Tcp(DnsExchange),
|
||||
#[cfg(feature = "dns-over-tls")]
|
||||
Tls(DnsExchange<DnsResponseFuture>),
|
||||
Tls(DnsExchange),
|
||||
#[cfg(feature = "dns-over-https")]
|
||||
Https(DnsExchange<DnsResponseFuture>),
|
||||
Https(DnsExchange),
|
||||
#[cfg(feature = "mdns")]
|
||||
Mdns(DnsExchange<DnsResponseFuture>),
|
||||
Mdns(DnsExchange),
|
||||
}
|
||||
|
||||
impl DnsHandle for ConnectionConnected {
|
||||
@ -389,14 +370,14 @@ impl DnsHandle for ConnectionConnected {
|
||||
/// A wrapper type to switch over a connection that still needs to be made, or is already established
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
enum ConnectionResponseInner {
|
||||
Udp(DnsExchangeSend<DnsResponseFuture>),
|
||||
Tcp(DnsExchangeSend<DnsResponseFuture>),
|
||||
Udp(DnsExchangeSend),
|
||||
Tcp(DnsExchangeSend),
|
||||
#[cfg(feature = "dns-over-tls")]
|
||||
Tls(DnsExchangeSend<DnsResponseFuture>),
|
||||
Tls(DnsExchangeSend),
|
||||
#[cfg(feature = "dns-over-https")]
|
||||
Https(DnsExchangeSend<DnsResponseFuture>),
|
||||
Https(DnsExchangeSend),
|
||||
#[cfg(feature = "mdns")]
|
||||
Mdns(DnsExchangeSend<DnsResponseFuture>),
|
||||
Mdns(DnsExchangeSend),
|
||||
}
|
||||
|
||||
impl Future for ConnectionResponseInner {
|
||||
|
@ -32,7 +32,7 @@ use trust_dns_client::rr::dnssec::Signer;
|
||||
use trust_dns_client::serialize::binary::*;
|
||||
use trust_dns_proto::error::ProtoError;
|
||||
use trust_dns_proto::xfer::{
|
||||
DnsClientStream, DnsMultiplexer, DnsMultiplexerConnect, DnsRequestSender, SerialMessage,
|
||||
DnsClientStream, DnsMultiplexer, DnsMultiplexerConnect, SerialMessage,
|
||||
};
|
||||
use trust_dns_proto::StreamHandle;
|
||||
|
||||
@ -256,7 +256,6 @@ impl NeverReturnsClientConnection {
|
||||
#[allow(clippy::type_complexity)]
|
||||
impl ClientConnection for NeverReturnsClientConnection {
|
||||
type Sender = DnsMultiplexer<NeverReturnsClientStream, Signer>;
|
||||
type Response = <Self::Sender as DnsRequestSender>::DnsResponseFuture;
|
||||
type SenderFuture = DnsMultiplexerConnect<
|
||||
Pin<Box<dyn Future<Output = Result<NeverReturnsClientStream, ProtoError>> + Send>>,
|
||||
NeverReturnsClientStream,
|
||||
|
@ -17,7 +17,7 @@ use futures::Future;
|
||||
use trust_dns_client::client::ClientConnection;
|
||||
use trust_dns_client::rr::dnssec::Signer;
|
||||
use trust_dns_proto::error::ProtoError;
|
||||
use trust_dns_proto::xfer::{DnsMultiplexer, DnsMultiplexerConnect, DnsRequestSender};
|
||||
use trust_dns_proto::xfer::{DnsMultiplexer, DnsMultiplexerConnect};
|
||||
|
||||
use rustls::ClientConfig;
|
||||
use trust_dns_rustls::{tls_client_connect, TlsClientStream};
|
||||
@ -49,7 +49,6 @@ impl TlsClientConnection {
|
||||
#[allow(clippy::type_complexity)]
|
||||
impl ClientConnection for TlsClientConnection {
|
||||
type Sender = DnsMultiplexer<TlsClientStream, Signer>;
|
||||
type Response = <Self::Sender as DnsRequestSender>::DnsResponseFuture;
|
||||
type SenderFuture = DnsMultiplexerConnect<
|
||||
Pin<Box<dyn Future<Output = Result<TlsClientStream, ProtoError>> + Send>>,
|
||||
TlsClientStream,
|
||||
|
@ -36,12 +36,8 @@ use trust_dns_client::rr::Record;
|
||||
use trust_dns_client::rr::{DNSClass, Name, RData, RecordSet, RecordType};
|
||||
use trust_dns_client::tcp::TcpClientStream;
|
||||
use trust_dns_client::udp::UdpClientStream;
|
||||
use trust_dns_proto::error::ProtoError;
|
||||
use trust_dns_proto::xfer::DnsResponse;
|
||||
#[cfg(feature = "dnssec")]
|
||||
use trust_dns_proto::xfer::{
|
||||
DnsExchangeBackground, DnsMultiplexer, DnsMultiplexerSerialResponse, DnsStreamHandle,
|
||||
};
|
||||
use trust_dns_proto::xfer::{DnsExchangeBackground, DnsMultiplexer, DnsStreamHandle};
|
||||
use trust_dns_proto::{iocompat::AsyncIo02As03, TokioTime};
|
||||
use trust_dns_server::authority::{Authority, Catalog};
|
||||
|
||||
@ -167,10 +163,7 @@ fn test_query_https() {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn test_query<R>(client: &mut AsyncClient<R>) -> impl Future<Output = ()>
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
fn test_query(client: &mut AsyncClient) -> impl Future<Output = ()> {
|
||||
let name = Name::from_ascii("WWW.example.com").unwrap();
|
||||
|
||||
client
|
||||
@ -231,10 +224,9 @@ fn test_notify() {
|
||||
#[allow(clippy::type_complexity)]
|
||||
async fn create_sig0_ready_client() -> (
|
||||
(
|
||||
AsyncClient<DnsMultiplexerSerialResponse>,
|
||||
AsyncClient,
|
||||
DnsExchangeBackground<
|
||||
DnsMultiplexer<TestClientStream, Signer, Box<dyn DnsStreamHandle>>,
|
||||
DnsMultiplexerSerialResponse,
|
||||
TokioTime,
|
||||
>,
|
||||
),
|
||||
@ -877,10 +869,7 @@ fn test_delete_all() {
|
||||
assert_eq!(result.answers().len(), 0);
|
||||
}
|
||||
|
||||
fn test_timeout_query<R>(mut client: AsyncClient<R>, mut io_loop: Runtime)
|
||||
where
|
||||
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin,
|
||||
{
|
||||
fn test_timeout_query(mut client: AsyncClient, mut io_loop: Runtime) {
|
||||
let name = Name::from_str("www.example.com").unwrap();
|
||||
|
||||
let err = io_loop
|
||||
|
@ -34,7 +34,7 @@ use trust_dns_integration::{NeverReturnsClientConnection, TestClientStream};
|
||||
use trust_dns_proto::error::ProtoError;
|
||||
#[cfg(feature = "dnssec")]
|
||||
use trust_dns_proto::op::*;
|
||||
use trust_dns_proto::xfer::{DnsMultiplexer, DnsMultiplexerConnect, DnsRequestSender};
|
||||
use trust_dns_proto::xfer::{DnsMultiplexer, DnsMultiplexerConnect};
|
||||
use trust_dns_server::authority::{Authority, Catalog};
|
||||
|
||||
pub struct TestClientConnection {
|
||||
@ -52,7 +52,6 @@ impl TestClientConnection {
|
||||
#[allow(clippy::type_complexity)]
|
||||
impl ClientConnection for TestClientConnection {
|
||||
type Sender = DnsMultiplexer<TestClientStream, Signer>;
|
||||
type Response = <Self::Sender as DnsRequestSender>::DnsResponseFuture;
|
||||
type SenderFuture = DnsMultiplexerConnect<
|
||||
Pin<Box<dyn Future<Output = Result<TestClientStream, ProtoError>> + Send>>,
|
||||
TestClientStream,
|
||||
|
@ -22,8 +22,7 @@ use trust_dns_client::rr::Name;
|
||||
use trust_dns_client::rr::{DNSClass, RData, RecordType};
|
||||
use trust_dns_client::tcp::TcpClientStream;
|
||||
|
||||
use trust_dns_proto::udp::{UdpClientConnect, UdpClientStream, UdpResponse};
|
||||
use trust_dns_proto::xfer::DnsMultiplexerSerialResponse;
|
||||
use trust_dns_proto::udp::{UdpClientConnect, UdpClientStream};
|
||||
use trust_dns_proto::DnssecDnsHandle;
|
||||
use trust_dns_proto::{iocompat::AsyncIo02As03, TokioTime};
|
||||
use trust_dns_server::authority::{Authority, Catalog};
|
||||
@ -196,7 +195,7 @@ where
|
||||
|
||||
fn with_nonet<F>(test: F)
|
||||
where
|
||||
F: Fn(DnssecDnsHandle<MemoizeClientHandle<AsyncClient<DnsMultiplexerSerialResponse>>>, Runtime),
|
||||
F: Fn(DnssecDnsHandle<MemoizeClientHandle<AsyncClient>>, Runtime),
|
||||
{
|
||||
let succeeded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
|
||||
let succeeded_clone = succeeded.clone();
|
||||
@ -254,7 +253,7 @@ where
|
||||
|
||||
fn with_udp<F>(test: F)
|
||||
where
|
||||
F: Fn(DnssecDnsHandle<MemoizeClientHandle<AsyncClient<UdpResponse>>>, Runtime),
|
||||
F: Fn(DnssecDnsHandle<MemoizeClientHandle<AsyncClient>>, Runtime),
|
||||
{
|
||||
let succeeded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
|
||||
let succeeded_clone = succeeded.clone();
|
||||
@ -290,7 +289,7 @@ where
|
||||
|
||||
fn with_tcp<F>(test: F)
|
||||
where
|
||||
F: Fn(DnssecDnsHandle<MemoizeClientHandle<AsyncClient<DnsMultiplexerSerialResponse>>>, Runtime),
|
||||
F: Fn(DnssecDnsHandle<MemoizeClientHandle<AsyncClient>>, Runtime),
|
||||
{
|
||||
let succeeded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
|
||||
let succeeded_clone = succeeded.clone();
|
||||
|
@ -220,8 +220,7 @@ fn lazy_tls_client(ipaddr: SocketAddr, dns_name: String, cert_der: Vec<u8>) -> T
|
||||
|
||||
fn client_thread_www<C: ClientConnection>(conn: C)
|
||||
where
|
||||
C::Sender: DnsRequestSender<DnsResponseFuture = C::Response>,
|
||||
C::Response: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send,
|
||||
C::Sender: DnsRequestSender,
|
||||
C::SenderFuture: Future<Output = Result<C::Sender, ProtoError>> + 'static + Send,
|
||||
{
|
||||
let name = Name::from_str("www.example.com").unwrap();
|
||||
|
Loading…
Reference in New Issue
Block a user