WIP: Actual send and receive data #5

Draft
luca.fulchir wants to merge 6 commits from transport into main
6 changed files with 127 additions and 52 deletions
Showing only changes of commit 9ca4123c37 - Show all commits

3
TODO
View File

@ -1 +1,4 @@
* Wrapping for everything that wraps (sigh)
* track user connection (add u64 from user)
* split API in LocalThread and ThreadSafe
* split send/recv API in Centralized, Decentralized

View File

@ -14,7 +14,7 @@ use ::std::{
pub use crate::connection::{handshake::Handshake, packet::Packet};
use crate::{
connection::socket::UdpClient,
connection::{socket::UdpClient, stream::StreamData},
dnssec,
enc::{
self,
@ -141,28 +141,32 @@ impl ProtocolVersion {
}
}
/// Unique tracker of connections
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub(crate) struct UserConnTracker(Wrapping<usize>);
impl UserConnTracker {
fn advance(&mut self) -> Self {
pub struct ConnTracker(Wrapping<u64>);
impl ConnTracker {
pub(crate) fn new(start: u16) -> Self {
Self(Wrapping(start as u64))
}
pub(crate) fn advance(&mut self, amount: u16) -> Self {
let old = self.0;
self.0 = self.0 + Wrapping(1);
UserConnTracker(old)
self.0 = self.0 + Wrapping(amount as u64);
ConnTracker(old)
}
}
/// Connection to an Authentication Server
#[derive(Debug)]
pub struct AuthSrvConn(pub(crate) Conn);
pub struct AuthSrvConn(pub Conn);
/// Connection to a service
#[derive(Debug)]
pub struct ServiceConn(pub(crate) Conn);
pub struct ServiceConn(pub Conn);
/// The connection, as seen from a user of libFenrir
#[derive(Debug)]
pub struct Conn {
pub(crate) queue: ::async_channel::Sender<worker::Work>,
pub(crate) conn: UserConnTracker,
pub(crate) fast: ConnTracker,
}
impl Conn {
@ -172,9 +176,13 @@ impl Conn {
use crate::inner::worker::Work;
let _ = self
.queue
.send(Work::UserSend((self.conn, stream, data)))
.send(Work::UserSend((self.tracker(), stream, data)))
.await;
}
/// Get the library tracking id
pub fn tracker(&self) -> ConnTracker {
self.fast
}
}
/// Role: track the connection direction
@ -205,6 +213,10 @@ pub(crate) enum Enqueue {
Immediate(::tokio::time::Instant),
}
/// Connection tracking id. Set by the user
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Copy, Clone)]
pub struct UserTracker(pub ::core::num::NonZeroU64);
/// A single connection and its data
#[derive(Debug)]
pub(crate) struct Connection {
@ -212,6 +224,9 @@ pub(crate) struct Connection {
pub(crate) id_recv: IDRecv,
/// Sending Conn ID
pub(crate) id_send: IDSend,
/// User-managed id to track this connection
/// the user can set this to better track this connection
pub(crate) user_tracker: Option<UserTracker>,
/// Sending address
pub(crate) send_addr: UdpClient,
/// The main hkdf used for all secrets in this connection
@ -251,6 +266,8 @@ impl Connection {
Self {
id_recv: IDRecv(ID::Handshake),
id_send: IDSend(ID::Handshake),
user_tracker: None,
// will be overwritten
send_addr: UdpClient(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
31337,
@ -265,7 +282,10 @@ impl Connection {
recv_queue: BTreeMap::new(),
}
}
pub(crate) fn recv(&mut self, mut udp: crate::RawUdp) -> Result<(), Error> {
pub(crate) fn recv(
&mut self,
mut udp: crate::RawUdp,
) -> Result<StreamData, Error> {
let mut data = &mut udp.data[ID::len()..];
let aad = enc::sym::AAD(&[]);
self.cipher_recv.decrypt(aad, &mut data)?;
@ -285,18 +305,22 @@ impl Connection {
break;
}
}
let mut data_ready = StreamData::NotReady;
for chunk in chunks.into_iter() {
let stream = match self.recv_queue.get_mut(&chunk.id) {
let stream_id = chunk.id;
let stream = match self.recv_queue.get_mut(&stream_id) {
Some(stream) => stream,
None => {
::tracing::debug!("Ignoring chunk for unknown stream::ID");
continue;
}
};
stream.recv(chunk);
match stream.recv(chunk) {
Ok(status) => data_ready = data_ready | status,
Err(e) => ::tracing::debug!("stream: {:?}: {:?}", stream_id, e),
}
// FIXME: report if we need to return data to the user
Ok(())
}
Ok(data_ready)
}
pub(crate) fn enqueue(
&mut self,
@ -402,8 +426,8 @@ impl Connection {
pub(crate) struct ConnList {
thread_id: ThreadTracker,
connections: Vec<Option<Connection>>,
user_tracker: BTreeMap<UserConnTracker, usize>,
last_tracked: UserConnTracker,
user_tracker: BTreeMap<ConnTracker, usize>,
last_tracked: ConnTracker,
/// Bitmap to track which connection ids are used or free
ids_used: Vec<::bitmaps::Bitmap<1024>>,
}
@ -420,7 +444,7 @@ impl ConnList {
thread_id,
connections: Vec::with_capacity(INITIAL_CAP),
user_tracker: BTreeMap::new(),
last_tracked: UserConnTracker(Wrapping(0)),
last_tracked: ConnTracker(Wrapping(0)),
ids_used: vec![bitmap_id],
};
ret.connections.resize_with(INITIAL_CAP, || None);
@ -437,10 +461,7 @@ impl ConnList {
(conn_id.get() / (self.thread_id.total as u64)) as usize;
(&mut self.connections[id_in_thread]).into()
}
pub fn get_mut(
&mut self,
tracker: UserConnTracker,
) -> Option<&mut Connection> {
pub fn get_mut(&mut self, tracker: ConnTracker) -> Option<&mut Connection> {
let idx = if let Some(idx) = self.user_tracker.get(&tracker) {
*idx
} else {
@ -504,7 +525,7 @@ impl ConnList {
pub(crate) fn track(
&mut self,
conn: Connection,
) -> Result<UserConnTracker, ()> {
) -> Result<ConnTracker, ()> {
let conn_id = match conn.id_recv {
IDRecv(ID::Handshake) => {
return Err(());
@ -516,8 +537,9 @@ impl ConnList {
self.connections[id_in_thread] = Some(conn);
let mut tracked;
loop {
tracked = self.last_tracked.advance();
tracked = self.last_tracked.advance(self.thread_id.total);
if self.user_tracker.get(&tracked).is_none() {
// like, never gonna happen, it's 64 bit
let _ = self.user_tracker.insert(tracked, id_in_thread);
break;
}

View File

@ -238,6 +238,26 @@ impl Tracker {
}
}
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum StreamData {
/// not enough data to return somthing to the user
NotReady = 0,
/// we can return something to the user
Ready,
}
impl ::core::ops::BitOr for StreamData {
type Output = Self;
// Required method
fn bitor(self, other: Self) -> Self::Output {
if self == StreamData::Ready || other == StreamData::Ready {
StreamData::Ready
} else {
StreamData::NotReady
}
}
}
/// Actual stream-tracking structure
#[derive(Debug, Clone)]
pub(crate) struct Stream {
@ -254,7 +274,7 @@ impl Stream {
data: Tracker::new(kind, rand),
}
}
pub(crate) fn recv(&mut self, chunk: Chunk) -> Result<(), Error> {
pub(crate) fn recv(&mut self, chunk: Chunk) -> Result<StreamData, Error> {
match &mut self.data {
Tracker::ROB(tracker) => tracker.recv(chunk),
}

View File

@ -2,7 +2,9 @@
//! AKA: TCP-like
use crate::{
connection::stream::{Chunk, Error, Sequence, SequenceEnd, SequenceStart},
connection::stream::{
Chunk, Error, Sequence, SequenceEnd, SequenceStart, StreamData,
},
enc::Random,
};
@ -93,7 +95,7 @@ impl ReliableOrderedBytestream {
((self.pivot as usize + data_len) % self.data.len()) as u32;
ret
}
pub(crate) fn recv(&mut self, chunk: Chunk) -> Result<(), Error> {
pub(crate) fn recv(&mut self, chunk: Chunk) -> Result<StreamData, Error> {
if !chunk
.sequence
.is_between(self.window_start, self.window_end)
@ -106,7 +108,7 @@ impl ReliableOrderedBytestream {
chunk.data.len(),
);
if maxlen == 0 {
// or empty chunk, but we don't care
// empty window or empty chunk, but we don't care
return Err(Error::OutOfWindow);
}
// translate Sequences to offsets in self.data
@ -119,7 +121,7 @@ impl ReliableOrderedBytestream {
let mut copy_ranges = Vec::new();
let mut to_delete = Vec::new();
let mut to_add = Vec::new();
// note: te included ranges are (INCLUSIVE, INCLUSIVE)
// note: the ranges are (INCLUSIVE, INCLUSIVE)
for (idx, el) in self.missing.iter_mut().enumerate() {
let missing_from = self.window_start.offset(el.0);
if missing_from > offset_end {
@ -146,7 +148,6 @@ impl ReliableOrderedBytestream {
if missing_to > offset_end {
// [..chunk..]
// [....missing....]
// chunk is in the middle of a missing fragment
to_add.push((
el.0.plus_u32(((offset_end - missing_from) + 1) as u32),
el.1,
@ -156,16 +157,12 @@ impl ReliableOrderedBytestream {
} else if offset <= missing_to {
// [....chunk....]
// [...missing...]
// chunk
copy_ranges.push((offset, (missing_to - 0)));
el.1 =
el.0.plus_u32(((offset_end - missing_from) - 1) as u32);
}
}
}
self.missing.append(&mut to_add);
self.missing
.sort_by(|(from_a, _), (from_b, _)| from_a.0 .0.cmp(&from_b.0 .0));
{
let mut deleted = 0;
for idx in to_delete.into_iter() {
@ -173,6 +170,10 @@ impl ReliableOrderedBytestream {
deleted = deleted + 1;
}
}
self.missing.append(&mut to_add);
self.missing
.sort_by(|(from_a, _), (from_b, _)| from_a.0 .0.cmp(&from_b.0 .0));
// copy only the missing data
let (first, second) = self.data[..].split_at_mut(self.pivot as usize);
for (from, to) in copy_ranges.into_iter() {
@ -198,7 +199,12 @@ impl ReliableOrderedBytestream {
.copy_from_slice(&data[data_from..data_to]);
}
}
Ok(())
if self.missing.len() == 0
|| self.window_start.offset(self.missing[0].0) == 0
{
Ok(StreamData::Ready)
} else {
Ok(StreamData::NotReady)
}
}
}

View File

@ -11,8 +11,8 @@ use crate::{
},
packet::{self, Packet},
socket::{UdpClient, UdpServer},
stream, AuthSrvConn, ConnList, Connection, IDSend, ServiceConn,
UserConnTracker,
stream, AuthSrvConn, ConnList, ConnTracker, Connection, IDSend,
ServiceConn,
},
dnssec,
enc::{
@ -46,6 +46,16 @@ pub(crate) struct ConnectInfo {
// TODO: UserID, Token information
}
/// Connection event. Mostly used to give the data to the user
#[derive(Debug, Eq, PartialEq, Clone)]
#[non_exhaustive]
pub enum Event {
/// Work loop has exited. nothing more to do
End,
/// Data from a connection
Data(Vec<u8>),
}
pub(crate) enum Work {
/// ask the thread to report to the main thread the total number of
/// connections present
@ -53,8 +63,8 @@ pub(crate) enum Work {
Connect(ConnectInfo),
DropHandshake(KeyID),
Recv(RawUdp),
UserSend((UserConnTracker, stream::ID, Vec<u8>)),
SendData((UserConnTracker, ::tokio::time::Instant)),
UserSend((ConnTracker, stream::ID, Vec<u8>)),
SendData((ConnTracker, ::tokio::time::Instant)),
}
/// Actual worker implementation.
@ -136,7 +146,7 @@ impl Worker {
}
/// Continuously loop and process work as needed
pub async fn work_loop(&mut self) {
pub async fn work_loop(&mut self) -> Result<Event, crate::Error> {
'mainloop: loop {
let next_timer = self.work_timers.get_next();
::tokio::pin!(next_timer);
@ -436,7 +446,7 @@ impl Worker {
}
Work::UserSend((tracker, stream, data)) => {
let conn = match self.connections.get_mut(tracker) {
None => return,
None => continue,
Some(conn) => conn,
};
use connection::Enqueue;
@ -468,21 +478,21 @@ impl Worker {
.queue_sender
.send(Work::SendData((tracker, instant)))
.await;
return;
continue;
}
let mut raw: Vec<u8> = Vec::with_capacity(1200);
raw.resize(raw.capacity(), 0);
let conn = match self.connections.get_mut(tracker) {
None => return,
None => continue,
Some(conn) => conn,
};
let pkt = match conn.write_pkt(&mut raw) {
Ok(pkt) => pkt,
Err(enc::Error::NotEnoughData(0)) => return,
Err(enc::Error::NotEnoughData(0)) => continue,
Err(e) => {
::tracing::error!("Packet generation: {:?}", e);
return;
continue;
}
};
let dest = conn.send_addr;
@ -493,6 +503,7 @@ impl Worker {
}
}
}
Ok(Event::End)
}
/// Read and do stuff with the raw udp packet
async fn recv(&mut self, mut udp: RawUdp) {
@ -527,8 +538,12 @@ impl Worker {
None => return,
Some(conn) => conn,
};
if let Err(e) = conn.recv(udp) {
::tracing::trace!("Conn Recv: {:?}", e.to_string());
match conn.recv(udp) {
Ok(stream::StreamData::NotReady) => {}
Ok(stream::StreamData::Ready) => {
//
}
Err(e) => ::tracing::trace!("Conn Recv: {:?}", e.to_string()),
}
}
/// Receive an handshake packet
@ -693,7 +708,7 @@ impl Worker {
};
let authsrv_conn = AuthSrvConn(connection::Conn {
queue: self.queue_sender.clone(),
conn: track_auth_conn,
fast: track_auth_conn,
});
let mut service_conn = None;
if cci.service_id != auth::SERVICEID_AUTH {
@ -735,7 +750,7 @@ impl Worker {
};
service_conn = Some(ServiceConn(connection::Conn {
queue: self.queue_sender.clone(),
conn: track_serv_conn,
fast: track_serv_conn,
}));
}
let _ = cci.answer.send(Ok(handshake::tracker::ConnectOk {

View File

@ -34,7 +34,7 @@ use crate::{
AuthServerConnections, Packet,
},
inner::{
worker::{ConnectInfo, RawUdp, Work, Worker},
worker::{ConnectInfo, Event, RawUdp, Work, Worker},
ThreadTracker,
},
};
@ -638,7 +638,16 @@ impl Fenrir {
Ok(worker) => worker,
Err(_) => return,
};
worker.work_loop().await
loop {
match worker.work_loop().await {
Ok(_) => continue,
Ok(Event::End) => break,
Err(e) => {
::tracing::error!("Worker: {:?}", e);
break;
}
}
}
});
});
loop {