From 62a71a2af5cea7d4f0d482df8ad89b29932f94e6 Mon Sep 17 00:00:00 2001 From: Luca Fulchir Date: Fri, 21 Mar 2025 17:57:08 +0100 Subject: [PATCH] [transport] uud/uudl, first tests Signed-off-by: Luca Fulchir --- .gitignore | 1 + Cargo.toml | 34 +- flake.lock | 80 +--- flake.nix | 65 ++-- rustfmt.toml | 2 +- src/connection/handshake/tracker.rs | 16 +- src/connection/mod.rs | 7 +- src/connection/stream/errors.rs | 6 + src/connection/stream/mod.rs | 136 +++++-- src/connection/stream/rob/mod.rs | 25 +- src/connection/stream/rob/tests.rs | 24 +- src/connection/stream/uud/mod.rs | 557 ++++++++++++++++++++++++++++ src/connection/stream/uud/tests.rs | 249 +++++++++++++ src/connection/stream/uudl/mod.rs | 43 +++ src/connection/stream/uudl/tests.rs | 56 +++ src/dnssec/mod.rs | 7 +- src/enc/asym.rs | 3 +- src/enc/hkdf.rs | 5 +- src/enc/mod.rs | 2 +- src/enc/tests.rs | 4 +- src/inner/mod.rs | 2 +- src/inner/worker.rs | 2 +- src/tests.rs | 2 +- 23 files changed, 1143 insertions(+), 185 deletions(-) create mode 100644 src/connection/stream/uud/mod.rs create mode 100644 src/connection/stream/uud/tests.rs create mode 100644 src/connection/stream/uudl/mod.rs create mode 100644 src/connection/stream/uudl/tests.rs diff --git a/.gitignore b/.gitignore index 4ea67ad..3438345 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ *.swp /target /Cargo.lock +/flake.profile* diff --git a/Cargo.toml b/Cargo.toml index 0544e30..d857346 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,11 +2,11 @@ name = "fenrir" version = "0.1.0" -edition = "2021" +edition = "2024" # Fenrir won't be ready for a while, # we might as well use async fn in trait, which is nightly # remember to update this -rust-version = "1.67.0" +rust-version = "1.85.0" homepage = "https://git.runesauth.com/RunesAuth/libFenrir" repository = "https://git.runesauth.com/RunesAuth/libFenrir" license = "Apache-2.0 WITH LLVM-exception" @@ -21,16 +21,16 @@ publish = false [lib] -crate_type = [ "lib", "cdylib", "staticlib" ] +crate-type = [ "lib", "cdylib", "staticlib" ] [dependencies] # please keep these in alphabetical order -arc-swap = { version = "1.6" } +arc-swap = { version = "1.7" } arrayref = { version = "0.3" } -async-channel = { version = "1.8" } +async-channel = { version = "2.3" } # base85 repo has no tags, fix on a commit. v1.1.1 points to older, wrong version -base85 = { git = "https://gitlab.com/darkwyrm/base85", rev = "d98efbfd171dd9ba48e30a5c88f94db92fc7b3c6" } +base85 = { git = "https://gitlab.com/darkwyrm/base85", rev = "b5389888aca6208a7563c8dbf2af46a82e724fa1" } bitmaps = { version = "3.2" } chacha20poly1305 = { version = "0.10" } futures = { version = "0.3" } @@ -38,27 +38,27 @@ hkdf = { version = "0.12" } hwloc2 = {version = "2.2" } libc = { version = "0.2" } num-traits = { version = "0.2" } -num-derive = { version = "0.3" } +num-derive = { version = "0.4" } rand_core = {version = "0.6" } -ring = { version = "0.16" } +ring = { version = "0.17" } bincode = { version = "1.3" } sha3 = { version = "0.10" } -strum = { version = "0.24" } -strum_macros = { version = "0.24" } -thiserror = { version = "1.0" } +strum = { version = "0.26" } +strum_macros = { version = "0.26" } +thiserror = { version = "2.0" } tokio = { version = "1", features = ["full"] } # PERF: todo linux-only, behind "iouring" feature #tokio-uring = { version = "0.4" } tracing = { version = "0.1" } tracing-test = { version = "0.2" } -trust-dns-resolver = { version = "0.22", features = [ "dnssec-ring" ] } -trust-dns-client = { version = "0.22", features = [ "dnssec" ] } -trust-dns-proto = { version = "0.22" } +trust-dns-resolver = { version = "0.23", features = [ "dnssec-ring" ] } +trust-dns-client = { version = "0.23", features = [ "dnssec" ] } +trust-dns-proto = { version = "0.23" } # don't use stable dalek. forces zeroize 1.3, # breaks our and chacha20poly1305 # reason: zeroize is not pure rust, # so we can't have multiple versions of if -x25519-dalek = { version = "2.0.0-pre.1", features = [ "serde" ] } +x25519-dalek = { version = "2.0", features = [ "serde", "static_secrets" ] } zeroize = { version = "1" } [profile.dev] @@ -84,3 +84,7 @@ incremental = true codegen-units = 256 rpath = false +#[target.x86_64-unknown-linux-gnu] +#linker = "clang" +#rustflags = ["-C", "link-arg=--ld-path=mold"] + diff --git a/flake.lock b/flake.lock index 85c58c2..01b37d0 100644 --- a/flake.lock +++ b/flake.lock @@ -5,29 +5,11 @@ "systems": "systems" }, "locked": { - "lastModified": 1687171271, - "narHash": "sha256-BJlq+ozK2B1sJDQXS3tzJM5a+oVZmi1q0FlBK/Xqv7M=", + "lastModified": 1731533236, + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", "owner": "numtide", "repo": "flake-utils", - "rev": "abfb11bd1aec8ced1c9bb9adfe68018230f4fb3c", - "type": "github" - }, - "original": { - "owner": "numtide", - "repo": "flake-utils", - "type": "github" - } - }, - "flake-utils_2": { - "inputs": { - "systems": "systems_2" - }, - "locked": { - "lastModified": 1681202837, - "narHash": "sha256-H+Rh19JDwRtpVPAWp64F+rlEtxUWBAQW28eAi3SRSzg=", - "owner": "numtide", - "repo": "flake-utils", - "rev": "cfacdce06f30d2b68473a46042957675eebb3401", + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", "type": "github" }, "original": { @@ -38,27 +20,27 @@ }, "nixpkgs": { "locked": { - "lastModified": 1687555006, - "narHash": "sha256-GD2Kqb/DXQBRJcHqkM2qFZqbVenyO7Co/80JHRMg2U0=", + "lastModified": 1741862977, + "narHash": "sha256-prZ0M8vE/ghRGGZcflvxCu40ObKaB+ikn74/xQoNrGQ=", "owner": "nixos", "repo": "nixpkgs", - "rev": "33223d479ffde3d05ac16c6dff04ae43cc27e577", + "rev": "cdd2ef009676ac92b715ff26630164bb88fec4e0", "type": "github" }, "original": { "owner": "nixos", - "ref": "nixos-23.05", + "ref": "nixos-24.11", "repo": "nixpkgs", "type": "github" } }, "nixpkgs-unstable": { "locked": { - "lastModified": 1687502512, - "narHash": "sha256-dBL/01TayOSZYxtY4cMXuNCBk8UMLoqRZA+94xiFpJA=", + "lastModified": 1741851582, + "narHash": "sha256-cPfs8qMccim2RBgtKGF+x9IBCduRvd/N5F4nYpU0TVE=", "owner": "nixos", "repo": "nixpkgs", - "rev": "3ae20aa58a6c0d1ca95c9b11f59a2d12eebc511f", + "rev": "6607cf789e541e7873d40d3a8f7815ea92204f32", "type": "github" }, "original": { @@ -68,22 +50,6 @@ "type": "github" } }, - "nixpkgs_2": { - "locked": { - "lastModified": 1681358109, - "narHash": "sha256-eKyxW4OohHQx9Urxi7TQlFBTDWII+F+x2hklDOQPB50=", - "owner": "NixOS", - "repo": "nixpkgs", - "rev": "96ba1c52e54e74c3197f4d43026b3f3d92e83ff9", - "type": "github" - }, - "original": { - "owner": "NixOS", - "ref": "nixpkgs-unstable", - "repo": "nixpkgs", - "type": "github" - } - }, "root": { "inputs": { "flake-utils": "flake-utils", @@ -94,15 +60,16 @@ }, "rust-overlay": { "inputs": { - "flake-utils": "flake-utils_2", - "nixpkgs": "nixpkgs_2" + "nixpkgs": [ + "nixpkgs" + ] }, "locked": { - "lastModified": 1687660699, - "narHash": "sha256-crI/CA/OJc778I5qJhwhhl8/PKKzc0D7vvVxOtjfvSo=", + "lastModified": 1742005800, + "narHash": "sha256-6wuOGWkyW6R4A6Th9NMi6WK2jjddvZt7V2+rLPk6L3o=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "b3bd1d49f1ae609c1d68a66bba7a95a9a4256031", + "rev": "028cd247a6375f83b94adc33d83676480fc9c294", "type": "github" }, "original": { @@ -125,21 +92,6 @@ "repo": "default", "type": "github" } - }, - "systems_2": { - "locked": { - "lastModified": 1681028828, - "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", - "owner": "nix-systems", - "repo": "default", - "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", - "type": "github" - }, - "original": { - "owner": "nix-systems", - "repo": "default", - "type": "github" - } } }, "root": "root", diff --git a/flake.nix b/flake.nix index 21e2442..9354115 100644 --- a/flake.nix +++ b/flake.nix @@ -2,9 +2,12 @@ description = "libFenrir"; inputs = { - nixpkgs.url = "github:nixos/nixpkgs/nixos-23.05"; + nixpkgs.url = "github:nixos/nixpkgs/nixos-24.11"; nixpkgs-unstable.url = "github:nixos/nixpkgs/nixos-unstable"; - rust-overlay.url = "github:oxalica/rust-overlay"; + rust-overlay = { + url = "github:oxalica/rust-overlay"; + inputs.nixpkgs.follows = "nixpkgs"; + }; flake-utils.url = "github:numtide/flake-utils"; }; @@ -18,35 +21,47 @@ pkgs-unstable = import nixpkgs-unstable { inherit system overlays; }; - RUST_VERSION="1.69.0"; + #RUST_VERSION="1.85.0"; + RUST_VERSION="2025-03-15"; in { devShells.default = pkgs.mkShell { + name = "libFenrir"; buildInputs = with pkgs; [ - git - gnupg - openssh - openssl - pkg-config - exa - fd - #(rust-bin.stable.latest.default.override { - # go with nightly to have async fn in traits - #(rust-bin.nightly."2023-02-01".default.override { - # #extensions = [ "rust-src" ]; - # #targets = [ "arm-unknown-linux-gnueabihf" ]; - #}) - clippy - cargo-watch - cargo-flamegraph - cargo-license - lld - rust-bin.stable.${RUST_VERSION}.default - rustfmt - rust-analyzer + # system deps + git + gnupg + openssh + openssl + pkg-config + fd + # rust deps + #(rust-bin.stable.latest.default.override { + # go with nightly to have async fn in traits + #(rust-bin.nightly."2023-02-01".default.override { + # #extensions = [ "rust-src" ]; + # #targets = [ "arm-unknown-linux-gnueabihf" ]; + #}) + clippy + cargo-watch + cargo-flamegraph + cargo-license + lld + #rust-bin.stable.${RUST_VERSION}.default + #rust-bin.beta.${RUST_VERSION}.default + rust-bin.nightly.${RUST_VERSION}.default + rustfmt + rust-analyzer + #clang_16 + #mold # fenrir deps - hwloc + hwloc ]; + # if you want to try the mold linker, add 'clang_16', 'mold', and append this to ~/.cargo/config.toml: + # [target.x86_64-unknown-linux-gnu] + # linker = "clang" + # rustflags = ["-C", "link-arg=--ld-path=mold"] + shellHook = '' # use zsh or other custom shell USER_SHELL="$(grep $USER /etc/passwd | cut -d ':' -f 7)" diff --git a/rustfmt.toml b/rustfmt.toml index 8008a3e..a562fa2 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,4 +1,4 @@ -edition = "2021" +edition = "2024" unstable_features = true format_strings = true max_width = 80 diff --git a/src/connection/handshake/tracker.rs b/src/connection/handshake/tracker.rs index fbedc34..ad4fd85 100644 --- a/src/connection/handshake/tracker.rs +++ b/src/connection/handshake/tracker.rs @@ -277,7 +277,7 @@ impl Tracker { use handshake::dirsync::DirSync; match handshake.data { handshake::Data::DirSync(ref mut ds) => match ds { - DirSync::Req(ref mut req) => { + &mut DirSync::Req(ref mut req) => { if !self.key_exchanges.contains(&req.exchange) { return Err(enc::Error::UnsupportedKeyExchange.into()); } @@ -298,21 +298,19 @@ impl Tracker { let ephemeral_key; match has_key { Some(s_k) => { - if let PrivKey::Exchange(ref k) = &s_k.key { + if let &PrivKey::Exchange(ref k) = &s_k.key { ephemeral_key = k; } else { unreachable!(); } } - None => { - return Err(handshake::Error::UnknownKeyID.into()) - } + None => return Err(Error::UnknownKeyID.into()), } let shared_key = match ephemeral_key .key_exchange(req.exchange, req.exchange_key) { Ok(shared_key) => shared_key, - Err(e) => return Err(handshake::Error::Key(e).into()), + Err(e) => return Err(Error::Key(e).into()), }; let hkdf = Hkdf::new(hkdf::Kind::Sha3, b"fenrir", shared_key); @@ -335,7 +333,7 @@ impl Tracker { req.data.deserialize_as_cleartext(cleartext)?; } Err(e) => { - return Err(handshake::Error::Key(e).into()); + return Err(Error::Key(e).into()); } } @@ -352,7 +350,7 @@ impl Tracker { "No such client key id: {:?}", resp.client_key_id ); - return Err(handshake::Error::UnknownKeyID.into()); + return Err(Error::UnknownKeyID.into()); } }; let cipher_recv = &hshake.connection.cipher_recv; @@ -371,7 +369,7 @@ impl Tracker { resp.data.deserialize_as_cleartext(&cleartext)?; } Err(e) => { - return Err(handshake::Error::Key(e).into()); + return Err(Error::Key(e).into()); } } let hshake = diff --git a/src/connection/mod.rs b/src/connection/mod.rs index e205e33..4b88d34 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -31,7 +31,7 @@ use crate::{ pub enum Error { /// Can't decrypt packet #[error("Decrypt error: {0}")] - Decrypt(#[from] crate::enc::Error), + Decrypt(#[from] enc::Error), /// Error in parsing a packet realated to the connection #[error("Chunk parsing: {0}")] Parse(#[from] stream::Error), @@ -321,7 +321,8 @@ impl Connection { Some(stream) => stream, None => continue, }; - ret.push((stream_id, stream.get())); + let data = stream.get(); // FIXME + ret.push((stream_id, data.1)); } Some(ret) } @@ -330,7 +331,7 @@ impl Connection { mut udp: crate::RawUdp, ) -> Result { let mut data = &mut udp.data[ID::len()..]; - let aad = enc::sym::AAD(&[]); + let aad = sym::AAD(&[]); self.cipher_recv.decrypt(aad, &mut data)?; let mut bytes_parsed = 0; let mut chunks = Vec::with_capacity(2); diff --git a/src/connection/stream/errors.rs b/src/connection/stream/errors.rs index 07dcbd8..8efdc49 100644 --- a/src/connection/stream/errors.rs +++ b/src/connection/stream/errors.rs @@ -9,4 +9,10 @@ pub enum Error { /// Sequence outside of the window #[error("Sequence out of the sliding window")] OutOfWindow, + /// Wrong start/end flags received, can't reconstruct data + #[error("Wrong start/end flags received")] + WrongFlags, + /// Can't reconstruct the data + #[error("Error in reconstructing the bytestream/datagrams")] + Reconstructing, } diff --git a/src/connection/stream/mod.rs b/src/connection/stream/mod.rs index 44ac3be..c63c3c3 100644 --- a/src/connection/stream/mod.rs +++ b/src/connection/stream/mod.rs @@ -4,9 +4,17 @@ mod errors; mod rob; +mod uud; +mod uudl; pub use errors::Error; -use crate::{connection::stream::rob::ReliableOrderedBytestream, enc::Random}; +use crate::{ + connection::stream::{ + rob::ReliableOrderedBytestream, uud::UnreliableUnorderedDatagram, + uudl::UnreliableUnorderedDatagramLimited, + }, + enc::Random, +}; /// Kind of stream. any combination of: /// reliable/unreliable ordered/unordered, bytestream/datagram @@ -16,6 +24,9 @@ pub enum Kind { /// ROB: Reliable, Ordered, Bytestream /// AKA: TCP-like ROB = 0, + /// UUDL: Unreliable, Unordered, Datagram Limited + /// Aka: UDP-like. Data limited to the packet size + UUDL, } /// Id of the stream @@ -52,28 +63,49 @@ impl ChunkLen { #[derive(Debug, Copy, Clone)] pub(crate) struct SequenceStart(pub(crate) Sequence); impl SequenceStart { - pub(crate) fn plus_u32(&self, other: u32) -> Sequence { - self.0.plus_u32(other) - } pub(crate) fn offset(&self, seq: Sequence) -> usize { - if self.0 .0 <= seq.0 { - (seq.0 - self.0 .0).0 as usize + if self.0.0 <= seq.0 { + (seq.0 - self.0.0).0 as usize } else { - (seq.0 + (Sequence::max().0 - self.0 .0)).0 as usize + (seq.0 + (Sequence::max().0 - self.0.0)).0 as usize } } } -// SequenceEnd is INCLUSIVE -#[derive(Debug, Copy, Clone)] -pub(crate) struct SequenceEnd(pub(crate) Sequence); -impl SequenceEnd { - pub(crate) fn plus_u32(&self, other: u32) -> Sequence { - self.0.plus_u32(other) + +impl ::core::ops::Add for SequenceStart { + type Output = SequenceStart; + fn add(self, other: u32) -> SequenceStart { + SequenceStart(self.0 + other) } } -/// Sequence number to rebuild the stream correctly +impl ::core::ops::AddAssign for SequenceStart { + fn add_assign(&mut self, other: u32) { + self.0 += other; + } +} + +// SequenceEnd is INCLUSIVE #[derive(Debug, Copy, Clone)] +pub(crate) struct SequenceEnd(pub(crate) Sequence); + +impl ::core::ops::Add for SequenceEnd { + type Output = SequenceEnd; + fn add(self, other: u32) -> SequenceEnd { + SequenceEnd(self.0 + other) + } +} + +impl ::core::ops::AddAssign for SequenceEnd { + fn add_assign(&mut self, other: u32) { + self.0 += other; + } +} + +// TODO: how to tell the compiler we don't use the two most significant bits? +// maybe NonZero + always using 2nd most significant bit? +/// Sequence number to rebuild the stream correctly +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] pub struct Sequence(pub ::core::num::Wrapping); impl Sequence { @@ -90,6 +122,10 @@ impl Sequence { 4 } /// Maximum possible sequence + pub const fn min() -> Self { + Self(::core::num::Wrapping(0)) + } + /// Maximum possible sequence pub const fn max() -> Self { Self(::core::num::Wrapping(Self::SEQ_NOFLAG)) } @@ -98,27 +134,48 @@ impl Sequence { start: SequenceStart, end: SequenceEnd, ) -> bool { - if start.0 .0 < end.0 .0 { - start.0 .0 <= self.0 && self.0 <= end.0 .0 + if start.0 < end.0 { + start.0.0 <= self.0 && self.0 <= end.0.0 } else { - start.0 .0 <= self.0 || self.0 <= end.0 .0 + start.0.0 <= self.0 || self.0 <= end.0.0 } } + pub(crate) fn cmp_in_window( + &self, + window_start: SequenceStart, + compare: Sequence, + ) -> ::core::cmp::Ordering { + let offset_self = self.0 - window_start.0.0; + let offset_compare = compare.0 - window_start.0.0; + return offset_self.cmp(&offset_compare); + } pub(crate) fn remaining_window(&self, end: SequenceEnd) -> u32 { - if self.0 <= end.0 .0 { - (end.0 .0 .0 - self.0 .0) + 1 + if self.0 <= end.0.0 { + (end.0.0.0 - self.0.0) + 1 } else { - end.0 .0 .0 + 1 + (Self::max().0 - self.0).0 + end.0.0.0 + 1 + (Self::max().0 - self.0).0 } } - pub(crate) fn plus_u32(self, other: u32) -> Self { + pub(crate) fn diff_from(self, other: Sequence) -> u32 { + assert!( + self.0.0 > other.0.0, + "Sequence::diff_from inverted parameters" + ); + self.0.0 - other.0.0 + } +} + +impl ::core::ops::Sub for Sequence { + type Output = Self; + + fn sub(self, other: u32) -> Self { Self(::core::num::Wrapping( - (self.0 .0 + other) & Self::SEQ_NOFLAG, + (self.0 - ::core::num::Wrapping::(other)).0 & Self::SEQ_NOFLAG, )) } } -impl ::core::ops::Add for Sequence { +impl ::core::ops::Add for Sequence { type Output = Self; fn add(self, other: Self) -> Self { @@ -128,10 +185,24 @@ impl ::core::ops::Add for Sequence { } } +impl ::core::ops::Add for Sequence { + type Output = Sequence; + fn add(self, other: u32) -> Sequence { + Sequence(self.0 + ::core::num::Wrapping::(other)) + } +} + +impl ::core::ops::AddAssign for Sequence { + fn add_assign(&mut self, other: u32) { + self.0 += ::core::num::Wrapping::(other); + } +} + /// Chunk of data representing a stream /// Every chunk is as follows: /// | id (2 bytes) | length (2 bytes) | /// | flag_start (1 BIT) | flag_end (1 BIT) | sequence (30 bits) | +/// | ...data... | #[derive(Debug, Clone)] pub struct Chunk<'a> { /// Id of the stream this chunk is part of @@ -203,7 +274,7 @@ impl<'a> Chunk<'a> { let bytes = bytes_next; bytes_next = bytes_next + Sequence::len(); raw_out[bytes..bytes_next] - .copy_from_slice(&self.sequence.0 .0.to_le_bytes()); + .copy_from_slice(&self.sequence.0.0.to_le_bytes()); let mut flag_byte = raw_out[bytes] & Self::FLAGS_EXCLUDED_BITMASK; if self.flag_start { flag_byte = flag_byte | Self::FLAG_START_BITMASK; @@ -223,17 +294,21 @@ impl<'a> Chunk<'a> { /// differences from Kind: /// * not public /// * has actual data -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) enum Tracker { /// ROB: Reliable, Ordered, Bytestream /// AKA: TCP-like ROB(ReliableOrderedBytestream), + UUDL(UnreliableUnorderedDatagramLimited), } impl Tracker { pub(crate) fn new(kind: Kind, rand: &Random) -> Self { match kind { Kind::ROB => Tracker::ROB(ReliableOrderedBytestream::new(rand)), + Kind::UUDL => { + Tracker::UUDL(UnreliableUnorderedDatagramLimited::new()) + } } } } @@ -259,7 +334,7 @@ impl ::core::ops::BitOr for StreamData { } /// Actual stream-tracking structure -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) struct Stream { id: ID, data: Tracker, @@ -277,11 +352,16 @@ impl Stream { pub(crate) fn recv(&mut self, chunk: Chunk) -> Result { match &mut self.data { Tracker::ROB(tracker) => tracker.recv(chunk), + Tracker::UUDL(tracker) => tracker.recv(chunk), } } - pub(crate) fn get(&mut self) -> Vec { + pub(crate) fn get(&mut self) -> (SequenceStart, Vec) { match &mut self.data { - Tracker::ROB(tracker) => tracker.get(), + // FIXME + Tracker::ROB(tracker) => { + (SequenceStart(Sequence::min()), tracker.get()) + } + Tracker::UUDL(tracker) => tracker.get(), } } } diff --git a/src/connection/stream/rob/mod.rs b/src/connection/stream/rob/mod.rs index 21361bb..901dc69 100644 --- a/src/connection/stream/rob/mod.rs +++ b/src/connection/stream/rob/mod.rs @@ -26,7 +26,7 @@ impl ReliableOrderedBytestream { pub(crate) fn new(rand: &Random) -> Self { let window_len = 1048576; // 1MB. should be enough for anybody. (lol) let window_start = SequenceStart(Sequence::new(rand)); - let window_end = SequenceEnd(window_start.0.plus_u32(window_len - 1)); + let window_end = SequenceEnd(window_start.0 +(window_len - 1)); let mut data = Vec::with_capacity(window_len as usize); data.resize(data.capacity(), 0); @@ -44,9 +44,9 @@ impl ReliableOrderedBytestream { "Max window size is {}", Sequence::max().0 .0 ); - let window_len = size; // 1MB. should be enough for anybody. (lol) + let window_len = size; let window_start = SequenceStart(Sequence::new(rand)); - let window_end = SequenceEnd(window_start.0.plus_u32(window_len - 1)); + let window_end = SequenceEnd(window_start.0 +(window_len - 1)); let mut data = Vec::with_capacity(window_len as usize); data.resize(data.capacity(), 0); @@ -68,20 +68,20 @@ impl ReliableOrderedBytestream { ret.extend_from_slice(first); ret.extend_from_slice(second); self.window_start = - SequenceStart(self.window_start.plus_u32(ret.len() as u32)); + self.window_start + (ret.len() as u32); self.window_end = - SequenceEnd(self.window_end.plus_u32(ret.len() as u32)); + self.window_end + (ret.len() as u32); self.data.clear(); return ret; } let data_len = self.window_start.offset(self.missing[0].0); let last_missing_idx = self.missing.len() - 1; let mut last_missing = &mut self.missing[last_missing_idx]; - last_missing.1 = last_missing.1.plus_u32(data_len as u32); + last_missing.1 = last_missing.1 + (data_len as u32); self.window_start = - SequenceStart(self.window_start.plus_u32(data_len as u32)); + self.window_start + (data_len as u32); self.window_end = - SequenceEnd(self.window_end.plus_u32(data_len as u32)); + self.window_end + (data_len as u32); let mut ret = Vec::with_capacity(data_len); let (first, second) = self.data[..].split_at(self.pivot as usize); @@ -141,25 +141,24 @@ impl ReliableOrderedBytestream { // [....chunk....] // [...missing...] copy_ranges.push((missing_from, offset_end)); - el.0 = - el.0.plus_u32(((offset_end - missing_from) + 1) as u32); + el.0 +=((offset_end - missing_from) + 1) as u32; } } else if missing_from < offset { if missing_to > offset_end { // [..chunk..] // [....missing....] to_add.push(( - el.0.plus_u32(((offset_end - missing_from) + 1) as u32), + el.0 + (((offset_end - missing_from) + 1) as u32), el.1, )); - el.1 = el.0.plus_u32(((offset - missing_from) - 1) as u32); + el.1 = el.0 + (((offset - missing_from) - 1) as u32); copy_ranges.push((offset, offset_end)); } else if offset <= missing_to { // [....chunk....] // [...missing...] copy_ranges.push((offset, (missing_to - 0))); el.1 = - el.0.plus_u32(((offset_end - missing_from) - 1) as u32); + el.0 + (((offset_end - missing_from) - 1) as u32); } } } diff --git a/src/connection/stream/rob/tests.rs b/src/connection/stream/rob/tests.rs index 20cb508..f599d09 100644 --- a/src/connection/stream/rob/tests.rs +++ b/src/connection/stream/rob/tests.rs @@ -36,7 +36,7 @@ fn test_stream_rob_sequential() { id: stream::ID(42), flag_start: false, flag_end: true, - sequence: start.plus_u32(512), + sequence: start + 512, data: &data[512..], }; let _ = rob.recv(chunk); @@ -77,7 +77,7 @@ fn test_stream_rob_retransmit() { id: stream::ID(42), flag_start: false, flag_end: false, - sequence: start.plus_u32(50), + sequence: start +50, data: &data[50..60], }; let _ = rob.recv(chunk); @@ -85,7 +85,7 @@ fn test_stream_rob_retransmit() { id: stream::ID(42), flag_start: false, flag_end: false, - sequence: start.plus_u32(40), + sequence: start + 40, data: &data[40..60], }; let _ = rob.recv(chunk); @@ -93,7 +93,7 @@ fn test_stream_rob_retransmit() { id: stream::ID(42), flag_start: false, flag_end: false, - sequence: start.plus_u32(80), + sequence: start + 80, data: &data[80..], }; let _ = rob.recv(chunk); @@ -101,7 +101,7 @@ fn test_stream_rob_retransmit() { id: stream::ID(42), flag_start: false, flag_end: false, - sequence: start.plus_u32(50), + sequence: start + 50, data: &data[50..90], }; let _ = rob.recv(chunk); @@ -109,7 +109,7 @@ fn test_stream_rob_retransmit() { id: stream::ID(42), flag_start: false, flag_end: false, - sequence: start.plus_u32(max_window as u32), + sequence: start +(max_window as u32), data: &data[max_window..], }; let _ = rob.recv(chunk); @@ -117,7 +117,7 @@ fn test_stream_rob_retransmit() { id: stream::ID(42), flag_start: false, flag_end: true, - sequence: start.plus_u32(90), + sequence: start +90, data: &data[90..max_window], }; let _ = rob.recv(chunk); @@ -157,7 +157,7 @@ fn test_stream_rob_rolling() { id: stream::ID(42), flag_start: true, flag_end: false, - sequence: start.plus_u32(50), + sequence: start + 50, data: &data[50..100], }; let _ = rob.recv(chunk); @@ -172,7 +172,7 @@ fn test_stream_rob_rolling() { id: stream::ID(42), flag_start: true, flag_end: false, - sequence: start.plus_u32(40), + sequence: start + 40, data: &data[40..], }; let _ = rob.recv(chunk); @@ -212,7 +212,7 @@ fn test_stream_rob_rolling_second_case() { id: stream::ID(42), flag_start: true, flag_end: false, - sequence: start.plus_u32(50), + sequence: start + 50, data: &data[50..100], }; let _ = rob.recv(chunk); @@ -227,7 +227,7 @@ fn test_stream_rob_rolling_second_case() { id: stream::ID(42), flag_start: true, flag_end: false, - sequence: start.plus_u32(40), + sequence: start + 40, data: &data[40..100], }; let _ = rob.recv(chunk); @@ -235,7 +235,7 @@ fn test_stream_rob_rolling_second_case() { id: stream::ID(42), flag_start: true, flag_end: false, - sequence: start.plus_u32(100), + sequence: start + 100, data: &data[100..], }; let _ = rob.recv(chunk); diff --git a/src/connection/stream/uud/mod.rs b/src/connection/stream/uud/mod.rs new file mode 100644 index 0000000..f14e1f7 --- /dev/null +++ b/src/connection/stream/uud/mod.rs @@ -0,0 +1,557 @@ +//! Implementation of the Unreliable, unordered, Datagram transmission model +//! +//! AKA: UDP-like, but the datagram can cross the packet-size (MTU) limit. +//! +//! Only fully received datagrams will be delivered to the user, and +//! half-received ones will be discarded after a timeout + +use crate::{ + connection::stream::{ + Chunk, Error, Sequence, SequenceEnd, SequenceStart, StreamData, + }, + enc::Random, +}; + +use ::std::collections::{BTreeMap, VecDeque}; + +#[cfg(test)] +mod tests; + +#[derive(Debug, PartialEq, Eq)] +enum Fragment { + Start((Sequence, Sequence)), + Middle((Sequence, Sequence)), + End((Sequence, Sequence)), + Full((Sequence, Sequence)), + Delivered((Sequence, Sequence)), +} + +impl Fragment { + // FIXME: sequence start/end? + fn get_seqs(&self) -> (Sequence, Sequence) { + match self { + Fragment::Start((f, t)) + | Fragment::Middle((f, t)) + | Fragment::End((f, t)) + | Fragment::Full((f, t)) + | Fragment::Delivered((f, t)) => (*f, *t), + } + } + fn is_start(&self) -> bool { + match self { + Fragment::Start(_) | Fragment::Full(_) | Fragment::Delivered(_) => { + true + } + Fragment::End(_) | Fragment::Middle(_) => false, + } + } + fn is_end(&self) -> bool { + match self { + Fragment::End(_) | Fragment::Full(_) | Fragment::Delivered(_) => { + true + } + Fragment::Start(_) | Fragment::Middle(_) => false, + } + } +} +/* +impl ::std::cmp::PartialEq for Fragment { + fn eq(&self, other: &Sequence) -> bool { + self.get_seq() == *other + } +} + +impl ::std::cmp::PartialOrd for Fragment { + fn partial_cmp(&self, other: &Sequence) -> Option<::std::cmp::Ordering> { + Some(self.get_seq().cmp(other)) + } +} + +impl ::std::cmp::PartialOrd for Fragment { + fn partial_cmp(&self, other: &Fragment) -> Option<::std::cmp::Ordering> { + Some(self.get_seq().cmp(&other.get_seq())) + } +} +impl Ord for Fragment { + fn cmp(&self, other: &Fragment) -> ::std::cmp::Ordering { + self.get_seq().cmp(&other.get_seq()) + } +} +*/ + +type Timer = u64; + +pub(crate) struct Uud { + pub(crate) window_start: SequenceStart, + window_end: SequenceEnd, + pivot: u32, + data: Vec, + track: VecDeque<(Fragment, Timer)>, +} + +impl Uud { + pub(crate) fn new(rand: &Random) -> Self { + let window_len = 1048576; // 1MB. should be enough for anybody. (lol) + let window_start = SequenceStart(Sequence::new(rand)); + let window_end = SequenceEnd(window_start.0 + (window_len - 1)); + let mut data = Vec::with_capacity(window_len as usize); + data.resize(data.capacity(), 0); + + Self { + window_start, + window_end, + pivot: window_len, + data, + track: VecDeque::with_capacity(4), + } + } + pub(crate) fn with_window_size(rand: &Random, size: u32) -> Self { + assert!( + size < Sequence::max().0.0, + "Max window size is {}", + Sequence::max().0.0 + ); + let window_len = size; + let window_start = SequenceStart(Sequence::new(rand)); + let window_end = SequenceEnd(window_start.0 + (window_len - 1)); + let mut data = Vec::with_capacity(window_len as usize); + data.resize(data.capacity(), 0); + + Self { + window_start, + window_end, + pivot: window_len, + data, + track: VecDeque::with_capacity(4), + } + } + pub(crate) fn window_size(&self) -> u32 { + self.data.len() as u32 + } + pub(crate) fn recv(&mut self, chunk: Chunk) -> Result { + let chunk_to = chunk.sequence + chunk.data.len() as u32; + if !chunk + .sequence + .is_between(self.window_start, self.window_end) + { + return Err(Error::OutOfWindow); + } + + // make sure we consider only the bytes inside the sliding window + let maxlen = ::std::cmp::min( + chunk.sequence.remaining_window(self.window_end) as usize, + chunk.data.len(), + ); + if maxlen == 0 { + // empty window or empty chunk, but we don't care + return Err(Error::OutOfWindow); + } + let chunk_flag_end: bool; + if maxlen != chunk.data.len() { + // we are not considering the full chunk, so + // make sure the end flag is not set + chunk_flag_end = false; + + // FIXME: what happens if we "truncate" this chunk now, + // then we have more space in the window + // then we receive the same packet again? + } else { + chunk_flag_end = chunk.flag_end; + } + // translate Sequences to offsets in self.data + let data = &chunk.data[..maxlen]; + let chunk_to = chunk.sequence + data.len() as u32; + let mut last_usable = self.window_end.0; + let mut ret = StreamData::NotReady; + let mut copy_data_idx_from = 0; + // FIXME: and on receiving first fragment after the second, or empty + // track? + for (idx, (fragment, _)) in self.track.iter_mut().enumerate().rev() { + let (from, to) = fragment.get_seqs(); + let to_next = to + 1; + match to_next.cmp_in_window(self.window_start, chunk.sequence) { + ::core::cmp::Ordering::Equal => { + // `chunk` is immediately after `fragment` + if !chunk_to.is_between( + SequenceStart(to_next), + SequenceEnd(last_usable), + ) { + return Err(Error::Reconstructing); + } + match fragment { + Fragment::Start((_, f_end)) => { + if chunk.flag_start { + // we can't start a datagram twice. + // ignore the data + return Err(Error::WrongFlags); + } + if chunk_flag_end { + *fragment = Fragment::Full(( + from, + to + (data.len() as u32), + )); + ret = StreamData::Ready; + } else { + *f_end += data.len() as u32; + } + copy_data_idx_from = + chunk.sequence.diff_from(self.window_start.0) + as usize; + } + Fragment::Middle((_, f_end)) => { + if chunk.flag_start { + // we can't start a datagram twice. + // ignore the data + return Err(Error::WrongFlags); + } + if chunk_flag_end { + *fragment = Fragment::End(( + from, + to + (data.len() as u32), + )); + } else { + *f_end += data.len() as u32; + } + copy_data_idx_from = + chunk.sequence.diff_from(self.window_start.0) + as usize; + } + Fragment::End(_) + | Fragment::Full(_) + | Fragment::Delivered(_) => { + if !chunk.flag_start { + return Err(Error::WrongFlags); + } + let toinsert = if chunk_flag_end { + ret = StreamData::Ready; + Fragment::Full((chunk.sequence, chunk_to)) + } else { + Fragment::Start((chunk.sequence, chunk_to)) + }; + self.track.insert(idx + 1, (toinsert, 0)); + copy_data_idx_from = + chunk.sequence.diff_from(self.window_start.0) + as usize; + } + } + break; + } + ::core::cmp::Ordering::Less => { + // there is a data hole between `chunk` and `fragment` + + if !chunk_to.is_between( + SequenceStart(to_next), + SequenceEnd(last_usable), + ) { + return Err(Error::Reconstructing); + } + let toinsert = if chunk.flag_start { + if chunk_flag_end { + ret = StreamData::Ready; + Fragment::Full((chunk.sequence, chunk_to)) + } else { + Fragment::Start((chunk.sequence, chunk_to)) + } + } else { + if chunk_flag_end { + Fragment::End((chunk.sequence, chunk_to)) + } else { + Fragment::Middle((chunk.sequence, chunk_to)) + } + }; + self.track.insert(idx + 1, (toinsert, 0)); + copy_data_idx_from = + chunk.sequence.diff_from(self.window_start.0) as usize; + break; + } + ::core::cmp::Ordering::Greater => { + // to_next > chunk.sequence + // `fragment` is too new, need to look at older ones + + if from.cmp_in_window(self.window_start, chunk.sequence) + != ::core::cmp::Ordering::Greater + { + // to_next > chunk.sequence >= from + // overlapping not yet allowed + return Err(Error::Reconstructing); + } + if idx == 0 { + // check if we can add before everything + if chunk_to == from { + if fragment.is_start() { + if chunk_flag_end { + // add, don't merge + } else { + //fragment.start, but !chunk.end + return Err(Error::WrongFlags); + } + } else { + if chunk_flag_end { + //chunk.end but !fragment.start + return Err(Error::WrongFlags); + } else { + if chunk.flag_start { + if fragment.is_end() { + *fragment = Fragment::Full(( + chunk.sequence, + to, + )); + ret = StreamData::Ready; + } else { + *fragment = Fragment::Start(( + chunk.sequence, + to, + )); + } + } else { + if fragment.is_end() { + *fragment = Fragment::End(( + chunk.sequence, + to, + )); + } else { + *fragment = Fragment::Middle(( + chunk.sequence, + to, + )); + } + } + } + } + copy_data_idx_from = + chunk.sequence.diff_from(self.window_start.0) + as usize; + break; + } + // chunk before fragment + let toinsert = if chunk.flag_start { + if chunk_flag_end { + ret = StreamData::Ready; + Fragment::Full((chunk.sequence, chunk_to)) + } else { + Fragment::Start((chunk.sequence, chunk_to)) + } + } else { + if chunk_flag_end { + Fragment::End((chunk.sequence, chunk_to)) + } else { + Fragment::Middle((chunk.sequence, chunk_to)) + } + }; + self.track.insert(0, (toinsert, 0)); + copy_data_idx_from = + chunk.sequence.diff_from(self.window_start.0) + as usize; + break; + } + last_usable = from - 1; + } + } + } + let data_idx_from = + (copy_data_idx_from + self.pivot as usize) % self.data.len(); + let data_idx_to = (data_idx_from + data.len()) % self.data.len(); + if data_idx_from < data_idx_to { + self.data[data_idx_from..data_idx_to].copy_from_slice(&data); + } else { + let data_pivot = self.data.len() - data_idx_from; + let (first, second) = data.split_at(data_pivot); + self.data[data_idx_from..].copy_from_slice(&first); + self.data[..data_idx_to].copy_from_slice(&data); + } + Ok(ret) + } +} + +/// Copy of ROB for reference +#[derive(Debug, Clone)] +pub(crate) struct UnreliableUnorderedDatagram { + pub(crate) window_start: SequenceStart, + window_end: SequenceEnd, + pivot: u32, + data: Vec, + missing: Vec<(Sequence, Sequence)>, +} + +impl UnreliableUnorderedDatagram { + pub(crate) fn new(rand: &Random) -> Self { + let window_len = 1048576; // 1MB. should be enough for anybody. (lol) + let window_start = SequenceStart(Sequence::new(rand)); + let window_end = SequenceEnd(window_start.0 + (window_len - 1)); + let mut data = Vec::with_capacity(window_len as usize); + data.resize(data.capacity(), 0); + + Self { + window_start, + window_end, + pivot: window_len, + data, + missing: [(window_start.0, window_end.0)].to_vec(), + } + } + pub(crate) fn with_window_size(rand: &Random, size: u32) -> Self { + assert!( + size < Sequence::max().0.0, + "Max window size is {}", + Sequence::max().0.0 + ); + let window_len = size; // 1MB. should be enough for anybody. (lol) + let window_start = SequenceStart(Sequence::new(rand)); + let window_end = SequenceEnd(window_start.0 + (window_len - 1)); + let mut data = Vec::with_capacity(window_len as usize); + data.resize(data.capacity(), 0); + + Self { + window_start, + window_end, + pivot: window_len, + data, + missing: [(window_start.0, window_end.0)].to_vec(), + } + } + pub(crate) fn window_size(&self) -> u32 { + self.data.len() as u32 + } + pub(crate) fn get(&mut self) -> Vec { + if self.missing.len() == 0 { + let (first, second) = self.data.split_at(self.pivot as usize); + let mut ret = Vec::with_capacity(self.data.len()); + ret.extend_from_slice(first); + ret.extend_from_slice(second); + self.window_start += ret.len() as u32; + self.window_end = SequenceEnd(Sequence( + ::core::num::Wrapping::(ret.len() as u32), + )); + self.data.clear(); + return ret; + } + let data_len = self.window_start.offset(self.missing[0].0); + let last_missing_idx = self.missing.len() - 1; + let mut last_missing = &mut self.missing[last_missing_idx]; + last_missing.1 += data_len as u32; + self.window_start += data_len as u32; + self.window_end += data_len as u32; + + let mut ret = Vec::with_capacity(data_len); + let (first, second) = self.data[..].split_at(self.pivot as usize); + let first_len = ::core::cmp::min(data_len, first.len()); + let second_len = data_len - first_len; + + ret.extend_from_slice(&first[..first_len]); + ret.extend_from_slice(&second[..second_len]); + + self.pivot = + ((self.pivot as usize + data_len) % self.data.len()) as u32; + ret + } + pub(crate) fn recv(&mut self, chunk: Chunk) -> Result { + if !chunk + .sequence + .is_between(self.window_start, self.window_end) + { + return Err(Error::OutOfWindow); + } + // make sure we consider only the bytes inside the sliding window + let maxlen = ::std::cmp::min( + chunk.sequence.remaining_window(self.window_end) as usize, + chunk.data.len(), + ); + if maxlen == 0 { + // empty window or empty chunk, but we don't care + return Err(Error::OutOfWindow); + } + // translate Sequences to offsets in self.data + let data = &chunk.data[..maxlen]; + let offset = self.window_start.offset(chunk.sequence); + let offset_end = offset + chunk.data.len() - 1; + + // Find the chunks we are missing that we can copy, + // and fix the missing tracker + let mut copy_ranges = Vec::new(); + let mut to_delete = Vec::new(); + let mut to_add = Vec::new(); + // note: the ranges are (INCLUSIVE, INCLUSIVE) + for (idx, el) in self.missing.iter_mut().enumerate() { + let missing_from = self.window_start.offset(el.0); + if missing_from > offset_end { + break; + } + let missing_to = self.window_start.offset(el.1); + if missing_to < offset { + continue; + } + if missing_from >= offset && missing_from <= offset_end { + if missing_to <= offset_end { + // [.....chunk.....] + // [..missing..] + to_delete.push(idx); + copy_ranges.push((missing_from, missing_to)); + } else { + // [....chunk....] + // [...missing...] + copy_ranges.push((missing_from, offset_end)); + el.0 += ((offset_end - missing_from) + 1) as u32; + } + } else if missing_from < offset { + if missing_to > offset_end { + // [..chunk..] + // [....missing....] + to_add.push(( + el.0 + (((offset_end - missing_from) + 1) as u32), + el.1, + )); + el.1 = el.0 + (((offset - missing_from) - 1) as u32); + copy_ranges.push((offset, offset_end)); + } else if offset <= missing_to { + // [....chunk....] + // [...missing...] + copy_ranges.push((offset, (missing_to - 0))); + el.1 = el.0 + (((offset_end - missing_from) - 1) as u32); + } + } + } + { + let mut deleted = 0; + for idx in to_delete.into_iter() { + self.missing.remove(idx + deleted); + deleted = deleted + 1; + } + } + self.missing.append(&mut to_add); + self.missing + .sort_by(|(from_a, _), (from_b, _)| from_a.0.0.cmp(&from_b.0.0)); + + // copy only the missing data + let (first, second) = self.data[..].split_at_mut(self.pivot as usize); + for (from, to) in copy_ranges.into_iter() { + let to = to + 1; + if from <= first.len() { + let first_from = from; + let first_to = ::core::cmp::min(first.len(), to); + let data_first_from = from - offset; + let data_first_to = first_to - offset; + first[first_from..first_to] + .copy_from_slice(&data[data_first_from..data_first_to]); + + let second_to = to - first_to; + let data_second_to = data_first_to + second_to; + second[..second_to] + .copy_from_slice(&data[data_first_to..data_second_to]); + } else { + let second_from = from - first.len(); + let second_to = to - first.len(); + let data_from = from - offset; + let data_to = to - offset; + second[second_from..second_to] + .copy_from_slice(&data[data_from..data_to]); + } + } + if self.missing.len() == 0 + || self.window_start.offset(self.missing[0].0) == 0 + { + Ok(StreamData::Ready) + } else { + Ok(StreamData::NotReady) + } + } +} diff --git a/src/connection/stream/uud/tests.rs b/src/connection/stream/uud/tests.rs new file mode 100644 index 0000000..634853e --- /dev/null +++ b/src/connection/stream/uud/tests.rs @@ -0,0 +1,249 @@ +use crate::{ + connection::stream::{self, Chunk, uud::*}, + enc::Random, +}; + +#[::tracing_test::traced_test] +#[test] +fn test_stream_uud_sequential() { + let rand = Random::new(); + let mut uud = UnreliableUnorderedDatagram::with_window_size(&rand, 1048576); + + let mut data = Vec::with_capacity(1024); + data.resize(data.capacity(), 0); + rand.fill(&mut data[..]); + + let start = uud.window_start.0; + + let chunk = Chunk { + id: stream::ID(42), + flag_start: true, + flag_end: false, + sequence: start, + data: &data[..512], + }; + let got = uud.get(); + assert!(&got[..] == &[], "uud: got data?"); + let _ = uud.recv(chunk); + let got = uud.get(); + assert!( + &data[..512] == &got[..], + "UUD1: DIFF: {:?} {:?}", + &data[..512].len(), + &got[..].len() + ); + let chunk = Chunk { + id: stream::ID(42), + flag_start: false, + flag_end: true, + sequence: start + 512, + data: &data[512..], + }; + let _ = uud.recv(chunk); + let got = uud.get(); + assert!( + &data[512..] == &got[..], + "UUD2: DIFF: {:?} {:?}", + &data[512..].len(), + &got[..].len() + ); +} + +#[::tracing_test::traced_test] +#[test] +fn test_stream_uud_retransmit() { + let rand = Random::new(); + let max_window: usize = 100; + let mut uud = + UnreliableUnorderedDatagram::with_window_size(&rand, max_window as u32); + + let mut data = Vec::with_capacity(120); + data.resize(data.capacity(), 0); + for i in 0..data.len() { + data[i] = i as u8; + } + + let start = uud.window_start.0; + + let chunk = Chunk { + id: stream::ID(42), + flag_start: true, + flag_end: false, + sequence: start, + data: &data[..40], + }; + let _ = uud.recv(chunk); + let chunk = Chunk { + id: stream::ID(42), + flag_start: false, + flag_end: false, + sequence: start + 50, + data: &data[50..60], + }; + let _ = uud.recv(chunk); + let chunk = Chunk { + id: stream::ID(42), + flag_start: false, + flag_end: false, + sequence: start + 40, + data: &data[40..60], + }; + let _ = uud.recv(chunk); + let chunk = Chunk { + id: stream::ID(42), + flag_start: false, + flag_end: false, + sequence: start + 80, + data: &data[80..], + }; + let _ = uud.recv(chunk); + let chunk = Chunk { + id: stream::ID(42), + flag_start: false, + flag_end: false, + sequence: start + 50, + data: &data[50..90], + }; + let _ = uud.recv(chunk); + let chunk = Chunk { + id: stream::ID(42), + flag_start: false, + flag_end: false, + sequence: start + (max_window as u32), + data: &data[max_window..], + }; + let _ = uud.recv(chunk); + let chunk = Chunk { + id: stream::ID(42), + flag_start: false, + flag_end: true, + sequence: start + 90, + data: &data[90..max_window], + }; + let _ = uud.recv(chunk); + let got = uud.get(); + assert!( + &data[..max_window] == &got[..], + "DIFF:\n {:?}\n {:?}", + &data[..max_window], + &got[..], + ); +} +#[::tracing_test::traced_test] +#[test] +fn test_stream_uud_rolling() { + let rand = Random::new(); + let max_window: usize = 100; + let mut uud = + UnreliableUnorderedDatagram::with_window_size(&rand, max_window as u32); + + let mut data = Vec::with_capacity(120); + data.resize(data.capacity(), 0); + for i in 0..data.len() { + data[i] = i as u8; + } + + let start = uud.window_start.0; + + let chunk = Chunk { + id: stream::ID(42), + flag_start: true, + flag_end: false, + sequence: start, + data: &data[..40], + }; + let _ = uud.recv(chunk); + let chunk = Chunk { + id: stream::ID(42), + flag_start: true, + flag_end: false, + sequence: start + 50, + data: &data[50..100], + }; + let _ = uud.recv(chunk); + let got = uud.get(); + assert!( + &data[..40] == &got[..], + "DIFF:\n {:?}\n {:?}", + &data[..40], + &got[..], + ); + let chunk = Chunk { + id: stream::ID(42), + flag_start: true, + flag_end: false, + sequence: start + 40, + data: &data[40..], + }; + let _ = uud.recv(chunk); + let got = uud.get(); + assert!( + &data[40..] == &got[..], + "DIFF:\n {:?}\n {:?}", + &data[40..], + &got[..], + ); +} +#[::tracing_test::traced_test] +#[test] +fn test_stream_uud_rolling_second_case() { + let rand = Random::new(); + let max_window: usize = 100; + let mut uud = + UnreliableUnorderedDatagram::with_window_size(&rand, max_window as u32); + + let mut data = Vec::with_capacity(120); + data.resize(data.capacity(), 0); + for i in 0..data.len() { + data[i] = i as u8; + } + + let start = uud.window_start.0; + + let chunk = Chunk { + id: stream::ID(42), + flag_start: true, + flag_end: false, + sequence: start, + data: &data[..40], + }; + let _ = uud.recv(chunk); + let chunk = Chunk { + id: stream::ID(42), + flag_start: true, + flag_end: false, + sequence: start + 50, + data: &data[50..100], + }; + let _ = uud.recv(chunk); + let got = uud.get(); + assert!( + &data[..40] == &got[..], + "DIFF:\n {:?}\n {:?}", + &data[..40], + &got[..], + ); + let chunk = Chunk { + id: stream::ID(42), + flag_start: true, + flag_end: false, + sequence: start + 40, + data: &data[40..100], + }; + let _ = uud.recv(chunk); + let chunk = Chunk { + id: stream::ID(42), + flag_start: true, + flag_end: false, + sequence: start + 100, + data: &data[100..], + }; + let _ = uud.recv(chunk); + let got = uud.get(); + assert!( + &data[40..] == &got[..], + "DIFF:\n {:?}\n {:?}", + &data[40..], + &got[..], + ); +} diff --git a/src/connection/stream/uudl/mod.rs b/src/connection/stream/uudl/mod.rs new file mode 100644 index 0000000..aeebd9a --- /dev/null +++ b/src/connection/stream/uudl/mod.rs @@ -0,0 +1,43 @@ +//! Implementation of the Unreliable, Unordered, Datagram Limited +//! transmission model +//! +//! AKA: UDP-like. "Limited" because the data must fit in a single packet +//! + +use crate::connection::stream::{ + Chunk, Error, Sequence, SequenceStart, StreamData, +}; + +use ::std::collections::{BTreeMap, VecDeque}; + +#[cfg(test)] +mod tests; + +/// UnReliable, UnOrdered, Datagram, Limited to the packet size +/// AKA: UDP-like +#[derive(Debug)] +pub(crate) struct UnreliableUnorderedDatagramLimited { + received: VecDeque<(SequenceStart, Vec)>, +} + +impl UnreliableUnorderedDatagramLimited { + pub(crate) fn new() -> Self { + Self { + received: VecDeque::with_capacity(4), + } + } + pub(crate) fn get(&mut self) -> (SequenceStart, Vec) { + match self.received.pop_front() { + Some(data) => data, + None => (SequenceStart(Sequence::min()), Vec::new()), + } + } + pub(crate) fn recv(&mut self, chunk: Chunk) -> Result { + if !chunk.flag_start || !chunk.flag_end { + return Err(Error::WrongFlags); + } + self.received + .push_back((SequenceStart(chunk.sequence), chunk.data.to_vec())); + Ok(StreamData::Ready) + } +} diff --git a/src/connection/stream/uudl/tests.rs b/src/connection/stream/uudl/tests.rs new file mode 100644 index 0000000..7814902 --- /dev/null +++ b/src/connection/stream/uudl/tests.rs @@ -0,0 +1,56 @@ +use crate::{ + connection::stream::{self, uudl::*, Chunk}, + enc::Random, +}; + +#[::tracing_test::traced_test] +#[test] +fn test_stream_uudl_sequential() { + let rand = Random::new(); + let mut uudl = UnreliableUnorderedDatagramLimited::new(); + + let mut data = Vec::with_capacity(1024); + data.resize(data.capacity(), 0); + rand.fill(&mut data[..]); + + //let start = uudl.window_start.0; + let start = Sequence( + ::core::num::Wrapping(0) + ); + + let chunk = Chunk { + id: stream::ID(42), + flag_start: true, + flag_end: true, + sequence: start, + data: &data[..512], + }; + let got = uudl.get().1; + assert!(&got[..] == &[], "uudl: got data?"); + let _ = uudl.recv(chunk); + let got = uudl.get().1; + assert!( + &data[..512] == &got[..], + "UUDL1: DIFF: {:?} {:?}", + &data[..512].len(), + &got[..].len() + ); + let chunk = Chunk { + id: stream::ID(42), + flag_start: true, + flag_end: true, + sequence: start + 512, + data: &data[512..], + }; + let _ = uudl.recv(chunk); + let got = uudl.get().1; + assert!( + &data[512..] == &got[..], + "UUDL2: DIFF: {:?} {:?}", + &data[512..].len(), + &got[..].len() + ); + let got = uudl.get().1; + assert!(&got[..] == &[], "uudl: got data?"); +} + diff --git a/src/dnssec/mod.rs b/src/dnssec/mod.rs index d1128c1..9013ea6 100644 --- a/src/dnssec/mod.rs +++ b/src/dnssec/mod.rs @@ -87,14 +87,11 @@ impl Dnssec { )); } - let resolver = match TokioAsyncResolver::tokio(config, opts) { - Ok(resolver) => resolver, - Err(e) => return Err(Error::Setup(e.to_string())), - }; + let resolver = TokioAsyncResolver::tokio(config, opts); Ok(Self { resolver }) } - const TXT_RECORD_START: &str = "v=Fenrir1 "; + const TXT_RECORD_START: &'static str = "v=Fenrir1 "; /// Get the fenrir data for a domain pub async fn resolv(&self, domain: &Domain) -> ::std::io::Result { use ::trust_dns_client::rr::Name; diff --git a/src/enc/asym.rs b/src/enc/asym.rs index 4958a38..240f889 100644 --- a/src/enc/asym.rs +++ b/src/enc/asym.rs @@ -162,7 +162,8 @@ impl KeyExchangeKind { ) -> Result<(ExchangePrivKey, ExchangePubKey), Error> { match self { KeyExchangeKind::X25519DiffieHellman => { - let raw_priv = ::x25519_dalek::StaticSecret::new(rnd); + let raw_priv = + ::x25519_dalek::StaticSecret::random_from_rng(rnd); let pub_key = ExchangePubKey::X25519( ::x25519_dalek::PublicKey::from(&raw_priv), ); diff --git a/src/enc/hkdf.rs b/src/enc/hkdf.rs index 7ba885b..8d47ff3 100644 --- a/src/enc/hkdf.rs +++ b/src/enc/hkdf.rs @@ -71,7 +71,7 @@ impl Hkdf { // Hack & tricks: // HKDF are pretty important, but this lib don't zero out the data. // we can't use #[derive(Zeroing)] either. -// So we craete a union with a Zeroing object, and drop both manually. +// So we create a union with a Zeroing object, and drop the zeroable buffer. // TODO: move this to Hkdf instead of Sha3 @@ -88,8 +88,7 @@ impl Drop for HkdfInner { fn drop(&mut self) { #[allow(unsafe_code)] unsafe { - drop(&mut self.hkdf); - drop(&mut self.zeroable); + ::core::mem::ManuallyDrop::drop(&mut self.zeroable); } } } diff --git a/src/enc/mod.rs b/src/enc/mod.rs index 663c72d..6f919db 100644 --- a/src/enc/mod.rs +++ b/src/enc/mod.rs @@ -74,7 +74,7 @@ impl ::rand_core::RngCore for &Random { ) -> Result<(), ::rand_core::Error> { match self.rnd.fill(dest) { Ok(()) => Ok(()), - Err(e) => Err(::rand_core::Error::new(e)), + Err(e) => Err(::rand_core::Error::new(e.to_string())), } } } diff --git a/src/enc/tests.rs b/src/enc/tests.rs index ddd4125..b3fd73f 100644 --- a/src/enc/tests.rs +++ b/src/enc/tests.rs @@ -70,7 +70,7 @@ fn test_encrypt_decrypt() { let encrypt_to = encrypt_from + resp.encrypted_length(nonce_len, tag_len); let h_resp = - Handshake::new(handshake::Data::DirSync(dirsync::DirSync::Resp(resp))); + Handshake::new(Data::DirSync(dirsync::DirSync::Resp(resp))); let mut bytes = Vec::::with_capacity( h_resp.len(cipher.nonce_len(), cipher.tag_len()), @@ -119,7 +119,7 @@ fn test_encrypt_decrypt() { } }; // reparse - if let handshake::Data::DirSync(dirsync::DirSync::Resp(r_a)) = + if let Data::DirSync(dirsync::DirSync::Resp(r_a)) = &mut deserialized.data { let enc_start = r_a.encrypted_offset() + cipher.nonce_len().0; diff --git a/src/inner/mod.rs b/src/inner/mod.rs index 6102fde..dafcb5b 100644 --- a/src/inner/mod.rs +++ b/src/inner/mod.rs @@ -18,7 +18,7 @@ pub(crate) struct ThreadTracker { } pub(crate) static mut SLEEP_RESOLUTION: ::std::time::Duration = - if cfg!(linux) || cfg!(macos) { + if cfg!(target_os = "linux") || cfg!(target_os = "macos") { ::std::time::Duration::from_millis(1) } else { // windows diff --git a/src/inner/worker.rs b/src/inner/worker.rs index f344bb6..f4101b8 100644 --- a/src/inner/worker.rs +++ b/src/inner/worker.rs @@ -52,7 +52,7 @@ pub struct ConnData { /// Connection tracking information pub conn: ConnTracker, /// received data, for each stream - pub data: Vec<(stream::ID, Vec)>, + pub data: Vec<(stream::ID, Vec)>, //FIXME: ChunkOwned } /// Connection event. Mostly used to give the data to the user diff --git a/src/tests.rs b/src/tests.rs index ff28ae6..fe7eea2 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -17,7 +17,7 @@ async fn test_connection_dirsync() { } }; let cfg_client = { - let mut cfg = config::Config::default(); + let mut cfg = Config::default(); cfg.threads = Some(::core::num::NonZeroUsize::new(1).unwrap()); cfg };