feat: rnex works, all major roadblocks are gone, the rnex rewrite is now in working state

This commit is contained in:
DJMrTV 2025-05-07 21:52:05 +02:00
commit 5e726fc9b0
12 changed files with 494 additions and 667 deletions

View file

@ -3,5 +3,6 @@ pub mod router;
pub mod socket;
mod auth_module;
pub mod sockaddr;
//pub mod secure;
pub mod station_url;
pub mod station_url;
pub mod secure;
pub mod unsecure;

View file

@ -50,8 +50,7 @@ impl Router {
trace!("got valid prudp packet from someone({}): \n{:?}", addr, packet);
let connection = packet.source_sockaddr(addr);
println!("data from {:?}", connection);
let endpoints = self.endpoints.read().await;

View file

@ -4,10 +4,13 @@ use log::error;
use rc4::cipher::StreamCipherCoreWrapper;
use rc4::{KeyInit, Rc4, Rc4Core, StreamCipher};
use rc4::consts::U16;
use typenum::U5;
use crate::endianness::{IS_BIG_ENDIAN, ReadExtensions};
use crate::kerberos::{derive_key, TicketInternalData};
use crate::nex::account::Account;
use crate::prudp::socket::EncryptionPair;
use crate::prudp::packet::PRUDPPacket;
use crate::prudp::socket::{CryptoHandler, CryptoHandlerConnectionInstance, EncryptionPair};
use crate::prudp::unsecure::UnsecureInstance;
use crate::rmc::structures::RmcSerialize;
pub fn read_secure_connection_data(data: &[u8], act: &Account) -> Option<([u8; 32], u32, u32)>{
@ -97,4 +100,93 @@ pub fn generate_secure_encryption_pairs(mut session_key: [u8; 32], count: u8) ->
}
vec
}
pub struct Secure(pub &'static str, pub &'static Account);
pub struct SecureInstance {
access_key: &'static str,
session_key: [u8; 32],
streams: Vec<EncryptionPair<Rc4<U32>>>,
self_signature: [u8; 16],
remote_signature: [u8; 16],
pid: u32,
}
impl CryptoHandler for Secure {
type CryptoConnectionInstance = SecureInstance;
fn instantiate(
&self,
remote_signature: [u8; 16],
self_signature: [u8; 16],
payload: &[u8],
substream_count: u8,
) -> Option<(Vec<u8>, Self::CryptoConnectionInstance)> {
let (session_key, pid, check_value) = read_secure_connection_data(payload, &self.1)?;
let check_value_response = check_value + 1;
let data = bytemuck::bytes_of(&check_value_response);
let mut response = Vec::new();
data.serialize(&mut response).ok()?;
let encryption_pairs = generate_secure_encryption_pairs(session_key, substream_count);
Some((
response,
SecureInstance {
pid,
streams: encryption_pairs,
session_key,
access_key: self.0,
remote_signature,
self_signature,
},
))
}
fn sign_pre_handshake(&self, packet: &mut PRUDPPacket) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.0, None, None);
}
}
impl CryptoHandlerConnectionInstance for SecureInstance {
type Encryption = Rc4<U5>;
fn decrypt_incoming(&mut self, substream: u8, data: &mut [u8]) {
if let Some(crypt_pair) = self.streams.get_mut(substream as usize){
crypt_pair.recv.apply_keystream(data);
}
}
fn encrypt_outgoing(&mut self, substream: u8, data: &mut [u8]) {
if let Some(crypt_pair) = self.streams.get_mut(substream as usize){
crypt_pair.send.apply_keystream(data);
}
}
fn get_user_id(&self) -> u32 {
self.pid
}
fn sign_connect(&self, packet: &mut PRUDPPacket) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.access_key, None, Some(self.self_signature));
}
fn sign_packet(&self, packet: &mut PRUDPPacket) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.access_key, Some(self.session_key), Some(self.self_signature));
}
fn verify_packet(&self, packet: &PRUDPPacket) -> bool {
true
}
}

View file

