From 9ca4123c3723a144fad22719fde3a200a26fff67 Mon Sep 17 00:00:00 2001 From: Luca Fulchir Date: Wed, 28 Jun 2023 18:49:33 +0200 Subject: [PATCH] Review conn tracking, data reporting Signed-off-by: Luca Fulchir --- TODO | 3 ++ src/connection/mod.rs | 70 +++++++++++++++++++++----------- src/connection/stream/mod.rs | 22 +++++++++- src/connection/stream/rob/mod.rs | 28 ++++++++----- src/inner/worker.rs | 43 +++++++++++++------- src/lib.rs | 13 +++++- 6 files changed, 127 insertions(+), 52 deletions(-) diff --git a/TODO b/TODO index 9531367..85a5331 100644 --- a/TODO +++ b/TODO @@ -1 +1,4 @@ * Wrapping for everything that wraps (sigh) +* track user connection (add u64 from user) +* split API in LocalThread and ThreadSafe + * split send/recv API in Centralized, Decentralized diff --git a/src/connection/mod.rs b/src/connection/mod.rs index dbe62e1..07af86e 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -14,7 +14,7 @@ use ::std::{ pub use crate::connection::{handshake::Handshake, packet::Packet}; use crate::{ - connection::socket::UdpClient, + connection::{socket::UdpClient, stream::StreamData}, dnssec, enc::{ self, @@ -141,28 +141,32 @@ impl ProtocolVersion { } } +/// Unique tracker of connections #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)] -pub(crate) struct UserConnTracker(Wrapping); -impl UserConnTracker { - fn advance(&mut self) -> Self { +pub struct ConnTracker(Wrapping); +impl ConnTracker { + pub(crate) fn new(start: u16) -> Self { + Self(Wrapping(start as u64)) + } + pub(crate) fn advance(&mut self, amount: u16) -> Self { let old = self.0; - self.0 = self.0 + Wrapping(1); - UserConnTracker(old) + self.0 = self.0 + Wrapping(amount as u64); + ConnTracker(old) } } /// Connection to an Authentication Server #[derive(Debug)] -pub struct AuthSrvConn(pub(crate) Conn); +pub struct AuthSrvConn(pub Conn); /// Connection to a service #[derive(Debug)] -pub struct ServiceConn(pub(crate) Conn); +pub struct ServiceConn(pub Conn); /// The connection, as seen from a user of libFenrir #[derive(Debug)] pub struct Conn { pub(crate) queue: ::async_channel::Sender, - pub(crate) conn: UserConnTracker, + pub(crate) fast: ConnTracker, } impl Conn { @@ -172,9 +176,13 @@ impl Conn { use crate::inner::worker::Work; let _ = self .queue - .send(Work::UserSend((self.conn, stream, data))) + .send(Work::UserSend((self.tracker(), stream, data))) .await; } + /// Get the library tracking id + pub fn tracker(&self) -> ConnTracker { + self.fast + } } /// Role: track the connection direction @@ -205,6 +213,10 @@ pub(crate) enum Enqueue { Immediate(::tokio::time::Instant), } +/// Connection tracking id. Set by the user +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Copy, Clone)] +pub struct UserTracker(pub ::core::num::NonZeroU64); + /// A single connection and its data #[derive(Debug)] pub(crate) struct Connection { @@ -212,6 +224,9 @@ pub(crate) struct Connection { pub(crate) id_recv: IDRecv, /// Sending Conn ID pub(crate) id_send: IDSend, + /// User-managed id to track this connection + /// the user can set this to better track this connection + pub(crate) user_tracker: Option, /// Sending address pub(crate) send_addr: UdpClient, /// The main hkdf used for all secrets in this connection @@ -251,6 +266,8 @@ impl Connection { Self { id_recv: IDRecv(ID::Handshake), id_send: IDSend(ID::Handshake), + user_tracker: None, + // will be overwritten send_addr: UdpClient(SocketAddr::new( IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 31337, @@ -265,7 +282,10 @@ impl Connection { recv_queue: BTreeMap::new(), } } - pub(crate) fn recv(&mut self, mut udp: crate::RawUdp) -> Result<(), Error> { + pub(crate) fn recv( + &mut self, + mut udp: crate::RawUdp, + ) -> Result { let mut data = &mut udp.data[ID::len()..]; let aad = enc::sym::AAD(&[]); self.cipher_recv.decrypt(aad, &mut data)?; @@ -285,18 +305,22 @@ impl Connection { break; } } + let mut data_ready = StreamData::NotReady; for chunk in chunks.into_iter() { - let stream = match self.recv_queue.get_mut(&chunk.id) { + let stream_id = chunk.id; + let stream = match self.recv_queue.get_mut(&stream_id) { Some(stream) => stream, None => { ::tracing::debug!("Ignoring chunk for unknown stream::ID"); continue; } }; - stream.recv(chunk); + match stream.recv(chunk) { + Ok(status) => data_ready = data_ready | status, + Err(e) => ::tracing::debug!("stream: {:?}: {:?}", stream_id, e), + } } - // FIXME: report if we need to return data to the user - Ok(()) + Ok(data_ready) } pub(crate) fn enqueue( &mut self, @@ -402,8 +426,8 @@ impl Connection { pub(crate) struct ConnList { thread_id: ThreadTracker, connections: Vec>, - user_tracker: BTreeMap, - last_tracked: UserConnTracker, + user_tracker: BTreeMap, + last_tracked: ConnTracker, /// Bitmap to track which connection ids are used or free ids_used: Vec<::bitmaps::Bitmap<1024>>, } @@ -420,7 +444,7 @@ impl ConnList { thread_id, connections: Vec::with_capacity(INITIAL_CAP), user_tracker: BTreeMap::new(), - last_tracked: UserConnTracker(Wrapping(0)), + last_tracked: ConnTracker(Wrapping(0)), ids_used: vec![bitmap_id], }; ret.connections.resize_with(INITIAL_CAP, || None); @@ -437,10 +461,7 @@ impl ConnList { (conn_id.get() / (self.thread_id.total as u64)) as usize; (&mut self.connections[id_in_thread]).into() } - pub fn get_mut( - &mut self, - tracker: UserConnTracker, - ) -> Option<&mut Connection> { + pub fn get_mut(&mut self, tracker: ConnTracker) -> Option<&mut Connection> { let idx = if let Some(idx) = self.user_tracker.get(&tracker) { *idx } else { @@ -504,7 +525,7 @@ impl ConnList { pub(crate) fn track( &mut self, conn: Connection, - ) -> Result { + ) -> Result { let conn_id = match conn.id_recv { IDRecv(ID::Handshake) => { return Err(()); @@ -516,8 +537,9 @@ impl ConnList { self.connections[id_in_thread] = Some(conn); let mut tracked; loop { - tracked = self.last_tracked.advance(); + tracked = self.last_tracked.advance(self.thread_id.total); if self.user_tracker.get(&tracked).is_none() { + // like, never gonna happen, it's 64 bit let _ = self.user_tracker.insert(tracked, id_in_thread); break; } diff --git a/src/connection/stream/mod.rs b/src/connection/stream/mod.rs index 96971cc..425cbd8 100644 --- a/src/connection/stream/mod.rs +++ b/src/connection/stream/mod.rs @@ -238,6 +238,26 @@ impl Tracker { } } +#[derive(Debug, Eq, PartialEq)] +pub(crate) enum StreamData { + /// not enough data to return somthing to the user + NotReady = 0, + /// we can return something to the user + Ready, +} +impl ::core::ops::BitOr for StreamData { + type Output = Self; + + // Required method + fn bitor(self, other: Self) -> Self::Output { + if self == StreamData::Ready || other == StreamData::Ready { + StreamData::Ready + } else { + StreamData::NotReady + } + } +} + /// Actual stream-tracking structure #[derive(Debug, Clone)] pub(crate) struct Stream { @@ -254,7 +274,7 @@ impl Stream { data: Tracker::new(kind, rand), } } - pub(crate) fn recv(&mut self, chunk: Chunk) -> Result<(), Error> { + pub(crate) fn recv(&mut self, chunk: Chunk) -> Result { match &mut self.data { Tracker::ROB(tracker) => tracker.recv(chunk), } diff --git a/src/connection/stream/rob/mod.rs b/src/connection/stream/rob/mod.rs index 1bfd159..21361bb 100644 --- a/src/connection/stream/rob/mod.rs +++ b/src/connection/stream/rob/mod.rs @@ -2,7 +2,9 @@ //! AKA: TCP-like use crate::{ - connection::stream::{Chunk, Error, Sequence, SequenceEnd, SequenceStart}, + connection::stream::{ + Chunk, Error, Sequence, SequenceEnd, SequenceStart, StreamData, + }, enc::Random, }; @@ -93,7 +95,7 @@ impl ReliableOrderedBytestream { ((self.pivot as usize + data_len) % self.data.len()) as u32; ret } - pub(crate) fn recv(&mut self, chunk: Chunk) -> Result<(), Error> { + pub(crate) fn recv(&mut self, chunk: Chunk) -> Result { if !chunk .sequence .is_between(self.window_start, self.window_end) @@ -106,7 +108,7 @@ impl ReliableOrderedBytestream { chunk.data.len(), ); if maxlen == 0 { - // or empty chunk, but we don't care + // empty window or empty chunk, but we don't care return Err(Error::OutOfWindow); } // translate Sequences to offsets in self.data @@ -119,7 +121,7 @@ impl ReliableOrderedBytestream { let mut copy_ranges = Vec::new(); let mut to_delete = Vec::new(); let mut to_add = Vec::new(); - // note: te included ranges are (INCLUSIVE, INCLUSIVE) + // note: the ranges are (INCLUSIVE, INCLUSIVE) for (idx, el) in self.missing.iter_mut().enumerate() { let missing_from = self.window_start.offset(el.0); if missing_from > offset_end { @@ -146,7 +148,6 @@ impl ReliableOrderedBytestream { if missing_to > offset_end { // [..chunk..] // [....missing....] - // chunk is in the middle of a missing fragment to_add.push(( el.0.plus_u32(((offset_end - missing_from) + 1) as u32), el.1, @@ -156,16 +157,12 @@ impl ReliableOrderedBytestream { } else if offset <= missing_to { // [....chunk....] // [...missing...] - // chunk copy_ranges.push((offset, (missing_to - 0))); el.1 = el.0.plus_u32(((offset_end - missing_from) - 1) as u32); } } } - self.missing.append(&mut to_add); - self.missing - .sort_by(|(from_a, _), (from_b, _)| from_a.0 .0.cmp(&from_b.0 .0)); { let mut deleted = 0; for idx in to_delete.into_iter() { @@ -173,6 +170,10 @@ impl ReliableOrderedBytestream { deleted = deleted + 1; } } + self.missing.append(&mut to_add); + self.missing + .sort_by(|(from_a, _), (from_b, _)| from_a.0 .0.cmp(&from_b.0 .0)); + // copy only the missing data let (first, second) = self.data[..].split_at_mut(self.pivot as usize); for (from, to) in copy_ranges.into_iter() { @@ -198,7 +199,12 @@ impl ReliableOrderedBytestream { .copy_from_slice(&data[data_from..data_to]); } } - - Ok(()) + if self.missing.len() == 0 + || self.window_start.offset(self.missing[0].0) == 0 + { + Ok(StreamData::Ready) + } else { + Ok(StreamData::NotReady) + } } } diff --git a/src/inner/worker.rs b/src/inner/worker.rs index 725578d..c71b6f2 100644 --- a/src/inner/worker.rs +++ b/src/inner/worker.rs @@ -11,8 +11,8 @@ use crate::{ }, packet::{self, Packet}, socket::{UdpClient, UdpServer}, - stream, AuthSrvConn, ConnList, Connection, IDSend, ServiceConn, - UserConnTracker, + stream, AuthSrvConn, ConnList, ConnTracker, Connection, IDSend, + ServiceConn, }, dnssec, enc::{ @@ -46,6 +46,16 @@ pub(crate) struct ConnectInfo { // TODO: UserID, Token information } +/// Connection event. Mostly used to give the data to the user +#[derive(Debug, Eq, PartialEq, Clone)] +#[non_exhaustive] +pub enum Event { + /// Work loop has exited. nothing more to do + End, + /// Data from a connection + Data(Vec), +} + pub(crate) enum Work { /// ask the thread to report to the main thread the total number of /// connections present @@ -53,8 +63,8 @@ pub(crate) enum Work { Connect(ConnectInfo), DropHandshake(KeyID), Recv(RawUdp), - UserSend((UserConnTracker, stream::ID, Vec)), - SendData((UserConnTracker, ::tokio::time::Instant)), + UserSend((ConnTracker, stream::ID, Vec)), + SendData((ConnTracker, ::tokio::time::Instant)), } /// Actual worker implementation. @@ -136,7 +146,7 @@ impl Worker { } /// Continuously loop and process work as needed - pub async fn work_loop(&mut self) { + pub async fn work_loop(&mut self) -> Result { 'mainloop: loop { let next_timer = self.work_timers.get_next(); ::tokio::pin!(next_timer); @@ -436,7 +446,7 @@ impl Worker { } Work::UserSend((tracker, stream, data)) => { let conn = match self.connections.get_mut(tracker) { - None => return, + None => continue, Some(conn) => conn, }; use connection::Enqueue; @@ -468,21 +478,21 @@ impl Worker { .queue_sender .send(Work::SendData((tracker, instant))) .await; - return; + continue; } let mut raw: Vec = Vec::with_capacity(1200); raw.resize(raw.capacity(), 0); let conn = match self.connections.get_mut(tracker) { - None => return, + None => continue, Some(conn) => conn, }; let pkt = match conn.write_pkt(&mut raw) { Ok(pkt) => pkt, - Err(enc::Error::NotEnoughData(0)) => return, + Err(enc::Error::NotEnoughData(0)) => continue, Err(e) => { ::tracing::error!("Packet generation: {:?}", e); - return; + continue; } }; let dest = conn.send_addr; @@ -493,6 +503,7 @@ impl Worker { } } } + Ok(Event::End) } /// Read and do stuff with the raw udp packet async fn recv(&mut self, mut udp: RawUdp) { @@ -527,8 +538,12 @@ impl Worker { None => return, Some(conn) => conn, }; - if let Err(e) = conn.recv(udp) { - ::tracing::trace!("Conn Recv: {:?}", e.to_string()); + match conn.recv(udp) { + Ok(stream::StreamData::NotReady) => {} + Ok(stream::StreamData::Ready) => { + // + } + Err(e) => ::tracing::trace!("Conn Recv: {:?}", e.to_string()), } } /// Receive an handshake packet @@ -693,7 +708,7 @@ impl Worker { }; let authsrv_conn = AuthSrvConn(connection::Conn { queue: self.queue_sender.clone(), - conn: track_auth_conn, + fast: track_auth_conn, }); let mut service_conn = None; if cci.service_id != auth::SERVICEID_AUTH { @@ -735,7 +750,7 @@ impl Worker { }; service_conn = Some(ServiceConn(connection::Conn { queue: self.queue_sender.clone(), - conn: track_serv_conn, + fast: track_serv_conn, })); } let _ = cci.answer.send(Ok(handshake::tracker::ConnectOk { diff --git a/src/lib.rs b/src/lib.rs index 8ac887a..61af27a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,7 @@ use crate::{ AuthServerConnections, Packet, }, inner::{ - worker::{ConnectInfo, RawUdp, Work, Worker}, + worker::{ConnectInfo, Event, RawUdp, Work, Worker}, ThreadTracker, }, }; @@ -638,7 +638,16 @@ impl Fenrir { Ok(worker) => worker, Err(_) => return, }; - worker.work_loop().await + loop { + match worker.work_loop().await { + Ok(_) => continue, + Ok(Event::End) => break, + Err(e) => { + ::tracing::error!("Worker: {:?}", e); + break; + } + } + } }); }); loop {