Track new connection

Signed-off-by: Luca Fulchir <luca.fulchir@runesauth.com>
This commit is contained in:
Luca Fulchir 2023-02-25 22:25:52 +01:00
parent bfe99b4c9c
commit f53ef95c73
Signed by: luca.fulchir
GPG Key ID: 8F6440603D13A78E
4 changed files with 144 additions and 40 deletions

View File

@ -31,6 +31,7 @@ arrayref = { version = "0.3" }
async-channel = { version = "1.8" }
# base85 repo has no tags, fix on a commit. v1.1.1 points to older, wrong version
base85 = { git = "https://gitlab.com/darkwyrm/base85", rev = "d98efbfd171dd9ba48e30a5c88f94db92fc7b3c6" }
bitmaps = { version = "3.2" }
chacha20poly1305 = { version = "0.10" }
futures = { version = "0.3" }
hkdf = { version = "0.12" }

View File

@ -26,3 +26,18 @@ impl ProtocolVersion {
*out = *self as u8;
}
}
/// A single connection and its data
#[derive(Debug)]
pub struct Connection {
/// Connection ID
pub id: ID,
/// The main hkdf used for all secrets in this connection
pub hkdf: crate::enc::hkdf::HkdfSha3,
}
impl Connection {
pub(crate) fn new(id: ID, hkdf: crate::enc::hkdf::HkdfSha3) -> Self {
Self { id, hkdf }
}
}

View File

