Strong types for server/receiver and send packet

Signed-off-by: Luca Fulchir <luca.fulchir@runesauth.com>
This commit is contained in:
Luca Fulchir 2023-02-25 16:33:48 +01:00
parent 4157c207a3
commit bfe99b4c9c
Signed by: luca.fulchir
GPG Key ID: 8F6440603D13A78E
1 changed files with 59 additions and 21 deletions

View File

@ -192,22 +192,22 @@ type TokenChecker =
// so we have to handle `drop()` manually, and garbage-collect the ones we // so we have to handle `drop()` manually, and garbage-collect the ones we
// are no longer using in the background. sigh. // are no longer using in the background. sigh.
// Just go with a ArcSwapAny<Arc<Vec<Arc< ... // Just go with a ArcSwapAny<Arc<Vec<Arc< ...
type SocketTracker = (Arc<UdpSocket>, Arc<JoinHandle<::std::io::Result<()>>>);
struct SocketList { struct SocketList {
sockets: list: ArcSwap<Vec<SocketTracker>>,
ArcSwap<Vec<(Arc<UdpSocket>, Arc<JoinHandle<::std::io::Result<()>>>)>>,
} }
impl SocketList { impl SocketList {
fn new() -> Self { fn new() -> Self {
Self { Self {
sockets: ArcSwap::new(Arc::new(Vec::new())), list: ArcSwap::new(Arc::new(Vec::new())),
} }
} }
// TODO: fn rm_socket() // TODO: fn rm_socket()
fn rm_all(&self) -> Self { fn rm_all(&self) -> Self {
let new_list = Arc::new(Vec::new()); let new_list = Arc::new(Vec::new());
let old_list = self.sockets.swap(new_list); let old_list = self.list.swap(new_list);
Self { Self {
sockets: old_list.into(), list: old_list.into(),
} }
} }
async fn add_socket( async fn add_socket(
@ -217,24 +217,17 @@ impl SocketList {
) { ) {
let mut new_list; let mut new_list;
{ {
let old_list = self.sockets.load(); let old_list = self.list.load();
new_list = Arc::new(Vec::with_capacity(old_list.len() + 1)); new_list = Arc::new(Vec::with_capacity(old_list.len() + 1));
new_list = old_list.to_vec().into(); new_list = old_list.to_vec().into();
} }
Arc::get_mut(&mut new_list) Arc::get_mut(&mut new_list)
.unwrap() .unwrap()
.push((socket, Arc::new(handle))); .push((socket, Arc::new(handle)));
self.sockets.swap(new_list); self.list.swap(new_list);
}
async fn find(&self, sock: SocketAddr) -> Option<Arc<UdpSocket>> {
let list = self.sockets.load();
match list.iter().find(|&(s, _)| s.local_addr().unwrap() == sock) {
Some((sock, _)) => Some(sock.clone()),
None => None,
}
} }
async fn stop_all(mut self) { async fn stop_all(mut self) {
let mut arc_list = self.sockets.into_inner(); let mut arc_list = self.list.into_inner();
let list = loop { let list = loop {
match Arc::try_unwrap(arc_list) { match Arc::try_unwrap(arc_list) {
Ok(list) => break list, Ok(list) => break list,
@ -251,12 +244,38 @@ impl SocketList {
Arc::get_mut(&mut handle).unwrap().await; Arc::get_mut(&mut handle).unwrap().await;
} }
} }
fn lock(&self) -> SocketListRef {
SocketListRef {
list: self.list.load_full(),
} }
}
}
// TODO: impl Drop for SocketList
struct SocketListRef {
list: Arc<Vec<SocketTracker>>,
}
impl SocketListRef {
fn find(&self, sock: UdpServer) -> Option<Arc<UdpSocket>> {
match self
.list
.iter()
.find(|&(s, _)| s.local_addr().unwrap() == sock.0)
{
Some((sock_srv, _)) => Some(sock_srv.clone()),
None => None,
}
}
}
#[derive(Debug, Copy, Clone)]
struct UdpClient(SocketAddr);
#[derive(Debug, Copy, Clone)]
struct UdpServer(SocketAddr);
struct RawUdp { struct RawUdp {
data: Vec<u8>, data: Vec<u8>,
src: SocketAddr, src: UdpClient,
dst: SocketAddr, dst: UdpServer,
} }
enum Work { enum Work {
@ -430,7 +449,7 @@ impl Fenrir {
socket: Arc<UdpSocket>, socket: Arc<UdpSocket>,
) -> ::std::io::Result<()> { ) -> ::std::io::Result<()> {
// jumbo frames are 9K max // jumbo frames are 9K max
let sock_receiver = socket.local_addr()?; let sock_receiver = UdpServer(socket.local_addr()?);
let mut buffer: [u8; 9000] = [0; 9000]; let mut buffer: [u8; 9000] = [0; 9000];
loop { loop {
let (bytes, sock_sender) = ::tokio::select! { let (bytes, sock_sender) = ::tokio::select! {
@ -444,8 +463,8 @@ impl Fenrir {
let data: Vec<u8> = buffer[..bytes].to_vec(); let data: Vec<u8> = buffer[..bytes].to_vec();
work_queue.send(Work::Recv(RawUdp { work_queue.send(Work::Recv(RawUdp {
data, data,
src: sock_sender, src: UdpClient(sock_sender),
dst: sock_receiver.clone(), dst: sock_receiver,
})); }));
} }
Ok(()) Ok(())
@ -614,7 +633,8 @@ impl Fenrir {
let mut raw_out = let mut raw_out =
Vec::<u8>::with_capacity(packet.len()); Vec::<u8>::with_capacity(packet.len());
packet.serialize(&mut raw_out); packet.serialize(&mut raw_out);
todo!() self.send_packet(raw_out, udp.src, udp.dst)
.await;
} }
_ => { _ => {
todo!() todo!()
@ -628,4 +648,22 @@ impl Fenrir {
// copy packet, spawn // copy packet, spawn
todo!(); todo!();
} }
async fn send_packet(
&self,
data: Vec<u8>,
client: UdpClient,
server: UdpServer,
) {
let sockets = self.sockets.lock();
let src_sock = match sockets.find(server) {
Some(src_sock) => src_sock,
None => {
::tracing::error!(
"Can't send packet: Server changed listening ip!"
);
return;
}
};
src_sock.send_to(&data, client.0);
}
} }