diff --git a/flake.lock b/flake.lock index 01b37d0..8ee4474 100644 --- a/flake.lock +++ b/flake.lock @@ -20,11 +20,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1741862977, - "narHash": "sha256-prZ0M8vE/ghRGGZcflvxCu40ObKaB+ikn74/xQoNrGQ=", + "lastModified": 1742751704, + "narHash": "sha256-rBfc+H1dDBUQ2mgVITMGBPI1PGuCznf9rcWX/XIULyE=", "owner": "nixos", "repo": "nixpkgs", - "rev": "cdd2ef009676ac92b715ff26630164bb88fec4e0", + "rev": "f0946fa5f1fb876a9dc2e1850d9d3a4e3f914092", "type": "github" }, "original": { @@ -36,11 +36,11 @@ }, "nixpkgs-unstable": { "locked": { - "lastModified": 1741851582, - "narHash": "sha256-cPfs8qMccim2RBgtKGF+x9IBCduRvd/N5F4nYpU0TVE=", + "lastModified": 1742889210, + "narHash": "sha256-hw63HnwnqU3ZQfsMclLhMvOezpM7RSB0dMAtD5/sOiw=", "owner": "nixos", "repo": "nixpkgs", - "rev": "6607cf789e541e7873d40d3a8f7815ea92204f32", + "rev": "698214a32beb4f4c8e3942372c694f40848b360d", "type": "github" }, "original": { @@ -65,11 +65,11 @@ ] }, "locked": { - "lastModified": 1742005800, - "narHash": "sha256-6wuOGWkyW6R4A6Th9NMi6WK2jjddvZt7V2+rLPk6L3o=", + "lastModified": 1742956365, + "narHash": "sha256-Slrqmt6kJ/M7Z/ce4ebQWsz2aeEodrX56CsupOEPoz0=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "028cd247a6375f83b94adc33d83676480fc9c294", + "rev": "a0e3395c63cdbc9c1ec17915f8328c077c79c4a1", "type": "github" }, "original": { diff --git a/src/connection/stream/mod.rs b/src/connection/stream/mod.rs index c63c3c3..7b92131 100644 --- a/src/connection/stream/mod.rs +++ b/src/connection/stream/mod.rs @@ -29,6 +29,38 @@ pub enum Kind { UUDL, } +/// Tracking for a contiguous set of data +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum Fragment { + /// Beginning, no end + Start((SequenceStart, SequenceEnd)), + /// Neither beginning nor end + Middle((SequenceStart, SequenceEnd)), + /// No beginning, but with end + End((SequenceStart, SequenceEnd)), + /// both beginning and end, waiting to be delivered to the user + Ready((SequenceStart, SequenceEnd)), + /// both beginning and end, already delivered to the user + Delivered((SequenceStart, SequenceEnd)), + /// both beginning and end, data might not be available anymore + Deallocated((SequenceStart, SequenceEnd)), +} + +impl Fragment { + // FIXME: sequence start/end? + /// extract the sequences from the fragment + pub fn get_seqs(&self) -> (SequenceStart, SequenceEnd) { + match self { + Fragment::Start((f, t)) + | Fragment::Middle((f, t)) + | Fragment::End((f, t)) + | Fragment::Ready((f, t)) + | Fragment::Delivered((f, t)) + | Fragment::Deallocated((f, t)) => (*f, *t), + } + } +} + /// Id of the stream #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct ID(pub u16); @@ -60,7 +92,7 @@ impl ChunkLen { } //TODO: make pub? -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Eq, PartialEq)] pub(crate) struct SequenceStart(pub(crate) Sequence); impl SequenceStart { pub(crate) fn offset(&self, seq: Sequence) -> usize { @@ -86,7 +118,7 @@ impl ::core::ops::AddAssign for SequenceStart { } // SequenceEnd is INCLUSIVE -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Eq, PartialEq)] pub(crate) struct SequenceEnd(pub(crate) Sequence); impl ::core::ops::Add for SequenceEnd { diff --git a/src/connection/stream/rob/mod.rs b/src/connection/stream/rob/mod.rs index 901dc69..6626b08 100644 --- a/src/connection/stream/rob/mod.rs +++ b/src/connection/stream/rob/mod.rs @@ -26,7 +26,7 @@ impl ReliableOrderedBytestream { pub(crate) fn new(rand: &Random) -> Self { let window_len = 1048576; // 1MB. should be enough for anybody. (lol) let window_start = SequenceStart(Sequence::new(rand)); - let window_end = SequenceEnd(window_start.0 +(window_len - 1)); + let window_end = SequenceEnd(window_start.0 + (window_len - 1)); let mut data = Vec::with_capacity(window_len as usize); data.resize(data.capacity(), 0); @@ -40,13 +40,13 @@ impl ReliableOrderedBytestream { } pub(crate) fn with_window_size(rand: &Random, size: u32) -> Self { assert!( - size < Sequence::max().0 .0, + size < Sequence::max().0.0, "Max window size is {}", - Sequence::max().0 .0 + Sequence::max().0.0 ); let window_len = size; let window_start = SequenceStart(Sequence::new(rand)); - let window_end = SequenceEnd(window_start.0 +(window_len - 1)); + let window_end = SequenceEnd(window_start.0 + (window_len - 1)); let mut data = Vec::with_capacity(window_len as usize); data.resize(data.capacity(), 0); @@ -67,10 +67,8 @@ impl ReliableOrderedBytestream { let mut ret = Vec::with_capacity(self.data.len()); ret.extend_from_slice(first); ret.extend_from_slice(second); - self.window_start = - self.window_start + (ret.len() as u32); - self.window_end = - self.window_end + (ret.len() as u32); + self.window_start = self.window_start + (ret.len() as u32); + self.window_end = self.window_end + (ret.len() as u32); self.data.clear(); return ret; } @@ -78,10 +76,8 @@ impl ReliableOrderedBytestream { let last_missing_idx = self.missing.len() - 1; let mut last_missing = &mut self.missing[last_missing_idx]; last_missing.1 = last_missing.1 + (data_len as u32); - self.window_start = - self.window_start + (data_len as u32); - self.window_end = - self.window_end + (data_len as u32); + self.window_start = self.window_start + (data_len as u32); + self.window_end = self.window_end + (data_len as u32); let mut ret = Vec::with_capacity(data_len); let (first, second) = self.data[..].split_at(self.pivot as usize); @@ -141,7 +137,7 @@ impl ReliableOrderedBytestream { // [....chunk....] // [...missing...] copy_ranges.push((missing_from, offset_end)); - el.0 +=((offset_end - missing_from) + 1) as u32; + el.0 += ((offset_end - missing_from) + 1) as u32; } } else if missing_from < offset { if missing_to > offset_end { @@ -157,8 +153,7 @@ impl ReliableOrderedBytestream { // [....chunk....] // [...missing...] copy_ranges.push((offset, (missing_to - 0))); - el.1 = - el.0 + (((offset_end - missing_from) - 1) as u32); + el.1 = el.0 + (((offset_end - missing_from) - 1) as u32); } } } @@ -171,7 +166,7 @@ impl ReliableOrderedBytestream { } self.missing.append(&mut to_add); self.missing - .sort_by(|(from_a, _), (from_b, _)| from_a.0 .0.cmp(&from_b.0 .0)); + .sort_by(|(from_a, _), (from_b, _)| from_a.0.0.cmp(&from_b.0.0)); // copy only the missing data let (first, second) = self.data[..].split_at_mut(self.pivot as usize); diff --git a/src/connection/stream/uud/mod.rs b/src/connection/stream/uud/mod.rs index f14e1f7..ebde42e 100644 --- a/src/connection/stream/uud/mod.rs +++ b/src/connection/stream/uud/mod.rs @@ -7,80 +7,70 @@ use crate::{ connection::stream::{ - Chunk, Error, Sequence, SequenceEnd, SequenceStart, StreamData, + Chunk, Error, Fragment, Sequence, SequenceEnd, SequenceStart, + StreamData, }, enc::Random, }; +use ::core::{ + cmp::{self, Ordering}, + marker::PhantomData, + num::Wrapping, + ops, +}; use ::std::collections::{BTreeMap, VecDeque}; #[cfg(test)] mod tests; -#[derive(Debug, PartialEq, Eq)] -enum Fragment { - Start((Sequence, Sequence)), - Middle((Sequence, Sequence)), - End((Sequence, Sequence)), - Full((Sequence, Sequence)), - Delivered((Sequence, Sequence)), -} - -impl Fragment { - // FIXME: sequence start/end? - fn get_seqs(&self) -> (Sequence, Sequence) { - match self { - Fragment::Start((f, t)) - | Fragment::Middle((f, t)) - | Fragment::End((f, t)) - | Fragment::Full((f, t)) - | Fragment::Delivered((f, t)) => (*f, *t), - } - } - fn is_start(&self) -> bool { - match self { - Fragment::Start(_) | Fragment::Full(_) | Fragment::Delivered(_) => { - true - } - Fragment::End(_) | Fragment::Middle(_) => false, - } - } - fn is_end(&self) -> bool { - match self { - Fragment::End(_) | Fragment::Full(_) | Fragment::Delivered(_) => { - true - } - Fragment::Start(_) | Fragment::Middle(_) => false, - } - } -} -/* -impl ::std::cmp::PartialEq for Fragment { - fn eq(&self, other: &Sequence) -> bool { - self.get_seq() == *other - } -} - -impl ::std::cmp::PartialOrd for Fragment { - fn partial_cmp(&self, other: &Sequence) -> Option<::std::cmp::Ordering> { - Some(self.get_seq().cmp(other)) - } -} - -impl ::std::cmp::PartialOrd for Fragment { - fn partial_cmp(&self, other: &Fragment) -> Option<::std::cmp::Ordering> { - Some(self.get_seq().cmp(&other.get_seq())) - } -} -impl Ord for Fragment { - fn cmp(&self, other: &Fragment) -> ::std::cmp::Ordering { - self.get_seq().cmp(&other.get_seq()) - } -} -*/ - type Timer = u64; +pub struct Data<'a> { + data_first: &'a mut [u8], + data_second: &'a mut [u8], + pub from: SequenceStart, + //pub(crate) stream: &'a Uud, + pub(crate) stream: ::std::ptr::NonNull, + _not_send_sync: PhantomData<*const ()>, +} + +impl<'a> Drop for Data<'a> { + fn drop(&mut self) { + // safe because we are !Send + #[allow(unsafe_code)] + unsafe { + let uud = self.stream.as_mut(); + uud.free( + self.from, + (self.data_first.len() + self.data_second.len()) as u32, + ); + } + } +} + +impl<'a> ops::Index for Data<'a> { + type Output = u8; + + fn index(&self, index: usize) -> &Self::Output { + let first_len = self.data_first.len(); + if index < first_len { + return &self.data_first[index]; + } + return &self.data_second[index - first_len]; + } +} + +impl<'a> ops::IndexMut for Data<'a> { + fn index_mut(&mut self, index: usize) -> &mut Self::Output { + let first_len = self.data_first.len(); + if index < first_len { + return &mut self.data_first[index]; + } + return &mut self.data_second[index - first_len]; + } +} + pub(crate) struct Uud { pub(crate) window_start: SequenceStart, window_end: SequenceEnd, @@ -128,6 +118,78 @@ impl Uud { pub(crate) fn window_size(&self) -> u32 { self.data.len() as u32 } + pub(crate) fn get(&mut self) -> Option { + let self_ptr = ::std::ptr::NonNull::new(self).unwrap(); + for track in self.track.iter_mut() { + if let Fragment::Ready((start, end)) = track.0 { + let data_from = (self.window_start.offset(start.0) + + self.pivot as usize) + % self.data.len(); + let data_to = (self.window_start.offset(end.0) + + self.pivot as usize) + % self.data.len(); + + track.0 = Fragment::Delivered((start, end)); + let first: &mut [u8]; + let second: &mut [u8]; + if data_from < data_to { + let (tmp_first, tmp_second) = + self.data.split_at_mut(data_to); + first = &mut tmp_first[data_from..]; + second = &mut tmp_second[0..0]; + } else { + let (tmp_second, tmp_first) = + self.data.split_at_mut(self.pivot as usize); + first = &mut tmp_first[(data_from - self.pivot as usize)..]; + second = &mut tmp_second[..data_to]; + } + + return Some(Data { + from: start, + data_first: first, + data_second: second, + stream: self_ptr, + _not_send_sync: PhantomData::default(), + }); + } + } + None + } + pub(crate) fn free(&mut self, from: SequenceStart, len: u32) { + if !from.0.is_between(self.window_start, self.window_end) { + return; + } + let mut first_keep = 0; + let mut last_sequence = self.window_start.0; + let mut deallocated = false; + for (idx, track) in self.track.iter_mut().enumerate() { + if let Fragment::Delivered((start, to)) = track.0 { + if start == from && to.0 == from.0 + len { + track.0 = Fragment::Deallocated((start, to)); + deallocated = true; + if idx == first_keep { + first_keep = idx + 1; + last_sequence = to.0; + continue; + } + } + } + if idx == first_keep { + if let Fragment::Deallocated((_, to)) = track.0 { + first_keep = idx + 1; + last_sequence = to.0; + continue; + } + } + if deallocated { + break; + } + } + self.track.drain(..first_keep); + self.pivot = ((self.pivot as usize + + self.window_start.offset(last_sequence)) + % self.data.len()) as u32; + } pub(crate) fn recv(&mut self, chunk: Chunk) -> Result { let chunk_to = chunk.sequence + chunk.data.len() as u32; if !chunk @@ -164,16 +226,14 @@ impl Uud { let mut last_usable = self.window_end.0; let mut ret = StreamData::NotReady; let mut copy_data_idx_from = 0; - // FIXME: and on receiving first fragment after the second, or empty - // track? for (idx, (fragment, _)) in self.track.iter_mut().enumerate().rev() { let (from, to) = fragment.get_seqs(); let to_next = to + 1; - match to_next.cmp_in_window(self.window_start, chunk.sequence) { - ::core::cmp::Ordering::Equal => { + match to_next.0.cmp_in_window(self.window_start, chunk.sequence) { + Ordering::Equal => { // `chunk` is immediately after `fragment` if !chunk_to.is_between( - SequenceStart(to_next), + SequenceStart(to_next.0), SequenceEnd(last_usable), ) { return Err(Error::Reconstructing); @@ -186,7 +246,7 @@ impl Uud { return Err(Error::WrongFlags); } if chunk_flag_end { - *fragment = Fragment::Full(( + *fragment = Fragment::Ready(( from, to + (data.len() as u32), )); @@ -217,16 +277,23 @@ impl Uud { as usize; } Fragment::End(_) - | Fragment::Full(_) - | Fragment::Delivered(_) => { + | Fragment::Ready(_) + | Fragment::Delivered(_) + | Fragment::Deallocated(_) => { if !chunk.flag_start { return Err(Error::WrongFlags); } let toinsert = if chunk_flag_end { ret = StreamData::Ready; - Fragment::Full((chunk.sequence, chunk_to)) + Fragment::Ready(( + SequenceStart(chunk.sequence), + SequenceEnd(chunk_to), + )) } else { - Fragment::Start((chunk.sequence, chunk_to)) + Fragment::Start(( + SequenceStart(chunk.sequence), + SequenceEnd(chunk_to), + )) }; self.track.insert(idx + 1, (toinsert, 0)); copy_data_idx_from = @@ -236,11 +303,11 @@ impl Uud { } break; } - ::core::cmp::Ordering::Less => { + Ordering::Less => { // there is a data hole between `chunk` and `fragment` if !chunk_to.is_between( - SequenceStart(to_next), + SequenceStart(to_next.0), SequenceEnd(last_usable), ) { return Err(Error::Reconstructing); @@ -248,15 +315,27 @@ impl Uud { let toinsert = if chunk.flag_start { if chunk_flag_end { ret = StreamData::Ready; - Fragment::Full((chunk.sequence, chunk_to)) + Fragment::Ready(( + SequenceStart(chunk.sequence), + SequenceEnd(chunk_to), + )) } else { - Fragment::Start((chunk.sequence, chunk_to)) + Fragment::Start(( + SequenceStart(chunk.sequence), + SequenceEnd(chunk_to), + )) } } else { if chunk_flag_end { - Fragment::End((chunk.sequence, chunk_to)) + Fragment::End(( + SequenceStart(chunk.sequence), + SequenceEnd(chunk_to), + )) } else { - Fragment::Middle((chunk.sequence, chunk_to)) + Fragment::Middle(( + SequenceStart(chunk.sequence), + SequenceEnd(chunk_to), + )) } }; self.track.insert(idx + 1, (toinsert, 0)); @@ -264,12 +343,12 @@ impl Uud { chunk.sequence.diff_from(self.window_start.0) as usize; break; } - ::core::cmp::Ordering::Greater => { + Ordering::Greater => { // to_next > chunk.sequence // `fragment` is too new, need to look at older ones - if from.cmp_in_window(self.window_start, chunk.sequence) - != ::core::cmp::Ordering::Greater + if from.0.cmp_in_window(self.window_start, chunk.sequence) + != Ordering::Greater { // to_next > chunk.sequence >= from // overlapping not yet allowed @@ -277,44 +356,44 @@ impl Uud { } if idx == 0 { // check if we can add before everything - if chunk_to == from { - if fragment.is_start() { - if chunk_flag_end { - // add, don't merge - } else { - //fragment.start, but !chunk.end - return Err(Error::WrongFlags); - } - } else { - if chunk_flag_end { - //chunk.end but !fragment.start - return Err(Error::WrongFlags); - } else { + if chunk_to == from.0 { + match fragment { + Fragment::Middle(_) => { if chunk.flag_start { - if fragment.is_end() { - *fragment = Fragment::Full(( - chunk.sequence, - to, - )); - ret = StreamData::Ready; - } else { - *fragment = Fragment::Start(( - chunk.sequence, - to, - )); - } + *fragment = Fragment::Start(( + SequenceStart(chunk.sequence), + to, + )); } else { - if fragment.is_end() { - *fragment = Fragment::End(( - chunk.sequence, - to, - )); - } else { - *fragment = Fragment::Middle(( - chunk.sequence, - to, - )); - } + *fragment = Fragment::Middle(( + SequenceStart(chunk.sequence), + to, + )); + } + } + Fragment::End(_) => { + if chunk.flag_start { + *fragment = Fragment::Ready(( + SequenceStart(chunk.sequence), + to, + )); + ret = StreamData::Ready; + } else { + *fragment = Fragment::End(( + SequenceStart(chunk.sequence), + to, + )); + } + } + Fragment::Start(_) + | Fragment::Ready(_) + | Fragment::Delivered(_) + | Fragment::Deallocated(_) => { + if chunk_flag_end { + // add, don't merge + } else { + // fragment.start, but !chunk.end + return Err(Error::WrongFlags); } } } @@ -327,15 +406,27 @@ impl Uud { let toinsert = if chunk.flag_start { if chunk_flag_end { ret = StreamData::Ready; - Fragment::Full((chunk.sequence, chunk_to)) + Fragment::Ready(( + SequenceStart(chunk.sequence), + SequenceEnd(chunk_to), + )) } else { - Fragment::Start((chunk.sequence, chunk_to)) + Fragment::Start(( + SequenceStart(chunk.sequence), + SequenceEnd(chunk_to), + )) } } else { if chunk_flag_end { - Fragment::End((chunk.sequence, chunk_to)) + Fragment::End(( + SequenceStart(chunk.sequence), + SequenceEnd(chunk_to), + )) } else { - Fragment::Middle((chunk.sequence, chunk_to)) + Fragment::Middle(( + SequenceStart(chunk.sequence), + SequenceEnd(chunk_to), + )) } }; self.track.insert(0, (toinsert, 0)); @@ -344,7 +435,7 @@ impl Uud { as usize; break; } - last_usable = from - 1; + last_usable = from.0 - 1; } } } @@ -357,7 +448,7 @@ impl Uud { let data_pivot = self.data.len() - data_idx_from; let (first, second) = data.split_at(data_pivot); self.data[data_idx_from..].copy_from_slice(&first); - self.data[..data_idx_to].copy_from_slice(&data); + self.data[..data_idx_to].copy_from_slice(&second); } Ok(ret) } @@ -419,9 +510,8 @@ impl UnreliableUnorderedDatagram { ret.extend_from_slice(first); ret.extend_from_slice(second); self.window_start += ret.len() as u32; - self.window_end = SequenceEnd(Sequence( - ::core::num::Wrapping::(ret.len() as u32), - )); + self.window_end = + SequenceEnd(Sequence(Wrapping::(ret.len() as u32))); self.data.clear(); return ret; } @@ -434,7 +524,7 @@ impl UnreliableUnorderedDatagram { let mut ret = Vec::with_capacity(data_len); let (first, second) = self.data[..].split_at(self.pivot as usize); - let first_len = ::core::cmp::min(data_len, first.len()); + let first_len = cmp::min(data_len, first.len()); let second_len = data_len - first_len; ret.extend_from_slice(&first[..first_len]); @@ -527,7 +617,7 @@ impl UnreliableUnorderedDatagram { let to = to + 1; if from <= first.len() { let first_from = from; - let first_to = ::core::cmp::min(first.len(), to); + let first_to = cmp::min(first.len(), to); let data_first_from = from - offset; let data_first_to = first_to - offset; first[first_from..first_to]