diff --git a/prudpv1/src/prudp/socket.rs b/prudpv1/src/prudp/socket.rs index 4aca993..d44c97b 100644 --- a/prudpv1/src/prudp/socket.rs +++ b/prudpv1/src/prudp/socket.rs @@ -10,7 +10,11 @@ use async_trait::async_trait; use log::info; use log::error; use rc4::StreamCipher; +use rnex_core::rmc::structures::qbuffer::QBuffer; +use v_byte_helpers::ReadExtensions; +use v_byte_helpers::little_endian::{read_u16, read_u32}; use std::collections::{BTreeMap, HashMap}; +use std::io::Cursor; use std::marker::PhantomData; use std::ops::Deref; use std::sync::{Arc, Weak}; @@ -51,12 +55,14 @@ struct InternalConnection { connections: Weak>>>>>, reliable_server_counter: u16, reliable_client_counter: u16, + supported_function_version: u32, // maybe add connection id(need to see if its even needed) crypto_handler_instance: E, data_sender: Sender>, socket: Arc, packet_queue: HashMap, last_packet_time: Instant, + unacknowleged_packets: Vec<(Instant, PRUDPV1Packet)> } impl Deref for InternalConnection { @@ -67,6 +73,7 @@ impl Deref for InternalConnection { } impl InternalConnection { + /// gives back the next server packet sequence id which the client expects to send, incrementing it in the process fn next_server_count(&mut self) -> u16 { let prev_val = self.reliable_server_counter; let (val, _) = self.reliable_server_counter.overflowing_add(1); @@ -75,20 +82,15 @@ impl InternalConnection { prev_val } + /// Sends a raw packet to a given client on the connection + /// + /// a raw packet is one which does not get processed any further(other than to send it + /// off without buffering or anything), + /// as such you need to make sure that + /// the sizes are set correctly and so on #[inline] - async fn send_raw_packet(&self, mut prudp_packet: PRUDPV1Packet) { - prudp_packet.set_sizes(); - - let mut vec = Vec::new(); - - prudp_packet - .write_to(&mut vec) - .expect("somehow failed to convert backet to bytes"); - - self.socket - .send_to(&vec, self.socket_addr.regular_socket_addr) - .await - .expect("failed to send data back"); + async fn send_raw_packet(&self, prudp_packet: &PRUDPV1Packet) { + send_raw_prudp_to_sockaddr(&self.socket, self.socket_addr, prudp_packet).await; } } @@ -196,7 +198,9 @@ impl AnyInternalConnection for InternalConne self.crypto_handler_instance.sign_packet(&mut packet); - self.send_raw_packet(packet).await; + self.send_raw_packet(&packet).await; + + self.unacknowleged_packets.push((Instant::now(), packet)); } async fn close_connection(&mut self) { @@ -223,7 +227,7 @@ impl AnyInternalConnection for InternalConne self.crypto_handler_instance.sign_packet(&mut packet); - self.send_raw_packet(packet).await; + self.send_raw_packet(&packet).await; let Some(conns) = self.connections.upgrade() else { // this is fine as it implies the server has already quit, thus meaning that we dont @@ -240,6 +244,19 @@ impl AnyInternalConnection for InternalConne } } +async fn send_raw_prudp_to_sockaddr(udp_socket: &UdpSocket, dest: PRUDPSockAddr, packet: &PRUDPV1Packet){ + let mut vec = Vec::new(); + + packet + .write_to(&mut vec) + .expect("somehow failed to convert backet to bytes"); + + udp_socket + .send_to(&vec, dest.regular_socket_addr) + .await + .expect("failed to send data back"); +} + impl InternalSocket { async fn get_connection( &self, @@ -257,19 +274,12 @@ impl InternalSocket { Some(conn) } - async fn send_packet_unbuffered(&self, dest: PRUDPSockAddr, mut packet: PRUDPV1Packet) { - packet.set_sizes(); - - let mut vec = Vec::new(); - - packet - .write_to(&mut vec) - .expect("somehow failed to convert backet to bytes"); - - self.socket - .send_to(&vec, dest.regular_socket_addr) - .await - .expect("failed to send data back"); + /// sends a raw packet to a specific prudp socket address + /// + /// a raw packet is a packet is a packet which wont get processed any further, + /// sizes signatures etc need to be set before using this function + async fn send_packet_unbuffered(&self, dest: PRUDPSockAddr, packet: &PRUDPV1Packet) { + send_raw_prudp_to_sockaddr(&self.socket, dest, packet).await } async fn handle_syn(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet) { @@ -304,7 +314,7 @@ impl InternalSocket { //println!("got syn: {:?}", response); - self.send_packet_unbuffered(address, response).await; + self.send_packet_unbuffered(address, &response).await; } async fn connection_thread( @@ -316,7 +326,7 @@ impl InternalSocket { let mut conn = conn.lock().await; if conn.last_packet_time < (Instant::now() - Duration::from_secs(5)) { - conn.send_raw_packet(PRUDPV1Packet { + conn.send_raw_packet(&PRUDPV1Packet { header: PRUDPV1Header { sequence_id: 0, substream_id: 0, @@ -336,9 +346,17 @@ impl InternalSocket { if conn.last_packet_time < (Instant::now() - Duration::from_secs(30)) { conn.close_connection().await; } + + for (send_time, packet) in &conn.unacknowleged_packets{ + if *send_time < (Instant::now() - Duration::from_millis(500)){ + info!("unacknowledged packet sat arround for more than 500 ms, resending"); + conn.send_raw_packet(packet).await; + } + } + drop(conn); - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_millis(500)).await; } } @@ -348,6 +366,7 @@ impl InternalSocket { socket_addr: PRUDPSockAddr, session_id: u8, is_instantiator: bool, + supported_function_version: u32, ) { let common = Arc::new(CommonConnection { user_id: crypto_handler_instance.get_user_id(), @@ -368,6 +387,8 @@ impl InternalSocket { socket: self.socket.clone(), packet_queue: Default::default(), last_packet_time: Instant::now(), + unacknowleged_packets: Vec::new(), + supported_function_version }; let internal = Arc::new(Mutex::new(internal)); @@ -445,12 +466,17 @@ impl InternalSocket { .options .push(ConnectionSignature(Default::default())); + let mut functions: u32 = 0; + for option in &packet.options { match option { MaximumSubstreamId(max_substream) => { response.options.push(MaximumSubstreamId(*max_substream)) } - SupportedFunctions(funcs) => response.options.push(SupportedFunctions(*funcs & 0xFF)), + SupportedFunctions(funcs) => { + functions = *funcs & 0xFF; + response.options.push(SupportedFunctions(*funcs & 0xFF)); + }, _ => { /* ? */ } } } @@ -461,10 +487,10 @@ impl InternalSocket { //println!("connect out: {:?}", response); - self.create_connection(crypto, address, session_id, false) + self.create_connection(crypto, address, session_id, false, functions) .await; - self.send_packet_unbuffered(address, response).await; + self.send_packet_unbuffered(address, &response).await; } async fn handle_data(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet) { @@ -503,7 +529,7 @@ impl InternalSocket { conn.crypto_handler_instance.sign_packet(&mut response); - self.send_packet_unbuffered(address, response).await; + self.send_packet_unbuffered(address, &response).await; conn.data_sender.send(packet.payload).await.ok(); @@ -529,7 +555,7 @@ impl InternalSocket { conn.crypto_handler_instance.sign_packet(&mut response); - self.send_packet_unbuffered(address, response).await; + self.send_packet_unbuffered(address, &response).await; } async fn handle_disconnect(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet) { @@ -549,9 +575,9 @@ impl InternalSocket { conn.crypto_handler_instance.sign_packet(&mut response); - self.send_packet_unbuffered(address, response.clone()).await; - self.send_packet_unbuffered(address, response.clone()).await; - self.send_packet_unbuffered(address, response).await; + self.send_packet_unbuffered(address, &response).await; + self.send_packet_unbuffered(address, &response).await; + self.send_packet_unbuffered(address, &response).await; //self.internal_connections.lock().await; } @@ -572,7 +598,7 @@ impl AnyInternalSocket for InternalSocket { if (packet.header.types_and_flags.get_flags() & ACK) != 0 { info!("got ack"); - + if packet.header.types_and_flags.get_types() == SYN || packet.header.types_and_flags.get_types() == CONNECT { @@ -597,13 +623,76 @@ impl AnyInternalSocket for InternalSocket { } else { error!("got connection response without the active reciever being present"); } + return; + } + + if let Some(conn) = self.get_connection(address).await { + let mut conn = conn.lock().await; + + // remove the packet whose sequence id matches the ack packet + // or in other words keep all of those which dont match the sequence id + conn.unacknowleged_packets.retain_mut(|v| { + packet.header.sequence_id != v.1.header.sequence_id + }); + } else { + error!("non connection acknowledgement packet on nonexistent connection...") } return; } if (packet.header.types_and_flags.get_flags() & MULTI_ACK) != 0 { - info!("got multi ack"); + if let Some(conn) = self.get_connection(address).await { + let mut conn = conn.lock().await; + + if conn.supported_function_version == 1{ + let mut collected_ids: Vec = Vec::new(); + let mut cursor = Cursor::new(&packet.payload); + + while let Ok(v) = read_u16(&mut cursor){ + collected_ids.push(v); + } + + conn.unacknowleged_packets.retain_mut(|(_, up)| { + !( + collected_ids.iter().any(|id| up.header.sequence_id == *id) || + up.header.sequence_id < packet.header.sequence_id + ) + }); + + } else { + let mut collected_ids: Vec = Vec::new(); + let mut cursor = Cursor::new(&packet.payload); + + let Ok(_substream_id): Result = cursor.read_le_struct() else{ + error!("invalid data whilest reading new version agregate acknowledgement"); + return; + }; + let Ok(additional_sequence_ids): Result = cursor.read_le_struct() else { + error!("invalid data whilest reading new version agregate acknowledgement"); + return; + }; + let Ok(sequence_id): Result = cursor.read_le_struct() else { + error!("invalid data whilest reading new version agregate acknowledgement"); + return; + }; + for _ in 0..additional_sequence_ids{ + let Ok(additional_sequence_id): Result = cursor.read_le_struct() else { + error!("invalid data whilest reading new version agregate acknowledgement"); + return; + }; + collected_ids.push(additional_sequence_id); + conn.unacknowleged_packets.retain_mut(|(_, up)| { + !( + collected_ids.iter().any(|id| up.header.sequence_id == *id) || + up.header.sequence_id < sequence_id + ) + }); + } + } + } else { + error!("non connection acknowledgement packet on nonexistent connection...") + } return; } @@ -631,7 +720,7 @@ impl AnyInternalSocket for InternalSocket { let remote_signature = address.calculate_connection_signature(); - let packet = PRUDPV1Packet { + let mut packet = PRUDPV1Packet { header: PRUDPV1Header { source_port: self.virtual_port, destination_port: address.virtual_port, @@ -646,7 +735,9 @@ impl AnyInternalSocket for InternalSocket { ..Default::default() }; - self.send_packet_unbuffered(address, packet).await; + packet.set_sizes(); + + self.send_packet_unbuffered(address, &packet).await; let Some(syn_ack_packet) = recv.recv().await else { error!("what"); @@ -662,7 +753,7 @@ impl AnyInternalSocket for InternalSocket { return None; }; - let packet = PRUDPV1Packet { + let mut packet = PRUDPV1Packet { header: PRUDPV1Header { source_port: self.virtual_port, destination_port: address.virtual_port, @@ -677,7 +768,9 @@ impl AnyInternalSocket for InternalSocket { ..Default::default() }; - self.send_packet_unbuffered(address, packet).await; + packet.set_sizes(); + + self.send_packet_unbuffered(address, &packet).await; let Some(_connect_ack_packet) = recv.recv().await else { error!("what"); @@ -689,7 +782,7 @@ impl AnyInternalSocket for InternalSocket { .instantiate(remote_signature, *own_signature, &[], 1)?; //todo: make this work for secure servers as well - self.create_connection(crypt, address, 0, true).await; + self.create_connection(crypt, address, 0, true, 4).await; Some(()) } diff --git a/rnex-core/src/rmc/protocols/ranking.rs b/rnex-core/src/rmc/protocols/ranking.rs index 298922f..d5746e9 100644 --- a/rnex-core/src/rmc/protocols/ranking.rs +++ b/rnex-core/src/rmc/protocols/ranking.rs @@ -1,5 +1,35 @@ use macros::rmc_proto; +use crate::rmc::response::ErrorCode; +use macros::{method_id, rmc_struct, RmcSerialize}; + +#[derive(RmcSerialize, Debug, Default, Clone)] +struct ResultsRange{ + offset: u32, + size: u32 +} + +#[derive(RmcSerialize, Debug, Default, Clone)] +#[rmc_struct(1)] +struct CompetitionRankingGetParam{ + unk: u32, + range: ResultsRange, + festival_ids: Vec, +} + +#[derive(RmcSerialize, Debug, Default, Clone)] +#[rmc_struct(0)] +struct CompetitionRankingScoreInfo{ + fest_id: u32, + score_data: Vec, + unk: u32, + team_wins: Vec, + team_votes: Vec +} + + #[rmc_proto(112)] pub trait Ranking{ + //#[method_id(16)] + //async fn competition_ranking_get_param(&self, param: CompetitionRankingGetParam) -> Result,ErrorCode>; } \ No newline at end of file