update proto to new std::future

proto types all updated

all types updated to new tokio and futures

tcp_stream compiling

wip: most types updated

tcp stream updated to std::future

proto crate compiling, need to fix some udp apis

proto compiling and all tests passing

mdns and secure socket handle compiling
This commit is contained in:
Benjamin Fry 2019-08-15 20:05:45 -04:00
parent 4858edd3e7
commit 0389abe243
16 changed files with 1139 additions and 741 deletions

338
Cargo.lock generated
View File

@ -24,6 +24,16 @@ dependencies = [
"nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "async-trait"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "atty"
version = "0.2.13"
@ -156,6 +166,14 @@ name = "core-foundation-sys"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "crossbeam-channel"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-deque"
version = "0.7.1"
@ -320,6 +338,71 @@ name = "futures"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "futures-channel-preview"
version = "0.3.0-alpha.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "futures-core-preview"
version = "0.3.0-alpha.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "futures-executor-preview"
version = "0.3.0-alpha.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures-channel-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "futures-io-preview"
version = "0.3.0-alpha.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "futures-preview"
version = "0.3.0-alpha.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures-channel-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-executor-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-io-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "futures-sink-preview"
version = "0.3.0-alpha.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "futures-util-preview"
version = "0.3.0-alpha.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures-channel-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-io-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "getrandom"
version = "0.1.12"
@ -443,6 +526,9 @@ dependencies = [
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"spin 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "libc"
@ -473,6 +559,14 @@ dependencies = [
"scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "lock_api"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"scopeguard 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "log"
version = "0.4.8"
@ -668,6 +762,16 @@ dependencies = [
"parking_lot_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "parking_lot"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"lock_api 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot_core 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "parking_lot_core"
version = "0.4.0"
@ -680,11 +784,30 @@ dependencies = [
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "parking_lot_core"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
"cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "pin-utils"
version = "0.1.0-alpha.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "pkg-config"
version = "0.3.16"
@ -1211,6 +1334,32 @@ dependencies = [
"tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio"
version = "0.2.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-codec 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-current-thread 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-fs 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-macros 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-reactor 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-sync 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-tcp 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-threadpool 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.3.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-udp 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-uds 0.3.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing-core 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-codec"
version = "0.1.1"
@ -1221,6 +1370,18 @@ dependencies = [
"tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-codec"
version = "0.2.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-current-thread"
version = "0.1.6"
@ -1230,6 +1391,15 @@ dependencies = [
"tokio-executor 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-current-thread"
version = "0.2.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-executor"
version = "0.1.8"
@ -1239,6 +1409,11 @@ dependencies = [
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-executor"
version = "0.2.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "tokio-fs"
version = "0.1.6"
@ -1249,6 +1424,17 @@ dependencies = [
"tokio-threadpool 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-fs"
version = "0.2.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-threadpool 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-io"
version = "0.1.12"
@ -1259,6 +1445,27 @@ dependencies = [
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-io"
version = "0.2.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-macros"
version = "0.2.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.44 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-openssl"
version = "0.3.0"
@ -1287,6 +1494,24 @@ dependencies = [
"tokio-sync 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-reactor"
version = "0.2.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-sync 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-rustls"
version = "0.10.0"
@ -1309,6 +1534,17 @@ dependencies = [
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-sync"
version = "0.2.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-tcp"
version = "0.1.3"
@ -1322,6 +1558,20 @@ dependencies = [
"tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-tcp"
version = "0.2.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-reactor 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-threadpool"
version = "0.1.15"
@ -1338,6 +1588,24 @@ dependencies = [
"tokio-executor 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-threadpool"
version = "0.2.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-sync 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-timer"
version = "0.2.11"
@ -1349,6 +1617,19 @@ dependencies = [
"tokio-executor 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-timer"
version = "0.3.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-sync 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-tls"
version = "0.2.1"
@ -1373,6 +1654,18 @@ dependencies = [
"tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-udp"
version = "0.2.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-reactor 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-uds"
version = "0.2.5"
@ -1390,6 +1683,24 @@ dependencies = [
"tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-uds"
version = "0.3.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)",
"mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-codec 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-reactor 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "toml"
version = "0.5.3"
@ -1398,6 +1709,15 @@ dependencies = [
"serde 1.0.100 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tracing-core"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"spin 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "trust-dns"
version = "0.17.0"
@ -1516,6 +1836,7 @@ dependencies = [
name = "trust-dns-proto"
version = "0.8.0"
dependencies = [
"async-trait 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"enum-as-inner 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1904,6 +2225,7 @@ dependencies = [
"checksum aho-corasick 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "58fb5e95d83b38284460a5fda7d6470aa0b8844d283a0b614b8535e880800d2d"
"checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
"checksum arrayvec 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "b8d73f9beda665eaa98ab9e4f7442bd4e7de6652587de55b2525e52e29c1b0ba"
"checksum async-trait 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "153d1add2273943aef00490c3a43cf078478c7f6be623a5a8254c775fd6f987d"
"checksum atty 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "1803c647a3ec87095e7ae7acfca019e98de5ec9a7d01343f611cf3152ed71a90"
"checksum autocfg 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "b671c8fb71b457dd4ae18c4ba1e59aa81793daacc361d82fcd410cef0d491875"
"checksum backtrace 0.3.37 (registry+https://github.com/rust-lang/crates.io-index)" = "5180c5a20655b14a819b652fd2378fa5f1697b6c9ddad3e695c2f9cedf6df4e2"
@ -1921,6 +2243,7 @@ dependencies = [
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
"checksum core-foundation 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "25b9e03f145fd4f2bf705e07b900cd41fc636598fe5dc452fd0db1441c3f496d"
"checksum core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e7ca8a5221364ef15ce201e8ed2f609fc312682a8f4e0e3d4aa5879764e0fa3b"
"checksum crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c8ec7fcd21571dc78f96cc96243cab8d8f035247c3efd16c687be154c3fa9efa"
"checksum crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b18cd2e169ad86297e6bc0ad9aa679aee9daa4f19e8163860faf7c164e4f5a71"
"checksum crossbeam-epoch 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "fedcd6772e37f3da2a9af9bf12ebe046c0dfe657992377b4df982a2b54cd37a9"
"checksum crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b"
@ -1960,6 +2283,7 @@ dependencies = [
"checksum libsqlite3-sys 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5e5b95e89c330291768dc840238db7f9e204fd208511ab6319b56193a7f2ae25"
"checksum linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ae91b68aebc4ddb91978b11a1b02ddd8602a05ec19002801c5666000e05e0f83"
"checksum lock_api 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "62ebf1391f6acad60e5c8b43706dde4582df75c06698ab44511d15016bc2442c"
"checksum lock_api 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f8912e782533a93a167888781b836336a6ca5da6175c05944c86cf28c31104dc"
"checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
"checksum lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c"
"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
@ -1982,7 +2306,9 @@ dependencies = [
"checksum openssl-sys 0.9.49 (registry+https://github.com/rust-lang/crates.io-index)" = "f4fad9e54bd23bd4cbbe48fdc08a1b8091707ac869ef8508edea2fec77dcc884"
"checksum owning_ref 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "49a4b8ea2179e6a2e27411d3bca09ca6dd630821cf6894c6c7c8467a8ee7ef13"
"checksum parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab41b4aed082705d1056416ae4468b6ea99d52599ecf3169b00088d43113e337"
"checksum parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252"
"checksum parking_lot_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94c8c7923936b28d546dfd14d4472eaf34c99b14e1c973a32b3e6d4eb04298c9"
"checksum parking_lot_core 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b876b1b9e7ac6e1a74a6da34d25c42e17e8862aa409cbbbdcfc8d86c6f3bc62b"
"checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
"checksum pkg-config 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)" = "72d5370d90f49f70bd033c3d75e87fc529fbfff9d6f7cccef07d6170079d91ea"
"checksum ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e3cbf9f658cdb5000fcf6f362b8ea2ba154b9f146a61c7a20d647034c6b6561b"
@ -2045,18 +2371,30 @@ dependencies = [
"checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b"
"checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f"
"checksum tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6"
"checksum tokio 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6320d5796f0e08444252f37de1e23dab529e6f7e662447a0e10e183d1cbda371"
"checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f"
"checksum tokio-codec 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "60505acba66c2b46c475b682355967e270db2c0a200392ca2079d214f514935e"
"checksum tokio-current-thread 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "d16217cad7f1b840c5a97dfb3c43b0c871fef423a6e8d2118c604e843662a443"
"checksum tokio-current-thread 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4ed30983efe64aa01758777622d70f45054802d959f02b7895b7245883699487"
"checksum tokio-executor 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0f27ee0e6db01c5f0b2973824547ce7e637b2ed79b891a9677b0de9bd532b6ac"
"checksum tokio-executor 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1e0937fcedb52baa1424b7483977ec1e387a75413c12abc232c3c092ed35f68d"
"checksum tokio-fs 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "3fe6dc22b08d6993916647d108a1a7d15b9cd29c4f4496c62b92c45b5041b7af"
"checksum tokio-fs 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1e96beb64c2b41f4a04b9a6f6834684b9582b3a75534279e6f93b6f91b163eac"
"checksum tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "5090db468dad16e1a7a54c8c67280c5e4b544f3d3e018f0b913b400261f85926"
"checksum tokio-io 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab1e0e025bd8193899c064a0b03d68bf7d599aaf6c4c2a62aa394bb0e476c1f7"
"checksum tokio-macros 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "04da794609e1785ae8541a596891641897ae4693bb451cd1c91cd8ce965b36e6"
"checksum tokio-openssl 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "771d6246b170ae108d67d9963c23f31a579016c016d73bd4bd7d6ef0252afda7"
"checksum tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "6af16bfac7e112bea8b0442542161bfc41cbfa4466b580bdda7d18cb88b911ce"
"checksum tokio-reactor 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "aea2939bfb43f47226624d08486855f022c015016f599bf541d85c28557efccc"
"checksum tokio-rustls 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b9f0ded5b0b8dbb284cf9464ed0f2912e3e8806553d92f95f5e6944c2b8e989d"
"checksum tokio-sync 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2162248ff317e2bc713b261f242b69dbb838b85248ed20bb21df56d60ea4cae7"
"checksum tokio-sync 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ce3e512e9a367074a2a73fc2b48641e672eb572880b4e9c1eb01e7d1539f9694"
"checksum tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1d14b10654be682ac43efee27401d792507e30fd8d26389e1da3b185de2e4119"
"checksum tokio-tcp 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e309cd85bf33849dd29cb4dbdcfaf087bc6ae69a58ba37a9c39d8457ee706a6"
"checksum tokio-threadpool 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)" = "90ca01319dea1e376a001e8dc192d42ebde6dd532532a5bad988ac37db365b19"
"checksum tokio-threadpool 0.2.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1f2c3bb6766294ff0348ace4542ff49a4e2e951ffb54a3f90dcf060826fee499"
"checksum tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "f2106812d500ed25a4f38235b9cae8f78a09edf43203e16e59c3b769a342a60e"
"checksum tokio-timer 0.3.0-alpha.1 (registry+https://github.com/rust-lang/crates.io-index)" = "45d6916b534f98c7a8a91bb4ecb6cc5f8d883d4025f67c856ef7fd21d78dc7b2"
"checksum tokio-tls 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "354b8cd83825b3c20217a9dc174d6a0c67441a2fae5c41bcb1ea6679f6ae0f7c"
"checksum tokio-udp 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f02298505547f73e60f568359ef0d016d5acd6e830ab9bc7c4a5b3403440121b"
"checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445"

View File

@ -55,10 +55,11 @@ name = "trust_dns_proto"
path = "src/lib.rs"
[dependencies]
async-trait = "0.1.11"
data-encoding = { version = "2.1.0", optional = true }
enum-as-inner = "0.3"
failure = "0.1"
futures = "^0.1.28"
futures-preview = "0.3.0-alpha"
idna = "^0.2.0"
lazy_static = "^1.0"
log = "^0.4.8"
@ -68,14 +69,15 @@ ring = { version = "0.16", optional = true, features = ["std"] }
serde = { version = "1.0", optional = true }
smallvec = "^0.6"
socket2 = { version = "^0.3.10" }
tokio-executor = "0.1.8"
tokio-io = "^0.1"
tokio-reactor = { version = "^0.1", optional = true}
tokio-tcp = { version = "^0.1", optional = true }
tokio-timer = "0.2.10"
tokio-udp = { version = "^0.1", optional = true }
tokio-executor = "0.2.0-alpha"
tokio-io = "0.2.0-alpha"
tokio-reactor = { version = "0.2.0-alpha", optional = true}
tokio-sync = "0.2.0-alpha"
tokio-tcp = { version = "0.2.0-alpha", optional = true }
tokio-timer = "0.3.0-alpha"
tokio-udp = { version = "0.2.0-alpha", optional = true }
url = "2.1.0"
[dev-dependencies]
env_logger = "^0.6"
tokio = "^0.1.22"
tokio = "0.2.0-alpha"

View File

@ -25,6 +25,7 @@ use ring::error::Unspecified;
use failure::{Backtrace, Context, Fail};
use tokio_executor::SpawnError;
use tokio_timer::Error as TimerError;
use tokio_timer::timeout::Elapsed;
/// An alias for results returned by functions of this crate
pub type ProtoResult<T> = ::std::result::Result<T, ProtoError>;
@ -34,7 +35,7 @@ pub type ProtoResult<T> = ::std::result::Result<T, ProtoError>;
pub enum ProtoErrorKind {
/// An error caused by a canceled future
#[fail(display = "future was canceled: {:?}", _0)]
Canceled(::futures::sync::oneshot::Canceled),
Canceled(futures::channel::oneshot::Canceled),
/// Character data length exceeded the limit
#[fail(display = "char data length exceeds {}: {}", _0, _1)]
@ -183,6 +184,10 @@ pub enum ProtoErrorKind {
#[fail(display = "request timed out")]
Timeout,
/// A request timeout elapsed
#[fail(display = "request time elapsed before data received")]
Elapsed,
/// An url parsing error
#[fail(display = "url parsing error")]
UrlParsing,
@ -294,27 +299,33 @@ impl From<TimerError> for ProtoError {
}
}
impl From<tokio_timer::timeout::Error<ProtoError>> for ProtoError {
fn from(e: tokio_timer::timeout::Error<ProtoError>) -> Self {
if e.is_elapsed() {
return ProtoError::from(ProtoErrorKind::Timeout);
}
if e.is_inner() {
return e.into_inner().expect("invalid state, not a ProtoError");
}
if e.is_timer() {
return ProtoError::from(
e.into_timer()
.expect("invalid state, not a tokio_timer::Error"),
);
}
ProtoError::from("unknown error with tokio_timer")
impl From<Elapsed> for ProtoError {
fn from(e: Elapsed) -> ProtoError {
e.context(ProtoErrorKind::Elapsed).into()
}
}
// impl From<tokio_timer::Error<ProtoError>> for ProtoError {
// fn from(e: tokio_timer::Error<ProtoError>) -> Self {
// if e.is_elapsed() {
// return ProtoError::from(ProtoErrorKind::Timeout);
// }
// if e.is_inner() {
// return e.into_inner().expect("invalid state, not a ProtoError");
// }
// if e.is_timer() {
// return ProtoError::from(
// e.into_timer()
// .expect("invalid state, not a tokio_timer::Error"),
// );
// }
// ProtoError::from("unknown error with tokio_timer")
// }
// }
impl From<::url::ParseError> for ProtoError {
fn from(e: ::url::ParseError) -> ProtoError {
e.context(ProtoErrorKind::UrlParsing).into()
@ -396,6 +407,7 @@ impl Clone for ProtoErrorKind {
DnsKeyProtocolNot3(protocol) => DnsKeyProtocolNot3(protocol),
DomainNameTooLong(len) => DomainNameTooLong(len),
EdnsNameNotRoot(ref found) => EdnsNameNotRoot(found.clone()),
Elapsed => Elapsed,
IncorrectRDataLengthRead { read, len } => IncorrectRDataLengthRead { read, len },
LabelBytesTooLong(len) => LabelBytesTooLong(len),
PointerNotPriorToLabel { idx, ptr } => PointerNotPriorToLabel { idx, ptr },

View File

@ -9,8 +9,12 @@
#![warn(missing_docs)]
#![recursion_limit = "2048"]
#![feature(async_await)]
#![feature(type_alias_impl_trait)]
//! Trust-DNS Protocol library
extern crate async_trait;
#[cfg(feature = "dnssec")]
extern crate data_encoding;
#[macro_use]
@ -18,7 +22,6 @@ extern crate enum_as_inner;
#[cfg(test)]
extern crate env_logger;
extern crate failure;
#[macro_use]
extern crate futures;
extern crate idna;
#[macro_use]
@ -37,8 +40,8 @@ extern crate socket2;
#[cfg(test)]
extern crate tokio;
extern crate tokio_executor;
#[macro_use]
extern crate tokio_io;
extern crate tokio_sync;
#[cfg(feature = "tokio-compat")]
extern crate tokio_tcp;
extern crate tokio_timer;
@ -46,6 +49,17 @@ extern crate tokio_timer;
extern crate tokio_udp;
extern crate url;
macro_rules! try_ready_stream {
($e:expr) => ({
match $e {
Poll::Ready(Some(Ok(t))) => t,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(From::from(e)))),
}
})
}
pub mod error;
#[cfg(feature = "mdns")]
pub mod multicast;

View File

@ -7,8 +7,11 @@
use std::fmt::{self, Display};
use std::net::{Ipv4Addr, SocketAddr};
use std::task::Context;
use std::pin::Pin;
use futures::{Async, Future, Poll, Stream};
use futures::{Future, FutureExt, Poll, Stream, TryFutureExt};
use futures::stream::{StreamExt, TryStreamExt};
use crate::error::ProtoError;
use crate::xfer::{DnsClientStream, SerialMessage};
@ -60,11 +63,11 @@ impl MdnsClientStream {
let (stream_future, sender) =
MdnsStream::new(mdns_addr, mdns_query_type, packet_ttl, ipv4_if, ipv6_if);
let new_future = Box::new(
stream_future
.map(move |mdns_stream| MdnsClientStream { mdns_stream })
.map_err(ProtoError::from),
);
let stream_future = stream_future
.map_ok(move |mdns_stream| MdnsClientStream { mdns_stream })
.map_err(ProtoError::from);
let new_future = Box::new(stream_future);
let new_future = MdnsClientConnect(new_future);
let sender = Box::new(BufDnsStreamHandle::new(mdns_addr, sender));
@ -86,29 +89,29 @@ impl DnsClientStream for MdnsClientStream {
}
impl Stream for MdnsClientStream {
type Item = SerialMessage;
type Error = ProtoError;
type Item = Result<SerialMessage, ProtoError>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.mdns_stream.poll().map_err(ProtoError::from)) {
Some(serial_message) => {
// TODO: for mDNS queries could come from anywhere. It's not clear that there is anything
// we can validate in this case.
Ok(Async::Ready(Some(serial_message)))
}
None => Ok(Async::Ready(None)),
}
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mdns_stream = &mut self.as_mut().mdns_stream;
mdns_stream.map_err(ProtoError::from).poll_next_unpin(cx)
// match ready!(self.mdns_stream.poll_next_unpin(cx).map_err(ProtoError::from)) {
// Some(serial_message) => {
// // TODO: for mDNS queries could come from anywhere. It's not clear that there is anything
// // we can validate in this case.
// Poll::Ready(Some(Ok(serial_message)))
// }
// None => Poll::Ready(None),
// }
}
}
/// A future that resolves to an MdnsClientStream
pub struct MdnsClientConnect(Box<dyn Future<Item = MdnsClientStream, Error = ProtoError> + Send>);
pub struct MdnsClientConnect(Box<dyn Future<Output = Result<MdnsClientStream, ProtoError>> + Send + Unpin>);
impl Future for MdnsClientConnect {
type Item = MdnsClientStream;
type Error = ProtoError;
type Output = Result<MdnsClientStream, ProtoError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.0.as_mut().poll_unpin(cx)
}
}

View File

@ -8,12 +8,16 @@
use std;
use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use futures::ready;
use futures::future;
use futures::stream::Stream;
use futures::sync::mpsc::unbounded;
use futures::task;
use futures::{Async, Future, Poll};
use futures::stream::{Stream, StreamExt};
use futures::channel::mpsc::unbounded;
use futures::{Future, FutureExt, Poll, TryFutureExt};
use futures::lock::Mutex;
use rand;
use rand::distributions::{uniform::Uniform, Distribution};
use socket2::{self, Socket};
@ -40,8 +44,11 @@ pub struct MdnsStream {
multicast_addr: SocketAddr,
/// This is used for sending and (directly) receiving messages
datagram: Option<UdpStream<UdpSocket>>,
// FIXME: like UdpStream, this Arc is unnecessary, only needed for temp async/await capture below
/// In one-shot multicast, this will not join the multicast group
multicast: Option<UdpSocket>,
multicast: Option<Arc<Mutex<UdpSocket>>>,
/// Receiving portion of the MdnsStream
rcving_mcast: Option<Pin<Box<dyn Future<Output = io::Result<SerialMessage>> + Send>>>,
}
impl MdnsStream {
@ -51,7 +58,7 @@ impl MdnsStream {
packet_ttl: Option<u32>,
ipv4_if: Option<Ipv4Addr>,
) -> (
Box<dyn Future<Item = MdnsStream, Error = io::Error> + Send>,
Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>,
BufStreamHandle,
) {
Self::new(*MDNS_IPV4, mdns_query_type, packet_ttl, ipv4_if, None)
@ -63,7 +70,7 @@ impl MdnsStream {
packet_ttl: Option<u32>,
ipv6_if: Option<u32>,
) -> (
Box<dyn Future<Item = MdnsStream, Error = io::Error> + Send>,
Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>,
BufStreamHandle,
) {
Self::new(*MDNS_IPV6, mdns_query_type, packet_ttl, None, ipv6_if)
@ -102,7 +109,7 @@ impl MdnsStream {
ipv4_if: Option<Ipv4Addr>,
ipv6_if: Option<u32>,
) -> (
Box<dyn Future<Item = MdnsStream, Error = io::Error> + Send>,
Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>,
BufStreamHandle,
) {
let (message_sender, outbound_messages) = unbounded();
@ -137,26 +144,29 @@ impl MdnsStream {
Box::new(
next_socket
.map(move |socket: Option<_>| {
socket.map(|socket| {
UdpSocket::from_std(socket, &handle).expect("bad handle?")
})
.map(move |socket| {
match socket {
Ok(Some(socket)) => Ok(Some(UdpSocket::from_std(socket, &handle)?)),
Ok(None) => Ok(None),
Err(err) => Err(err),
}
})
.map(move |socket: Option<_>| {
let datagram =
.map_ok(move |socket: Option<_>| {
let datagram: Option<_> =
socket.map(|socket| UdpStream::from_parts(socket, outbound_messages));
let multicast: Option<UdpSocket> =
let multicast: Option<_> =
multicast_socket.map(|multicast_socket| {
UdpSocket::from_std(multicast_socket, &handle_clone)
.expect("bad handle?")
Arc::new(Mutex::new(UdpSocket::from_std(multicast_socket, &handle_clone)
.expect("bad handle?")))
});
MdnsStream {
multicast_addr,
datagram,
multicast,
rcving_mcast: None,
}
}),
})
)
};
@ -261,34 +271,54 @@ impl MdnsStream {
}
impl Stream for MdnsStream {
type Item = SerialMessage;
type Error = io::Error;
type Item = io::Result<SerialMessage>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
assert!(self.datagram.is_some() || self.multicast.is_some());
// we poll the datagram socket first, if available, since it's a direct response or direct request
if let Some(ref mut datagram) = self.datagram {
match datagram.poll() {
Ok(Async::Ready(data)) => return Ok(Async::Ready(data)),
Err(err) => return Err(err),
Ok(Async::NotReady) => (), // drop through
if let Some(ref mut datagram) = self.as_mut().datagram {
match datagram.poll_next_unpin(cx) {
Poll::Ready(ready) => return Poll::Ready(ready),
Poll::Pending => (), // drop through
}
}
if let Some(ref mut multicast) = self.multicast {
let mut buf = [0u8; 2048];
// TODO: should we drop this packet if it's not from the same src as dest?
let (len, src) = try_ready!(multicast.poll_recv_from(&mut buf));
// now return the multicast
return Ok(Async::Ready(Some(SerialMessage::new(
buf.iter().take(len).cloned().collect(),
src,
))));
loop {
let msg = if let Some(ref mut receiving) = self.rcving_mcast {
// TODO: should we drop this packet if it's not from the same src as dest?
let msg = ready!(receiving.as_mut().poll_unpin(cx))?;
Some(Poll::Ready(Some(Ok(msg))))
} else {
None
};
self.rcving_mcast = None;
if let Some(msg) = msg {
return msg;
}
// let socket = Arc::clone(socket);
if let Some(ref socket) = self.multicast {
let socket = Arc::clone(socket);
let receive_future = async {
let socket = socket;
let mut buf = [0u8; 2048];
let mut socket = socket.lock().await;
let (len, src) = socket.recv_from(&mut buf).await?;
Ok(SerialMessage::new(
buf.iter().take(len).cloned().collect(),
src,
))
};
self.rcving_mcast = Some(Box::pin(receive_future.boxed()));
}
}
Ok(Async::NotReady)
}
}
@ -339,24 +369,24 @@ impl NextRandomUdpSocket {
}
impl Future for NextRandomUdpSocket {
type Item = Option<std::net::UdpSocket>;
type Error = io::Error;
// TODO: clean this up, the RandomUdpSocket shouldnt' care about the query type
type Output = io::Result<Option<std::net::UdpSocket>>;
/// polls until there is an available next random UDP port.
///
/// if there is no port available after 10 attempts, returns NotReady
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// non-one-shot, i.e. continuous, always use one of the well-known mdns ports and bind to the multicast addr
if !self.mdns_query_type.sender() {
debug!("skipping sending stream");
Ok(Async::Ready(None))
Poll::Ready(Ok(None))
} else if self.mdns_query_type.bind_on_5353() {
let addr = SocketAddr::new(self.bind_address, MDNS_PORT);
debug!("binding sending stream to {}", addr);
let socket = std::net::UdpSocket::bind(&addr)?;
let socket = self.prepare_sender(socket)?;
Ok(Async::Ready(Some(socket)))
Poll::Ready(Ok(Some(socket)))
} else {
// TODO: this is basically identical to UdpStream from here... share some code? (except for the port restriction)
// one-shot queries look very similar to UDP socket, but can't listen on 5353
@ -380,7 +410,7 @@ impl Future for NextRandomUdpSocket {
match std::net::UdpSocket::bind(&addr) {
Ok(socket) => {
let socket = self.prepare_sender(socket)?;
return Ok(Async::Ready(Some(socket)));
return Poll::Ready(Ok(Some(socket)));
}
Err(err) => debug!("unable to bind port, attempt: {}: {}", attempt, err),
}
@ -388,9 +418,9 @@ impl Future for NextRandomUdpSocket {
debug!("could not get next random port, delaying");
task::current().notify();
// returning NotReady here, perhaps the next poll there will be some more socket available.
Ok(Async::NotReady)
// TODO: this replaced a task::current().notify, is it correct?
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
@ -398,7 +428,7 @@ impl Future for NextRandomUdpSocket {
#[cfg(test)]
pub mod tests {
use super::*;
use futures::future::{Either, Future};
use futures::future::Either;
use tokio::runtime::current_thread::Runtime;
// TODO: is there a better way?
@ -460,10 +490,10 @@ pub mod tests {
.name("test_one_shot_mdns:server".to_string())
.spawn(move || {
let mut server_loop = Runtime::new().unwrap();
let mut timeout: Box<dyn Future<Item = (), Error = tokio_timer::Error> + Send> =
Box::new(future::lazy(|| {
let mut timeout =
future::lazy(|_| {
Delay::new(Instant::now() + Duration::from_millis(100))
}));
}).flatten().boxed();
// TTLs are 0 so that multicast test packets never leave the test host...
// FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases...
@ -487,17 +517,15 @@ pub mod tests {
}
// wait for some bytes...
match server_loop
.block_on(future::lazy(|| server_stream.select2(timeout)))
.ok()
.expect("server stream closed")
.block_on(future::lazy(|_| future::select(server_stream, timeout)).flatten())
{
Either::A((buffer_and_addr_stream_tmp, timeout_tmp)) => {
let (buffer_and_addr, stream_tmp) = buffer_and_addr_stream_tmp;
Either::Left((buffer_and_addr_stream_tmp, timeout_tmp)) => {
let (buffer_and_addr, stream_tmp): (Option<Result<SerialMessage, io::Error>>, MdnsStream) = buffer_and_addr_stream_tmp;
server_stream = stream_tmp.into_future();
timeout = timeout_tmp;
let (buffer, addr) =
buffer_and_addr.expect("no buffer received").unwrap();
buffer_and_addr.expect("no msg received").expect("error receiving msg").unwrap();
assert_eq!(&buffer, test_bytes);
//println!("server got data! {}", addr);
@ -507,18 +535,17 @@ pub mod tests {
.unbounded_send(SerialMessage::new(test_bytes.to_vec(), addr))
.expect("could not send to client");
}
Either::B(((), buffer_and_addr_stream_tmp)) => {
Either::Right(((), buffer_and_addr_stream_tmp)) => {
server_stream = buffer_and_addr_stream_tmp;
timeout = Box::new(future::lazy(|| {
timeout = future::lazy(|_| {
Delay::new(Instant::now() + Duration::from_millis(100))
}));
}).flatten().boxed();
}
}
// let the server turn for a bit... send the message
server_loop
.block_on(Delay::new(Instant::now() + Duration::from_millis(100)))
.unwrap();
.block_on(Delay::new(Instant::now() + Duration::from_millis(100)));
}
})
.unwrap();
@ -530,10 +557,10 @@ pub mod tests {
let (stream, sender) =
MdnsStream::new(mdns_addr, MdnsQueryType::OneShot, Some(1), None, Some(5));
let mut stream = io_loop.block_on(stream).ok().unwrap().into_future();
let mut timeout: Box<dyn Future<Item = (), Error = tokio_timer::Error> + Send> =
Box::new(future::lazy(|| {
let mut timeout =
future::lazy(|_| {
Delay::new(Instant::now() + Duration::from_millis(100))
}));
}).flatten().boxed();
let mut successes = 0;
for _ in 0..send_recv_times {
@ -544,35 +571,24 @@ pub mod tests {
println!("client sending data!");
let run_result = match io_loop.block_on(future::lazy(|| stream.select2(timeout))) {
Ok(run_result) => run_result,
Err(err) => match err {
Either::A(((stream_err, _stream), _timeout)) => {
panic!("client stream errored: {}", stream_err)
}
Either::B((timeout_err, _stream)) => {
panic!("client timeout errored: {}", timeout_err)
}
},
};
match run_result {
Either::A((buffer_and_addr_stream_tmp, timeout_tmp)) => {
// TODO: this lazy isn't needed is it?
match io_loop.block_on(future::lazy(|_| future::select(stream, timeout)).flatten()) {
Either::Left((buffer_and_addr_stream_tmp, timeout_tmp)) => {
let (buffer_and_addr, stream_tmp) = buffer_and_addr_stream_tmp;
stream = stream_tmp.into_future();
timeout = timeout_tmp;
let (buffer, _addr) = buffer_and_addr.expect("no buffer received").unwrap();
let (buffer, _addr) = buffer_and_addr.expect("no msg received").expect("error receiving msg").unwrap();
println!("client got data!");
assert_eq!(&buffer, test_bytes);
successes += 1;
}
Either::B(((), buffer_and_addr_stream_tmp)) => {
Either::Right(((), buffer_and_addr_stream_tmp)) => {
stream = buffer_and_addr_stream_tmp;
timeout = Box::new(future::lazy(|| {
timeout = future::lazy(|_| {
Delay::new(Instant::now() + Duration::from_millis(100))
}));
}).flatten().boxed();
}
}
}
@ -615,10 +631,10 @@ pub mod tests {
.name("test_one_shot_mdns:server".to_string())
.spawn(move || {
let mut io_loop = Runtime::new().unwrap();
let mut timeout: Box<dyn Future<Item = (), Error = tokio_timer::Error> + Send> =
Box::new(future::lazy(|| {
let mut timeout =
future::lazy(|_| {
Delay::new(Instant::now() + Duration::from_millis(100))
}));
}).flatten().boxed();
// TTLs are 0 so that multicast test packets never leave the test host...
// FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases...
@ -634,11 +650,9 @@ pub mod tests {
for _ in 0..=send_recv_times {
// wait for some bytes...
match io_loop
.block_on(future::lazy(|| server_stream.select2(timeout)))
.ok()
.expect("server stream closed")
.block_on(future::lazy(|_| future::select(server_stream, timeout)).flatten())
{
Either::A((_buffer_and_addr_stream_tmp, _timeout_tmp)) => {
Either::Left((_buffer_and_addr_stream_tmp, _timeout_tmp)) => {
// let (buffer_and_addr, stream_tmp) = buffer_and_addr_stream_tmp;
// server_stream = stream_tmp.into_future();
@ -652,18 +666,17 @@ pub mod tests {
.store(true, std::sync::atomic::Ordering::Relaxed);
return;
}
Either::B(((), buffer_and_addr_stream_tmp)) => {
Either::Right(((), buffer_and_addr_stream_tmp)) => {
server_stream = buffer_and_addr_stream_tmp;
timeout = Box::new(future::lazy(|| {
timeout = future::lazy(|_| {
Delay::new(Instant::now() + Duration::from_millis(100))
}));
}).flatten().boxed();
}
}
// let the server turn for a bit... send the message
io_loop
.block_on(Delay::new(Instant::now() + Duration::from_millis(100)))
.unwrap();
.block_on(Delay::new(Instant::now() + Duration::from_millis(100)));
}
})
.unwrap();
@ -674,10 +687,10 @@ pub mod tests {
let (stream, sender) =
MdnsStream::new(mdns_addr, MdnsQueryType::OneShot, Some(1), None, Some(5));
let mut stream = io_loop.block_on(stream).ok().unwrap().into_future();
let mut timeout: Box<dyn Future<Item = (), Error = tokio_timer::Error> + Send> =
Box::new(future::lazy(|| {
let mut timeout =
future::lazy(|_| {
Delay::new(Instant::now() + Duration::from_millis(100))
}));
}).flatten().boxed();
for _ in 0..send_recv_times {
// test once
@ -687,33 +700,24 @@ pub mod tests {
println!("client sending data!");
let run_result = match io_loop.block_on(future::lazy(|| stream.select2(timeout))) {
Ok(run_result) => run_result,
Err(err) => match err {
Either::A(((stream_err, _stream), _timeout)) => {
panic!("client stream errored: {}", stream_err)
}
Either::B((timeout_err, _stream)) => {
panic!("client timeout errored: {}", timeout_err)
}
},
};
// TODO: this lazy is probably unnecessary?
let run_result = io_loop.block_on(future::lazy(|_| future::select(stream, timeout)).flatten());
if server_got_packet.load(std::sync::atomic::Ordering::Relaxed) {
return;
}
match run_result {
Either::A((buffer_and_addr_stream_tmp, timeout_tmp)) => {
Either::Left((buffer_and_addr_stream_tmp, timeout_tmp)) => {
let (_buffer_and_addr, stream_tmp) = buffer_and_addr_stream_tmp;
stream = stream_tmp.into_future();
timeout = timeout_tmp;
}
Either::B(((), buffer_and_addr_stream_tmp)) => {
Either::Right(((), buffer_and_addr_stream_tmp)) => {
stream = buffer_and_addr_stream_tmp;
timeout = Box::new(future::lazy(|| {
timeout = future::lazy(|_| {
Delay::new(Instant::now() + Duration::from_millis(100))
}));
}).flatten().boxed();
}
}
}

View File

@ -6,10 +6,13 @@
// copied, modified, or distributed except according to those terms.
use std::fmt::{self, Display};
use std::io;
use std::net::SocketAddr;
use std::time::Duration;
use std::pin::Pin;
use std::task::Context;
use futures::{Async, Future, Poll, Stream};
use futures::{Future, Poll, Stream, TryFutureExt, StreamExt};
use tokio_io::{AsyncRead, AsyncWrite};
use crate::error::ProtoError;
@ -52,9 +55,9 @@ impl<S: Connect + 'static + Send> TcpClientStream<S> {
) -> (TcpClientConnect<S::Transport>, Box<dyn DnsStreamHandle + Send>) {
let (stream_future, sender) = TcpStream::<S>::with_timeout(name_server, timeout);
let new_future = Box::new(
let new_future = Box::pin(
stream_future
.map(move |tcp_stream| TcpClientStream { tcp_stream })
.map_ok(move |tcp_stream| TcpClientStream { tcp_stream })
.map_err(ProtoError::from),
);
@ -64,7 +67,7 @@ impl<S: Connect + 'static + Send> TcpClientStream<S> {
}
}
impl<S> TcpClientStream<S> {
impl<S: AsyncRead + AsyncWrite + Send> TcpClientStream<S> {
/// Wraps the TcpStream in TcpClientStream
pub fn from_stream(tcp_stream: TcpStream<S>) -> Self {
TcpClientStream { tcp_stream }
@ -77,44 +80,39 @@ impl<S: AsyncRead + AsyncWrite + Send> Display for TcpClientStream<S> {
}
}
impl<S: AsyncRead + AsyncWrite + Send> DnsClientStream for TcpClientStream<S> {
impl<S: AsyncRead + AsyncWrite + Send + Unpin> DnsClientStream for TcpClientStream<S> {
fn name_server_addr(&self) -> SocketAddr {
self.tcp_stream.peer_addr()
}
}
impl<S: AsyncRead + AsyncWrite + Send> Stream for TcpClientStream<S> {
type Item = SerialMessage;
type Error = ProtoError;
impl<S: AsyncRead + AsyncWrite + Send + Unpin> Stream for TcpClientStream<S> {
type Item = Result<SerialMessage, ProtoError>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.tcp_stream.poll().map_err(ProtoError::from)) {
Some(message) => {
// this is busted if the tcp connection doesn't have a peer
let peer = self.tcp_stream.peer_addr();
if message.addr() != peer {
// FIXME: this should be an error...
warn!("{} does not match name_server: {}", message.addr(), peer)
}
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let message = try_ready_stream!(self.tcp_stream.poll_next_unpin(cx));
Ok(Async::Ready(Some(message)))
}
None => Ok(Async::Ready(None)),
// this is busted if the tcp connection doesn't have a peer
let peer = self.tcp_stream.peer_addr();
if message.addr() != peer {
// FIXME: this should be an error...
warn!("{} does not match name_server: {}", message.addr(), peer)
}
Poll::Ready(Some(Ok(message)))
}
}
// TODO: create unboxed future for the TCP Stream
/// A future that resolves to an TcpClientStream
pub struct TcpClientConnect<S>(Box<dyn Future<Item = TcpClientStream<S>, Error = ProtoError> + Send>);
pub struct TcpClientConnect<S>(Pin<Box<dyn Future<Output = Result<TcpClientStream<S>, ProtoError>> + Send + Unpin + 'static>>);
impl<S> Future for TcpClientConnect<S> {
type Item = TcpClientStream<S>;
type Error = ProtoError;
type Output = Result<TcpClientStream<S>, ProtoError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
}
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.0.as_mut().poll(cx)
}
}
#[cfg(feature = "tokio-compat")]
@ -123,7 +121,9 @@ use tokio_tcp::TcpStream as TokioTcpStream;
#[cfg(feature = "tokio-compat")]
impl Connect for TokioTcpStream {
type Transport = TokioTcpStream;
type Future = tokio_tcp::ConnectFuture;
// TODO: changet Tokio's TcpStream to return a concrete type
type Future = impl Future<Output = io::Result<TokioTcpStream>>;
fn connect(addr: &SocketAddr) -> Self::Future {
TokioTcpStream::connect(addr)
}
@ -241,11 +241,9 @@ fn tcp_client_stream_test(server_addr: IpAddr) {
.send(SerialMessage::new(TEST_BYTES.to_vec(), server_addr))
.expect("send failed");
let (buffer, stream_tmp) = io_loop
.block_on(stream.into_future())
.ok()
.expect("future iteration run failed");
.block_on(stream.into_future());
stream = stream_tmp;
let buffer = buffer.expect("no buffer received");
let buffer = buffer.expect("no buffer received").expect("error recieving buffer");
assert_eq!(buffer.bytes(), TEST_BYTES);
}

View File

@ -11,10 +11,12 @@ use std::io;
use std::mem;
use std::net::SocketAddr;
use std::time::Duration;
use std::pin::Pin;
use std::task::Context;
use futures::stream::{Fuse, Peekable, Stream};
use futures::sync::mpsc::{unbounded, UnboundedReceiver};
use futures::{Async, Future, Poll};
use futures::stream::{Fuse, Peekable, Stream, StreamExt};
use futures::{ready, Future, FutureExt, Poll, TryFutureExt};
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use tokio_timer::Timeout;
use crate::error::*;
@ -26,9 +28,9 @@ where
Self: Sized,
{
/// TcpSteam
type Transport: io::Read + io::Write + Send;
type Transport: tokio_io::AsyncRead + tokio_io::AsyncWrite + Send;
/// Future returned by connect
type Future: Future<Item = Self::Transport, Error = io::Error> + Send;
type Future: Future<Output = Result<Self::Transport, io::Error>> + Send + Unpin;
/// connect to tcp
fn connect(addr: &SocketAddr) -> Self::Future;
}
@ -88,6 +90,10 @@ impl<S> TcpStream<S> {
pub fn peer_addr(&self) -> SocketAddr {
self.peer_addr
}
fn pollable_split(&mut self) -> (&mut S, &mut Peekable<Fuse<UnboundedReceiver<SerialMessage>>>, &mut Option<WriteTcpState>, &mut ReadTcpState) {
(&mut self.socket, &mut self.outbound_messages, &mut self.send_state, &mut self.read_state)
}
}
impl<S: Connect + 'static> TcpStream<S> {
@ -102,7 +108,8 @@ impl<S: Connect + 'static> TcpStream<S> {
pub fn new<E>(
name_server: SocketAddr,
) -> (
Box<dyn Future<Item = TcpStream<S::Transport>, Error = io::Error> + Send>,
//Box<dyn Future<Output = Result<TcpStream<S::Transport>, io::Error>> + Send>,
impl Future<Output = Result<TcpStream<S::Transport>, io::Error>> + Send,
BufStreamHandle,
)
where
@ -122,7 +129,8 @@ impl<S: Connect + 'static> TcpStream<S> {
name_server: SocketAddr,
timeout: Duration,
) -> (
Box<dyn Future<Item = TcpStream<S::Transport>, Error = io::Error> + Send>,
//Box<dyn Future<Output = Result<TcpStream<S::Transport>, io::Error>> + Send>,
impl Future<Output = Result<TcpStream<S::Transport>, io::Error>> + Send,
BufStreamHandle,
) {
let (message_sender, outbound_messages) = unbounded();
@ -130,17 +138,17 @@ impl<S: Connect + 'static> TcpStream<S> {
// This set of futures collapses the next tcp socket into a stream which can be used for
// sending and receiving tcp packets.
let tcp = S::connect(&name_server);
let stream = Timeout::new(tcp, timeout)
.map_err(move |e| {
let stream_fut = Timeout::new(tcp, timeout)
.map_err(move |_| {
debug!("timed out connecting to: {}", name_server);
e.into_inner().unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::TimedOut,
format!("timed out connecting to: {}", name_server),
)
})
io::Error::new(
io::ErrorKind::TimedOut,
format!("timed out connecting to: {}", name_server),
)
})
.map(move |tcp_stream| {
.map(move |tcp_stream: Result<Result<S::Transport, io::Error>, _>| {
// FIXME: this unwrap is wrong, how do we flatten Result in this context?
tcp_stream.and_then(|tcp_stream| tcp_stream).map(|tcp_stream| {
debug!("TCP connection established to: {}", name_server);
TcpStream {
socket: tcp_stream,
@ -152,13 +160,14 @@ impl<S: Connect + 'static> TcpStream<S> {
},
peer_addr: name_server,
}
})
});
(Box::new(stream), message_sender)
(stream_fut, message_sender)
}
}
impl<S> TcpStream<S> {
impl<S: tokio_io::AsyncRead + tokio_io::AsyncWrite> TcpStream<S> {
/// Initializes a TcpStream.
///
/// This is intended for use with a TcpListener and Incoming.
@ -195,55 +204,58 @@ impl<S> TcpStream<S> {
}
}
impl<S: io::Read + io::Write> Stream for TcpStream<S> {
type Item = SerialMessage;
type Error = io::Error;
impl<S: tokio_io::AsyncRead + tokio_io::AsyncWrite + Unpin> Stream for TcpStream<S> {
type Item = io::Result<SerialMessage>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let peer = self.peer_addr;
let (socket, outbound_messages, send_state, read_state) = self.pollable_split();
let mut socket = Pin::new(socket);
let mut outbound_messages = Pin::new(outbound_messages);
#[allow(clippy::cognitive_complexity)]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// this will not accept incoming data while there is data to send
// makes this self throttling.
// TODO: it might be interesting to try and split the sending and receiving futures.
loop {
// in the case we are sending, send it all?
if self.send_state.is_some() {
if send_state.is_some() {
// sending...
match self.send_state {
match send_state {
Some(WriteTcpState::LenBytes {
ref mut pos,
ref length,
..
}) => {
let wrote = try_nb!(self.socket.write(&length[*pos..]));
let wrote = ready!(socket.as_mut().poll_write(cx, &length[*pos..]))?;
*pos += wrote;
}
Some(WriteTcpState::Bytes {
ref mut pos,
ref bytes,
}) => {
let wrote = try_nb!(self.socket.write(&bytes[*pos..]));
let wrote = ready!(socket.as_mut().poll_write(cx, &bytes[*pos..]))?;
*pos += wrote;
}
Some(WriteTcpState::Flushing) => {
try_nb!(self.socket.flush());
ready!(socket.as_mut().poll_flush(cx))?;
}
_ => (),
}
// get current state
let current_state = mem::replace(&mut self.send_state, None);
let current_state = mem::replace(send_state, None);
// switch states
match current_state {
Some(WriteTcpState::LenBytes { pos, length, bytes }) => {
if pos < length.len() {
mem::replace(
&mut self.send_state,
send_state,
Some(WriteTcpState::LenBytes { pos, length, bytes }),
);
} else {
mem::replace(
&mut self.send_state,
send_state,
Some(WriteTcpState::Bytes { pos: 0, bytes }),
);
}
@ -251,41 +263,38 @@ impl<S: io::Read + io::Write> Stream for TcpStream<S> {
Some(WriteTcpState::Bytes { pos, bytes }) => {
if pos < bytes.len() {
mem::replace(
&mut self.send_state,
send_state,
Some(WriteTcpState::Bytes { pos, bytes }),
);
} else {
// At this point we successfully delivered the entire message.
// flush
mem::replace(&mut self.send_state, Some(WriteTcpState::Flushing));
mem::replace(send_state, Some(WriteTcpState::Flushing));
}
}
Some(WriteTcpState::Flushing) => {
// At this point we successfully delivered the entire message.
mem::replace(&mut self.send_state, None);
mem::replace(send_state, None);
}
None => (),
};
} else {
// then see if there is more to send
match self
.outbound_messages
.poll()
.map_err(|()| io::Error::new(io::ErrorKind::Other, "unknown"))?
match outbound_messages.as_mut().poll_next(cx)
// .map_err(|()| io::Error::new(io::ErrorKind::Other, "unknown"))?
{
// already handled above, here to make sure the poll() pops the next message
Async::Ready(Some(message)) => {
Poll::Ready(Some(message)) => {
// if there is no peer, this connection should die...
let (buffer, dst) = message.unwrap();
let peer = self.peer_addr;
// This is an error if the destination is not our peer (this is TCP after all)
// This will kill the connection...
if peer != dst {
return Err(io::Error::new(
return Poll::Ready(Some(Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("mismatched peer: {} and dst: {}", peer, dst),
));
))));
}
// will return if the socket will block
@ -296,7 +305,7 @@ impl<S: io::Read + io::Write> Stream for TcpStream<S> {
];
debug!("sending message len: {} to: {}", buffer.len(), dst);
self.send_state = Some(WriteTcpState::LenBytes {
*send_state = Some(WriteTcpState::LenBytes {
pos: 0,
length: len,
bytes: buffer,
@ -304,8 +313,8 @@ impl<S: io::Read + io::Write> Stream for TcpStream<S> {
}
// now we get to drop through to the receives...
// TODO: should we also return None if there are no more messages to send?
Async::NotReady => break,
Async::Ready(None) => {
Poll::Pending => break,
Poll::Ready(None) => {
debug!("no messages to send");
break;
}
@ -320,13 +329,13 @@ impl<S: io::Read + io::Write> Stream for TcpStream<S> {
while ret_buf.is_none() {
// Evaluates the next state. If None is the result, then no state change occurs,
// if Some(_) is returned, then that will be used as the next state.
let new_state: Option<ReadTcpState> = match self.read_state {
let new_state: Option<ReadTcpState> = match read_state {
ReadTcpState::LenBytes {
ref mut pos,
ref mut bytes,
} => {
// debug!("reading length {}", bytes.len());
let read = try_nb!(self.socket.read(&mut bytes[*pos..]));
let read = ready!(socket.as_mut().poll_read(cx, &mut bytes[*pos..]))?;
if read == 0 {
// the Stream was closed!
debug!("zero bytes read, stream closed?");
@ -334,12 +343,12 @@ impl<S: io::Read + io::Write> Stream for TcpStream<S> {
if *pos == 0 {
// Since this is the start of the next message, we have a clean end
return Ok(Async::Ready(None));
return Poll::Ready(None);
} else {
return Err(io::Error::new(
return Poll::Ready(Some(Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"closed while reading length",
));
))));
}
}
debug!("in ReadTcpState::LenBytes: {}", pos);
@ -363,17 +372,17 @@ impl<S: io::Read + io::Write> Stream for TcpStream<S> {
ref mut pos,
ref mut bytes,
} => {
let read = try_nb!(self.socket.read(&mut bytes[*pos..]));
let read = ready!(socket.as_mut().poll_read(cx, &mut bytes[*pos..]))?;
if read == 0 {
// the Stream was closed!
debug!("zero bytes read for message, stream closed?");
// Since this is the start of the next message, we have a clean end
// try!(self.socket.shutdown(Shutdown::Both)); // TODO: add generic shutdown function
return Err(io::Error::new(
return Poll::Ready(Some(Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"closed while reading message",
));
))));
}
debug!("in ReadTcpState::Bytes: {}", bytes.len());
@ -396,7 +405,7 @@ impl<S: io::Read + io::Write> Stream for TcpStream<S> {
// if it was a completed receipt of bytes, then it will move out the bytes
if let Some(state) = new_state {
if let ReadTcpState::Bytes { pos, bytes } =
mem::replace(&mut self.read_state, state)
mem::replace(read_state, state)
{
debug!("returning bytes");
assert_eq!(pos, bytes.len());
@ -405,16 +414,16 @@ impl<S: io::Read + io::Write> Stream for TcpStream<S> {
}
}
// if the buffer is ready, return it, if not we're NotReady
// if the buffer is ready, return it, if not we're Pending
if let Some(buffer) = ret_buf {
debug!("returning buffer");
let src_addr = self.peer_addr;
Ok(Async::Ready(Some(SerialMessage::new(buffer, src_addr))))
return Poll::Ready(Some(Ok(SerialMessage::new(buffer, src_addr))));
} else {
debug!("bottomed out");
// at a minimum the outbound_messages should have been polled,
// which will wake this future up later...
Ok(Async::NotReady)
return Poll::Pending;
}
}
}
@ -531,11 +540,9 @@ fn tcp_client_stream_test(server_addr: IpAddr) {
.unbounded_send(SerialMessage::new(TEST_BYTES.to_vec(), server_addr))
.expect("send failed");
let (buffer, stream_tmp) = io_loop
.block_on(stream.into_future())
.ok()
.expect("future iteration run failed");
.block_on(stream.into_future());
stream = stream_tmp;
let message = buffer.expect("no buffer received");
let message = buffer.expect("no buffer received").expect("error receiving buffer");
assert_eq!(message.bytes(), TEST_BYTES);
}

View File

@ -11,13 +11,15 @@ use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::pin::Pin;
use std::task::Context;
use futures::{Async, Future, Poll, Stream};
use tokio_timer::Timeout;
use futures::{Future, Poll, Stream};
use tokio_timer::timeout::{Elapsed, Timeout};
use crate::error::ProtoError;
use crate::op::message::NoopMessageFinalizer;
use crate::op::{Message, MessageFinalizer, OpCode};
use crate::op::{MessageFinalizer, OpCode};
use crate::udp::udp_stream::{NextRandomUdpSocket, UdpSocket};
use crate::xfer::{DnsRequest, DnsRequestSender, DnsResponse, SerialMessage};
@ -104,7 +106,7 @@ fn random_query_id() -> u16 {
impl<S: UdpSocket + Send + 'static, MF: MessageFinalizer> DnsRequestSender
for UdpClientStream<S, MF>
{
type DnsResponseFuture = UdpResponse<S>;
type DnsResponseFuture = UdpResponse;
fn send_message(&mut self, mut message: DnsRequest) -> Self::DnsResponseFuture {
if self.is_shutdown {
@ -123,10 +125,7 @@ impl<S: UdpSocket + Send + 'static, MF: MessageFinalizer> DnsRequestSender
Err(err) => {
let err: ProtoError = err;
return UdpResponse(Timeout::new(
SingleUseUdpSocket::Errored(Some(err)),
self.timeout,
));
return UdpResponse::complete(SingleUseUdpSocket::errored(err));
}
};
@ -138,10 +137,7 @@ impl<S: UdpSocket + Send + 'static, MF: MessageFinalizer> DnsRequestSender
if let Some(ref signer) = self.signer {
if let Err(e) = message.finalize::<MF>(signer.borrow(), now) {
debug!("could not sign message: {}", e);
return UdpResponse(Timeout::new(
SingleUseUdpSocket::Errored(Some(e)),
self.timeout,
));
return UdpResponse::complete(SingleUseUdpSocket::errored(e));
}
}
}
@ -149,24 +145,18 @@ impl<S: UdpSocket + Send + 'static, MF: MessageFinalizer> DnsRequestSender
let bytes = match message.to_vec() {
Ok(bytes) => bytes,
Err(err) => {
return UdpResponse(Timeout::new(
SingleUseUdpSocket::Errored(Some(err)),
self.timeout,
));
return UdpResponse::complete(SingleUseUdpSocket::errored(err));
}
};
let message_id = message.id();
let message = SerialMessage::new(bytes, self.name_server);
UdpResponse::new(message, message_id, self.timeout)
UdpResponse::new::<S>(message, message_id, self.timeout)
}
fn error_response(err: ProtoError) -> Self::DnsResponseFuture {
UdpResponse(Timeout::new(
SingleUseUdpSocket::Errored(Some(err)),
Duration::from_secs(5), // this should never be triggered
))
UdpResponse::complete(SingleUseUdpSocket::errored(err))
}
fn shutdown(&mut self) {
@ -180,43 +170,47 @@ impl<S: UdpSocket + Send + 'static, MF: MessageFinalizer> DnsRequestSender
// TODO: is this impl necessary? there's nothing being driven here...
impl<S: Send, MF: MessageFinalizer> Stream for UdpClientStream<S, MF> {
type Item = ();
type Error = ProtoError;
type Item = Result<(), ProtoError>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
// Technically the Stream doesn't actually do anything.
if self.is_shutdown {
Ok(Async::Ready(None))
Poll::Ready(None)
} else {
Ok(Async::Ready(Some(())))
Poll::Ready(Some(Ok(())))
}
}
}
/// A future that resolves to
pub struct UdpResponse<S>(Timeout<SingleUseUdpSocket<S>>);
pub struct UdpResponse(Pin<Box<dyn Future<Output = Result<Result<DnsResponse, ProtoError>, Elapsed>> + Send>>);
impl<S> UdpResponse<S> {
impl UdpResponse {
/// creates a new future for the request
///
/// # Arguments
///
/// * `request` - Serialized message being sent
/// * `message_id` - Id of the message that was encoded in the serial message
fn new(request: SerialMessage, message_id: u16, timeout: Duration) -> Self {
UdpResponse(Timeout::new(
SingleUseUdpSocket::StartSend(Some(request), message_id),
fn new<S: UdpSocket + Send + Unpin + 'static>(request: SerialMessage, message_id: u16, timeout: Duration) -> Self {
UdpResponse(Box::pin(Timeout::new(
SingleUseUdpSocket::send_serial_message::<S>(request, message_id),
timeout,
))
)))
}
/// ad already completed future
fn complete<F: Future<Output = Result<DnsResponse, ProtoError>> + Send + 'static>(f: F) -> Self {
// FIXME: this constructure isn't really necessary
UdpResponse(Box::pin(Timeout::new(f, Duration::from_secs(5))))
}
}
impl<S: UdpSocket> Future for UdpResponse<S> {
type Item = DnsResponse;
type Error = ProtoError;
impl Future for UdpResponse {
type Output = Result<DnsResponse, ProtoError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll().map_err(ProtoError::from)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.0.as_mut().poll(cx).map_err(ProtoError::from).map(|r| r.and_then(|r| r))
}
}
@ -232,12 +226,12 @@ where
marker: PhantomData<S>,
}
impl<S: Send, MF: MessageFinalizer> Future for UdpClientConnect<S, MF> {
type Item = UdpClientStream<S, MF>;
type Error = ProtoError;
impl<S: Send + Unpin, MF: MessageFinalizer> Future for UdpClientConnect<S, MF> {
type Output = Result<UdpClientStream<S, MF>, ProtoError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(UdpClientStream::<S, MF> {
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
// TODO: this doesn't need to be a future?
Poll::Ready(Ok(UdpClientStream::<S, MF> {
name_server: self
.name_server
.take()
@ -250,142 +244,92 @@ impl<S: Send, MF: MessageFinalizer> Future for UdpClientConnect<S, MF> {
}
}
enum SingleUseUdpSocket<S> {
StartSend(Option<SerialMessage>, u16),
Connect(Option<SerialMessage>, NextRandomUdpSocket<S>, u16),
Send(Option<SerialMessage>, Option<S>, u16),
AwaitResponse(Option<SerialMessage>, S, u16),
Response(Option<Message>),
Errored(Option<ProtoError>),
}
struct SingleUseUdpSocket;
impl<S: UdpSocket> Future for SingleUseUdpSocket<S> {
type Item = DnsResponse;
type Error = ProtoError;
impl SingleUseUdpSocket {
async fn send_serial_message<S: UdpSocket>(msg: SerialMessage, msg_id: u16) -> Result<DnsResponse, ProtoError> {
let name_server = msg.addr();
let mut socket: S = NextRandomUdpSocket::new(&name_server).await?;
let bytes = msg.bytes();
let addr = &msg.addr();
let len_sent: usize = socket.send_to(bytes, addr).await?;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if bytes.len() != len_sent {
return Err(ProtoError::from(format!("Not all bytes of message sent, {} of {}", len_sent, bytes.len())))
}
// TODO: limit the max number of attempted messages? this relies on a timeout to die...
loop {
*self = match *self {
SingleUseUdpSocket::StartSend(ref mut msg, msg_id) => {
// get a new socket to use
let msg = msg.take();
let name_server = msg
.as_ref()
.expect("SingleUseUdpSocket::StartSend invalid state: msg")
.addr();
SingleUseUdpSocket::Connect(msg, NextRandomUdpSocket::new(&name_server), msg_id)
}
SingleUseUdpSocket::Connect(ref mut msg, ref mut future_socket, msg_id) => {
let socket = try_ready!(future_socket.poll());
// TODO: connect the socket here on merge into master
// TODO: consider making this heap based? need to verify it matches EDNS settings
let mut recv_buf = [0u8; 2048];
// send the message, and then await the response
SingleUseUdpSocket::Send(msg.take(), Some(socket), msg_id)
}
SingleUseUdpSocket::Send(ref mut msg, ref mut socket, msg_id) => {
try_ready!(socket
.as_mut()
.expect("SingleUseUdpSocket::Send invalid state: socket1")
.poll_send_to(
msg.as_ref()
.expect("SingleUseUdpSocket::Send invalid state: msg1")
.bytes(),
&msg.as_ref()
.expect("SingleUseUdpSocket::Send invalid state: msg2")
.addr()
));
let (len, src) = socket.recv_from(&mut recv_buf).await?;
let response = SerialMessage::new(recv_buf.iter().take(len).cloned().collect(), src);
// message is sent, await the response
SingleUseUdpSocket::AwaitResponse(
msg.take(),
socket
.take()
.expect("SingleUseUdpSocket::Send invalid state: socket2"),
msg_id,
)
}
SingleUseUdpSocket::AwaitResponse(ref mut request, ref mut socket, msg_id) => {
// TODO: consider making this heap based? need to verify it matches EDNS settings
let mut buf = [0u8; 2048];
// compare expected src to received packet
let request_target = msg.addr();
let (len, src) = try_ready!(socket.poll_recv_from(&mut buf));
let response = SerialMessage::new(buf.iter().take(len).cloned().collect(), src);
if response.addr() != request_target {
warn!(
"ignoring response from {} because it does not match name_server: {}.",
response.addr(),
request_target,
);
// compare expected src to received packet
let request_target = request
.as_ref()
.expect("SingleUseUdpSocket::AwaitResponse invalid state: msg")
.addr();
// await an answer from the correct NameServer
continue;
}
if response.addr() != request_target {
// TODO: match query strings from request and response?
match response.to_message() {
Ok(message) => {
if msg_id == message.id() {
debug!("received message id: {}", message.id());
return Ok(DnsResponse::from(message))
} else {
// on wrong id, attempted poison?
warn!(
"ignoring response from {} because it does not match name_server: {}.",
response.addr(),
request_target
"expected message id: {} got: {}, dropped",
msg_id,
message.id()
);
// await an answer from the correct NameServer
//SingleUseUdpSocket::AwaitResponse(msg.take(), socket.take(), msg_id)
continue;
}
// TODO: match query strings from request and response?
match response.to_message() {
Ok(message) => {
if msg_id == message.id() {
debug!("received message id: {}", message.id());
SingleUseUdpSocket::Response(Some(message))
} else {
// on wrong id, attempted poison?
warn!(
"expected message id: {} got: {}, dropped",
msg_id,
message.id()
);
//SingleUseUdpSocket::AwaitResponse(msg.take(), socket.take(), msg_id)
continue;
}
}
Err(e) => {
// on errors deserializing, continue
warn!(
"dropped malformed message waiting for id: {} err: {}",
msg_id, e
);
//SingleUseUdpSocket::AwaitResponse(msg.take(), socket.take(), msg_id)
continue;
}
}
}
SingleUseUdpSocket::Response(ref mut response) => {
// finally return the message
return Ok(Async::Ready(
response
.take()
.expect("SingleUseUdpSocket::Send invalid state: already complete")
.into(),
));
}
SingleUseUdpSocket::Errored(ref mut error) => {
// finally return the error
return Err(error
.take()
.expect("SingleUseUdpSocket::Errored invalid state: already complete"));
Err(e) => {
// on errors deserializing, continue
warn!(
"dropped malformed message waiting for id: {} err: {}",
msg_id, e
);
//SingleUseUdpSocket::AwaitResponse(msg.take(), socket.take(), msg_id)
continue;
}
}
}
}
// FIXME: this is unnecessary
async fn errored(err: ProtoError) -> Result<DnsResponse, ProtoError> {
futures::future::err(err).await
}
}
#[cfg(test)]
mod tests {
#[cfg(not(target_os = "linux"))]
#[cfg(test)]
use std::net::Ipv6Addr;
#[cfg(test)]
use std::net::{IpAddr, Ipv4Addr};
#[cfg(test)]
use tokio_udp;
use crate::op::Message;
use super::*;
#[test]
fn test_udp_client_stream_ipv4() {
udp_client_stream_test(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))
@ -397,7 +341,6 @@ fn test_udp_client_stream_ipv6() {
udp_client_stream_test(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)))
}
#[cfg(test)]
fn udp_client_stream_test(server_addr: IpAddr) {
use crate::op::Query;
use crate::rr::rdata::NULL;
@ -518,3 +461,4 @@ fn udp_client_stream_test(server_addr: IpAddr) {
assert!(worked_once);
}
}

View File

@ -8,35 +8,42 @@
use std::io;
use std::marker::PhantomData;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use futures::stream::{Fuse, Peekable, Stream};
use futures::sync::mpsc::{unbounded, UnboundedReceiver};
use futures::task;
use futures::{Async, Future, Poll};
use async_trait::async_trait;
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::lock::Mutex;
use futures::stream::{Fuse, Peekable, Stream, StreamExt};
use futures::{ready, Future, Poll, TryFutureExt};
use rand;
use rand::distributions::{uniform::Uniform, Distribution};
use crate::xfer::{BufStreamHandle, SerialMessage};
/// Trait for UdpSocket
#[async_trait]
pub trait UdpSocket
where
Self: Sized,
Self: Sized + Unpin,
{
/// UdpSocket
fn bind(addr: &SocketAddr) -> io::Result<Self>;
/// Receive data from the socket and returns the number of bytes read and the address from
/// where the data came on success.
fn poll_recv_from(&mut self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error>;
async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>;
/// Send data to the given address.
fn poll_send_to(&mut self, buf: &[u8], target: &SocketAddr) -> Poll<(), io::Error>;
async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize>;
}
/// A UDP stream of DNS binary packets
#[must_use = "futures do nothing unless polled"]
pub struct UdpStream<S> {
socket: S,
pub struct UdpStream<S: Send> {
socket: Arc<Mutex<S>>,
sending: Option<Pin<Box<dyn Future<Output = io::Result<usize>> + Send>>>,
outbound_messages: Peekable<Fuse<UnboundedReceiver<SerialMessage>>>,
receiving: Option<Pin<Box<dyn Future<Output = io::Result<SerialMessage>> + Send>>>,
}
impl<S: UdpSocket + Send + 'static> UdpStream<S> {
@ -56,7 +63,7 @@ impl<S: UdpSocket + Send + 'static> UdpStream<S> {
pub fn new(
name_server: SocketAddr,
) -> (
Box<dyn Future<Item = UdpStream<S>, Error = io::Error> + Send>,
Box<dyn Future<Output = Result<UdpStream<S>, io::Error>> + Send + Unpin>,
BufStreamHandle,
) {
let (message_sender, outbound_messages) = unbounded();
@ -68,9 +75,11 @@ impl<S: UdpSocket + Send + 'static> UdpStream<S> {
// This set of futures collapses the next udp socket into a stream which can be used for
// sending and receiving udp packets.
let stream = Box::new(next_socket.map(move |socket| UdpStream {
socket,
let stream = Box::new(next_socket.map_ok(move |socket| UdpStream {
socket: Arc::new(Mutex::new(socket)),
sending: None,
outbound_messages: outbound_messages.fuse().peekable(),
receiving: None,
}));
(stream, message_sender)
@ -94,8 +103,10 @@ impl<S: UdpSocket + Send + 'static> UdpStream<S> {
let message_sender = BufStreamHandle::new(message_sender);
let stream = UdpStream {
socket,
socket: Arc::new(Mutex::new(socket)),
sending: None,
outbound_messages: outbound_messages.fuse().peekable(),
receiving: None,
};
(stream, message_sender)
@ -107,52 +118,99 @@ impl<S: UdpSocket + Send + 'static> UdpStream<S> {
outbound_messages: UnboundedReceiver<SerialMessage>,
) -> Self {
UdpStream {
socket,
socket: Arc::new(Mutex::new(socket)),
sending: None,
outbound_messages: outbound_messages.fuse().peekable(),
receiving: None,
}
}
}
impl<S: UdpSocket> Stream for UdpStream<S> {
type Item = SerialMessage;
type Error = io::Error;
impl<S: Send> UdpStream<S> {
fn pollable_split(&mut self) -> (
&mut Arc<Mutex<S>>,
&mut Option<Pin<Box<dyn Future<Output = io::Result<usize>> + Send>>>,
&mut Peekable<Fuse<UnboundedReceiver<SerialMessage>>>,
&mut Option<Pin<Box<dyn Future<Output = io::Result<SerialMessage>> + Send>>>) {
(&mut self.socket, &mut self.sending, &mut self.outbound_messages, &mut self.receiving)
}
}
impl<S: UdpSocket + Send + 'static> Stream for UdpStream<S> {
type Item = Result<SerialMessage, io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let (socket, sending, outbound_messages, receiving) = self.pollable_split();
let mut outbound_messages = Pin::new(outbound_messages);
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// this will not accept incoming data while there is data to send
// makes this self throttling.
loop {
// if there's something currently sending, send it
if let Some(ref mut sending) = sending {
ready!(sending.as_mut().poll(cx))?;
}
*sending = None;
// first try to send
match self
.outbound_messages
.peek()
.map_err(|()| io::Error::new(io::ErrorKind::Other, "unknown"))?
match outbound_messages.as_mut().poll_next(cx)
{
Async::Ready(Some(ref message)) => {
Poll::Ready(Some(message)) => {
let socket = Arc::clone(socket);
let sending_fut = async {
let message = message;
let socket = socket;
let mut socket = socket.lock().await;
let addr = &message.addr();
socket.send_to(message.bytes(), addr).await
};
// will return if the socket will block
try_ready!(self.socket.poll_send_to(message.bytes(), &message.addr()));
*sending = Some(Box::pin(sending_fut));
}
// now we get to drop through to the receives...
// TODO: should we also return None if there are no more messages to send?
Async::NotReady | Async::Ready(None) => break,
Poll::Pending | Poll::Ready(None) => break,
}
// now pop the request which is already sent
// If it were an Err, it was returned on peeking.
self.outbound_messages.poll().expect("Impossible");
}
// For QoS, this will only accept one message and output that
// receive all inbound messages
// TODO: this should match edns settings
let mut buf = [0u8; 2048];
loop {
let msg = if let Some(receiving) = receiving {
// TODO: should we drop this packet if it's not from the same src as dest?
let msg = ready!(receiving.as_mut().poll(cx))?;
// TODO: should we drop this packet if it's not from the same src as dest?
let (len, src) = try_ready!(self.socket.poll_recv_from(&mut buf));
Ok(Async::Ready(Some(SerialMessage::new(
buf.iter().take(len).cloned().collect(),
src,
))))
Some(Poll::Ready(Some(Ok(msg))))
} else {
None
};
*receiving = None;
if let Some(msg) = msg {
return msg;
}
let socket = Arc::clone(socket);
let receive_future = async {
let socket = socket;
let mut buf = [0u8; 2048];
let mut socket = socket.lock().await;
let (len, src) = socket.recv_from(&mut buf).await?;
Ok(SerialMessage::new(
buf.iter().take(len).cloned().collect(),
src,
))
};
*receiving = Some(Box::pin(receive_future));
}
}
}
@ -178,13 +236,12 @@ impl<S> NextRandomUdpSocket<S> {
}
impl<S: UdpSocket> Future for NextRandomUdpSocket<S> {
type Item = S;
type Error = io::Error;
type Output = Result<S, io::Error>;
/// polls until there is an available next random UDP port.
///
/// if there is no port available after 10 attempts, returns NotReady
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let rand_port_range = Uniform::new_inclusive(1025_u16, u16::max_value());
let mut rand = rand::thread_rng();
@ -196,7 +253,7 @@ impl<S: UdpSocket> Future for NextRandomUdpSocket<S> {
match S::bind(&zero_addr) {
Ok(socket) => {
debug!("created socket successfully");
return Ok(Async::Ready(socket));
return Poll::Ready(Ok(socket));
}
Err(err) => debug!("unable to bind port, attempt: {}: {}", attempt, err),
}
@ -204,9 +261,11 @@ impl<S: UdpSocket> Future for NextRandomUdpSocket<S> {
debug!("could not get next random port, delaying");
task::current().notify();
// FIXME: this replaced task::current().notify();
cx.waker().wake_by_ref();
// returning NotReady here, perhaps the next poll there will be some more socket available.
Ok(Async::NotReady)
Poll::Pending
}
}
@ -240,18 +299,18 @@ fn test_udp_stream_ipv6() {
use tokio_udp;
#[cfg(feature = "tokio-compat")]
#[async_trait]
impl UdpSocket for tokio_udp::UdpSocket {
fn bind(addr: &SocketAddr) -> io::Result<Self> {
tokio_udp::UdpSocket::bind(addr)
}
fn poll_recv_from(&mut self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> {
self.poll_recv_from(buf)
async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.recv_from(buf).await
}
fn poll_send_to(&mut self, buf: &[u8], target: &SocketAddr) -> Poll<(), io::Error> {
self.poll_send_to(buf, target).map(|x| match x {
Async::Ready(_) => Async::Ready(()),
Async::NotReady => Async::NotReady,
})
async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
self.send_to(buf, target).await
}
}
@ -331,9 +390,9 @@ fn udp_stream_test(server_addr: IpAddr) {
sender
.unbounded_send(SerialMessage::new(test_bytes.to_vec(), server_addr))
.unwrap();
let (buffer_and_addr, stream_tmp) = io_loop.block_on(stream.into_future()).ok().unwrap();
let (buffer_and_addr, stream_tmp) = io_loop.block_on(stream.into_future());
stream = stream_tmp;
let message = buffer_and_addr.expect("no buffer received");
let message = buffer_and_addr.expect("no buffer received").expect("error receiving buffer");
assert_eq!(message.bytes(), test_bytes);
assert_eq!(message.addr(), server_addr);
}

View File

@ -7,9 +7,12 @@
//! This module contains all the types for demuxing DNS oriented streams.
use futures::stream::{Peekable, Stream};
use futures::sync::mpsc::{unbounded, UnboundedReceiver};
use futures::{Async, Future, Poll};
use std::pin::Pin;
use std::task::Context;
use futures::stream::{Peekable, Stream, StreamExt};
use futures::{Future, FutureExt, Poll};
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use crate::error::*;
use crate::xfer::{DnsRequest, DnsRequestSender, DnsRequestStreamHandle, DnsResponse, OneshotDnsRequest};
@ -21,7 +24,7 @@ use crate::xfer::{DnsRequest, DnsRequestSender, DnsRequestStreamHandle, DnsRespo
pub struct DnsExchange<S, R>
where
S: DnsRequestSender<DnsResponseFuture = R>,
R: Future<Item = DnsResponse, Error = ProtoError> + 'static + Send,
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send,
{
io_stream: S,
outbound_messages: Peekable<UnboundedReceiver<OneshotDnsRequest<R>>>,
@ -30,7 +33,7 @@ where
impl<S, R> DnsExchange<S, R>
where
S: DnsRequestSender<DnsResponseFuture = R>,
R: Future<Item = DnsResponse, Error = ProtoError> + 'static + Send,
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send,
{
/// Initializes a TcpStream with an existing tokio_tcp::TcpStream.
///
@ -64,7 +67,7 @@ where
/// The connect_future should be lazy.
pub fn connect<F>(connect_future: F) -> (DnsExchangeConnect<F, S, R>, DnsRequestStreamHandle<R>)
where
F: Future<Item = S, Error = ProtoError> + 'static + Send,
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
{
let (message_sender, outbound_messages) = unbounded();
(
@ -72,72 +75,76 @@ where
DnsRequestStreamHandle::<R>::new(message_sender),
)
}
fn pollable_split(&mut self) -> (&mut S, &mut Peekable<UnboundedReceiver<OneshotDnsRequest<R>>>) {
(&mut self.io_stream, &mut self.outbound_messages)
}
}
impl<S, R> Future for DnsExchange<S, R>
where
S: DnsRequestSender<DnsResponseFuture = R>,
R: Future<Item = DnsResponse, Error = ProtoError> + 'static + Send,
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send,
{
type Item = ();
type Error = ProtoError;
type Output = Result<(), ProtoError>;
#[allow(clippy::unused_unit)]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let (io_stream, outbound_messages) = self.pollable_split();
let mut io_stream = Pin::new(io_stream);
let mut outbound_messages = Pin::new(outbound_messages);
// this will not accept incoming data while there is data to send
// makes this self throttling.
loop {
// poll the underlying stream, to drive it...
match self.io_stream.poll() {
match io_stream.as_mut().poll_next(cx) {
// The stream is ready
Ok(Async::Ready(Some(()))) => (),
Ok(Async::NotReady) => {
if self.io_stream.is_shutdown() {
Poll::Ready(Some(Ok(()))) => (),
Poll::Pending => {
if io_stream.is_shutdown() {
// the io_stream is in a shutdown state, we are only waiting for final results...
return Ok(Async::NotReady);
return Poll::Pending;
}
// NotReady and not shutdown, see if there are more messages to send
()
} // underlying stream is complete.
Ok(Async::Ready(None)) => {
Poll::Ready(None) => {
debug!("io_stream is done, shutting down");
// TODO: return shutdown error to anything in the stream?
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
Err(err) => return Err(err),
Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
}
// then see if there is more to send
match self
.outbound_messages
.poll()
.map_err(|()| ProtoError::from("unknown from outbound_message receiver"))?
match outbound_messages.as_mut().poll_next(cx)
{
// already handled above, here to make sure the poll() pops the next message
Async::Ready(Some(dns_request)) => {
Poll::Ready(Some(dns_request)) => {
// if there is no peer, this connection should die...
let (dns_request, serial_response): (DnsRequest, _) = dns_request.unwrap();
debug!("sending message via: {}", self.io_stream);
debug!("sending message via: {}", io_stream);
match serial_response.send_response(self.io_stream.send_message(dns_request)) {
match serial_response.send_response(io_stream.send_message(dns_request)) {
Ok(()) => (),
Err(_) => {
warn!("failed to associate send_message response to the sender");
return Err(
return Poll::Ready(Err(
"failed to associate send_message response to the sender".into()
);
));
}
}
}
// On not ready, this is our time to return...
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => {
debug!("all handles closed, shutting down: {}", self.io_stream);
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
debug!("all handles closed, shutting down: {}", io_stream);
// if there is nothing that can use this connection to send messages, then this is done...
self.io_stream.shutdown();
io_stream.shutdown();
// now we'll await the stream to shutdown... see io_stream poll above
}
@ -151,15 +158,15 @@ where
/// A wrapper for a future DnsExchange connection
pub struct DnsExchangeConnect<F, S, R>(DnsExchangeConnectInner<F, S, R>)
where
F: Future<Item = S, Error = ProtoError> + 'static + Send,
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
S: DnsRequestSender<DnsResponseFuture = R>,
R: Future<Item = DnsResponse, Error = ProtoError> + 'static + Send;
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send;
impl<F, S, R> DnsExchangeConnect<F, S, R>
where
F: Future<Item = S, Error = ProtoError> + 'static + Send,
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
S: DnsRequestSender<DnsResponseFuture = R>,
R: Future<Item = DnsResponse, Error = ProtoError> + 'static + Send,
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send,
{
fn connect(
connect_future: F,
@ -174,23 +181,22 @@ where
impl<F, S, R> Future for DnsExchangeConnect<F, S, R>
where
F: Future<Item = S, Error = ProtoError> + 'static + Send,
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
S: DnsRequestSender<DnsResponseFuture = R>,
R: Future<Item = DnsResponse, Error = ProtoError> + 'static + Send,
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send,
{
type Item = DnsExchange<S, R>;
type Error = ProtoError;
type Output = Result<DnsExchange<S, R>, ProtoError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.0.poll_unpin(cx)
}
}
enum DnsExchangeConnectInner<F, S, R>
where
F: Future<Item = S, Error = ProtoError> + 'static + Send,
F: Future<Output = Result<S, ProtoError>> + 'static + Send,
S: DnsRequestSender<DnsResponseFuture = R>,
R: Future<Item = DnsResponse, Error = ProtoError> + 'static + Send,
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send,
{
Connecting {
connect_future: F,
@ -204,33 +210,33 @@ where
impl<F, S, R> Future for DnsExchangeConnectInner<F, S, R>
where
F: Future<Item = S, Error = ProtoError> + 'static + Send,
F: Future<Output = Result<S, ProtoError>> + 'static + Send + Unpin,
S: DnsRequestSender<DnsResponseFuture = R>,
R: Future<Item = DnsResponse, Error = ProtoError> + 'static + Send,
R: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send,
{
type Item = DnsExchange<S, R>;
type Error = ProtoError;
type Output = Result<DnsExchange<S, R>, ProtoError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
let next;
match self {
match *self {
DnsExchangeConnectInner::Connecting {
ref mut connect_future,
ref mut outbound_messages,
} => {
match connect_future.poll() {
Ok(Async::Ready(stream)) => {
let connect_future = Pin::new(connect_future);
match connect_future.poll(cx) {
Poll::Ready(Ok(stream)) => {
debug!("connection established: {}", stream);
return Ok(Async::Ready(DnsExchange::from_stream_with_receiver(
return Poll::Ready(Ok(DnsExchange::from_stream_with_receiver(
stream,
outbound_messages
.take()
.expect("cannot poll after complete"),
)));
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(error) => {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(error)) => {
debug!("stream errored while connecting: {:?}", error);
next = DnsExchangeConnectInner::FailAll {
error,
@ -242,20 +248,19 @@ where
};
}
DnsExchangeConnectInner::FailAll {
error,
ref error,
ref mut outbound_messages,
} => {
while let Some(outbound_message) = match outbound_messages.poll() {
Ok(Async::Ready(opt)) => opt,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => None,
while let Some(outbound_message) = match outbound_messages.poll_next_unpin(cx) {
Poll::Ready(opt) => opt,
Poll::Pending => return Poll::Pending,
} {
let response = S::error_response(error.clone());
// ignoring errors... best effort send...
outbound_message.unwrap().1.send_response(response).ok();
}
return Err(error.clone());
return Poll::Ready(Err(error.clone()));
}
}

View File

@ -7,9 +7,9 @@
//! `DnsHandle` types perform conversions of the raw DNS messages before sending the messages on the specified streams.
use futures::sync::mpsc::UnboundedSender;
use futures::sync::oneshot;
use futures::{Future, IntoFuture};
use futures::future::{Future, FutureExt, TryFutureExt};
use futures::channel::mpsc::UnboundedSender;
use futures::channel::oneshot;
use rand;
use crate::error::*;
@ -66,12 +66,12 @@ impl BasicDnsHandle {
}
impl DnsHandle for BasicDnsHandle {
type Response = Box<dyn Future<Item = DnsResponse, Error = ProtoError> + Send>;
type Response = Box<dyn Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin>;
fn send<R: Into<DnsRequest>>(
&mut self,
request: R,
) -> Box<dyn Future<Item = DnsResponse, Error = ProtoError> + Send> {
) -> Self::Response {
let request = request.into();
let (complete, receiver) = oneshot::channel();
let message_sender: &mut _ = &mut self.message_sender;
@ -93,8 +93,7 @@ impl DnsHandle for BasicDnsHandle {
Box::new(
receiver
.map_err(|c| ProtoError::from(ProtoErrorKind::Canceled(c)))
.map(IntoFuture::into_future)
.flatten(),
.map(|r| r.and_then(|r| r)),
)
}
}
@ -102,7 +101,7 @@ impl DnsHandle for BasicDnsHandle {
/// A trait for implementing high level functions of DNS.
pub trait DnsHandle: 'static + Clone + Send {
/// The associated response from the response future, this should resolve to the Response message
type Response: Future<Item = DnsResponse, Error = ProtoError> + 'static + Send;
type Response: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send + Unpin;
/// Only returns true if and only if this DNS handle is validating DNSSec.
///

View File

@ -11,12 +11,14 @@ use std::borrow::Borrow;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{self, Display};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use futures::stream::Stream;
use futures::sync::oneshot;
use futures::{task, Async, Future, Poll};
use futures::stream::{Stream, StreamExt};
use futures::channel::oneshot;
use futures::{ready, Future, FutureExt, Poll};
use rand;
use rand::distributions::{Distribution, Standard};
use smallvec::SmallVec;
@ -63,8 +65,10 @@ impl ActiveRequest {
}
/// polls the timeout and converts the error
fn poll_timeout(&mut self) -> Poll<(), ProtoError> {
self.timeout.poll().map_err(ProtoError::from)
fn poll_timeout(&mut self, cx: &mut Context) -> Poll<Result<(), ProtoError>> {
let timeout = Pin::new(&mut self.timeout);
// FIXME: change return to infallible
timeout.poll(cx).map(|r| Ok(r))
}
/// Returns true of the other side canceled the request
@ -127,7 +131,7 @@ where
impl<S, MF> DnsMultiplexer<S, MF, Box<dyn DnsStreamHandle>>
where
S: DnsClientStream + 'static,
S: DnsClientStream + Unpin + 'static,
MF: MessageFinalizer,
{
/// Spawns a new DnsMultiplexer Stream. This uses a default timeout of 5 seconds for all requests.
@ -145,7 +149,7 @@ where
signer: Option<Arc<MF>>,
) -> DnsMultiplexerConnect<F, S, MF>
where
F: Future<Item = S, Error = ProtoError> + Send + 'static,
F: Future<Output = Result<S, ProtoError>> + Send + Unpin + 'static,
{
Self::with_timeout(stream, stream_handle, Duration::from_secs(5), signer)
}
@ -167,7 +171,7 @@ where
signer: Option<Arc<MF>>,
) -> DnsMultiplexerConnect<F, S, MF>
where
F: Future<Item = S, Error = ProtoError> + Send + 'static,
F: Future<Output = Result<S, ProtoError>> + Send + Unpin + 'static,
{
DnsMultiplexerConnect {
stream,
@ -179,7 +183,7 @@ where
/// loop over active_requests and remove cancelled requests
/// this should free up space if we already had 4096 active requests
fn drop_cancelled(&mut self) {
fn drop_cancelled(&mut self, cx: &mut Context) {
let mut canceled = HashMap::<u16, ProtoError>::new();
for (&id, ref mut active_req) in &mut self.active_requests {
if active_req.is_canceled() {
@ -187,13 +191,13 @@ where
}
// check for timeouts...
match active_req.poll_timeout() {
Ok(Async::Ready(_)) => {
match active_req.poll_timeout(cx) {
Poll::Ready(Ok(_)) => {
debug!("request timed out: {}", id);
canceled.insert(id, ProtoError::from(ProtoErrorKind::Timeout));
}
Ok(Async::NotReady) => (),
Err(e) => {
Poll::Pending => (),
Poll::Ready(Err(e)) => {
error!("unexpected error from timeout: {}", e);
canceled.insert(id, ProtoError::from("error registering timeout"));
}
@ -215,20 +219,22 @@ where
}
/// creates random query_id, validates against all active queries
fn next_random_query_id(&self) -> Async<u16> {
fn next_random_query_id(&self) -> Poll<u16> {
let mut rand = rand::thread_rng();
for _ in 0..100 {
let id: u16 = Standard.sample(&mut rand); // the range is [0 ... u16::max]
if !self.active_requests.contains_key(&id) {
return Async::Ready(id);
return Poll::Ready(id);
}
}
warn!("could not get next random query id, delaying");
task::current().notify();
Async::NotReady
// FIXME: need a Waker here, right?
panic!("could not get next random query id, delaying");
//task::current().notify();
Poll::Pending
}
/// Closes all outstanding completes with a closed stream error
@ -258,8 +264,8 @@ where
#[must_use = "futures do nothing unless polled"]
pub struct DnsMultiplexerConnect<F, S, MF>
where
F: Future<Item = S, Error = ProtoError> + Send + 'static,
S: Stream<Item = SerialMessage, Error = ProtoError>,
F: Future<Output = Result<S, ProtoError>> + Send + Unpin + 'static,
S: Stream<Item = Result<SerialMessage, ProtoError>> + Unpin,
MF: MessageFinalizer + Send + Sync + 'static,
{
stream: F,
@ -270,17 +276,16 @@ where
impl<F, S, MF> Future for DnsMultiplexerConnect<F, S, MF>
where
F: Future<Item = S, Error = ProtoError> + Send + 'static,
S: DnsClientStream + 'static,
F: Future<Output = Result<S, ProtoError>> + Send + Unpin + 'static,
S: DnsClientStream + Unpin + 'static,
MF: MessageFinalizer + Send + Sync + 'static,
{
type Item = DnsMultiplexer<S, MF, Box<dyn DnsStreamHandle>>;
type Error = ProtoError;
type Output = Result<DnsMultiplexer<S, MF, Box<dyn DnsStreamHandle>>, ProtoError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let stream: S = try_ready!(self.stream.poll());
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let stream: S = ready!(self.stream.poll_unpin(cx))?;
Ok(Async::Ready(DnsMultiplexer {
Poll::Ready(Ok(DnsMultiplexer {
stream,
timeout_duration: self.timeout_duration,
stream_handle: self
@ -306,7 +311,7 @@ where
impl<S, MF> DnsRequestSender for DnsMultiplexer<S, MF>
where
S: DnsClientStream + 'static,
S: DnsClientStream + Unpin + 'static,
MF: MessageFinalizer + Send + Sync + 'static,
{
type DnsResponseFuture = DnsMultiplexerSerialResponse;
@ -318,8 +323,8 @@ where
// get next query_id
let query_id: u16 = match self.next_random_query_id() {
Async::Ready(id) => id,
Async::NotReady => {
Poll::Ready(id) => id,
Poll::Pending => {
return DnsMultiplexerSerialResponseInner::Err(Some(ProtoError::from(
"id space exhausted, consider filing an issue",
))).into()
@ -401,19 +406,18 @@ where
impl<S, MF> Stream for DnsMultiplexer<S, MF>
where
S: DnsClientStream + 'static,
S: DnsClientStream + Unpin + 'static,
MF: MessageFinalizer + Send + Sync + 'static,
{
type Item = ();
type Error = ProtoError;
type Item = Result<(), ProtoError>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// Always drop the cancelled queries first
self.drop_cancelled();
self.drop_cancelled(cx);
if self.is_shutdown && self.active_requests.is_empty() {
debug!("stream is done: {}", self);
return Ok(Async::Ready(None));
return Poll::Ready(None);
}
// Collect all inbound requests, max 100 at a time for QoS
@ -421,8 +425,8 @@ where
// TODO: make the QoS configurable
let mut messages_received = 0;
for i in 0..QOS_MAX_RECEIVE_MSGS {
match self.stream.poll()? {
Async::Ready(Some(buffer)) => {
match self.stream.poll_next_unpin(cx)? {
Poll::Ready(Some(buffer)) => {
messages_received = i;
// deserialize or log decode_error
@ -450,12 +454,12 @@ where
Err(e) => debug!("error decoding message: {}", e),
}
}
Async::Ready(None) => {
Poll::Ready(None) => {
debug!("io_stream closed by other side: {}", self.stream);
self.stream_closed_close_all();
return Ok(Async::Ready(None));
return Poll::Ready(None);
}
Async::NotReady => break,
Poll::Pending => break,
}
}
@ -463,11 +467,12 @@ where
// was hit then "yield". This'll make sure that the future is
// woken up immediately on the next turn of the event loop.
if messages_received == QOS_MAX_RECEIVE_MSGS {
task::current().notify();
// FIXME: this was a task::current().notify(); is this right?
cx.waker().wake_by_ref();
}
// Finally, return not ready to keep the 'driver task' alive.
Ok(Async::NotReady)
Poll::Pending
}
}
@ -483,11 +488,10 @@ impl DnsMultiplexerSerialResponse {
}
impl Future for DnsMultiplexerSerialResponse {
type Item = DnsResponse;
type Error = ProtoError;
type Output = Result<DnsResponse, ProtoError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.0.poll_unpin(cx)
}
}
@ -503,23 +507,20 @@ enum DnsMultiplexerSerialResponseInner {
}
impl Future for DnsMultiplexerSerialResponseInner {
type Item = DnsResponse;
type Error = ProtoError;
type Output = Result<DnsResponse, ProtoError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match *self {
// The inner type of the completion might have been an error
// we need to unwrap that, and translate to be the Future's error
DnsMultiplexerSerialResponseInner::Completion(complete) => match try_ready!(
complete
.poll()
DnsMultiplexerSerialResponseInner::Completion(ref mut complete) => {
complete.poll_unpin(cx).map(|r| r
.map_err(|_| ProtoError::from("the completion was canceled"))
) {
Ok(response) => Ok(Async::Ready(response)),
Err(err) => Err(err),
},
DnsMultiplexerSerialResponseInner::Err(err) => {
Err(err.take().expect("cannot poll after complete"))
.and_then(|r| r)
)
}
DnsMultiplexerSerialResponseInner::Err(ref mut err) => {
Poll::Ready(Err(err.take().expect("cannot poll after complete")))
}
}
}

View File

@ -6,11 +6,14 @@
use std::fmt::{Debug, Display};
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::Context;
use futures::{ready, Future, Poll, Stream};
use futures::channel::mpsc::{UnboundedSender, TrySendError};
use futures::channel::oneshot::{self, Receiver, Sender};
use crate::error::*;
use futures::sync::mpsc::{SendError, UnboundedSender};
use futures::sync::oneshot;
use futures::{Future, Poll, Stream};
use crate::op::Message;
mod dns_exchange;
@ -44,7 +47,7 @@ fn ignore_send<M, E: Debug>(result: Result<M, E>) {
/// A non-multiplexed stream of Serialized DNS messages
pub trait DnsClientStream:
Stream<Item = SerialMessage, Error = ProtoError> + Display + Send
Stream<Item = Result<SerialMessage, ProtoError>> + Display + Send
{
/// The remote name server address
fn name_server_addr(&self) -> SocketAddr;
@ -64,7 +67,7 @@ impl BufStreamHandle {
}
/// see [`futures::sync::mpsc::UnboundedSender`]
pub fn unbounded_send(&self, msg: SerialMessage) -> Result<(), SendError<SerialMessage>> {
pub fn unbounded_send(&self, msg: SerialMessage) -> Result<(), TrySendError<SerialMessage>> {
self.sender.unbounded_send(msg)
}
}
@ -109,32 +112,33 @@ impl DnsStreamHandle for BufDnsStreamHandle {
/// A sender to which serialized DNS Messages can be sent
pub struct DnsRequestStreamHandle<F>
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send,
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
{
sender: UnboundedSender<OneshotDnsRequest<F>>,
}
impl<F> DnsRequestStreamHandle<F>
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send,
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
{
/// Constructs a new BufStreamHandle with the associated ProtoError
pub fn new(sender: UnboundedSender<OneshotDnsRequest<F>>) -> Self {
DnsRequestStreamHandle { sender }
}
// FIXME: does try send change the semantics this had before?
/// see [`futures::sync::mpsc::UnboundedSender`]
pub fn unbounded_send(
&self,
msg: OneshotDnsRequest<F>,
) -> Result<(), SendError<OneshotDnsRequest<F>>> {
) -> Result<(), TrySendError<OneshotDnsRequest<F>>> {
self.sender.unbounded_send(msg)
}
}
impl<F> Clone for DnsRequestStreamHandle<F>
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send,
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
{
fn clone(&self) -> Self {
DnsRequestStreamHandle {
@ -149,10 +153,10 @@ where
/// NotReady, if it is not ready to send a message, and `Err` or `None` in the case that the stream is
/// done, and should be shutdown.
pub trait DnsRequestSender:
Stream<Item = (), Error = ProtoError> + 'static + Display + Send
Stream<Item = Result<(), ProtoError>> + 'static + Display + Send + Unpin
{
/// A future that resolves to a response serial message
type DnsResponseFuture: Future<Item = DnsResponse, Error = ProtoError> + 'static + Send;
type DnsResponseFuture: Future<Output = Result<DnsResponse, ProtoError>> + 'static + Send;
/// Send a message, and return a future of the response
///
@ -166,7 +170,7 @@ pub trait DnsRequestSender:
/// Allows the upstream user to inform the underling stream that it should shutdown.
///
/// After this is called, the next time `poll` is called on the stream it would be correct to return `Ok(Async::Ready(()))`. This is not required though, if there are say outstanding requests that are not yet complete, then it would be correct to first wait for those results.
/// After this is called, the next time `poll` is called on the stream it would be correct to return `Poll::Ready(Ok(()))`. This is not required though, if there are say outstanding requests that are not yet complete, then it would be correct to first wait for those results.
fn shutdown(&mut self);
/// Returns true if the stream has been shutdown with `shutdown`
@ -176,14 +180,14 @@ pub trait DnsRequestSender:
/// Used for associating a name_server to a DnsRequestStreamHandle
pub struct BufDnsRequestStreamHandle<F>
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send,
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
{
sender: DnsRequestStreamHandle<F>,
}
impl<F> BufDnsRequestStreamHandle<F>
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send,
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
{
/// Construct a new BufDnsRequestStreamHandle
pub fn new(sender: DnsRequestStreamHandle<F>) -> Self {
@ -193,7 +197,7 @@ where
impl<F> Clone for BufDnsRequestStreamHandle<F>
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send,
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
{
fn clone(&self) -> Self {
BufDnsRequestStreamHandle {
@ -218,7 +222,7 @@ macro_rules! try_oneshot {
impl<F> DnsHandle for BufDnsRequestStreamHandle<F>
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send + 'static,
F: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin + 'static,
{
type Response = OneshotDnsResponseReceiver<F>;
@ -240,15 +244,15 @@ where
/// A OneshotDnsRequest creates a channel for a response to message
pub struct OneshotDnsRequest<F>
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send,
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
{
dns_request: DnsRequest,
sender_for_response: oneshot::Sender<F>,
sender_for_response: Sender<F>,
}
impl<F> OneshotDnsRequest<F>
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send,
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
{
fn oneshot(dns_request: DnsRequest) -> (OneshotDnsRequest<F>, oneshot::Receiver<F>) {
let (sender_for_response, receiver) = oneshot::channel();
@ -272,11 +276,11 @@ where
struct OneshotDnsResponse<F>(oneshot::Sender<F>)
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send;
F: Future<Output = Result<DnsResponse, ProtoError>> + Send;
impl<F> OneshotDnsResponse<F>
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send,
F: Future<Output = Result<DnsResponse, ProtoError>> + Send,
{
fn send_response(self, serial_response: F) -> Result<(), F> {
self.0.send(serial_response)
@ -286,10 +290,10 @@ where
/// A Future that wraps a oneshot::Receiver and resolves to the final value
pub enum OneshotDnsResponseReceiver<F>
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send,
F: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin,
{
/// The receiver
Receiver(oneshot::Receiver<F>),
Receiver(Receiver<F>),
/// The future once received
Received(F),
/// Error during the send operation
@ -298,29 +302,30 @@ where
impl<F> Future for OneshotDnsResponseReceiver<F>
where
F: Future<Item = DnsResponse, Error = ProtoError> + Send,
F: Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin,
{
type Item = <F as Future>::Item;
type Error = ProtoError;
type Output = <F as Future>::Output;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
let future;
match self {
*self = match *self.as_mut() {
OneshotDnsResponseReceiver::Receiver(ref mut receiver) => {
future = try_ready!(receiver
.poll()
.map_err(|_| ProtoError::from("receiver was canceled")));
let receiver = Pin::new(receiver);
let future = ready!(receiver
.poll(cx)
.map_err(|_| ProtoError::from("receiver was canceled")))?;
OneshotDnsResponseReceiver::Received(future)
}
OneshotDnsResponseReceiver::Received(ref mut future) => return future.poll(),
OneshotDnsResponseReceiver::Err(err) => {
return Err(err
OneshotDnsResponseReceiver::Received(ref mut future) => {
let future = Pin::new(future);
return future.poll(cx)
}
OneshotDnsResponseReceiver::Err(ref mut err) => {
return Poll::Ready(Err(err
.take()
.expect("futures should not be polled after complete"))
.expect("futures should not be polled after complete")))
}
}
*self = OneshotDnsResponseReceiver::Received(future);
};
}
}
}

View File

@ -7,7 +7,10 @@
//! `RetryDnsHandle` allows for DnsQueries to be reattempted on failure
use futures::{Future, Poll};
use std::pin::Pin;
use std::task::Context;
use futures::{Future, FutureExt, Poll};
use crate::error::ProtoError;
use crate::xfer::{DnsRequest, DnsResponse};
@ -37,9 +40,9 @@ impl<H: DnsHandle> RetryDnsHandle<H> {
impl<H> DnsHandle for RetryDnsHandle<H>
where
H: DnsHandle + 'static,
H: DnsHandle + Unpin + 'static,
{
type Response = Box<dyn Future<Item = DnsResponse, Error = ProtoError> + Send>;
type Response = Box<dyn Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin>;
fn send<R: Into<DnsRequest>>(&mut self, request: R) -> Self::Response {
let request = request.into();
@ -65,26 +68,26 @@ struct RetrySendFuture<H: DnsHandle> {
remaining_attempts: usize,
}
impl<H: DnsHandle> Future for RetrySendFuture<H> {
type Item = DnsResponse;
type Error = ProtoError;
impl<H: DnsHandle + Unpin> Future for RetrySendFuture<H> {
type Output = Result<DnsResponse, ProtoError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// loop over the future, on errors, spawn a new future
// on ready and not ready return.
loop {
match self.future.poll() {
r @ Ok(_) => return r,
Err(e) => {
match self.future.poll_unpin(cx) {
Poll::Ready(Err(e)) => {
if self.remaining_attempts == 0 {
return Err(e);
return Poll::Ready(Err(e));
}
self.remaining_attempts -= 1;
// FIXME: if the "sent" Message is part of the error result,
// then we can just reuse it... and no clone necessary
self.future = self.handle.send(self.request.clone());
let request = self.request.clone();
self.future = self.handle.send(request);
}
poll => return poll,
}
}
}
@ -94,7 +97,8 @@ impl<H: DnsHandle> Future for RetrySendFuture<H> {
mod test {
use super::*;
use crate::error::*;
use futures::*;
use futures::future::*;
use futures::executor::block_on;
use crate::op::*;
use std::cell::Cell;
use DnsHandle;
@ -107,7 +111,7 @@ mod test {
}
impl DnsHandle for TestClient {
type Response = Box<dyn Future<Item = DnsResponse, Error = ProtoError> + Send>;
type Response = Box<dyn Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin>;
fn send<R: Into<DnsRequest>>(&mut self, _: R) -> Self::Response {
let i = self.attempts.get();
@ -115,11 +119,11 @@ mod test {
if (i > self.retries || self.retries - i == 0) && self.last_succeed {
let mut message = Message::new();
message.set_id(i);
return Box::new(finished(message.into()));
return Box::new(ok(message.into()));
}
self.attempts.set(i + 1);
Box::new(failed(ProtoError::from("last retry set to fail")))
Box::new(err(ProtoError::from("last retry set to fail")))
}
}
@ -134,7 +138,7 @@ mod test {
2,
);
let test1 = Message::new();
let result = handle.send(test1).wait().expect("should have succeeded");
let result = block_on(handle.send(test1)).expect("should have succeeded");
assert_eq!(result.id(), 1); // this is checking the number of iterations the TestClient ran
}
@ -149,6 +153,6 @@ mod test {
2,
);
let test1 = Message::new();
assert!(client.send(test1).wait().is_err());
assert!(block_on(client.send(test1)).is_err());
}
}

View File

@ -10,9 +10,12 @@
use std::clone::Clone;
use std::collections::HashSet;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use futures::*;
use futures::{Future, FutureExt, Poll, TryFutureExt};
use futures::future::{self, SelectAll};
use crate::error::*;
use crate::op::{OpCode, Query};
@ -53,7 +56,7 @@ where
impl<H> SecureDnsHandle<H>
where
H: DnsHandle + 'static,
H: DnsHandle + Unpin + 'static,
{
/// Create a new SecureDnsHandle wrapping the specified handle.
///
@ -96,8 +99,8 @@ where
}
}
impl<H: DnsHandle> DnsHandle for SecureDnsHandle<H> {
type Response = Box<dyn Future<Item = DnsResponse, Error = ProtoError> + Send>;
impl<H: DnsHandle + Unpin> DnsHandle for SecureDnsHandle<H> {
type Response = Box<dyn Future<Output = Result<DnsResponse, ProtoError>> + Send + Unpin>;
fn is_verifying_dnssec(&self) -> bool {
// This handler is always verifying...
@ -109,7 +112,7 @@ impl<H: DnsHandle> DnsHandle for SecureDnsHandle<H> {
// backstop, this might need to be configurable at some point
if self.request_depth > 20 {
return Box::new(failed(ProtoError::from("exceeded max validation depth")));
return Box::new(future::err(ProtoError::from("exceeded max validation depth")));
}
// dnssec only matters on queries.
@ -178,7 +181,7 @@ impl<H: DnsHandle> DnsHandle for SecureDnsHandle<H> {
{
soa_name
} else {
return Err(ProtoError::from(
return future::err(ProtoError::from(
"could not validate negative response missing SOA",
));
};
@ -191,13 +194,13 @@ impl<H: DnsHandle> DnsHandle for SecureDnsHandle<H> {
if !verify_nsec(&query, soa_name, nsecs.as_slice()) {
// TODO change this to remove the NSECs, like we do for the others?
return Err(ProtoError::from(
return future::err(ProtoError::from(
"could not validate negative response with NSEC",
));
}
}
Ok(verified_message)
future::ok(verified_message)
}),
);
}
@ -209,17 +212,17 @@ impl<H: DnsHandle> DnsHandle for SecureDnsHandle<H> {
/// A future to verify all RRSets in a returned Message.
struct VerifyRrsetsFuture {
message_result: Option<DnsResponse>,
rrsets: SelectAll<Box<dyn Future<Item = Rrset, Error = ProtoError> + Send>>,
rrsets: SelectAll<Pin<Box<dyn Future<Output = Result<Rrset, ProtoError>> + Send>>>,
verified_rrsets: HashSet<(Name, RecordType)>,
}
/// this pulls all records returned in a Message response and returns a future which will
/// validate all of them.
fn verify_rrsets<H: DnsHandle>(
fn verify_rrsets<H: DnsHandle + Unpin>(
handle: &SecureDnsHandle<H>,
message_result: DnsResponse,
dns_class: DNSClass,
) -> Box<dyn Future<Item = DnsResponse, Error = ProtoError> + Send> {
) -> Pin<Box<dyn Future<Output = Result<DnsResponse, ProtoError>> + Send>> {
let mut rrset_types: HashSet<(Name, RecordType)> = HashSet::new();
for rrset in message_result
.answers()
@ -249,14 +252,14 @@ fn verify_rrsets<H: DnsHandle>(
message_result.take_name_servers();
message_result.take_additionals();
return Box::new(failed(ProtoError::from(ProtoErrorKind::Message(
return future::err(ProtoError::from(ProtoErrorKind::Message(
"no results to verify",
))));
))).boxed();
}
// collect all the rrsets to verify
// TODO: is there a way to get rid of this clone() safely?
let mut rrsets: Vec<Box<dyn Future<Item = Rrset, Error = ProtoError> + Send>> =
let mut rrsets: Vec<Pin<Box<dyn Future<Output = Result<Rrset, ProtoError>> + Send>>> =
Vec::with_capacity(rrset_types.len());
for (name, record_type) in rrset_types {
// TODO: should we evaluate the different sections (answers and name_servers) separately?
@ -305,14 +308,14 @@ fn verify_rrsets<H: DnsHandle>(
}
// spawn a select_all over this vec, these are the individual RRSet validators
let rrsets_to_verify = select_all(rrsets);
let rrsets_to_verify = future::select_all(rrsets);
// return the full Message validator
Box::new(VerifyRrsetsFuture {
VerifyRrsetsFuture {
message_result: Some(message_result),
rrsets: rrsets_to_verify,
verified_rrsets: HashSet::new(),
})
}.boxed()
}
fn is_dnssec(rr: &Record, dnssec_type: DNSSECRecordType) -> bool {
@ -320,22 +323,21 @@ fn is_dnssec(rr: &Record, dnssec_type: DNSSECRecordType) -> bool {
}
impl Future for VerifyRrsetsFuture {
type Item = DnsResponse;
type Error = ProtoError;
type Output = Result<DnsResponse, ProtoError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if self.message_result.is_none() {
return Err(ProtoError::from(ProtoErrorKind::Message("message is none")));
return Poll::Ready(Err(ProtoError::from(ProtoErrorKind::Message("message is none"))));
}
// loop through all the rrset evaluations, filter all the rrsets in the Message
// down to just the ones that were able to be validated
loop {
let remaining = match self.rrsets.poll() {
let remaining = match self.rrsets.poll_unpin(cx) {
// one way the loop will stop, nothing is ready...
Ok(Async::NotReady) => return Ok(Async::NotReady),
Poll::Pending => return Poll::Pending,
// all rrsets verified! woop!
Ok(Async::Ready((rrset, _, remaining))) => {
Poll::Ready((Ok(rrset), _, remaining)) => {
debug!(
"an rrset was verified: {}, {:?}",
rrset.name, rrset.record_type
@ -346,10 +348,10 @@ impl Future for VerifyRrsetsFuture {
// TODO, should we return the Message on errors? Allow the consumer to decide what to do
// on a validation failure?
// any error, is an error for all
Err((e, _, remaining)) => {
Poll::Ready((Err(e), _, remaining)) => {
debug!("an rrset failed to verify: {:?}", e);
if remaining.is_empty() {
return Err(e);
return Poll::Ready(Err(e));
}
remaining
}
@ -357,10 +359,10 @@ impl Future for VerifyRrsetsFuture {
if !remaining.is_empty() {
// continue the evaluation
drop(mem::replace(&mut self.rrsets, select_all(remaining)));
drop(mem::replace(&mut self.as_mut().rrsets, future::select_all(remaining)));
} else {
// validated not none above...
let mut message_result = mem::replace(&mut self.message_result, None).unwrap();
let mut message_result = mem::replace(&mut self.as_mut().message_result, None).unwrap();
// take all the rrsets from the Message, filter down each set to the validated rrsets
// TODO: does the section in the message matter here?
@ -400,7 +402,7 @@ impl Future for VerifyRrsetsFuture {
message_result.insert_additionals(additionals);
// breaks out of the loop... and returns the filtered Message.
return Ok(Async::Ready(message_result));
return Poll::Ready(Ok(message_result));
}
}
}
@ -418,9 +420,9 @@ fn verify_rrset<H>(
handle: SecureDnsHandle<H>,
rrset: Rrset,
rrsigs: Vec<Record>,
) -> Box<dyn Future<Item = Rrset, Error = ProtoError> + Send>
) -> Pin<Box<dyn Future<Output = Result<Rrset, ProtoError>> + Send>>
where
H: DnsHandle,
H: DnsHandle + Unpin,
{
// Special case for unsigned DNSKEYs, it's valid for a DNSKEY to be bare in the zone if
// it's a trust_anchor, though some DNS servers choose to self-sign in this case,
@ -435,21 +437,19 @@ where
}
// standard validation path
Box::new(
verify_default_rrset(&handle.clone_with_context(), rrset, rrsigs)
.and_then(|rrset|
// POST validation
match rrset.record_type {
RecordType::DNSSEC(DNSSECRecordType::DNSKEY) =>
verify_dnskey_rrset(handle, rrset),
// RecordType::DNSSEC(DNSSECRecordType::DS) => verify_ds_rrset(handle, name, record_type, record_class, rrset, rrsigs),
_ => Box::new(finished(rrset)),
})
.map_err(|e| {
debug!("rrset failed validation: {}", e);
e
}),
)
verify_default_rrset(&handle.clone_with_context(), rrset, rrsigs)
.and_then(|rrset|
// POST validation
match rrset.record_type {
RecordType::DNSSEC(DNSSECRecordType::DNSKEY) => verify_dnskey_rrset(handle, rrset),
_ => future::ok(rrset).boxed(),
}
)
.map_err(|e| {
debug!("rrset failed validation: {}", e);
e
})
.boxed()
}
/// Verifies a dnskey rrset
@ -460,9 +460,9 @@ where
fn verify_dnskey_rrset<H>(
mut handle: SecureDnsHandle<H>,
rrset: Rrset,
) -> Box<dyn Future<Item = Rrset, Error = ProtoError> + Send>
) -> Pin<Box<dyn Future<Output = Result<Rrset, ProtoError>> + Send>>
where
H: DnsHandle,
H: DnsHandle + Unpin
{
debug!(
"dnskey validation {}, record_type: {:?}",
@ -505,7 +505,7 @@ where
rrset.name,
rrset.records.len()
);
return Box::new(finished(rrset));
return future::ok(rrset).boxed();
}
}
@ -551,15 +551,15 @@ where
preserve(&mut rrset.records, valid_keys);
debug!("validated dnskey: {}, {}", rrset.name, rrset.records.len());
Ok(rrset)
future::ok(rrset)
} else {
Err(ProtoError::from(ProtoErrorKind::Message(
future::err(ProtoError::from(ProtoErrorKind::Message(
"Could not validate all DNSKEYs",
)))
}
});
Box::new(valid_dnskey)
valid_dnskey.boxed()
}
/// Preserves the specified indexes in vec, all others will be removed
@ -631,9 +631,9 @@ fn verify_default_rrset<H>(
handle: &SecureDnsHandle<H>,
rrset: Rrset,
rrsigs: Vec<Record>,
) -> Box<dyn Future<Item = Rrset, Error = ProtoError> + Send>
) -> Pin<Box<dyn Future<Output = Result<Rrset, ProtoError>> + Send>>
where
H: DnsHandle,
H: DnsHandle + Unpin,
{
// the record set is going to be shared across a bunch of futures, Arc for that.
let rrset = Arc::new(rrset);
@ -659,8 +659,7 @@ where
// then return rrset. Like the standard case below, the DNSKEY is validated
// after this function. This function is only responsible for validating the signature
// the DNSKey validation should come after, see verify_rrset().
return Box::new(
done(
return future::ready(
rrsigs
.into_iter()
// this filter is technically unnecessary, can probably remove it...
@ -693,8 +692,8 @@ where
ProtoError::from(ProtoErrorKind::Message("self-signed dnskey is invalid"))
}),
)
.map(move |rrset| Arc::try_unwrap(rrset).expect("unable to unwrap Arc")),
);
.map_ok(move |rrset| Arc::try_unwrap(rrset).expect("unable to unwrap Arc"))
.boxed();
}
// we can validate with any of the rrsigs...
@ -707,57 +706,61 @@ where
// dns over TLS will mitigate this.
// TODO: strip RRSIGS to accepted algorithms and make algorithms configurable.
let verifications = rrsigs.into_iter()
// this filter is technically unnecessary, can probably remove it...
.filter(|rrsig| is_dnssec(rrsig, DNSSECRecordType::RRSIG))
.map(|rrsig|
if let RData::DNSSEC(DNSSECRData::SIG(sig)) = rrsig.unwrap_rdata() {
// setting up the context explicitly.
sig
} else {
panic!("expected a SIG here");
}
)
.map(|sig| {
let rrset = Arc::clone(&rrset);
let mut handle = handle.clone_with_context();
// this filter is technically unnecessary, can probably remove it...
.filter(|rrsig| is_dnssec(rrsig, DNSSECRecordType::RRSIG))
.map(|rrsig|
if let RData::DNSSEC(DNSSECRData::SIG(sig)) = rrsig.unwrap_rdata() {
// setting up the context explicitly.
sig
} else {
panic!("expected a SIG here");
}
)
.map(|sig| {
let rrset = Arc::clone(&rrset);
let mut handle = handle.clone_with_context();
handle.lookup(Query::query(sig.signer_name().clone(), RecordType::DNSSEC(DNSSECRecordType::DNSKEY)),
DnsRequestOptions::default())
.and_then(move |message|
// DNSKEYs are validated by the inner query
message.answers()
.iter()
.filter(|r| is_dnssec(r, DNSSECRecordType::DNSKEY))
.find(|r|
if let RData::DNSSEC(DNSSECRData::DNSKEY(ref dnskey)) = *r.rdata() {
verify_rrset_with_dnskey(dnskey, &sig, &rrset).is_ok()
} else {
panic!("expected a DNSKEY here: {:?}", r.rdata());
}
)
.map(|_| rrset)
.ok_or_else(|| ProtoError::from(ProtoErrorKind::Message("validation failed")))
)
})
.collect::<Vec<_>>();
handle
.lookup(
Query::query(sig.signer_name().clone(), RecordType::DNSSEC(DNSSECRecordType::DNSKEY)),
DnsRequestOptions::default()
)
.and_then(move |message|
// DNSKEYs are validated by the inner query
future::ready(message
.answers()
.iter()
.filter(|r| is_dnssec(r, DNSSECRecordType::DNSKEY))
.find(|r|
if let RData::DNSSEC(DNSSECRData::DNSKEY(ref dnskey)) = *r.rdata() {
verify_rrset_with_dnskey(dnskey, &sig, &rrset).is_ok()
} else {
panic!("expected a DNSKEY here: {:?}", r.rdata());
}
)
.map(|_| rrset)
.ok_or_else(|| ProtoError::from(ProtoErrorKind::Message("validation failed"))))
)
})
.collect::<Vec<_>>();
// if there are no available verifications, then we are in a failed state.
if verifications.is_empty() {
return Box::new(failed(ProtoError::from(ProtoErrorKind::RrsigsNotPresent {
return future::err(ProtoError::from(ProtoErrorKind::RrsigsNotPresent {
name: rrset.name.clone(),
record_type: rrset.record_type,
})));
})).boxed();
}
// as long as any of the verifications is good, then the RRSET is valid.
let select = select_ok(verifications)
let select = future::select_ok(verifications)
// getting here means at least one of the rrsigs succeeded...
.map(move |(rrset, rest)| {
.map_ok(move |(rrset, rest)| {
drop(rest); // drop all others, should free up Arc
Arc::try_unwrap(rrset).expect("unable to unwrap Arc")
});
Box::new(select)
select.boxed()
}
/// Verifies the given SIG of the RRSET with the DNSKEY.