From 3e4ef61edbdcf27e6f301a195dde112b1d2ea935 Mon Sep 17 00:00:00 2001 From: Luca Fulchir Date: Sat, 4 Feb 2023 16:21:16 +0100 Subject: [PATCH] DNSSEC resolver initialization Signed-off-by: Luca Fulchir --- Cargo.toml | 4 + src/config/mod.rs | 20 ++++- src/lib.rs | 216 +++++++++++++++++++++++++++++++++++++++------- 3 files changed, 209 insertions(+), 31 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 855e70d..90279d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,12 +23,16 @@ publish = false crate_type = [ "lib", "cdylib", "staticlib" ] [dependencies] +# please keep these in alphabetical order +futures = { version = "^0.3" } thiserror = { version = "^1.0" } tokio = { version = "^1", features = ["full"] } # PERF: todo linux-only, behind "iouring" feature #tokio-uring = { version = "^0.4" } tracing = { version = "^0.1" } +trust-dns-client = { version = "^0.22" } +trust-dns-proto = { version = "^0.22" } [profile.dev] diff --git a/src/config/mod.rs b/src/config/mod.rs index 3e008ef..e3fd4c8 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,7 +1,12 @@ //! //! Configuration to initialize the Fenrir networking library -use ::std::{net::SocketAddr, num::NonZeroUsize, option::Option, vec::Vec}; +use ::std::{ + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + num::NonZeroUsize, + option::Option, + vec, +}; /// Main config for libFenrir #[derive(Clone, Debug)] @@ -11,13 +16,24 @@ pub struct Config { /// List of ipv4 or ipv6 UDP inet socket to listen on /// If empty, libFenrir will listen on a random UDP port on `0.0.0.0` pub listen: Vec, + /// List of DNS resolvers to use + pub resolvers: Vec, } impl Default for Config { fn default() -> Self { Config { threads: None, - listen: Vec::new(), + listen: vec![ + // ipv4 random port + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), + // ipv6 random port + SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), + 0, + ), + ], + resolvers: Vec::new(), } } } diff --git a/src/lib.rs b/src/lib.rs index 37d8e00..ae8eca5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,12 +13,11 @@ //! //! libFenrir is the official rust library implementing the Fenrir protocol -use ::std::{ - io::Result, - net::{IpAddr, Ipv4Addr, SocketAddr}, +use ::std::{io::Result, net::SocketAddr, result, sync::Arc}; +use ::tokio::{net::UdpSocket, runtime, task::JoinHandle}; +use trust_dns_client::client::{ + AsyncClient as DnssecClient, ClientHandle as DnssecHandle, }; -use ::tokio::{net::UdpSocket, runtime}; -use ::tracing::error; mod config; pub use config::Config; @@ -30,20 +29,28 @@ pub struct Fenrir { cfg: Config, /// internal runtime rt: ::tokio::runtime::Runtime, + /// listening udp sockets + sockets: Vec<(Arc, JoinHandle>)>, + /// DNSSEC resolvers. multiple for failover + resolvers: Vec<( + DnssecClient, + JoinHandle>, + )>, + /// Broadcast channel to tell workers to stop working + stop_working: ::tokio::sync::broadcast::Sender, } -/// Initialize a new Fenrir endpoint -pub fn init(config: &Config) -> Result { - // PERF: linux iouring - // PERF: multithread pinning with hwloc2 for topology - let f = Fenrir::new(config.clone())?; - f.add_listener(); - Ok(f) +// TODO: graceful vs immediate stop + +impl Drop for Fenrir { + fn drop(&mut self) { + self.stop_sync() + } } impl Fenrir { /// Create a new Fenrir endpoint - fn new(config: Config) -> Result { + pub fn new(config: &Config) -> Result { let mut builder = runtime::Builder::new_multi_thread(); if let Some(threads) = config.threads { builder.worker_threads(threads.get()); @@ -55,27 +62,178 @@ impl Fenrir { return Err(e); } }; - Ok(Fenrir { - cfg: config, + // start listening + let listen_num = config.listen.len(); + let resolvers_num = config.resolvers.len(); + let (sender, _) = ::tokio::sync::broadcast::channel(1); + let endpoint = Fenrir { + cfg: config.clone(), rt: rt, - }) + sockets: Vec::with_capacity(listen_num), + resolvers: Vec::with_capacity(resolvers_num), + stop_working: sender, + }; + Ok(endpoint) } - /// Add an UDP listener - fn add_listener(&self) { - if self.cfg.listen.len() == 0 { - self.rt.spawn(listen_udp(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - 0, // 0 means random port - ))); - } else { - for sock in self.cfg.listen.iter() { - self.rt.spawn(listen_udp(sock.clone())); - } + /// Start all workers, listeners + pub async fn start(&mut self) -> Result<()> { + if let Err(e) = self.add_sockets().await { + self.stop().await; + return Err(e); + } + if let Err(e) = self.setup_dnssec().await { + self.stop().await; + return Err(e); + } + Ok(()) + } + /// Stop all workers, listeners + /// asyncronous version for Drop + fn stop_sync(&mut self) { + let mut toempty_resolv = Vec::new(); + let mut toempty_socket = Vec::new(); + ::std::mem::swap(&mut self.resolvers, &mut toempty_resolv); + ::std::mem::swap(&mut self.sockets, &mut toempty_socket); + self.rt + .block_on(Self::real_stop(toempty_socket, toempty_resolv)); + } + /// Stop all workers, listeners + pub async fn stop(&mut self) { + // TODO + let _ = self.stop_working.send(true); + let mut toempty_resolv = Vec::new(); + let mut toempty_socket = Vec::new(); + ::std::mem::swap(&mut self.resolvers, &mut toempty_resolv); + ::std::mem::swap(&mut self.sockets, &mut toempty_socket); + Self::real_stop(toempty_socket, toempty_resolv).await; + } + /// actually do the work of stopping resolvers and listeners + async fn real_stop( + sockets: Vec<(Arc, JoinHandle>)>, + resolvers: Vec<( + DnssecClient, + JoinHandle< + result::Result<(), ::trust_dns_proto::error::ProtoError>, + >, + )>, + ) { + for r in resolvers.into_iter() { + let _ = r.1.await; + } + for s in sockets.into_iter() { + let _ = s.1.await; } } + /// Add all UDP sockets found in config + /// and start listening for packets + async fn add_sockets(&mut self) -> Result<()> { + let sockets = self.cfg.listen.iter().map(|s_addr| async { + //let socket = self.rt.block_on(bind_udp(s_addr.clone()))?; + let socket = self.rt.spawn(bind_udp(s_addr.clone())).await??; + Ok(Arc::new(socket)) + }); + let sockets = ::futures::future::join_all(sockets).await; + for s_res in sockets.into_iter() { + match s_res { + Ok(s) => { + let stop_working = self.stop_working.subscribe(); + let join = + self.rt.spawn(listen_udp(stop_working, s.clone())); + self.sockets.push((s, join)); + } + Err(e) => { + return Err(e); + } + } + } + Ok(()) + } + /// Add and initialize the DNSSEC resolvers to the endpoint + async fn setup_dnssec(&mut self) -> Result<()> { + // use a TCP connection to the DNS. + // the records we need are big, will not fit in a UDP packet + use tokio::net::TcpStream; + use trust_dns_client::{ + client::AsyncClient, proto::iocompat::AsyncIoTokioAsStd, + tcp::TcpClientStream, + }; + + for resolver in self.cfg.resolvers.iter() { + let (stream, sender) = TcpClientStream::< + AsyncIoTokioAsStd, + >::new(resolver.clone()); + let client_tostart = AsyncClient::new(stream, sender, None); + // await the connection to be established + let (client, bg) = client_tostart.await?; + + let handle = self.rt.spawn(bg); + self.resolvers.push((client, handle)); + } + Ok(()) + } + /// get the fenrir data for a domain + pub async fn resolv(&mut self, domain: &str) -> Result { + use std::str::FromStr; + use trust_dns_client::rr::{DNSClass, Name, RData, RecordType}; + + let fullname = "_fenrir".to_owned() + domain; + for client in self.resolvers.iter_mut() { + let query = client.0.query( + Name::from_str(&fullname).unwrap(), + DNSClass::IN, + RecordType::TXT, + ); + + // wait for its response + let response = query.await?; + + // validate it's what we expected + match response.answers()[0].data() { + Some(RData::TXT(text)) => return Ok(text.to_string()), + _ => { + return Err(::std::io::Error::new( + ::std::io::ErrorKind::NotFound, + "No TXT record found", + )) + } + } + } + Err(::std::io::Error::new( + ::std::io::ErrorKind::NotConnected, + "No DNS server usable", + )) + } } + /// Add an async udp listener -async fn listen_udp(sock: SocketAddr) -> Result<()> { - UdpSocket::bind(sock).await?; +async fn bind_udp(sock: SocketAddr) -> Result { + let socket = UdpSocket::bind(sock).await?; + // PERF: SO_REUSEADDR/REUSEPORT + Ok(socket) +} + +/// Run a dedicated loop to read packets on the listening socket +async fn listen_udp( + mut stop_working: ::tokio::sync::broadcast::Receiver, + socket: Arc, +) -> Result<()> { + // jumbo frames are 9K max + let mut buffer: [u8; 9000] = [0; 9000]; + loop { + let (bytes, sock_from) = ::tokio::select! { + _done = stop_working.recv() => { + break; + } + result = socket.recv_from(&mut buffer) => { + result? + } + }; + work(&buffer[0..bytes], sock_from).await; + } Ok(()) } + +/// Read and do stuff with the udp packet +async fn work(_buffer: &[u8], _sock_from: SocketAddr) { + // Do nothing for now +}