Review conn tracking, data reporting
Signed-off-by: Luca Fulchir <luca.fulchir@runesauth.com>
This commit is contained in:
parent
d8a27bf969
commit
c8ee2995d9
3
TODO
3
TODO
|
@ -1 +1,4 @@
|
||||||
* Wrapping for everything that wraps (sigh)
|
* Wrapping for everything that wraps (sigh)
|
||||||
|
* track user connection (add u64 from user)
|
||||||
|
* split API in LocalThread and ThreadSafe
|
||||||
|
* split send/recv API in Centralized, Decentralized
|
||||||
|
|
|
@ -14,7 +14,7 @@ use ::std::{
|
||||||
pub use crate::connection::{handshake::Handshake, packet::Packet};
|
pub use crate::connection::{handshake::Handshake, packet::Packet};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
connection::socket::UdpClient,
|
connection::{socket::UdpClient, stream::StreamData},
|
||||||
dnssec,
|
dnssec,
|
||||||
enc::{
|
enc::{
|
||||||
self,
|
self,
|
||||||
|
@ -141,28 +141,32 @@ impl ProtocolVersion {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Unique tracker of connections
|
||||||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
|
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
|
||||||
pub(crate) struct UserConnTracker(Wrapping<usize>);
|
pub struct ConnTracker(Wrapping<u64>);
|
||||||
impl UserConnTracker {
|
impl ConnTracker {
|
||||||
fn advance(&mut self) -> Self {
|
pub(crate) fn new(start: u16) -> Self {
|
||||||
|
Self(Wrapping(start as u64))
|
||||||
|
}
|
||||||
|
pub(crate) fn advance(&mut self, amount: u16) -> Self {
|
||||||
let old = self.0;
|
let old = self.0;
|
||||||
self.0 = self.0 + Wrapping(1);
|
self.0 = self.0 + Wrapping(amount as u64);
|
||||||
UserConnTracker(old)
|
ConnTracker(old)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Connection to an Authentication Server
|
/// Connection to an Authentication Server
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct AuthSrvConn(pub(crate) Conn);
|
pub struct AuthSrvConn(pub Conn);
|
||||||
/// Connection to a service
|
/// Connection to a service
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ServiceConn(pub(crate) Conn);
|
pub struct ServiceConn(pub Conn);
|
||||||
|
|
||||||
/// The connection, as seen from a user of libFenrir
|
/// The connection, as seen from a user of libFenrir
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Conn {
|
pub struct Conn {
|
||||||
pub(crate) queue: ::async_channel::Sender<worker::Work>,
|
pub(crate) queue: ::async_channel::Sender<worker::Work>,
|
||||||
pub(crate) conn: UserConnTracker,
|
pub(crate) fast: ConnTracker,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Conn {
|
impl Conn {
|
||||||
|
@ -172,9 +176,13 @@ impl Conn {
|
||||||
use crate::inner::worker::Work;
|
use crate::inner::worker::Work;
|
||||||
let _ = self
|
let _ = self
|
||||||
.queue
|
.queue
|
||||||
.send(Work::UserSend((self.conn, stream, data)))
|
.send(Work::UserSend((self.tracker(), stream, data)))
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
/// Get the library tracking id
|
||||||
|
pub fn tracker(&self) -> ConnTracker {
|
||||||
|
self.fast
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Role: track the connection direction
|
/// Role: track the connection direction
|
||||||
|
@ -205,6 +213,10 @@ pub(crate) enum Enqueue {
|
||||||
Immediate(::tokio::time::Instant),
|
Immediate(::tokio::time::Instant),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Connection tracking id. Set by the user
|
||||||
|
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Copy, Clone)]
|
||||||
|
pub struct UserTracker(pub ::core::num::NonZeroU64);
|
||||||
|
|
||||||
/// A single connection and its data
|
/// A single connection and its data
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) struct Connection {
|
pub(crate) struct Connection {
|
||||||
|
@ -212,6 +224,9 @@ pub(crate) struct Connection {
|
||||||
pub(crate) id_recv: IDRecv,
|
pub(crate) id_recv: IDRecv,
|
||||||
/// Sending Conn ID
|
/// Sending Conn ID
|
||||||
pub(crate) id_send: IDSend,
|
pub(crate) id_send: IDSend,
|
||||||
|
/// User-managed id to track this connection
|
||||||
|
/// the user can set this to better track this connection
|
||||||
|
pub(crate) user_tracker: Option<UserTracker>,
|
||||||
/// Sending address
|
/// Sending address
|
||||||
pub(crate) send_addr: UdpClient,
|
pub(crate) send_addr: UdpClient,
|
||||||
/// The main hkdf used for all secrets in this connection
|
/// The main hkdf used for all secrets in this connection
|
||||||
|
@ -251,6 +266,8 @@ impl Connection {
|
||||||
Self {
|
Self {
|
||||||
id_recv: IDRecv(ID::Handshake),
|
id_recv: IDRecv(ID::Handshake),
|
||||||
id_send: IDSend(ID::Handshake),
|
id_send: IDSend(ID::Handshake),
|
||||||
|
user_tracker: None,
|
||||||
|
// will be overwritten
|
||||||
send_addr: UdpClient(SocketAddr::new(
|
send_addr: UdpClient(SocketAddr::new(
|
||||||
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
||||||
31337,
|
31337,
|
||||||
|
@ -265,7 +282,10 @@ impl Connection {
|
||||||
recv_queue: BTreeMap::new(),
|
recv_queue: BTreeMap::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub(crate) fn recv(&mut self, mut udp: crate::RawUdp) -> Result<(), Error> {
|
pub(crate) fn recv(
|
||||||
|
&mut self,
|
||||||
|
mut udp: crate::RawUdp,
|
||||||
|
) -> Result<StreamData, Error> {
|
||||||
let mut data = &mut udp.data[ID::len()..];
|
let mut data = &mut udp.data[ID::len()..];
|
||||||
let aad = enc::sym::AAD(&[]);
|
let aad = enc::sym::AAD(&[]);
|
||||||
self.cipher_recv.decrypt(aad, &mut data)?;
|
self.cipher_recv.decrypt(aad, &mut data)?;
|
||||||
|
@ -285,18 +305,22 @@ impl Connection {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let mut data_ready = StreamData::NotReady;
|
||||||
for chunk in chunks.into_iter() {
|
for chunk in chunks.into_iter() {
|
||||||
let stream = match self.recv_queue.get_mut(&chunk.id) {
|
let stream_id = chunk.id;
|
||||||
|
let stream = match self.recv_queue.get_mut(&stream_id) {
|
||||||
Some(stream) => stream,
|
Some(stream) => stream,
|
||||||
None => {
|
None => {
|
||||||
::tracing::debug!("Ignoring chunk for unknown stream::ID");
|
::tracing::debug!("Ignoring chunk for unknown stream::ID");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
stream.recv(chunk);
|
match stream.recv(chunk) {
|
||||||
|
Ok(status) => data_ready = data_ready | status,
|
||||||
|
Err(e) => ::tracing::debug!("stream: {:?}: {:?}", stream_id, e),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// FIXME: report if we need to return data to the user
|
Ok(data_ready)
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
pub(crate) fn enqueue(
|
pub(crate) fn enqueue(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
@ -402,8 +426,8 @@ impl Connection {
|
||||||
pub(crate) struct ConnList {
|
pub(crate) struct ConnList {
|
||||||
thread_id: ThreadTracker,
|
thread_id: ThreadTracker,
|
||||||
connections: Vec<Option<Connection>>,
|
connections: Vec<Option<Connection>>,
|
||||||
user_tracker: BTreeMap<UserConnTracker, usize>,
|
user_tracker: BTreeMap<ConnTracker, usize>,
|
||||||
last_tracked: UserConnTracker,
|
last_tracked: ConnTracker,
|
||||||
/// Bitmap to track which connection ids are used or free
|
/// Bitmap to track which connection ids are used or free
|
||||||
ids_used: Vec<::bitmaps::Bitmap<1024>>,
|
ids_used: Vec<::bitmaps::Bitmap<1024>>,
|
||||||
}
|
}
|
||||||
|
@ -420,7 +444,7 @@ impl ConnList {
|
||||||
thread_id,
|
thread_id,
|
||||||
connections: Vec::with_capacity(INITIAL_CAP),
|
connections: Vec::with_capacity(INITIAL_CAP),
|
||||||
user_tracker: BTreeMap::new(),
|
user_tracker: BTreeMap::new(),
|
||||||
last_tracked: UserConnTracker(Wrapping(0)),
|
last_tracked: ConnTracker(Wrapping(0)),
|
||||||
ids_used: vec![bitmap_id],
|
ids_used: vec![bitmap_id],
|
||||||
};
|
};
|
||||||
ret.connections.resize_with(INITIAL_CAP, || None);
|
ret.connections.resize_with(INITIAL_CAP, || None);
|
||||||
|
@ -437,10 +461,7 @@ impl ConnList {
|
||||||
(conn_id.get() / (self.thread_id.total as u64)) as usize;
|
(conn_id.get() / (self.thread_id.total as u64)) as usize;
|
||||||
(&mut self.connections[id_in_thread]).into()
|
(&mut self.connections[id_in_thread]).into()
|
||||||
}
|
}
|
||||||
pub fn get_mut(
|
pub fn get_mut(&mut self, tracker: ConnTracker) -> Option<&mut Connection> {
|
||||||
&mut self,
|
|
||||||
tracker: UserConnTracker,
|
|
||||||
) -> Option<&mut Connection> {
|
|
||||||
let idx = if let Some(idx) = self.user_tracker.get(&tracker) {
|
let idx = if let Some(idx) = self.user_tracker.get(&tracker) {
|
||||||
*idx
|
*idx
|
||||||
} else {
|
} else {
|
||||||
|
@ -504,7 +525,7 @@ impl ConnList {
|
||||||
pub(crate) fn track(
|
pub(crate) fn track(
|
||||||
&mut self,
|
&mut self,
|
||||||
conn: Connection,
|
conn: Connection,
|
||||||
) -> Result<UserConnTracker, ()> {
|
) -> Result<ConnTracker, ()> {
|
||||||
let conn_id = match conn.id_recv {
|
let conn_id = match conn.id_recv {
|
||||||
IDRecv(ID::Handshake) => {
|
IDRecv(ID::Handshake) => {
|
||||||
return Err(());
|
return Err(());
|
||||||
|
@ -516,8 +537,9 @@ impl ConnList {
|
||||||
self.connections[id_in_thread] = Some(conn);
|
self.connections[id_in_thread] = Some(conn);
|
||||||
let mut tracked;
|
let mut tracked;
|
||||||
loop {
|
loop {
|
||||||
tracked = self.last_tracked.advance();
|
tracked = self.last_tracked.advance(self.thread_id.total);
|
||||||
if self.user_tracker.get(&tracked).is_none() {
|
if self.user_tracker.get(&tracked).is_none() {
|
||||||
|
// like, never gonna happen, it's 64 bit
|
||||||
let _ = self.user_tracker.insert(tracked, id_in_thread);
|
let _ = self.user_tracker.insert(tracked, id_in_thread);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -238,6 +238,26 @@ impl Tracker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Eq, PartialEq)]
|
||||||
|
pub(crate) enum StreamData {
|
||||||
|
/// not enough data to return somthing to the user
|
||||||
|
NotReady = 0,
|
||||||
|
/// we can return something to the user
|
||||||
|
Ready,
|
||||||
|
}
|
||||||
|
impl ::core::ops::BitOr for StreamData {
|
||||||
|
type Output = Self;
|
||||||
|
|
||||||
|
// Required method
|
||||||
|
fn bitor(self, other: Self) -> Self::Output {
|
||||||
|
if self == StreamData::Ready || other == StreamData::Ready {
|
||||||
|
StreamData::Ready
|
||||||
|
} else {
|
||||||
|
StreamData::NotReady
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Actual stream-tracking structure
|
/// Actual stream-tracking structure
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub(crate) struct Stream {
|
pub(crate) struct Stream {
|
||||||
|
@ -254,7 +274,7 @@ impl Stream {
|
||||||
data: Tracker::new(kind, rand),
|
data: Tracker::new(kind, rand),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub(crate) fn recv(&mut self, chunk: Chunk) -> Result<(), Error> {
|
pub(crate) fn recv(&mut self, chunk: Chunk) -> Result<StreamData, Error> {
|
||||||
match &mut self.data {
|
match &mut self.data {
|
||||||
Tracker::ROB(tracker) => tracker.recv(chunk),
|
Tracker::ROB(tracker) => tracker.recv(chunk),
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,9 @@
|
||||||
//! AKA: TCP-like
|
//! AKA: TCP-like
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
connection::stream::{Chunk, Error, Sequence, SequenceEnd, SequenceStart},
|
connection::stream::{
|
||||||
|
Chunk, Error, Sequence, SequenceEnd, SequenceStart, StreamData,
|
||||||
|
},
|
||||||
enc::Random,
|
enc::Random,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -93,7 +95,7 @@ impl ReliableOrderedBytestream {
|
||||||
((self.pivot as usize + data_len) % self.data.len()) as u32;
|
((self.pivot as usize + data_len) % self.data.len()) as u32;
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
pub(crate) fn recv(&mut self, chunk: Chunk) -> Result<(), Error> {
|
pub(crate) fn recv(&mut self, chunk: Chunk) -> Result<StreamData, Error> {
|
||||||
if !chunk
|
if !chunk
|
||||||
.sequence
|
.sequence
|
||||||
.is_between(self.window_start, self.window_end)
|
.is_between(self.window_start, self.window_end)
|
||||||
|
@ -106,7 +108,7 @@ impl ReliableOrderedBytestream {
|
||||||
chunk.data.len(),
|
chunk.data.len(),
|
||||||
);
|
);
|
||||||
if maxlen == 0 {
|
if maxlen == 0 {
|
||||||
// or empty chunk, but we don't care
|
// empty window or empty chunk, but we don't care
|
||||||
return Err(Error::OutOfWindow);
|
return Err(Error::OutOfWindow);
|
||||||
}
|
}
|
||||||
// translate Sequences to offsets in self.data
|
// translate Sequences to offsets in self.data
|
||||||
|
@ -119,7 +121,7 @@ impl ReliableOrderedBytestream {
|
||||||
let mut copy_ranges = Vec::new();
|
let mut copy_ranges = Vec::new();
|
||||||
let mut to_delete = Vec::new();
|
let mut to_delete = Vec::new();
|
||||||
let mut to_add = Vec::new();
|
let mut to_add = Vec::new();
|
||||||
// note: te included ranges are (INCLUSIVE, INCLUSIVE)
|
// note: the ranges are (INCLUSIVE, INCLUSIVE)
|
||||||
for (idx, el) in self.missing.iter_mut().enumerate() {
|
for (idx, el) in self.missing.iter_mut().enumerate() {
|
||||||
let missing_from = self.window_start.offset(el.0);
|
let missing_from = self.window_start.offset(el.0);
|
||||||
if missing_from > offset_end {
|
if missing_from > offset_end {
|
||||||
|
@ -146,7 +148,6 @@ impl ReliableOrderedBytestream {
|
||||||
if missing_to > offset_end {
|
if missing_to > offset_end {
|
||||||
// [..chunk..]
|
// [..chunk..]
|
||||||
// [....missing....]
|
// [....missing....]
|
||||||
// chunk is in the middle of a missing fragment
|
|
||||||
to_add.push((
|
to_add.push((
|
||||||
el.0.plus_u32(((offset_end - missing_from) + 1) as u32),
|
el.0.plus_u32(((offset_end - missing_from) + 1) as u32),
|
||||||
el.1,
|
el.1,
|
||||||
|
@ -156,16 +157,12 @@ impl ReliableOrderedBytestream {
|
||||||
} else if offset <= missing_to {
|
} else if offset <= missing_to {
|
||||||
// [....chunk....]
|
// [....chunk....]
|
||||||
// [...missing...]
|
// [...missing...]
|
||||||
// chunk
|
|
||||||
copy_ranges.push((offset, (missing_to - 0)));
|
copy_ranges.push((offset, (missing_to - 0)));
|
||||||
el.1 =
|
el.1 =
|
||||||
el.0.plus_u32(((offset_end - missing_from) - 1) as u32);
|
el.0.plus_u32(((offset_end - missing_from) - 1) as u32);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.missing.append(&mut to_add);
|
|
||||||
self.missing
|
|
||||||
.sort_by(|(from_a, _), (from_b, _)| from_a.0 .0.cmp(&from_b.0 .0));
|
|
||||||
{
|
{
|
||||||
let mut deleted = 0;
|
let mut deleted = 0;
|
||||||
for idx in to_delete.into_iter() {
|
for idx in to_delete.into_iter() {
|
||||||
|
@ -173,6 +170,10 @@ impl ReliableOrderedBytestream {
|
||||||
deleted = deleted + 1;
|
deleted = deleted + 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.missing.append(&mut to_add);
|
||||||
|
self.missing
|
||||||
|
.sort_by(|(from_a, _), (from_b, _)| from_a.0 .0.cmp(&from_b.0 .0));
|
||||||
|
|
||||||
// copy only the missing data
|
// copy only the missing data
|
||||||
let (first, second) = self.data[..].split_at_mut(self.pivot as usize);
|
let (first, second) = self.data[..].split_at_mut(self.pivot as usize);
|
||||||
for (from, to) in copy_ranges.into_iter() {
|
for (from, to) in copy_ranges.into_iter() {
|
||||||
|
@ -198,7 +199,12 @@ impl ReliableOrderedBytestream {
|
||||||
.copy_from_slice(&data[data_from..data_to]);
|
.copy_from_slice(&data[data_from..data_to]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if self.missing.len() == 0
|
||||||
Ok(())
|
|| self.window_start.offset(self.missing[0].0) == 0
|
||||||
|
{
|
||||||
|
Ok(StreamData::Ready)
|
||||||
|
} else {
|
||||||
|
Ok(StreamData::NotReady)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,8 +11,8 @@ use crate::{
|
||||||
},
|
},
|
||||||
packet::{self, Packet},
|
packet::{self, Packet},
|
||||||
socket::{UdpClient, UdpServer},
|
socket::{UdpClient, UdpServer},
|
||||||
stream, AuthSrvConn, ConnList, Connection, IDSend, ServiceConn,
|
stream, AuthSrvConn, ConnList, ConnTracker, Connection, IDSend,
|
||||||
UserConnTracker,
|
ServiceConn,
|
||||||
},
|
},
|
||||||
dnssec,
|
dnssec,
|
||||||
enc::{
|
enc::{
|
||||||
|
@ -46,6 +46,16 @@ pub(crate) struct ConnectInfo {
|
||||||
// TODO: UserID, Token information
|
// TODO: UserID, Token information
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Connection event. Mostly used to give the data to the user
|
||||||
|
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||||
|
#[non_exhaustive]
|
||||||
|
pub enum Event {
|
||||||
|
/// Work loop has exited. nothing more to do
|
||||||
|
End,
|
||||||
|
/// Data from a connection
|
||||||
|
Data(Vec<u8>),
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) enum Work {
|
pub(crate) enum Work {
|
||||||
/// ask the thread to report to the main thread the total number of
|
/// ask the thread to report to the main thread the total number of
|
||||||
/// connections present
|
/// connections present
|
||||||
|
@ -53,8 +63,8 @@ pub(crate) enum Work {
|
||||||
Connect(ConnectInfo),
|
Connect(ConnectInfo),
|
||||||
DropHandshake(KeyID),
|
DropHandshake(KeyID),
|
||||||
Recv(RawUdp),
|
Recv(RawUdp),
|
||||||
UserSend((UserConnTracker, stream::ID, Vec<u8>)),
|
UserSend((ConnTracker, stream::ID, Vec<u8>)),
|
||||||
SendData((UserConnTracker, ::tokio::time::Instant)),
|
SendData((ConnTracker, ::tokio::time::Instant)),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Actual worker implementation.
|
/// Actual worker implementation.
|
||||||
|
@ -136,7 +146,7 @@ impl Worker {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Continuously loop and process work as needed
|
/// Continuously loop and process work as needed
|
||||||
pub async fn work_loop(&mut self) {
|
pub async fn work_loop(&mut self) -> Result<Event, crate::Error> {
|
||||||
'mainloop: loop {
|
'mainloop: loop {
|
||||||
let next_timer = self.work_timers.get_next();
|
let next_timer = self.work_timers.get_next();
|
||||||
::tokio::pin!(next_timer);
|
::tokio::pin!(next_timer);
|
||||||
|
@ -436,7 +446,7 @@ impl Worker {
|
||||||
}
|
}
|
||||||
Work::UserSend((tracker, stream, data)) => {
|
Work::UserSend((tracker, stream, data)) => {
|
||||||
let conn = match self.connections.get_mut(tracker) {
|
let conn = match self.connections.get_mut(tracker) {
|
||||||
None => return,
|
None => continue,
|
||||||
Some(conn) => conn,
|
Some(conn) => conn,
|
||||||
};
|
};
|
||||||
use connection::Enqueue;
|
use connection::Enqueue;
|
||||||
|
@ -468,21 +478,21 @@ impl Worker {
|
||||||
.queue_sender
|
.queue_sender
|
||||||
.send(Work::SendData((tracker, instant)))
|
.send(Work::SendData((tracker, instant)))
|
||||||
.await;
|
.await;
|
||||||
return;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut raw: Vec<u8> = Vec::with_capacity(1200);
|
let mut raw: Vec<u8> = Vec::with_capacity(1200);
|
||||||
raw.resize(raw.capacity(), 0);
|
raw.resize(raw.capacity(), 0);
|
||||||
let conn = match self.connections.get_mut(tracker) {
|
let conn = match self.connections.get_mut(tracker) {
|
||||||
None => return,
|
None => continue,
|
||||||
Some(conn) => conn,
|
Some(conn) => conn,
|
||||||
};
|
};
|
||||||
let pkt = match conn.write_pkt(&mut raw) {
|
let pkt = match conn.write_pkt(&mut raw) {
|
||||||
Ok(pkt) => pkt,
|
Ok(pkt) => pkt,
|
||||||
Err(enc::Error::NotEnoughData(0)) => return,
|
Err(enc::Error::NotEnoughData(0)) => continue,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
::tracing::error!("Packet generation: {:?}", e);
|
::tracing::error!("Packet generation: {:?}", e);
|
||||||
return;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let dest = conn.send_addr;
|
let dest = conn.send_addr;
|
||||||
|
@ -493,6 +503,7 @@ impl Worker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(Event::End)
|
||||||
}
|
}
|
||||||
/// Read and do stuff with the raw udp packet
|
/// Read and do stuff with the raw udp packet
|
||||||
async fn recv(&mut self, mut udp: RawUdp) {
|
async fn recv(&mut self, mut udp: RawUdp) {
|
||||||
|
@ -527,8 +538,12 @@ impl Worker {
|
||||||
None => return,
|
None => return,
|
||||||
Some(conn) => conn,
|
Some(conn) => conn,
|
||||||
};
|
};
|
||||||
if let Err(e) = conn.recv(udp) {
|
match conn.recv(udp) {
|
||||||
::tracing::trace!("Conn Recv: {:?}", e.to_string());
|
Ok(stream::StreamData::NotReady) => {}
|
||||||
|
Ok(stream::StreamData::Ready) => {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
Err(e) => ::tracing::trace!("Conn Recv: {:?}", e.to_string()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/// Receive an handshake packet
|
/// Receive an handshake packet
|
||||||
|
@ -693,7 +708,7 @@ impl Worker {
|
||||||
};
|
};
|
||||||
let authsrv_conn = AuthSrvConn(connection::Conn {
|
let authsrv_conn = AuthSrvConn(connection::Conn {
|
||||||
queue: self.queue_sender.clone(),
|
queue: self.queue_sender.clone(),
|
||||||
conn: track_auth_conn,
|
fast: track_auth_conn,
|
||||||
});
|
});
|
||||||
let mut service_conn = None;
|
let mut service_conn = None;
|
||||||
if cci.service_id != auth::SERVICEID_AUTH {
|
if cci.service_id != auth::SERVICEID_AUTH {
|
||||||
|
@ -735,7 +750,7 @@ impl Worker {
|
||||||
};
|
};
|
||||||
service_conn = Some(ServiceConn(connection::Conn {
|
service_conn = Some(ServiceConn(connection::Conn {
|
||||||
queue: self.queue_sender.clone(),
|
queue: self.queue_sender.clone(),
|
||||||
conn: track_serv_conn,
|
fast: track_serv_conn,
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
let _ = cci.answer.send(Ok(handshake::tracker::ConnectOk {
|
let _ = cci.answer.send(Ok(handshake::tracker::ConnectOk {
|
||||||
|
|
13
src/lib.rs
13
src/lib.rs
|
@ -34,7 +34,7 @@ use crate::{
|
||||||
AuthServerConnections, Packet,
|
AuthServerConnections, Packet,
|
||||||
},
|
},
|
||||||
inner::{
|
inner::{
|
||||||
worker::{ConnectInfo, RawUdp, Work, Worker},
|
worker::{ConnectInfo, Event, RawUdp, Work, Worker},
|
||||||
ThreadTracker,
|
ThreadTracker,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
@ -638,7 +638,16 @@ impl Fenrir {
|
||||||
Ok(worker) => worker,
|
Ok(worker) => worker,
|
||||||
Err(_) => return,
|
Err(_) => return,
|
||||||
};
|
};
|
||||||
worker.work_loop().await
|
loop {
|
||||||
|
match worker.work_loop().await {
|
||||||
|
Ok(_) => continue,
|
||||||
|
Ok(Event::End) => break,
|
||||||
|
Err(e) => {
|
||||||
|
::tracing::error!("Worker: {:?}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
loop {
|
loop {
|
||||||
|
|
Loading…
Reference in New Issue