DNSSEC resolver initialization

Signed-off-by: Luca Fulchir <luca.fulchir@runesauth.com>
This commit is contained in:
Luca Fulchir 2023-02-04 16:21:16 +01:00
parent 104cbb6126
commit 3e4ef61edb
Signed by: luca.fulchir
GPG Key ID: 8F6440603D13A78E
3 changed files with 209 additions and 31 deletions

View File

@ -23,12 +23,16 @@ publish = false
crate_type = [ "lib", "cdylib", "staticlib" ] crate_type = [ "lib", "cdylib", "staticlib" ]
[dependencies] [dependencies]
# please keep these in alphabetical order
futures = { version = "^0.3" }
thiserror = { version = "^1.0" } thiserror = { version = "^1.0" }
tokio = { version = "^1", features = ["full"] } tokio = { version = "^1", features = ["full"] }
# PERF: todo linux-only, behind "iouring" feature # PERF: todo linux-only, behind "iouring" feature
#tokio-uring = { version = "^0.4" } #tokio-uring = { version = "^0.4" }
tracing = { version = "^0.1" } tracing = { version = "^0.1" }
trust-dns-client = { version = "^0.22" }
trust-dns-proto = { version = "^0.22" }
[profile.dev] [profile.dev]

View File

@ -1,7 +1,12 @@
//! //!
//! Configuration to initialize the Fenrir networking library //! Configuration to initialize the Fenrir networking library
use ::std::{net::SocketAddr, num::NonZeroUsize, option::Option, vec::Vec}; use ::std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
num::NonZeroUsize,
option::Option,
vec,
};
/// Main config for libFenrir /// Main config for libFenrir
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -11,13 +16,24 @@ pub struct Config {
/// List of ipv4 or ipv6 UDP inet socket to listen on /// List of ipv4 or ipv6 UDP inet socket to listen on
/// If empty, libFenrir will listen on a random UDP port on `0.0.0.0` /// If empty, libFenrir will listen on a random UDP port on `0.0.0.0`
pub listen: Vec<SocketAddr>, pub listen: Vec<SocketAddr>,
/// List of DNS resolvers to use
pub resolvers: Vec<SocketAddr>,
} }
impl Default for Config { impl Default for Config {
fn default() -> Self { fn default() -> Self {
Config { Config {
threads: None, threads: None,
listen: Vec::new(), listen: vec![
// ipv4 random port
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
// ipv6 random port
SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
0,
),
],
resolvers: Vec::new(),
} }
} }
} }

View File

