From 4f26aae1d776787f90295101cc8aabf8ef801de5 Mon Sep 17 00:00:00 2001 From: DJMrTV Date: Mon, 3 Feb 2025 21:33:07 +0100 Subject: [PATCH] fix(prudp): use seperate streams for seperate substreams via `EncryptionPair` --- src/main.rs | 37 ++++++++++++++++++++----------------- src/prudp/socket.rs | 36 +++++++++++++++++++++++++----------- 2 files changed, 45 insertions(+), 28 deletions(-) diff --git a/src/main.rs b/src/main.rs index a54a641..34645b8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use crate::nex::account::Account; use crate::protocols::auth; use crate::protocols::auth::AuthProtocolConfig; use crate::protocols::server::RMCProtocolServer; -use crate::prudp::socket::Socket; +use crate::prudp::socket::{EncryptionPair, Socket}; use crate::prudp::packet::{PRUDPPacket, VirtualPort}; use crate::prudp::router::Router; use crate::rmc::message::RMCMessage; @@ -112,18 +112,27 @@ async fn start_auth_server() -> AuthServer{ router.clone(), VirtualPort::new(1,10), "6f599f81", - Box::new(|_|{ + Box::new(|_, count|{ Box::pin( async move { - let rc4: Rc4 = Rc4::new_from_slice( "CD&ML".as_bytes()).unwrap(); - let cypher = Box::new(rc4); - let server_cypher: Box = cypher; - let rc4: Rc4 = Rc4::new_from_slice( "CD&ML".as_bytes()).unwrap(); - let cypher = Box::new(rc4); - let client_cypher: Box = cypher; - Some((Vec::new(), (server_cypher, client_cypher), None)) + let encryption_pairs = Vec::from_iter((0..=count).map(|v| { + let rc4: Rc4 = Rc4::new_from_slice( "CD&ML".as_bytes()).unwrap(); + let cypher = Box::new(rc4); + let server_cypher: Box = cypher; + + let rc4: Rc4 = Rc4::new_from_slice( "CD&ML".as_bytes()).unwrap(); + let cypher = Box::new(rc4); + let client_cypher: Box = cypher; + + EncryptionPair{ + recv: client_cypher, + send: server_cypher + } + })); + + Some((Vec::new(), encryption_pairs, None)) } ) }), @@ -162,18 +171,12 @@ async fn start_secure_server() -> SecureServer{ router.clone(), VirtualPort::new(1,10), "6f599f81", - Box::new(|p|{ + Box::new(|p, count|{ Box::pin( async move { - let rc4: Rc4 = Rc4::new_from_slice( "CD&ML".as_bytes()).unwrap(); - let cypher = Box::new(rc4); - let server_cypher: Box = cypher; - let rc4: Rc4 = Rc4::new_from_slice( "CD&ML".as_bytes()).unwrap(); - let cypher = Box::new(rc4); - let client_cypher: Box = cypher; - Some((Vec::new(), (server_cypher, client_cypher), None)) + Some((Vec::new(), Vec::new(), None)) } ) }), diff --git a/src/prudp/socket.rs b/src/prudp/socket.rs index 4bf9f0c..930d8f3 100644 --- a/src/prudp/socket.rs +++ b/src/prudp/socket.rs @@ -26,7 +26,7 @@ pub struct Socket { } -type OnConnectHandlerFn = Box Pin, (Box, Box), Option)>> + Send>> + Send + Sync>; +type OnConnectHandlerFn = Box Pin, Vec, Option)>> + Send>> + Send + Sync>; type OnDataHandlerFn = Box Fn(PRUDPPacket, Arc, &'a mut MutexGuard<'_, ConnectionData>) -> Pin + 'a + Send>> + Send + Sync>; pub struct ActiveSecureConnectionData { @@ -43,12 +43,16 @@ pub struct SocketData { on_data_handler: OnDataHandlerFn, } +pub struct EncryptionPair{ + pub send: Box, + pub recv: Box +} + pub struct ActiveConnectionData { pub reliable_client_counter: u16, pub reliable_server_counter: u16, pub reliable_client_queue: VecDeque, - server_encryption: Box, - client_decryption: Box, + pub encryption_pairs: Vec, pub server_session_id: u8, pub active_secure_connection_data: Option } @@ -218,18 +222,22 @@ impl SocketData { CONNECT => { info!("got connect"); + let Some(MaximumSubstreamId(max_substream)) = packet.options.iter().find(|v| matches!(v, MaximumSubstreamId(_))) else { + return; + }; + let Some(( accepted, - (client_decryption, server_encryption), + encryption_pairs, active_secure_connection_data - )) = (self.on_connect_handler)(packet.clone()).await else { + )) = (self.on_connect_handler)(packet.clone(), *max_substream).await else { error!("invalid connection request"); return; }; + connection.active_connection_data = Some(ActiveConnectionData { - client_decryption, - server_encryption, + encryption_pairs, reliable_client_queue: VecDeque::new(), reliable_client_counter: 2, reliable_server_counter: 1, @@ -341,7 +349,11 @@ impl SocketData { active_connection.reliable_client_counter = active_connection.reliable_client_counter.overflowing_add(1).0; - active_connection.client_decryption.apply_keystream(&mut packet.payload); + let Some(stream) = active_connection.encryption_pairs.get_mut(packet.header.substream_id as usize).map(|e| &mut e.recv) else { + return; + }; + + stream.apply_keystream(&mut packet.payload); // we cant divert this off to another thread we HAVE to process it now to keep order @@ -417,8 +429,6 @@ impl SocketData { impl ConnectionData{ pub async fn finish_and_send_packet_to(&mut self, socket: &SocketData, mut packet: PRUDPPacket){ - println!("{}", hex::encode(&packet.payload)); - if (packet.header.types_and_flags.get_flags() & RELIABLE) != 0{ let Some(active_connection) = self.active_connection_data.as_mut() else { error!("tried to send a secure packet to an inactive connection"); @@ -428,7 +438,11 @@ impl ConnectionData{ packet.header.sequence_id = active_connection.reliable_server_counter; active_connection.reliable_server_counter += 1; - active_connection.server_encryption.apply_keystream(&mut packet.payload); + let Some(encryption) = active_connection.encryption_pairs.get_mut(packet.header.substream_id as usize).map(|e| &mut e.send) else { + return; + }; + + encryption.apply_keystream(&mut packet.payload); } packet.header.source_port = socket.virtual_port;