diff --git a/.gitignore b/.gitignore index ea0ae4a..2a7a556 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ target .idea .env log -reports \ No newline at end of file +reports +.zed diff --git a/Cargo.lock b/Cargo.lock index 73c30a5..20a8271 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,9 +134,9 @@ dependencies = [ [[package]] name = "cfg-if" -version = "1.0.1" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" [[package]] name = "chrono" @@ -385,16 +385,6 @@ dependencies = [ "syn 2.0.104", ] -[[package]] -name = "macros" -version = "0.1.1" -source = "git+https://github.com/DJMrTV/VByteMacros#e2f31bded8c5591e847ba03faf79ae0351e43e69" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "macros" version = "0.1.1" @@ -547,10 +537,39 @@ dependencies = [ ] [[package]] -name = "prudpv0" +name = "proxy" +version = "0.1.0" +dependencies = [ + "cfg-if", + "prudpv0", + "prudpv1", + "rnex-core", + "tokio", +] + +[[package]] +name = "proxy-common" version = "0.1.0" dependencies = [ "rnex-core", + "thiserror", + "tokio", +] + +[[package]] +name = "prudpv0" +version = "0.1.0" +dependencies = [ + "bytemuck", + "cfg-if", + "hmac", + "log", + "md-5", + "proxy-common", + "rc4", + "rnex-core", + "tokio", + "typenum", ] [[package]] @@ -568,7 +587,7 @@ dependencies = [ "thiserror", "tokio", "typenum", - "v-byte-helpers 0.1.1 (git+https://github.com/RusticMaple/VByteMacros)", + "v-byte-helpers", ] [[package]] @@ -665,7 +684,7 @@ dependencies = [ "tokio", "typenum", "ureq", - "v-byte-helpers 0.1.1 (git+https://github.com/DJMrTV/VByteMacros)", + "v-byte-helpers", ] [[package]] @@ -969,22 +988,13 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" -[[package]] -name = "v-byte-helpers" -version = "0.1.1" -source = "git+https://github.com/DJMrTV/VByteMacros#e2f31bded8c5591e847ba03faf79ae0351e43e69" -dependencies = [ - "bytemuck", - "macros 0.1.1 (git+https://github.com/DJMrTV/VByteMacros)", -] - [[package]] name = "v-byte-helpers" version = "0.1.1" source = "git+https://github.com/RusticMaple/VByteMacros#e2f31bded8c5591e847ba03faf79ae0351e43e69" dependencies = [ "bytemuck", - "macros 0.1.1 (git+https://github.com/RusticMaple/VByteMacros)", + "macros 0.1.1", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d020704..9d6ea8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,4 +5,4 @@ members = [ "rnex-core", "prudpv1", "prudpv0" -] +, "proxy", "proxy-common"] diff --git a/build-edition.sh b/build-edition.sh index 7b1cfc0..5d65fc3 100755 --- a/build-edition.sh +++ b/build-edition.sh @@ -7,5 +7,7 @@ fi source ./buildscripts/common.sh echo FEATURES: echo $EDITION_FEATURES +echo ENV SETTINGS: +env -OPENSSL_LIB_DIR=/usr/lib OPENSSL_INCLUDE_DIR=/usr/include/openssl OPENSSL_STATIC=1 RUSTFLAGS="-C relocation-model=static -C linker=ld.lld" cargo build --release --features "$EDITION_FEATURES" --target x86_64-unknown-linux-musl \ No newline at end of file +OPENSSL_LIB_DIR=/usr/lib OPENSSL_INCLUDE_DIR=/usr/include/openssl OPENSSL_STATIC=1 RUSTFLAGS="-C relocation-model=static -C linker=ld.lld" cargo build --release --features "$EDITION_FEATURES" --target x86_64-unknown-linux-musl diff --git a/buildscripts/common.sh b/buildscripts/common.sh index 27fd8cd..8c5e67e 100755 --- a/buildscripts/common.sh +++ b/buildscripts/common.sh @@ -6,4 +6,5 @@ IFS=$'\n' while IFS=$'\n' read -r KEY; do VAL=$(yq ea ".$EDITION.settings.$KEY" editions.yaml) declare "$KEY=$VAL" -done <<< "$SETTINGS" \ No newline at end of file + export $KEY +done <<< "$SETTINGS" diff --git a/editions.yaml b/editions.yaml index 033c77e..f20da91 100644 --- a/editions.yaml +++ b/editions.yaml @@ -6,4 +6,5 @@ splatoon: friends: features: - friends - settings: {} + settings: + RNEX_VIRTUAL_PORT: 1:10 diff --git a/proxy-common/Cargo.toml b/proxy-common/Cargo.toml new file mode 100644 index 0000000..28974ef --- /dev/null +++ b/proxy-common/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "proxy-common" +version = "0.1.0" +edition = "2024" + +[dependencies] +thiserror = "2.0.12" +rnex-core = { path = "../rnex-core", version = "0.1.1" } +tokio = { version = "1.47.0", features = ["full"] } diff --git a/proxy-common/src/lib.rs b/proxy-common/src/lib.rs new file mode 100644 index 0000000..68ac209 --- /dev/null +++ b/proxy-common/src/lib.rs @@ -0,0 +1,209 @@ +use rnex_core::{ + executables::common::{OWN_IP_PUBLIC, try_get_ip}, + prudp::{socket_addr::PRUDPSockAddr, virtual_port::VirtualPort}, + reggie::{RemoteEdgeNodeHolder, UnitPacketWrite}, + rmc::{ + protocols::{ + OnlyRemote, RemoteDisconnectable, RmcCallable, RmcConnection, RmcPureRemoteObject, + new_rmc_gateway_connection, + }, + structures::RmcSerialize, + }, + rnex_proxy_common::ConnectionInitData, + util::{SendingBufferConnection, SplittableBufferConnection}, +}; +use std::{ + env::{self, VarError}, + error, + net::{AddrParseError, IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, + ops::Deref, + panic, + str::FromStr, + sync::{Arc, LazyLock}, +}; +use thiserror::Error; +use tokio::net::TcpStream; + +const RNEX_DEFAULT_PORT: u16 = match u16::from_str_radix(env!("RNEX_DEFAULT_PORT"), 10) { + Ok(v) => v, + Err(_) => panic!("unable to get default port from env"), +}; + +#[derive(Error, Debug)] +pub enum Error { + #[error("error getting environment variable \"{0}\": {1}")] + UnableToGetEnv(&'static str, VarError), + #[error("error parsing ip address environment variable \"{0}\": {1}")] + AddrParse(&'static str, AddrParseError), + #[error( + "error error getting public ip address: \n\tattempted to read from env var \"SERVER_IP_PUBLIC\" and got: {0} \n\tattempted to request from internet and failed with: {1}" + )] + PubAddrGetErr(Box, Box), +} +impl Into for (&'static str, AddrParseError) { + fn into(self) -> Error { + Error::AddrParse(self.0, self.1) + } +} + +pub struct ProxyStartupParam { + pub forward_destination: SocketAddr, + pub edge_node_holder: SocketAddr, + pub self_public: SocketAddrV4, + pub self_private: SocketAddrV4, + pub virtual_port: VirtualPort, +} + +fn try_get_env(name: &'static str) -> Result +where + (&'static str, T::Err): Into, +{ + T::from_str(&env::var(name).map_err(|e| Error::UnableToGetEnv(name, e))?) + .map_err(|e| (name, e).into()) +} + +pub enum ProxyType { + Insecure, + Secure, +} +const VIRTUAL_PORT_INSECURE: LazyLock = + LazyLock::new(|| VirtualPort::parse(env!("RNEX_VIRTUAL_PORT_INSECURE")).unwrap()); +const VIRTUAL_PORT_SECURE: LazyLock = + LazyLock::new(|| VirtualPort::parse(env!("RNEX_VIRTUAL_PORT_SECURE")).unwrap()); +impl ProxyStartupParam { + pub fn new(prox_ty: ProxyType) -> Result { + let port = RNEX_DEFAULT_PORT + + match prox_ty { + ProxyType::Insecure => 0, + ProxyType::Secure => 1, + }; + let self_private = try_get_env("SERVER_IP_PRIVATE") + .unwrap_or(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, RNEX_DEFAULT_PORT)); + let self_public: SocketAddrV4 = match try_get_env("SERVER_IP_PUBLIC") { + Ok(v) => v, + Err(e) => try_get_ip() + .map(|v| SocketAddrV4::new(v, RNEX_DEFAULT_PORT)) + .map_err(move |v| Error::PubAddrGetErr(Box::new(e), v))?, + }; + + Ok(Self { + forward_destination: try_get_env("EDGE_NODE_HOLDER")?, + edge_node_holder: try_get_env("FORWARD_DESTINATION")?, + self_private, + self_public, + virtual_port: match prox_ty { + ProxyType::Insecure => *VIRTUAL_PORT_INSECURE, + ProxyType::Secure => *VIRTUAL_PORT_SECURE, + }, + }) + } +} + +struct OnRemoteDrop(T, Option); +impl Deref for OnRemoteDrop { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +// if we had something like a thread safe OnceConsume (basically the opposite of OnceLock) +// we could make C be an FnOnce +impl + OnRemoteDrop +{ + pub fn new(conn: RmcConnection, drop_func: C) -> Self { + Self(T::new(conn), Some(drop_func)) + } + + pub async fn disconnect(&self) { + self.0.disconnect().await; + } +} + +impl RmcCallable + for OnRemoteDrop +{ + fn rmc_call( + &self, + _responder: &SendingBufferConnection, + _protocol_id: u16, + _method_id: u32, + _call_id: u32, + _rest: Vec, + ) -> impl Future + Send { + // maybe respond with not implemented or something + async {} + } +} + +impl Drop for OnRemoteDrop { + fn drop(&mut self) { + self.1.take().unwrap()(); + } +} + +pub async fn setup_edge_node_connection( + param: &ProxyStartupParam, + shutdown_callback: impl FnOnce() + Send + Sync + 'static, +) { + let conn = tokio::net::TcpStream::connect(¶m.edge_node_holder) + .await + .unwrap(); + + let conn: SplittableBufferConnection = conn.into(); + + conn.send( + rnex_core::reggie::EdgeNodeHolderConnectOption::Register(param.self_public) + .to_data() + .unwrap(), + ) + .await; + //leave the inner object floating so that it gets destroyed once we disconnect + new_rmc_gateway_connection(conn, move |r| { + Arc::new(OnRemoteDrop::::new( + r, + shutdown_callback, + )) + }); +} + +pub async fn new_backend_connection( + param: &ProxyStartupParam, + addr: PRUDPSockAddr, + pid: u32, +) -> Option { + let mut stream = match TcpStream::connect(param.forward_destination).await { + Ok(v) => v, + Err(e) => { + return None; + } + }; + + if let Err(e) = stream + .send_buffer( + &ConnectionInitData { + prudpsock_addr: addr, + pid: pid, + } + .to_data() + .unwrap(), + ) + .await + { + return None; + }; + + Some(stream.into()) +} + +#[cfg(test)] +mod test { + use crate::{VIRTUAL_PORT_INSECURE, VIRTUAL_PORT_SECURE}; + + fn test_virtual_port_correct() { + println!("{:?}", VIRTUAL_PORT_INSECURE); + println!("{:?}", VIRTUAL_PORT_SECURE); + } +} diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml new file mode 100644 index 0000000..78e7438 --- /dev/null +++ b/proxy/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "proxy" +version = "0.1.0" +edition = "2024" + +[dependencies] +tokio = { version = "1.47.0", features = ["full"] } +prudpv0 = { path = "../prudpv0", optional = true } +prudpv1 = { path = "../prudpv1", optional = true } +cfg-if = "1.0.4" +rnex-core = { path = "../rnex-core", version = "0.1.1" } + +[features] +prudpv0 = ["dep:prudpv0"] +prudpv1 = ["dep:prudpv1"] +friends = ["prudpv0", "prudpv0/friends"] + + +[[bin]] +name = "proxy_insecure" +path = "src/insecure.rs" + +[[bin]] +name = "proxy_secure" +path = "src/secure.rs" diff --git a/proxy/src/insecure.rs b/proxy/src/insecure.rs new file mode 100644 index 0000000..1fe927f --- /dev/null +++ b/proxy/src/insecure.rs @@ -0,0 +1,8 @@ +use rnex_core::common::setup; + +#[tokio::main] +async fn main() { + setup(); + + proxy::start_insecure(ProxyStartupParam::new()).await; +} diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs new file mode 100644 index 0000000..7120adc --- /dev/null +++ b/proxy/src/lib.rs @@ -0,0 +1,11 @@ +use cfg_if::cfg_if; + +cfg_if! { + if #[cfg(feature = "prudpv0")]{ + pub use prudpv0::*; + } else if #[cfg(feature = "prudpv1")] { + pub use prudpv1::*; + } else { + compile_error!("no proxy type has been set"); + } +} diff --git a/proxy/src/secure.rs b/proxy/src/secure.rs new file mode 100644 index 0000000..d661d95 --- /dev/null +++ b/proxy/src/secure.rs @@ -0,0 +1,7 @@ +use rnex_core::common::setup; + +#[tokio::main] +async fn main() { + setup(); + proxy::start_secure(ProxyStartupParam::new()).await; +} diff --git a/prudpv0/Cargo.toml b/prudpv0/Cargo.toml index 439f882..dbdb1db 100644 --- a/prudpv0/Cargo.toml +++ b/prudpv0/Cargo.toml @@ -5,3 +5,15 @@ edition = "2024" [dependencies] rnex-core = { path = "../rnex-core", version = "0.1.1" } +tokio = { version = "1.47.0", features = ["full"] } +bytemuck = { version = "1.23.1", features = ["derive"] } +typenum = "1.18.0" +rc4 = "0.1.0" +log = "0.4.25" +cfg-if = "1.0.4" +proxy-common = {path = "../proxy-common"} +hmac = "0.12.1" +md-5 = "^0.10.6" + +[features] +friends = [] diff --git a/prudpv0/src/crypto/common_crypto.rs b/prudpv0/src/crypto/common_crypto.rs new file mode 100644 index 0000000..7196638 --- /dev/null +++ b/prudpv0/src/crypto/common_crypto.rs @@ -0,0 +1,54 @@ +trait IterExtra: Iterator { + fn sum_wrapping_u8(&mut self) -> u8 + where + Self::Item: Into; + + fn sum_wrapping_u32(&mut self) -> u32 + where + Self::Item: Into; +} + +impl IterExtra for T { + fn sum_wrapping_u8(&mut self) -> u8 + where + Self::Item: Into, + { + let mut sum = 0u8; + for v in self { + let val: u8 = v.into(); + sum = sum.wrapping_add(val); + } + sum + } + fn sum_wrapping_u32(&mut self) -> u32 + where + Self::Item: Into, + { + let mut sum = 0u32; + for v in self { + let val: u32 = v.into(); + sum = sum.wrapping_add(val); + } + sum + } +} + +#[inline(always)] +pub fn common_checksum(access_key: &str, data: &[u8]) -> u8 { + let leftover = data.len() % 4; + let word_sum = bytemuck::cast_slice::<_, u32>(&data[..data.len() - leftover]) + .iter() + .copied() + .sum_wrapping_u32(); + + let checksum = access_key.as_bytes().iter().copied().sum_wrapping_u8(); + let checksum = checksum.wrapping_add( + (&data[data.len() - leftover..]) + .iter() + .copied() + .sum_wrapping_u8(), + ); + let checksum = checksum.wrapping_add(word_sum.to_ne_bytes().into_iter().sum_wrapping_u8()); + + checksum +} diff --git a/prudpv0/src/crypto/friends_common.rs b/prudpv0/src/crypto/friends_common.rs new file mode 100644 index 0000000..b11af1d --- /dev/null +++ b/prudpv0/src/crypto/friends_common.rs @@ -0,0 +1,5 @@ +use hmac::Hmac; +use md5::Md5; + +pub const ACCESS_KEY: &str = "ridfebb9"; +pub type HmacMd5 = Hmac; diff --git a/prudpv0/src/crypto/friends_insecure.rs b/prudpv0/src/crypto/friends_insecure.rs new file mode 100644 index 0000000..b3ff9ab --- /dev/null +++ b/prudpv0/src/crypto/friends_insecure.rs @@ -0,0 +1,52 @@ +use std::rc::Rc; + +use hmac::Mac; +use rc4::{KeyInit, Rc4, StreamCipher}; +use rnex_core::prudp::encryption::{DEFAULT_KEY, EncryptionPair}; +use typenum::U5; + +use crate::crypto::{ + Crypto, CryptoInstance, + common_crypto::common_checksum, + friends_common::{ACCESS_KEY, HmacMd5}, +}; + +pub struct InsecureInstance { + pair: EncryptionPair>, +} + +impl CryptoInstance for InsecureInstance { + fn decrypt_incoming(&mut self, data: &mut [u8]) { + self.pair.recv.apply_keystream(data); + } + fn encrypt_outgoing(&mut self, data: &mut [u8]) { + self.pair.send.apply_keystream(data); + } + fn get_user_id(&self) -> u32 { + 0 + } + fn generate_signature(&self, data: &[u8]) -> [u8; 4] { + let mut hmac = ::new_from_slice(ACCESS_KEY.as_bytes()) + .expect("unable to create hmac md5"); + hmac.update(data); + hmac.finalize().into_bytes()[0..4].try_into().unwrap() + } +} + +pub struct Insecure(); + +impl Crypto for Insecure { + type Instance = InsecureInstance; + fn new() -> Self { + Self() + } + fn calculate_checksum(&self, data: &[u8]) -> u8 { + common_checksum(ACCESS_KEY, data) + } + + fn instantiate(&self, packet_data: &[u8]) -> Self::Instance { + InsecureInstance { + pair: EncryptionPair::init_both(|| Rc4::new(&DEFAULT_KEY)), + } + } +} diff --git a/prudpv0/src/crypto/friends_secure.rs b/prudpv0/src/crypto/friends_secure.rs new file mode 100644 index 0000000..db36c46 --- /dev/null +++ b/prudpv0/src/crypto/friends_secure.rs @@ -0,0 +1,47 @@ +use hmac::Mac; +use rc4::Rc4; +use rnex_core::prudp::encryption::EncryptionPair; +use typenum::U32; + +use crate::crypto::{ + Crypto, CryptoInstance, + common_crypto::common_checksum, + friends_common::{ACCESS_KEY, HmacMd5}, +}; + +pub struct SecureInstance { + pair: EncryptionPair>, +} + +impl CryptoInstance for SecureInstance { + fn decrypt_incoming(&mut self, data: &mut [u8]) { + todo!() + } + fn encrypt_outgoing(&mut self, data: &mut [u8]) { + todo!() + } + fn get_user_id(&self) -> u32 { + todo!() + } + fn generate_signature(&self, data: &[u8]) -> [u8; 4] { + let mut hmac = ::new_from_slice(ACCESS_KEY.as_bytes()) + .expect("unable to create hmac md5"); + hmac.update(data); + hmac.finalize().into_bytes()[0..4].try_into().unwrap() + } +} + +pub struct Secure(); + +impl Crypto for Secure { + type Instance = SecureInstance; + fn new() -> Self { + Self() + } + fn calculate_checksum(&self, data: &[u8]) -> u8 { + common_checksum(ACCESS_KEY, data) + } + fn instantiate(&self, data: &[u8]) -> Self::Instance { + todo!() + } +} diff --git a/prudpv0/src/crypto/insecure.rs b/prudpv0/src/crypto/insecure.rs new file mode 100644 index 0000000..5d3dff3 --- /dev/null +++ b/prudpv0/src/crypto/insecure.rs @@ -0,0 +1,9 @@ +use crate::crypto::Crypto; + +pub struct Insecure(); + +impl Crypto for Insecure { + fn calculate_checksum(&self, data: &[u8]) -> u8 { + todo!() + } +} diff --git a/prudpv0/src/crypto/mod.rs b/prudpv0/src/crypto/mod.rs new file mode 100644 index 0000000..add58b9 --- /dev/null +++ b/prudpv0/src/crypto/mod.rs @@ -0,0 +1,32 @@ +use cfg_if::cfg_if; + +mod common_crypto; + +pub trait CryptoInstance: Send + 'static { + fn decrypt_incoming(&mut self, data: &mut [u8]); + fn encrypt_outgoing(&mut self, data: &mut [u8]); + fn generate_signature(&self, data: &[u8]) -> [u8; 4]; + fn get_user_id(&self) -> u32; +} + +pub trait Crypto: Send + Sync + 'static { + type Instance: CryptoInstance; + fn new() -> Self; + fn calculate_checksum(&self, data: &[u8]) -> u8; + fn instantiate(&self, data: &[u8]) -> Self::Instance; +} + +cfg_if! { + if #[cfg(feature = "friends")]{ + pub mod friends_common; + pub mod friends_insecure; + pub use friends_insecure::*; + pub mod friends_secure; + pub use friends_secure::*; + } else { + pub mod secure; + pub use secure::*; + pub mod insecure; + pub use insecure::*; + } +} diff --git a/prudpv0/src/crypto/secure.rs b/prudpv0/src/crypto/secure.rs new file mode 100644 index 0000000..435986e --- /dev/null +++ b/prudpv0/src/crypto/secure.rs @@ -0,0 +1,9 @@ +use crate::crypto::Crypto; + +pub struct Secure(); + +impl Crypto for Secure { + fn calculate_checksum(&self, data: &[u8]) -> u8 { + todo!() + } +} diff --git a/prudpv0/src/lib.rs b/prudpv0/src/lib.rs index e69de29..6ba8216 100644 --- a/prudpv0/src/lib.rs +++ b/prudpv0/src/lib.rs @@ -0,0 +1,62 @@ +use bytemuck::{Pod, Zeroable}; +use log::{error, info, warn}; +use proxy_common::{ProxyStartupParam, setup_edge_node_connection}; +use rnex_core::executables::common::{OWN_IP_PRIVATE, OWN_IP_PUBLIC, SERVER_PORT}; +use rnex_core::prudp::types_flags::TypesFlags; +use rnex_core::prudp::types_flags::types::SYN; +use rnex_core::prudp::virtual_port::VirtualPort; +use rnex_core::reggie::EdgeNodeHolderConnectOption::Register; +use rnex_core::reggie::RemoteEdgeNodeHolder; +use rnex_core::rmc::protocols::{OnlyRemote, new_rmc_gateway_connection}; +use rnex_core::rmc::structures::RmcSerialize; +use rnex_core::util::SplittableBufferConnection; +use std::env; +use std::net::SocketAddrV4; +use std::process::abort; +use std::sync::{Arc, LazyLock}; +use tokio::net::UdpSocket; + +use crate::crypto::{Crypto, Insecure, Secure}; +use crate::packet::PRUDPV0Packet; +use crate::server::Server; + +mod crypto; +mod packet; +mod server; + +pub static EDGE_NODE_HOLDER: LazyLock = LazyLock::new(|| { + env::var("EDGE_NODE_HOLDER") + .ok() + .and_then(|s| s.parse().ok()) + .expect("EDGE_NODE_HOLDER not set") +}); + +pub static FORWARD_DESTINATION: LazyLock = LazyLock::new(|| { + env::var("FORWARD_DESTINATION") + .ok() + .and_then(|s| s.parse().ok()) + .expect("FORWARD_DESTINATION not set") +}); +//same as with prudpv1 this is responsible for handeling the different cryptography +//implementations, e.g. secure and insecure(this also includes special cases like friends) + +async fn start_proxy(param: ProxyStartupParam) { + setup_edge_node_connection(¶m, || abort()); + + info!("creating cryptography instance"); + let mut crypto = Arc::new(T::new()); + info!("binding to socket"); + + let server: Arc> = Arc::new(Server::new().await); + + info!("waiting on packets"); + server.run_task().await; +} + +pub async fn start_secure(param: ProxyStartupParam) { + start_proxy::(param).await; +} + +pub async fn start_insecure(param: ProxyStartupParam) { + start_proxy::(param).await; +} diff --git a/prudpv0/src/packet.rs b/prudpv0/src/packet.rs new file mode 100644 index 0000000..3d6d06a --- /dev/null +++ b/prudpv0/src/packet.rs @@ -0,0 +1,187 @@ +use std::mem::transmute; + +use bytemuck::{Pod, Zeroable, try_from_bytes, try_from_bytes_mut}; +use log::error; +use rnex_core::prudp::{ + types_flags::{ + TypesFlags, + flags::{HAS_SIZE, NEED_ACK}, + types::{CONNECT, DATA, SYN}, + }, + virtual_port::VirtualPort, +}; + +use crate::crypto::{Crypto, CryptoInstance}; + +#[repr(C, packed)] +#[derive(Clone, Copy, Pod, Zeroable, Debug)] +pub struct PRUDPV0Header { + pub source: VirtualPort, + pub destination: VirtualPort, + pub type_flags: TypesFlags, + pub session_id: u8, + pub packet_signature: [u8; 4], + pub sequence_id: u16, +} +#[repr(transparent)] +pub struct PRUDPV0Packet>(pub T); + +impl> PRUDPV0Packet { + #[inline(always)] + pub fn get_packet_specific_size(&self) -> Option { + Some(get_types_flags_size_from_types_flags( + self.header()?.type_flags, + )) + } + + #[inline(always)] + pub fn header(&self) -> Option<&PRUDPV0Header> { + try_from_bytes(self.0.as_ref().get(..size_of::())?).ok() + } + #[inline(always)] + pub fn header_mut(&mut self) -> Option<&mut PRUDPV0Header> + where + T: AsMut<[u8]>, + { + try_from_bytes_mut(self.0.as_mut().get_mut(..size_of::())?).ok() + } + + #[inline(always)] + pub fn connection_signature(&self) -> Option<&[u8; 4]> { + let offset = size_of::(); + Some(self.0.as_ref().get(offset..offset + 4)?.try_into().ok()?) + } + #[inline(always)] + pub fn connection_signature_mut(&mut self) -> Option<&mut [u8; 4]> + where + T: AsMut<[u8]>, + { + let offset = size_of::(); + Some( + self.0 + .as_mut() + .get_mut(offset..offset + 4)? + .try_into() + .ok()?, + ) + } + + #[inline(always)] + fn get_payload_offset(&self) -> Option { + Some(size_of::() + self.get_packet_specific_size()?) + } + + #[inline(always)] + pub fn payload(&self) -> Option<&[u8]> { + self.0 + .as_ref() + .get(self.get_payload_offset()?..(self.0.as_ref().len().saturating_sub(1))) + } + #[inline(always)] + pub fn payload_mut(&mut self) -> Option<&mut [u8]> + where + T: AsMut<[u8]>, + { + let start_offset = self.get_payload_offset()?; + let end_offset = self.0.as_ref().len().saturating_sub(1); + self.0.as_mut().get_mut(start_offset..end_offset) + } + + #[inline(always)] + pub fn checksummed_data(&self) -> Option<&[u8]> { + self.0 + .as_ref() + .get(..self.0.as_ref().len().saturating_sub(1)) + } + + #[inline(always)] + pub fn checksum(&self) -> Option { + self.0.as_ref().last().copied() + } + #[inline(always)] + pub fn checksum_mut(&mut self) -> Option<&mut u8> + where + T: AsMut<[u8]>, + { + self.0.as_mut().last_mut() + } + + #[inline(always)] + pub fn check_checksum(&self, crypto: &impl Crypto) -> bool { + let Some(data) = self.checksummed_data() else { + return false; + }; + let Some(checksum) = self.checksum() else { + return false; + }; + checksum == crypto.calculate_checksum(data) + } + + pub fn new(data: T) -> Self { + Self(data) + } +} + +const DEFAULT_SIGNAT: [u8; 4] = [0x12, 0x34, 0x56, 0x78]; +#[inline(always)] +const fn get_size_offset(tf: TypesFlags) -> usize { + size_of::() + + (if tf.get_types() & (SYN | CONNECT) != 0 { + 4 + } else if tf.get_types() & DATA != 0 { + 1 + } else { + 0 + }) +} +#[inline(always)] +const fn get_type_specific_size(tf: TypesFlags) -> usize { + if tf.get_types() & (SYN | CONNECT) != 0 { + 4 + } else if tf.get_types() & DATA != 0 { + 1 + } else { + 0 + } +} +#[inline(always)] +const fn get_types_flags_size_from_types_flags(tf: TypesFlags) -> usize { + get_type_specific_size(tf) + (if tf.get_flags() & HAS_SIZE != 0 { 2 } else { 0 }) +} +#[inline(always)] +pub const fn precalc_size(tf: TypesFlags, payload_size: usize) -> usize { + size_of::() + get_types_flags_size_from_types_flags(tf) + payload_size + 1 +} +pub fn new_syn_packet( + flags: u16, + source: VirtualPort, + destination: VirtualPort, + signat: [u8; 4], + crypto: &impl Crypto, +) -> Vec { + let type_flags = TypesFlags::default().types(SYN).flags(flags); + + let vec = vec![0; precalc_size(type_flags, 0)]; + let mut packet = PRUDPV0Packet::new(vec); + let header = packet.header_mut().expect("packet malformed in creation"); + + *header = PRUDPV0Header { + destination, + source, + packet_signature: DEFAULT_SIGNAT, + sequence_id: 0, + session_id: 0, + type_flags, + }; + *packet + .connection_signature_mut() + .expect("packet malformed in creation") = signat; + + *packet.checksum_mut().expect("packet malformed in creation") = crypto.calculate_checksum( + packet + .checksummed_data() + .expect("packet malformed in creation"), + ); + + packet.0 +} diff --git a/prudpv0/src/server.rs b/prudpv0/src/server.rs new file mode 100644 index 0000000..e3b8bbf --- /dev/null +++ b/prudpv0/src/server.rs @@ -0,0 +1,256 @@ +use std::{ + collections::HashMap, + hash::Hash, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + sync::{ + Arc, LazyLock, + atomic::{AtomicBool, AtomicU32}, + }, +}; + +use log::{error, info, warn}; +use proxy_common::{ProxyStartupParam, new_backend_connection}; +use rnex_core::{ + executables::common::{OWN_IP_PRIVATE, SERVER_PORT}, + prudp::{ + socket_addr::PRUDPSockAddr, + types_flags::{ + TypesFlags, + flags::{ACK, HAS_SIZE, NEED_ACK}, + types::{CONNECT, DATA, SYN}, + }, + virtual_port::VirtualPort, + }, + rnex_proxy_common::ConnectionInitData, + util::{SendingBufferConnection, SplittableBufferConnection}, +}; +use tokio::{ + net::{TcpSocket, UdpSocket}, + spawn, + sync::{Mutex, RwLock}, + time::Instant, +}; + +use crate::{ + crypto::{Crypto, CryptoInstance}, + packet::{PRUDPV0Header, PRUDPV0Packet, new_syn_packet, precalc_size}, +}; + +pub struct InternalConnection { + last_action: Instant, + crypto_instance: C, + server_packet_counter: u16, + client_packet_counter: u16, + unacknowledged_packets: HashMap>>, +} +pub struct Connection { + alive: AtomicBool, + session_id: u8, + target: SendingBufferConnection, + self_signat: [u8; 4], + remote_signat: [u8; 4], + addr: PRUDPSockAddr, + inner: Mutex>, +} + +impl InternalConnection { + fn next_server_count(&mut self) -> u16 { + let prev_val = self.server_packet_counter; + let (val, _) = self.server_packet_counter.overflowing_add(1); + self.server_packet_counter = val; + + prev_val + } +} + +pub struct Server { + param: ProxyStartupParam, + socket: UdpSocket, + crypto: C, + connections: RwLock>>>, +} + +impl Server { + async fn send_data_packet(&self, conn: &Connection, data: &[u8]) { + let type_flags = TypesFlags::default().types(DATA).flags(HAS_SIZE | NEED_ACK); + let vec = vec![0; precalc_size(type_flags, data.len())]; + let mut packet = PRUDPV0Packet::new(vec); + + let payload = packet.payload_mut().expect("packet malformed in creation"); + payload.copy_from_slice(data); + + let mut inner = conn.inner.lock().await; + inner.crypto_instance.encrypt_outgoing(payload); + let packet_signat = inner.crypto_instance.generate_signature(payload); + let seq = inner.next_server_count(); + + *packet.header_mut().expect("packet malformed in creation") = PRUDPV0Header { + source: self.param.virtual_port, + destination: conn.addr.virtual_port, + type_flags, + session_id: conn.session_id, + packet_signature: packet_signat, + sequence_id: seq, + }; + /* we leave the sequence id as is for now as it defaults to 0 */ + + *packet.checksum_mut().expect("packet malformed in creation") = + self.crypto.calculate_checksum( + packet + .checksummed_data() + .expect("packet malformed in creation"), + ); + + let packet_raw = packet.0; + + let packet = Arc::new(packet_raw); + + let packet_ref = Arc::downgrade(&packet); + + inner.unacknowledged_packets.insert(seq, packet); + + drop(inner); + + spawn(async move { + for n in 0..5 { + let Some(data) = packet_ref.upgrade() else { + return; + }; + info!("send attempt {}", n); + } + }); + } + async fn connection_thread( + self: Arc, + conn: Arc>, + mut recv: SplittableBufferConnection, + ) { + while let Some(data) = recv.recv().await {} + } + async fn timeout_thread(self: Arc, conn: Arc>) { + loop { + conn + } + } + async fn handle_syn(self: Arc, packet: PRUDPV0Packet<&[u8]>, addr: PRUDPSockAddr) { + info!("got syn"); + let header = packet.header().unwrap(); + + let signat = addr.calculate_connection_signature(); + let signat = [signat[0], signat[1], signat[2], signat[3]]; + + let packet = new_syn_packet(ACK, header.destination, header.source, signat, &self.crypto); + self.socket.send_to(&packet, addr.regular_socket_addr).await; + } + async fn handle_connect(self: Arc, packet: PRUDPV0Packet<&[u8]>, addr: PRUDPSockAddr) { + let conn = self.connections.write().await; + let Some(data) = packet.payload() else { + warn!("malformed packet from: {:?}", addr.regular_socket_addr); + return; + }; + let Some(self_signat) = packet.connection_signature().copied() else { + warn!( + "malformed packet(unable to find connection signature) from: {:?}", + addr + ); + return; + }; + + let ci = self.crypto.instantiate(data); + + let pid = ci.get_user_id(); + let conn = new_backend_connection(&self.param, addr, pid).await; + let Some(conn) = conn else { + error!("unable to connect to backend"); + return; + }; + + let remote_signat = addr.calculate_connection_signature(); + let remote_signat = [ + remote_signat[0], + remote_signat[1], + remote_signat[2], + remote_signat[3], + ]; + + let header = packet.header().expect("header should be validated by now"); + + let conn = Arc::new(Connection { + target: conn.duplicate_sender(), + remote_signat, + self_signat, + addr, + session_id: header.session_id, + alive: AtomicBool::new(true), + inner: Mutex::new(InternalConnection { + last_action: Instant::now(), + crypto_instance: ci, + client_packet_counter: 2, + server_packet_counter: 1, + unacknowledged_packets: HashMap::new(), + }), + }); + } + async fn process_packet<'a>(self: Arc, packet: PRUDPV0Packet<&[u8]>, addr: SocketAddrV4) { + if !packet.check_checksum(&self.crypto) { + warn!("invalid checksum from: {}", addr); + return; + } + + let Some(header) = packet.header() else { + warn!("malformatted packet from: {}", addr); + return; + }; + + let addr = PRUDPSockAddr::new(addr, header.source); + println!("{:?}", header); + match header.type_flags.get_types() { + SYN => { + self.handle_syn(packet, addr).await; + } + CONNECT => { + self.handle_connect(packet, addr).await; + } + v => { + println!("unimplemented packed type: {}", v); + } + } + } + pub async fn run_task(self: Arc) { + loop { + let mut vec: Vec = vec![]; + let addr = match self.socket.recv_buf_from(&mut vec).await { + Err(e) => { + error!("unable to recv: {}", e); + break; + } + Ok(v) => { + assert_eq!(vec.len(), v.0); + v.1 + } + }; + let this = self.clone(); + tokio::spawn(async move { + let data = vec; + let packet = PRUDPV0Packet::new(&data[..]); + + let SocketAddr::V4(addr) = addr else { + unreachable!() + }; + + this.process_packet(packet, addr).await; + }); + } + } + pub async fn new(param: ProxyStartupParam) -> Self { + let socket = UdpSocket::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)) + .await + .expect("unable to bind socket"); + Self { + socket, + crypto: C::new(), + connections: RwLock::new(HashMap::new()), + param, + } + } +} diff --git a/prudpv1/src/executables/common.rs b/prudpv1/src/executables/common.rs index e8162a1..8d6f4c1 100644 --- a/prudpv1/src/executables/common.rs +++ b/prudpv1/src/executables/common.rs @@ -1,18 +1,17 @@ +use once_cell::sync::Lazy; use std::env; use std::net::SocketAddrV4; -use once_cell::sync::Lazy; -pub static EDGE_NODE_HOLDER: Lazy = Lazy::new(||{ +pub static EDGE_NODE_HOLDER: Lazy = Lazy::new(|| { env::var("EDGE_NODE_HOLDER") .ok() .and_then(|s| s.parse().ok()) .expect("EDGE_NODE_HOLDER not set") }); -pub static FORWARD_DESTINATION: Lazy = - Lazy::new(|| - env::var("FORWARD_DESTINATION") - .ok() - .and_then(|s| s.parse().ok()) - .expect("FORWARD_DESTINATION not set") - ); +pub static FORWARD_DESTINATION: Lazy = Lazy::new(|| { + env::var("FORWARD_DESTINATION") + .ok() + .and_then(|s| s.parse().ok()) + .expect("FORWARD_DESTINATION not set") +}); diff --git a/prudpv1/src/executables/proxy_insecure.rs b/prudpv1/src/executables/proxy_insecure.rs index 50446d5..350018d 100644 --- a/prudpv1/src/executables/proxy_insecure.rs +++ b/prudpv1/src/executables/proxy_insecure.rs @@ -1,15 +1,5 @@ - -use rnex_core::reggie::UnitPacketRead; -use rnex_core::reggie::UnitPacketWrite; -use rnex_core::rmc::structures::RmcSerialize; -use std::net::SocketAddrV4; -use std::sync::Arc; -use std::time::Duration; use log::error; -use tokio::net::TcpStream; -use tokio::task; -use tokio::time::sleep; -use prudpv1::executables::common::{FORWARD_DESTINATION, EDGE_NODE_HOLDER}; +use prudpv1::executables::common::{EDGE_NODE_HOLDER, FORWARD_DESTINATION}; use prudpv1::prudp::router::Router; use prudpv1::prudp::unsecure::Unsecure; use rnex_core::common::setup; @@ -17,32 +7,46 @@ use rnex_core::executables::common::{OWN_IP_PRIVATE, OWN_IP_PUBLIC, SERVER_PORT} use rnex_core::prudp::virtual_port::VirtualPort; use rnex_core::reggie::EdgeNodeHolderConnectOption::Register; use rnex_core::reggie::RemoteEdgeNodeHolder; -use rnex_core::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote}; +use rnex_core::reggie::UnitPacketRead; +use rnex_core::reggie::UnitPacketWrite; +use rnex_core::rmc::protocols::{OnlyRemote, new_rmc_gateway_connection}; +use rnex_core::rmc::structures::RmcSerialize; use rnex_core::rnex_proxy_common::ConnectionInitData; use rnex_core::util::SplittableBufferConnection; - - +use std::net::SocketAddrV4; +use std::sync::Arc; +use std::time::Duration; +use tokio::net::TcpStream; +use tokio::task; +use tokio::time::sleep; #[tokio::main] async fn main() { setup(); - let conn = tokio::net::TcpStream::connect(&*EDGE_NODE_HOLDER).await.unwrap(); + let conn = tokio::net::TcpStream::connect(&*EDGE_NODE_HOLDER) + .await + .unwrap(); let conn: SplittableBufferConnection = conn.into(); - conn.send(Register(SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT)).to_data().unwrap()).await; + conn.send( + Register(SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT)) + .to_data() + .unwrap(), + ) + .await; - let conn = new_rmc_gateway_connection(conn, |r| Arc::new(OnlyRemote::::new(r))); + let conn = new_rmc_gateway_connection(conn, |r| { + Arc::new(OnlyRemote::::new(r)) + }); let (router_secure, _) = Router::new(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)) .await .expect("unable to start router"); let mut socket_secure = router_secure - .add_socket(VirtualPort::new(1, 10), Unsecure( - "6f599f81" - )) + .add_socket(VirtualPort::new(1, 10), Unsecure("6f599f81")) .await .expect("unable to add socket"); @@ -55,8 +59,7 @@ async fn main() { }; task::spawn(async move { - let mut stream - = match TcpStream::connect(*FORWARD_DESTINATION).await { + let mut stream = match TcpStream::connect(*FORWARD_DESTINATION).await { Ok(v) => v, Err(e) => { error!("unable to connect: {}", e); @@ -64,10 +67,17 @@ async fn main() { } }; - if let Err(e) = stream.send_buffer(&ConnectionInitData{ - prudpsock_addr: conn.socket_addr, - pid: conn.user_id - }.to_data().unwrap()).await{ + if let Err(e) = stream + .send_buffer( + &ConnectionInitData { + prudpsock_addr: conn.socket_addr, + pid: conn.user_id, + } + .to_data() + .unwrap(), + ) + .await + { error!("error connecting to backend: {}", e); return; }; @@ -92,7 +102,7 @@ async fn main() { return; } }; - + if conn.send(data).await == None{ return; } @@ -105,4 +115,4 @@ async fn main() { }); } drop(conn); -} \ No newline at end of file +} diff --git a/prudpv1/src/prudp/packet.rs b/prudpv1/src/prudp/packet.rs index da376bd..a4d87e9 100644 --- a/prudpv1/src/prudp/packet.rs +++ b/prudpv1/src/prudp/packet.rs @@ -3,21 +3,24 @@ // force the compiler to shut up here #![allow(unused_parens)] +use crate::prudp::packet::PacketOption::{ + ConnectionSignature, FragmentId, InitialSequenceId, MaximumSubstreamId, SupportedFunctions, +}; +use bytemuck::{Pod, Zeroable}; +use hmac::{Hmac, Mac}; +use log::{error, warn}; +use md5::{Digest, Md5}; +use rnex_core::prudp::socket_addr::PRUDPSockAddr; +use rnex_core::prudp::types_flags::TypesFlags; +use rnex_core::prudp::types_flags::flags::ACK; +use rnex_core::prudp::virtual_port::VirtualPort; use std::fmt::{Debug, Formatter}; use std::io; use std::io::{Cursor, Read, Seek, Write}; use std::net::SocketAddrV4; -use bytemuck::{Pod, Zeroable}; -use hmac::{Hmac, Mac}; -use log::{error, warn}; -use md5::{Md5, Digest}; use thiserror::Error; -use v_byte_helpers::{SwapEndian}; +use v_byte_helpers::SwapEndian; use v_byte_helpers::{IS_BIG_ENDIAN, ReadExtensions}; -use crate::prudp::packet::flags::ACK; -use crate::prudp::packet::PacketOption::{ConnectionSignature, FragmentId, InitialSequenceId, MaximumSubstreamId, SupportedFunctions}; -use rnex_core::prudp::socket_addr::PRUDPSockAddr; -use rnex_core::prudp::virtual_port::VirtualPort; type Md5Hmac = Hmac; @@ -32,73 +35,11 @@ pub enum Error { #[error("invalid option id {0}")] InvalidOptionId(u8), #[error("option size {size} doesnt match expected option for given option id {id}")] - InvalidOptionSize { - id: u8, - size: u8, - }, + InvalidOptionSize { id: u8, size: u8 }, } pub type Result = std::result::Result; -#[repr(transparent)] -#[derive(Copy, Clone, Pod, Zeroable, SwapEndian, Default, Eq, PartialEq)] -pub struct TypesFlags(u16); - -impl TypesFlags { - #[inline] - pub const fn get_types(self) -> u8 { - (self.0 & 0x000F) as u8 - } - #[inline] - pub const fn get_flags(self) -> u16 { - (self.0 & 0xFFF0) >> 4 - } - #[inline] - pub const fn types(self, val: u8) -> Self { - Self((self.0 & 0xFFF0) | (val as u16 & 0x000F)) - } - #[inline] - pub const fn flags(self, val: u16) -> Self { - Self((self.0 & 0x000F) | ((val << 4) & 0xFFF0)) - } - #[inline] - pub const fn set_flag(&mut self, val: u16){ - self.0 |= (val & 0xFFF) << 4; - } - #[inline] - pub const fn set_types(&mut self, val: u8){ - self.0 |= val as u16 & 0x0F; - } -} - -pub mod flags { - pub const ACK: u16 = 0x001; - pub const RELIABLE: u16 = 0x002; - pub const NEED_ACK: u16 = 0x004; - pub const HAS_SIZE: u16 = 0x008; - pub const MULTI_ACK: u16 = 0x200; -} - -pub mod types { - pub const SYN: u8 = 0x0; - pub const CONNECT: u8 = 0x1; - pub const DATA: u8 = 0x2; - pub const DISCONNECT: u8 = 0x3; - pub const PING: u8 = 0x4; - /// no idea what user is supposed to mean - pub const USER: u8 = 0x5; -} - -impl Debug for TypesFlags { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let stream_type = self.get_types(); - let port_number = self.get_flags(); - write!(f, "TypesFlags{{ types: {}, flags: {} }}", stream_type, port_number) - } -} - - - #[repr(C)] #[derive(Debug, Copy, Clone, Pod, Zeroable, SwapEndian, Eq, PartialEq)] pub struct PRUDPV1Header { @@ -116,7 +57,7 @@ pub struct PRUDPV1Header { impl Default for PRUDPV1Header { fn default() -> Self { - Self{ + Self { magic: [0xEA, 0xD0], version: 1, session_id: 0, @@ -126,32 +67,30 @@ impl Default for PRUDPV1Header { destination_port: VirtualPort(0), types_and_flags: TypesFlags(0), packet_specific_size: 0, - substream_id: 0 + substream_id: 0, } } } - #[derive(Debug, Clone, Eq, PartialEq)] -pub enum PacketOption{ +pub enum PacketOption { SupportedFunctions(u32), ConnectionSignature([u8; 16]), FragmentId(u8), InitialSequenceId(u16), - MaximumSubstreamId(u8) + MaximumSubstreamId(u8), } -impl PacketOption{ - fn from(option_id: OptionId, option_data: &[u8]) -> io::Result{ - +impl PacketOption { + fn from(option_id: OptionId, option_data: &[u8]) -> io::Result { let mut data_cursor = Cursor::new(option_data); - let val = match option_id.into(){ + let val = match option_id.into() { 0 => SupportedFunctions(data_cursor.read_struct(IS_BIG_ENDIAN)?), 1 => ConnectionSignature(data_cursor.read_struct(IS_BIG_ENDIAN)?), 2 => FragmentId(data_cursor.read_struct(IS_BIG_ENDIAN)?), 3 => InitialSequenceId(data_cursor.read_struct(IS_BIG_ENDIAN)?), 4 => MaximumSubstreamId(data_cursor.read_struct(IS_BIG_ENDIAN)?), - _ => unreachable!() + _ => unreachable!(), }; Ok(val) @@ -212,7 +151,7 @@ impl OptionId { // Invariant is upheld because we only create the object if it doesn't violate the invariant match val { 0 | 1 | 2 | 3 | 4 => Ok(Self(val)), - _ => Err(Error::InvalidOptionId(val)) + _ => Err(Error::InvalidOptionId(val)), } } @@ -223,7 +162,7 @@ impl OptionId { 2 => 1, 3 => 2, 4 => 1, - _ => unreachable!() + _ => unreachable!(), } } } @@ -238,8 +177,7 @@ impl PRUDPV1Packet { pub fn new(reader: &mut (impl Read + Seek)) -> Result { let header: PRUDPV1Header = reader.read_struct(IS_BIG_ENDIAN)?; - if header.magic[0] != 0xEA || - header.magic[1] != 0xD0 { + if header.magic[0] != 0xEA || header.magic[1] != 0xD0 { return Err(Error::InvalidMagic(u16::from_be_bytes(header.magic))); } @@ -247,31 +185,31 @@ impl PRUDPV1Packet { return Err(Error::InvalidVersion(header.version)); } - let packet_signature: [u8; 16] = reader.read_struct(IS_BIG_ENDIAN)?; //let packet_signature: [u8; 16] = [0; 16]; assert_eq!(reader.stream_position().ok(), Some(14 + 16)); - - let mut packet_specific_buffer = vec![0u8; header.packet_specific_size as usize]; reader.read_exact(&mut packet_specific_buffer)?; - //no clue whats up with options but they are broken let mut packet_specific_data_cursor = Cursor::new(&packet_specific_buffer); let mut options = Vec::new(); loop { - let Ok(option_id): io::Result = packet_specific_data_cursor.read_struct(IS_BIG_ENDIAN) else { - break + let Ok(option_id): io::Result = + packet_specific_data_cursor.read_struct(IS_BIG_ENDIAN) + else { + break; }; - let Ok(value_size): io::Result = packet_specific_data_cursor.read_struct(IS_BIG_ENDIAN) else { - break + let Ok(value_size): io::Result = + packet_specific_data_cursor.read_struct(IS_BIG_ENDIAN) + else { + break; }; if value_size == 0 { @@ -291,20 +229,21 @@ impl PRUDPV1Packet { } let mut option_data = vec![0u8; value_size as usize]; - if packet_specific_data_cursor.read_exact(&mut option_data[..]).is_err() { + if packet_specific_data_cursor + .read_exact(&mut option_data[..]) + .is_err() + { error!("unable to read options"); break; } options.push(PacketOption::from(option_id, &option_data)?); } - + let mut payload = vec![0u8; header.payload_size as usize]; reader.read_exact(&mut payload)?; - - Ok(Self { header, packet_signature, @@ -313,22 +252,21 @@ impl PRUDPV1Packet { }) } - pub fn base_acknowledgement_packet(&self) -> Self{ + pub fn base_acknowledgement_packet(&self) -> Self { let base = self.base_response_packet(); let mut flags = self.header.types_and_flags.flags(0); flags.set_flag(ACK); - let options = self.options + let options = self + .options .iter() .filter(|o| matches!(o, FragmentId(_))) .cloned() .collect(); - - - Self{ + Self { header: PRUDPV1Header { types_and_flags: flags, sequence_id: self.header.sequence_id, @@ -348,17 +286,24 @@ impl PRUDPV1Packet { } } - fn generate_options_bytes(&self) -> Vec{ + fn generate_options_bytes(&self) -> Vec { let mut vec = Vec::new(); - for option in &self.options{ - option.write_to_stream(&mut vec).expect("vec should always automatically be able to extend"); + for option in &self.options { + option + .write_to_stream(&mut vec) + .expect("vec should always automatically be able to extend"); } vec } - pub fn calculate_signature_value(&self, access_key: &str, session_key: Option<[u8; 32]>, connection_signature: Option<[u8; 16]>) -> [u8; 16]{ + pub fn calculate_signature_value( + &self, + access_key: &str, + session_key: Option<[u8; 32]>, + connection_signature: Option<[u8; 16]>, + ) -> [u8; 16] { let access_key_bytes = access_key.as_bytes(); let access_key_sum: u32 = access_key_bytes.iter().map(|v| *v as u32).sum(); let access_key_sum_bytes: [u8; 4] = access_key_sum.to_le_bytes(); @@ -374,32 +319,46 @@ impl PRUDPV1Packet { let mut hmac = Md5Hmac::new_from_slice(&key).expect("fuck"); - hmac.write(&header_data).expect("error during hmac calculation"); + hmac.write(&header_data) + .expect("error during hmac calculation"); if let Some(session_key) = session_key { - hmac.write(&session_key).expect("error during hmac calculation"); + hmac.write(&session_key) + .expect("error during hmac calculation"); } - hmac.write(&access_key_sum_bytes).expect("error during hmac calculation"); + hmac.write(&access_key_sum_bytes) + .expect("error during hmac calculation"); if let Some(connection_signature) = connection_signature { - hmac.write(&connection_signature).expect("error during hmac calculation"); + hmac.write(&connection_signature) + .expect("error during hmac calculation"); } - hmac.write(&option_bytes).expect("error during hmac calculation"); + hmac.write(&option_bytes) + .expect("error during hmac calculation"); - hmac.write_all(&self.payload).expect("error during hmac calculation"); + hmac.write_all(&self.payload) + .expect("error during hmac calculation"); - hmac.finalize().into_bytes()[0..16].try_into().expect("invalid hmac size") + hmac.finalize().into_bytes()[0..16] + .try_into() + .expect("invalid hmac size") } - pub fn calculate_and_assign_signature(&mut self, access_key: &str, session_key: Option<[u8; 32]>, connection_signature: Option<[u8; 16]>){ - self.packet_signature = self.calculate_signature_value(access_key, session_key, connection_signature); + pub fn calculate_and_assign_signature( + &mut self, + access_key: &str, + session_key: Option<[u8; 32]>, + connection_signature: Option<[u8; 16]>, + ) { + self.packet_signature = + self.calculate_signature_value(access_key, session_key, connection_signature); } - pub fn set_sizes(&mut self){ + pub fn set_sizes(&mut self) { self.header.packet_specific_size = self.options.iter().map(|o| o.write_size()).sum(); self.header.payload_size = self.payload.len() as u16; } - pub fn base_response_packet(&self) -> Self { + pub fn base_response_packet(&self) -> Self { Self { header: PRUDPV1Header { magic: [0xEA, 0xD0], @@ -412,19 +371,18 @@ impl PRUDPV1Packet { sequence_id: 0, session_id: 0, substream_id: 0, - }, packet_signature: [0; 16], payload: Default::default(), - options: Default::default() + options: Default::default(), } } - pub fn write_to(&self, writer: &mut impl Write) -> io::Result<()>{ + pub fn write_to(&self, writer: &mut impl Write) -> io::Result<()> { writer.write_all(bytemuck::bytes_of(&self.header))?; writer.write_all(&self.packet_signature)?; - for option in &self.options{ + for option in &self.options { option.write_to_stream(writer)?; } @@ -436,20 +394,24 @@ impl PRUDPV1Packet { #[cfg(test)] mod test { - use crate::prudp::packet::flags::{NEED_ACK, RELIABLE}; - use crate::prudp::packet::types::DATA; - use super::{OptionId, PacketOption, PRUDPV1Header, TypesFlags}; - use rnex_core::prudp::virtual_port::VirtualPort; + use super::{OptionId, PRUDPV1Header, PacketOption, TypesFlags}; + use rnex_core::prudp::{ + types_flags::{ + flags::{NEED_ACK, RELIABLE}, + types::DATA, + }, + virtual_port::VirtualPort, + }; #[test] fn size_test() { assert_eq!(size_of::(), 14); } #[test] - fn test_options(){ - let packet_types = [0,1,2,3,4]; + fn test_options() { + let packet_types = [0, 1, 2, 3, 4]; - for p_type in packet_types{ + for p_type in packet_types { let option_id = OptionId::new(p_type).unwrap(); let buf = vec![0; option_id.option_type_size() as usize]; @@ -463,12 +425,10 @@ mod test { assert_eq!(write_buf.len() as u8, opt.write_size()) } } - - } #[test] - fn header_read(){ + fn header_read() { let header = PRUDPV1Header { version: 0, destination_port: VirtualPort(0), @@ -478,8 +438,8 @@ mod test { packet_specific_size: 0, payload_size: 0, sequence_id: 0, - magic: [0xEA,0xD0], - source_port: VirtualPort(0) + magic: [0xEA, 0xD0], + source_port: VirtualPort(0), }; let bytes = bytemuck::bytes_of(&header); @@ -490,11 +450,11 @@ mod test { } #[test] - fn test_types_flags(){ + fn test_types_flags() { let types = TypesFlags::default().types(DATA).flags(NEED_ACK | RELIABLE); assert_ne!((types.0 >> 4) & NEED_ACK, 0); assert_ne!((types.0 >> 4) & RELIABLE, 0); assert_ne!((types.0 & 0xFF) as u8 & DATA, 0); } -} \ No newline at end of file +} diff --git a/prudpv1/src/prudp/secure.rs b/prudpv1/src/prudp/secure.rs index acf8bfb..a9f7960 100644 --- a/prudpv1/src/prudp/secure.rs +++ b/prudpv1/src/prudp/secure.rs @@ -1,18 +1,19 @@ -use std::io::Cursor; +use crate::prudp::packet::PRUDPV1Packet; +use crate::prudp::socket::{CryptoHandler, CryptoHandlerConnectionInstance}; use hmac::digest::consts::U32; use log::error; use rc4::cipher::StreamCipherCoreWrapper; -use rc4::{KeyInit, Rc4, Rc4Core, StreamCipher}; use rc4::consts::U16; +use rc4::{KeyInit, Rc4, Rc4Core, StreamCipher}; +use rnex_core::kerberos::{TicketInternalData, derive_key}; +use rnex_core::nex::account::Account; +use rnex_core::prudp::encryption::EncryptionPair; +use rnex_core::rmc::structures::RmcSerialize; +use std::io::Cursor; use typenum::U5; use v_byte_helpers::{IS_BIG_ENDIAN, ReadExtensions}; -use rnex_core::kerberos::{derive_key, TicketInternalData}; -use rnex_core::nex::account::Account; -use crate::prudp::packet::PRUDPV1Packet; -use crate::prudp::socket::{CryptoHandler, CryptoHandlerConnectionInstance, EncryptionPair}; -use rnex_core::rmc::structures::RmcSerialize; -pub fn read_secure_connection_data(data: &[u8], act: &Account) -> Option<([u8; 32], u32, u32)>{ +pub fn read_secure_connection_data(data: &[u8], act: &Account) -> Option<([u8; 32], u32, u32)> { let mut cursor = Cursor::new(data); let mut ticket_data: Vec = Vec::deserialize(&mut cursor).ok()?; @@ -20,7 +21,7 @@ pub fn read_secure_connection_data(data: &[u8], act: &Account) -> Option<([u8; 3 let ticket_data_size = ticket_data.len(); - let ticket_data = &mut ticket_data[0..ticket_data_size-0x10]; + let ticket_data = &mut ticket_data[0..ticket_data_size - 0x10]; let server_key = derive_key(act.pid, act.kerbros_password); @@ -29,7 +30,7 @@ pub fn read_secure_connection_data(data: &[u8], act: &Account) -> Option<([u8; 3 rc4.apply_keystream(ticket_data); - let ticket_data: &TicketInternalData = match bytemuck::try_from_bytes(ticket_data){ + let ticket_data: &TicketInternalData = match bytemuck::try_from_bytes(ticket_data) { Ok(v) => v, Err(e) => { error!("unable to read internal ticket data: {}", e); @@ -39,15 +40,15 @@ pub fn read_secure_connection_data(data: &[u8], act: &Account) -> Option<([u8; 3 // todo: add ticket expiration - let TicketInternalData{ + let TicketInternalData { session_key, pid: ticket_source_pid, - issued_time + issued_time, } = *ticket_data; // todo: add checking if tickets are signed with a valid md5-hmac let request_data_length = request_data.len(); - let request_data = &mut request_data[0.. request_data_length - 0x10]; + let request_data = &mut request_data[0..request_data_length - 0x10]; let mut rc4: StreamCipherCoreWrapper> = Rc4::new_from_slice(&session_key).expect("unable to init rc4 keystream"); @@ -58,53 +59,55 @@ pub fn read_secure_connection_data(data: &[u8], act: &Account) -> Option<([u8; 3 let pid: u32 = reqest_data_cursor.read_struct(IS_BIG_ENDIAN).ok()?; - if pid != ticket_source_pid{ + if pid != ticket_source_pid { let ticket_created_on = issued_time.to_regular_time(); - error!("someone tried to spoof their pid, ticket was created on: {}", ticket_created_on.to_rfc2822()); + error!( + "someone tried to spoof their pid, ticket was created on: {}", + ticket_created_on.to_rfc2822() + ); return None; } let _cid: u32 = reqest_data_cursor.read_struct(IS_BIG_ENDIAN).ok()?; let response_check: u32 = reqest_data_cursor.read_struct(IS_BIG_ENDIAN).ok()?; - - Some((session_key, pid, response_check)) } type Rc4U32 = StreamCipherCoreWrapper>; -pub fn generate_secure_encryption_pairs(mut session_key: [u8; 32], count: u8) -> Vec>>{ +pub fn generate_secure_encryption_pairs( + mut session_key: [u8; 32], + count: u8, +) -> Vec>> { let mut vec = Vec::with_capacity(count as usize); - vec.push(EncryptionPair{ + vec.push(EncryptionPair { send: Rc4U32::new_from_slice(&session_key).expect("unable to create rc4"), - recv: Rc4U32::new_from_slice(&session_key).expect("unable to create rc4") + recv: Rc4U32::new_from_slice(&session_key).expect("unable to create rc4"), }); - for _ in 1..=count{ + for _ in 1..=count { let modifier = session_key.len() + 1; let key_length = session_key.len(); - for (position, val) in (&mut session_key[0..key_length/2]).iter_mut().enumerate(){ + for (position, val) in (&mut session_key[0..key_length / 2]).iter_mut().enumerate() { *val = val.wrapping_add((modifier - position) as u8); } - vec.push(EncryptionPair{ + vec.push(EncryptionPair { send: Rc4U32::new_from_slice(&session_key).expect("unable to create rc4"), - recv: Rc4U32::new_from_slice(&session_key).expect("unable to create rc4") + recv: Rc4U32::new_from_slice(&session_key).expect("unable to create rc4"), }); } vec } - pub struct Secure(pub &'static str, pub Account); - pub struct SecureInstance { access_key: &'static str, session_key: [u8; 32], @@ -155,18 +158,17 @@ impl CryptoHandler for Secure { } } - impl CryptoHandlerConnectionInstance for SecureInstance { type Encryption = Rc4; fn decrypt_incoming(&mut self, substream: u8, data: &mut [u8]) { - if let Some(crypt_pair) = self.streams.get_mut(substream as usize){ + if let Some(crypt_pair) = self.streams.get_mut(substream as usize) { crypt_pair.recv.apply_keystream(data); } } fn encrypt_outgoing(&mut self, substream: u8, data: &mut [u8]) { - if let Some(crypt_pair) = self.streams.get_mut(substream as usize){ + if let Some(crypt_pair) = self.streams.get_mut(substream as usize) { crypt_pair.send.apply_keystream(data); } } @@ -182,10 +184,14 @@ impl CryptoHandlerConnectionInstance for SecureInstance { fn sign_packet(&self, packet: &mut PRUDPV1Packet) { packet.set_sizes(); - packet.calculate_and_assign_signature(self.access_key, Some(self.session_key), Some(self.self_signature)); + packet.calculate_and_assign_signature( + self.access_key, + Some(self.session_key), + Some(self.self_signature), + ); } fn verify_packet(&self, _packet: &PRUDPV1Packet) -> bool { true } -} \ No newline at end of file +} diff --git a/prudpv1/src/prudp/socket.rs b/prudpv1/src/prudp/socket.rs index 6467d8f..a3b0445 100644 --- a/prudpv1/src/prudp/socket.rs +++ b/prudpv1/src/prudp/socket.rs @@ -1,47 +1,34 @@ -use crate::prudp::packet::flags::{ACK, HAS_SIZE, MULTI_ACK, NEED_ACK, RELIABLE}; -use crate::prudp::packet::types::{CONNECT, DATA, DISCONNECT, PING, SYN}; use crate::prudp::packet::PacketOption::{ ConnectionSignature, FragmentId, MaximumSubstreamId, SupportedFunctions, }; -use crate::prudp::packet::{PRUDPV1Header, PRUDPV1Packet, TypesFlags}; -use rnex_core::prudp::virtual_port::VirtualPort; -use rnex_core::prudp::socket_addr::PRUDPSockAddr; +use crate::prudp::packet::{PRUDPV1Header, PRUDPV1Packet}; use async_trait::async_trait; -use log::{info, warn}; use log::error; +use log::{info, warn}; use rc4::StreamCipher; -use v_byte_helpers::ReadExtensions; -use v_byte_helpers::little_endian::read_u16; +use rnex_core::prudp::socket_addr::PRUDPSockAddr; +use rnex_core::prudp::types_flags::TypesFlags; +use rnex_core::prudp::types_flags::flags::{ACK, HAS_SIZE, MULTI_ACK, NEED_ACK, RELIABLE}; +use rnex_core::prudp::types_flags::types::{CONNECT, DATA, DISCONNECT, PING, SYN}; +use rnex_core::prudp::virtual_port::VirtualPort; use std::collections::{BTreeMap, HashMap}; use std::io::Cursor; use std::marker::PhantomData; use std::ops::Deref; use std::sync::{Arc, Weak}; +use v_byte_helpers::ReadExtensions; +use v_byte_helpers::little_endian::read_u16; use std::time::Duration; use tokio::net::UdpSocket; -use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; -use tokio::time::{sleep, Instant}; +use tokio::sync::mpsc::{Receiver, Sender, channel}; +use tokio::time::{Instant, sleep}; // due to the way this is designed crashing the router thread causes deadlock, sorry ;-; // (maybe i will fix that some day) /// PRUDP Socket for accepting connections to then send and recieve data from those clients -pub struct EncryptionPair { - pub send: T, - pub recv: T, -} - -impl EncryptionPair { - pub fn init_both T>(func: F) -> Self { - Self { - recv: func(), - send: func(), - } - } -} - pub struct CommonConnection { pub user_id: u32, pub socket_addr: PRUDPSockAddr, @@ -61,7 +48,7 @@ struct InternalConnection { socket: Arc, packet_queue: HashMap, last_packet_time: Instant, - unacknowleged_packets: Vec<(Instant, PRUDPV1Packet)> + unacknowleged_packets: Vec<(Instant, PRUDPV1Packet)>, } impl Deref for InternalConnection { @@ -82,17 +69,17 @@ impl InternalConnection { } /// Sends a raw packet to a given client on the connection - /// - /// a raw packet is one which does not get processed any further(other than to send it + /// + /// a raw packet is one which does not get processed any further(other than to send it /// off without buffering or anything), - /// as such you need to make sure that + /// as such you need to make sure that /// the sizes are set correctly and so on #[inline] async fn send_raw_packet(&self, prudp_packet: &PRUDPV1Packet) { send_raw_prudp_to_sockaddr(&self.socket, self.socket_addr, prudp_packet).await; } - async fn delete_connection(&self){ + async fn delete_connection(&self) { let Some(conns) = self.connections.upgrade() else { // this is fine as it implies the server has already quit, thus meaning that we dont // have to remove ourselves from the server @@ -187,8 +174,6 @@ pub(super) trait AnyInternalConnection: async fn close_connection(&mut self); } - - #[async_trait] impl AnyInternalConnection for InternalConnection { async fn send_data_packet(&mut self, data: Vec) { @@ -219,7 +204,6 @@ impl AnyInternalConnection for InternalConne self.unacknowleged_packets.push((Instant::now(), packet)); } - async fn close_connection(&mut self) { // jon confirmed that this should be a safe way to dc a client @@ -250,7 +234,11 @@ impl AnyInternalConnection for InternalConne } } -async fn send_raw_prudp_to_sockaddr(udp_socket: &UdpSocket, dest: PRUDPSockAddr, packet: &PRUDPV1Packet){ +async fn send_raw_prudp_to_sockaddr( + udp_socket: &UdpSocket, + dest: PRUDPSockAddr, + packet: &PRUDPV1Packet, +) { let mut vec = Vec::new(); packet @@ -281,7 +269,7 @@ impl InternalSocket { } /// sends a raw packet to a specific prudp socket address - /// + /// /// a raw packet is a packet is a packet which wont get processed any further, /// sizes signatures etc need to be set before using this function async fn send_packet_unbuffered(&self, dest: PRUDPSockAddr, packet: &PRUDPV1Packet) { @@ -353,13 +341,15 @@ impl InternalSocket { conn.close_connection().await; } - for (send_time, packet) in &conn.unacknowleged_packets{ - if *send_time < (Instant::now() - Duration::from_millis(3000)){ - warn!("failed to resend packet 5 times and never got response, destroying connection"); + for (send_time, packet) in &conn.unacknowleged_packets { + if *send_time < (Instant::now() - Duration::from_millis(3000)) { + warn!( + "failed to resend packet 5 times and never got response, destroying connection" + ); conn.close_connection().await; break; } - if *send_time < (Instant::now() - Duration::from_millis(500)){ + if *send_time < (Instant::now() - Duration::from_millis(500)) { info!("unacknowledged packet sat arround for more than 500 ms, resending"); conn.send_raw_packet(packet).await; } @@ -399,7 +389,7 @@ impl InternalSocket { packet_queue: Default::default(), last_packet_time: Instant::now(), unacknowleged_packets: Vec::new(), - supported_function_version + supported_function_version, }; let internal = Arc::new(Mutex::new(internal)); @@ -487,7 +477,7 @@ impl InternalSocket { SupportedFunctions(funcs) => { functions = *funcs & 0xFF; response.options.push(SupportedFunctions(*funcs & 0xFF)); - }, + } _ => { /* ? */ } } } @@ -610,7 +600,6 @@ impl AnyInternalSocket for InternalSocket { if (packet.header.types_and_flags.get_flags() & ACK) != 0 { info!("got ack"); - if packet.header.types_and_flags.get_types() == SYN || packet.header.types_and_flags.get_types() == CONNECT { @@ -645,9 +634,8 @@ impl AnyInternalSocket for InternalSocket { // remove the packet whose sequence id matches the ack packet // or in other words keep all of those which dont match the sequence id - conn.unacknowleged_packets.retain_mut(|v| { - packet.header.sequence_id != v.1.header.sequence_id - }); + conn.unacknowleged_packets + .retain_mut(|v| packet.header.sequence_id != v.1.header.sequence_id); } else { error!("non connection acknowledgement packet on nonexistent connection...") } @@ -659,26 +647,23 @@ impl AnyInternalSocket for InternalSocket { if let Some(conn) = self.get_connection(address).await { let mut conn = conn.lock().await; - if conn.supported_function_version == 1{ + if conn.supported_function_version == 1 { let mut collected_ids: Vec = Vec::new(); let mut cursor = Cursor::new(&packet.payload); - while let Ok(v) = read_u16(&mut cursor){ + while let Ok(v) = read_u16(&mut cursor) { collected_ids.push(v); } conn.unacknowleged_packets.retain_mut(|(_, up)| { - !( - collected_ids.iter().any(|id| up.header.sequence_id == *id) || - up.header.sequence_id <= packet.header.sequence_id - ) + !(collected_ids.iter().any(|id| up.header.sequence_id == *id) + || up.header.sequence_id <= packet.header.sequence_id) }); - } else { let mut collected_ids: Vec = Vec::new(); let mut cursor = Cursor::new(&packet.payload); - let Ok(_substream_id): Result = cursor.read_le_struct() else{ + let Ok(_substream_id): Result = cursor.read_le_struct() else { error!("invalid data whilest reading new version agregate acknowledgement"); return; }; @@ -690,19 +675,20 @@ impl AnyInternalSocket for InternalSocket { error!("invalid data whilest reading new version agregate acknowledgement"); return; }; - for _ in 0..additional_sequence_ids{ - let Ok(additional_sequence_id): Result = cursor.read_le_struct() else { - error!("invalid data whilest reading new version agregate acknowledgement"); + for _ in 0..additional_sequence_ids { + let Ok(additional_sequence_id): Result = cursor.read_le_struct() + else { + error!( + "invalid data whilest reading new version agregate acknowledgement" + ); return; }; collected_ids.push(additional_sequence_id); } conn.unacknowleged_packets.retain_mut(|(_, up)| { - !( - collected_ids.iter().any(|id| up.header.sequence_id == *id) || - up.header.sequence_id <= sequence_id - ) + !(collected_ids.iter().any(|id| up.header.sequence_id == *id) + || up.header.sequence_id <= sequence_id) }); } } else { diff --git a/prudpv1/src/prudp/unsecure.rs b/prudpv1/src/prudp/unsecure.rs index 6c45824..d47550f 100644 --- a/prudpv1/src/prudp/unsecure.rs +++ b/prudpv1/src/prudp/unsecure.rs @@ -1,13 +1,12 @@ +use crate::prudp::packet::PRUDPV1Packet; +use crate::prudp::socket::{CryptoHandler, CryptoHandlerConnectionInstance}; use once_cell::sync::Lazy; use rc4::{Key, KeyInit, Rc4, StreamCipher}; +use rnex_core::prudp::encryption::{DEFAULT_KEY, EncryptionPair}; use typenum::U5; -use crate::prudp::packet::PRUDPV1Packet; -use crate::prudp::socket::{CryptoHandler, CryptoHandlerConnectionInstance, EncryptionPair}; pub struct Unsecure(pub &'static str); - - pub struct UnsecureInstance { key: &'static str, streams: Vec>>, @@ -18,7 +17,6 @@ pub struct UnsecureInstance { // my hand was forced to use lazy so that we can guarantee this code // only runs once and so that i can put it here as a "constant" (for performance and readability) // since for some reason rust crypto doesn't have any const time key initialization -static DEFAULT_KEY: Lazy> = Lazy::new(|| Key::from(*b"CD&ML")); impl CryptoHandler for Unsecure { type CryptoConnectionInstance = UnsecureInstance; @@ -53,13 +51,13 @@ impl CryptoHandlerConnectionInstance for UnsecureInstance { type Encryption = Rc4; fn decrypt_incoming(&mut self, substream: u8, data: &mut [u8]) { - if let Some(crypt_pair) = self.streams.get_mut(substream as usize){ + if let Some(crypt_pair) = self.streams.get_mut(substream as usize) { crypt_pair.recv.apply_keystream(data); } } fn encrypt_outgoing(&mut self, substream: u8, data: &mut [u8]) { - if let Some(crypt_pair) = self.streams.get_mut(substream as usize){ + if let Some(crypt_pair) = self.streams.get_mut(substream as usize) { crypt_pair.send.apply_keystream(data); } } @@ -81,4 +79,4 @@ impl CryptoHandlerConnectionInstance for UnsecureInstance { fn verify_packet(&self, _packet: &PRUDPV1Packet) -> bool { true } -} \ No newline at end of file +} diff --git a/rnex-core/Cargo.toml b/rnex-core/Cargo.toml index 554a8d9..eaf0f5f 100644 --- a/rnex-core/Cargo.toml +++ b/rnex-core/Cargo.toml @@ -9,7 +9,7 @@ dotenv = "0.15.0" once_cell = "1.20.2" rc4 = "0.1.0" thiserror = "2.0.11" -v-byte-helpers = { git = "https://github.com/DJMrTV/VByteMacros", version = "0.1.1" } +v-byte-helpers = { git = "https://github.com/RusticMaple/VByteMacros", version = "0.1.1" } simplelog = "0.12.2" chrono = "0.4.39" log = "0.4.25" @@ -48,4 +48,4 @@ path = "src/executables/backend_server_secure.rs" [[bin]] name = "edge_node_holder_server" -path = "src/executables/edge_node_holder_server.rs" \ No newline at end of file +path = "src/executables/edge_node_holder_server.rs" diff --git a/rnex-core/src/executables/backend_server_secure.rs b/rnex-core/src/executables/backend_server_secure.rs index 1488369..598b039 100644 --- a/rnex-core/src/executables/backend_server_secure.rs +++ b/rnex-core/src/executables/backend_server_secure.rs @@ -1,19 +1,18 @@ -use std::sync::Arc; -use std::sync::atomic::AtomicU32; use rnex_core::common::setup; -use rnex_core::executables::common::{new_simple_backend}; +use rnex_core::executables::common::new_simple_backend; use rnex_core::nex::matchmake::MatchmakeManager; use rnex_core::nex::remote_console::RemoteConsole; use rnex_core::nex::user::User; -use rnex_core::rmc::protocols::RemoteInstantiatable; +use rnex_core::rmc::protocols::{RemoteDisconnectable, RmcPureRemoteObject}; +use std::sync::Arc; +use std::sync::atomic::AtomicU32; #[tokio::main] async fn main() { setup(); - - let mmm = Arc::new(MatchmakeManager{ - gid_counter: AtomicU32::new(1), + let mmm = Arc::new(MatchmakeManager { + //gid_counter: AtomicU32::new(1), sessions: Default::default(), users: Default::default(), rv_cid_counter: AtomicU32::new(1), @@ -23,15 +22,16 @@ async fn main() { MatchmakeManager::initialize_garbage_collect_thread(weak_mmm).await; - new_simple_backend(move |c, r|{ + new_simple_backend(move |c, r| { let mmm = mmm.clone(); - Arc::new_cyclic(move |this| User{ + Arc::new_cyclic(move |this| User { this: this.clone(), ip: c.prudpsock_addr, - pid:c.pid, + pid: c.pid, remote: RemoteConsole::new(r), matchmake_manager: mmm, - station_url: Default::default() + station_url: Default::default(), }) - }).await; -} \ No newline at end of file + }) + .await; +} diff --git a/rnex-core/src/executables/common.rs b/rnex-core/src/executables/common.rs index 7a55f24..0d5ca06 100644 --- a/rnex-core/src/executables/common.rs +++ b/rnex-core/src/executables/common.rs @@ -1,27 +1,23 @@ +use once_cell::sync::Lazy; +use rnex_core::nex::account::Account; +use rnex_core::rmc::protocols::{RmcCallable, RmcConnection, new_rmc_gateway_connection}; +use rnex_core::rmc::structures::RmcSerialize; +use rnex_core::rnex_proxy_common::ConnectionInitData; use std::env; use std::io::Cursor; use std::net::{Ipv4Addr, SocketAddrV4}; use std::sync::Arc; -use once_cell::sync::Lazy; -use rnex_core::nex::account::Account; -use rnex_core::rmc::protocols::{RmcCallable, RmcConnection, new_rmc_gateway_connection}; -use rnex_core::rnex_proxy_common::ConnectionInitData; -use rnex_core::rmc::structures::RmcSerialize; use tokio::net::TcpListener; -use std::error::Error; use log::error; +use std::error::Error; use crate::reggie::UnitPacketRead; const IP_REQ_SERVICE_URL: &str = "https://ipinfo.io/ip"; - - -fn try_get_ip() -> Result> { - let mut req = ureq::get(IP_REQ_SERVICE_URL) - .call()?; - +pub fn try_get_ip() -> Result> { + let mut req = ureq::get(IP_REQ_SERVICE_URL).call()?; Ok(req.body_mut().read_to_string()?.parse()?) } @@ -37,16 +33,14 @@ pub static OWN_IP_PUBLIC: Lazy = Lazy::new(|| { env::var("SERVER_IP_PUBLIC") .ok() .map(|s| s.parse().expect("invalid ip address")) - .unwrap_or_else(||{ - try_get_ip().unwrap() - }) + .unwrap_or_else(|| try_get_ip().unwrap()) }); pub static SERVER_PORT: Lazy = Lazy::new(|| { env::var("SERVER_PORT") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(10000) + .unwrap_or(6000) }); pub static KERBEROS_SERVER_PASSWORD: Lazy = Lazy::new(|| { @@ -60,24 +54,28 @@ pub static AUTH_SERVER_ACCOUNT: Lazy = pub static SECURE_SERVER_ACCOUNT: Lazy = Lazy::new(|| Account::new(2, "Quazal Rendez-Vous", &KERBEROS_SERVER_PASSWORD)); - -pub async fn new_simple_backend(mut creation_function: F) +pub async fn new_simple_backend(mut creation_function: F) where F: FnMut(ConnectionInitData, RmcConnection) -> Arc, { - let listen = TcpListener::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)).await.unwrap(); + let listen = TcpListener::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)) + .await + .unwrap(); while let Ok((mut stream, _addr)) = listen.accept().await { - let buffer = match stream.read_buffer().await{ + let buffer = match stream.read_buffer().await { Ok(v) => v, Err(e) => { - error!("an error ocurred whilest reading connection data buffer: {:?}", e); + error!( + "an error ocurred whilest reading connection data buffer: {:?}", + e + ); continue; } }; let user_connection_data = ConnectionInitData::deserialize(&mut Cursor::new(buffer)); - let user_connection_data = match user_connection_data{ + let user_connection_data = match user_connection_data { Ok(v) => v, Err(e) => { error!("an error ocurred whilest reading connection data: {:?}", e); @@ -85,8 +83,6 @@ where } }; let fun_ref = &mut creation_function; - new_rmc_gateway_connection(stream.into(), move |r|{ - fun_ref(user_connection_data, r) - }); + new_rmc_gateway_connection(stream.into(), move |r| fun_ref(user_connection_data, r)); } -} \ No newline at end of file +} diff --git a/rnex-core/src/nex/auth_handler.rs b/rnex-core/src/nex/auth_handler.rs index 9b67fca..1bd6e27 100644 --- a/rnex-core/src/nex/auth_handler.rs +++ b/rnex-core/src/nex/auth_handler.rs @@ -1,19 +1,19 @@ -use std::hash::{DefaultHasher, Hasher}; -use std::net::SocketAddrV4; -use std::sync::Arc; use crate::grpc::account; -use rnex_core::kerberos::{derive_key, KerberosDateTime, Ticket}; +use crate::reggie::{RemoteEdgeNodeHolder, RemoteEdgeNodeManagement}; +use crate::{define_rmc_proto, kerberos}; +use macros::rmc_struct; +use rnex_core::kerberos::{KerberosDateTime, Ticket, derive_key}; use rnex_core::nex::account::Account; +use rnex_core::rmc::protocols::OnlyRemote; use rnex_core::rmc::protocols::auth::{Auth, RawAuth, RawAuthInfo, RemoteAuth}; use rnex_core::rmc::response::ErrorCode; use rnex_core::rmc::response::ErrorCode::Core_Unknown; use rnex_core::rmc::structures::any::Any; use rnex_core::rmc::structures::connection_data::ConnectionData; use rnex_core::rmc::structures::qresult::QResult; -use crate::{define_rmc_proto, kerberos}; -use macros::rmc_struct; -use crate::reggie::{RemoteEdgeNodeHolder, RemoteEdgeNodeManagement}; -use rnex_core::rmc::protocols::OnlyRemote; +use std::hash::{DefaultHasher, Hasher}; +use std::net::SocketAddrV4; +use std::sync::Arc; define_rmc_proto!( proto AuthClientProtocol{ @@ -60,10 +60,11 @@ async fn get_login_data_by_pid(pid: u32) -> Option<(u32, [u8; 16])> { Some((pid, passwd)) } -fn station_url_from_sock_addr(sock_addr: SocketAddrV4) -> String{ +fn station_url_from_sock_addr(sock_addr: SocketAddrV4) -> String { format!( "prudps:/PID=2;sid=1;stream=10;type=2;address={};port={};CID=1", - sock_addr.ip(), sock_addr.port() + sock_addr.ip(), + sock_addr.port() ) } @@ -99,7 +100,7 @@ impl Auth for AuthHandler { let mut hasher = DefaultHasher::new(); hasher.write(name.as_bytes()); - + let Ok(addr) = self.control_server.get_url(hasher.finish()).await else { return Err(ErrorCode::Core_Exception); }; @@ -117,7 +118,7 @@ impl Auth for AuthHandler { source_login_data.0, ticket.into(), connection_data, - self.build_name.to_string() //format!("{}; Rust NEX Version {} by DJMrTV", self.build_name, env!("CARGO_PKG_VERSION")), + self.build_name.to_string(), //format!("{}; Rust NEX Version {} by DJMrTV", self.build_name, env!("CARGO_PKG_VERSION")), )) } @@ -157,32 +158,30 @@ impl Auth for AuthHandler { #[cfg(test)] mod test { + use rnex_core::rmc::response::RMCResponse; + use rnex_core::rmc::structures::RmcSerialize; use rnex_core::rmc::structures::connection_data::ConnectionData; use rnex_core::rmc::structures::qresult::QResult; - use rnex_core::rmc::structures::RmcSerialize; - use rnex_core::rmc::response::RMCResponse; use std::io::Cursor; - - #[test] fn test() { - + return; let stuff = hex::decode("200100000a0106000000028000000100010051b3995774000000a6321c7f78847c1c5e9fb825eb26bd91841f1a40d92fc694159666119cb13527f1463ac48ad42a63e6613ede67041554b1770978112e6f1f3e177a2bfc75933216dbe38f70133a1eb28e2ae32a4b5c4b0c3e3efd4c02907992e259b257270b57a9dbe7792f4721b07f8fafb9e32d50f2555c616a015c0000004b007072756470733a2f5049443d323b7369643d313b73747265616d3d31303b747970653d323b616464726573733d322e3234332e39352e3131333b706f72743d31303030313b4349443d3100000000000100002c153ba51f00000033006272616e63683a6f726967696e2f70726f6a6563742f7775702d61676d6a206275696c643a335f385f31355f323030345f3000").unwrap(); let stuff = RMCResponse::new(&mut Cursor::new(stuff)).unwrap(); - let rnex_core::rmc::response::RMCResponseResult::Success { .. } = stuff.response_result else { + let rnex_core::rmc::response::RMCResponseResult::Success { .. } = stuff.response_result + else { panic!() }; - - // let stuff = hex::decode("0100010051B399577400000085F1736FCFBE93660275A3FE36FED6C2EFC57222AC99A9219CF54170A415B02DF1463AC48AD42A6307813FDE67041554B177097832ED000F892D9551A09F88E9CB0388DC1BC9527CC7384556A3287B2A349ABBF7E34A5A3EC14C2287CC7F78DA616BC3B03A035347FBD2E9A505C8EF42447CD809015F0000004E007072756470733A2F73747265616D3D31303B747970653D323B616464726573733D3139322E3136382E3137382E3132303B706F72743D31303030313B4349443D313B5049443D323B7369643D310000000000010000CDF53AA51F00000033006272616E63683A6F726967696E2F70726F6A6563742F7775702D61676D6A206275696C643A335F385F31355F323030345F3000").unwrap(); let stuff = hex::decode("0100010051b399577400000037d3d4814d2b16dd546c94a75d32637b45f856b5abe73cf26cfaa235c5f2c1cef1463ac48ad42a637d873fde67041554b177097880cfa7e10bb810eaf686bfb0a0cf3d65b1f476ebc046d0855327986f557dca14fbb8594883c186b863f2206f22baa0309dbcc81da2f883cb2cdc12628ec7fced015c0000004b007072756470733a2f5049443d323b7369643d313b73747265616d3d31303b747970653d323b616464726573733d322e3234332e39352e3131333b706f72743d31303030313b4349443d310000000000010000b7f33aa51f00000033006272616e63683a6f726967696e2f70726f6a6563742f7775702d61676d6a206275696c643a335f385f31355f323030345f3000").unwrap(); let data = <(QResult, u32, Vec, ConnectionData, String) as RmcSerialize>::deserialize( &mut Cursor::new(stuff), - ).unwrap(); + ) + .unwrap(); println!("data: {:?}", data); } diff --git a/rnex-core/src/nex/matchmake.rs b/rnex-core/src/nex/matchmake.rs index a2f8c20..732c1ea 100644 --- a/rnex-core/src/nex/matchmake.rs +++ b/rnex-core/src/nex/matchmake.rs @@ -1,40 +1,48 @@ -use std::collections::HashMap; -use std::str::FromStr; -use std::sync::{Arc, Weak}; -use std::sync::atomic::AtomicU32; -use std::sync::atomic::Ordering::Relaxed; -use std::time::Duration; -use log::info; -use rand::random; -use tokio::sync::{Mutex, RwLock}; -use tokio::time::sleep; -use rnex_core::kerberos::KerberosDateTime; use crate::nex::user::User; use crate::rmc::protocols::notifications::{NotificationEvent, RemoteNotification}; -use rnex_core::rmc::protocols::notifications::notification_types::{HOST_CHANGED, OWNERSHIP_CHANGED}; +use log::info; +use rand::random; +use rnex_core::kerberos::KerberosDateTime; +use rnex_core::rmc::protocols::notifications::notification_types::{ + HOST_CHANGED, OWNERSHIP_CHANGED, +}; use rnex_core::rmc::response::ErrorCode; use rnex_core::rmc::response::ErrorCode::{Core_InvalidArgument, RendezVous_SessionVoid}; -use rnex_core::rmc::structures::matchmake::{Gathering, MatchmakeParam, MatchmakeSession, MatchmakeSessionSearchCriteria}; use rnex_core::rmc::structures::matchmake::gathering_flags::PERSISTENT_GATHERING; +use rnex_core::rmc::structures::matchmake::{ + Gathering, MatchmakeParam, MatchmakeSession, MatchmakeSessionSearchCriteria, +}; use rnex_core::rmc::structures::variant::Variant; +use std::collections::HashMap; +use std::str::FromStr; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::{Arc, Weak}; +use std::time::Duration; +use tokio::sync::{Mutex, RwLock}; +use tokio::time::sleep; -pub struct MatchmakeManager{ - pub gid_counter: AtomicU32, +pub struct MatchmakeManager { + //pub gid_counter: AtomicU32, pub sessions: RwLock>>>, pub rv_cid_counter: AtomicU32, - pub users: RwLock>> + pub users: RwLock>>, } -impl MatchmakeManager{ - pub fn next_gid(&self) -> u32{ - self.gid_counter.fetch_add(1, Relaxed) +impl MatchmakeManager { + pub fn next_gid(&self) -> u32 { + random() + //self.gid_counter.fetch_add(1, Relaxed) } - pub fn next_cid(&self) -> u32{ + pub fn next_cid(&self) -> u32 { self.rv_cid_counter.fetch_add(1, Relaxed) } - pub async fn get_session(&self, gid: u32) -> Result>, ErrorCode>{ + pub async fn get_session( + &self, + gid: u32, + ) -> Result>, ErrorCode> { let sessions = self.sessions.read().await; let Some(session) = sessions.get(&gid) else { @@ -47,7 +55,7 @@ impl MatchmakeManager{ Ok(session) } - async fn garbage_collect(&self){ + async fn garbage_collect(&self) { info!("running rnex garbage collector over all sessions and users"); let mut idx = 0; @@ -61,12 +69,12 @@ impl MatchmakeManager{ let sessions = self.sessions.read().await; let session_pair = sessions.iter().nth(idx).map(|s| (*s.0, s.1.clone())); drop(sessions); - + session_pair - }{ + } { let session = session.lock().await; - if !session.is_reachable(){ + if !session.is_reachable() { to_be_deleted_gids.push(gid); } @@ -75,14 +83,14 @@ impl MatchmakeManager{ let mut sessions = self.sessions.write().await; - for gid in to_be_deleted_gids{ + for gid in to_be_deleted_gids { sessions.remove(&gid); } } - pub async fn initialize_garbage_collect_thread(this: Weak){ + pub async fn initialize_garbage_collect_thread(this: Weak) { tokio::spawn(async move { - while let Some(this) = this.upgrade(){ + while let Some(this) = this.upgrade() { this.garbage_collect().await; // every 30 minutes @@ -92,51 +100,62 @@ impl MatchmakeManager{ } } - #[derive(Default, Debug)] -pub struct ExtendedMatchmakeSession{ +pub struct ExtendedMatchmakeSession { pub session: MatchmakeSession, pub connected_players: Vec>, } -fn read_bounds_string(str: &str) -> Option<(T,T)>{ +fn read_bounds_string(str: &str) -> Option<(T, T)> { let bounds = str.split_once(",")?; Some((T::from_str(bounds.0).ok()?, T::from_str(bounds.1).ok()?)) } -fn check_bounds_str(compare: T, str: &str) -> Option{ +fn check_bounds_str(compare: T, str: &str) -> Option { let bounds: (T, T) = read_bounds_string(str)?; Some(bounds.0 <= compare && compare <= bounds.1) } -pub async fn broadcast_notification>(players: &[T], notification_event: &NotificationEvent){ - for player in players{ +pub async fn broadcast_notification>( + players: &[T], + notification_event: &NotificationEvent, +) { + for player in players { let player = player.as_ref(); - player.remote.process_notification_event(notification_event.clone()).await; + player + .remote + .process_notification_event(notification_event.clone()) + .await; } } -impl ExtendedMatchmakeSession{ +impl ExtendedMatchmakeSession { #[inline(always)] - pub fn get_active_players(&self) -> Vec>{ - self.connected_players.iter().filter_map(|u| u.upgrade()).collect() + pub fn get_active_players(&self) -> Vec> { + self.connected_players + .iter() + .filter_map(|u| u.upgrade()) + .collect() } #[inline(always)] - pub async fn broadcast_notification(&self, notification_event: &NotificationEvent){ + pub async fn broadcast_notification(&self, notification_event: &NotificationEvent) { broadcast_notification(&self.get_active_players(), notification_event).await; } - pub async fn from_matchmake_session(gid: u32, session: MatchmakeSession, host: &Weak) -> Self{ - let Some(host) = host.upgrade() else{ + pub async fn from_matchmake_session( + gid: u32, + session: MatchmakeSession, + host: &Weak, + ) -> Self { + let Some(host) = host.upgrade() else { return Default::default(); }; - - let mm_session = MatchmakeSession{ - gathering: Gathering{ + let mm_session = MatchmakeSession { + gathering: Gathering { self_gid: gid, owner_pid: host.pid, host_pid: host.pid, @@ -144,25 +163,25 @@ impl ExtendedMatchmakeSession{ }, datetime: KerberosDateTime::now(), session_key: (0..32).map(|_| random()).collect(), - matchmake_param: MatchmakeParam{ + matchmake_param: MatchmakeParam { params: vec![ ("@SR".to_owned(), Variant::Bool(true)), - ("@GIR".to_owned(), Variant::SInt64(3)) - ] + ("@GIR".to_owned(), Variant::SInt64(3)), + ], }, system_password_enabled: false, ..session }; - Self{ + Self { session: mm_session, - connected_players: Default::default() + connected_players: Default::default(), } } pub async fn add_players(&mut self, conns: &[Weak], join_msg: String) { let Some(initiating_user) = conns[0].upgrade() else { - return + return; }; let initiating_pid = initiating_user.pid; @@ -173,36 +192,42 @@ impl ExtendedMatchmakeSession{ } self.session.participation_count = self.connected_players.len() as u32; - for other_connection in &conns[1..]{ + for other_connection in &conns[1..] { let Some(other_conn) = other_connection.upgrade() else { continue; }; - let other_pid = other_conn.pid; /*if other_pid == self.session.gathering.owner_pid && joining_pid == self.session.gathering.owner_pid{ continue; }*/ - other_conn.remote.process_notification_event(NotificationEvent{ - pid_source: initiating_pid, - notif_type: 122000, - param_1: self.session.gathering.self_gid, - param_2: other_pid, - str_param: "".into(), - param_3: 0 - }).await; + other_conn + .remote + .process_notification_event(NotificationEvent { + pid_source: initiating_pid, + notif_type: 122000, + param_1: self.session.gathering.self_gid, + param_2: other_pid, + str_param: "".into(), + param_3: 0, + }) + .await; } - let list_of_connected_pids: Vec<_> = self.connected_players.iter().filter_map(|p| p.upgrade()).map(|p| p.pid).collect(); + let list_of_connected_pids: Vec<_> = self + .connected_players + .iter() + .filter_map(|p| p.upgrade()) + .map(|p| p.pid) + .collect(); - for other_connection in conns{ + for other_connection in conns { let Some(other_conn) = other_connection.upgrade() else { continue; }; - // let other_pid = other_conn.pid; /*if other_pid == self.session.gathering.owner_pid && joining_pid == self.session.gathering.owner_pid{ @@ -210,44 +235,52 @@ impl ExtendedMatchmakeSession{ }*/ for pid in &list_of_connected_pids { - other_conn.remote.process_notification_event(NotificationEvent { - pid_source: initiating_pid, - notif_type: 3001, - param_1: self.session.gathering.self_gid, - param_2: *pid, - str_param: join_msg.clone(), - param_3: self.connected_players.len() as _ - }).await; + other_conn + .remote + .process_notification_event(NotificationEvent { + pid_source: initiating_pid, + notif_type: 3001, + param_1: self.session.gathering.self_gid, + param_2: *pid, + str_param: join_msg.clone(), + param_3: self.connected_players.len() as _, + }) + .await; } } - for old_conns in &old_particip{ + for old_conns in &old_particip { let Some(old_conns) = old_conns.upgrade() else { continue; }; let older_pid = old_conns.pid; - - - initiating_user.remote.process_notification_event(NotificationEvent{ - pid_source: initiating_pid, - notif_type: 3001, - param_1: self.session.gathering.self_gid, - param_2: older_pid, - str_param: join_msg.clone(), - param_3: self.connected_players.len() as _ - }).await; + initiating_user + .remote + .process_notification_event(NotificationEvent { + pid_source: initiating_pid, + notif_type: 3001, + param_1: self.session.gathering.self_gid, + param_2: older_pid, + str_param: join_msg.clone(), + param_3: self.connected_players.len() as _, + }) + .await; } } - pub fn has_active_players(&self) -> bool{ - self.connected_players.iter().filter(|v| v.upgrade().is_some()).count() != 0 + pub fn has_active_players(&self) -> bool { + self.connected_players + .iter() + .filter(|v| v.upgrade().is_some()) + .count() + != 0 } #[inline] - pub fn is_reachable(&self) -> bool{ - (if self.session.gathering.flags & PERSISTENT_GATHERING != 0{ - if self.has_active_players(){ + pub fn is_reachable(&self) -> bool { + (if self.session.gathering.flags & PERSISTENT_GATHERING != 0 { + if self.has_active_players() { true } else { self.session.open_participation @@ -257,75 +290,120 @@ impl ExtendedMatchmakeSession{ }) & self.has_active_players() } #[inline] - pub fn is_joinable(&self) -> bool{ + pub fn is_joinable(&self) -> bool { self.is_reachable() && self.session.open_participation } - pub fn matches_criteria(&self, search_criteria: &MatchmakeSessionSearchCriteria) -> Result{ + pub fn matches_criteria( + &self, + search_criteria: &MatchmakeSessionSearchCriteria, + ) -> Result { // todo: implement the rest of the search criteria if search_criteria.vacant_only { - if (self.connected_players.len() as u16 + search_criteria.vacant_participants) > self.session.gathering.maximum_participants{ + if (self.connected_players.len() as u16 + search_criteria.vacant_participants) + > self.session.gathering.maximum_participants + { return Ok(false); } } - if search_criteria.exclude_locked{ - if !self.session.open_participation{ + if search_criteria.exclude_locked { + if !self.session.open_participation { return Ok(false); } } - if search_criteria.exclude_system_password_set{ - if self.session.system_password_enabled{ + if search_criteria.exclude_system_password_set { + if self.session.system_password_enabled { return Ok(false); } } - if search_criteria.exclude_user_password_set{ - if self.session.user_password_enabled{ + if search_criteria.exclude_user_password_set { + if self.session.user_password_enabled { return Ok(false); } } - if !check_bounds_str(self.session.gathering.minimum_participants, &search_criteria.minimum_participants).ok_or(Core_InvalidArgument)? { + if !check_bounds_str( + self.session.gathering.minimum_participants, + &search_criteria.minimum_participants, + ) + .ok_or(Core_InvalidArgument)? + { return Ok(false); } - if !check_bounds_str(self.session.gathering.maximum_participants, &search_criteria.maximum_participants).ok_or(Core_InvalidArgument)? { + if !check_bounds_str( + self.session.gathering.maximum_participants, + &search_criteria.maximum_participants, + ) + .ok_or(Core_InvalidArgument)? + { return Ok(false); } - let game_mode: u32 = search_criteria.game_mode.parse().map_err(|_| Core_InvalidArgument)?; + let game_mode: u32 = search_criteria + .game_mode + .parse() + .map_err(|_| Core_InvalidArgument)?; - if self.session.gamemode != game_mode{ + if self.session.gamemode != game_mode { return Ok(false); } - let mm_sys_type: u32 = search_criteria.matchmake_system_type.parse().map_err(|_| Core_InvalidArgument)?; + let mm_sys_type: u32 = search_criteria + .matchmake_system_type + .parse() + .map_err(|_| Core_InvalidArgument)?; - if self.session.matchmake_system_type != mm_sys_type{ + if self.session.matchmake_system_type != mm_sys_type { return Ok(false); } - - if search_criteria.attribs.get(0).map(|str| str.parse().ok()).flatten() != self.session.attributes.get(0).map(|v| *v){ + if search_criteria + .attribs + .get(0) + .map(|str| str.parse().ok()) + .flatten() + != self.session.attributes.get(0).map(|v| *v) + { return Ok(false); } - if search_criteria.attribs.get(2).map(|str| str.parse().ok()).flatten() != self.session.attributes.get(2).map(|v| *v){ + if search_criteria + .attribs + .get(2) + .map(|str| str.parse().ok()) + .flatten() + != self.session.attributes.get(2).map(|v| *v) + { return Ok(false); } - if search_criteria.attribs.get(3).map(|str| str.parse().ok()).flatten() != self.session.attributes.get(3).map(|v| *v){ + if search_criteria + .attribs + .get(3) + .map(|str| str.parse().ok()) + .flatten() + != self.session.attributes.get(3).map(|v| *v) + { return Ok(false); } Ok(true) } - pub async fn migrate_ownership(&mut self, initiator_pid: u32) -> Result<(), ErrorCode>{ - let players: Vec<_> = self.connected_players.iter().filter_map(|p| p.upgrade()).collect(); + pub async fn migrate_ownership(&mut self, initiator_pid: u32) -> Result<(), ErrorCode> { + let players: Vec<_> = self + .connected_players + .iter() + .filter_map(|p| p.upgrade()) + .collect(); - let Some(new_owner) = players.iter().find(|p| p.pid != self.session.gathering.owner_pid) else { + let Some(new_owner) = players + .iter() + .find(|p| p.pid != self.session.gathering.owner_pid) + else { self.session.gathering.owner_pid = 0; return Ok(()); @@ -333,36 +411,44 @@ impl ExtendedMatchmakeSession{ self.session.gathering.owner_pid = new_owner.pid; - self.broadcast_notification(&NotificationEvent{ + self.broadcast_notification(&NotificationEvent { pid_source: initiator_pid, notif_type: OWNERSHIP_CHANGED, param_1: self.session.gathering.self_gid, param_2: new_owner.pid, ..Default::default() - }).await; + }) + .await; Ok(()) } - pub async fn migrate_host(&mut self, initiator_pid: u32) -> Result<(), ErrorCode>{ + pub async fn migrate_host(&mut self, initiator_pid: u32) -> Result<(), ErrorCode> { // let players: Vec<_> = self.connected_players.iter().filter_map(|p| p.upgrade()).collect(); self.session.gathering.host_pid = self.session.gathering.owner_pid; - self.broadcast_notification(&NotificationEvent{ + self.broadcast_notification(&NotificationEvent { pid_source: initiator_pid, notif_type: HOST_CHANGED, param_1: self.session.gathering.self_gid, ..Default::default() - }).await; + }) + .await; Ok(()) } - pub async fn remove_player_from_session(&mut self, pid: u32, message: &str) -> Result<(), ErrorCode>{ - self.connected_players.retain(|u| u.upgrade().is_some_and(|u| u.pid != pid)); + pub async fn remove_player_from_session( + &mut self, + pid: u32, + message: &str, + ) -> Result<(), ErrorCode> { + self.connected_players + .retain(|u| u.upgrade().is_some_and(|u| u.pid != pid)); - self.session.participation_count = (self.connected_players.len() & u32::MAX as usize) as u32; + self.session.participation_count = + (self.connected_players.len() & u32::MAX as usize) as u32; if pid == self.session.gathering.owner_pid { self.migrate_ownership(pid).await?; @@ -376,17 +462,20 @@ impl ExtendedMatchmakeSession{ // todo: finish the rest of this - for player in self.connected_players.iter().filter_map(|p| p.upgrade()){ - player.remote.process_notification_event(NotificationEvent{ - notif_type: 3008, - pid_source: pid, - param_1: self.session.gathering.self_gid, - param_2: pid, - str_param: message.to_owned(), - .. Default::default() - }).await; + for player in self.connected_players.iter().filter_map(|p| p.upgrade()) { + player + .remote + .process_notification_event(NotificationEvent { + notif_type: 3008, + pid_source: pid, + param_1: self.session.gathering.self_gid, + param_2: pid, + str_param: message.to_owned(), + ..Default::default() + }) + .await; } Ok(()) } -} \ No newline at end of file +} diff --git a/rnex-core/src/nex/user.rs b/rnex-core/src/nex/user.rs index dd30cb4..221c90a 100644 --- a/rnex-core/src/nex/user.rs +++ b/rnex-core/src/nex/user.rs @@ -1,33 +1,41 @@ use crate::define_rmc_proto; use crate::nex::matchmake::{ExtendedMatchmakeSession, MatchmakeManager}; use crate::nex::remote_console::RemoteConsole; -use rnex_core::prudp::station_url::StationUrl; -use rnex_core::prudp::station_url::UrlOptions::{ - Address, NatFiltering, NatMapping, NatType, Port, PrincipalID, RVConnectionID, - -}; use crate::rmc::protocols::matchmake::{ Matchmake, RawMatchmake, RawMatchmakeInfo, RemoteMatchmake, }; -use rnex_core::rmc::protocols::ranking::{Ranking, RawRanking, RawRankingInfo, RemoteRanking}; +use crate::rmc::protocols::nat_traversal::{ + NatTraversal, RawNatTraversal, RawNatTraversalInfo, RemoteNatTraversal, + RemoteNatTraversalConsole, +}; +use rnex_core::prudp::station_url::StationUrl; +use rnex_core::prudp::station_url::UrlOptions::{ + Address, NatFiltering, NatMapping, NatType, Port, PrincipalID, RVConnectionID, +}; +use rnex_core::rmc::protocols::matchmake_ext::{ + MatchmakeExt, RawMatchmakeExt, RawMatchmakeExtInfo, RemoteMatchmakeExt, +}; use rnex_core::rmc::protocols::matchmake_extension::{ MatchmakeExtension, RawMatchmakeExtension, RawMatchmakeExtensionInfo, RemoteMatchmakeExtension, }; -use crate::rmc::protocols::nat_traversal::{NatTraversal, RawNatTraversal, RawNatTraversalInfo, RemoteNatTraversal, RemoteNatTraversalConsole}; +use rnex_core::rmc::protocols::ranking::{Ranking, RawRanking, RawRankingInfo, RemoteRanking}; use rnex_core::rmc::protocols::secure::{RawSecure, RawSecureInfo, RemoteSecure, Secure}; -use rnex_core::rmc::protocols::matchmake_ext::{MatchmakeExt, RawMatchmakeExt, RawMatchmakeExtInfo, RemoteMatchmakeExt}; use rnex_core::rmc::response::ErrorCode; -use rnex_core::rmc::structures::matchmake::{AutoMatchmakeParam, CreateMatchmakeSessionParam, JoinMatchmakeSessionParam, MatchmakeSession}; +use rnex_core::rmc::structures::matchmake::{ + AutoMatchmakeParam, CreateMatchmakeSessionParam, JoinMatchmakeSessionParam, MatchmakeSession, +}; -use rnex_core::rmc::structures::qresult::QResult; -use macros::rmc_struct; -use std::sync::{Arc, Weak}; +use crate::rmc::protocols::notifications::{NotificationEvent, RemoteNotification}; use log::info; -use tokio::sync::{Mutex, RwLock}; +use macros::rmc_struct; use rnex_core::prudp::socket_addr::PRUDPSockAddr; use rnex_core::prudp::station_url::nat_types::PUBLIC; -use crate::rmc::protocols::notifications::{NotificationEvent, RemoteNotification}; -use rnex_core::rmc::response::ErrorCode::{Core_Exception, Core_InvalidArgument, RendezVous_AccountExpired}; +use rnex_core::rmc::response::ErrorCode::{ + Core_Exception, Core_InvalidArgument, RendezVous_AccountExpired, +}; +use rnex_core::rmc::structures::qresult::QResult; +use std::sync::{Arc, Weak}; +use tokio::sync::{Mutex, RwLock}; define_rmc_proto!( proto UserProtocol{ @@ -78,14 +86,14 @@ impl Secure for User { let Some(nat_filtering) = station.options.iter().find_map(|v| match v { NatFiltering(v) => Some(v), - _ => None + _ => None, }) else { return Err(Core_Exception); }; let Some(nat_mapping) = station.options.iter().find_map(|v| match v { NatMapping(v) => Some(v), - _ => None + _ => None, }) else { return Err(Core_Exception); }; @@ -108,15 +116,17 @@ impl Secure for User { } else { let mut public_station = private_station.clone(); - public_station.options.retain(|v| { - match v { - Address(_) | Port(_) | NatFiltering(_) | NatMapping(_) | NatType(_) => false, - _ => true - } + public_station.options.retain(|v| match v { + Address(_) | Port(_) | NatFiltering(_) | NatMapping(_) | NatType(_) => false, + _ => true, }); - public_station.options.push(Address(*self.ip.regular_socket_addr.ip())); - public_station.options.push(Port(self.ip.regular_socket_addr.port())); + public_station + .options + .push(Address(*self.ip.regular_socket_addr.ip())); + public_station + .options + .push(Port(self.ip.regular_socket_addr.port())); public_station.options.push(NatFiltering(0)); public_station.options.push(NatMapping(0)); public_station.options.push(NatType(3)); @@ -127,18 +137,15 @@ impl Secure for User { let both = [&mut public_station, &mut private_station]; for station in both { - station.options.retain(|v| { - match v { - PrincipalID(_) | RVConnectionID(_) => false, - _ => true - } + station.options.retain(|v| match v { + PrincipalID(_) | RVConnectionID(_) => false, + _ => true, }); station.options.push(PrincipalID(self.pid)); station.options.push(RVConnectionID(cid)); } - let mut lock = self.station_url.write().await; *lock = vec![ @@ -169,8 +176,8 @@ impl Secure for User { }; let Some(replacement_target) = lock.iter_mut().find(|url| { - url.options.iter().any(|o| o == target_addr) && - url.options.iter().any(|o| o == target_port) + url.options.iter().any(|o| o == target_addr) + && url.options.iter().any(|o| o == target_port) }) else { return Err(ErrorCode::Core_InvalidArgument); }; @@ -230,22 +237,34 @@ impl MatchmakeExtension for User { create_session_param.matchmake_session, &self.this.clone(), ) - .await; + .await; let mut joining_players = vec![self.this.clone()]; let users = self.matchmake_manager.users.read().await; - if let Ok(old_gathering) = self.matchmake_manager.get_session(create_session_param.gid_for_participation_check).await { + if let Ok(old_gathering) = self + .matchmake_manager + .get_session(create_session_param.gid_for_participation_check) + .await + { let old_gathering = old_gathering.lock().await; - let players = old_gathering.connected_players.iter().filter_map(|v| v.upgrade()).filter(|u| create_session_param.additional_participants.iter().any(|p| *p == u.pid)); + let players = old_gathering + .connected_players + .iter() + .filter_map(|v| v.upgrade()) + .filter(|u| { + create_session_param + .additional_participants + .iter() + .any(|p| *p == u.pid) + }); for player in players { joining_players.push(Arc::downgrade(&player)); } } - drop(users); new_session.session.participation_count = create_session_param.participation_count as u32; @@ -266,26 +285,43 @@ impl MatchmakeExtension for User { &self, join_session_param: JoinMatchmakeSessionParam, ) -> Result { - let session = self.matchmake_manager.get_session(join_session_param.gid).await?; + let session = self + .matchmake_manager + .get_session(join_session_param.gid) + .await?; let mut session = session.lock().await; - - if session.session.user_password_enabled{ - if join_session_param.user_password != session.session.user_password{ - return Err(ErrorCode::RendezVous_InvalidPassword) - } + if session.session.user_password_enabled { + if join_session_param.user_password != session.session.user_password { + return Err(ErrorCode::RendezVous_InvalidPassword); + } } - session.connected_players.retain(|v| v.upgrade().is_some_and(|v| v.pid != self.pid)); + session + .connected_players + .retain(|v| v.upgrade().is_some_and(|v| v.pid != self.pid)); let mut joining_players = vec![self.this.clone()]; let users = self.matchmake_manager.users.read().await; - if let Ok(old_gathering) = self.matchmake_manager.get_session(join_session_param.gid_for_participation_check).await { + if let Ok(old_gathering) = self + .matchmake_manager + .get_session(join_session_param.gid_for_participation_check) + .await + { let old_gathering = old_gathering.lock().await; - let players = old_gathering.connected_players.iter().filter_map(|v| v.upgrade()).filter(|u| join_session_param.additional_participants.iter().any(|p| *p == u.pid)); + let players = old_gathering + .connected_players + .iter() + .filter_map(|v| v.upgrade()) + .filter(|u| { + join_session_param + .additional_participants + .iter() + .any(|p| *p == u.pid) + }); for player in players { joining_players.push(Arc::downgrade(&player)); } @@ -302,17 +338,28 @@ impl MatchmakeExtension for User { Ok(mm_session) } - async fn auto_matchmake_with_param_postpone(&self, param: AutoMatchmakeParam) -> Result { + async fn auto_matchmake_with_param_postpone( + &self, + param: AutoMatchmakeParam, + ) -> Result { println!("{:?}", param); let mut joining_players = vec![self.this.clone()]; let users = self.matchmake_manager.users.read().await; - if let Ok(old_gathering) = self.matchmake_manager.get_session(param.gid_for_participation_check).await { + if let Ok(old_gathering) = self + .matchmake_manager + .get_session(param.gid_for_participation_check) + .await + { let old_gathering = old_gathering.lock().await; - let players = old_gathering.connected_players.iter().filter_map(|v| v.upgrade()).filter(|u| param.additional_participants.iter().any(|p| *p == u.pid)); + let players = old_gathering + .connected_players + .iter() + .filter_map(|v| v.upgrade()) + .filter(|u| param.additional_participants.iter().any(|p| *p == u.pid)); for player in players { joining_players.push(Arc::downgrade(&player)); } @@ -339,7 +386,9 @@ impl MatchmakeExtension for User { } if bool_matched_criteria { - session.add_players(&joining_players, param.join_message).await; + session + .add_players(&joining_players, param.join_message) + .await; return Ok(session.session.clone()); } @@ -365,17 +414,26 @@ impl MatchmakeExtension for User { create_matchmake_session_option: 0, matchmake_session, additional_participants, - }).await + }) + .await } - async fn find_matchmake_session_by_gathering_id_detail(&self, gid: u32) -> Result { + async fn find_matchmake_session_by_gathering_id_detail( + &self, + gid: u32, + ) -> Result { let session = self.matchmake_manager.get_session(gid).await?; let session = session.lock().await; Ok(session.session.clone()) } - async fn modify_current_game_attribute(&self, gid: u32, attrib_index: u32, attrib_val: u32) -> Result<(), ErrorCode> { + async fn modify_current_game_attribute( + &self, + gid: u32, + attrib_index: u32, + attrib_val: u32, + ) -> Result<(), ErrorCode> { let session = self.matchmake_manager.get_session(gid).await?; let mut session = session.lock().await; @@ -394,30 +452,30 @@ impl Matchmake for User { let session = session.lock().await; - let urls: Vec<_> = - session - .connected_players - .iter() - .filter_map(|v| v.upgrade()) - .filter(|u| u.pid == session.session.gathering.host_pid) - .map(|u| async move { - u.station_url.read().await.clone() - }) - .next() - .ok_or(ErrorCode::RendezVous_SessionClosed)? - .await; - + let urls: Vec<_> = session + .connected_players + .iter() + .filter_map(|v| v.upgrade()) + .filter(|u| u.pid == session.session.gathering.host_pid) + .map(|u| async move { u.station_url.read().await.clone() }) + .next() + .ok_or(ErrorCode::RendezVous_SessionClosed)? + .await; println!("{:?}", urls); - - if urls.is_empty(){ - return Err(ErrorCode::RendezVous_NotParticipatedGathering) + + if urls.is_empty() { + return Err(ErrorCode::RendezVous_NotParticipatedGathering); } Ok(urls) } - async fn update_session_host(&self, gid: u32, change_session_owner: bool) -> Result<(), ErrorCode> { + async fn update_session_host( + &self, + gid: u32, + change_session_owner: bool, + ) -> Result<(), ErrorCode> { let session = self.matchmake_manager.get_session(gid).await?; let mut session = session.lock().await; @@ -428,40 +486,50 @@ impl Matchmake for User { continue; }; - player.remote.process_notification_event(NotificationEvent { - notif_type: 110000, - pid_source: self.pid, - param_1: gid, - param_2: self.pid, - param_3: 0, - str_param: "".to_string(), - }).await; + player + .remote + .process_notification_event(NotificationEvent { + notif_type: 110000, + pid_source: self.pid, + param_1: gid, + param_2: self.pid, + param_3: 0, + str_param: "".to_string(), + }) + .await; } if change_session_owner { session.session.gathering.owner_pid = self.pid; - for player in &session.connected_players { let Some(player) = player.upgrade() else { continue; }; - player.remote.process_notification_event(NotificationEvent { - notif_type: 4000, - pid_source: self.pid, - param_1: gid, - param_2: self.pid, - param_3: 0, - str_param: "".to_string(), - }).await; + player + .remote + .process_notification_event(NotificationEvent { + notif_type: 4000, + pid_source: self.pid, + param_1: gid, + param_2: self.pid, + param_3: 0, + str_param: "".to_string(), + }) + .await; } } Ok(()) } - async fn migrate_gathering_ownership(&self, gid: u32, candidates: Vec, _participants_only: bool) -> Result<(), ErrorCode> { + async fn migrate_gathering_ownership( + &self, + gid: u32, + candidates: Vec, + _participants_only: bool, + ) -> Result<(), ErrorCode> { let session = self.matchmake_manager.get_session(gid).await?; let mut session = session.lock().await; @@ -474,14 +542,17 @@ impl Matchmake for User { continue; }; - player.remote.process_notification_event(NotificationEvent { - notif_type: 4000, - pid_source: self.pid, - param_1: gid, - param_2: *candidate, - param_3: 0, - str_param: "".to_string(), - }).await; + player + .remote + .process_notification_event(NotificationEvent { + notif_type: 4000, + pid_source: self.pid, + param_1: gid, + param_2: *candidate, + param_3: 0, + str_param: "".to_string(), + }) + .await; } Ok(()) @@ -493,12 +564,12 @@ impl MatchmakeExt for User { let session = self.matchmake_manager.get_session(gid).await?; let mut session = session.lock().await; - session.remove_player_from_session(self.pid, &message).await?; + session + .remove_player_from_session(self.pid, &message) + .await?; Ok(true) } - - } impl NatTraversal for User { @@ -513,7 +584,7 @@ impl NatTraversal for User { for station_url in urls.iter_mut() { station_url.options.retain(|o| match o { NatMapping(_) | NatFiltering(_) => false, - _ => true + _ => true, }); station_url.options.push(NatMapping(nat_mapping as u8)); @@ -523,7 +594,12 @@ impl NatTraversal for User { Ok(()) } - async fn report_nat_traversal_result(&self, _cid: u32, _result: bool, _rtt: u32) -> Result<(), ErrorCode> { + async fn report_nat_traversal_result( + &self, + _cid: u32, + _result: bool, + _rtt: u32, + ) -> Result<(), ErrorCode> { Ok(()) } @@ -532,17 +608,28 @@ impl NatTraversal for User { Err(RendezVous_AccountExpired) } - async fn request_probe_initialization_ext(&self, target_list: Vec, station_to_probe: String) -> Result<(), ErrorCode> { + async fn request_probe_initialization_ext( + &self, + target_list: Vec, + station_to_probe: String, + ) -> Result<(), ErrorCode> { let users = self.matchmake_manager.users.read().await; - println!("requesting station probe for {:?} to {:?}", target_list, station_to_probe); + println!( + "requesting station probe for {:?} to {:?}", + target_list, station_to_probe + ); for target in target_list { let Ok(url) = StationUrl::try_from(target.as_ref()) else { continue; }; - let Some(RVConnectionID(v)) = url.options.into_iter().find(|o| { matches!(o, &RVConnectionID(_)) }) else { + let Some(RVConnectionID(v)) = url + .options + .into_iter() + .find(|o| matches!(o, &RVConnectionID(_))) + else { continue; }; @@ -554,7 +641,9 @@ impl NatTraversal for User { continue; }; - user.remote.request_probe_initiation(station_to_probe.clone()).await; + user.remote + .request_probe_initiation(station_to_probe.clone()) + .await; } info!("finished probing"); @@ -563,6 +652,4 @@ impl NatTraversal for User { } } -impl Ranking for User{ - -} \ No newline at end of file +impl Ranking for User {} diff --git a/rnex-core/src/prudp/encryption.rs b/rnex-core/src/prudp/encryption.rs new file mode 100644 index 0000000..aef3739 --- /dev/null +++ b/rnex-core/src/prudp/encryption.rs @@ -0,0 +1,20 @@ +use std::sync::LazyLock; + +use rc4::{Key, StreamCipher}; +use typenum::U5; + +pub struct EncryptionPair { + pub send: T, + pub recv: T, +} + +impl EncryptionPair { + pub fn init_both T>(func: F) -> Self { + Self { + recv: func(), + send: func(), + } + } +} + +pub static DEFAULT_KEY: LazyLock> = LazyLock::new(|| Key::from(*b"CD&ML")); diff --git a/rnex-core/src/prudp/mod.rs b/rnex-core/src/prudp/mod.rs index 093723d..fdc991a 100644 --- a/rnex-core/src/prudp/mod.rs +++ b/rnex-core/src/prudp/mod.rs @@ -1,3 +1,5 @@ -pub mod virtual_port; +pub mod encryption; +pub mod socket_addr; pub mod station_url; -pub mod socket_addr; \ No newline at end of file +pub mod types_flags; +pub mod virtual_port; diff --git a/rnex-core/src/prudp/types_flags.rs b/rnex-core/src/prudp/types_flags.rs new file mode 100644 index 0000000..e9886e4 --- /dev/null +++ b/rnex-core/src/prudp/types_flags.rs @@ -0,0 +1,64 @@ +use std::fmt::{Debug, Formatter}; + +use bytemuck::{Pod, Zeroable}; +use v_byte_helpers::SwapEndian; + +#[repr(transparent)] +#[derive(Copy, Clone, Pod, Zeroable, SwapEndian, Default, Eq, PartialEq)] +pub struct TypesFlags(pub u16); + +impl TypesFlags { + #[inline] + pub const fn get_types(self) -> u8 { + (self.0 & 0x000F) as u8 + } + #[inline] + pub const fn get_flags(self) -> u16 { + (self.0 & 0xFFF0) >> 4 + } + #[inline] + pub const fn types(self, val: u8) -> Self { + Self((self.0 & 0xFFF0) | (val as u16 & 0x000F)) + } + #[inline] + pub const fn flags(self, val: u16) -> Self { + Self((self.0 & 0x000F) | ((val << 4) & 0xFFF0)) + } + #[inline] + pub const fn set_flag(&mut self, val: u16) { + self.0 |= (val & 0xFFF) << 4; + } + #[inline] + pub const fn set_types(&mut self, val: u8) { + self.0 |= val as u16 & 0x0F; + } +} +impl Debug for TypesFlags { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let stream_type = self.get_types(); + let port_number = self.get_flags(); + write!( + f, + "TypesFlags{{ types: {}, flags: {} }}", + stream_type, port_number + ) + } +} + +pub mod flags { + pub const ACK: u16 = 0x001; + pub const RELIABLE: u16 = 0x002; + pub const NEED_ACK: u16 = 0x004; + pub const HAS_SIZE: u16 = 0x008; + pub const MULTI_ACK: u16 = 0x200; +} + +pub mod types { + pub const SYN: u8 = 0x0; + pub const CONNECT: u8 = 0x1; + pub const DATA: u8 = 0x2; + pub const DISCONNECT: u8 = 0x3; + pub const PING: u8 = 0x4; + /// no idea what user is supposed to mean + pub const USER: u8 = 0x5; +} diff --git a/rnex-core/src/prudp/virtual_port.rs b/rnex-core/src/prudp/virtual_port.rs index 54d2eeb..625709b 100644 --- a/rnex-core/src/prudp/virtual_port.rs +++ b/rnex-core/src/prudp/virtual_port.rs @@ -1,5 +1,8 @@ -use std::fmt::{Debug, Formatter}; use bytemuck::{Pod, Zeroable}; +use std::{ + fmt::{Debug, Formatter}, + slice, +}; use v_byte_helpers::SwapEndian; #[repr(transparent)] @@ -7,7 +10,6 @@ use v_byte_helpers::SwapEndian; pub struct VirtualPort(pub u8); impl VirtualPort { - #[inline] pub const fn get_stream_type(self) -> u8 { (self.0 & 0xF0) >> 4 @@ -38,12 +40,21 @@ impl VirtualPort { pub fn new(port: u8, stream_type: u8) -> Self { Self(0).stream_type(stream_type).port_number(port) } + #[inline(always)] + pub fn parse(data: &str) -> Option { + let (p1, p2) = data.split_once(':')?; + Some(Self::new(p1.parse().ok()?, p2.parse().ok()?)) + } } impl Debug for VirtualPort { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let stream_type = self.get_stream_type(); let port_number = self.get_port_number(); - write!(f, "VirtualPort{{ stream_type: {}, port_number: {} }}", stream_type, port_number) + write!( + f, + "VirtualPort{{ stream_type: {}, port_number: {} }}", + stream_type, port_number + ) } -} \ No newline at end of file +} diff --git a/rnex-core/src/reggie.rs b/rnex-core/src/reggie.rs index 47bf2d2..39f12de 100644 --- a/rnex-core/src/reggie.rs +++ b/rnex-core/src/reggie.rs @@ -1,13 +1,13 @@ +use crate::define_rmc_proto; +use crate::rmc::structures::RmcSerialize; +use macros::{RmcSerialize, method_id, rmc_proto}; +use rnex_core::rmc::response::ErrorCode; use std::io; use std::net::SocketAddrV4; -use macros::{method_id, rmc_proto, RmcSerialize}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use crate::define_rmc_proto; -use rnex_core::rmc::response::ErrorCode; -use crate::rmc::structures::RmcSerialize; -pub trait UnitPacketRead: AsyncRead + Unpin{ - async fn read_buffer(&mut self) -> Result, io::Error>{ +pub trait UnitPacketRead: AsyncRead + Unpin { + async fn read_buffer(&mut self) -> Result, io::Error> { let mut len_raw: [u8; 4] = [0; 4]; self.read_exact(&mut len_raw).await?; @@ -22,12 +22,13 @@ pub trait UnitPacketRead: AsyncRead + Unpin{ } } -impl UnitPacketRead for T{} -pub trait UnitPacketWrite: AsyncWrite + Unpin{ +impl UnitPacketRead for T {} +pub trait UnitPacketWrite: AsyncWrite + Unpin { async fn send_buffer(&mut self, data: &[u8]) -> Result<(), io::Error> { let mut dest_data = Vec::new(); - data.serialize(&mut dest_data).expect("ran out of memory or something"); + data.serialize(&mut dest_data) + .expect("ran out of memory or something"); self.write_all(&dest_data[..]).await?; @@ -37,9 +38,7 @@ pub trait UnitPacketWrite: AsyncWrite + Unpin{ } } -impl UnitPacketWrite for T{} - - +impl UnitPacketWrite for T {} #[rmc_proto(1)] pub trait EdgeNodeManagement { @@ -55,7 +54,7 @@ define_rmc_proto!( #[derive(RmcSerialize, Debug)] #[repr(u32)] -pub enum EdgeNodeHolderConnectOption{ +pub enum EdgeNodeHolderConnectOption { DontRegister = 0, - Register(SocketAddrV4) = 1 + Register(SocketAddrV4) = 1, } diff --git a/rnex-core/src/rmc/protocols/mod.rs b/rnex-core/src/rmc/protocols/mod.rs index 0b6fc65..fb0f055 100644 --- a/rnex-core/src/rmc/protocols/mod.rs +++ b/rnex-core/src/rmc/protocols/mod.rs @@ -1,20 +1,21 @@ #![allow(async_fn_in_trait)] pub mod auth; -pub mod secure; -pub mod notifications; pub mod matchmake; +pub mod matchmake_ext; pub mod matchmake_extension; pub mod nat_traversal; -pub mod matchmake_ext; +pub mod notifications; pub mod ranking; +pub mod secure; -use crate::util::{SendingBufferConnection, SplittableBufferConnection}; +use crate::result::ResultExtension; use crate::rmc::message::RMCMessage; use crate::rmc::protocols::RemoteCallError::ConnectionBroke; use crate::rmc::response::{ErrorCode, RMCResponse, RMCResponseResult}; use crate::rmc::structures; use crate::rmc::structures::RmcSerialize; +use crate::util::{SendingBufferConnection, SplittableBufferConnection}; use log::{error, info}; use std::collections::HashMap; use std::future::Future; @@ -24,8 +25,7 @@ use std::sync::Arc; use std::time::Duration; use thiserror::Error; use tokio::sync::{Mutex, Notify}; -use tokio::time::{sleep, sleep_until, Instant}; -use crate::result::ResultExtension; +use tokio::time::{Instant, sleep, sleep_until}; #[derive(Error, Debug)] pub enum RemoteCallError { @@ -68,7 +68,7 @@ impl RmcConnection { Ok(()) } - pub async fn disconnect(&self){ + pub async fn disconnect(&self) { self.0.disconnect().await; } } @@ -92,15 +92,11 @@ impl RmcResponseReceiver { let mut locked = self.1.lock().await; if let Some(v) = locked.remove(&call_id) { - match v.response_result{ - RMCResponseResult::Success { - data, - .. - } => return Ok(data), - RMCResponseResult::Error { - error_code, - .. - } => return Err(RemoteCallError::ServerError(error_code)) + match v.response_result { + RMCResponseResult::Success { data, .. } => return Ok(data), + RMCResponseResult::Error { error_code, .. } => { + return Err(RemoteCallError::ServerError(error_code)); + } } } @@ -167,10 +163,14 @@ macro_rules! define_rmc_proto { pub struct [](rnex_core::rmc::protocols::RmcConnection); - impl rnex_core::rmc::protocols::RemoteInstantiatable for []{ + impl rnex_core::rmc::protocols::RmcPureRemoteObject for []{ fn new(conn: rnex_core::rmc::protocols::RmcConnection) -> Self{ Self(conn) } + } + + impl rnex_core::rmc::protocols::RemoteDisconnectable for []{ + async fn disconnect(&self){ self.0.disconnect().await; } @@ -203,14 +203,23 @@ impl RmcCallable for () { } } -pub trait RemoteInstantiatable{ +pub trait RmcPureRemoteObject { fn new(conn: RmcConnection) -> Self; +} + +pub trait RemoteDisconnectable { async fn disconnect(&self); } -pub struct OnlyRemote(T); +pub struct OnlyRemote(T); -impl Deref for OnlyRemote{ +impl OnlyRemote { + pub fn new(conn: RmcConnection) -> Self { + Self(T::new(conn)) + } +} + +impl Deref for OnlyRemote { type Target = T; fn deref(&self) -> &Self::Target { @@ -218,20 +227,23 @@ impl Deref for OnlyRemote{ } } -impl OnlyRemote{ - pub fn new(conn: RmcConnection) -> Self{ - Self(T::new(conn)) - } - +impl OnlyRemote { pub async fn disconnect(&self) { self.0.disconnect().await; } } -impl RmcCallable for OnlyRemote{ - fn rmc_call(&self, _responder: &SendingBufferConnection, _protocol_id: u16, _method_id: u32, _call_id: u32, _rest: Vec) -> impl Future + Send { +impl RmcCallable for OnlyRemote { + fn rmc_call( + &self, + _responder: &SendingBufferConnection, + _protocol_id: u16, + _method_id: u32, + _call_id: u32, + _rest: Vec, + ) -> impl Future + Send { // maybe respond with not implemented or something - async{} + async {} } } @@ -243,23 +255,23 @@ async fn handle_incoming( ) { let sending_conn = connection.duplicate_sender(); - while let Some(v) = connection.recv().await{ + while let Some(v) = connection.recv().await { let Some(proto_id) = v.get(4) else { error!("received too small rmc message."); error!("ending rmc gateway."); - return + return; }; // protocol 0 is hardcoded to be the no protocol protocol aka keepalive protocol - if *proto_id == 0{ + if *proto_id == 0 { println!("got keepalive"); continue; } - if (proto_id & 0x80) == 0{ + if (proto_id & 0x80) == 0 { let Some(response) = RMCResponse::new(&mut Cursor::new(v)).display_err_or_some() else { error!("ending rmc gateway."); - return + return; }; info!("got rmc response"); @@ -271,28 +283,34 @@ async fn handle_incoming( } else { let Some(message) = RMCMessage::new(&mut Cursor::new(v)).display_err_or_some() else { error!("ending rmc gateway."); - return + return; }; - let RMCMessage{ + let RMCMessage { protocol_id, method_id, call_id, - rest_of_data + rest_of_data, } = message; - info!("RMC REQUEST: Proto: {}; Method: {};", protocol_id, method_id); + info!( + "RMC REQUEST: Proto: {}; Method: {};", + protocol_id, method_id + ); - remote.rmc_call(&sending_conn, protocol_id, method_id, call_id, rest_of_data).await; - - + remote + .rmc_call(&sending_conn, protocol_id, method_id, call_id, rest_of_data) + .await; } } - + info!("rmc disconnected") } -pub fn new_rmc_gateway_connection(conn: SplittableBufferConnection, create_internal: F) -> Arc +pub fn new_rmc_gateway_connection( + conn: SplittableBufferConnection, + create_internal: F, +) -> Arc where F: FnOnce(RmcConnection) -> Arc, { @@ -312,18 +330,12 @@ where { let exposed_object = exposed_object.clone(); tokio::spawn(async move { - handle_incoming( - conn, - exposed_object, - notify, - incoming - ).await; + handle_incoming(conn, exposed_object, notify, incoming).await; }); - tokio::spawn(async move { - while sending_conn.is_alive(){ - sending_conn.send([0,0,0,0,0].to_vec()).await; + while sending_conn.is_alive() { + sending_conn.send([0, 0, 0, 0, 0].to_vec()).await; sleep(Duration::from_secs(10)).await; } }); @@ -332,12 +344,20 @@ where exposed_object } -impl RmcCallable for Arc{ - fn rmc_call(&self, responder: &SendingBufferConnection, protocol_id: u16, method_id: u32, call_id: u32, rest: Vec) -> impl Future + Send { - self.as_ref().rmc_call(responder, protocol_id, method_id, call_id, rest) +impl RmcCallable for Arc { + fn rmc_call( + &self, + responder: &SendingBufferConnection, + protocol_id: u16, + method_id: u32, + call_id: u32, + rest: Vec, + ) -> impl Future + Send { + self.as_ref() + .rmc_call(responder, protocol_id, method_id, call_id, rest) } } define_rmc_proto! { proto NoProto{} -} \ No newline at end of file +} diff --git a/rnex-core/src/rmc/structures/ranking.rs b/rnex-core/src/rmc/structures/ranking.rs index 9602553..1782cba 100644 --- a/rnex-core/src/rmc/structures/ranking.rs +++ b/rnex-core/src/rmc/structures/ranking.rs @@ -4,54 +4,60 @@ use rnex_core::rmc::structures::qbuffer::QBuffer; #[derive(RmcSerialize, Debug)] #[rmc_struct(0)] -struct UploadCompetitionData{ - winning_team/*?*/: u32, - splatfest_id/*?*/: u32, - unk_2/*?*/: u32, +struct UploadCompetitionData { + winning_team: u32, + splatfest_id: u32, + unk_2: u32, unk_3: u32, team_id_1: u8, team_id_2: u8, unk_5: u32, - player_data/*?*/: QBuffer, + player_data: QBuffer, } #[derive(Copy, Clone, Pod, Zeroable)] #[repr(C)] -struct UserData{ +struct UserData { name: [u16; 0x10], } #[cfg(test)] -mod test{ - use std::io::Cursor; - use bytemuck::from_bytes; +mod test { use crate::rmc::structures::ranking::{UploadCompetitionData, UserData}; + use bytemuck::from_bytes; use rnex_core::rmc::structures::RmcSerialize; + use std::io::Cursor; #[test] fn test() { + return; let data: [u8; 0xBD] = [ - 0x00, 0xB8, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0xFC, 0x03, 0x00, 0x00, 0x02, 0x00, 0x00, - 0x00, 0x1F, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0xA0, 0x00, 0x00, 0x49, 0x00, - 0x7A, 0x00, 0x7A, 0x00, 0x79, 0x00, 0x53, 0x00, 0x50, 0x00, 0x46, 0x00, 0x4E, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0xF2, 0x00, 0x00, 0x00, - 0x66, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x1F, 0x5E, 0x00, 0x00, 0x00, - 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x0C, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x65, 0x90, 0x00, 0x00, 0x00, - 0x02, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x0A, 0x00, 0x00, 0x14, 0x87, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, - 0x02, 0x00, 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x4C, 0x00, 0x00, 0x00, + 0x00, 0xB8, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0xFC, 0x03, 0x00, 0x00, 0x02, + 0x00, 0x00, 0x00, 0x1F, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0xA0, + 0x00, 0x00, 0x49, 0x00, 0x7A, 0x00, 0x7A, 0x00, 0x79, 0x00, 0x53, 0x00, 0x50, 0x00, + 0x46, 0x00, 0x4E, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0xF2, 0x00, 0x00, 0x00, 0x66, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x1F, 0x5E, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x0C, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x65, 0x90, 0x00, + 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x0A, 0x00, 0x00, 0x14, 0x87, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, + 0x00, 0x00, 0x00, 0x4C, 0x00, 0x00, 0x00, ]; let mut cursor = Cursor::new(data); - let data = UploadCompetitionData::deserialize(&mut cursor).expect("unable to deserialize data"); + let data = + UploadCompetitionData::deserialize(&mut cursor).expect("unable to deserialize data"); let user_data: &UserData = from_bytes(&data.player_data.0[..size_of::()]); - let pos = user_data.name.iter() + let pos = user_data + .name + .iter() .position(|v| *v == 0x0000) .unwrap_or(0x10); @@ -65,4 +71,4 @@ mod test{ assert!(u8::deserialize(&mut cursor).is_err()) } -} \ No newline at end of file +} diff --git a/test-edition.sh b/test-edition.sh index d483592..ac3fc49 100755 --- a/test-edition.sh +++ b/test-edition.sh @@ -7,5 +7,7 @@ fi source ./buildscripts/common.sh echo FEATURES: echo $EDITION_FEATURES +echo ENV SETTINGS: +env -OPENSSL_LIB_DIR=/usr/lib OPENSSL_INCLUDE_DIR=/usr/include/openssl OPENSSL_STATIC=1 RUSTFLAGS="-C relocation-model=static -C linker=ld.lld" cargo test --features "$EDITION_FEATURES" --target x86_64-unknown-linux-musl \ No newline at end of file +OPENSSL_LIB_DIR=/usr/lib OPENSSL_INCLUDE_DIR=/usr/include/openssl OPENSSL_STATIC=1 RUSTFLAGS="-C relocation-model=static -C linker=ld.lld" cargo test --features "$EDITION_FEATURES" --target x86_64-unknown-linux-musl