Send initial dirsync packet and handshake timeout

Signed-off-by: Luca Fulchir <luca.fulchir@runesauth.com>
This commit is contained in:
Luca Fulchir 2023-06-05 10:33:25 +02:00
parent 289c6c318e
commit 3e09b9cee0
Signed by: luca.fulchir
GPG Key ID: 8F6440603D13A78E
4 changed files with 87 additions and 27 deletions

View File

@ -78,12 +78,10 @@ pub(crate) struct HandshakeServer {
} }
pub(crate) struct HandshakeClient { pub(crate) struct HandshakeClient {
pub id: KeyID,
pub key: PrivKey,
pub service_id: ServiceID, pub service_id: ServiceID,
pub service_conn_id: IDRecv, pub service_conn_id: IDRecv,
pub connection: Connection, pub connection: Connection,
pub timeout: Rc<u32>, pub timeout: Option<::tokio::task::JoinHandle<()>>,
} }
/// Tracks the keys used by the client and the handshake /// Tracks the keys used by the client and the handshake
@ -131,7 +129,7 @@ impl HandshakeClientList {
service_id: ServiceID, service_id: ServiceID,
service_conn_id: IDRecv, service_conn_id: IDRecv,
connection: Connection, connection: Connection,
) -> Result<(KeyID, &HandshakeClient), ()> { ) -> Result<(KeyID, &mut HandshakeClient), ()> {
let maybe_free_key_idx = let maybe_free_key_idx =
self.used.iter().enumerate().find_map(|(idx, bmap)| { self.used.iter().enumerate().find_map(|(idx, bmap)| {
match bmap.first_false_index() { match bmap.first_false_index() {
@ -159,18 +157,16 @@ impl HandshakeClientList {
self.keys.push(None); self.keys.push(None);
self.list.push(None); self.list.push(None);
} }
self.keys[free_key_idx] = Some((priv_key.clone(), pub_key)); self.keys[free_key_idx] = Some((priv_key, pub_key));
self.list[free_key_idx] = Some(HandshakeClient { self.list[free_key_idx] = Some(HandshakeClient {
id: KeyID(free_key_idx as u16),
key: priv_key,
service_id, service_id,
service_conn_id, service_conn_id,
connection, connection,
timeout: Rc::new(0), timeout: None,
}); });
Ok(( Ok((
KeyID(free_key_idx as u16), KeyID(free_key_idx as u16),
self.list[free_key_idx].as_ref().unwrap(), self.list[free_key_idx].as_mut().unwrap(),
)) ))
} }
} }

View File

@ -186,6 +186,17 @@ pub struct Address {
} }
impl Address { impl Address {
/// return this Address as a socket address
/// Note that since Fenrir can work on top of IP, without ports,
/// this is not guaranteed to return a SocketAddr
pub fn as_sockaddr(&self) -> Option<::std::net::SocketAddr> {
match self.port {
Some(port) => {
Some(::std::net::SocketAddr::new(self.ip, port.get()))
}
None => None,
}
}
fn raw_len(&self) -> usize { fn raw_len(&self) -> usize {
// UDP port + Priority + Weight + pubkey_len + handshake_len // UDP port + Priority + Weight + pubkey_len + handshake_len
let mut size = 6; let mut size = 6;

View File

@ -99,7 +99,7 @@ impl HandshakeTracker {
service_id: ServiceID, service_id: ServiceID,
service_conn_id: IDRecv, service_conn_id: IDRecv,
connection: Connection, connection: Connection,
) -> Result<(KeyID, &HandshakeClient), ()> { ) -> Result<(KeyID, &mut HandshakeClient), ()> {
self.hshake_cli.add( self.hshake_cli.add(
priv_key, priv_key,
pub_key, pub_key,
@ -108,6 +108,16 @@ impl HandshakeTracker {
connection, connection,
) )
} }
pub(crate) fn timeout_client(
&mut self,
key_id: KeyID,
) -> Option<[IDRecv; 2]> {
if let Some(hshake) = self.hshake_cli.remove(key_id) {
Some([hshake.connection.id_recv, hshake.service_conn_id])
} else {
None
}
}
pub(crate) fn recv_handshake( pub(crate) fn recv_handshake(
&mut self, &mut self,
mut handshake: Handshake, mut handshake: Handshake,

View File

@ -14,7 +14,7 @@ use crate::{
}, },
dnssec, dnssec,
enc::{ enc::{
asym::{PrivKey, PubKey}, asym::{KeyID, PrivKey, PubKey},
hkdf::{self, Hkdf, HkdfKind}, hkdf::{self, Hkdf, HkdfKind},
sym, Random, Secret, sym, Random, Secret,
}, },
@ -24,7 +24,7 @@ use ::std::{rc::Rc, sync::Arc, vec::Vec};
/// This worker must be cpu-pinned /// This worker must be cpu-pinned
use ::tokio::{ use ::tokio::{
net::UdpSocket, net::UdpSocket,
sync::{oneshot, Mutex}, sync::{mpsc, oneshot, Mutex},
}; };
/// Track a raw Udp packet /// Track a raw Udp packet
@ -48,6 +48,7 @@ pub(crate) enum Work {
/// connections present /// connections present
CountConnections(oneshot::Sender<usize>), CountConnections(oneshot::Sender<usize>),
Connect(ConnectInfo), Connect(ConnectInfo),
DropHandshake(KeyID),
Recv(RawUdp), Recv(RawUdp),
} }
pub(crate) enum WorkAnswer { pub(crate) enum WorkAnswer {
@ -64,6 +65,8 @@ pub(crate) struct Worker {
token_check: Option<Arc<Mutex<TokenChecker>>>, token_check: Option<Arc<Mutex<TokenChecker>>>,
sockets: Vec<UdpSocket>, sockets: Vec<UdpSocket>,
queue: ::async_channel::Receiver<Work>, queue: ::async_channel::Receiver<Work>,
queue_timeouts_recv: mpsc::UnboundedReceiver<Work>,
queue_timeouts_send: mpsc::UnboundedSender<Work>,
thread_channels: Vec<::async_channel::Sender<Work>>, thread_channels: Vec<::async_channel::Sender<Work>>,
connections: ConnList, connections: ConnList,
handshakes: HandshakeTracker, handshakes: HandshakeTracker,
@ -132,6 +135,8 @@ impl Worker {
} }
}; };
let (queue_timeouts_send, queue_timeouts_recv) =
mpsc::unbounded_channel();
Ok(Self { Ok(Self {
cfg, cfg,
thread_id, thread_id,
@ -140,6 +145,8 @@ impl Worker {
token_check, token_check,
sockets, sockets,
queue, queue,
queue_timeouts_recv,
queue_timeouts_send,
thread_channels: Vec::new(), thread_channels: Vec::new(),
connections: ConnList::new(thread_id), connections: ConnList::new(thread_id),
handshakes: HandshakeTracker::new(thread_id), handshakes: HandshakeTracker::new(thread_id),
@ -151,6 +158,12 @@ impl Worker {
_done = self.stop_working.recv() => { _done = self.stop_working.recv() => {
break; break;
} }
maybe_timeout = self.queue.recv() => {
match maybe_timeout {
Ok(work) => work,
Err(_) => break,
}
}
maybe_work = self.queue.recv() => { maybe_work = self.queue.recv() => {
match maybe_work { match maybe_work {
Ok(work) => work, Ok(work) => work,
@ -166,11 +179,13 @@ impl Worker {
Work::Connect(conn_info) => { Work::Connect(conn_info) => {
// PERF: geolocation // PERF: geolocation
// Find the first destination with a coherent // Find the first destination with:
// pubkey/key exchange // * UDP port
// * a coherent pubkey/key exchange.
let destination = let destination =
conn_info.resolved.addresses.iter().find_map(|addr| { conn_info.resolved.addresses.iter().find_map(|addr| {
if addr if addr.port.is_none()
|| addr
.handshake_ids .handshake_ids
.iter() .iter()
.find(|h_srv| { .find(|h_srv| {
@ -179,7 +194,7 @@ impl Worker {
.is_none() .is_none()
{ {
// skip servers with no corresponding // skip servers with no corresponding
// handshake types // handshake types or no udp port
return None; return None;
} }
@ -250,7 +265,6 @@ impl Worker {
} }
}; };
// FIXME: save KeyID
let (priv_key, pub_key) = let (priv_key, pub_key) =
match exchange.new_keypair(&self.rand) { match exchange.new_keypair(&self.rand) {
Ok(pair) => pair, Ok(pair) => pair,
@ -353,12 +367,34 @@ impl Worker {
continue 'mainloop; continue 'mainloop;
} }
// start timeout // send always from the first socket
// FIXME: select based on routing table
let sender = self.sockets[0].local_addr().unwrap();
let dest = UdpServer(addr.as_sockaddr().unwrap());
// send packeti // start the timeout right before sending the packet
//self.send_packet(raw, hshake.timeout = Some(::tokio::task::spawn_local(
Self::handshake_timeout(
self.queue_timeouts_send.clone(),
client_key_id,
),
));
todo!() // send packet
self.send_packet(raw, UdpClient(sender), dest).await;
continue 'mainloop;
}
Work::DropHandshake(key_id) => {
if let Some(connections) =
self.handshakes.timeout_client(key_id)
{
for conn_id in connections.into_iter() {
if !conn_id.0.is_handshake() {
self.connections.delete(conn_id);
}
}
};
} }
//TODO: reconf message to add channels //TODO: reconf message to add channels
Work::Recv(pkt) => { Work::Recv(pkt) => {
@ -367,6 +403,13 @@ impl Worker {
} }
} }
} }
async fn handshake_timeout(
timeout_queue: mpsc::UnboundedSender<Work>,
key_id: KeyID,
) {
::tokio::time::sleep(::std::time::Duration::from_secs(10)).await;
let _ = timeout_queue.send(Work::DropHandshake(key_id));
}
/// Read and do stuff with the raw udp packet /// Read and do stuff with the raw udp packet
async fn recv(&mut self, mut udp: RawUdp) { async fn recv(&mut self, mut udp: RawUdp) {
if udp.packet.id.is_handshake() { if udp.packet.id.is_handshake() {