@ -27,6 +27,7 @@ use tokio::net::UdpSocket;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{Mutex, RwLock};
use tokio_stream::Stream;
use crate::nex::account::Account;
// due to the way this is designed crashing the router thread causes deadlock, sorry ;-;
// (maybe i will fix that some day)
@ -39,473 +40,13 @@ pub struct EncryptionPair<T: StreamCipher + Send> {
}
impl<T: StreamCipher + Send> EncryptionPair<T> {
fn init_both<F: Fn() -> T>(func: F) -> Self {
pub fn init_both<F: Fn() -> T>(func: F) -> Self {
Self {
recv: func(),
send: func(),
}
}
}
/*
pub async fn process_packet(
self: &Arc<Self>,
client_address: PRUDPSockAddr,
packet: &PRUDPPacket,
) {
let conn = self.connections.read().await;
if !conn.contains_key(&client_address) {
drop(conn);
let mut conn = self.connections.write().await;
//only insert if we STILL dont have the connection preventing double insertion
if !conn.contains_key(&client_address) {
conn.insert(
client_address,
(
Arc::new(Mutex::new(ConnectionData {
sock_addr: client_address,
id: random(),
signature: [0; 16],
server_signature: [0; 16],
active_connection_data: None,
})),
Arc::new(Mutex::new(())),
),
);
}
drop(conn);
} else {
drop(conn);
}
let connections = self.connections.read().await;
let Some(conn) = connections.get(&client_address) else {
error!("connection is still not present after making sure connection is present, giving up.");
return;
};
let conn = conn.clone();
// dont keep holding the connections list unnescesarily
drop(connections);
let mutual_exclusion_packet_handeling_mtx = conn.1.lock().await;
let mut connection = conn.0.lock().await;
if (packet.header.types_and_flags.get_flags() & ACK) != 0 {
//todo: handle acknowledgements and resending packets propperly
println!("got ack");
return;
}
if (packet.header.types_and_flags.get_flags() & MULTI_ACK) != 0 {
println!("got multi ack");
return;
}
match packet.header.types_and_flags.get_types() {
SYN => {
println!("got syn");
// reset heartbeat?
let mut response_packet = packet.base_response_packet();
response_packet.header.types_and_flags.set_types(SYN);
response_packet.header.types_and_flags.set_flag(ACK);
response_packet.header.types_and_flags.set_flag(HAS_SIZE);
connection.signature = client_address.calculate_connection_signature();
response_packet
.options
.push(ConnectionSignature(connection.signature));
for options in &packet.options {
match options {
SupportedFunctions(functions) => response_packet
.options
.push(SupportedFunctions(*functions & 0x04)),
MaximumSubstreamId(max_substream) => response_packet
.options
.push(MaximumSubstreamId(*max_substream)),
_ => { /* ??? */ }
}
}
response_packet.set_sizes();
response_packet.calculate_and_assign_signature(self.access_key, None, None);
let mut vec = Vec::new();
response_packet
.write_to(&mut vec)
.expect("somehow failed to convert backet to bytes");
self.socket
.send_to(&vec, client_address.regular_socket_addr)
.await
.expect("failed to send data back");
}
CONNECT => {
println!("got connect");
let Some(MaximumSubstreamId(max_substream)) = packet
.options
.iter()
.find(|v| matches!(v, MaximumSubstreamId(_)))
else {
return;
};
let Some((response_data, encryption_pairs, active_secure_connection_data)) =
(self.on_connect_handler)(packet.clone(), *max_substream).await
else {
error!("invalid connection request");
return;
};
connection.active_connection_data = Some(ActiveConnectionData {
encryption_pairs,
reliable_client_queue: VecDeque::new(),
reliable_client_counter: 2,
reliable_server_counter: 1,
server_session_id: packet.header.session_id,
active_secure_connection_data,
connection_id: random(),
});
let mut response_packet = packet.base_response_packet();
response_packet.payload = response_data;
response_packet.header.types_and_flags.set_types(CONNECT);
response_packet.header.types_and_flags.set_flag(ACK);
response_packet.header.types_and_flags.set_flag(HAS_SIZE);
// todo: (or not) sliding windows and stuff
response_packet.header.session_id = packet.header.session_id;
response_packet.header.sequence_id = 1;
response_packet
.options
.push(ConnectionSignature(Default::default()));
//let mut init_seq_id = 0;
for option in &packet.options {
match option {
MaximumSubstreamId(max_substream) => response_packet
.options
.push(MaximumSubstreamId(*max_substream)),
SupportedFunctions(funcs) => {
response_packet.options.push(SupportedFunctions(*funcs))
}
ConnectionSignature(sig) => connection.server_signature = *sig,
PacketOption::InitialSequenceId(_id) => {
//init_seq_id = *id;
}
_ => { /* ? */ }
}
}
// Splatoon doesnt use compression so we arent gonna compress unless i at some point
// want to implement some server which requires it
// No encryption here for the same reason
// todo: implement something to do secure servers
if connection.server_signature == <[u8; 16] as Default>::default() {
error!("didn't get connection signature from client")
}
response_packet.set_sizes();
response_packet.calculate_and_assign_signature(
self.access_key,
None,
Some(connection.server_signature),
);
let mut vec = Vec::new();
response_packet
.write_to(&mut vec)
.expect("somehow failed to convert backet to bytes");
self.socket
.send_to(&vec, client_address.regular_socket_addr)
.await
.expect("failed to send data back");
}
DATA => {
if (packet.header.types_and_flags.get_flags() & RELIABLE) != 0 {
let Some(active_connection) = connection.active_connection_data.as_mut() else {
error!("got data packet on non active connection!");
return;
};
match active_connection
.reliable_client_queue
.binary_search_by_key(&packet.header.sequence_id, |p| p.header.sequence_id)
{
Ok(_) => warn!("recieved packet twice"),
Err(position) => active_connection
.reliable_client_queue
.insert(position, packet.clone()),
}
if (packet.header.types_and_flags.get_flags() & NEED_ACK) != 0 {
let mut ack = packet.base_acknowledgement_packet();
ack.header.session_id = active_connection.server_session_id;
ack.set_sizes();
let potential_session_key = connection
.active_connection_data
.as_ref()
.unwrap()
.active_secure_connection_data
.as_ref()
.map(|s| s.session_key);
ack.calculate_and_assign_signature(
self.access_key,
potential_session_key,
Some(connection.server_signature),
);
let mut vec = Vec::new();
ack.write_to(&mut vec)
.expect("somehow failed to convert backet to bytes");
self.socket
.send_to(&vec, client_address.regular_socket_addr)
.await
.expect("failed to send data back");
}
drop(connection);
while let Some(mut packet) = {
let mut locked = conn.0.lock().await;
let packet = locked
.active_connection_data
.as_mut()
.map(|a| {
a.reliable_client_queue
.front()
.is_some_and(|v| {
v.header.sequence_id == a.reliable_client_counter
})
.then(|| a.reliable_client_queue.pop_front())
})
.flatten()
.flatten();
drop(locked);
packet
} {
if packet.options.iter().any(|v| match v {
PacketOption::FragmentId(f) => *f != 0,
_ => false,
}) {
error!("fragmented packets are unsupported right now")
}
let mut locked = conn.0.lock().await;
let active_connection = locked.active_connection_data.as_mut()
.expect("we litterally just recieved a packet which requires the connection to be active, failing this should be impossible");
active_connection.reliable_client_counter = active_connection
.reliable_client_counter
.overflowing_add(1)
.0;
let Some(stream) = active_connection
.encryption_pairs
.get_mut(packet.header.substream_id as usize)
.map(|e| &mut e.recv)
else {
return;
};
stream.apply_keystream(&mut packet.payload);
drop(locked);
// we cant divert this off to another thread we HAVE to process it now to keep order
(self.on_data_handler)(packet, self.clone(), conn.0.clone()).await;
// ignored for now
}
} else {
error!("unreliable packets are unimplemented");
unimplemented!()
}
//info!("{:?}", packet);
}
PING => {
let ConnectionData {
active_connection_data,
server_signature,
..
} = &mut *connection;
info!("got ping");
if (packet.header.types_and_flags.get_flags() & NEED_ACK) != 0 {
let Some(active_connection) = active_connection_data.as_mut() else {
error!("got data packet on non active connection!");
return;
};
let mut ack = packet.base_acknowledgement_packet();
ack.header.session_id = active_connection.server_session_id;
ack.set_sizes();
let potential_session_key = active_connection
.active_secure_connection_data
.as_ref()
.map(|s| s.session_key);
ack.calculate_and_assign_signature(
self.access_key,
potential_session_key,
Some(*server_signature),
);
let mut vec = Vec::new();
ack.write_to(&mut vec)
.expect("somehow failed to convert backet to bytes");
self.socket
.send_to(&vec, client_address.regular_socket_addr)
.await
.expect("failed to send data back");
}
}
DISCONNECT => {
println!("got disconnect");
let Some(active_connection) = &connection.active_connection_data else {
return;
};
let mut ack = packet.base_acknowledgement_packet();
ack.header.session_id = active_connection.server_session_id;
ack.set_sizes();
let potential_session_key = active_connection
.active_secure_connection_data
.as_ref()
.map(|s| s.session_key);
ack.calculate_and_assign_signature(
self.access_key,
potential_session_key,
Some(connection.server_signature),
);
let mut vec = Vec::new();
ack.write_to(&mut vec)
.expect("somehow failed to convert backet to bytes");
self.socket
.send_to(&vec, client_address.regular_socket_addr)
.await
.expect("failed to send data back");
self.socket
.send_to(&vec, client_address.regular_socket_addr)
.await
.expect("failed to send data back");
self.socket
.send_to(&vec, client_address.regular_socket_addr)
.await
.expect("failed to send data back");
}
_ => error!(
"unimplemented packet type: {}",
packet.header.types_and_flags.get_types()
),
}
drop(mutual_exclusion_packet_handeling_mtx)
}*/
/*
impl ConnectionData {
pub async fn finish_and_send_packet_to(
&mut self,
socket: &SocketData,
mut packet: PRUDPPacket,
) {
let mut web = WEB_DATA.lock().await;
web.data.push((
self.sock_addr.regular_socket_addr,
Outgoing(hex::encode(&packet.payload)),
));
drop(web);
if (packet.header.types_and_flags.get_flags() & RELIABLE) != 0 {
let Some(active_connection) = self.active_connection_data.as_mut() else {
error!("tried to send a secure packet to an inactive connection");
return;
};
packet.header.sequence_id = active_connection.reliable_server_counter;
active_connection.reliable_server_counter += 1;
let Some(encryption) = active_connection
.encryption_pairs
.get_mut(packet.header.substream_id as usize)
.map(|e| &mut e.send)
else {
return;
};
encryption.apply_keystream(&mut packet.payload);
}
packet.header.session_id = self
.active_connection_data
.as_ref()
.map(|v| v.server_session_id)
.unwrap_or_default();
packet.header.source_port = socket.virtual_port;
packet.header.destination_port = self.sock_addr.virtual_port;
packet.set_sizes();
let potential_session_key = self
.active_connection_data
.as_ref()
.unwrap()
.active_secure_connection_data
.as_ref()
.map(|s| s.session_key);
packet.calculate_and_assign_signature(
socket.access_key,
potential_session_key,
Some(self.server_signature),
);
let mut vec = Vec::new();
packet
.write_to(&mut vec)
.expect("somehow failed to convert backet to bytes");
if let Err(e) = socket
.socket
.send_to(&vec, self.sock_addr.regular_socket_addr)
.await
{
error!("unable to send packet to destination: {}", e);
}
}
}*/
pub struct NewEncryptionPair<E: StreamCipher> {
pub send: E,
@ -538,9 +79,11 @@ impl<E: CryptoHandlerConnectionInstance> Deref for InternalConnection<E>{
impl<E: CryptoHandlerConnectionInstance> InternalConnection<E>{
fn next_server_count(&mut self) -> u16{
let prev_val = self.reliable_server_counter;
let (val, _) = self.reliable_server_counter.overflowing_add(1);
self.reliable_server_counter = val;
val
println!("{}", prev_val);
prev_val
}
}
@ -645,14 +188,14 @@ impl<T: CryptoHandlerConnectionInstance> AnyInternalConnection for InternalConne
self.crypto_handler_instance.sign_packet(&mut packet);
packet.set_sizes();
let mut vec = Vec::new();
packet
.write_to(&mut vec)
.expect("somehow failed to convert backet to bytes");
println!("{}", hex::encode(&vec));
self.socket
.send_to(&vec, self.socket_addr.regular_socket_addr)
.await
@ -670,8 +213,6 @@ impl<T: CryptoHandler> InternalSocket<T> {
.write_to(&mut vec)
.expect("somehow failed to convert backet to bytes");
println!("sent out: {}", hex::encode(&vec));
self.socket
.send_to(&vec, dest.regular_socket_addr)
.await
@ -824,12 +365,15 @@ impl<T: CryptoHandler> InternalSocket<T> {
let session_id = packet.header.session_id;
let (return_data, crypto) = self.crypto_handler.instantiate(
let Some((return_data, crypto)) = self.crypto_handler.instantiate(
remote_signature,
*own_signature,
&packet.payload,
1 + *max_substream,
);
) else {
error!("someone attempted to connect with invalid data");
return;
};
let mut response = packet.base_response_packet();
response.header.types_and_flags.set_types(CONNECT);
@ -895,12 +439,6 @@ impl<T: CryptoHandler> InternalSocket<T> {
mem::swap(&mut data, &mut packet.payload);
conn.data_sender.send(data).await.expect("socket died");
if packet.header.types_and_flags.get_flags() & NEED_ACK == 0{
return;
}
let mut response = packet.base_acknowledgement_packet();
response.header.types_and_flags.set_flag(HAS_SIZE | ACK);
response.header.session_id = conn.session_id;
@ -908,6 +446,10 @@ impl<T: CryptoHandler> InternalSocket<T> {
conn.crypto_handler_instance.sign_packet(&mut response);
self.send_packet_unbuffered(address, response).await;
conn.data_sender.send(data).await.ok();
}
async fn handle_ping(&self, address: PRUDPSockAddr, packet: PRUDPPacket){
@ -993,6 +535,8 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
SYN => self.handle_syn(address, packet).await,
CONNECT => self.handle_connect(address, packet).await,
DATA => self.handle_data(address, packet).await,
DISCONNECT => self.handle_disconnect(address, packet).await,
PING => self.handle_ping(address, packet).await,
_ => {
error!(
"unimplemented packet type: {}",
@ -1068,7 +612,7 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
return None;
};
let (_, crypt) = self.crypto_handler.instantiate(remote_signature, *own_signature, &[], 1);
let (_, crypt) = self.crypto_handler.instantiate(remote_signature, *own_signature, &[], 1)?;
//todo: make this work for secure servers as well
self.create_connection(crypt, address, 0).await;
@ -1130,7 +674,7 @@ pub trait CryptoHandler: Send + Sync + 'static {
own_signature: [u8; 16],
_: &[u8],
substream_count: u8,
) -> (Vec<u8>, Self::CryptoConnectionInstance);
) -> Option<(Vec<u8>, Self::CryptoConnectionInstance)>;
fn sign_pre_handshake(&self, packet: &mut PRUDPPacket);
}
@ -1162,6 +706,7 @@ impl ExternalConnection{
impl SendingConnection{
pub async fn send(&self, data: Vec<u8>) -> Option<()> {
println!("{}", hex::encode(&data));
let internal = self.inernal.upgrade()?;
let mut internal = internal.lock().await;
@ -1169,81 +714,4 @@ impl SendingConnection{
internal.send_data_packet(data).await;
Some(())
}
}
pub struct Unsecure(pub &'static str);
pub struct UnsecureInstance {
key: &'static str,
streams: Vec<EncryptionPair<Rc4<U5>>>,
self_signature: [u8; 16],
remote_signature: [u8; 16],
}
// my hand was forced to use lazy so that we can guarantee this code
// only runs once and so that i can put it here as a "constant" (for performance and readability)
// since for some reason rust crypto doesn't have any const time key initialization
static DEFAULT_KEY: Lazy<Key<U5>> = Lazy::new(|| Key::from(*b"CD&ML"));
impl CryptoHandler for Unsecure {
type CryptoConnectionInstance = UnsecureInstance;
fn instantiate(
&self,
remote_signature: [u8; 16],
self_signature: [u8; 16],
_: &[u8],
substream_count: u8,
) -> (Vec<u8>, Self::CryptoConnectionInstance) {
(
Vec::new(),
UnsecureInstance {
streams: (0..substream_count)
.map(|_| EncryptionPair::init_both(|| Rc4::new(&DEFAULT_KEY)))
.collect(),
key: self.0,
remote_signature,
self_signature,
},
)
}
fn sign_pre_handshake(&self, packet: &mut PRUDPPacket) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.0, None, None);
}
}
impl CryptoHandlerConnectionInstance for UnsecureInstance {
type Encryption = Rc4<U5>;
fn decrypt_incoming(&mut self, substream: u8, data: &mut [u8]) {
if let Some(crypt_pair) = self.streams.get_mut(substream as usize){
crypt_pair.recv.apply_keystream(data);
}
}
fn encrypt_outgoing(&mut self, substream: u8, data: &mut [u8]) {
if let Some(crypt_pair) = self.streams.get_mut(substream as usize){
crypt_pair.send.apply_keystream(data);
}
}
fn get_user_id(&self) -> u32 {
0
}
fn sign_connect(&self, packet: &mut PRUDPPacket) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.key, None, Some(self.self_signature));
}
fn sign_packet(&self, packet: &mut PRUDPPacket) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.key, None, Some(self.self_signature));
}
fn verify_packet(&self, packet: &PRUDPPacket) -> bool {
true
}
}
}

