Simplify server logging to single logline per request
This commit is contained in:
parent
04ab19ad1c
commit
d9dcd8bfb0
@ -16,6 +16,9 @@ All notes should be prepended with the location of the change, e.g. `(proto)` or
|
||||
|
||||
### Changed
|
||||
|
||||
- (server) Multiple queries in a message will always result in a FormError now #1554
|
||||
- (server) `ServerFuture` and other `Catalog` related API changes #1554
|
||||
- (server) `ResponseHandler` now must return a `ResponseInfo` to allow for more consistent logging #1554
|
||||
- (resolver) Correct behavior around trust_nx_responses (@peterthejohnston) #1556
|
||||
- (server) `ResponseHandler` trait is now `async_trait`, requires all impls to be annotated with `#[async_trait]` #1550
|
||||
- (server) `Authority` impls required to be internally modifiable and `Send + Sync` #1550
|
||||
|
@ -1,22 +1,13 @@
|
||||
/*
|
||||
* Copyright (C) 2015 Benjamin Fry <benjaminfry@me.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
// Copyright 2015-2021 Benjamin Fry <benjaminfry@me.com>
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
|
||||
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
|
||||
// http://opensource.org/licenses/MIT>, at your option. This file may not be
|
||||
// copied, modified, or distributed except according to those terms.
|
||||
|
||||
//! Message metadata
|
||||
|
||||
use std::convert::From;
|
||||
use std::{convert::From, fmt};
|
||||
|
||||
use super::op_code::OpCode;
|
||||
use super::response_code::ResponseCode;
|
||||
@ -82,6 +73,63 @@ pub enum MessageType {
|
||||
Response,
|
||||
}
|
||||
|
||||
impl fmt::Display for MessageType {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||
let s = match self {
|
||||
MessageType::Query => "QUERY",
|
||||
MessageType::Response => "RESPONSE",
|
||||
};
|
||||
|
||||
write!(f, "{}", s)
|
||||
}
|
||||
}
|
||||
|
||||
/// All the flags of the request/response header
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct Flags {
|
||||
authoritative: bool,
|
||||
truncation: bool,
|
||||
recursion_desired: bool,
|
||||
recursion_available: bool,
|
||||
authentic_data: bool,
|
||||
checking_disabled: bool,
|
||||
}
|
||||
|
||||
/// We are following the `dig` commands display format for the header flags
|
||||
///
|
||||
/// Example: "RD,AA,RA;" is Recursion-Desired, Authoritative-Answer, Recursion-Available.
|
||||
impl fmt::Display for Flags {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||
const SEPARATOR: &str = ",";
|
||||
let mut insert_separator = ""; // initially empty
|
||||
if self.recursion_desired {
|
||||
write!(f, "RD")?;
|
||||
insert_separator = SEPARATOR;
|
||||
}
|
||||
if self.checking_disabled {
|
||||
write!(f, "{}CD", insert_separator)?;
|
||||
insert_separator = SEPARATOR;
|
||||
}
|
||||
if self.truncation {
|
||||
write!(f, "{}TC", insert_separator)?;
|
||||
insert_separator = SEPARATOR;
|
||||
}
|
||||
if self.authoritative {
|
||||
write!(f, "{}AA", insert_separator)?;
|
||||
insert_separator = SEPARATOR;
|
||||
}
|
||||
if self.recursion_available {
|
||||
write!(f, "{}RA", insert_separator)?;
|
||||
insert_separator = SEPARATOR;
|
||||
}
|
||||
if self.authentic_data {
|
||||
write!(f, "{}AD", insert_separator)?
|
||||
}
|
||||
|
||||
write!(f, ";")
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Header {
|
||||
fn default() -> Self {
|
||||
Header {
|
||||
@ -209,6 +257,18 @@ impl Header {
|
||||
self
|
||||
}
|
||||
|
||||
/// A method to get all header flags (useful for Display purposes)
|
||||
pub fn flags(&self) -> Flags {
|
||||
Flags {
|
||||
authoritative: self.authoritative,
|
||||
authentic_data: self.authentic_data,
|
||||
checking_disabled: self.checking_disabled,
|
||||
recursion_available: self.recursion_available,
|
||||
recursion_desired: self.recursion_desired,
|
||||
truncation: self.truncation,
|
||||
}
|
||||
}
|
||||
|
||||
/// The low response code (original response codes before EDNS extensions)
|
||||
pub fn set_response_code(&mut self, response_code: ResponseCode) -> &mut Self {
|
||||
self.response_code = response_code;
|
||||
|
@ -880,6 +880,10 @@ impl<'e, I: Iterator<Item = &'e E>, E: 'e + BinEncodable> EmitAndCount for I {
|
||||
}
|
||||
|
||||
/// Emits the different sections of a message properly
|
||||
///
|
||||
/// # Return
|
||||
///
|
||||
/// In the case of a successful emit, the final header (updated counts, etc) is returned for help with logging, etc.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn emit_message_parts<Q, A, N, D>(
|
||||
header: &Header,
|
||||
@ -890,7 +894,7 @@ pub fn emit_message_parts<Q, A, N, D>(
|
||||
edns: Option<&Edns>,
|
||||
signature: &[Record],
|
||||
encoder: &mut BinEncoder<'_>,
|
||||
) -> ProtoResult<()>
|
||||
) -> ProtoResult<Header>
|
||||
where
|
||||
Q: EmitAndCount,
|
||||
A: EmitAndCount,
|
||||
@ -940,8 +944,9 @@ where
|
||||
let was_truncated =
|
||||
header.truncated() || answer_count.1 || nameserver_count.1 || additional_count.1;
|
||||
|
||||
place.replace(encoder, update_header_counts(header, was_truncated, counts))?;
|
||||
Ok(())
|
||||
let final_header = update_header_counts(header, was_truncated, counts);
|
||||
place.replace(encoder, final_header)?;
|
||||
Ok(final_header)
|
||||
}
|
||||
|
||||
impl BinEncodable for Message {
|
||||
@ -955,7 +960,9 @@ impl BinEncodable for Message {
|
||||
self.edns.as_ref(),
|
||||
&self.signature,
|
||||
encoder,
|
||||
)
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,7 @@
|
||||
|
||||
//! Operation code for queries, updates, and responses
|
||||
|
||||
use std::convert::From;
|
||||
use std::{convert::From, fmt};
|
||||
|
||||
use crate::error::*;
|
||||
|
||||
@ -53,6 +53,19 @@ pub enum OpCode {
|
||||
Update,
|
||||
}
|
||||
|
||||
impl fmt::Display for OpCode {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||
let s = match self {
|
||||
OpCode::Query => "QUERY",
|
||||
OpCode::Status => "STATUS",
|
||||
OpCode::Notify => "NOTIFY",
|
||||
OpCode::Update => "UPDATE",
|
||||
};
|
||||
|
||||
write!(f, "{}", s)
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert from `OpCode` to `u8`
|
||||
///
|
||||
/// ```
|
||||
|
@ -27,7 +27,7 @@ use crate::{
|
||||
op::{Edns, Header, LowerQuery, MessageType, OpCode, ResponseCode},
|
||||
rr::{LowerName, RecordType},
|
||||
},
|
||||
server::{Request, RequestHandler, ResponseHandler},
|
||||
server::{Request, RequestHandler, ResponseHandler, ResponseInfo},
|
||||
};
|
||||
|
||||
/// Set of authorities, zones, available to this server.
|
||||
@ -41,7 +41,7 @@ async fn send_response<R: ResponseHandler>(
|
||||
response_edns: Option<Edns>,
|
||||
mut response: MessageResponse<'_, '_>,
|
||||
mut response_handle: R,
|
||||
) -> io::Result<()> {
|
||||
) -> io::Result<ResponseInfo> {
|
||||
#[cfg(feature = "dnssec")]
|
||||
if let Some(mut resp_edns) = response_edns {
|
||||
// set edns DAU and DHU
|
||||
@ -72,7 +72,11 @@ impl RequestHandler for Catalog {
|
||||
///
|
||||
/// * `request` - the requested action to perform.
|
||||
/// * `response_handle` - sink for the response message to be sent
|
||||
async fn handle_request<R: ResponseHandler>(&self, request: Request, mut response_handle: R) {
|
||||
async fn handle_request<R: ResponseHandler>(
|
||||
&self,
|
||||
request: Request,
|
||||
mut response_handle: R,
|
||||
) -> ResponseInfo {
|
||||
let request_message = request.message;
|
||||
trace!("request: {:?}", request_message);
|
||||
|
||||
@ -105,10 +109,14 @@ impl RequestHandler for Catalog {
|
||||
let result = response_handle
|
||||
.send_response(response.build_no_records(response_header))
|
||||
.await;
|
||||
if let Err(e) = result {
|
||||
error!("request error: {}", e);
|
||||
|
||||
match result {
|
||||
Err(e) => {
|
||||
error!("request error: {}", e);
|
||||
return ResponseInfo::serve_failed();
|
||||
}
|
||||
Ok(info) => return info,
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
response_edns = Some(resp_edns);
|
||||
@ -122,10 +130,11 @@ impl RequestHandler for Catalog {
|
||||
MessageType::Query => match request_message.op_code() {
|
||||
OpCode::Query => {
|
||||
debug!("query received: {}", request_message.id());
|
||||
self.lookup(request_message, response_edns, response_handle)
|
||||
let info = self
|
||||
.lookup(request_message, response_edns, response_handle)
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
Ok(info)
|
||||
}
|
||||
OpCode::Update => {
|
||||
debug!("update received: {}", request_message.id());
|
||||
@ -156,8 +165,12 @@ impl RequestHandler for Catalog {
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = result {
|
||||
error!("request failed: {}", e);
|
||||
match result {
|
||||
Err(e) => {
|
||||
error!("request failed: {}", e);
|
||||
ResponseInfo::serve_failed()
|
||||
}
|
||||
Ok(info) => info,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -239,7 +252,7 @@ impl Catalog {
|
||||
update: &MessageRequest,
|
||||
response_edns: Option<Edns>,
|
||||
response_handle: R,
|
||||
) -> io::Result<()> {
|
||||
) -> io::Result<ResponseInfo> {
|
||||
let zones: &[LowerQuery] = update.queries();
|
||||
|
||||
// 2.3 - Zone Section
|
||||
@ -329,7 +342,7 @@ impl Catalog {
|
||||
request: MessageRequest,
|
||||
response_edns: Option<Edns>,
|
||||
response_handle: R,
|
||||
) {
|
||||
) -> ResponseInfo {
|
||||
// find matching authorities for the request
|
||||
let queries_and_authorities = request
|
||||
.queries()
|
||||
@ -388,13 +401,18 @@ async fn lookup<R: ResponseHandler + Unpin>(
|
||||
request: MessageRequest,
|
||||
response_edns: Option<Edns>,
|
||||
response_handle: R,
|
||||
) {
|
||||
) -> ResponseInfo {
|
||||
// TODO: the spec is very unclear on what to do with multiple queries
|
||||
// we will search for each, in the future, maybe make this threaded to respond even faster.
|
||||
// the current impl will return on the first query result
|
||||
|
||||
// This will return the most recent response...
|
||||
// FIXME: maybe we should move the response reporting into the ResponseHandler in some way?
|
||||
let mut info: ResponseInfo = ResponseInfo::serve_failed();
|
||||
|
||||
for (query_idx, authority) in queries_and_authorities {
|
||||
let query = &request.queries()[query_idx];
|
||||
info!(
|
||||
debug!(
|
||||
"request: {} found authority: {}",
|
||||
request.id(),
|
||||
authority.origin()
|
||||
@ -418,10 +436,17 @@ async fn lookup<R: ResponseHandler + Unpin>(
|
||||
);
|
||||
|
||||
let result = send_response(response_edns.clone(), response, response_handle.clone()).await;
|
||||
if let Err(e) = result {
|
||||
error!("error sending response: {}", e);
|
||||
|
||||
match result {
|
||||
Err(e) => {
|
||||
error!("error sending response: {}", e);
|
||||
info = ResponseInfo::serve_failed()
|
||||
}
|
||||
Ok(i) => info = i,
|
||||
}
|
||||
}
|
||||
|
||||
info
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
|
@ -293,7 +293,9 @@ impl BinEncodable for MessageRequest {
|
||||
self.edns.as_ref(),
|
||||
&self.sig0,
|
||||
encoder,
|
||||
)
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -12,6 +12,7 @@ use crate::proto::op::message::EmitAndCount;
|
||||
use crate::proto::op::{message, Edns, Header, ResponseCode};
|
||||
use crate::proto::rr::Record;
|
||||
use crate::proto::serialize::binary::BinEncoder;
|
||||
use crate::server::ResponseInfo;
|
||||
|
||||
/// A EncodableMessage with borrowed data for Responses in the Server
|
||||
#[derive(Debug)]
|
||||
@ -79,7 +80,7 @@ where
|
||||
}
|
||||
|
||||
/// Consumes self, and emits to the encoder.
|
||||
pub fn destructive_emit(mut self, encoder: &mut BinEncoder<'_>) -> ProtoResult<()> {
|
||||
pub fn destructive_emit(mut self, encoder: &mut BinEncoder<'_>) -> ProtoResult<ResponseInfo> {
|
||||
// soa records are part of the nameserver section
|
||||
let mut name_servers = self.name_servers.chain(self.soa);
|
||||
|
||||
@ -93,6 +94,7 @@ where
|
||||
&self.sig0,
|
||||
encoder,
|
||||
)
|
||||
.map(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,10 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use crate::{
|
||||
authority::{MessageRequest, MessageResponse},
|
||||
proto::{https::https_server, serialize::binary::BinDecodable},
|
||||
server::{request_handler::RequestHandler, response_handler::ResponseHandler, server_future},
|
||||
server::{
|
||||
request_handler::RequestHandler, response_handler::ResponseHandler, server_future,
|
||||
Protocol, ResponseInfo,
|
||||
},
|
||||
};
|
||||
|
||||
pub(crate) async fn h2_handler<T, I>(
|
||||
@ -82,7 +85,7 @@ async fn handle_request<T>(
|
||||
|
||||
debug!("received message: {:?}", message);
|
||||
|
||||
server_future::handle_request(message, src_addr, handler, responder).await
|
||||
server_future::handle_request(message, src_addr, Protocol::Https, handler, responder).await
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@ -90,16 +93,19 @@ struct HttpsResponseHandle(Arc<Mutex<::h2::server::SendResponse<Bytes>>>);
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ResponseHandler for HttpsResponseHandle {
|
||||
async fn send_response(&mut self, response: MessageResponse<'_, '_>) -> io::Result<()> {
|
||||
async fn send_response(
|
||||
&mut self,
|
||||
response: MessageResponse<'_, '_>,
|
||||
) -> io::Result<ResponseInfo> {
|
||||
use crate::proto::https::response;
|
||||
use crate::proto::https::HttpsError;
|
||||
use crate::proto::serialize::binary::BinEncoder;
|
||||
|
||||
let mut bytes = Vec::with_capacity(512);
|
||||
// mut block
|
||||
{
|
||||
let info = {
|
||||
let mut encoder = BinEncoder::new(&mut bytes);
|
||||
response.destructive_emit(&mut encoder)?;
|
||||
response.destructive_emit(&mut encoder)?
|
||||
};
|
||||
let bytes = Bytes::from(bytes);
|
||||
let response = response::new(bytes.len())?;
|
||||
@ -113,6 +119,6 @@ impl ResponseHandler for HttpsResponseHandle {
|
||||
.map_err(HttpsError::from)?;
|
||||
stream.send_data(bytes, true).map_err(HttpsError::from)?;
|
||||
|
||||
Ok(())
|
||||
Ok(info)
|
||||
}
|
||||
}
|
||||
|
@ -9,12 +9,14 @@
|
||||
|
||||
#[cfg(feature = "dns-over-https")]
|
||||
mod https_handler;
|
||||
mod protocol;
|
||||
mod request_handler;
|
||||
mod response_handler;
|
||||
mod server_future;
|
||||
mod timeout_stream;
|
||||
|
||||
pub use self::request_handler::{Request, RequestHandler};
|
||||
pub use self::protocol::Protocol;
|
||||
pub use self::request_handler::{Request, RequestHandler, ResponseInfo};
|
||||
pub use self::response_handler::{ResponseHandle, ResponseHandler};
|
||||
pub use self::server_future::ServerFuture;
|
||||
pub use self::timeout_stream::TimeoutStream;
|
||||
|
37
crates/server/src/server/protocol.rs
Normal file
37
crates/server/src/server/protocol.rs
Normal file
@ -0,0 +1,37 @@
|
||||
use std::fmt;
|
||||
|
||||
/// For tracking purposes of inbound requests, which protocol was used
|
||||
#[non_exhaustive]
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum Protocol {
|
||||
/// User Datagram Protocol, the default for all DNS requests
|
||||
Udp,
|
||||
/// Transmission Control Protocol, used in DNS primarily for large responses (avoids truncation) and AXFR/IXFR
|
||||
Tcp,
|
||||
/// Transport Layer Security over TCP, for establishing a privacy, DoT (similar to DoH)
|
||||
Tls,
|
||||
/// Datagram Transport Layer Security over UDP
|
||||
Dtls,
|
||||
/// HTTP over TLS, DNS over HTTPS, aka DoH (similar to DoT)
|
||||
Https,
|
||||
}
|
||||
|
||||
impl fmt::Display for Protocol {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||
let s = match self {
|
||||
Protocol::Udp => "UDP",
|
||||
Protocol::Tcp => "TCP",
|
||||
Protocol::Tls => "TLS",
|
||||
Protocol::Dtls => "DTLS",
|
||||
Protocol::Https => "HTTPS",
|
||||
};
|
||||
|
||||
write!(f, "{}", s)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Protocol {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||
fmt::Display::fmt(self, f)
|
||||
}
|
||||
}
|
@ -9,14 +9,59 @@
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use crate::{authority::MessageRequest, server::ResponseHandler};
|
||||
use crate::{
|
||||
authority::MessageRequest,
|
||||
proto::op::{Header, ResponseCode},
|
||||
server::{Protocol, ResponseHandler},
|
||||
};
|
||||
|
||||
/// An incoming request to the DNS catalog
|
||||
#[non_exhaustive]
|
||||
pub struct Request {
|
||||
/// Message with the associated query or update data
|
||||
pub message: MessageRequest,
|
||||
/// Source address of the Client
|
||||
pub src: SocketAddr,
|
||||
/// Protocol of the request
|
||||
pub protocol: Protocol,
|
||||
}
|
||||
|
||||
impl Request {
|
||||
/// Build a new requests with the inbound message, source address, and protocol.
|
||||
pub fn new(message: MessageRequest, src: SocketAddr, protocol: Protocol) -> Self {
|
||||
Self {
|
||||
message,
|
||||
src,
|
||||
protocol,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Information about the response sent for a request
|
||||
#[derive(Clone, Copy)]
|
||||
#[repr(transparent)]
|
||||
pub struct ResponseInfo(Header);
|
||||
|
||||
impl ResponseInfo {
|
||||
pub(crate) fn serve_failed() -> ResponseInfo {
|
||||
let mut header = Header::new();
|
||||
header.set_response_code(ResponseCode::ServFail);
|
||||
header.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Header> for ResponseInfo {
|
||||
fn from(header: Header) -> Self {
|
||||
ResponseInfo(header)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for ResponseInfo {
|
||||
type Target = Header;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait for handling incoming requests, and providing a message response.
|
||||
@ -28,5 +73,9 @@ pub trait RequestHandler: Send + Sync + Unpin + 'static {
|
||||
///
|
||||
/// * `request` - the requested action to perform.
|
||||
/// * `response_handle` - handle to which a return message should be sent
|
||||
async fn handle_request<R: ResponseHandler>(&self, request: Request, response_handle: R);
|
||||
async fn handle_request<R: ResponseHandler>(
|
||||
&self,
|
||||
request: Request,
|
||||
response_handle: R,
|
||||
) -> ResponseInfo;
|
||||
}
|
||||
|
@ -8,12 +8,13 @@
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use log::info;
|
||||
use log::debug;
|
||||
|
||||
use crate::authority::MessageResponse;
|
||||
use crate::client::serialize::binary::BinEncoder;
|
||||
use crate::proto::xfer::SerialMessage;
|
||||
use crate::proto::{BufDnsStreamHandle, DnsStreamHandle};
|
||||
use crate::server::ResponseInfo;
|
||||
|
||||
/// A handler for send a response to a client
|
||||
#[async_trait::async_trait]
|
||||
@ -24,7 +25,10 @@ pub trait ResponseHandler: Clone + Send + Sync + Unpin + 'static {
|
||||
/// Serializes and sends a message to to the wrapped handle
|
||||
///
|
||||
/// self is consumed as only one message should ever be sent in response to a Request
|
||||
async fn send_response(&mut self, response: MessageResponse<'_, '_>) -> io::Result<()>;
|
||||
async fn send_response(
|
||||
&mut self,
|
||||
response: MessageResponse<'_, '_>,
|
||||
) -> io::Result<ResponseInfo>;
|
||||
}
|
||||
|
||||
/// A handler for wrapping a BufStreamHandle, which will properly serialize the message and add the
|
||||
@ -47,8 +51,11 @@ impl ResponseHandler for ResponseHandle {
|
||||
/// Serializes and sends a message to to the wrapped handle
|
||||
///
|
||||
/// self is consumed as only one message should ever be sent in response to a Request
|
||||
async fn send_response(&mut self, response: MessageResponse<'_, '_>) -> io::Result<()> {
|
||||
info!(
|
||||
async fn send_response(
|
||||
&mut self,
|
||||
response: MessageResponse<'_, '_>,
|
||||
) -> io::Result<ResponseInfo> {
|
||||
debug!(
|
||||
"response: {} response_code: {}",
|
||||
response.header().id(),
|
||||
response.header().response_code(),
|
||||
@ -59,7 +66,7 @@ impl ResponseHandler for ResponseHandle {
|
||||
response.destructive_emit(&mut encoder)
|
||||
};
|
||||
|
||||
encode_result.map_err(|e| {
|
||||
let info = encode_result.map_err(|e| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!("error encoding message: {}", e),
|
||||
@ -68,6 +75,8 @@ impl ResponseHandler for ResponseHandle {
|
||||
|
||||
self.stream_handle
|
||||
.send(SerialMessage::new(buffer, self.dst))
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::Other, "unknown"))
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::Other, "unknown"))?;
|
||||
|
||||
Ok(info)
|
||||
}
|
||||
}
|
||||
|
@ -11,19 +11,24 @@ use log::{debug, info, warn};
|
||||
#[cfg(feature = "dns-over-rustls")]
|
||||
use rustls::{Certificate, PrivateKey};
|
||||
use tokio::{net, task::JoinHandle};
|
||||
use trust_dns_client::op::{LowerQuery, Query};
|
||||
|
||||
use crate::authority::MessageRequest;
|
||||
use crate::proto::error::ProtoError;
|
||||
use crate::proto::iocompat::AsyncIoTokioAsStd;
|
||||
use crate::proto::op::Edns;
|
||||
#[cfg(all(feature = "dns-over-openssl", not(feature = "dns-over-rustls")))]
|
||||
use crate::proto::openssl::tls_server::*;
|
||||
use crate::proto::serialize::binary::{BinDecodable, BinDecoder};
|
||||
use crate::proto::tcp::TcpStream;
|
||||
use crate::proto::udp::UdpStream;
|
||||
use crate::proto::xfer::SerialMessage;
|
||||
use crate::proto::BufDnsStreamHandle;
|
||||
use crate::server::{Request, RequestHandler, ResponseHandle, ResponseHandler, TimeoutStream};
|
||||
use crate::{
|
||||
authority::MessageRequest,
|
||||
proto::{
|
||||
error::ProtoError,
|
||||
iocompat::AsyncIoTokioAsStd,
|
||||
op::Edns,
|
||||
serialize::binary::{BinDecodable, BinDecoder},
|
||||
tcp::TcpStream,
|
||||
udp::UdpStream,
|
||||
xfer::SerialMessage,
|
||||
BufDnsStreamHandle,
|
||||
},
|
||||
server::{Protocol, Request, RequestHandler, ResponseHandle, ResponseHandler, TimeoutStream},
|
||||
};
|
||||
|
||||
// TODO, would be nice to have a Slab for buffers here...
|
||||
|
||||
@ -71,7 +76,8 @@ impl<T: RequestHandler> ServerFuture<T> {
|
||||
let stream_handle = stream_handle.with_remote_addr(src_addr);
|
||||
|
||||
tokio::spawn(async move {
|
||||
self::handle_raw_request(message, handler, stream_handle).await
|
||||
self::handle_raw_request(message, Protocol::Udp, handler, stream_handle)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
@ -147,6 +153,7 @@ impl<T: RequestHandler> ServerFuture<T> {
|
||||
// we don't spawn here to limit clients from getting too many resources
|
||||
self::handle_raw_request(
|
||||
message,
|
||||
Protocol::Tcp,
|
||||
handler.clone(),
|
||||
stream_handle.clone(),
|
||||
)
|
||||
@ -408,6 +415,7 @@ impl<T: RequestHandler> ServerFuture<T> {
|
||||
|
||||
self::handle_raw_request(
|
||||
message,
|
||||
Protocol::Tls,
|
||||
handler.clone(),
|
||||
stream_handle.clone(),
|
||||
)
|
||||
@ -553,6 +561,7 @@ impl<T: RequestHandler> ServerFuture<T> {
|
||||
|
||||
pub(crate) async fn handle_raw_request<T: RequestHandler>(
|
||||
message: SerialMessage,
|
||||
protocol: Protocol,
|
||||
request_handler: Arc<Mutex<T>>,
|
||||
response_handler: BufDnsStreamHandle,
|
||||
) {
|
||||
@ -566,7 +575,14 @@ pub(crate) async fn handle_raw_request<T: RequestHandler>(
|
||||
let mut decoder = BinDecoder::new(message.bytes());
|
||||
match MessageRequest::read(&mut decoder) {
|
||||
Ok(message) => {
|
||||
self::handle_request(message, src_addr, request_handler, response_handler).await
|
||||
self::handle_request(
|
||||
message,
|
||||
src_addr,
|
||||
protocol,
|
||||
request_handler,
|
||||
response_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
// FIXME: return the error and properly log it in handle_request?
|
||||
Err(e) => warn!("failed to handle message: {}", e),
|
||||
@ -576,31 +592,62 @@ pub(crate) async fn handle_raw_request<T: RequestHandler>(
|
||||
pub(crate) async fn handle_request<R: ResponseHandler, T: RequestHandler>(
|
||||
message: MessageRequest,
|
||||
src_addr: SocketAddr,
|
||||
protocol: Protocol,
|
||||
request_handler: Arc<Mutex<T>>,
|
||||
response_handler: R,
|
||||
) {
|
||||
let request = Request {
|
||||
message,
|
||||
src: src_addr,
|
||||
};
|
||||
let request = Request::new(message, src_addr, protocol);
|
||||
|
||||
info!(
|
||||
"request: {} type: {:?} op_code: {:?} dnssec: {} {}",
|
||||
request.message.id(),
|
||||
request.message.message_type(),
|
||||
request.message.op_code(),
|
||||
request.message.edns().map_or(false, Edns::dnssec_ok),
|
||||
request
|
||||
.message
|
||||
.queries()
|
||||
.first()
|
||||
.map(|q| q.original().to_string())
|
||||
.unwrap_or_else(|| "empty_queries".to_string()),
|
||||
let id = request.message.id();
|
||||
let query = request.message.queries().first().map(LowerQuery::original);
|
||||
let query_name: String = query
|
||||
.map(Query::name)
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_default();
|
||||
let query_type = query
|
||||
.map(Query::query_type)
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_default();
|
||||
let query_class = query
|
||||
.map(Query::query_class)
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_default();
|
||||
let qflags = request.message.header().flags();
|
||||
let qop_code = request.message.op_code();
|
||||
|
||||
debug!(
|
||||
"request:{id} src:{proto}://{addr}#{port} type:{message_type} dnssec:{is_dnssec} {op}:{query}:{qtype}:{class} qflags:{qflags}",
|
||||
id = request.message.id(),
|
||||
proto = protocol, addr = src_addr.ip(), port = src_addr.port(),
|
||||
message_type= request.message.message_type(),
|
||||
is_dnssec = request.message.edns().map_or(false, Edns::dnssec_ok),
|
||||
op = qop_code,
|
||||
query = query_name,
|
||||
qtype = query_type,
|
||||
class = query_class,
|
||||
qflags = qflags,
|
||||
);
|
||||
|
||||
request_handler
|
||||
let response_info = request_handler
|
||||
.lock()
|
||||
.await
|
||||
.handle_request(request, response_handler)
|
||||
.await
|
||||
.await;
|
||||
|
||||
let rid = response_info.id();
|
||||
if id != rid {
|
||||
warn!("request id:{} does not match response id:{}", id, rid);
|
||||
debug_assert_eq!(id, rid, "request id and response id should match");
|
||||
}
|
||||
|
||||
let rflags = response_info.flags();
|
||||
let answer_count = response_info.answer_count();
|
||||
let authority_count = response_info.name_server_count();
|
||||
let additional_count = response_info.additional_count();
|
||||
let response_code = response_info.response_code();
|
||||
|
||||
info!("request:{id} src:{proto}://{addr}#{port} {op}:{query}:{qtype}:{class} qflags:{qflags} response:{code} rr:{answers}/{authorities}/{additionals} rflags: {rflags}",
|
||||
id = rid, proto = protocol, addr = src_addr.ip(), port = src_addr.port(), op = qop_code, query = query_name, qtype = query_type, class = query_class, qflags = qflags, code = response_code, answers = answer_count, authorities = authority_count, additionals = additional_count, rflags = rflags);
|
||||
}
|
||||
|
@ -7,7 +7,7 @@
|
||||
|
||||
use std::io;
|
||||
|
||||
use log::info;
|
||||
use log::{debug, info};
|
||||
|
||||
use crate::{
|
||||
authority::{
|
||||
@ -105,9 +105,9 @@ impl Authority for ForwardAuthority {
|
||||
_lookup_options: LookupOptions,
|
||||
) -> Result<Self::Lookup, LookupError> {
|
||||
// TODO: make this an error?
|
||||
assert!(self.origin.zone_of(name));
|
||||
debug_assert!(self.origin.zone_of(name));
|
||||
|
||||
info!("forwarding lookup: {} {}", name, rtype);
|
||||
debug!("forwarding lookup: {} {}", name, rtype);
|
||||
let name: LowerName = name.clone();
|
||||
let resolve = self.resolver.lookup(name, rtype, Default::default()).await;
|
||||
|
||||
|
@ -26,6 +26,8 @@ use trust_dns_proto::TokioTime;
|
||||
use trust_dns_proto::{error::ProtoError, BufDnsStreamHandle};
|
||||
|
||||
use trust_dns_server::authority::{Catalog, MessageRequest, MessageResponse};
|
||||
use trust_dns_server::server::Protocol;
|
||||
use trust_dns_server::server::ResponseInfo;
|
||||
use trust_dns_server::server::{Request, RequestHandler, ResponseHandler};
|
||||
|
||||
pub mod authority;
|
||||
@ -98,15 +100,18 @@ impl TestResponseHandler {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ResponseHandler for TestResponseHandler {
|
||||
async fn send_response(&mut self, response: MessageResponse<'_, '_>) -> io::Result<()> {
|
||||
async fn send_response(
|
||||
&mut self,
|
||||
response: MessageResponse<'_, '_>,
|
||||
) -> io::Result<ResponseInfo> {
|
||||
let buf = &mut self.buf.lock().unwrap();
|
||||
buf.clear();
|
||||
let mut encoder = BinEncoder::new(buf);
|
||||
response
|
||||
let info = response
|
||||
.destructive_emit(&mut encoder)
|
||||
.expect("could not encode");
|
||||
self.message_ready.store(true, Ordering::Release);
|
||||
Ok(())
|
||||
Ok(info)
|
||||
}
|
||||
}
|
||||
|
||||
@ -137,10 +142,7 @@ impl Stream for TestClientStream {
|
||||
let src_addr = SocketAddr::from(([127, 0, 0, 1], 1234));
|
||||
|
||||
let message = MessageRequest::read(&mut decoder).expect("could not decode message");
|
||||
let request = Request {
|
||||
message,
|
||||
src: src_addr,
|
||||
};
|
||||
let request = Request::new(message, src_addr, Protocol::Udp);
|
||||
|
||||
let response_handler = TestResponseHandler::new();
|
||||
block_on(
|
||||
|
Loading…
Reference in New Issue
Block a user