From aff1c313f5d4f98d9e48818645c9a7191a0e53c6 Mon Sep 17 00:00:00 2001 From: Luca Fulchir Date: Sat, 10 Jun 2023 14:42:24 +0200 Subject: [PATCH] Cleanup & incomplete tests Signed-off-by: Luca Fulchir --- flake.nix | 3 +- src/config/mod.rs | 9 +- src/connection/handshake/mod.rs | 109 +--------- src/connection/handshake/tests.rs | 2 +- src/connection/handshake/tracker.rs | 326 ++++++++++++++++++++++++++++ src/dnssec/mod.rs | 64 +----- src/dnssec/record.rs | 2 +- src/dnssec/tests.rs | 59 +++++ src/enc/asym.rs | 10 +- src/enc/sym.rs | 1 - src/inner/mod.rs | 231 -------------------- src/inner/worker.rs | 5 +- src/lib.rs | 85 +++++++- src/tests.rs | 126 +++++++++++ 14 files changed, 618 insertions(+), 414 deletions(-) create mode 100644 src/connection/handshake/tracker.rs create mode 100644 src/dnssec/tests.rs create mode 100644 src/tests.rs diff --git a/flake.nix b/flake.nix index 7a74671..21e2442 100644 --- a/flake.nix +++ b/flake.nix @@ -18,6 +18,7 @@ pkgs-unstable = import nixpkgs-unstable { inherit system overlays; }; + RUST_VERSION="1.69.0"; in { devShells.default = pkgs.mkShell { @@ -40,7 +41,7 @@ cargo-flamegraph cargo-license lld - rust-bin.stable."1.69.0".default + rust-bin.stable.${RUST_VERSION}.default rustfmt rust-analyzer # fenrir deps diff --git a/src/config/mod.rs b/src/config/mod.rs index 09773c3..c0fe949 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -3,7 +3,11 @@ use crate::{ connection::handshake::HandshakeID, - enc::{asym::KeyExchangeKind, hkdf::HkdfKind, sym::CipherKind}, + enc::{ + asym::{KeyExchangeKind, KeyID, PrivKey, PubKey}, + hkdf::HkdfKind, + sym::CipherKind, + }, }; use ::std::{ net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, @@ -30,6 +34,8 @@ pub struct Config { pub hkdfs: Vec, /// Supported Ciphers pub ciphers: Vec, + /// list of public/private keys + pub keys: Vec<(KeyID, PrivKey, PubKey)>, } impl Default for Config { @@ -50,6 +56,7 @@ impl Default for Config { key_exchanges: [KeyExchangeKind::X25519DiffieHellman].to_vec(), hkdfs: [HkdfKind::Sha3].to_vec(), ciphers: [CipherKind::XChaCha20Poly1305].to_vec(), + keys: Vec::new(), } } } diff --git a/src/connection/handshake/mod.rs b/src/connection/handshake/mod.rs index 6322e79..6dd248e 100644 --- a/src/connection/handshake/mod.rs +++ b/src/connection/handshake/mod.rs @@ -3,14 +3,11 @@ pub mod dirsync; #[cfg(test)] mod tests; +pub(crate) mod tracker; use crate::{ - auth::ServiceID, - connection::{self, Connection, IDRecv, ProtocolVersion}, - enc::{ - asym::{KeyID, PrivKey, PubKey}, - sym::{HeadLen, TagLen}, - }, + connection::ProtocolVersion, + enc::sym::{HeadLen, TagLen}, }; use ::num_traits::FromPrimitive; @@ -70,106 +67,6 @@ impl HandshakeID { 1 } } - -pub(crate) struct HandshakeServer { - pub id: KeyID, - pub key: PrivKey, -} - -pub(crate) struct HandshakeClient { - pub service_id: ServiceID, - pub service_conn_id: IDRecv, - pub connection: Connection, - pub timeout: Option<::tokio::task::JoinHandle<()>>, -} - -/// Tracks the keys used by the client and the handshake -/// they are associated with -pub(crate) struct HandshakeClientList { - used: Vec<::bitmaps::Bitmap<1024>>, // index = KeyID - keys: Vec>, - list: Vec>, -} - -impl HandshakeClientList { - pub(crate) fn new() -> Self { - Self { - used: [::bitmaps::Bitmap::<1024>::new()].to_vec(), - keys: Vec::with_capacity(16), - list: Vec::with_capacity(16), - } - } - pub(crate) fn get(&self, id: KeyID) -> Option<&HandshakeClient> { - if id.0 as usize >= self.list.len() { - return None; - } - self.list[id.0 as usize].as_ref() - } - pub(crate) fn remove(&mut self, id: KeyID) -> Option { - if id.0 as usize >= self.list.len() { - return None; - } - let used_vec_idx = id.0 as usize / 1024; - let used_bitmap_idx = id.0 as usize % 1024; - let used_iter = match self.used.get_mut(used_vec_idx) { - Some(used_iter) => used_iter, - None => return None, - }; - used_iter.set(used_bitmap_idx, false); - self.keys[id.0 as usize] = None; - let mut owned = None; - ::core::mem::swap(&mut self.list[id.0 as usize], &mut owned); - owned - } - pub(crate) fn add( - &mut self, - priv_key: PrivKey, - pub_key: PubKey, - service_id: ServiceID, - service_conn_id: IDRecv, - connection: Connection, - ) -> Result<(KeyID, &mut HandshakeClient), ()> { - let maybe_free_key_idx = - self.used.iter().enumerate().find_map(|(idx, bmap)| { - match bmap.first_false_index() { - Some(false_idx) => Some(((idx * 1024), false_idx)), - None => None, - } - }); - let free_key_idx = match maybe_free_key_idx { - Some((idx, false_idx)) => { - let free_key_idx = idx * 1024 + false_idx; - if free_key_idx > KeyID::MAX as usize { - return Err(()); - } - self.used[idx].set(false_idx, true); - free_key_idx - } - None => { - let mut bmap = ::bitmaps::Bitmap::<1024>::new(); - bmap.set(0, true); - self.used.push(bmap); - self.used.len() * 1024 - } - }; - if self.keys.len() >= free_key_idx { - self.keys.push(None); - self.list.push(None); - } - self.keys[free_key_idx] = Some((priv_key, pub_key)); - self.list[free_key_idx] = Some(HandshakeClient { - service_id, - service_conn_id, - connection, - timeout: None, - }); - Ok(( - KeyID(free_key_idx as u16), - self.list[free_key_idx].as_mut().unwrap(), - )) - } -} - /// Parsed handshake #[derive(Debug, Clone, PartialEq)] pub enum HandshakeData { diff --git a/src/connection/handshake/tests.rs b/src/connection/handshake/tests.rs index fc8b2b3..83cacfd 100644 --- a/src/connection/handshake/tests.rs +++ b/src/connection/handshake/tests.rs @@ -1,7 +1,7 @@ use crate::{ auth, connection::{handshake::*, ID}, - enc, + enc::{self, asym::KeyID}, }; #[test] diff --git a/src/connection/handshake/tracker.rs b/src/connection/handshake/tracker.rs new file mode 100644 index 0000000..a907f3b --- /dev/null +++ b/src/connection/handshake/tracker.rs @@ -0,0 +1,326 @@ +//! Handhsake handling + +use crate::{ + auth::ServiceID, + connection::{ + self, + handshake::{self, Error, Handshake}, + Connection, IDRecv, + }, + enc::{ + self, + asym::{self, KeyID, PrivKey, PubKey}, + hkdf::{Hkdf, HkdfKind}, + sym::{CipherKind, CipherRecv}, + }, + inner::ThreadTracker, +}; + +pub(crate) struct HandshakeServer { + pub id: KeyID, + pub key: PrivKey, +} + +pub(crate) struct HandshakeClient { + pub service_id: ServiceID, + pub service_conn_id: IDRecv, + pub connection: Connection, + pub timeout: Option<::tokio::task::JoinHandle<()>>, +} + +/// Tracks the keys used by the client and the handshake +/// they are associated with +pub(crate) struct HandshakeClientList { + used: Vec<::bitmaps::Bitmap<1024>>, // index = KeyID + keys: Vec>, + list: Vec>, +} + +impl HandshakeClientList { + pub(crate) fn new() -> Self { + Self { + used: [::bitmaps::Bitmap::<1024>::new()].to_vec(), + keys: Vec::with_capacity(16), + list: Vec::with_capacity(16), + } + } + pub(crate) fn get(&self, id: KeyID) -> Option<&HandshakeClient> { + if id.0 as usize >= self.list.len() { + return None; + } + self.list[id.0 as usize].as_ref() + } + pub(crate) fn remove(&mut self, id: KeyID) -> Option { + if id.0 as usize >= self.list.len() { + return None; + } + let used_vec_idx = id.0 as usize / 1024; + let used_bitmap_idx = id.0 as usize % 1024; + let used_iter = match self.used.get_mut(used_vec_idx) { + Some(used_iter) => used_iter, + None => return None, + }; + used_iter.set(used_bitmap_idx, false); + self.keys[id.0 as usize] = None; + let mut owned = None; + ::core::mem::swap(&mut self.list[id.0 as usize], &mut owned); + owned + } + pub(crate) fn add( + &mut self, + priv_key: PrivKey, + pub_key: PubKey, + service_id: ServiceID, + service_conn_id: IDRecv, + connection: Connection, + ) -> Result<(KeyID, &mut HandshakeClient), ()> { + let maybe_free_key_idx = + self.used.iter().enumerate().find_map(|(idx, bmap)| { + match bmap.first_false_index() { + Some(false_idx) => Some(((idx * 1024), false_idx)), + None => None, + } + }); + let free_key_idx = match maybe_free_key_idx { + Some((idx, false_idx)) => { + let free_key_idx = idx * 1024 + false_idx; + if free_key_idx > KeyID::MAX as usize { + return Err(()); + } + self.used[idx].set(false_idx, true); + free_key_idx + } + None => { + let mut bmap = ::bitmaps::Bitmap::<1024>::new(); + bmap.set(0, true); + self.used.push(bmap); + self.used.len() * 1024 + } + }; + if self.keys.len() >= free_key_idx { + self.keys.push(None); + self.list.push(None); + } + self.keys[free_key_idx] = Some((priv_key, pub_key)); + self.list[free_key_idx] = Some(HandshakeClient { + service_id, + service_conn_id, + connection, + timeout: None, + }); + Ok(( + KeyID(free_key_idx as u16), + self.list[free_key_idx].as_mut().unwrap(), + )) + } +} +/// Information needed to reply after the key exchange +#[derive(Debug, Clone)] +pub(crate) struct AuthNeededInfo { + /// Parsed handshake packet + pub handshake: Handshake, + /// hkdf generated from the handshake + pub hkdf: Hkdf, + /// cipher to be used in both directions + pub cipher: CipherKind, +} + +/// Client information needed to fully establish the conenction +#[derive(Debug)] +pub(crate) struct ClientConnectInfo { + /// The service ID that we are connecting to + pub service_id: ServiceID, + /// The service ID that we are connecting to + pub service_connection_id: IDRecv, + /// Parsed handshake packet + pub handshake: Handshake, + /// Connection + pub connection: Connection, +} +/// Intermediate actions to be taken while parsing the handshake +#[derive(Debug)] +pub(crate) enum HandshakeAction { + /// Parsing finished, all ok, nothing to do + Nonthing, + /// Packet parsed, now go perform authentication + AuthNeeded(AuthNeededInfo), + /// the client can fully establish a connection with this info + ClientConnect(ClientConnectInfo), +} + +/// Tracking of handhsakes and conenctions +/// Note that we have multiple Handshake trackers, pinned to different cores +/// Each of them will handle a subset of all handshakes. +/// Each handshake is routed to a different tracker by checking +/// core = (udp_src_sender_port % total_threads) - 1 +pub(crate) struct HandshakeTracker { + thread_id: ThreadTracker, + key_exchanges: Vec<(asym::KeyKind, asym::KeyExchangeKind)>, + ciphers: Vec, + /// ephemeral keys used server side in key exchange + keys_srv: Vec, + /// ephemeral keys used client side in key exchange + hshake_cli: HandshakeClientList, +} + +impl HandshakeTracker { + pub(crate) fn new(thread_id: ThreadTracker) -> Self { + Self { + thread_id, + ciphers: Vec::new(), + key_exchanges: Vec::new(), + keys_srv: Vec::new(), + hshake_cli: HandshakeClientList::new(), + } + } + pub(crate) fn new_client( + &mut self, + priv_key: PrivKey, + pub_key: PubKey, + service_id: ServiceID, + service_conn_id: IDRecv, + connection: Connection, + ) -> Result<(KeyID, &mut HandshakeClient), ()> { + self.hshake_cli.add( + priv_key, + pub_key, + service_id, + service_conn_id, + connection, + ) + } + pub(crate) fn timeout_client( + &mut self, + key_id: KeyID, + ) -> Option<[IDRecv; 2]> { + if let Some(hshake) = self.hshake_cli.remove(key_id) { + Some([hshake.connection.id_recv, hshake.service_conn_id]) + } else { + None + } + } + pub(crate) fn recv_handshake( + &mut self, + mut handshake: Handshake, + handshake_raw: &mut [u8], + ) -> Result { + use connection::handshake::{dirsync::DirSync, HandshakeData}; + match handshake.data { + HandshakeData::DirSync(ref mut ds) => match ds { + DirSync::Req(ref mut req) => { + let ephemeral_key = { + if let Some(h_k) = + self.keys_srv.iter().find(|k| k.id == req.key_id) + { + // Directory synchronized can only use keys + // for key exchange, not signing keys + if let PrivKey::Exchange(k) = &h_k.key { + Some(k.clone()) + } else { + None + } + } else { + None + } + }; + if ephemeral_key.is_none() { + ::tracing::debug!( + "No such server key id: {:?}", + req.key_id + ); + return Err(handshake::Error::UnknownKeyID.into()); + } + let ephemeral_key = ephemeral_key.unwrap(); + { + if None + == self.key_exchanges.iter().find(|&x| { + *x == (ephemeral_key.kind(), req.exchange) + }) + { + return Err( + enc::Error::UnsupportedKeyExchange.into() + ); + } + } + { + if None + == self.ciphers.iter().find(|&x| *x == req.cipher) + { + return Err(enc::Error::UnsupportedCipher.into()); + } + } + let shared_key = match ephemeral_key + .key_exchange(req.exchange, req.exchange_key) + { + Ok(shared_key) => shared_key, + Err(e) => return Err(handshake::Error::Key(e).into()), + }; + let hkdf = Hkdf::new(HkdfKind::Sha3, b"fenrir", shared_key); + let secret_recv = hkdf.get_secret(b"to_server"); + let cipher_recv = CipherRecv::new(req.cipher, secret_recv); + use crate::enc::sym::AAD; + let aad = AAD(&mut []); // no aad for now + match cipher_recv.decrypt( + aad, + &mut handshake_raw[req.encrypted_offset()..], + ) { + Ok(cleartext) => { + req.data.deserialize_as_cleartext(cleartext)?; + } + Err(e) => { + return Err(handshake::Error::Key(e).into()); + } + } + + let cipher = req.cipher; + + return Ok(HandshakeAction::AuthNeeded(AuthNeededInfo { + handshake, + hkdf, + cipher, + })); + } + DirSync::Resp(resp) => { + let hshake = match self.hshake_cli.get(resp.client_key_id) { + Some(hshake) => hshake, + None => { + ::tracing::debug!( + "No such client key id: {:?}", + resp.client_key_id + ); + return Err(handshake::Error::UnknownKeyID.into()); + } + }; + let cipher_recv = &hshake.connection.cipher_recv; + use crate::enc::sym::AAD; + // no aad for now + let aad = AAD(&mut []); + let mut raw_data = &mut handshake_raw[resp + .encrypted_offset() + ..(resp.encrypted_offset() + resp.encrypted_length())]; + match cipher_recv.decrypt(aad, &mut raw_data) { + Ok(cleartext) => { + resp.data.deserialize_as_cleartext(&cleartext)?; + } + Err(e) => { + return Err(handshake::Error::Key(e).into()); + } + } + let hshake = + self.hshake_cli.remove(resp.client_key_id).unwrap(); + if let Some(timeout) = hshake.timeout { + timeout.abort(); + } + return Ok(HandshakeAction::ClientConnect( + ClientConnectInfo { + service_id: hshake.service_id, + service_connection_id: hshake.service_conn_id, + handshake, + connection: hshake.connection, + }, + )); + } + }, + } + } +} diff --git a/src/dnssec/mod.rs b/src/dnssec/mod.rs index bc0f661..d1128c1 100644 --- a/src/dnssec/mod.rs +++ b/src/dnssec/mod.rs @@ -9,6 +9,9 @@ pub use record::Record; use crate::auth::Domain; +#[cfg(test)] +mod tests; + /// Common errors for Dnssec setup and usage #[derive(::thiserror::Error, Debug)] pub enum Error { @@ -44,7 +47,7 @@ pub struct Dnssec { impl Dnssec { /// Spawn connections to DNS via TCP - pub async fn new(resolvers: &Vec) -> Result { + pub fn new(resolvers: &Vec) -> Result { // use a TCP connection to the DNS. // the records we need are big, will not fit in a UDP packet let resolv_conf_resolvers: Vec; @@ -146,62 +149,3 @@ impl Dnssec { }; } } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_serialization() { - let rand = enc::Random::new(); - let (_, exchange_key) = - match enc::asym::KeyExchangeKind::X25519DiffieHellman - .new_keypair(&rand) - { - Ok(pair) => pair, - Err(_) => { - assert!(false, "Can't generate random keypair"); - return; - } - }; - use crate::enc; - let record = Record { - public_keys : [(enc::asym::KeyID(42), - enc::asym::PubKey::Exchange(exchange_key))].to_vec(), - addresses: [record::Address { - ip: ::std::net::IpAddr::V4(::std::net::Ipv4Addr::new(127,0,0,1)), - port: Some(::core::num::NonZeroU16::new(31337).unwrap()), - priority: record::AddressPriority::P1, - weight: record::AddressWeight::W1, - handshake_ids: [crate::connection::handshake::HandshakeID::DirectorySynchronized].to_vec(), - public_key_idx : [record::PubKeyIdx(0)].to_vec(), - - }].to_vec(), - key_exchanges: [enc::asym::KeyExchangeKind::X25519DiffieHellman].to_vec(), - hkdfs: [enc::hkdf::HkdfKind::Sha3].to_vec(), - ciphers: [enc::sym::CipherKind::XChaCha20Poly1305].to_vec(), - - }; - let encoded = match record.encode() { - Ok(encoded) => encoded, - Err(e) => { - assert!(false, "{}", e.to_string()); - return; - } - }; - let full_record = "v=Fenrir1 ".to_string() + &encoded; - let record = match Dnssec::parse_txt_record(&full_record) { - Ok(record) => record, - Err(e) => { - assert!(false, "{}", e.to_string()); - return; - } - }; - let _re_encoded = match record.encode() { - Ok(re_encoded) => re_encoded, - Err(e) => { - assert!(false, "{}", e.to_string()); - return; - } - }; - } -} diff --git a/src/dnssec/record.rs b/src/dnssec/record.rs index 6a49a3a..9426b3e 100644 --- a/src/dnssec/record.rs +++ b/src/dnssec/record.rs @@ -46,7 +46,7 @@ use crate::{ connection::handshake::HandshakeID, enc::{ self, - asym::{ExchangePubKey, KeyExchangeKind, KeyID, PubKey}, + asym::{KeyExchangeKind, KeyID, PubKey}, hkdf::HkdfKind, sym::CipherKind, }, diff --git a/src/dnssec/tests.rs b/src/dnssec/tests.rs new file mode 100644 index 0000000..ff450ae --- /dev/null +++ b/src/dnssec/tests.rs @@ -0,0 +1,59 @@ +use super::*; + +#[test] +fn test_dnssec_serialization() { + let rand = enc::Random::new(); + let (_, exchange_key) = + match enc::asym::KeyExchangeKind::X25519DiffieHellman.new_keypair(&rand) + { + Ok(pair) => pair, + Err(_) => { + assert!(false, "Can't generate random keypair"); + return; + } + }; + use crate::{connection::handshake::HandshakeID, enc}; + + let record = Record { + public_keys: [( + enc::asym::KeyID(42), + enc::asym::PubKey::Exchange(exchange_key), + )] + .to_vec(), + addresses: [record::Address { + ip: ::std::net::IpAddr::V4(::std::net::Ipv4Addr::new(127, 0, 0, 1)), + port: Some(::core::num::NonZeroU16::new(31337).unwrap()), + priority: record::AddressPriority::P1, + weight: record::AddressWeight::W1, + handshake_ids: [HandshakeID::DirectorySynchronized].to_vec(), + public_key_idx: [record::PubKeyIdx(0)].to_vec(), + }] + .to_vec(), + key_exchanges: [enc::asym::KeyExchangeKind::X25519DiffieHellman] + .to_vec(), + hkdfs: [enc::hkdf::HkdfKind::Sha3].to_vec(), + ciphers: [enc::sym::CipherKind::XChaCha20Poly1305].to_vec(), + }; + let encoded = match record.encode() { + Ok(encoded) => encoded, + Err(e) => { + assert!(false, "{}", e.to_string()); + return; + } + }; + let full_record = "v=Fenrir1 ".to_string() + &encoded; + let record = match Dnssec::parse_txt_record(&full_record) { + Ok(record) => record, + Err(e) => { + assert!(false, "{}", e.to_string()); + return; + } + }; + let _re_encoded = match record.encode() { + Ok(re_encoded) => re_encoded, + Err(e) => { + assert!(false, "{}", e.to_string()); + return; + } + }; +} diff --git a/src/enc/asym.rs b/src/enc/asym.rs index 16201cf..32c3aa2 100644 --- a/src/enc/asym.rs +++ b/src/enc/asym.rs @@ -169,7 +169,6 @@ impl KeyExchangeKind { let priv_key = ExchangePrivKey::X25519(raw_priv); Ok((priv_key, pub_key)) } - _ => Err(Error::UnsupportedKeyExchange), } } } @@ -300,6 +299,15 @@ impl PrivKey { } } } +// Fake debug implementation to avoid leaking secrets +impl ::core::fmt::Debug for PrivKey { + fn fmt( + &self, + f: &mut core::fmt::Formatter<'_>, + ) -> Result<(), ::std::fmt::Error> { + ::core::fmt::Debug::fmt("[hidden privkey]", f) + } +} /// Ephemeral private keys #[derive(Clone)] diff --git a/src/enc/sym.rs b/src/enc/sym.rs index 37b5d78..5728808 100644 --- a/src/enc/sym.rs +++ b/src/enc/sym.rs @@ -5,7 +5,6 @@ use crate::{ config::Config, enc::{Random, Secret}, }; -use ::zeroize::Zeroize; /// List of possible Ciphers #[derive( diff --git a/src/inner/mod.rs b/src/inner/mod.rs index 882e8c1..001ca16 100644 --- a/src/inner/mod.rs +++ b/src/inner/mod.rs @@ -4,60 +4,6 @@ pub(crate) mod worker; -use crate::{ - auth::ServiceID, - connection::{ - self, - handshake::{ - self, Handshake, HandshakeClient, HandshakeClientList, - HandshakeServer, - }, - Connection, IDRecv, - }, - enc::{ - self, - asym::{self, KeyID, PrivKey, PubKey}, - hkdf::{Hkdf, HkdfKind}, - sym::{CipherKind, CipherRecv}, - }, - Error, -}; -use ::std::vec::Vec; - -/// Information needed to reply after the key exchange -#[derive(Debug, Clone)] -pub(crate) struct AuthNeededInfo { - /// Parsed handshake packet - pub handshake: Handshake, - /// hkdf generated from the handshake - pub hkdf: Hkdf, - /// cipher to be used in both directions - pub cipher: CipherKind, -} - -/// Client information needed to fully establish the conenction -#[derive(Debug)] -pub(crate) struct ClientConnectInfo { - /// The service ID that we are connecting to - pub service_id: ServiceID, - /// The service ID that we are connecting to - pub service_connection_id: IDRecv, - /// Parsed handshake packet - pub handshake: Handshake, - /// Connection - pub connection: Connection, -} -/// Intermediate actions to be taken while parsing the handshake -#[derive(Debug)] -pub(crate) enum HandshakeAction { - /// Parsing finished, all ok, nothing to do - Nonthing, - /// Packet parsed, now go perform authentication - AuthNeeded(AuthNeededInfo), - /// the client can fully establish a connection with this info - ClientConnect(ClientConnectInfo), -} - /// Track the total number of threads and our index /// 65K cpus should be enough for anybody #[derive(Debug, Clone, Copy)] @@ -66,180 +12,3 @@ pub(crate) struct ThreadTracker { /// Note: starts from 1 pub id: u16, } - -/// Tracking of handhsakes and conenctions -/// Note that we have multiple Handshake trackers, pinned to different cores -/// Each of them will handle a subset of all handshakes. -/// Each handshake is routed to a different tracker by checking -/// core = (udp_src_sender_port % total_threads) - 1 -pub(crate) struct HandshakeTracker { - thread_id: ThreadTracker, - key_exchanges: Vec<(asym::KeyKind, asym::KeyExchangeKind)>, - ciphers: Vec, - /// ephemeral keys used server side in key exchange - keys_srv: Vec, - /// ephemeral keys used client side in key exchange - hshake_cli: HandshakeClientList, -} - -impl HandshakeTracker { - pub(crate) fn new(thread_id: ThreadTracker) -> Self { - Self { - thread_id, - ciphers: Vec::new(), - key_exchanges: Vec::new(), - keys_srv: Vec::new(), - hshake_cli: HandshakeClientList::new(), - } - } - pub(crate) fn new_client( - &mut self, - priv_key: PrivKey, - pub_key: PubKey, - service_id: ServiceID, - service_conn_id: IDRecv, - connection: Connection, - ) -> Result<(KeyID, &mut HandshakeClient), ()> { - self.hshake_cli.add( - priv_key, - pub_key, - service_id, - service_conn_id, - connection, - ) - } - pub(crate) fn timeout_client( - &mut self, - key_id: KeyID, - ) -> Option<[IDRecv; 2]> { - if let Some(hshake) = self.hshake_cli.remove(key_id) { - Some([hshake.connection.id_recv, hshake.service_conn_id]) - } else { - None - } - } - pub(crate) fn recv_handshake( - &mut self, - mut handshake: Handshake, - handshake_raw: &mut [u8], - ) -> Result { - use connection::handshake::{dirsync::DirSync, HandshakeData}; - match handshake.data { - HandshakeData::DirSync(ref mut ds) => match ds { - DirSync::Req(ref mut req) => { - let ephemeral_key = { - if let Some(h_k) = - self.keys_srv.iter().find(|k| k.id == req.key_id) - { - // Directory synchronized can only use keys - // for key exchange, not signing keys - if let PrivKey::Exchange(k) = &h_k.key { - Some(k.clone()) - } else { - None - } - } else { - None - } - }; - if ephemeral_key.is_none() { - ::tracing::debug!( - "No such server key id: {:?}", - req.key_id - ); - return Err(handshake::Error::UnknownKeyID.into()); - } - let ephemeral_key = ephemeral_key.unwrap(); - { - if None - == self.key_exchanges.iter().find(|&x| { - *x == (ephemeral_key.kind(), req.exchange) - }) - { - return Err( - enc::Error::UnsupportedKeyExchange.into() - ); - } - } - { - if None - == self.ciphers.iter().find(|&x| *x == req.cipher) - { - return Err(enc::Error::UnsupportedCipher.into()); - } - } - let shared_key = match ephemeral_key - .key_exchange(req.exchange, req.exchange_key) - { - Ok(shared_key) => shared_key, - Err(e) => return Err(handshake::Error::Key(e).into()), - }; - let hkdf = Hkdf::new(HkdfKind::Sha3, b"fenrir", shared_key); - let secret_recv = hkdf.get_secret(b"to_server"); - let cipher_recv = CipherRecv::new(req.cipher, secret_recv); - use crate::enc::sym::AAD; - let aad = AAD(&mut []); // no aad for now - match cipher_recv.decrypt( - aad, - &mut handshake_raw[req.encrypted_offset()..], - ) { - Ok(cleartext) => { - req.data.deserialize_as_cleartext(cleartext)?; - } - Err(e) => { - return Err(handshake::Error::Key(e).into()); - } - } - - let cipher = req.cipher; - - return Ok(HandshakeAction::AuthNeeded(AuthNeededInfo { - handshake, - hkdf, - cipher, - })); - } - DirSync::Resp(resp) => { - let hshake = match self.hshake_cli.get(resp.client_key_id) { - Some(hshake) => hshake, - None => { - ::tracing::debug!( - "No such client key id: {:?}", - resp.client_key_id - ); - return Err(handshake::Error::UnknownKeyID.into()); - } - }; - let cipher_recv = &hshake.connection.cipher_recv; - use crate::enc::sym::AAD; - // no aad for now - let aad = AAD(&mut []); - let mut raw_data = &mut handshake_raw[resp - .encrypted_offset() - ..(resp.encrypted_offset() + resp.encrypted_length())]; - match cipher_recv.decrypt(aad, &mut raw_data) { - Ok(cleartext) => { - resp.data.deserialize_as_cleartext(&cleartext)?; - } - Err(e) => { - return Err(handshake::Error::Key(e).into()); - } - } - let hshake = - self.hshake_cli.remove(resp.client_key_id).unwrap(); - if let Some(timeout) = hshake.timeout { - timeout.abort(); - } - return Ok(HandshakeAction::ClientConnect( - ClientConnectInfo { - service_id: hshake.service_id, - service_connection_id: hshake.service_conn_id, - handshake, - connection: hshake.connection, - }, - )); - } - }, - } - } -} diff --git a/src/inner/worker.rs b/src/inner/worker.rs index 4f0459f..e8f6c5a 100644 --- a/src/inner/worker.rs +++ b/src/inner/worker.rs @@ -7,6 +7,7 @@ use crate::{ handshake::{ self, dirsync::{self, DirSync}, + tracker::{HandshakeAction, HandshakeTracker}, Handshake, HandshakeData, }, socket::{UdpClient, UdpServer}, @@ -18,9 +19,9 @@ use crate::{ hkdf::{self, Hkdf, HkdfKind}, sym, Random, Secret, }, - inner::{HandshakeAction, HandshakeTracker, ThreadTracker}, + inner::ThreadTracker, }; -use ::std::{rc::Rc, sync::Arc, vec::Vec}; +use ::std::{sync::Arc, vec::Vec}; /// This worker must be cpu-pinned use ::tokio::{ net::UdpSocket, diff --git a/src/lib.rs b/src/lib.rs index cac07b0..0c01c8b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,9 @@ pub mod dnssec; pub mod enc; mod inner; +#[cfg(test)] +mod tests; + use ::std::{sync::Arc, vec::Vec}; use ::tokio::{net::UdpSocket, sync::Mutex}; @@ -74,7 +77,7 @@ pub struct Fenrir { /// listening udp sockets sockets: SocketList, /// DNSSEC resolver, with failovers - dnssec: Option, + dnssec: dnssec::Dnssec, /// Broadcast channel to tell workers to stop working stop_working: ::tokio::sync::broadcast::Sender, /// where to ask for token check @@ -100,10 +103,11 @@ impl Fenrir { /// Create a new Fenrir endpoint pub fn new(config: &Config) -> Result { let (sender, _) = ::tokio::sync::broadcast::channel(1); + let dnssec = dnssec::Dnssec::new(&config.resolvers)?; let endpoint = Fenrir { cfg: config.clone(), sockets: SocketList::new(), - dnssec: None, + dnssec, stop_working: sender, token_check: None, conn_auth_srv: Mutex::new(AuthServerConnections::new()), @@ -113,6 +117,7 @@ impl Fenrir { Ok(endpoint) } + ///FIXME: remove this, move into new() /// Start all workers, listeners pub async fn start( &mut self, @@ -123,7 +128,14 @@ impl Fenrir { self.stop().await; return Err(e.into()); } - self.dnssec = Some(dnssec::Dnssec::new(&self.cfg.resolvers).await?); + Ok(()) + } + ///FIXME: remove this, move into new() + pub async fn setup_no_workers(&mut self) -> Result<(), Error> { + if let Err(e) = self.add_sockets().await { + self.stop().await; + return Err(e.into()); + } Ok(()) } @@ -137,7 +149,6 @@ impl Fenrir { let mut old_thread_pool = Vec::new(); ::std::mem::swap(&mut self._thread_pool, &mut old_thread_pool); let _ = old_thread_pool.into_iter().map(|th| th.join()); - self.dnssec = None; } /// Stop all workers, listeners @@ -148,7 +159,6 @@ impl Fenrir { let mut old_thread_pool = Vec::new(); ::std::mem::swap(&mut self._thread_pool, &mut old_thread_pool); let _ = old_thread_pool.into_iter().map(|th| th.join()); - self.dnssec = None; } /// Add all UDP sockets found in config /// and start listening for packets @@ -235,9 +245,9 @@ impl Fenrir { } /// Get the raw TXT record of a Fenrir domain pub async fn resolv_txt(&self, domain: &Domain) -> Result { - match &self.dnssec { - Some(dnssec) => Ok(dnssec.resolv(domain).await?), - None => Err(Error::NotInitialized), + match self.dnssec.resolv(domain).await { + Ok(res) => Ok(res), + Err(e) => Err(e.into()), } } @@ -250,13 +260,22 @@ impl Fenrir { Ok(dnssec::Dnssec::parse_txt_record(&record_str)?) } - /// Connect to a service + /// Connect to a service, doing the dnssec resolution ourselves pub async fn connect( &self, domain: &Domain, service: ServiceID, ) -> Result<(), Error> { let resolved = self.resolv(domain).await?; + self.connect_resolved(resolved, domain, service).await + } + /// Connect to a service, with the user provided details + pub async fn connect_resolved( + &self, + resolved: dnssec::Record, + domain: &Domain, + service: ServiceID, + ) -> Result<(), Error> { loop { // check if we already have a connection to that auth. srv let is_reserved = { @@ -354,6 +373,54 @@ impl Fenrir { } } + async fn start_single_worker( + &mut self, + ) -> ::std::result::Result< + impl futures::Future>, + Error, + > { + let thread_idx = self._thread_work.len() as u16; + let max_threads = self.cfg.threads.unwrap().get() as u16; + if thread_idx >= max_threads { + ::tracing::error!( + "thread id higher than number of threads in config" + ); + assert!( + false, + "thread_idx is an index that can't reach cfg.threads" + ); + return Err(Error::Setup("Thread id > threads_max".to_owned())); + } + let thread_id = ThreadTracker { + id: thread_idx, + total: max_threads, + }; + let (work_send, work_recv) = ::async_channel::unbounded::(); + let worker = Worker::new_and_loop( + self.cfg.clone(), + thread_id, + self.stop_working.subscribe(), + self.token_check.clone(), + self.cfg.listen.clone(), + work_recv, + ); + loop { + let queues_lock = match Arc::get_mut(&mut self._thread_work) { + Some(queues_lock) => queues_lock, + None => { + // should not even ever happen + ::tokio::time::sleep(::std::time::Duration::from_millis( + 50, + )) + .await; + continue; + } + }; + queues_lock.push(work_send); + break; + } + Ok(worker) + } // TODO: start work on a LocalSet provided by the user /// Start one working thread for each physical cpu /// threads are pinned to each cpu core. diff --git a/src/tests.rs b/src/tests.rs new file mode 100644 index 0000000..1032012 --- /dev/null +++ b/src/tests.rs @@ -0,0 +1,126 @@ +use crate::*; + +#[::tokio::test] +async fn test_connection_dirsync() { + return; + use enc::asym::{KeyID, PrivKey, PubKey}; + let rand = enc::Random::new(); + let (priv_exchange_key, pub_exchange_key) = + match enc::asym::KeyExchangeKind::X25519DiffieHellman.new_keypair(&rand) + { + Ok((privkey, pubkey)) => { + (PrivKey::Exchange(privkey), PubKey::Exchange(pubkey)) + } + Err(_) => { + assert!(false, "Can't generate random keypair"); + return; + } + }; + let dnssec_record = Record { + public_keys: [(KeyID(42), pub_exchange_key)].to_vec(), + addresses: [record::Address { + ip: ::std::net::IpAddr::V4(::std::net::Ipv4Addr::new(127, 0, 0, 1)), + port: Some(::core::num::NonZeroU16::new(31337).unwrap()), + priority: record::AddressPriority::P1, + weight: record::AddressWeight::W1, + handshake_ids: [HandshakeID::DirectorySynchronized].to_vec(), + public_key_idx: [record::PubKeyIdx(0)].to_vec(), + }] + .to_vec(), + key_exchanges: [enc::asym::KeyExchangeKind::X25519DiffieHellman] + .to_vec(), + hkdfs: [enc::hkdf::HkdfKind::Sha3].to_vec(), + ciphers: [enc::sym::CipherKind::XChaCha20Poly1305].to_vec(), + }; + let cfg_client = { + let mut cfg = config::Config::default(); + cfg.threads = Some(::core::num::NonZeroUsize::new(1).unwrap()); + cfg + }; + let cfg_server = { + let mut cfg = cfg_client.clone(); + cfg.keys = [(KeyID(42), priv_exchange_key, pub_exchange_key)].to_vec(); + cfg + }; + + let mut server = Fenrir::new(&cfg_server).unwrap(); + let _ = server.setup_no_workers().await; + let srv_worker = server.start_single_worker().await; + + ::tokio::task::spawn_local(async move { srv_worker }); + let mut client = Fenrir::new(&cfg_client).unwrap(); + let _ = client.setup_no_workers().await; + let cli_worker = server.start_single_worker().await; + ::tokio::task::spawn_local(async move { cli_worker }); + + use crate::{ + connection::handshake::HandshakeID, + dnssec::{record, Record}, + }; + + let _ = client + .connect_resolved( + dnssec_record, + &Domain("example.com".to_owned()), + auth::SERVICEID_AUTH, + ) + .await; + + /* + let thread_id = ThreadTracker { total: 1, id: 0 }; + + let (stop_sender, _) = ::tokio::sync::broadcast::channel::(1); + + use ::std::net; + let cli_socket_addr = [net::SocketAddr::new( + net::IpAddr::V4(net::Ipv4Addr::new(127, 0, 0, 1)), + 0, + )] + .to_vec(); + let srv_socket_addr = [net::SocketAddr::new( + net::IpAddr::V4(net::Ipv4Addr::new(127, 0, 0, 1)), + 0, + )] + .to_vec(); + + let srv_sock = Arc::new(connection::socket::bind_udp(srv_socket_addr[0]) + .await + .unwrap()); + let cli_sock = Arc::new(connection::socket::bind_udp(cli_socket_addr[0]) + .await + .unwrap()); + + use crate::inner::worker::Work; + let (srv_work_send, srv_work_recv) = ::async_channel::unbounded::(); + let (cli_work_send, cli_work_recv) = ::async_channel::unbounded::(); + + let srv_queue = Arc::new([srv_work_recv.clone()].to_vec()); + let cli_queue = Arc::new([cli_work_recv.clone()].to_vec()); + + let listen_work_srv = + + + ::tokio::spawn(Fenrir::listen_udp( + stop_sender.subscribe(), + + + let _server = crate::inner::worker::Worker::new( + cfg.clone(), + thread_id, + stop_sender.subscribe(), + None, + srv_socket_addr, + srv_work_recv, + ); + let _client = crate::inner::worker::Worker::new( + cfg, + thread_id, + stop_sender.subscribe(), + None, + cli_socket_addr, + cli_work_recv, + ); + + todo!() + */ +}