diff --git a/src/nex/user.rs b/src/nex/user.rs index e5edc00..019b2bd 100644 --- a/src/nex/user.rs +++ b/src/nex/user.rs @@ -408,6 +408,10 @@ impl Matchmake for User { println!("{:?}", urls); + + if urls.is_empty(){ + return Err(ErrorCode::RendezVous_NotParticipatedGathering) + } Ok(urls) } diff --git a/src/prudp/packet.rs b/src/prudp/packet.rs index 6c89eb9..ab72874 100644 --- a/src/prudp/packet.rs +++ b/src/prudp/packet.rs @@ -44,26 +44,27 @@ pub type Result = std::result::Result; pub struct TypesFlags(u16); impl TypesFlags { + #[inline] pub const fn get_types(self) -> u8 { (self.0 & 0x000F) as u8 } - + #[inline] pub const fn get_flags(self) -> u16 { (self.0 & 0xFFF0) >> 4 } - + #[inline] pub const fn types(self, val: u8) -> Self { Self((self.0 & 0xFFF0) | (val as u16 & 0x000F)) } - + #[inline] pub const fn flags(self, val: u16) -> Self { Self((self.0 & 0x000F) | ((val << 4) & 0xFFF0)) } - + #[inline] pub const fn set_flag(&mut self, val: u16){ self.0 |= (val & 0xFFF) << 4; } - + #[inline] pub const fn set_types(&mut self, val: u8){ self.0 |= val as u16 & 0x0F; } diff --git a/src/prudp/router.rs b/src/prudp/router.rs index fc5a998..de2f249 100644 --- a/src/prudp/router.rs +++ b/src/prudp/router.rs @@ -64,7 +64,7 @@ impl Router { tokio::spawn(async move { - endpoint.recieve_packet(connection, packet).await + endpoint.receive_packet(connection, packet).await }); } } diff --git a/src/prudp/socket.rs b/src/prudp/socket.rs index d9224fd..3657290 100644 --- a/src/prudp/socket.rs +++ b/src/prudp/socket.rs @@ -1,10 +1,14 @@ +use crate::nex::account::Account; use crate::prudp::packet::flags::{ACK, HAS_SIZE, MULTI_ACK, NEED_ACK, RELIABLE}; use crate::prudp::packet::types::{CONNECT, DATA, DISCONNECT, PING, SYN}; -use crate::prudp::packet::PacketOption::{ConnectionSignature, FragmentId, InitialSequenceId, MaximumSubstreamId, SupportedFunctions}; +use crate::prudp::packet::PacketOption::{ + ConnectionSignature, FragmentId, InitialSequenceId, MaximumSubstreamId, SupportedFunctions, +}; use crate::prudp::packet::{PRUDPHeader, PRUDPPacket, PacketOption, TypesFlags, VirtualPort}; use crate::prudp::router::{Error, Router}; use crate::prudp::sockaddr::PRUDPSockAddr; use async_trait::async_trait; +use chrono::NaiveTime; use hmac::digest::consts::U5; use log::info; use log::{error, trace, warn}; @@ -21,17 +25,18 @@ use std::net::SocketAddrV4; use std::ops::Deref; use std::pin::Pin; use std::sync::{Arc, Weak}; + +use std::time::Duration; use tokio::net::UdpSocket; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::{Mutex, RwLock}; +use tokio::time::{sleep, Instant}; use tokio_stream::Stream; -use crate::nex::account::Account; // due to the way this is designed crashing the router thread causes deadlock, sorry ;-; // (maybe i will fix that some day) /// PRUDP Socket for accepting connections to then send and recieve data from those clients - pub struct EncryptionPair { pub send: T, pub recv: T, @@ -46,11 +51,6 @@ impl EncryptionPair { } } -pub struct NewEncryptionPair { - pub send: E, - pub recv: E, -} - pub struct CommonConnection { pub user_id: u32, pub socket_addr: PRUDPSockAddr, @@ -60,6 +60,7 @@ pub struct CommonConnection { struct InternalConnection { common: Arc, + connections: Weak>>>>>, reliable_server_counter: u16, reliable_client_counter: u16, // maybe add connection id(need to see if its even needed) @@ -67,23 +68,40 @@ struct InternalConnection { data_sender: Sender>, socket: Arc, packet_queue: HashMap, + last_packet_time: Instant, } -impl Deref for InternalConnection{ +impl Deref for InternalConnection { type Target = CommonConnection; fn deref(&self) -> &Self::Target { &self.common } } -impl InternalConnection{ - fn next_server_count(&mut self) -> u16{ +impl InternalConnection { + fn next_server_count(&mut self) -> u16 { let prev_val = self.reliable_server_counter; let (val, _) = self.reliable_server_counter.overflowing_add(1); self.reliable_server_counter = val; prev_val } + + #[inline] + async fn send_raw_packet(&self, mut prudp_packet: PRUDPPacket) { + prudp_packet.set_sizes(); + + let mut vec = Vec::new(); + + prudp_packet + .write_to(&mut vec) + .expect("somehow failed to convert backet to bytes"); + + self.socket + .send_to(&vec, self.socket_addr.regular_socket_addr) + .await + .expect("failed to send data back"); + } } pub struct ExternalConnection { @@ -92,9 +110,9 @@ pub struct ExternalConnection { } #[derive(Clone)] -pub struct SendingConnection{ +pub struct SendingConnection { common: Arc, - inernal: Weak> + internal: Weak>, } pub struct CommonSocket { @@ -120,8 +138,8 @@ pub struct ExternalSocket { internal: Weak, } -impl ExternalSocket{ - pub async fn connect(&mut self, addr: PRUDPSockAddr) -> Option{ +impl ExternalSocket { + pub async fn connect(&mut self, addr: PRUDPSockAddr) -> Option { let socket = self.internal.upgrade()?; socket.connect(addr).await; @@ -129,7 +147,7 @@ impl ExternalSocket{ self.connection_receiver.recv().await } - pub async fn accept(&mut self) -> Option{ + pub async fn accept(&mut self) -> Option { self.connection_receiver.recv().await } } @@ -152,7 +170,7 @@ impl Deref for InternalSocket { pub(super) trait AnyInternalSocket: Send + Sync + Deref + 'static { - async fn recieve_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket); + async fn receive_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket); async fn connect(&self, address: PRUDPSockAddr) -> Option<()>; } @@ -161,13 +179,15 @@ pub(super) trait AnyInternalConnection: Send + Sync + Deref + 'static { async fn send_data_packet(&mut self, data: Vec); + + async fn close_connection(&mut self); } #[async_trait] -impl AnyInternalConnection for InternalConnection{ +impl AnyInternalConnection for InternalConnection { async fn send_data_packet(&mut self, data: Vec) { - let mut packet = PRUDPPacket{ - header: PRUDPHeader{ + let mut packet = PRUDPPacket { + header: PRUDPHeader { sequence_id: self.next_server_count(), substream_id: 0, session_id: self.session_id, @@ -181,26 +201,74 @@ impl AnyInternalConnection for InternalConne ..Default::default() }; - self.crypto_handler_instance.encrypt_outgoing(0, &mut packet.payload[..]); + self.crypto_handler_instance + .encrypt_outgoing(0, &mut packet.payload[..]); packet.set_sizes(); self.crypto_handler_instance.sign_packet(&mut packet); - let mut vec = Vec::new(); + self.send_raw_packet(packet).await; + } - packet - .write_to(&mut vec) - .expect("somehow failed to convert backet to bytes"); - - self.socket - .send_to(&vec, self.socket_addr.regular_socket_addr) - .await - .expect("failed to send data back"); + async fn close_connection(&mut self) { + // jon confirmed that this should be a safe way to dc a client + + let mut packet = PRUDPPacket { + header: PRUDPHeader { + sequence_id: self.next_server_count(), + substream_id: 0, + session_id: self.session_id, + types_and_flags: TypesFlags::default().types(DISCONNECT), + destination_port: self.common.socket_addr.virtual_port, + source_port: self.server_port, + ..Default::default() + }, + payload: Vec::new(), + options: vec![FragmentId(0)], + ..Default::default() + }; + + // no need for encryption the, the payload is empty + + packet.set_sizes(); + + self.crypto_handler_instance.sign_packet(&mut packet); + + self.send_raw_packet(packet).await; + + let Some(conns) = self.connections.upgrade() else { + // this is fine as it implies the server has already quit, thus meaning that we dont + // have to remove ourselves from the server + return; + }; + + let mut conns = conns.lock().await; + + conns.remove(&self.socket_addr); + + // the connection will now drop as soon as we leave this due to no longer having a permanent + // reference } } impl InternalSocket { + async fn get_connection( + &self, + addr: PRUDPSockAddr, + ) -> Option>>> { + let connections = self.internal_connections.lock().await; + let Some(conn) = connections.get(&addr) else { + error!("tried to send data on inactive connection!"); + return None; + }; + + let conn = conn.clone(); + drop(connections); + + Some(conn) + } + async fn send_packet_unbuffered(&self, dest: PRUDPSockAddr, mut packet: PRUDPPacket) { packet.set_sizes(); @@ -232,12 +300,12 @@ impl InternalSocket { // todo: refactor this to be more readable(low priority cause it doesnt change anything api wise) for options in &packet.options { match options { - SupportedFunctions(functions) => response - .options - .push(SupportedFunctions(*functions & 0x04)), - MaximumSubstreamId(max_substream) => response - .options - .push(MaximumSubstreamId(*max_substream)), + SupportedFunctions(functions) => { + response.options.push(SupportedFunctions(*functions & 0x04)) + } + MaximumSubstreamId(max_substream) => { + response.options.push(MaximumSubstreamId(*max_substream)) + } _ => { /* ??? */ } } } @@ -248,41 +316,41 @@ impl InternalSocket { //println!("got syn: {:?}", response); - self.send_packet_unbuffered(address, response) - .await; + self.send_packet_unbuffered(address, response).await; } async fn connection_thread( - socket: Arc, - self_port: VirtualPort, - connection: Arc>>, - mut data_recv: Receiver> + connection: Weak>>, ) { //todo: handle stuff like resending packets if they arent acknowledged in here - while let Some(data) = data_recv.recv().await{ - let mut locked_conn = connection.lock().await; - let packet = PRUDPPacket{ - header: PRUDPHeader{ - sequence_id: locked_conn.next_server_count(), - substream_id: 0, - session_id: locked_conn.session_id, - types_and_flags: TypesFlags::default().types(DATA).flags(RELIABLE | NEED_ACK), - destination_port: locked_conn.common.socket_addr.virtual_port, - source_port: self_port, + + while let Some(conn) = connection.upgrade() { + let mut conn = conn.lock().await; + + if conn.last_packet_time < (Instant::now() - Duration::from_secs(5)) { + conn.send_raw_packet(PRUDPPacket { + header: PRUDPHeader { + sequence_id: 0, + substream_id: 0, + session_id: 0, + types_and_flags: TypesFlags::default().types(PING).flags(NEED_ACK), + destination_port: conn.common.socket_addr.virtual_port, + source_port: conn.server_port, + ..Default::default() + }, + payload: Vec::new(), + options: vec![], ..Default::default() - }, - payload: data, - options: vec![FragmentId(0)], - ..Default::default() - }; - - //packet. - - - - + }) + .await; + } + if conn.last_packet_time < (Instant::now() - Duration::from_secs(30)) { + conn.close_connection().await; + } + drop(conn); + sleep(Duration::from_secs(5)).await; } } @@ -297,7 +365,7 @@ impl InternalSocket { user_id: crypto_handler_instance.get_user_id(), socket_addr, session_id, - server_port: self.virtual_port + server_port: self.virtual_port, }); let (data_sender_from_client, data_receiver_from_client) = channel(16); @@ -305,11 +373,13 @@ impl InternalSocket { let internal = InternalConnection { common: common.clone(), crypto_handler_instance, - reliable_client_counter: if is_instantiator { 1 } else { 2 } , + connections: Arc::downgrade(&self.internal_connections), + reliable_client_counter: if is_instantiator { 1 } else { 2 }, reliable_server_counter: if is_instantiator { 2 } else { 1 }, data_sender: data_sender_from_client, socket: self.socket.clone(), - packet_queue: Default::default() + packet_queue: Default::default(), + last_packet_time: Instant::now(), }; let internal = Arc::new(Mutex::new(internal)); @@ -317,12 +387,11 @@ impl InternalSocket { let dyn_internal: Arc> = internal.clone(); let external = ExternalConnection { - sending: SendingConnection{ + sending: SendingConnection { common, - inernal: Arc::downgrade(&dyn_internal) + internal: Arc::downgrade(&dyn_internal), }, data_receiver: data_receiver_from_client, - }; let mut connections = self.internal_connections.lock().await; @@ -331,6 +400,8 @@ impl InternalSocket { drop(connections); + tokio::spawn(Self::connection_thread(Arc::downgrade(&internal))); + self.connection_sender .send(external) .await @@ -380,7 +451,6 @@ impl InternalSocket { response.payload = return_data; - //let remote_signature = address.calculate_connection_signature(); response @@ -389,24 +459,22 @@ impl InternalSocket { for option in &packet.options { match option { - MaximumSubstreamId(max_substream) => response - .options - .push(MaximumSubstreamId(*max_substream)), - SupportedFunctions(funcs) => { - response.options.push(SupportedFunctions(*funcs)) + MaximumSubstreamId(max_substream) => { + response.options.push(MaximumSubstreamId(*max_substream)) } + SupportedFunctions(funcs) => response.options.push(SupportedFunctions(*funcs)), _ => { /* ? */ } } } - response.set_sizes(); crypto.sign_connect(&mut response); //println!("connect out: {:?}", response); - self.create_connection(crypto, address, session_id, false).await; + self.create_connection(crypto, address, session_id, false) + .await; self.send_packet_unbuffered(address, response).await; } @@ -414,14 +482,16 @@ impl InternalSocket { async fn handle_data(&self, address: PRUDPSockAddr, mut packet: PRUDPPacket) { info!("got data"); - if packet.header.types_and_flags.get_flags() & (NEED_ACK | RELIABLE) != (NEED_ACK | RELIABLE){ + if packet.header.types_and_flags.get_flags() & (NEED_ACK | RELIABLE) + != (NEED_ACK | RELIABLE) + { error!("invalid or unimplemented packet flags"); } let connections = self.internal_connections.lock().await; - let Some(conn) = connections.get(&address) else{ + let Some(conn) = connections.get(&address) else { error!("tried to send data on inactive connection!"); - return + return; }; let conn = conn.clone(); @@ -436,7 +506,8 @@ impl InternalSocket { let mut counter = conn.reliable_client_counter; while let Some(mut packet) = conn.packet_queue.remove(&counter) { - conn.crypto_handler_instance.decrypt_incoming(packet.header.substream_id, &mut packet.payload[..]); + conn.crypto_handler_instance + .decrypt_incoming(packet.header.substream_id, &mut packet.payload[..]); let mut response = packet.base_acknowledgement_packet(); response.header.types_and_flags.set_flag(HAS_SIZE | ACK); @@ -453,11 +524,11 @@ impl InternalSocket { } } - async fn handle_ping(&self, address: PRUDPSockAddr, packet: PRUDPPacket){ + async fn handle_ping(&self, address: PRUDPSockAddr, packet: PRUDPPacket) { let connections = self.internal_connections.lock().await; - let Some(conn) = connections.get(&address) else{ + let Some(conn) = connections.get(&address) else { error!("tried to send data on inactive connection!"); - return + return; }; let conn = conn.clone(); drop(connections); @@ -473,11 +544,11 @@ impl InternalSocket { self.send_packet_unbuffered(address, response).await; } - async fn handle_disconnect(&self, address: PRUDPSockAddr, packet: PRUDPPacket){ + async fn handle_disconnect(&self, address: PRUDPSockAddr, packet: PRUDPPacket) { let connections = self.internal_connections.lock().await; - let Some(conn) = connections.get(&address) else{ + let Some(conn) = connections.get(&address) else { error!("tried to send data on inactive connection!"); - return + return; }; let conn = conn.clone(); drop(connections); @@ -493,37 +564,53 @@ impl InternalSocket { self.send_packet_unbuffered(address, response.clone()).await; self.send_packet_unbuffered(address, response.clone()).await; self.send_packet_unbuffered(address, response).await; + + //self.internal_connections.lock().await; } } #[async_trait] impl AnyInternalSocket for InternalSocket { - async fn recieve_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket) { - // todo: handle acks + async fn receive_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket) { + // todo: handle acks and resending + + if let Some(conn) = self.get_connection(address).await { + let mut conn = conn.lock().await; + + // reset timeout + conn.last_packet_time = Instant::now(); + } + if (packet.header.types_and_flags.get_flags() & ACK) != 0 { info!("got ack"); - if packet.header.types_and_flags.get_types() == SYN || - packet.header.types_and_flags.get_types() == CONNECT{ - if packet.header.types_and_flags.get_types() == SYN{ + + if packet.header.types_and_flags.get_types() == SYN + || packet.header.types_and_flags.get_types() == CONNECT + { + if packet.header.types_and_flags.get_types() == SYN { println!("Syn: {:?}", packet); } - if packet.header.types_and_flags.get_types() == CONNECT{ + if packet.header.types_and_flags.get_types() == CONNECT { println!("Connect: {:?}", packet); } let sender = self.connection_establishment_data_sender.lock().await; info!("redirecting ack to active connection establishment code"); - if let Some(conn) = sender.as_ref(){ + if let Some(conn) = sender.as_ref() { if let Err(e) = conn.send(packet).await { - error!("error whilest sending data to connection establishment: {}", e); + error!( + "error whilest sending data to connection establishment: {}", + e + ); } } else { error!("got connection response without the active reciever being present"); } } + return; } @@ -556,8 +643,8 @@ impl AnyInternalSocket for InternalSocket { let remote_signature = address.calculate_connection_signature(); - let packet = PRUDPPacket{ - header: PRUDPHeader{ + let packet = PRUDPPacket { + header: PRUDPHeader { source_port: self.virtual_port, destination_port: address.virtual_port, types_and_flags: TypesFlags::default().types(SYN).flags(NEED_ACK), @@ -566,16 +653,14 @@ impl AnyInternalSocket for InternalSocket { options: vec![ SupportedFunctions(0x104), MaximumSubstreamId(0), - ConnectionSignature(remote_signature) + ConnectionSignature(remote_signature), ], ..Default::default() }; - - self.send_packet_unbuffered(address, packet).await; - let Some(syn_ack_packet) = recv.recv().await else{ + let Some(syn_ack_packet) = recv.recv().await else { error!("what"); return None; }; @@ -589,10 +674,8 @@ impl AnyInternalSocket for InternalSocket { return None; }; - - - let packet = PRUDPPacket{ - header: PRUDPHeader{ + let packet = PRUDPPacket { + header: PRUDPHeader { source_port: self.virtual_port, destination_port: address.virtual_port, types_and_flags: TypesFlags::default().types(CONNECT).flags(NEED_ACK), @@ -601,19 +684,21 @@ impl AnyInternalSocket for InternalSocket { options: vec![ SupportedFunctions(0x04), MaximumSubstreamId(0), - ConnectionSignature(remote_signature) + ConnectionSignature(remote_signature), ], ..Default::default() }; self.send_packet_unbuffered(address, packet).await; - let Some(connect_ack_packet) = recv.recv().await else{ + let Some(connect_ack_packet) = recv.recv().await else { error!("what"); return None; }; - let (_, crypt) = self.crypto_handler.instantiate(remote_signature, *own_signature, &[], 1)?; + let (_, crypt) = + self.crypto_handler + .instantiate(remote_signature, *own_signature, &[], 1)?; //todo: make this work for secure servers as well self.create_connection(crypt, address, 0, true).await; @@ -680,38 +765,54 @@ pub trait CryptoHandler: Send + Sync + 'static { fn sign_pre_handshake(&self, packet: &mut PRUDPPacket); } -impl Deref for ExternalConnection{ +impl Deref for ExternalConnection { type Target = SendingConnection; fn deref(&self) -> &Self::Target { &self.sending } } -impl Deref for SendingConnection{ +impl Deref for SendingConnection { type Target = CommonConnection; fn deref(&self) -> &Self::Target { &self.common } } -impl ExternalConnection{ - pub async fn recv(&mut self) -> Option>{ +impl ExternalConnection { + pub async fn recv(&mut self) -> Option> { self.data_receiver.recv().await } //todo: make this an actual result instead of an option - pub fn duplicate_sender(&self) -> SendingConnection{ + pub fn duplicate_sender(&self) -> SendingConnection { self.sending.clone() } } -impl SendingConnection{ +impl SendingConnection { pub async fn send(&self, data: Vec) -> Option<()> { - let internal = self.inernal.upgrade()?; + let internal = self.internal.upgrade()?; let mut internal = internal.lock().await; internal.send_data_packet(data).await; Some(()) } -} \ No newline at end of file + + pub async fn close_connection(&self) { + let Some(internal) = self.internal.upgrade() else { + return; + }; + + let mut internal = internal.lock().await; + + internal.close_connection().await; + } +} + +impl Drop for InternalConnection { + fn drop(&mut self) { + println!("yatta"); + } +} diff --git a/src/rmc/protocols/mod.rs b/src/rmc/protocols/mod.rs index 4ea73df..53e6c40 100644 --- a/src/rmc/protocols/mod.rs +++ b/src/rmc/protocols/mod.rs @@ -274,6 +274,8 @@ async fn handle_incoming( } } + + info!("rmc disconnected") } pub fn new_rmc_gateway_connection(conn: ExternalConnection, create_internal: F) -> Arc diff --git a/src/web/mod.rs b/src/web/mod.rs index e096f34..ee43898 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -9,6 +9,7 @@ use tokio::task::JoinHandle; use serde::Serialize; use tokio::sync::Mutex; use crate::nex::matchmake::MatchmakeManager; +use crate::rmc::protocols::HasRmcConnection; use crate::rmc::protocols::notifications::NotificationEvent; struct RnexApiAuth; @@ -45,10 +46,25 @@ async fn players_in_match(mmm: &State>, gid: u32) -> Optio Some(Json(gathering.connected_players.iter().filter_map(|p| p.upgrade()).map(|p| p.pid).collect())) } +#[get("/player//disconnect")] +async fn disconnect_player(_auth: RnexApiAuth, mmm: &State>, pid: u32) -> Option<()>{ + // this doesnt work and is broken, there might be some other way to remotely close gatherings... + // also if anyone gets this working change it to POST cause the only reason its get is because + // that makes testing it easier + let mmm = mmm.users.read().await; + + for player in mmm.values().filter_map(|p| p.upgrade()).filter(|p| p.pid == pid) { + player.remote.get_connection().0.close_connection().await; + } + + + Some(()) +} + #[get("/gathering//close")] async fn close_gathering(_auth: RnexApiAuth, mmm: &State>, gid: u32) -> Option<()>{ // this doesnt work and is broken, there might be some other way to remotely close gatherings... - // also if anyone gets this working change it to POST cause the only reason its get is because + // also if anyone gets this working change it to POST cause the only reason its get is because // that makes testing it easier let mmm = mmm.sessions.read().await; @@ -73,7 +89,7 @@ async fn close_gathering(_auth: RnexApiAuth, mmm: &State>, pub async fn start_web(mgr: Arc) -> JoinHandle<()> { tokio::spawn(async move { rocket::build() - .mount("/", routes![gatherings, players_in_match, close_gathering]) + .mount("/", routes![gatherings, players_in_match, close_gathering, disconnect_player]) .manage(mgr) .launch().await .expect("unable to start webserver");