From f53ef95c7395b7409153054286a21f8ae031481a Mon Sep 17 00:00:00 2001 From: Luca Fulchir Date: Sat, 25 Feb 2023 22:25:52 +0100 Subject: [PATCH] Track new connection Signed-off-by: Luca Fulchir --- Cargo.toml | 1 + src/connection/mod.rs | 15 ++++ src/connection/packet.rs | 13 ++++ src/lib.rs | 155 +++++++++++++++++++++++++++++---------- 4 files changed, 144 insertions(+), 40 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 07bb430..906053a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ arrayref = { version = "0.3" } async-channel = { version = "1.8" } # base85 repo has no tags, fix on a commit. v1.1.1 points to older, wrong version base85 = { git = "https://gitlab.com/darkwyrm/base85", rev = "d98efbfd171dd9ba48e30a5c88f94db92fc7b3c6" } +bitmaps = { version = "3.2" } chacha20poly1305 = { version = "0.10" } futures = { version = "0.3" } hkdf = { version = "0.12" } diff --git a/src/connection/mod.rs b/src/connection/mod.rs index 22c5993..7220a03 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -26,3 +26,18 @@ impl ProtocolVersion { *out = *self as u8; } } + +/// A single connection and its data +#[derive(Debug)] +pub struct Connection { + /// Connection ID + pub id: ID, + /// The main hkdf used for all secrets in this connection + pub hkdf: crate::enc::hkdf::HkdfSha3, +} + +impl Connection { + pub(crate) fn new(id: ID, hkdf: crate::enc::hkdf::HkdfSha3) -> Self { + Self { id, hkdf } + } +} diff --git a/src/connection/packet.rs b/src/connection/packet.rs index d8c366e..b59e02a 100644 --- a/src/connection/packet.rs +++ b/src/connection/packet.rs @@ -17,6 +17,19 @@ impl ConnectionID { pub fn new_handshake() -> Self { Self::Handshake } + /// New id from u64. PLZ NON ZERO + pub(crate) fn new_u64(raw: u64) -> Self { + #[allow(unsafe_code)] + unsafe { + ConnectionID::ID(::core::num::NonZeroU64::new_unchecked(raw)) + } + } + pub(crate) fn as_u64(&self) -> u64 { + match self { + ConnectionID::Handshake => 0, + ConnectionID::ID(id) => id.get(), + } + } /// New random service ID pub fn new_rand(rand: &::ring::rand::SystemRandom) -> Self { use ::ring::rand::SecureRandom; diff --git a/src/lib.rs b/src/lib.rs index d667a46..391cdff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,9 +20,10 @@ pub mod dnssec; pub mod enc; use ::arc_swap::{ArcSwap, ArcSwapAny, ArcSwapOption}; -use ::std::{net::SocketAddr, pin::Pin, sync::Arc}; -use ::tokio::macros::support::Future; -use ::tokio::{net::UdpSocket, task::JoinHandle}; +use ::std::{net::SocketAddr, pin::Pin, sync::Arc, vec, vec::Vec}; +use ::tokio::{ + macros::support::Future, net::UdpSocket, sync::RwLock, task::JoinHandle, +}; use crate::enc::{ asym, @@ -30,7 +31,10 @@ use crate::enc::{ sym::{CipherKind, CipherRecv, CipherSend}, }; pub use config::Config; -use connection::handshake::{self, Handshake, HandshakeKey}; +use connection::{ + handshake::{self, Handshake, HandshakeKey}, + Connection, +}; /// Main fenrir library errors #[derive(::thiserror::Error, Debug)] @@ -52,18 +56,6 @@ pub enum Error { Key(#[from] crate::enc::Error), } -// No async here -struct FenrirInner { - key_exchanges: ArcSwapAny>>, - ciphers: ArcSwapAny>>, - keys: ArcSwapAny>>, -} - -#[allow(unsafe_code)] -unsafe impl Send for FenrirInner {} -#[allow(unsafe_code)] -unsafe impl Sync for FenrirInner {} - /// Information needed to reply after the key exchange #[derive(Debug, Clone)] pub struct AuthNeededInfo { @@ -83,6 +75,17 @@ pub enum HandshakeAction { /// Packet parsed, now go perform authentication AuthNeeded(AuthNeededInfo), } +// No async here +struct FenrirInner { + key_exchanges: ArcSwapAny>>, + ciphers: ArcSwapAny>>, + keys: ArcSwapAny>>, +} + +#[allow(unsafe_code)] +unsafe impl Send for FenrirInner {} +#[allow(unsafe_code)] +unsafe impl Sync for FenrirInner {} // No async here impl FenrirInner { @@ -215,17 +218,20 @@ impl SocketList { socket: Arc, handle: JoinHandle<::std::io::Result<()>>, ) { - let mut new_list; - { - let old_list = self.list.load(); - new_list = Arc::new(Vec::with_capacity(old_list.len() + 1)); + // we could simplify this into just a `.swap` instead of `.rcu` but + // it is not yet guaranteed that only one thread will call this fn + // ...we don't need performance here anyway + let arc_handle = Arc::new(handle); + self.list.rcu(|old_list| { + let mut new_list = Arc::new(Vec::with_capacity(old_list.len() + 1)); new_list = old_list.to_vec().into(); - } - Arc::get_mut(&mut new_list) - .unwrap() - .push((socket, Arc::new(handle))); - self.list.swap(new_list); + Arc::get_mut(&mut new_list) + .unwrap() + .push((socket.clone(), arc_handle.clone())); + new_list + }); } + /// This method assumes no other `add_sockets` are being run async fn stop_all(mut self) { let mut arc_list = self.list.into_inner(); let list = loop { @@ -282,6 +288,61 @@ enum Work { Recv(RawUdp), } +// PERF: Arc> loks a bit too much, need to find +// faster ways to do this +struct ConnList { + connections: Vec>>, + /// Bitmap to track which connection ids are used or free + ids_used: Vec<::bitmaps::Bitmap<1024>>, +} + +impl ConnList { + fn new() -> Self { + let mut bitmap_id = ::bitmaps::Bitmap::<1024>::new(); + bitmap_id.set(0, true); // ID(0) == handshake + Self { + connections: Vec::with_capacity(128), + ids_used: vec![bitmap_id], + } + } + fn reserve_first(&mut self, hkdf: HkdfSha3) -> Arc { + // uhm... bad things are going on here: + // * id must be initialized, but only because: + // * rust does not understand that after the `!found` id is always + // initialized + // * `ID::new_u64` is really safe only with >0, but here it always is + // ...we should probably rewrite it in better, safer rust + let mut id: u64 = 0; + let mut found = false; + for (i, b) in self.ids_used.iter_mut().enumerate() { + match b.first_false_index() { + Some(idx) => { + b.set(idx, true); + id = ((i as u64) * 1024) + (idx as u64); + found = true; + break; + } + None => {} + } + } + if !found { + let mut new_bitmap = ::bitmaps::Bitmap::<1024>::new(); + new_bitmap.set(0, true); + id = (self.ids_used.len() as u64) * 1024; + self.ids_used.push(new_bitmap); + } + let new_id = connection::ID::new_u64(id); + let new_conn = Arc::new(connection::Connection::new(new_id, hkdf)); + if (self.connections.len() as u64) < id { + self.connections.push(Some(new_conn.clone())); + } else { + // very probably redundant + self.connections[id as usize] = Some(new_conn.clone()); + } + new_conn + } +} + /// Instance of a fenrir endpoint #[allow(missing_copy_implementations, missing_debug_implementations)] pub struct Fenrir { @@ -303,6 +364,8 @@ pub struct Fenrir { work_recv: Arc<::async_channel::Receiver>, // PERF: rand uses syscalls. should we do that async? rand: ::ring::rand::SystemRandom, + /// list of Established connections + connections: Arc>, } // TODO: graceful vs immediate stop @@ -333,6 +396,7 @@ impl Fenrir { work_send: Arc::new(work_send), work_recv: Arc::new(work_recv), rand: ::ring::rand::SystemRandom::new(), + connections: Arc::new(RwLock::new(ConnList::new())), }; Ok(endpoint) } @@ -389,7 +453,7 @@ impl Fenrir { /// Add all UDP sockets found in config /// and start listening for packets - async fn add_sockets(&mut self) -> ::std::io::Result<()> { + async fn add_sockets(&self) -> ::std::io::Result<()> { let sockets = self.cfg.listen.iter().map(|s_addr| async { let socket = ::tokio::spawn(Self::bind_udp(s_addr.clone())).await??; @@ -589,20 +653,28 @@ impl Fenrir { // connection ID let srv_conn_id = connection::ID::new_rand(&self.rand); - let auth_conn_id = - connection::ID::new_rand(&self.rand); let srv_secret = enc::sym::Secret::new_rand(&self.rand); + // track connection + let auth_conn = { + let mut lock = + self.connections.write().await; + lock.reserve_first(authinfo.hkdf) + }; + + // TODO: move all the next bits into + // dirsync::Req::respond(...) + let resp_data = dirsync::RespData { client_nonce: req_data.nonce, - id: auth_conn_id, + id: auth_conn.id, service_id: srv_conn_id, service_key: srv_secret, }; // build response let secret_send = - authinfo.hkdf.get_secret(b"to_client"); + auth_conn.hkdf.get_secret(b"to_client"); let mut cipher_send = CipherSend::new( authinfo.cipher, secret_send, @@ -654,16 +726,19 @@ impl Fenrir { client: UdpClient, server: UdpServer, ) { - let sockets = self.sockets.lock(); - let src_sock = match sockets.find(server) { - Some(src_sock) => src_sock, - None => { - ::tracing::error!( - "Can't send packet: Server changed listening ip!" - ); - return; - } - }; + let src_sock; + { + let sockets = self.sockets.lock(); + src_sock = match sockets.find(server) { + Some(src_sock) => src_sock, + None => { + ::tracing::error!( + "Can't send packet: Server changed listening ip!" + ); + return; + } + }; + } src_sock.send_to(&data, client.0); } }