make sending data actually reliable

This commit is contained in:
Maple Nebel 2025-11-08 12:04:39 +00:00
commit f5b30496d7
2 changed files with 169 additions and 46 deletions

View file

@ -10,7 +10,11 @@ use async_trait::async_trait;
use log::info;
use log::error;
use rc4::StreamCipher;
use rnex_core::rmc::structures::qbuffer::QBuffer;
use v_byte_helpers::ReadExtensions;
use v_byte_helpers::little_endian::{read_u16, read_u32};
use std::collections::{BTreeMap, HashMap};
use std::io::Cursor;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::{Arc, Weak};
@ -51,12 +55,14 @@ struct InternalConnection<E: CryptoHandlerConnectionInstance> {
connections: Weak<Mutex<BTreeMap<PRUDPSockAddr, Arc<Mutex<InternalConnection<E>>>>>>,
reliable_server_counter: u16,
reliable_client_counter: u16,
supported_function_version: u32,
// maybe add connection id(need to see if its even needed)
crypto_handler_instance: E,
data_sender: Sender<Vec<u8>>,
socket: Arc<UdpSocket>,
packet_queue: HashMap<u16, PRUDPV1Packet>,
last_packet_time: Instant,
unacknowleged_packets: Vec<(Instant, PRUDPV1Packet)>
}
impl<E: CryptoHandlerConnectionInstance> Deref for InternalConnection<E> {
@ -67,6 +73,7 @@ impl<E: CryptoHandlerConnectionInstance> Deref for InternalConnection<E> {
}
impl<E: CryptoHandlerConnectionInstance> InternalConnection<E> {
/// gives back the next server packet sequence id which the client expects to send, incrementing it in the process
fn next_server_count(&mut self) -> u16 {
let prev_val = self.reliable_server_counter;
let (val, _) = self.reliable_server_counter.overflowing_add(1);
@ -75,20 +82,15 @@ impl<E: CryptoHandlerConnectionInstance> InternalConnection<E> {
prev_val
}
/// Sends a raw packet to a given client on the connection
///
/// a raw packet is one which does not get processed any further(other than to send it
/// off without buffering or anything),
/// as such you need to make sure that
/// the sizes are set correctly and so on
#[inline]
async fn send_raw_packet(&self, mut prudp_packet: PRUDPV1Packet) {
prudp_packet.set_sizes();
let mut vec = Vec::new();
prudp_packet
.write_to(&mut vec)
.expect("somehow failed to convert backet to bytes");
self.socket
.send_to(&vec, self.socket_addr.regular_socket_addr)
.await
.expect("failed to send data back");
async fn send_raw_packet(&self, prudp_packet: &PRUDPV1Packet) {
send_raw_prudp_to_sockaddr(&self.socket, self.socket_addr, prudp_packet).await;
}
}
@ -196,7 +198,9 @@ impl<T: CryptoHandlerConnectionInstance> AnyInternalConnection for InternalConne
self.crypto_handler_instance.sign_packet(&mut packet);
self.send_raw_packet(packet).await;
self.send_raw_packet(&packet).await;
self.unacknowleged_packets.push((Instant::now(), packet));
}
async fn close_connection(&mut self) {
@ -223,7 +227,7 @@ impl<T: CryptoHandlerConnectionInstance> AnyInternalConnection for InternalConne
self.crypto_handler_instance.sign_packet(&mut packet);
self.send_raw_packet(packet).await;
self.send_raw_packet(&packet).await;
let Some(conns) = self.connections.upgrade() else {
// this is fine as it implies the server has already quit, thus meaning that we dont
@ -240,6 +244,19 @@ impl<T: CryptoHandlerConnectionInstance> AnyInternalConnection for InternalConne
}
}
async fn send_raw_prudp_to_sockaddr(udp_socket: &UdpSocket, dest: PRUDPSockAddr, packet: &PRUDPV1Packet){
let mut vec = Vec::new();
packet
.write_to(&mut vec)
.expect("somehow failed to convert backet to bytes");
udp_socket
.send_to(&vec, dest.regular_socket_addr)
.await
.expect("failed to send data back");
}
impl<T: CryptoHandler> InternalSocket<T> {
async fn get_connection(
&self,
@ -257,19 +274,12 @@ impl<T: CryptoHandler> InternalSocket<T> {
Some(conn)
}
async fn send_packet_unbuffered(&self, dest: PRUDPSockAddr, mut packet: PRUDPV1Packet) {
packet.set_sizes();
let mut vec = Vec::new();
packet
.write_to(&mut vec)
.expect("somehow failed to convert backet to bytes");
self.socket
.send_to(&vec, dest.regular_socket_addr)
.await
.expect("failed to send data back");
/// sends a raw packet to a specific prudp socket address
///
/// a raw packet is a packet is a packet which wont get processed any further,
/// sizes signatures etc need to be set before using this function
async fn send_packet_unbuffered(&self, dest: PRUDPSockAddr, packet: &PRUDPV1Packet) {
send_raw_prudp_to_sockaddr(&self.socket, dest, packet).await
}
async fn handle_syn(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet) {
@ -304,7 +314,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
//println!("got syn: {:?}", response);
self.send_packet_unbuffered(address, response).await;
self.send_packet_unbuffered(address, &response).await;
}
async fn connection_thread(
@ -316,7 +326,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
let mut conn = conn.lock().await;
if conn.last_packet_time < (Instant::now() - Duration::from_secs(5)) {
conn.send_raw_packet(PRUDPV1Packet {
conn.send_raw_packet(&PRUDPV1Packet {
header: PRUDPV1Header {
sequence_id: 0,
substream_id: 0,
@ -336,9 +346,17 @@ impl<T: CryptoHandler> InternalSocket<T> {
if conn.last_packet_time < (Instant::now() - Duration::from_secs(30)) {
conn.close_connection().await;
}
for (send_time, packet) in &conn.unacknowleged_packets{
if *send_time < (Instant::now() - Duration::from_millis(500)){
info!("unacknowledged packet sat arround for more than 500 ms, resending");
conn.send_raw_packet(packet).await;
}
}
drop(conn);
sleep(Duration::from_secs(5)).await;
sleep(Duration::from_millis(500)).await;
}
}
@ -348,6 +366,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
socket_addr: PRUDPSockAddr,
session_id: u8,
is_instantiator: bool,
supported_function_version: u32,
) {
let common = Arc::new(CommonConnection {
user_id: crypto_handler_instance.get_user_id(),
@ -368,6 +387,8 @@ impl<T: CryptoHandler> InternalSocket<T> {
socket: self.socket.clone(),
packet_queue: Default::default(),
last_packet_time: Instant::now(),
unacknowleged_packets: Vec::new(),
supported_function_version
};
let internal = Arc::new(Mutex::new(internal));
@ -445,12 +466,17 @@ impl<T: CryptoHandler> InternalSocket<T> {
.options
.push(ConnectionSignature(Default::default()));
let mut functions: u32 = 0;
for option in &packet.options {
match option {
MaximumSubstreamId(max_substream) => {
response.options.push(MaximumSubstreamId(*max_substream))
}
SupportedFunctions(funcs) => response.options.push(SupportedFunctions(*funcs & 0xFF)),
SupportedFunctions(funcs) => {
functions = *funcs & 0xFF;
response.options.push(SupportedFunctions(*funcs & 0xFF));
},
_ => { /* ? */ }
}
}
@ -461,10 +487,10 @@ impl<T: CryptoHandler> InternalSocket<T> {
//println!("connect out: {:?}", response);
self.create_connection(crypto, address, session_id, false)
self.create_connection(crypto, address, session_id, false, functions)
.await;
self.send_packet_unbuffered(address, response).await;
self.send_packet_unbuffered(address, &response).await;
}
async fn handle_data(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet) {
@ -503,7 +529,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
conn.crypto_handler_instance.sign_packet(&mut response);
self.send_packet_unbuffered(address, response).await;
self.send_packet_unbuffered(address, &response).await;
conn.data_sender.send(packet.payload).await.ok();
@ -529,7 +555,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
conn.crypto_handler_instance.sign_packet(&mut response);
self.send_packet_unbuffered(address, response).await;
self.send_packet_unbuffered(address, &response).await;
}
async fn handle_disconnect(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet) {
@ -549,9 +575,9 @@ impl<T: CryptoHandler> InternalSocket<T> {
conn.crypto_handler_instance.sign_packet(&mut response);
self.send_packet_unbuffered(address, response.clone()).await;
self.send_packet_unbuffered(address, response.clone()).await;
self.send_packet_unbuffered(address, response).await;
self.send_packet_unbuffered(address, &response).await;
self.send_packet_unbuffered(address, &response).await;
self.send_packet_unbuffered(address, &response).await;
//self.internal_connections.lock().await;
}
@ -572,7 +598,7 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
if (packet.header.types_and_flags.get_flags() & ACK) != 0 {
info!("got ack");
if packet.header.types_and_flags.get_types() == SYN
|| packet.header.types_and_flags.get_types() == CONNECT
{
@ -597,13 +623,76 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
} else {
error!("got connection response without the active reciever being present");
}
return;
}
if let Some(conn) = self.get_connection(address).await {
let mut conn = conn.lock().await;
// remove the packet whose sequence id matches the ack packet
// or in other words keep all of those which dont match the sequence id
conn.unacknowleged_packets.retain_mut(|v| {
packet.header.sequence_id != v.1.header.sequence_id
});
} else {
error!("non connection acknowledgement packet on nonexistent connection...")
}
return;
}
if (packet.header.types_and_flags.get_flags() & MULTI_ACK) != 0 {
info!("got multi ack");
if let Some(conn) = self.get_connection(address).await {
let mut conn = conn.lock().await;
if conn.supported_function_version == 1{
let mut collected_ids: Vec<u16> = Vec::new();
let mut cursor = Cursor::new(&packet.payload);
while let Ok(v) = read_u16(&mut cursor){
collected_ids.push(v);
}
conn.unacknowleged_packets.retain_mut(|(_, up)| {
!(
collected_ids.iter().any(|id| up.header.sequence_id == *id) ||
up.header.sequence_id < packet.header.sequence_id
)
});
} else {
let mut collected_ids: Vec<u16> = Vec::new();
let mut cursor = Cursor::new(&packet.payload);
let Ok(_substream_id): Result<u8, _> = cursor.read_le_struct() else{
error!("invalid data whilest reading new version agregate acknowledgement");
return;
};
let Ok(additional_sequence_ids): Result<u8, _> = cursor.read_le_struct() else {
error!("invalid data whilest reading new version agregate acknowledgement");
return;
};
let Ok(sequence_id): Result<u16, _> = cursor.read_le_struct() else {
error!("invalid data whilest reading new version agregate acknowledgement");
return;
};
for _ in 0..additional_sequence_ids{
let Ok(additional_sequence_id): Result<u16, _> = cursor.read_le_struct() else {
error!("invalid data whilest reading new version agregate acknowledgement");
return;
};
collected_ids.push(additional_sequence_id);
conn.unacknowleged_packets.retain_mut(|(_, up)| {
!(
collected_ids.iter().any(|id| up.header.sequence_id == *id) ||
up.header.sequence_id < sequence_id
)
});
}
}
} else {
error!("non connection acknowledgement packet on nonexistent connection...")
}
return;
}
@ -631,7 +720,7 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
let remote_signature = address.calculate_connection_signature();
let packet = PRUDPV1Packet {
let mut packet = PRUDPV1Packet {
header: PRUDPV1Header {
source_port: self.virtual_port,
destination_port: address.virtual_port,
@ -646,7 +735,9 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
..Default::default()
};
self.send_packet_unbuffered(address, packet).await;
packet.set_sizes();
self.send_packet_unbuffered(address, &packet).await;
let Some(syn_ack_packet) = recv.recv().await else {
error!("what");
@ -662,7 +753,7 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
return None;
};
let packet = PRUDPV1Packet {
let mut packet = PRUDPV1Packet {
header: PRUDPV1Header {
source_port: self.virtual_port,
destination_port: address.virtual_port,
@ -677,7 +768,9 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
..Default::default()
};
self.send_packet_unbuffered(address, packet).await;
packet.set_sizes();
self.send_packet_unbuffered(address, &packet).await;
let Some(_connect_ack_packet) = recv.recv().await else {
error!("what");
@ -689,7 +782,7 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
.instantiate(remote_signature, *own_signature, &[], 1)?;
//todo: make this work for secure servers as well
self.create_connection(crypt, address, 0, true).await;
self.create_connection(crypt, address, 0, true, 4).await;
Some(())
}

View file

@ -1,5 +1,35 @@
use macros::rmc_proto;
use crate::rmc::response::ErrorCode;
use macros::{method_id, rmc_struct, RmcSerialize};
#[derive(RmcSerialize, Debug, Default, Clone)]
struct ResultsRange{
offset: u32,
size: u32
}
#[derive(RmcSerialize, Debug, Default, Clone)]
#[rmc_struct(1)]
struct CompetitionRankingGetParam{
unk: u32,
range: ResultsRange,
festival_ids: Vec<u32>,
}
#[derive(RmcSerialize, Debug, Default, Clone)]
#[rmc_struct(0)]
struct CompetitionRankingScoreInfo{
fest_id: u32,
score_data: Vec<u32>,
unk: u32,
team_wins: Vec<u32>,
team_votes: Vec<u32>
}
#[rmc_proto(112)]
pub trait Ranking{
//#[method_id(16)]
//async fn competition_ranking_get_param(&self, param: CompetitionRankingGetParam) -> Result<Vec<CompetitionRankingScoreInfo>,ErrorCode>;
}