@ -13,12 +13,11 @@
//! //!
//! libFenrir is the official rust library implementing the Fenrir protocol //! libFenrir is the official rust library implementing the Fenrir protocol
use ::std::{ use ::std::{io::Result, net::SocketAddr, result, sync::Arc};
io::Result, use ::tokio::{net::UdpSocket, runtime, task::JoinHandle};
net::{IpAddr, Ipv4Addr, SocketAddr}, use trust_dns_client::client::{
AsyncClient as DnssecClient, ClientHandle as DnssecHandle,
}; };
use ::tokio::{net::UdpSocket, runtime};
use ::tracing::error;
mod config; mod config;
pub use config::Config; pub use config::Config;
@ -30,20 +29,28 @@ pub struct Fenrir {
cfg: Config, cfg: Config,
/// internal runtime /// internal runtime
rt: ::tokio::runtime::Runtime, rt: ::tokio::runtime::Runtime,
/// listening udp sockets
sockets: Vec<(Arc<UdpSocket>, JoinHandle<Result<()>>)>,
/// DNSSEC resolvers. multiple for failover
resolvers: Vec<(
DnssecClient,
JoinHandle<result::Result<(), ::trust_dns_proto::error::ProtoError>>,
)>,
/// Broadcast channel to tell workers to stop working
stop_working: ::tokio::sync::broadcast::Sender<bool>,
} }
/// Initialize a new Fenrir endpoint // TODO: graceful vs immediate stop
pub fn init(config: &Config) -> Result<Fenrir> {
// PERF: linux iouring impl Drop for Fenrir {
// PERF: multithread pinning with hwloc2 for topology fn drop(&mut self) {
let f = Fenrir::new(config.clone())?; self.stop_sync()
f.add_listener(); }
Ok(f)
} }
impl Fenrir { impl Fenrir {
/// Create a new Fenrir endpoint /// Create a new Fenrir endpoint
fn new(config: Config) -> Result<Self> { pub fn new(config: &Config) -> Result<Self> {
let mut builder = runtime::Builder::new_multi_thread(); let mut builder = runtime::Builder::new_multi_thread();
if let Some(threads) = config.threads { if let Some(threads) = config.threads {
builder.worker_threads(threads.get()); builder.worker_threads(threads.get());
@ -55,27 +62,178 @@ impl Fenrir {
return Err(e); return Err(e);
} }
}; };
Ok(Fenrir { // start listening
cfg: config, let listen_num = config.listen.len();
let resolvers_num = config.resolvers.len();
let (sender, _) = ::tokio::sync::broadcast::channel(1);
let endpoint = Fenrir {
cfg: config.clone(),
rt: rt, rt: rt,
}) sockets: Vec::with_capacity(listen_num),
resolvers: Vec::with_capacity(resolvers_num),
stop_working: sender,
};
Ok(endpoint)
} }
/// Add an UDP listener /// Start all workers, listeners
fn add_listener(&self) { pub async fn start(&mut self) -> Result<()> {
if self.cfg.listen.len() == 0 { if let Err(e) = self.add_sockets().await {
self.rt.spawn(listen_udp(SocketAddr::new( self.stop().await;
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), return Err(e);
0, // 0 means random port
)));
} else {
for sock in self.cfg.listen.iter() {
self.rt.spawn(listen_udp(sock.clone()));
} }
if let Err(e) = self.setup_dnssec().await {
self.stop().await;
return Err(e);
} }
}
}
/// Add an async udp listener
async fn listen_udp(sock: SocketAddr) -> Result<()> {
UdpSocket::bind(sock).await?;
Ok(()) Ok(())
} }
/// Stop all workers, listeners
/// asyncronous version for Drop
fn stop_sync(&mut self) {
let mut toempty_resolv = Vec::new();
let mut toempty_socket = Vec::new();
::std::mem::swap(&mut self.resolvers, &mut toempty_resolv);
::std::mem::swap(&mut self.sockets, &mut toempty_socket);
self.rt
.block_on(Self::real_stop(toempty_socket, toempty_resolv));
}
/// Stop all workers, listeners
pub async fn stop(&mut self) {
// TODO
let _ = self.stop_working.send(true);
let mut toempty_resolv = Vec::new();
let mut toempty_socket = Vec::new();
::std::mem::swap(&mut self.resolvers, &mut toempty_resolv);
::std::mem::swap(&mut self.sockets, &mut toempty_socket);
Self::real_stop(toempty_socket, toempty_resolv).await;
}
/// actually do the work of stopping resolvers and listeners
async fn real_stop(
sockets: Vec<(Arc<UdpSocket>, JoinHandle<Result<()>>)>,
resolvers: Vec<(
DnssecClient,
JoinHandle<
result::Result<(), ::trust_dns_proto::error::ProtoError>,
>,
)>,
) {
for r in resolvers.into_iter() {
let _ = r.1.await;
}
for s in sockets.into_iter() {
let _ = s.1.await;
}
}
/// Add all UDP sockets found in config
/// and start listening for packets
async fn add_sockets(&mut self) -> Result<()> {
let sockets = self.cfg.listen.iter().map(|s_addr| async {
//let socket = self.rt.block_on(bind_udp(s_addr.clone()))?;
let socket = self.rt.spawn(bind_udp(s_addr.clone())).await??;
Ok(Arc::new(socket))
});
let sockets = ::futures::future::join_all(sockets).await;
for s_res in sockets.into_iter() {
match s_res {
Ok(s) => {
let stop_working = self.stop_working.subscribe();
let join =
self.rt.spawn(listen_udp(stop_working, s.clone()));
self.sockets.push((s, join));
}
Err(e) => {
return Err(e);
}
}
}
Ok(())
}
/// Add and initialize the DNSSEC resolvers to the endpoint
async fn setup_dnssec(&mut self) -> Result<()> {
// use a TCP connection to the DNS.
// the records we need are big, will not fit in a UDP packet
use tokio::net::TcpStream;
use trust_dns_client::{
client::AsyncClient, proto::iocompat::AsyncIoTokioAsStd,
tcp::TcpClientStream,
};
for resolver in self.cfg.resolvers.iter() {
let (stream, sender) = TcpClientStream::<
AsyncIoTokioAsStd<TcpStream>,
>::new(resolver.clone());
let client_tostart = AsyncClient::new(stream, sender, None);
// await the connection to be established
let (client, bg) = client_tostart.await?;
let handle = self.rt.spawn(bg);
self.resolvers.push((client, handle));
}
Ok(())
}
/// get the fenrir data for a domain
pub async fn resolv(&mut self, domain: &str) -> Result<String> {
use std::str::FromStr;
use trust_dns_client::rr::{DNSClass, Name, RData, RecordType};
let fullname = "_fenrir".to_owned() + domain;
for client in self.resolvers.iter_mut() {
let query = client.0.query(
Name::from_str(&fullname).unwrap(),
DNSClass::IN,
RecordType::TXT,
);
// wait for its response
let response = query.await?;
// validate it's what we expected
match response.answers()[0].data() {
Some(RData::TXT(text)) => return Ok(text.to_string()),
_ => {
return Err(::std::io::Error::new(
::std::io::ErrorKind::NotFound,
"No TXT record found",
))
}
}
}
Err(::std::io::Error::new(
::std::io::ErrorKind::NotConnected,
"No DNS server usable",
))
}
}
/// Add an async udp listener
async fn bind_udp(sock: SocketAddr) -> Result<UdpSocket> {
let socket = UdpSocket::bind(sock).await?;
// PERF: SO_REUSEADDR/REUSEPORT
Ok(socket)
}
/// Run a dedicated loop to read packets on the listening socket
async fn listen_udp(
mut stop_working: ::tokio::sync::broadcast::Receiver<bool>,
socket: Arc<UdpSocket>,
) -> Result<()> {
// jumbo frames are 9K max
let mut buffer: [u8; 9000] = [0; 9000];
loop {
let (bytes, sock_from) = ::tokio::select! {
_done = stop_working.recv() => {
break;
}
result = socket.recv_from(&mut buffer) => {
result?
}
};
work(&buffer[0..bytes], sock_from).await;
}
Ok(())
}
/// Read and do stuff with the udp packet
async fn work(_buffer: &[u8], _sock_from: SocketAddr) {
// Do nothing for now
}