fix(prudp): use seperate streams for seperate substreams via EncryptionPair

This commit is contained in:
DJMrTV 2025-02-03 21:33:07 +01:00
commit 4f26aae1d7
2 changed files with 45 additions and 28 deletions

View file

@ -26,7 +26,7 @@ pub struct Socket {
}
type OnConnectHandlerFn = Box<dyn Fn(PRUDPPacket) -> Pin<Box<dyn Future<Output=Option<(Vec<u8>, (Box<dyn StreamCipher + Send>, Box<dyn StreamCipher + Send>), Option<ActiveSecureConnectionData>)>> + Send>> + Send + Sync>;
type OnConnectHandlerFn = Box<dyn Fn(PRUDPPacket, u8) -> Pin<Box<dyn Future<Output=Option<(Vec<u8>, Vec<EncryptionPair>, Option<ActiveSecureConnectionData>)>> + Send>> + Send + Sync>;
type OnDataHandlerFn = Box<dyn for<'a> Fn(PRUDPPacket, Arc<SocketData>, &'a mut MutexGuard<'_, ConnectionData>) -> Pin<Box<dyn Future<Output=()> + 'a + Send>> + Send + Sync>;
pub struct ActiveSecureConnectionData {
@ -43,12 +43,16 @@ pub struct SocketData {
on_data_handler: OnDataHandlerFn,
}
pub struct EncryptionPair{
pub send: Box<dyn StreamCipher + Send>,
pub recv: Box<dyn StreamCipher + Send>
}
pub struct ActiveConnectionData {
pub reliable_client_counter: u16,
pub reliable_server_counter: u16,
pub reliable_client_queue: VecDeque<PRUDPPacket>,
server_encryption: Box<dyn StreamCipher + Send>,
client_decryption: Box<dyn StreamCipher + Send>,
pub encryption_pairs: Vec<EncryptionPair>,
pub server_session_id: u8,
pub active_secure_connection_data: Option<ActiveSecureConnectionData>
}
@ -218,18 +222,22 @@ impl SocketData {
CONNECT => {
info!("got connect");
let Some(MaximumSubstreamId(max_substream)) = packet.options.iter().find(|v| matches!(v, MaximumSubstreamId(_))) else {
return;
};
let Some((
accepted,
(client_decryption, server_encryption),
encryption_pairs,
active_secure_connection_data
)) = (self.on_connect_handler)(packet.clone()).await else {
)) = (self.on_connect_handler)(packet.clone(), *max_substream).await else {
error!("invalid connection request");
return;
};
connection.active_connection_data = Some(ActiveConnectionData {
client_decryption,
server_encryption,
encryption_pairs,
reliable_client_queue: VecDeque::new(),
reliable_client_counter: 2,
reliable_server_counter: 1,
@ -341,7 +349,11 @@ impl SocketData {
active_connection.reliable_client_counter = active_connection.reliable_client_counter.overflowing_add(1).0;
active_connection.client_decryption.apply_keystream(&mut packet.payload);
let Some(stream) = active_connection.encryption_pairs.get_mut(packet.header.substream_id as usize).map(|e| &mut e.recv) else {
return;
};
stream.apply_keystream(&mut packet.payload);
// we cant divert this off to another thread we HAVE to process it now to keep order
@ -417,8 +429,6 @@ impl SocketData {
impl ConnectionData{
pub async fn finish_and_send_packet_to(&mut self, socket: &SocketData, mut packet: PRUDPPacket){
println!("{}", hex::encode(&packet.payload));
if (packet.header.types_and_flags.get_flags() & RELIABLE) != 0{
let Some(active_connection) = self.active_connection_data.as_mut() else {
error!("tried to send a secure packet to an inactive connection");
@ -428,7 +438,11 @@ impl ConnectionData{
packet.header.sequence_id = active_connection.reliable_server_counter;
active_connection.reliable_server_counter += 1;
active_connection.server_encryption.apply_keystream(&mut packet.payload);
let Some(encryption) = active_connection.encryption_pairs.get_mut(packet.header.substream_id as usize).map(|e| &mut e.send) else {
return;
};
encryption.apply_keystream(&mut packet.payload);
}
packet.header.source_port = socket.virtual_port;