From 3e09b9cee0417ea6525612d57b2f6e5b95ab63ce Mon Sep 17 00:00:00 2001 From: Luca Fulchir Date: Mon, 5 Jun 2023 10:33:25 +0200 Subject: [PATCH] Send initial dirsync packet and handshake timeout Signed-off-by: Luca Fulchir --- src/connection/handshake/mod.rs | 14 +++--- src/dnssec/record.rs | 11 +++++ src/inner/mod.rs | 12 ++++- src/inner/worker.rs | 77 +++++++++++++++++++++++++-------- 4 files changed, 87 insertions(+), 27 deletions(-) diff --git a/src/connection/handshake/mod.rs b/src/connection/handshake/mod.rs index 273b57e..427bff4 100644 --- a/src/connection/handshake/mod.rs +++ b/src/connection/handshake/mod.rs @@ -78,12 +78,10 @@ pub(crate) struct HandshakeServer { } pub(crate) struct HandshakeClient { - pub id: KeyID, - pub key: PrivKey, pub service_id: ServiceID, pub service_conn_id: IDRecv, pub connection: Connection, - pub timeout: Rc, + pub timeout: Option<::tokio::task::JoinHandle<()>>, } /// Tracks the keys used by the client and the handshake @@ -131,7 +129,7 @@ impl HandshakeClientList { service_id: ServiceID, service_conn_id: IDRecv, connection: Connection, - ) -> Result<(KeyID, &HandshakeClient), ()> { + ) -> Result<(KeyID, &mut HandshakeClient), ()> { let maybe_free_key_idx = self.used.iter().enumerate().find_map(|(idx, bmap)| { match bmap.first_false_index() { @@ -159,18 +157,16 @@ impl HandshakeClientList { self.keys.push(None); self.list.push(None); } - self.keys[free_key_idx] = Some((priv_key.clone(), pub_key)); + self.keys[free_key_idx] = Some((priv_key, pub_key)); self.list[free_key_idx] = Some(HandshakeClient { - id: KeyID(free_key_idx as u16), - key: priv_key, service_id, service_conn_id, connection, - timeout: Rc::new(0), + timeout: None, }); Ok(( KeyID(free_key_idx as u16), - self.list[free_key_idx].as_ref().unwrap(), + self.list[free_key_idx].as_mut().unwrap(), )) } } diff --git a/src/dnssec/record.rs b/src/dnssec/record.rs index 35b7489..ab5065b 100644 --- a/src/dnssec/record.rs +++ b/src/dnssec/record.rs @@ -186,6 +186,17 @@ pub struct Address { } impl Address { + /// return this Address as a socket address + /// Note that since Fenrir can work on top of IP, without ports, + /// this is not guaranteed to return a SocketAddr + pub fn as_sockaddr(&self) -> Option<::std::net::SocketAddr> { + match self.port { + Some(port) => { + Some(::std::net::SocketAddr::new(self.ip, port.get())) + } + None => None, + } + } fn raw_len(&self) -> usize { // UDP port + Priority + Weight + pubkey_len + handshake_len let mut size = 6; diff --git a/src/inner/mod.rs b/src/inner/mod.rs index 147984f..67add2a 100644 --- a/src/inner/mod.rs +++ b/src/inner/mod.rs @@ -99,7 +99,7 @@ impl HandshakeTracker { service_id: ServiceID, service_conn_id: IDRecv, connection: Connection, - ) -> Result<(KeyID, &HandshakeClient), ()> { + ) -> Result<(KeyID, &mut HandshakeClient), ()> { self.hshake_cli.add( priv_key, pub_key, @@ -108,6 +108,16 @@ impl HandshakeTracker { connection, ) } + pub(crate) fn timeout_client( + &mut self, + key_id: KeyID, + ) -> Option<[IDRecv; 2]> { + if let Some(hshake) = self.hshake_cli.remove(key_id) { + Some([hshake.connection.id_recv, hshake.service_conn_id]) + } else { + None + } + } pub(crate) fn recv_handshake( &mut self, mut handshake: Handshake, diff --git a/src/inner/worker.rs b/src/inner/worker.rs index 3210229..642d9d2 100644 --- a/src/inner/worker.rs +++ b/src/inner/worker.rs @@ -14,7 +14,7 @@ use crate::{ }, dnssec, enc::{ - asym::{PrivKey, PubKey}, + asym::{KeyID, PrivKey, PubKey}, hkdf::{self, Hkdf, HkdfKind}, sym, Random, Secret, }, @@ -24,7 +24,7 @@ use ::std::{rc::Rc, sync::Arc, vec::Vec}; /// This worker must be cpu-pinned use ::tokio::{ net::UdpSocket, - sync::{oneshot, Mutex}, + sync::{mpsc, oneshot, Mutex}, }; /// Track a raw Udp packet @@ -48,6 +48,7 @@ pub(crate) enum Work { /// connections present CountConnections(oneshot::Sender), Connect(ConnectInfo), + DropHandshake(KeyID), Recv(RawUdp), } pub(crate) enum WorkAnswer { @@ -64,6 +65,8 @@ pub(crate) struct Worker { token_check: Option>>, sockets: Vec, queue: ::async_channel::Receiver, + queue_timeouts_recv: mpsc::UnboundedReceiver, + queue_timeouts_send: mpsc::UnboundedSender, thread_channels: Vec<::async_channel::Sender>, connections: ConnList, handshakes: HandshakeTracker, @@ -132,6 +135,8 @@ impl Worker { } }; + let (queue_timeouts_send, queue_timeouts_recv) = + mpsc::unbounded_channel(); Ok(Self { cfg, thread_id, @@ -140,6 +145,8 @@ impl Worker { token_check, sockets, queue, + queue_timeouts_recv, + queue_timeouts_send, thread_channels: Vec::new(), connections: ConnList::new(thread_id), handshakes: HandshakeTracker::new(thread_id), @@ -151,6 +158,12 @@ impl Worker { _done = self.stop_working.recv() => { break; } + maybe_timeout = self.queue.recv() => { + match maybe_timeout { + Ok(work) => work, + Err(_) => break, + } + } maybe_work = self.queue.recv() => { match maybe_work { Ok(work) => work, @@ -166,20 +179,22 @@ impl Worker { Work::Connect(conn_info) => { // PERF: geolocation - // Find the first destination with a coherent - // pubkey/key exchange + // Find the first destination with: + // * UDP port + // * a coherent pubkey/key exchange. let destination = conn_info.resolved.addresses.iter().find_map(|addr| { - if addr - .handshake_ids - .iter() - .find(|h_srv| { - self.cfg.handshakes.contains(h_srv) - }) - .is_none() + if addr.port.is_none() + || addr + .handshake_ids + .iter() + .find(|h_srv| { + self.cfg.handshakes.contains(h_srv) + }) + .is_none() { // skip servers with no corresponding - // handshake types + // handshake types or no udp port return None; } @@ -250,7 +265,6 @@ impl Worker { } }; - // FIXME: save KeyID let (priv_key, pub_key) = match exchange.new_keypair(&self.rand) { Ok(pair) => pair, @@ -353,12 +367,34 @@ impl Worker { continue 'mainloop; } - // start timeout + // send always from the first socket + // FIXME: select based on routing table + let sender = self.sockets[0].local_addr().unwrap(); + let dest = UdpServer(addr.as_sockaddr().unwrap()); - // send packeti - //self.send_packet(raw, + // start the timeout right before sending the packet + hshake.timeout = Some(::tokio::task::spawn_local( + Self::handshake_timeout( + self.queue_timeouts_send.clone(), + client_key_id, + ), + )); - todo!() + // send packet + self.send_packet(raw, UdpClient(sender), dest).await; + + continue 'mainloop; + } + Work::DropHandshake(key_id) => { + if let Some(connections) = + self.handshakes.timeout_client(key_id) + { + for conn_id in connections.into_iter() { + if !conn_id.0.is_handshake() { + self.connections.delete(conn_id); + } + } + }; } //TODO: reconf message to add channels Work::Recv(pkt) => { @@ -367,6 +403,13 @@ impl Worker { } } } + async fn handshake_timeout( + timeout_queue: mpsc::UnboundedSender, + key_id: KeyID, + ) { + ::tokio::time::sleep(::std::time::Duration::from_secs(10)).await; + let _ = timeout_queue.send(Work::DropHandshake(key_id)); + } /// Read and do stuff with the raw udp packet async fn recv(&mut self, mut udp: RawUdp) { if udp.packet.id.is_handshake() {