@ -17,6 +17,19 @@ impl ConnectionID {
pub fn new_handshake() -> Self {
Self::Handshake
}
/// New id from u64. PLZ NON ZERO
pub(crate) fn new_u64(raw: u64) -> Self {
#[allow(unsafe_code)]
unsafe {
ConnectionID::ID(::core::num::NonZeroU64::new_unchecked(raw))
}
}
pub(crate) fn as_u64(&self) -> u64 {
match self {
ConnectionID::Handshake => 0,
ConnectionID::ID(id) => id.get(),
}
}
/// New random service ID
pub fn new_rand(rand: &::ring::rand::SystemRandom) -> Self {
use ::ring::rand::SecureRandom;

View File

@ -20,9 +20,10 @@ pub mod dnssec;
pub mod enc;
use ::arc_swap::{ArcSwap, ArcSwapAny, ArcSwapOption};
use ::std::{net::SocketAddr, pin::Pin, sync::Arc};
use ::tokio::macros::support::Future;
use ::tokio::{net::UdpSocket, task::JoinHandle};
use ::std::{net::SocketAddr, pin::Pin, sync::Arc, vec, vec::Vec};
use ::tokio::{
macros::support::Future, net::UdpSocket, sync::RwLock, task::JoinHandle,
};
use crate::enc::{
asym,
@ -30,7 +31,10 @@ use crate::enc::{
sym::{CipherKind, CipherRecv, CipherSend},
};
pub use config::Config;
use connection::handshake::{self, Handshake, HandshakeKey};
use connection::{
handshake::{self, Handshake, HandshakeKey},
Connection,
};
/// Main fenrir library errors
#[derive(::thiserror::Error, Debug)]
@ -52,18 +56,6 @@ pub enum Error {
Key(#[from] crate::enc::Error),
}
// No async here
struct FenrirInner {
key_exchanges: ArcSwapAny<Arc<Vec<(asym::Key, asym::KeyExchange)>>>,
ciphers: ArcSwapAny<Arc<Vec<CipherKind>>>,
keys: ArcSwapAny<Arc<Vec<HandshakeKey>>>,
}
#[allow(unsafe_code)]
unsafe impl Send for FenrirInner {}
#[allow(unsafe_code)]
unsafe impl Sync for FenrirInner {}
/// Information needed to reply after the key exchange
#[derive(Debug, Clone)]
pub struct AuthNeededInfo {
@ -83,6 +75,17 @@ pub enum HandshakeAction {
/// Packet parsed, now go perform authentication
AuthNeeded(AuthNeededInfo),
}
// No async here
struct FenrirInner {
key_exchanges: ArcSwapAny<Arc<Vec<(asym::Key, asym::KeyExchange)>>>,
ciphers: ArcSwapAny<Arc<Vec<CipherKind>>>,
keys: ArcSwapAny<Arc<Vec<HandshakeKey>>>,
}
#[allow(unsafe_code)]
unsafe impl Send for FenrirInner {}
#[allow(unsafe_code)]
unsafe impl Sync for FenrirInner {}
// No async here
impl FenrirInner {
@ -215,17 +218,20 @@ impl SocketList {
socket: Arc<UdpSocket>,
handle: JoinHandle<::std::io::Result<()>>,
) {
let mut new_list;
{
let old_list = self.list.load();
new_list = Arc::new(Vec::with_capacity(old_list.len() + 1));
// we could simplify this into just a `.swap` instead of `.rcu` but
// it is not yet guaranteed that only one thread will call this fn
// ...we don't need performance here anyway
let arc_handle = Arc::new(handle);
self.list.rcu(|old_list| {
let mut new_list = Arc::new(Vec::with_capacity(old_list.len() + 1));
new_list = old_list.to_vec().into();
}
Arc::get_mut(&mut new_list)
.unwrap()
.push((socket, Arc::new(handle)));
self.list.swap(new_list);
Arc::get_mut(&mut new_list)
.unwrap()
.push((socket.clone(), arc_handle.clone()));
new_list
});
}
/// This method assumes no other `add_sockets` are being run
async fn stop_all(mut self) {
let mut arc_list = self.list.into_inner();
let list = loop {
@ -282,6 +288,61 @@ enum Work {
Recv(RawUdp),
}
// PERF: Arc<RwLock<ConnList>> loks a bit too much, need to find
// faster ways to do this
struct ConnList {
connections: Vec<Option<Arc<Connection>>>,
/// Bitmap to track which connection ids are used or free
ids_used: Vec<::bitmaps::Bitmap<1024>>,
}
impl ConnList {
fn new() -> Self {
let mut bitmap_id = ::bitmaps::Bitmap::<1024>::new();
bitmap_id.set(0, true); // ID(0) == handshake
Self {
connections: Vec::with_capacity(128),
ids_used: vec![bitmap_id],
}
}
fn reserve_first(&mut self, hkdf: HkdfSha3) -> Arc<Connection> {
// uhm... bad things are going on here:
// * id must be initialized, but only because:
// * rust does not understand that after the `!found` id is always
// initialized
// * `ID::new_u64` is really safe only with >0, but here it always is
// ...we should probably rewrite it in better, safer rust
let mut id: u64 = 0;
let mut found = false;
for (i, b) in self.ids_used.iter_mut().enumerate() {
match b.first_false_index() {
Some(idx) => {
b.set(idx, true);
id = ((i as u64) * 1024) + (idx as u64);
found = true;
break;
}
None => {}
}
}
if !found {
let mut new_bitmap = ::bitmaps::Bitmap::<1024>::new();
new_bitmap.set(0, true);
id = (self.ids_used.len() as u64) * 1024;
self.ids_used.push(new_bitmap);
}
let new_id = connection::ID::new_u64(id);
let new_conn = Arc::new(connection::Connection::new(new_id, hkdf));
if (self.connections.len() as u64) < id {
self.connections.push(Some(new_conn.clone()));
} else {
// very probably redundant
self.connections[id as usize] = Some(new_conn.clone());
}
new_conn
}
}
/// Instance of a fenrir endpoint
#[allow(missing_copy_implementations, missing_debug_implementations)]
pub struct Fenrir {
@ -303,6 +364,8 @@ pub struct Fenrir {
work_recv: Arc<::async_channel::Receiver<Work>>,
// PERF: rand uses syscalls. should we do that async?
rand: ::ring::rand::SystemRandom,
/// list of Established connections
connections: Arc<RwLock<ConnList>>,
}
// TODO: graceful vs immediate stop
@ -333,6 +396,7 @@ impl Fenrir {
work_send: Arc::new(work_send),
work_recv: Arc::new(work_recv),
rand: ::ring::rand::SystemRandom::new(),
connections: Arc::new(RwLock::new(ConnList::new())),
};
Ok(endpoint)
}
@ -389,7 +453,7 @@ impl Fenrir {
/// Add all UDP sockets found in config
/// and start listening for packets
async fn add_sockets(&mut self) -> ::std::io::Result<()> {
async fn add_sockets(&self) -> ::std::io::Result<()> {
let sockets = self.cfg.listen.iter().map(|s_addr| async {
let socket =
::tokio::spawn(Self::bind_udp(s_addr.clone())).await??;
@ -589,20 +653,28 @@ impl Fenrir {
// connection ID
let srv_conn_id =
connection::ID::new_rand(&self.rand);
let auth_conn_id =
connection::ID::new_rand(&self.rand);
let srv_secret =
enc::sym::Secret::new_rand(&self.rand);
// track connection
let auth_conn = {
let mut lock =
self.connections.write().await;
lock.reserve_first(authinfo.hkdf)
};
// TODO: move all the next bits into
// dirsync::Req::respond(...)
let resp_data = dirsync::RespData {
client_nonce: req_data.nonce,
id: auth_conn_id,
id: auth_conn.id,
service_id: srv_conn_id,
service_key: srv_secret,
};
// build response
let secret_send =
authinfo.hkdf.get_secret(b"to_client");
auth_conn.hkdf.get_secret(b"to_client");
let mut cipher_send = CipherSend::new(
authinfo.cipher,
secret_send,
@ -654,16 +726,19 @@ impl Fenrir {
client: UdpClient,
server: UdpServer,
) {
let sockets = self.sockets.lock();
let src_sock = match sockets.find(server) {
Some(src_sock) => src_sock,
None => {
::tracing::error!(
"Can't send packet: Server changed listening ip!"
);
return;
}
};
let src_sock;
{
let sockets = self.sockets.lock();
src_sock = match sockets.find(server) {
Some(src_sock) => src_sock,
None => {
::tracing::error!(
"Can't send packet: Server changed listening ip!"
);
return;
}
};
}
src_sock.send_to(&data, client.0);
}
}