libFenrir/src/lib.rs
Luca Fulchir fd76ec9983
Split resolving and decoding
Signed-off-by: Luca Fulchir <luca.fulchir@runesauth.com>
2023-02-09 12:12:09 +01:00

175 lines
5.4 KiB
Rust

#![deny(
missing_docs,
missing_debug_implementations,
missing_copy_implementations,
trivial_casts,
trivial_numeric_casts,
unsafe_code,
unstable_features,
unused_import_braces,
unused_qualifications
)]
//!
//! libFenrir is the official rust library implementing the Fenrir protocol
use ::std::{net::SocketAddr, sync::Arc};
use ::tokio::{net::UdpSocket, task::JoinHandle};
mod config;
pub use config::Config;
pub mod dnssec;
/// Main fenrir library errors
#[derive(::thiserror::Error, Debug)]
pub enum Error {
/// General I/O error
#[error("IO: {0:?}")]
IO(#[from] ::std::io::Error),
/// Dnssec errors
#[error("Dnssec: {0:?}")]
Dnssec(#[from] dnssec::Error),
/// The library was not initialized (run .start())
#[error("not initialized")]
NotInitialized,
}
/// Instance of a fenrir endpoint
#[allow(missing_copy_implementations, missing_debug_implementations)]
pub struct Fenrir {
/// library Configuration
cfg: Config,
/// listening udp sockets
sockets: Vec<(Arc<UdpSocket>, JoinHandle<::std::io::Result<()>>)>,
/// DNSSEC resolver, with failovers
dnssec: Option<dnssec::Dnssec>,
/// Broadcast channel to tell workers to stop working
stop_working: ::tokio::sync::broadcast::Sender<bool>,
}
// TODO: graceful vs immediate stop
impl Drop for Fenrir {
fn drop(&mut self) {
self.stop_sync()
}
}
impl Fenrir {
/// Create a new Fenrir endpoint
pub fn new(config: &Config) -> Result<Self, Error> {
let listen_num = config.listen.len();
let (sender, _) = ::tokio::sync::broadcast::channel(1);
let endpoint = Fenrir {
cfg: config.clone(),
sockets: Vec::with_capacity(listen_num),
dnssec: None,
stop_working: sender,
};
Ok(endpoint)
}
/// Start all workers, listeners
pub async fn start(&mut self) -> Result<(), Error> {
if let Err(e) = self.add_sockets().await {
self.stop().await;
return Err(e.into());
}
self.dnssec = Some(dnssec::Dnssec::new(&self.cfg.resolvers).await?);
Ok(())
}
/// Stop all workers, listeners
/// asyncronous version for Drop
fn stop_sync(&mut self) {
let _ = self.stop_working.send(true);
let mut toempty_socket = Vec::new();
::std::mem::swap(&mut self.sockets, &mut toempty_socket);
let task = ::tokio::task::spawn(Self::stop_sockets(toempty_socket));
let _ = ::futures::executor::block_on(task);
self.dnssec = None;
}
/// Stop all workers, listeners
pub async fn stop(&mut self) {
let _ = self.stop_working.send(true);
let mut toempty_socket = Vec::new();
::std::mem::swap(&mut self.sockets, &mut toempty_socket);
Self::stop_sockets(toempty_socket).await;
self.dnssec = None;
}
/// actually do the work of stopping resolvers and listeners
async fn stop_sockets(
sockets: Vec<(Arc<UdpSocket>, JoinHandle<::std::io::Result<()>>)>,
) {
for s in sockets.into_iter() {
let _ = s.1.await;
}
}
/// Add all UDP sockets found in config
/// and start listening for packets
async fn add_sockets(&mut self) -> ::std::io::Result<()> {
let sockets = self.cfg.listen.iter().map(|s_addr| async {
let socket = ::tokio::spawn(bind_udp(s_addr.clone())).await??;
Ok(Arc::new(socket))
});
let sockets = ::futures::future::join_all(sockets).await;
for s_res in sockets.into_iter() {
match s_res {
Ok(s) => {
let stop_working = self.stop_working.subscribe();
let join =
::tokio::spawn(listen_udp(stop_working, s.clone()));
self.sockets.push((s, join));
}
Err(e) => {
return Err(e);
}
}
}
Ok(())
}
/// Get the raw TXT record of a Fenrir domain
pub async fn resolv_str(&self, domain: &str) -> Result<String, Error> {
match &self.dnssec {
Some(dnssec) => Ok(dnssec.resolv(domain).await?),
None => Err(Error::NotInitialized),
}
}
/// Get the raw TXT record of a Fenrir domain
pub async fn resolv(&self, domain: &str) -> Result<dnssec::Record, Error> {
let record_str = self.resolv_str(domain).await?;
Ok(dnssec::Dnssec::parse_txt_record(&record_str)?)
}
}
/// Add an async udp listener
async fn bind_udp(sock: SocketAddr) -> ::std::io::Result<UdpSocket> {
let socket = UdpSocket::bind(sock).await?;
// PERF: SO_REUSEADDR/REUSEPORT
Ok(socket)
}
/// Run a dedicated loop to read packets on the listening socket
async fn listen_udp(
mut stop_working: ::tokio::sync::broadcast::Receiver<bool>,
socket: Arc<UdpSocket>,
) -> ::std::io::Result<()> {
// jumbo frames are 9K max
let mut buffer: [u8; 9000] = [0; 9000];
loop {
let (bytes, sock_from) = ::tokio::select! {
_done = stop_working.recv() => {
break;
}
result = socket.recv_from(&mut buffer) => {
result?
}
};
work(&buffer[0..bytes], sock_from).await;
}
Ok(())
}
/// Read and do stuff with the udp packet
async fn work(_buffer: &[u8], _sock_from: SocketAddr) {
// Do nothing for now
}