From a3430f18135f2fb9b7b76f9667af6df85864143b Mon Sep 17 00:00:00 2001 From: Luca Fulchir Date: Sun, 28 May 2023 18:23:14 +0200 Subject: [PATCH] Initial connections: share auth.server connection Signed-off-by: Luca Fulchir --- src/connection/mod.rs | 95 ++++++++++++++++++++++++++++++++++++++++--- src/dnssec/record.rs | 2 +- src/enc/asym.rs | 4 +- src/inner/worker.rs | 9 +++- src/lib.rs | 65 ++++++++++++++++++++++++++--- 5 files changed, 160 insertions(+), 15 deletions(-) diff --git a/src/connection/mod.rs b/src/connection/mod.rs index e264cf7..6d3325d 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -12,7 +12,9 @@ pub use crate::connection::{ }; use crate::{ + dnssec, enc::{ + asym::PubKey, hkdf::HkdfSha3, sym::{CipherKind, CipherRecv, CipherSend}, }, @@ -201,11 +203,94 @@ impl ConnList { } } -/* use ::std::collections::HashMap; -pub(crate) struct AuthServerConnections { - conn_map : HashMap< - pub id: IDSend, +enum MapEntry { + Present(IDSend), + Reserved, +} + +/// return wether we already have a connection, we are waiting for one, or you +/// can start one +#[derive(Debug, Clone, Copy)] +pub(crate) enum Reservation { + /// we already have a connection. use this ID. + Present(IDSend), + /// we don't have a connection, but we are waiting for one to be established. + Waiting, + /// we have reserved a spot for your connection. + Reserved, +} + +/// Link the public key of the authentication server to a connection id +/// so that we can reuse that connection to ask for more authentications +/// +/// Note that a server can have multiple public keys, +/// and the handshake will only ever verify one. +/// To avoid malicious publication fo keys that are not yours, +/// on connection we: +/// * reserve all public keys of the server +/// * wait for the connection to finish +/// * remove all those reservations, exept the one key that actually succeded +/// While searching, we return a connection ID if just one key is a match +pub(crate) struct AuthServerConnections { + conn_map: HashMap, + next_reservation: u64, +} + +impl AuthServerConnections { + pub(crate) fn new() -> Self { + Self { + conn_map: HashMap::with_capacity(32), + next_reservation: 0, + } + } + /// add an ID to the reserved spot, + /// and unlock the other pubkeys which have not been verified + pub(crate) fn add( + &mut self, + pubkey: &PubKey, + id: IDSend, + record: &dnssec::Record, + ) { + let _ = self.conn_map.insert(*pubkey, MapEntry::Present(id)); + for (_, pk) in record.public_keys.iter() { + if pk == pubkey { + continue; + } + let _ = self.conn_map.remove(pk); + } + } + /// remove a dropped connection + pub(crate) fn remove_reserved(&mut self, record: &dnssec::Record) { + for (_, pk) in record.public_keys.iter() { + let _ = self.conn_map.remove(pk); + } + } + /// remove a dropped connection + pub(crate) fn remove_conn(&mut self, pubkey: &PubKey) { + let _ = self.conn_map.remove(pubkey); + } + + /// each dnssec::Record has multiple Pubkeys. reserve and ID for them all. + /// later on, when `add` is called we will delete + /// those that have not actually benn used + pub(crate) fn get_or_reserve( + &mut self, + record: &dnssec::Record, + ) -> Reservation { + for (_, pk) in record.public_keys.iter() { + match self.conn_map.get(pk) { + None => {} + Some(MapEntry::Reserved) => return Reservation::Waiting, + Some(MapEntry::Present(id)) => { + return Reservation::Present(id.clone()) + } + } + } + for (_, pk) in record.public_keys.iter() { + let _ = self.conn_map.insert(*pk, MapEntry::Reserved); + } + Reservation::Reserved + } } -*/ diff --git a/src/dnssec/record.rs b/src/dnssec/record.rs index b4e9bb9..6709751 100644 --- a/src/dnssec/record.rs +++ b/src/dnssec/record.rs @@ -18,8 +18,8 @@ //! * X bytes: IP //! ] //! [ # list of pubkeys -//! * 1 byte: pubkey type //! * 1 byte: pubkey id +//! * 1 byte: pubkey type //! * Y bytes: pubkey //! ] diff --git a/src/enc/asym.rs b/src/enc/asym.rs index 160ee12..6fd7901 100644 --- a/src/enc/asym.rs +++ b/src/enc/asym.rs @@ -85,7 +85,7 @@ impl KeyExchange { } /// Kind of public key in the handshake -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)] #[allow(missing_debug_implementations)] #[non_exhaustive] pub enum PubKey { @@ -204,7 +204,7 @@ impl ExchangePrivKey { } /// all Ephemeral Public keys -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)] #[non_exhaustive] pub enum ExchangePubKey { /// X25519(Curve25519) used for key exchange diff --git a/src/inner/worker.rs b/src/inner/worker.rs index c7adf10..512c2f6 100644 --- a/src/inner/worker.rs +++ b/src/inner/worker.rs @@ -11,7 +11,7 @@ use crate::{ ConnList, Connection, IDSend, Packet, }, dnssec, - enc::{hkdf::HkdfSha3, sym::Secret}, + enc::{asym::PubKey, hkdf::HkdfSha3, sym::Secret}, inner::{HandshakeAction, HandshakeTracker, ThreadTracker}, }; use ::std::{rc::Rc, sync::Arc, vec::Vec}; @@ -29,11 +29,16 @@ pub(crate) struct RawUdp { pub packet: Packet, } +pub(crate) enum ConnectionResult { + Failed(crate::Error), + Established((PubKey, IDSend)), +} + pub(crate) enum Work { /// ask the thread to report to the main thread the total number of /// connections present CountConnections(oneshot::Sender), - Connect((oneshot::Sender, dnssec::Record, ServiceID)), + Connect((oneshot::Sender, dnssec::Record, ServiceID)), Recv(RawUdp), } pub(crate) enum WorkAnswer { diff --git a/src/lib.rs b/src/lib.rs index 1aed2e2..6f48ec5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,7 @@ pub mod enc; mod inner; use ::std::{sync::Arc, vec::Vec}; -use ::tokio::net::UdpSocket; +use ::tokio::{net::UdpSocket, sync::Mutex}; use auth::ServiceID; use crate::{ @@ -29,7 +29,7 @@ use crate::{ connection::{ handshake, socket::{SocketList, UdpClient, UdpServer}, - Packet, + AuthServerConnections, Packet, }, inner::{ worker::{RawUdp, Work, Worker}, @@ -74,6 +74,9 @@ pub struct Fenrir { stop_working: ::tokio::sync::broadcast::Sender, /// where to ask for token check token_check: Option>>, + /// tracks the connections to authentication servers + /// so that we can reuse them + conn_auth_srv: Mutex, // TODO: find a way to both increase and decrease these two in a thread-safe // manner _thread_pool: Vec<::std::thread::JoinHandle<()>>, @@ -98,6 +101,7 @@ impl Fenrir { dnssec: None, stop_working: sender, token_check: None, + conn_auth_srv: Mutex::new(AuthServerConnections::new()), _thread_pool: Vec::new(), _thread_work: Arc::new(Vec::new()), }; @@ -245,6 +249,30 @@ impl Fenrir { service: ServiceID, ) -> Result<(), Error> { let resolved = self.resolv(domain).await?; + loop { + // check if we already have a connection to that auth. srv + let is_reserved = { + let mut conn_auth_lock = self.conn_auth_srv.lock().await; + conn_auth_lock.get_or_reserve(&resolved) + }; + use connection::Reservation; + match is_reserved { + Reservation::Waiting => { + use ::std::time::Duration; + use ::tokio::time::sleep; + // PERF: exponential backoff. + // or we can have a broadcast channel + sleep(Duration::from_millis(50)).await; + continue; + } + Reservation::Reserved => break, + Reservation::Present(id_send) => { + //TODO: reuse connection + todo!() + } + } + } + // Spot reserved for the connection // find the thread with less connections @@ -280,12 +308,39 @@ impl Fenrir { // and tell that thread to connect somewhere let (send, recv) = ::tokio::sync::oneshot::channel(); let _ = self._thread_work[thread_idx] - .send(Work::Connect((send, resolved, service))) + .send(Work::Connect((send, resolved.clone(), service))) .await; - let _conn_res = recv.await; + match recv.await { + Ok(res) => { + use crate::inner::worker::ConnectionResult; + match res { + ConnectionResult::Failed(e) => { + let mut conn_auth_lock = + self.conn_auth_srv.lock().await; + conn_auth_lock.remove_reserved(&resolved); + Err(e) + } + ConnectionResult::Established((pubkey, id_send)) => { + let mut conn_auth_lock = + self.conn_auth_srv.lock().await; + conn_auth_lock.add(&pubkey, id_send, &resolved); - todo!() + //FIXME: user needs to somehow track the connection + Ok(()) + } + } + } + Err(e) => { + // Thread dropped the sender. no more thread? + let mut conn_auth_lock = self.conn_auth_srv.lock().await; + conn_auth_lock.remove_reserved(&resolved); + Err(Error::IO(::std::io::Error::new( + ::std::io::ErrorKind::Interrupted, + "recv failure on connect: ".to_owned() + &e.to_string(), + ))) + } + } } /// Start one working thread for each physical cpu