libFenrir/src/connection/stream/mod.rs
Luca Fulchir 62a71a2af5
[transport] uud/uudl, first tests
Signed-off-by: Luca Fulchir <luca.fulchir@runesauth.com>
2025-03-21 17:57:08 +01:00

433 lines
12 KiB
Rust

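//! Here we implement the multiplexing stream feature of Fenrir
//!
//! Stream kinds currently exposed through [`Kind`]:
//! * ROB: TCP-like, reliable, in-order bytestream
//! * UUDL: UDP-like, unreliable, unordered datagrams limited to the packet size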
mod errors;
mod rob;
mod uud;
mod uudl;
pub use errors::Error;
use crate::{
connection::stream::{
rob::ReliableOrderedBytestream, uud::UnreliableUnorderedDatagram,
uudl::UnreliableUnorderedDatagramLimited,
},
enc::Random,
};
/// Flavour of a stream, picked among the combinations of
/// reliable/unreliable, ordered/unordered and bytestream/datagram.
///
/// The enum is `repr(u8)`, so every variant has a stable one-byte value.
#[derive(Clone, Copy, Debug)]
#[repr(u8)]
pub enum Kind {
    /// ROB: Reliable, Ordered, Bytestream (TCP-like)
    ROB = 0,
    /// UUDL: Unreliable, Unordered, Datagram Limited (UDP-like).
    /// The data is limited to the packet size
    UUDL = 1,
}
/// Identifier of a stream: a plain `u16` newtype
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct ID(pub u16);

impl ID {
    /// Number of bytes the ID occupies once serialized
    pub const fn len() -> usize {
        ::core::mem::size_of::<u16>()
    }

    /// Smallest Stream ID that can exist (`u16::MIN`)
    pub const fn min() -> Self {
        ID(u16::MIN)
    }

    /// Largest Stream ID that can exist (`u16::MAX`)
    pub const fn max() -> Self {
        ID(u16::MAX)
    }
}
/// Length of the data carried by a chunk, as a `u16` newtype
#[derive(Clone, Copy, Debug)]
pub struct ChunkLen(pub u16);

impl ChunkLen {
    /// Number of bytes the length field occupies once serialized
    pub const fn len() -> usize {
        ::core::mem::size_of::<u16>()
    }
}
//TODO: make pub?
/// Sequence number marking the beginning of a window.
///
/// Thin wrapper around [`Sequence`], so that a window start cannot be
/// confused with a window end or a plain sequence number.
#[derive(Debug, Copy, Clone)]
pub(crate) struct SequenceStart(pub(crate) Sequence);

impl SequenceStart {
    /// Number of sequence steps needed to go from this start up to `seq`.
    ///
    /// `offset(start)` is 0. When `seq` is numerically smaller than the
    /// start it is treated as having wrapped around past `Sequence::max()`.
    pub(crate) fn offset(&self, seq: Sequence) -> usize {
        if self.0.0 <= seq.0 {
            (seq.0 - self.0.0).0 as usize
        } else {
            // Wrapped: steps from the start up to `max`, plus ONE step for
            // the `max -> 0` transition, plus the steps from 0 up to `seq`.
            // Without the extra step `max` and `0` would share an offset.
            (seq.0 + (Sequence::max().0 - self.0.0) + ::core::num::Wrapping(1))
                .0 as usize
        }
    }
}

impl ::core::ops::Add<u32> for SequenceStart {
    type Output = SequenceStart;
    /// Move the start forward by `other` sequence numbers
    fn add(self, other: u32) -> SequenceStart {
        SequenceStart(self.0 + other)
    }
}

impl ::core::ops::AddAssign<u32> for SequenceStart {
    /// Move the start forward by `other` sequence numbers, in place
    fn add_assign(&mut self, other: u32) {
        self.0 += other;
    }
}
// SequenceEnd is INCLUSIVE
/// Last sequence number (included) of a window, wrapping a [`Sequence`]
#[derive(Clone, Copy, Debug)]
pub(crate) struct SequenceEnd(pub(crate) Sequence);

impl ::core::ops::Add<u32> for SequenceEnd {
    type Output = SequenceEnd;
    /// Return a new end pushed forward by `other` sequence numbers
    fn add(self, other: u32) -> SequenceEnd {
        let SequenceEnd(end) = self;
        SequenceEnd(end + other)
    }
}

impl ::core::ops::AddAssign<u32> for SequenceEnd {
    /// Push this end forward by `other` sequence numbers, in place
    fn add_assign(&mut self, other: u32) {
        let SequenceEnd(end) = self;
        *end += other;
    }
}
// TODO: how to tell the compiler we don't use the two most significant bits?
// maybe NonZero + always using 2nd most significant bit?
/// Sequence number to rebuild the stream correctly
///
/// Only the 30 least significant bits are meant to be used: valid values go
/// from [`Sequence::min`] to [`Sequence::max`] and then wrap back to 0.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Sequence(pub ::core::num::Wrapping<u32>);

impl Sequence {
    /// Mask keeping the 30 sequence bits and dropping the 2 top bits
    const SEQ_NOFLAG: u32 = 0x3FFFFFFF;

    /// return a new sequence number, starting at random
    ///
    /// Four random bytes are requested from `rand`, then reduced to 30 bits.
    pub fn new(rand: &Random) -> Self {
        let mut raw_seq: [u8; 4] = [0; 4];
        rand.fill(&mut raw_seq);
        let seq = u32::from_le_bytes(raw_seq);
        Self(::core::num::Wrapping(seq & Self::SEQ_NOFLAG))
    }

    /// Length of the serialized field
    pub const fn len() -> usize {
        4
    }

    /// Minimum possible sequence
    pub const fn min() -> Self {
        Self(::core::num::Wrapping(0))
    }

    /// Maximum possible sequence
    pub const fn max() -> Self {
        Self(::core::num::Wrapping(Self::SEQ_NOFLAG))
    }

    /// Is this sequence inside the window `[start, end]`, both ends included?
    ///
    /// When `end` is numerically smaller than `start` the window is taken to
    /// wrap around the end of the sequence space.
    pub(crate) fn is_between(
        &self,
        start: SequenceStart,
        end: SequenceEnd,
    ) -> bool {
        // `<=` and not `<`: a window with `start == end` holds exactly one
        // sequence number. Treating it as a wrapped window would make it
        // match every possible sequence.
        if start.0 <= end.0 {
            start.0.0 <= self.0 && self.0 <= end.0.0
        } else {
            start.0.0 <= self.0 || self.0 <= end.0.0
        }
    }

    /// Order `self` and `compare` by their distance from `window_start`.
    ///
    /// Both distances are computed with wrapping subtraction, so a sequence
    /// numerically below `window_start` sorts after the ones above it.
    pub(crate) fn cmp_in_window(
        &self,
        window_start: SequenceStart,
        compare: Sequence,
    ) -> ::core::cmp::Ordering {
        let offset_self = self.0 - window_start.0.0;
        let offset_compare = compare.0 - window_start.0.0;
        offset_self.cmp(&offset_compare)
    }

    /// How many sequence numbers are there from `self` up to `end`,
    /// counting both `self` and `end`.
    pub(crate) fn remaining_window(&self, end: SequenceEnd) -> u32 {
        // Distance modulo the 30 bit sequence space, so the `max -> 0` step
        // is counted like any other step when the window wraps around.
        ((end.0.0 - self.0).0 & Self::SEQ_NOFLAG) + 1
    }

    /// Plain difference `self - other`.
    ///
    /// # Panics
    /// Panics unless `self` is strictly greater than `other`.
    pub(crate) fn diff_from(self, other: Sequence) -> u32 {
        assert!(
            self.0.0 > other.0.0,
            "Sequence::diff_from inverted parameters"
        );
        self.0.0 - other.0.0
    }
}
impl ::core::ops::Sub<u32> for Sequence {
type Output = Self;
fn sub(self, other: u32) -> Self {
Self(::core::num::Wrapping(
(self.0 - ::core::num::Wrapping::<u32>(other)).0 & Self::SEQ_NOFLAG,
))
}
}
impl ::core::ops::Add<Sequence> for Sequence {
    type Output = Self;
    /// Sum two sequences, wrapping, and keep only the 30 sequence bits
    fn add(self, other: Self) -> Self {
        let total = self.0.0.wrapping_add(other.0.0);
        Self(::core::num::Wrapping(total & Self::SEQ_NOFLAG))
    }
}
impl ::core::ops::Add<u32> for Sequence {
    type Output = Sequence;
    /// Step forward by `other`, wrapping inside the 30 bit sequence space.
    ///
    /// The two most significant bits are cleared, so the result can never
    /// be bigger than `Sequence::max()`.
    fn add(self, other: u32) -> Sequence {
        Sequence(::core::num::Wrapping(
            (self.0 + ::core::num::Wrapping::<u32>(other)).0 & Self::SEQ_NOFLAG,
        ))
    }
}

impl ::core::ops::AddAssign<u32> for Sequence {
    /// In-place version of `Sequence + u32`: wraps inside the 30 bit
    /// sequence space.
    fn add_assign(&mut self, other: u32) {
        self.0 = ::core::num::Wrapping(
            (self.0 + ::core::num::Wrapping::<u32>(other)).0 & Self::SEQ_NOFLAG,
        );
    }
}
/// Chunk of data representing a stream
/// Every chunk is as follows:
/// | id (2 bytes) | length (2 bytes) |
/// | flag_start (1 BIT) | flag_end (1 BIT) | sequence (30 bits) |
/// | ...data... |
///
/// `id` and `length` are little-endian. The 4 bytes holding the flags and
/// the sequence are one big-endian word: the flags are the two most
/// significant bits of its first byte and the sequence fills the other
/// 30 bits, so no sequence bit is ever overwritten by a flag.
#[derive(Debug, Clone)]
pub struct Chunk<'a> {
    /// Id of the stream this chunk is part of
    pub id: ID,
    /// Is this the beginning of a message?
    pub flag_start: bool,
    /// Is this the end of a message?
    pub flag_end: bool,
    /// Sequence number to reconstruct the Stream
    pub sequence: Sequence,
    /// Payload, borrowed from the buffer the chunk was built from
    data: &'a [u8],
}

impl<'a> Chunk<'a> {
    /// Bits of the first flags/sequence byte that belong to the sequence
    const FLAGS_EXCLUDED_BITMASK: u8 = 0x3F;
    /// Bit of the first flags/sequence byte marking the start of a message
    const FLAG_START_BITMASK: u8 = 0x80;
    /// Bit of the first flags/sequence byte marking the end of a message
    const FLAG_END_BITMASK: u8 = 0x40;

    /// Return the length of the header of a Chunk
    pub const fn headers_len() -> usize {
        ID::len() + ChunkLen::len() + Sequence::len()
    }

    /// Returns the total length of the chunk, including headers
    pub fn len(&self) -> usize {
        Self::headers_len() + self.data.len()
    }

    /// deserialize a chunk of a stream
    ///
    /// The returned chunk borrows its data from `raw`.
    ///
    /// # Errors
    /// * `Error::NotEnoughData(0)`: `raw` does not hold more than the header
    /// * `Error::NotEnoughData(4)`: the length field asks for more data
    ///   than `raw` contains
    pub fn deserialize(raw: &'a [u8]) -> Result<Self, Error> {
        // Field boundaries. They are the same ones `serialize` uses.
        let len_start = ID::len();
        let seq_start = len_start + ChunkLen::len();
        let data_start = seq_start + Sequence::len();

        if raw.len() <= data_start {
            return Err(Error::NotEnoughData(0));
        }

        let mut id_bytes = [0u8; ID::len()];
        id_bytes.copy_from_slice(&raw[..len_start]);
        let id = ID(u16::from_le_bytes(id_bytes));

        let mut len_bytes = [0u8; ChunkLen::len()];
        len_bytes.copy_from_slice(&raw[len_start..seq_start]);
        let length = ChunkLen(u16::from_le_bytes(len_bytes));

        let data_end = data_start + length.0 as usize;
        if data_end > raw.len() {
            return Err(Error::NotEnoughData(4));
        }

        // The flags live INSIDE the first byte of the sequence field: there
        // is no separate flag byte to skip before reading the sequence.
        let mut sequence_bytes = [0u8; Sequence::len()];
        sequence_bytes.copy_from_slice(&raw[seq_start..data_start]);
        let flag_start = (sequence_bytes[0] & Self::FLAG_START_BITMASK) != 0;
        let flag_end = (sequence_bytes[0] & Self::FLAG_END_BITMASK) != 0;
        sequence_bytes[0] &= Self::FLAGS_EXCLUDED_BITMASK;
        let sequence =
            Sequence(::core::num::Wrapping(u32::from_be_bytes(sequence_bytes)));

        Ok(Self {
            id,
            flag_start,
            flag_end,
            sequence,
            data: &raw[data_start..data_end],
        })
    }

    /// serialize a chunk of a stream
    ///
    /// Writes `self.len()` bytes at the beginning of `raw_out`.
    /// The two top bits of the sequence are not transmitted, their place is
    /// taken by the flags.
    ///
    /// # Panics
    /// Panics if `raw_out` is shorter than `self.len()`. In debug builds it
    /// also panics if the data does not fit the 16 bit length field.
    pub fn serialize(&self, raw_out: &mut [u8]) {
        debug_assert!(
            self.data.len() <= u16::MAX as usize,
            "Chunk::serialize data longer than the length field allows"
        );
        let len_start = ID::len();
        let seq_start = len_start + ChunkLen::len();
        let data_start = seq_start + Sequence::len();
        let data_end = data_start + self.data.len();

        raw_out[..len_start].copy_from_slice(&self.id.0.to_le_bytes());
        raw_out[len_start..seq_start]
            .copy_from_slice(&(self.data.len() as u16).to_le_bytes());

        // Big-endian: the first byte holds the most significant sequence
        // bits, and its two top bits are the ones left free for the flags.
        let mut sequence_bytes = self.sequence.0.0.to_be_bytes();
        sequence_bytes[0] &= Self::FLAGS_EXCLUDED_BITMASK;
        if self.flag_start {
            sequence_bytes[0] |= Self::FLAG_START_BITMASK;
        }
        if self.flag_end {
            sequence_bytes[0] |= Self::FLAG_END_BITMASK;
        }
        raw_out[seq_start..data_start].copy_from_slice(&sequence_bytes);

        raw_out[data_start..data_end].copy_from_slice(self.data);
    }
}
/// Crate-private counterpart of [`Kind`]: same variants, but each one owns
/// the state of the stream it tracks.
///
/// The kinds are combinations of
/// reliable/unreliable, ordered/unordered, bytestream/datagram
#[derive(Debug)]
pub(crate) enum Tracker {
    /// ROB: Reliable, Ordered, Bytestream (TCP-like)
    ROB(ReliableOrderedBytestream),
    /// UUDL: Unreliable, Unordered, Datagram Limited
    UUDL(UnreliableUnorderedDatagramLimited),
}

impl Tracker {
    /// Create the tracker matching `kind`.
    ///
    /// `rand` is only handed over to the ROB tracker.
    pub(crate) fn new(kind: Kind, rand: &Random) -> Self {
        match kind {
            Kind::UUDL => {
                Self::UUDL(UnreliableUnorderedDatagramLimited::new())
            }
            Kind::ROB => Self::ROB(ReliableOrderedBytestream::new(rand)),
        }
    }
}
/// Tells whether a stream has something that can be handed to the user
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum StreamData {
    /// there is not enough data yet to return something to the user
    NotReady = 0,
    /// something can be returned to the user
    Ready,
}

impl ::core::ops::BitOr for StreamData {
    type Output = Self;

    /// `Ready` wins: the result is `NotReady` only when both sides are
    fn bitor(self, other: Self) -> Self::Output {
        match (self, other) {
            (StreamData::NotReady, StreamData::NotReady) => {
                StreamData::NotReady
            }
            _ => StreamData::Ready,
        }
    }
}
/// Actual stream-tracking structure
#[derive(Debug)]
pub(crate) struct Stream {
id: ID,
data: Tracker,
}
impl Stream {
pub(crate) fn new(kind: Kind, rand: &Random) -> Self {
let id: u16 = 0;
rand.fill(&mut id.to_le_bytes());
Self {
id: ID(id),
data: Tracker::new(kind, rand),
}
}
pub(crate) fn recv(&mut self, chunk: Chunk) -> Result<StreamData, Error> {
match &mut self.data {
Tracker::ROB(tracker) => tracker.recv(chunk),
Tracker::UUDL(tracker) => tracker.recv(chunk),
}
}
pub(crate) fn get(&mut self) -> (SequenceStart, Vec<u8>) {
match &mut self.data {
// FIXME
Tracker::ROB(tracker) => {
(SequenceStart(Sequence::min()), tracker.get())
}
Tracker::UUDL(tracker) => tracker.get(),
}
}
}
/// Track what has been sent and what has been ACK'd from a stream
#[derive(Debug)]
pub(crate) struct SendTracker {
    /// User data waiting to be sent, oldest first
    queue: Vec<Vec<u8>>,
    /// For each queue entry, how many of its bytes `get` already handed out
    sent: Vec<usize>,
    /// One counter per queue entry, created at 0 by `enqueue`
    // NOTE(review): presumably the number of acknowledged bytes; nothing
    // updates it yet since `ack` is not implemented
    ackd: Vec<usize>,
    /// Did a previous chunk open a message that has not been closed yet?
    chunk_started: bool,
    /// When set, the chunk carrying the last bytes of an entry gets `flag_end`
    is_datagram: bool,
    /// Sequence number the next serialized chunk will carry
    next_sequence: Sequence,
}

impl SendTracker {
    /// Create an empty tracker whose first sequence number is random
    pub(crate) fn new(rand: &Random) -> Self {
        Self {
            queue: Vec::with_capacity(4),
            sent: Vec::with_capacity(4),
            ackd: Vec::with_capacity(4),
            chunk_started: false,
            is_datagram: false,
            next_sequence: Sequence::new(rand),
        }
    }

    /// Enqueue user data to be sent
    pub(crate) fn enqueue(&mut self, data: Vec<u8>) {
        self.queue.push(data);
        self.sent.push(0);
        self.ackd.push(0);
    }

    /// Write the user data to the buffer and mark it as sent
    ///
    /// Copies the not-yet-sent bytes of the first queue entry into `out`,
    /// as many as fit, and returns how many bytes were written.
    /// Returns 0 when the queue is empty or the entry was sent entirely.
    pub(crate) fn get(&mut self, out: &mut [u8]) -> usize {
        let (data, sent) = match (self.queue.first(), self.sent.first_mut()) {
            (Some(data), Some(sent)) => (data, sent),
            _ => return 0,
        };
        // Only what was not handed out yet is eligible: both the source and
        // the destination slices must be sized on this remainder, or
        // `copy_from_slice` panics as soon as something has been sent.
        let pending = &data[*sent..];
        let len = ::std::cmp::min(out.len(), pending.len());
        out[..len].copy_from_slice(&pending[..len]);
        *sent += len;
        len
    }

    /// Mark the sent data as successfully received from the receiver
    ///
    /// Not implemented yet: calling it panics.
    pub(crate) fn ack(&mut self, size: usize) {
        todo!()
    }

    /// Serialize the beginning of the first queue entry as a chunk of
    /// stream `id` into `raw`, and return how many data bytes were written.
    ///
    /// Returns 0 without touching `raw` when the queue is empty or `raw`
    /// cannot even hold the chunk header.
    // NOTE(review): the bytes always come from the start of the entry, the
    // `sent` counters are not consulted here. Confirm this is intended.
    pub(crate) fn serialize(&mut self, id: ID, raw: &mut [u8]) -> usize {
        let payload = match self.queue.first() {
            Some(payload) => payload,
            None => return 0,
        };
        let max_data_len = match raw.len().checked_sub(Chunk::headers_len()) {
            Some(room) => room,
            None => return 0,
        };
        // The chunk length field is 16 bits wide: never put more than that
        // in a single chunk, however big `raw` is.
        let data_len = max_data_len.min(payload.len()).min(u16::MAX as usize);
        let flag_start = !self.chunk_started;
        let flag_end = self.is_datagram && data_len == payload.len();
        let chunk = Chunk {
            id,
            flag_start,
            flag_end,
            sequence: self.next_sequence,
            data: &payload[..data_len],
        };
        chunk.serialize(raw);
        // Sequence numbers only have 30 bits: wrap back to 0 after the max
        self.next_sequence = Sequence(::core::num::Wrapping(
            (self.next_sequence.0 + ::core::num::Wrapping(data_len as u32)).0
                & Sequence::max().0.0,
        ));
        // A message stays open until the chunk carrying `flag_end` is out,
        // so only the first chunk of a message is flagged as its start.
        self.chunk_started = !flag_end;
        data_len
    }
}