[stream] fragment, uud free/get

Signed-off-by: Luca Fulchir <luca.fulchir@runesauth.com>
Luca Fulchir 2025-03-28 18:35:44 +01:00
parent 62a71a2af5
commit 9ec52a0151
Signed by: luca.fulchir
GPG Key ID: 8F6440603D13A78E
4 changed files with 273 additions and 156 deletions

flake.lock (generated, 18 lines changed)

@@ -20,11 +20,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1741862977,
"narHash": "sha256-prZ0M8vE/ghRGGZcflvxCu40ObKaB+ikn74/xQoNrGQ=",
"lastModified": 1742751704,
"narHash": "sha256-rBfc+H1dDBUQ2mgVITMGBPI1PGuCznf9rcWX/XIULyE=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "cdd2ef009676ac92b715ff26630164bb88fec4e0",
"rev": "f0946fa5f1fb876a9dc2e1850d9d3a4e3f914092",
"type": "github"
},
"original": {
@@ -36,11 +36,11 @@
},
"nixpkgs-unstable": {
"locked": {
"lastModified": 1741851582,
"narHash": "sha256-cPfs8qMccim2RBgtKGF+x9IBCduRvd/N5F4nYpU0TVE=",
"lastModified": 1742889210,
"narHash": "sha256-hw63HnwnqU3ZQfsMclLhMvOezpM7RSB0dMAtD5/sOiw=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "6607cf789e541e7873d40d3a8f7815ea92204f32",
"rev": "698214a32beb4f4c8e3942372c694f40848b360d",
"type": "github"
},
"original": {
@@ -65,11 +65,11 @@
]
},
"locked": {
"lastModified": 1742005800,
"narHash": "sha256-6wuOGWkyW6R4A6Th9NMi6WK2jjddvZt7V2+rLPk6L3o=",
"lastModified": 1742956365,
"narHash": "sha256-Slrqmt6kJ/M7Z/ce4ebQWsz2aeEodrX56CsupOEPoz0=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "028cd247a6375f83b94adc33d83676480fc9c294",
"rev": "a0e3395c63cdbc9c1ec17915f8328c077c79c4a1",
"type": "github"
},
"original": {

View File

@@ -29,6 +29,38 @@ pub enum Kind {
UUDL,
}
/// Tracking for a contiguous set of data
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum Fragment {
/// Beginning, no end
Start((SequenceStart, SequenceEnd)),
/// Neither beginning nor end
Middle((SequenceStart, SequenceEnd)),
/// No beginning, but with end
End((SequenceStart, SequenceEnd)),
/// both beginning and end, waiting to be delivered to the user
Ready((SequenceStart, SequenceEnd)),
/// both beginning and end, already delivered to the user
Delivered((SequenceStart, SequenceEnd)),
/// both beginning and end, data might not be available anymore
Deallocated((SequenceStart, SequenceEnd)),
}
impl Fragment {
// FIXME: sequence start/end?
/// extract the sequences from the fragment
pub fn get_seqs(&self) -> (SequenceStart, SequenceEnd) {
match self {
Fragment::Start((f, t))
| Fragment::Middle((f, t))
| Fragment::End((f, t))
| Fragment::Ready((f, t))
| Fragment::Delivered((f, t))
| Fragment::Deallocated((f, t)) => (*f, *t),
}
}
}
/// Id of the stream
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct ID(pub u16);
@@ -60,7 +92,7 @@ impl ChunkLen {
}
//TODO: make pub?
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct SequenceStart(pub(crate) Sequence);
impl SequenceStart {
pub(crate) fn offset(&self, seq: Sequence) -> usize {
@@ -86,7 +118,7 @@ impl ::core::ops::AddAssign<u32> for SequenceStart {
}
// SequenceEnd is INCLUSIVE
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct SequenceEnd(pub(crate) Sequence);
impl ::core::ops::Add<u32> for SequenceEnd {

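The new `Fragment` enum above (now public, and carrying `SequenceStart`/`SequenceEnd` pairs instead of bare `Sequence`s) tracks the lifecycle of one contiguous run of received bytes: Start/Middle/End while it is still being reassembled, Ready once both flags have been seen, Delivered once `get()` has handed it to the user, and Deallocated once `free()` has reclaimed the space. Below is a minimal standalone sketch of that lifecycle; the `SequenceStart`/`SequenceEnd` stand-ins here are plain u32 wrappers for illustration only (the crate's real types are crate-private and wrap a `Wrapping<u32>` `Sequence`).

// Stand-ins for the crate-private sequence wrappers (assumption: plain u32
// here; the real types wrap a Wrapping<u32> Sequence).
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
struct SequenceStart(u32);
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
struct SequenceEnd(u32);

// Same shape as the new public Fragment, minus the crate's doc comments.
#[allow(dead_code)]
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum Fragment {
    Start((SequenceStart, SequenceEnd)),
    Middle((SequenceStart, SequenceEnd)),
    End((SequenceStart, SequenceEnd)),
    Ready((SequenceStart, SequenceEnd)),
    Delivered((SequenceStart, SequenceEnd)),
    Deallocated((SequenceStart, SequenceEnd)),
}

impl Fragment {
    // Mirrors Fragment::get_seqs(): every variant carries the same pair.
    fn get_seqs(&self) -> (SequenceStart, SequenceEnd) {
        match self {
            Fragment::Start((f, t))
            | Fragment::Middle((f, t))
            | Fragment::End((f, t))
            | Fragment::Ready((f, t))
            | Fragment::Delivered((f, t))
            | Fragment::Deallocated((f, t)) => (*f, *t),
        }
    }
}

fn main() {
    // Both flags seen: the fragment is Ready and can be handed to the user.
    let mut frag = Fragment::Ready((SequenceStart(0), SequenceEnd(511)));
    let (from, to) = frag.get_seqs();
    // get() would flip it to Delivered, free() later to Deallocated.
    frag = Fragment::Delivered((from, to));
    frag = Fragment::Deallocated((from, to));
    assert_eq!(frag.get_seqs(), (SequenceStart(0), SequenceEnd(511)));
}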
View File

@@ -67,10 +67,8 @@ impl ReliableOrderedBytestream {
let mut ret = Vec::with_capacity(self.data.len());
ret.extend_from_slice(first);
ret.extend_from_slice(second);
self.window_start =
self.window_start + (ret.len() as u32);
self.window_end =
self.window_end + (ret.len() as u32);
self.window_start = self.window_start + (ret.len() as u32);
self.window_end = self.window_end + (ret.len() as u32);
self.data.clear();
return ret;
}
@@ -78,10 +76,8 @@ impl ReliableOrderedBytestream {
let last_missing_idx = self.missing.len() - 1;
let mut last_missing = &mut self.missing[last_missing_idx];
last_missing.1 = last_missing.1 + (data_len as u32);
self.window_start =
self.window_start + (data_len as u32);
self.window_end =
self.window_end + (data_len as u32);
self.window_start = self.window_start + (data_len as u32);
self.window_end = self.window_end + (data_len as u32);
let mut ret = Vec::with_capacity(data_len);
let (first, second) = self.data[..].split_at(self.pivot as usize);
@@ -157,8 +153,7 @@ impl ReliableOrderedBytestream {
// [....chunk....]
// [...missing...]
copy_ranges.push((offset, (missing_to - 0)));
el.1 =
el.0 + (((offset_end - missing_from) - 1) as u32);
el.1 = el.0 + (((offset_end - missing_from) - 1) as u32);
}
}
}

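The hunks in this file are mostly re-wraps of over-long lines, but they all sit on the same pattern: `data` is used as a ring buffer, `pivot` marks where the logical start of the window currently lives in the physical buffer, and a read is a `split_at(pivot)` followed by copying the two halves back into logical order. A generic standalone sketch of that read path follows; the half-naming and the free function are illustrative assumptions, not the crate's actual `ReliableOrderedBytestream`.

// Read `len` logical bytes from a wrapped window stored in `data`,
// where the logical start sits at physical index `pivot`.
// (Assumption: simplified free function instead of the crate's method.)
fn read_in_order(data: &[u8], pivot: usize, len: usize) -> Vec<u8> {
    let (tail, head) = data.split_at(pivot); // physically: [tail | head]
    let mut ret = Vec::with_capacity(len);
    let head_len = head.len().min(len);
    ret.extend_from_slice(&head[..head_len]);
    ret.extend_from_slice(&tail[..len - head_len]);
    ret
}

fn main() {
    // Logical bytes 1..=6, wrapped: 1,2,3,4 live at the end of the buffer,
    // 5,6 wrapped around to the front.
    let data = [5u8, 6, 0, 0, 1, 2, 3, 4];
    assert_eq!(read_in_order(&data, 4, 6), vec![1u8, 2, 3, 4, 5, 6]);
}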
View File

@@ -7,80 +7,70 @@
use crate::{
connection::stream::{
Chunk, Error, Sequence, SequenceEnd, SequenceStart, StreamData,
Chunk, Error, Fragment, Sequence, SequenceEnd, SequenceStart,
StreamData,
},
enc::Random,
};
use ::core::{
cmp::{self, Ordering},
marker::PhantomData,
num::Wrapping,
ops,
};
use ::std::collections::{BTreeMap, VecDeque};
#[cfg(test)]
mod tests;
#[derive(Debug, PartialEq, Eq)]
enum Fragment {
Start((Sequence, Sequence)),
Middle((Sequence, Sequence)),
End((Sequence, Sequence)),
Full((Sequence, Sequence)),
Delivered((Sequence, Sequence)),
}
impl Fragment {
// FIXME: sequence start/end?
fn get_seqs(&self) -> (Sequence, Sequence) {
match self {
Fragment::Start((f, t))
| Fragment::Middle((f, t))
| Fragment::End((f, t))
| Fragment::Full((f, t))
| Fragment::Delivered((f, t)) => (*f, *t),
}
}
fn is_start(&self) -> bool {
match self {
Fragment::Start(_) | Fragment::Full(_) | Fragment::Delivered(_) => {
true
}
Fragment::End(_) | Fragment::Middle(_) => false,
}
}
fn is_end(&self) -> bool {
match self {
Fragment::End(_) | Fragment::Full(_) | Fragment::Delivered(_) => {
true
}
Fragment::Start(_) | Fragment::Middle(_) => false,
}
}
}
/*
impl ::std::cmp::PartialEq<Sequence> for Fragment {
fn eq(&self, other: &Sequence) -> bool {
self.get_seq() == *other
}
}
impl ::std::cmp::PartialOrd<Sequence> for Fragment {
fn partial_cmp(&self, other: &Sequence) -> Option<::std::cmp::Ordering> {
Some(self.get_seq().cmp(other))
}
}
impl ::std::cmp::PartialOrd<Fragment> for Fragment {
fn partial_cmp(&self, other: &Fragment) -> Option<::std::cmp::Ordering> {
Some(self.get_seq().cmp(&other.get_seq()))
}
}
impl Ord for Fragment {
fn cmp(&self, other: &Fragment) -> ::std::cmp::Ordering {
self.get_seq().cmp(&other.get_seq())
}
}
*/
type Timer = u64;
pub struct Data<'a> {
data_first: &'a mut [u8],
data_second: &'a mut [u8],
pub from: SequenceStart,
//pub(crate) stream: &'a Uud,
pub(crate) stream: ::std::ptr::NonNull<Uud>,
_not_send_sync: PhantomData<*const ()>,
}
impl<'a> Drop for Data<'a> {
fn drop(&mut self) {
// safe because we are !Send
#[allow(unsafe_code)]
unsafe {
let uud = self.stream.as_mut();
uud.free(
self.from,
(self.data_first.len() + self.data_second.len()) as u32,
);
}
}
}
impl<'a> ops::Index<usize> for Data<'a> {
type Output = u8;
fn index(&self, index: usize) -> &Self::Output {
let first_len = self.data_first.len();
if index < first_len {
return &self.data_first[index];
}
return &self.data_second[index - first_len];
}
}
impl<'a> ops::IndexMut<usize> for Data<'a> {
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
let first_len = self.data_first.len();
if index < first_len {
return &mut self.data_first[index];
}
return &mut self.data_second[index - first_len];
}
}
pub(crate) struct Uud {
pub(crate) window_start: SequenceStart,
window_end: SequenceEnd,
@@ -128,6 +118,78 @@ impl Uud {
pub(crate) fn window_size(&self) -> u32 {
self.data.len() as u32
}
pub(crate) fn get(&mut self) -> Option<Data> {
let self_ptr = ::std::ptr::NonNull::new(self).unwrap();
for track in self.track.iter_mut() {
if let Fragment::Ready((start, end)) = track.0 {
let data_from = (self.window_start.offset(start.0)
+ self.pivot as usize)
% self.data.len();
let data_to = (self.window_start.offset(end.0)
+ self.pivot as usize)
% self.data.len();
track.0 = Fragment::Delivered((start, end));
let first: &mut [u8];
let second: &mut [u8];
if data_from < data_to {
let (tmp_first, tmp_second) =
self.data.split_at_mut(data_to);
first = &mut tmp_first[data_from..];
second = &mut tmp_second[0..0];
} else {
let (tmp_second, tmp_first) =
self.data.split_at_mut(self.pivot as usize);
first = &mut tmp_first[(data_from - self.pivot as usize)..];
second = &mut tmp_second[..data_to];
}
return Some(Data {
from: start,
data_first: first,
data_second: second,
stream: self_ptr,
_not_send_sync: PhantomData::default(),
});
}
}
None
}
pub(crate) fn free(&mut self, from: SequenceStart, len: u32) {
if !from.0.is_between(self.window_start, self.window_end) {
return;
}
let mut first_keep = 0;
let mut last_sequence = self.window_start.0;
let mut deallocated = false;
for (idx, track) in self.track.iter_mut().enumerate() {
if let Fragment::Delivered((start, to)) = track.0 {
if start == from && to.0 == from.0 + len {
track.0 = Fragment::Deallocated((start, to));
deallocated = true;
if idx == first_keep {
first_keep = idx + 1;
last_sequence = to.0;
continue;
}
}
}
if idx == first_keep {
if let Fragment::Deallocated((_, to)) = track.0 {
first_keep = idx + 1;
last_sequence = to.0;
continue;
}
}
if deallocated {
break;
}
}
self.track.drain(..first_keep);
self.pivot = ((self.pivot as usize
+ self.window_start.offset(last_sequence))
% self.data.len()) as u32;
}
pub(crate) fn recv(&mut self, chunk: Chunk) -> Result<StreamData, Error> {
let chunk_to = chunk.sequence + chunk.data.len() as u32;
if !chunk
@@ -164,16 +226,14 @@ impl Uud {
let mut last_usable = self.window_end.0;
let mut ret = StreamData::NotReady;
let mut copy_data_idx_from = 0;
// FIXME: and on receiving first fragment after the second, or empty
// track?
for (idx, (fragment, _)) in self.track.iter_mut().enumerate().rev() {
let (from, to) = fragment.get_seqs();
let to_next = to + 1;
match to_next.cmp_in_window(self.window_start, chunk.sequence) {
::core::cmp::Ordering::Equal => {
match to_next.0.cmp_in_window(self.window_start, chunk.sequence) {
Ordering::Equal => {
// `chunk` is immediately after `fragment`
if !chunk_to.is_between(
SequenceStart(to_next),
SequenceStart(to_next.0),
SequenceEnd(last_usable),
) {
return Err(Error::Reconstructing);
@@ -186,7 +246,7 @@ impl Uud {
return Err(Error::WrongFlags);
}
if chunk_flag_end {
*fragment = Fragment::Full((
*fragment = Fragment::Ready((
from,
to + (data.len() as u32),
));
@@ -217,16 +277,23 @@ impl Uud {
as usize;
}
Fragment::End(_)
| Fragment::Full(_)
| Fragment::Delivered(_) => {
| Fragment::Ready(_)
| Fragment::Delivered(_)
| Fragment::Deallocated(_) => {
if !chunk.flag_start {
return Err(Error::WrongFlags);
}
let toinsert = if chunk_flag_end {
ret = StreamData::Ready;
Fragment::Full((chunk.sequence, chunk_to))
Fragment::Ready((
SequenceStart(chunk.sequence),
SequenceEnd(chunk_to),
))
} else {
Fragment::Start((chunk.sequence, chunk_to))
Fragment::Start((
SequenceStart(chunk.sequence),
SequenceEnd(chunk_to),
))
};
self.track.insert(idx + 1, (toinsert, 0));
copy_data_idx_from =
@@ -236,11 +303,11 @@ impl Uud {
}
break;
}
::core::cmp::Ordering::Less => {
Ordering::Less => {
// there is a data hole between `chunk` and `fragment`
if !chunk_to.is_between(
SequenceStart(to_next),
SequenceStart(to_next.0),
SequenceEnd(last_usable),
) {
return Err(Error::Reconstructing);
@@ -248,15 +315,27 @@ impl Uud {
let toinsert = if chunk.flag_start {
if chunk_flag_end {
ret = StreamData::Ready;
Fragment::Full((chunk.sequence, chunk_to))
Fragment::Ready((
SequenceStart(chunk.sequence),
SequenceEnd(chunk_to),
))
} else {
Fragment::Start((chunk.sequence, chunk_to))
Fragment::Start((
SequenceStart(chunk.sequence),
SequenceEnd(chunk_to),
))
}
} else {
if chunk_flag_end {
Fragment::End((chunk.sequence, chunk_to))
Fragment::End((
SequenceStart(chunk.sequence),
SequenceEnd(chunk_to),
))
} else {
Fragment::Middle((chunk.sequence, chunk_to))
Fragment::Middle((
SequenceStart(chunk.sequence),
SequenceEnd(chunk_to),
))
}
};
self.track.insert(idx + 1, (toinsert, 0));
@@ -264,12 +343,12 @@ impl Uud {
chunk.sequence.diff_from(self.window_start.0) as usize;
break;
}
::core::cmp::Ordering::Greater => {
Ordering::Greater => {
// to_next > chunk.sequence
// `fragment` is too new, need to look at older ones
if from.cmp_in_window(self.window_start, chunk.sequence)
!= ::core::cmp::Ordering::Greater
if from.0.cmp_in_window(self.window_start, chunk.sequence)
!= Ordering::Greater
{
// to_next > chunk.sequence >= from
// overlapping not yet allowed
@@ -277,45 +356,45 @@ impl Uud {
}
if idx == 0 {
// check if we can add before everything
if chunk_to == from {
if fragment.is_start() {
if chunk_to == from.0 {
match fragment {
Fragment::Middle(_) => {
if chunk.flag_start {
*fragment = Fragment::Start((
SequenceStart(chunk.sequence),
to,
));
} else {
*fragment = Fragment::Middle((
SequenceStart(chunk.sequence),
to,
));
}
}
Fragment::End(_) => {
if chunk.flag_start {
*fragment = Fragment::Ready((
SequenceStart(chunk.sequence),
to,
));
ret = StreamData::Ready;
} else {
*fragment = Fragment::End((
SequenceStart(chunk.sequence),
to,
));
}
}
Fragment::Start(_)
| Fragment::Ready(_)
| Fragment::Delivered(_)
| Fragment::Deallocated(_) => {
if chunk_flag_end {
// add, don't merge
} else {
// fragment.start, but !chunk.end
return Err(Error::WrongFlags);
}
} else {
if chunk_flag_end {
//chunk.end but !fragment.start
return Err(Error::WrongFlags);
} else {
if chunk.flag_start {
if fragment.is_end() {
*fragment = Fragment::Full((
chunk.sequence,
to,
));
ret = StreamData::Ready;
} else {
*fragment = Fragment::Start((
chunk.sequence,
to,
));
}
} else {
if fragment.is_end() {
*fragment = Fragment::End((
chunk.sequence,
to,
));
} else {
*fragment = Fragment::Middle((
chunk.sequence,
to,
));
}
}
}
}
copy_data_idx_from =
@@ -327,15 +406,27 @@ impl Uud {
let toinsert = if chunk.flag_start {
if chunk_flag_end {
ret = StreamData::Ready;
Fragment::Full((chunk.sequence, chunk_to))
Fragment::Ready((
SequenceStart(chunk.sequence),
SequenceEnd(chunk_to),
))
} else {
Fragment::Start((chunk.sequence, chunk_to))
Fragment::Start((
SequenceStart(chunk.sequence),
SequenceEnd(chunk_to),
))
}
} else {
if chunk_flag_end {
Fragment::End((chunk.sequence, chunk_to))
Fragment::End((
SequenceStart(chunk.sequence),
SequenceEnd(chunk_to),
))
} else {
Fragment::Middle((chunk.sequence, chunk_to))
Fragment::Middle((
SequenceStart(chunk.sequence),
SequenceEnd(chunk_to),
))
}
};
self.track.insert(0, (toinsert, 0));
@@ -344,7 +435,7 @@ impl Uud {
as usize;
break;
}
last_usable = from - 1;
last_usable = from.0 - 1;
}
}
}
@@ -357,7 +448,7 @@ impl Uud {
let data_pivot = self.data.len() - data_idx_from;
let (first, second) = data.split_at(data_pivot);
self.data[data_idx_from..].copy_from_slice(&first);
self.data[..data_idx_to].copy_from_slice(&data);
self.data[..data_idx_to].copy_from_slice(&second);
}
Ok(ret)
}
@@ -419,9 +510,8 @@ impl UnreliableUnorderedDatagram {
ret.extend_from_slice(first);
ret.extend_from_slice(second);
self.window_start += ret.len() as u32;
self.window_end = SequenceEnd(Sequence(
::core::num::Wrapping::<u32>(ret.len() as u32),
));
self.window_end =
SequenceEnd(Sequence(Wrapping::<u32>(ret.len() as u32)));
self.data.clear();
return ret;
}
@@ -434,7 +524,7 @@ impl UnreliableUnorderedDatagram {
let mut ret = Vec::with_capacity(data_len);
let (first, second) = self.data[..].split_at(self.pivot as usize);
let first_len = ::core::cmp::min(data_len, first.len());
let first_len = cmp::min(data_len, first.len());
let second_len = data_len - first_len;
ret.extend_from_slice(&first[..first_len]);
@@ -527,7 +617,7 @@ impl UnreliableUnorderedDatagram {
let to = to + 1;
if from <= first.len() {
let first_from = from;
let first_to = ::core::cmp::min(first.len(), to);
let first_to = cmp::min(first.len(), to);
let data_first_from = from - offset;
let data_first_to = first_to - offset;
first[first_from..first_to]
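
The headline of this commit is the `Uud::get()` / `Uud::free()` pair above: `get()` finds a Ready fragment, marks it Delivered, and hands out a `Data` handle holding two mutable slices into the (possibly wrapped) receive window plus a `NonNull<Uud>` back-pointer; dropping the handle calls `free()`, which flips the fragment to Deallocated and advances the pivot. Below is a minimal standalone sketch of that drop-to-free pattern, with made-up `Stream`/`Handle` types and simple byte counting in place of the real fragment tracking (illustration only, not the crate's API).

use ::core::marker::PhantomData;

// Simplified stand-in for Uud: it only counts how many bytes have been
// handed back to it (assumption: not the crate's real bookkeeping).
struct Stream {
    freed: u32,
}

impl Stream {
    // Stand-in for Uud::get(): hand out a window region as an RAII handle.
    fn get(&mut self) -> Handle<'_> {
        Handle {
            from: 0,
            len: 4,
            stream: ::std::ptr::NonNull::new(self).unwrap(),
            _not_send_sync: PhantomData,
            _stream_borrow: PhantomData,
        }
    }
    // Stand-in for Uud::free(): the real one flips the fragment from
    // Delivered to Deallocated and advances the ring-buffer pivot.
    fn free(&mut self, _from: u32, len: u32) {
        self.freed += len;
    }
}

// Like the new Data<'a>: carries a back-pointer to its stream and returns
// the window space in Drop instead of requiring an explicit free call.
struct Handle<'a> {
    from: u32,
    len: u32,
    stream: ::std::ptr::NonNull<Stream>,
    _not_send_sync: PhantomData<*const ()>, // opt out of Send/Sync
    _stream_borrow: PhantomData<&'a mut Stream>,
}

impl<'a> Drop for Handle<'a> {
    fn drop(&mut self) {
        // Same justification as Data::drop(): the handle is !Send, so the
        // pointed-to stream stays on this thread while the handle is alive.
        #[allow(unsafe_code)]
        unsafe {
            self.stream.as_mut().free(self.from, self.len);
        }
    }
}

fn main() {
    let mut stream = Stream { freed: 0 };
    {
        let handle = stream.get();
        // ...the caller would read handle.len bytes starting at handle.from...
        let _ = (handle.from, handle.len);
    } // handle dropped here: the window space is reclaimed automatically
    assert_eq!(stream.freed, 4);
}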