84
src/prudp/unsecure.rs Normal file
View file

@ -0,0 +1,84 @@
use once_cell::sync::Lazy;
use rc4::{Key, KeyInit, Rc4, StreamCipher};
use typenum::U5;
use crate::prudp::packet::PRUDPPacket;
use crate::prudp::socket::{CryptoHandler, CryptoHandlerConnectionInstance, EncryptionPair};
pub struct Unsecure(pub &'static str);
pub struct UnsecureInstance {
key: &'static str,
streams: Vec<EncryptionPair<Rc4<U5>>>,
self_signature: [u8; 16],
remote_signature: [u8; 16],
}
// my hand was forced to use lazy so that we can guarantee this code
// only runs once and so that i can put it here as a "constant" (for performance and readability)
// since for some reason rust crypto doesn't have any const time key initialization
static DEFAULT_KEY: Lazy<Key<U5>> = Lazy::new(|| Key::from(*b"CD&ML"));
impl CryptoHandler for Unsecure {
type CryptoConnectionInstance = UnsecureInstance;
fn instantiate(
&self,
remote_signature: [u8; 16],
self_signature: [u8; 16],
_: &[u8],
substream_count: u8,
) -> Option<(Vec<u8>, Self::CryptoConnectionInstance)> {
Some((
Vec::new(),
UnsecureInstance {
streams: (0..substream_count)
.map(|_| EncryptionPair::init_both(|| Rc4::new(&DEFAULT_KEY)))
.collect(),
key: self.0,
remote_signature,
self_signature,
},
))
}
fn sign_pre_handshake(&self, packet: &mut PRUDPPacket) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.0, None, None);
}
}
impl CryptoHandlerConnectionInstance for UnsecureInstance {
type Encryption = Rc4<U5>;
fn decrypt_incoming(&mut self, substream: u8, data: &mut [u8]) {
if let Some(crypt_pair) = self.streams.get_mut(substream as usize){
crypt_pair.recv.apply_keystream(data);
}
}
fn encrypt_outgoing(&mut self, substream: u8, data: &mut [u8]) {
if let Some(crypt_pair) = self.streams.get_mut(substream as usize){
crypt_pair.send.apply_keystream(data);
}
}
fn get_user_id(&self) -> u32 {
0
}
fn sign_connect(&self, packet: &mut PRUDPPacket) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.key, None, Some(self.self_signature));
}
fn sign_packet(&self, packet: &mut PRUDPPacket) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.key, None, Some(self.self_signature));
}
fn verify_packet(&self, packet: &PRUDPPacket) -> bool {
true
}
}