diff --git a/Cargo.lock b/Cargo.lock index 7fabb1b..cc388be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -521,6 +521,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -543,12 +544,34 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -570,6 +593,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1448,7 +1472,7 @@ checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.3", - "zerocopy 0.8.14", + "zerocopy 0.8.25", ] [[package]] @@ -1866,6 +1890,7 @@ dependencies = [ "bytemuck", "chrono", "dotenv", + "futures", "hex", "hmac", "log", @@ -2656,11 +2681,11 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.14" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a367f292d93d4eab890745e75a778da40909cab4d6ff8173693812f79c4a2468" +checksum = "a1702d9583232ddb9174e01bb7c15a2ab8fb1bc6f227aa1233858c351a3ba0cb" dependencies = [ - "zerocopy-derive 0.8.14", + "zerocopy-derive 0.8.25", ] [[package]] @@ -2676,9 +2701,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.8.14" +version = "0.8.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3931cb58c62c13adec22e38686b559c86a30565e16ad6e8510a337cedc611e1" +checksum = "28a6e20d751156648aa063f3800b706ee209a32c0b4d9f24be3d980b01be55ef" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index edbb65b..c384cb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,10 @@ serde = { version = "1.0.217", features = ["derive"] } async-trait = "0.1.86" paste = "1.0.15" typenum = "1.18.0" +futures = "0.3.31" + + + [build-dependencies] tonic-build = "0.12.3" @@ -49,3 +53,4 @@ tonic-build = "0.12.3" default = ["secure", "auth"] secure = [] auth = [] + diff --git a/macros/src/protos.rs b/macros/src/protos.rs index 554df22..32d04c3 100644 --- a/macros/src/protos.rs +++ b/macros/src/protos.rs @@ -1,7 +1,7 @@ use proc_macro2::{Ident, Span, TokenStream, TokenTree}; use quote::{quote, ToTokens}; -use syn::{LitInt, ReturnType, Token, Type}; -use syn::token::{Brace, Paren, Semi}; +use syn::{LitInt, LitStr, ReturnType, Token, Type}; +use syn::token::{Brace, Bracket, Paren, Semi}; pub struct ProtoMethodData{ pub id: LitInt, @@ -61,9 +61,11 @@ impl RmcProtocolData{ quote!{ &self, data: ::std::vec::Vec }.to_tokens(tokens); }); - quote!{ - -> ::core::result::Result, ErrorCode> - }.to_tokens(tokens); + if self.has_returns { + quote! { + -> ::core::result::Result, ErrorCode> + }.to_tokens(tokens); + } Brace::default().surround(tokens, |tokens|{ quote! { let mut cursor = ::std::io::Cursor::new(data); }.to_tokens(tokens); @@ -73,10 +75,27 @@ impl RmcProtocolData{ let Ok(#param_name) = <#param_type as crate::rmc::structures::RmcSerialize>::deserialize( &mut cursor - ) else { - return Err(crate::rmc::response::ErrorCode::Core_InvalidArgument); - }; - }.to_tokens(tokens) + ) else + }.to_tokens(tokens); + + let error_msg = LitStr::new(&format!("an error occurred whilest deserializing {}", param_name), Span::call_site()); + + if self.has_returns { + quote! { + { + log::error!(#error_msg); + return Err(crate::rmc::response::ErrorCode::Core_InvalidArgument); + }; + }.to_tokens(tokens) + } else { + quote! { + { + log::error!(#error_msg); + return; + }; + }.to_tokens(tokens) + } + } quote!{ @@ -129,13 +148,28 @@ impl RmcProtocolData{ let raw_name = Ident::new(&format!("raw_{}", name), name.span()); + quote!{ #id => self.#raw_name(data).await, }.to_tokens(tokens); } quote!{ - _ => Err(crate::rmc::response::ErrorCode::Core_NotImplemented) + v => }.to_tokens(tokens); + + + + Brace::default().surround(tokens, |tokens|{ + quote!{ + log::error!("(protocol {})unimplemented method id called on protocol: {}", #id, v); + }.to_tokens(tokens); + if self.has_returns { + quote! { + Err(crate::rmc::response::ErrorCode::Core_NotImplemented) + }.to_tokens(tokens); + } + }); + }); Semi::default().to_tokens(tokens); @@ -155,7 +189,7 @@ impl RmcProtocolData{ }); quote!{ - impl RawAuth for T{} + impl #raw_name for T{} }.to_tokens(tokens); } @@ -218,8 +252,17 @@ impl RmcProtocolData{ &#param_name, &mut cursor ) - ).ok_or(crate::rmc::response::ErrorCode::Core_InvalidArgument)?; - }.to_tokens(tokens) + ).ok_or(crate::rmc::response::ErrorCode::Core_InvalidArgument) + }.to_tokens(tokens); + if self.has_returns { + quote! { + ?; + }.to_tokens(tokens) + } else { + quote! { + ; + }.to_tokens(tokens) + } } quote!{ diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..01d0d4d --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,11 @@ +mod endianness; +mod prudp; +pub mod rmc; +//mod protocols; + +mod grpc; +mod kerberos; +mod nex; +mod result; +mod versions; +mod web; diff --git a/src/main.rs b/src/main.rs index 08f3b87..64ee3b1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,8 @@ use crate::nex::account::Account; use crate::nex::auth_handler::{AuthHandler, RemoteAuthClientProtocol}; +use crate::nex::remote_console::RemoteConsole; +use crate::nex::user::{RemoteUserProtocol, User}; use crate::prudp::packet::VirtualPort; use crate::prudp::router::Router; use crate::prudp::secure::Secure; @@ -18,10 +20,12 @@ use crate::rmc::protocols::auth::Auth; use crate::rmc::protocols::auth::RawAuth; use crate::rmc::protocols::auth::RawAuthInfo; use crate::rmc::protocols::auth::RemoteAuth; -use crate::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote}; +use crate::rmc::protocols::matchmake_extension::RemoteMatchmakeExtension; +use crate::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote, RemoteInstantiatable}; use crate::rmc::response::ErrorCode; use crate::rmc::structures::any::Any; use crate::rmc::structures::connection_data::ConnectionData; +use crate::rmc::structures::matchmake::{CreateMatchmakeSessionParam, Gathering, MatchmakeParam, MatchmakeSession}; use crate::rmc::structures::qresult::QResult; use chrono::{Local, SecondsFormat}; use log::{error, info}; @@ -35,10 +39,14 @@ use std::marker::PhantomData; use std::net::{Ipv4Addr, SocketAddrV4}; use std::ops::{BitAnd, BitOr}; use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; use std::{env, fs}; +use std::sync::atomic::AtomicU32; use tokio::task::JoinHandle; -use crate::nex::user::User; +use crate::kerberos::KerberosDateTime; +use crate::nex::matchmake::MatchmakeManager; +use crate::rmc::protocols::secure::RemoteSecure; mod endianness; mod prudp; @@ -270,7 +278,9 @@ async fn start_auth() -> JoinHandle<()> { .expect("unable to start router"); let mut socket_secure = router_secure - .add_socket(VirtualPort::new(1, 10), Unsecure("6f599f81")) + .add_socket(VirtualPort::new(1, 10), Unsecure( + "6f599f81" + )) .await .expect("unable to add socket"); @@ -284,10 +294,12 @@ async fn start_auth() -> JoinHandle<()> { info!("new connected user!"); - let _ = new_rmc_gateway_connection(conn, |_| AuthHandler { - destination_server_acct: &SECURE_SERVER_ACCOUNT, - build_name: "branch:origin/project/wup-agmj build:3_8_15_2004_0", - station_url: &SECURE_STATION_URL, + let _ = new_rmc_gateway_connection(conn, |_| { + Arc::new(AuthHandler { + destination_server_acct: &SECURE_SERVER_ACCOUNT, + build_name: "branch:origin/project/wup-agmj build:3_8_15_2004_0", + station_url: &SECURE_STATION_URL, + }) }); } }) @@ -295,6 +307,15 @@ async fn start_auth() -> JoinHandle<()> { async fn start_secure() -> JoinHandle<()> { tokio::spawn(async { + let mmm = Arc::new(MatchmakeManager{ + gid_counter: AtomicU32::new(1), + sessions: Default::default(), + users: Default::default(), + rv_cid_counter: AtomicU32::new(1), + }); + + let web_server = web::start_web(mmm.clone()).await; + let (router_secure, _) = Router::new(SocketAddrV4::new(*OWN_IP_PRIVATE, *SECURE_SERVER_PORT)) .await @@ -303,7 +324,10 @@ async fn start_secure() -> JoinHandle<()> { let mut socket_secure = router_secure .add_socket( VirtualPort::new(1, 10), - Secure("6f599f81", &SECURE_SERVER_ACCOUNT), + Secure( + "6f599f81", + &SECURE_SERVER_ACCOUNT + ), ) .await .expect("unable to add socket"); @@ -318,19 +342,25 @@ async fn start_secure() -> JoinHandle<()> { info!("new connected user on secure :D!"); - let ip = conn.socket_addr.regular_socket_addr; + let ip = conn.socket_addr; let pid = conn.user_id; - let _ = new_rmc_gateway_connection(conn, |_| User { - ip, - pid + let _ = new_rmc_gateway_connection(conn, |r| { + Arc::new_cyclic(|w| User { + ip, + pid, + this: w.clone(), + remote: RemoteConsole::new(r), + station_url: Default::default(), + matchmake_manager: mmm.clone() + }) }); } }) } async fn start_test() { - let addr = SocketAddrV4::new(*OWN_IP_PRIVATE, *AUTH_SERVER_PORT); + let addr = SocketAddrV4::new(*OWN_IP_PRIVATE, *SECURE_SERVER_PORT); let virt_addr = VirtualPort::new(1, 10); let prudp_addr = PRUDPSockAddr::new(addr, virt_addr); @@ -346,15 +376,12 @@ async fn start_test() { let conn = socket_secure.connect(prudp_addr).await.unwrap(); - let remote = - new_rmc_gateway_connection(conn, |r| OnlyRemote::::new(r)); + let remote = new_rmc_gateway_connection(conn, |r| { + Arc::new(OnlyRemote::::new(r)) + }); - let v = remote - .login_ex("1469690705".to_string(), Any::default()) - .await - .unwrap(); - - println!("got it"); + tokio::time::sleep(Duration::from_secs(1)).await; + let urls = vec!["prudp:/address=192.168.178.45;port=60146;Pl=2;natf=0;natm=0;pmp=0;sid=15;upnp=0".to_owned()]; } async fn start_servers() { @@ -362,15 +389,16 @@ async fn start_servers() { let auth_server = start_auth().await; #[cfg(feature = "secure")] let secure_server = start_secure().await; - //let web_server = web::start_web().await; - //tokio::time::sleep(Duration::from_secs(1)).await; + + tokio::time::sleep(Duration::from_secs(1)).await; //start_test().await; + + #[cfg(feature = "auth")] auth_server.await.expect("auth server crashed"); #[cfg(feature = "secure")] secure_server.await.expect("auth server crashed"); - //web_server.await.expect("webserver crashed"); } diff --git a/src/nex-implementation/auth/method_login.rs b/src/nex-implementation/auth/method_login.rs deleted file mode 100644 index aface70..0000000 --- a/src/nex-implementation/auth/method_login.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::io::Cursor; -use std::sync::Arc; -use log::error; -use tokio::sync::Mutex; -use crate::protocols::auth::AuthProtocolConfig; -use crate::rmc::message::RMCMessage; -use crate::rmc::response::{ErrorCode, RMCResponseResult}; -use crate::rmc::structures::RmcSerialize; - -pub async fn login(rmcmessage: &RMCMessage, _name: &str) -> RMCResponseResult{ - - - rmcmessage.error_result_with_code(ErrorCode::Core_NotImplemented) -} - -pub async fn login_raw_params(rmcmessage: &RMCMessage, _: &Arc, _: &Arc>, _data: AuthProtocolConfig) -> RMCResponseResult{ - let mut reader = Cursor::new(&rmcmessage.rest_of_data); - - let Ok(str) = String::deserialize(&mut reader) else { - error!("error reading packet"); - return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument); - }; - - - login(rmcmessage, &str).await -} \ No newline at end of file diff --git a/src/nex-implementation/auth/method_login_ex.rs b/src/nex-implementation/auth/method_login_ex.rs deleted file mode 100644 index 413ab82..0000000 --- a/src/nex-implementation/auth/method_login_ex.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::io::{Cursor, Write}; -use std::sync::Arc; -use bytemuck::bytes_of; -use log::{error}; -use tokio::sync::Mutex; -use crate::grpc::account; -use crate::kerberos::KerberosDateTime; -use crate::protocols::auth::AuthProtocolConfig; -use crate::protocols::auth::ticket_generation::generate_ticket; -use crate::rmc; -use crate::rmc::message::RMCMessage; -use crate::rmc::response::{ErrorCode, RMCResponseResult}; -use crate::rmc::structures::{RmcSerialize}; -use crate::rmc::structures::any::Any; -use crate::rmc::structures::qresult::QResult; - -pub async fn login_ex(rmcmessage: &RMCMessage, proto_data: AuthProtocolConfig, pid: u32) -> RMCResponseResult{ - // todo: figure out how the AuthenticationInfo struct works, parse it and validate login info - - let Ok(mut client) = account::Client::new().await else { - return rmcmessage.error_result_with_code(ErrorCode::Core_Exception); - }; - - let Ok(passwd) = client.get_nex_password(pid).await else{ - return rmcmessage.error_result_with_code(ErrorCode::Core_Exception); - }; - - let source_login_data = (pid, passwd); - let destination_login_data = proto_data.secure_server_account.get_login_data(); - - let ticket = generate_ticket(source_login_data, destination_login_data); - - let result = QResult::success(ErrorCode::Core_Unknown); - - let connection_data = rmc::structures::connection_data::ConnectionData{ - station_url: proto_data.station_url, - special_station_url: "", - date_time: KerberosDateTime::now(), - special_protocols: Vec::new() - }; - - let mut response: Vec = Vec::new(); - - result.serialize(&mut response).expect("failed serializing result"); - response.write_all(bytes_of(&source_login_data.0)).expect("failed writing pid"); - ticket.serialize(&mut response).expect("failed serializing ticket"); - connection_data.serialize(&mut response).expect("failed writing connection data"); - proto_data.build_name.serialize(&mut response).expect("failed writing build name"); - - return rmcmessage.success_with_data(response); -} - -pub async fn login_ex_raw_params(rmcmessage: &RMCMessage, _: &Arc, _: &Arc>, data: AuthProtocolConfig) -> RMCResponseResult{ - let mut reader = Cursor::new(&rmcmessage.rest_of_data); - - let Ok(str) = String::deserialize(&mut reader) else { - error!("error reading packet"); - return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument); - }; - - let Ok(any) = Any::deserialize(&mut reader) else { - error!("error reading packet"); - return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument); - }; - - match any.name.as_ref(){ - "AuthenticationInfo" => { - - } - v => { - error!("error reading packet: invalid structure type: {}", v); - return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument); - } - } - - let Ok(pid) = str.parse() else { - return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument); - }; - - login_ex(rmcmessage, data, pid).await -} \ No newline at end of file diff --git a/src/nex-implementation/auth/method_request_ticket.rs b/src/nex-implementation/auth/method_request_ticket.rs deleted file mode 100644 index aa89448..0000000 --- a/src/nex-implementation/auth/method_request_ticket.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::io::Cursor; -use std::sync::Arc; -use tokio::sync::Mutex; -use crate::endianness::{IS_BIG_ENDIAN, ReadExtensions}; -use crate::protocols::auth::{AuthProtocolConfig, get_login_data_by_pid}; -use crate::protocols::auth::ticket_generation::generate_ticket; -use crate::rmc::message::RMCMessage; -use crate::rmc::response::{ErrorCode, RMCResponseResult}; -use crate::rmc::response::ErrorCode::Core_Unknown; -use crate::rmc::structures::qresult::QResult; -use crate::rmc::structures::RmcSerialize; - -pub async fn request_ticket(rmcmessage: &RMCMessage, data: AuthProtocolConfig, source_pid: u32, destination_pid: u32) -> RMCResponseResult{ - let Some(source_login_data) = get_login_data_by_pid(source_pid).await else { - return rmcmessage.error_result_with_code(ErrorCode::Core_Exception); - }; - - let desgination_login_data = if destination_pid == data.secure_server_account.pid{ - data.secure_server_account.get_login_data() - } else { - let Some(login) = get_login_data_by_pid(destination_pid).await else { - return rmcmessage.error_result_with_code(ErrorCode::Core_Exception); - }; - login - }; - - let result = QResult::success(Core_Unknown); - - let ticket = generate_ticket(source_login_data, desgination_login_data); - - let mut response: Vec = Vec::new(); - - result.serialize(&mut response).expect("failed serializing result"); - ticket.serialize(&mut response).expect("failed serializing ticket"); - - rmcmessage.success_with_data(response) -} - -pub async fn request_ticket_raw_params(rmcmessage: &RMCMessage, _: &Arc, _: &Arc>, data: AuthProtocolConfig) -> RMCResponseResult{ - let mut reader = Cursor::new(&rmcmessage.rest_of_data); - - let Ok(source_pid) = reader.read_struct(IS_BIG_ENDIAN) else { - return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument); - }; - - let Ok(destination_pid) = reader.read_struct(IS_BIG_ENDIAN) else { - return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument); - }; - - request_ticket(rmcmessage, data, source_pid, destination_pid).await -} \ No newline at end of file diff --git a/src/nex-implementation/auth/mod.rs b/src/nex-implementation/auth/mod.rs deleted file mode 100644 index 1cc4f87..0000000 --- a/src/nex-implementation/auth/mod.rs +++ /dev/null @@ -1,38 +0,0 @@ -mod method_login_ex; -mod method_login; -mod ticket_generation; -mod method_request_ticket; - -use crate::define_protocol; -use crate::grpc::account; -use crate::nex::account::Account; -use crate::protocols::auth::method_login::login_raw_params; -use crate::protocols::auth::method_login_ex::login_ex_raw_params; -use crate::protocols::auth::method_request_ticket::request_ticket_raw_params; - -#[derive(Copy, Clone)] -pub struct AuthProtocolConfig { - pub secure_server_account: &'static Account, - pub build_name: &'static str, - pub station_url: &'static str -} - -define_protocol!{ - 10(proto_data: AuthProtocolConfig) => { - 0x01 => login_raw_params, - 0x02 => login_ex_raw_params, - 0x03 => request_ticket_raw_params - } -} - -async fn get_login_data_by_pid(pid: u32) -> Option<(u32, [u8; 16])> { - let Ok(mut client) = account::Client::new().await else { - return None - }; - - let Ok(passwd) = client.get_nex_password(pid).await else{ - return None - }; - - Some((pid, passwd)) -} \ No newline at end of file diff --git a/src/nex-implementation/auth/ticket_generation.rs b/src/nex-implementation/auth/ticket_generation.rs deleted file mode 100644 index f2219de..0000000 --- a/src/nex-implementation/auth/ticket_generation.rs +++ /dev/null @@ -1,19 +0,0 @@ -use crate::kerberos; -use crate::kerberos::{derive_key, Ticket}; - - -pub fn generate_ticket(source_act_login_data: (u32, [u8;16]), dest_act_login_data: (u32, [u8;16])) -> Box<[u8]>{ - let source_key = derive_key(source_act_login_data.0, source_act_login_data.1); - let dest_key = derive_key(dest_act_login_data.0, dest_act_login_data.1); - - let internal_data = kerberos::TicketInternalData::new(source_act_login_data.0); - - let encrypted_inner = internal_data.encrypt(dest_key); - let encrypted_session_ticket = Ticket{ - pid: dest_act_login_data.0, - session_key: internal_data.session_key, - }.encrypt(source_key, &encrypted_inner); - - - encrypted_session_ticket -} \ No newline at end of file diff --git a/src/nex-implementation/matchmake/method_unregister_gathering.rs b/src/nex-implementation/matchmake/method_unregister_gathering.rs deleted file mode 100644 index 22ec79f..0000000 --- a/src/nex-implementation/matchmake/method_unregister_gathering.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::io::Cursor; -use std::sync::Arc; -use tokio::sync::{Mutex, RwLock}; -use crate::protocols::matchmake_common::MatchmakeData; -use crate::rmc::message::RMCMessage; -use crate::rmc::response::{ErrorCode, RMCResponseResult}; -use crate::rmc::structures::qresult::QResult; -use crate::rmc::structures::RmcSerialize; - -pub async fn unregister_gathering(rmcmessage: &RMCMessage, gid: u32, data: Arc>) -> RMCResponseResult{ - let mut rd = data.write().await; - - rd.matchmake_sessions.remove(&gid); - - let result = QResult::success(ErrorCode::Core_Unknown); - - let mut response = Vec::new(); - - result.serialize(&mut response).expect("aaa"); - - rmcmessage.success_with_data(response) -} - -pub async fn unregister_gathering_raw_params(rmcmessage: &RMCMessage, _: &Arc, _: &Arc>, data: Arc>) -> RMCResponseResult{ - let mut reader = Cursor::new(&rmcmessage.rest_of_data); - - let Ok(gid) = u32::deserialize(&mut reader) else { - return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument); - }; - - - - - unregister_gathering(rmcmessage, gid, data).await -} \ No newline at end of file diff --git a/src/nex-implementation/matchmake/mod.rs b/src/nex-implementation/matchmake/mod.rs deleted file mode 100644 index cc85e23..0000000 --- a/src/nex-implementation/matchmake/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -mod method_unregister_gathering; - -use std::sync::Arc; -use tokio::sync::RwLock; -use crate::define_protocol; -use crate::protocols::matchmake::method_unregister_gathering::unregister_gathering_raw_params; -use crate::protocols::matchmake_common::MatchmakeData; - -define_protocol!{ - 21(matchmake_data: Arc>) => { - 2 => unregister_gathering_raw_params - } -} \ No newline at end of file diff --git a/src/nex-implementation/matchmake_common/mod.rs b/src/nex-implementation/matchmake_common/mod.rs deleted file mode 100644 index c4265d3..0000000 --- a/src/nex-implementation/matchmake_common/mod.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::collections::{BTreeMap}; -use std::sync::Arc; -use log::error; -use rand::random; -use tokio::sync::{Mutex, RwLock}; -use crate::kerberos::KerberosDateTime; -use crate::protocols::notification::Notification; -use crate::rmc::structures::matchmake::{Gathering, MatchmakeParam, MatchmakeSession}; -use crate::rmc::structures::variant::Variant; - -#[derive(Default, Debug)] -pub struct ExtendedMatchmakeSession{ - pub session: MatchmakeSession, - pub connected_players: Vec>>, -} - -pub struct MatchmakeData{ - pub(crate) matchmake_sessions: BTreeMap>> -} - -impl ExtendedMatchmakeSession{ - pub async fn from_matchmake_session(session: MatchmakeSession, host: &Mutex) -> Self{ - let host = host.lock().await; - - let ConnectionData{ - active_connection_data, - .. - } = &*host; - - let Some(active_connection_data) = active_connection_data else{ - return Default::default(); - }; - - let ActiveConnectionData{ - active_secure_connection_data, - .. - } = active_connection_data; - - let Some(active_secure_connection_data) = active_secure_connection_data else{ - return Default::default(); - }; - - - let mm_session = MatchmakeSession{ - gathering: Gathering{ - self_gid: 1, - owner_pid: active_secure_connection_data.pid, - host_pid: active_secure_connection_data.pid, - ..session.gathering.clone() - }, - datetime: KerberosDateTime::now(), - session_key: (0..32).map(|_| random()).collect(), - matchmake_param: MatchmakeParam{ - params: vec![ - ("@SR".to_owned(), Variant::Bool(true)), - ("@GIR".to_owned(), Variant::SInt64(3)) - ] - }, - system_password_enabled: false, - ..session - }; - - Self{ - session: mm_session, - connected_players: Default::default() - } - } - - pub async fn add_player(&mut self, socket: &SocketData, conn: Arc>, join_msg: String) { - let locked = conn.lock().await; - - let Some(joining_pid) = locked.active_connection_data.as_ref() - .map(|c| - c.active_secure_connection_data.as_ref() - .map(|c| c.pid) - ).flatten() else { - error!("tried to add player without secure connection"); - return - }; - - drop(locked); - - self.connected_players.push(conn); - self.session.participation_count = self.connected_players.len() as u32; - - - for other_connection in &self.connected_players{ - let mut conn = other_connection.lock().await; - - - let Some(other_pid) = conn.active_connection_data.as_ref() - .map(|c| - c.active_secure_connection_data.as_ref() - .map(|c| c.pid - ) - ).flatten() else { - error!("tried to send connection notification to player secure connection"); - return - }; - - /*if other_pid == self.session.gathering.owner_pid && - joining_pid == self.session.gathering.owner_pid{ - continue; - }*/ - - conn.send_notification(socket, Notification{ - pid_source: joining_pid, - notif_type: 3001, - param_1: self.session.gathering.self_gid, - param_2: other_pid, - str_param: join_msg.clone(), - param_3: self.session.participation_count - }).await; - } - } -} -pub async fn add_matchmake_session(mm_data: Arc>,session: ExtendedMatchmakeSession) -> Arc> { - let gid = session.session.gathering.self_gid; - - let mut mm_data = mm_data.write().await; - - let session = Arc::new(Mutex::new(session)); - - mm_data.matchmake_sessions.insert(gid, session.clone()); - - session -} - -impl MatchmakeData { - - - - pub async fn try_find_session_with_criteria(&self, ) -> Option>>{ - None - } -} \ No newline at end of file diff --git a/src/nex-implementation/matchmake_extension/method_auto_matchmake_with_param_postpone.rs b/src/nex-implementation/matchmake_extension/method_auto_matchmake_with_param_postpone.rs deleted file mode 100644 index 4c5d70a..0000000 --- a/src/nex-implementation/matchmake_extension/method_auto_matchmake_with_param_postpone.rs +++ /dev/null @@ -1,98 +0,0 @@ -use std::io::Cursor; -use std::sync::Arc; -use std::time::Duration; -use chrono::SecondsFormat::Millis; -use log::info; -use rand::random; -use tokio::sync::{Mutex, RwLock}; -use tokio::time::sleep; -use crate::protocols::matchmake_common::{ExtendedMatchmakeSession, MatchmakeData}; -use crate::rmc::message::RMCMessage; -use crate::rmc::response::{ErrorCode, RMCResponseResult}; -use crate::rmc::structures::matchmake::{AutoMatchmakeParam, MatchmakeSession}; -use crate::rmc::structures::RmcSerialize; - - - -pub async fn auto_matchmake_with_param_postpone( - rmcmessage: &RMCMessage, - conn: &Arc>, - socket: &Arc, - mm_data: Arc>, - auto_matchmake_param: AutoMatchmakeParam -) -> RMCResponseResult{ - //println!("auto_matchmake_with_param_postpone: {:?}", auto_matchmake_param); - let locked_conn = conn.lock().await; - let Some(secure_conn) = - locked_conn.active_connection_data.as_ref().map(|a| a.active_secure_connection_data.as_ref()).flatten() else { - return rmcmessage.error_result_with_code(ErrorCode::Core_Exception); - }; - - let pid = secure_conn.pid; - - drop(locked_conn); - - let mm_data_read = mm_data.read().await; - //todo: there is a bit of a race condition here, i dont have any idea on how to fix it though... - let session = if let Some(session) = mm_data_read.try_find_session_with_criteria().await{ - session - } else { - // drop it first so that we dont cause a deadlock, also drop it right here so we dont hold - // up anything else unnescesarily - drop(mm_data_read); - - let session = - ExtendedMatchmakeSession::from_matchmake_session(auto_matchmake_param.matchmake_session, &conn).await; - - let gid = session.session.gathering.self_gid; - - let mut mm_data = mm_data.write().await; - - let session = Arc::new(Mutex::new(session)); - - mm_data.matchmake_sessions.insert(gid, session.clone()); - - session - }; - - let mut locked_session = session.lock().await; - - //todo: refactor so that this works - { - let session = session.clone(); - let socket = socket.clone(); - let connection = conn.clone(); - let join_msg = auto_matchmake_param.join_message.clone(); - tokio::spawn(async move{ - sleep(Duration::from_millis(500)).await; - println!("adding player"); - let mut session = session.lock().await; - session.add_player(&socket, connection, join_msg).await; - }); - } - - info!("new session: {:?}", locked_session); - - let mut response = Vec::new(); - - locked_session.session.serialize(&mut response).expect("unable to serialize matchmake session"); - - rmcmessage.success_with_data(response) -} - -pub async fn auto_matchmake_with_param_postpone_raw_params( - rmcmessage: &RMCMessage, - socket: &Arc, - connection_data: &Arc>, - data: Arc> -) -> RMCResponseResult{ - let mut reader = Cursor::new(&rmcmessage.rest_of_data); - - let Ok(matchmake_param) = AutoMatchmakeParam::deserialize(&mut reader) else { - return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument); - }; - - - - auto_matchmake_with_param_postpone(rmcmessage, connection_data, socket, data, matchmake_param).await -} \ No newline at end of file diff --git a/src/nex-implementation/matchmake_extension/method_create_matchmake_session_with_param.rs b/src/nex-implementation/matchmake_extension/method_create_matchmake_session_with_param.rs deleted file mode 100644 index 438a9ed..0000000 --- a/src/nex-implementation/matchmake_extension/method_create_matchmake_session_with_param.rs +++ /dev/null @@ -1,87 +0,0 @@ -use std::io::Cursor; -use std::sync::Arc; -use std::time::Duration; -use log::info; -use tokio::sync::{Mutex, RwLock}; -use tokio::time::sleep; -use crate::protocols::matchmake_common::{add_matchmake_session, ExtendedMatchmakeSession, MatchmakeData}; -use crate::protocols::matchmake_extension::method_auto_matchmake_with_param_postpone::auto_matchmake_with_param_postpone; -use crate::rmc::message::RMCMessage; -use crate::rmc::response::{ErrorCode, RMCResponseResult}; -use crate::rmc::structures::matchmake::{AutoMatchmakeParam, CreateMatchmakeSessionParam}; -use crate::rmc::structures::RmcSerialize; - -pub async fn create_matchmake_session_with_param( - rmcmessage: &RMCMessage, - conn: &Arc>, - socket: &Arc, - mm_data: Arc>, - create_matchmake_session: CreateMatchmakeSessionParam -) -> RMCResponseResult { - - let mut session = - ExtendedMatchmakeSession::from_matchmake_session(create_matchmake_session.matchmake_session, &conn).await; - - session.session.participation_count = create_matchmake_session.participation_count as u32; - - let session = add_matchmake_session(mm_data, session).await; - - let mut session = session.lock().await; - - session.add_player(&socket, conn.clone(), create_matchmake_session.join_message).await; - - - - let mut response = Vec::new(); - - - session.session.serialize(&mut response).expect("unable to serialize session"); - - println!("{}", hex::encode(&response)); - - - - rmcmessage.success_with_data(response) -} - -pub async fn create_matchmake_session_with_param_raw_params( - rmcmessage: &RMCMessage, - socket: &Arc, - connection_data: &Arc>, - data: Arc> -) -> RMCResponseResult{ - let mut reader = Cursor::new(&rmcmessage.rest_of_data); - - let Ok(matchmake_param) = CreateMatchmakeSessionParam::deserialize(&mut reader) else { - return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument); - }; - - create_matchmake_session_with_param(rmcmessage, connection_data, socket, data, matchmake_param).await -} - -#[cfg(test)] -mod test{ - use std::io::Cursor; - use crate::prudp::packet::PRUDPPacket; - use crate::rmc::message::RMCMessage; - use crate::rmc::structures::matchmake::MatchmakeSession; - use crate::rmc::structures::RmcSerialize; - - #[test] - fn test(){ - let data = hex::decode("ead001030000a1af12001800050002010000000000000000000000000000000000").unwrap(); - - let packet = PRUDPPacket::new(&mut Cursor::new(data)).unwrap(); - - println!("{:?}", packet); - } - - #[test] - fn test_2(){ - let data = hex::decode("250000008e0100000001000000001700000051b39957b90b00000100000051b3995701000001000000").unwrap(); - - let msg = RMCMessage::new(&mut Cursor::new(data)).unwrap(); - - println!("{:?}", msg) - } -} \ No newline at end of file diff --git a/src/nex-implementation/matchmake_extension/method_get_playing_session.rs b/src/nex-implementation/matchmake_extension/method_get_playing_session.rs deleted file mode 100644 index d88a6f5..0000000 --- a/src/nex-implementation/matchmake_extension/method_get_playing_session.rs +++ /dev/null @@ -1,34 +0,0 @@ -use std::io::Cursor; -use std::sync::Arc; -use log::info; -use tokio::sync::{Mutex, RwLock}; -use crate::protocols::matchmake_common::MatchmakeData; -use crate::rmc::message::RMCMessage; -use crate::rmc::response::{ErrorCode, RMCResponseResult}; -use crate::rmc::structures::RmcSerialize; - -type PIDList = Vec; - -async fn get_playing_session(rmcmessage: &RMCMessage, _data: Arc>) -> RMCResponseResult { - //todo: propperly implement this - - let cheeseburger = PIDList::new(); - - let mut vec = Vec::new(); - - cheeseburger.serialize(&mut vec).expect("somehow unable to write cheeseburger"); - - rmcmessage.success_with_data(vec) -} - -pub async fn get_playing_session_raw_params(rmcmessage: &RMCMessage, _: &Arc, _: &Arc>, data: Arc>) -> RMCResponseResult{ - let mut reader = Cursor::new(&rmcmessage.rest_of_data); - - let Ok(list) = PIDList::deserialize(&mut reader) else { - return rmcmessage.error_result_with_code(ErrorCode::FPD_FriendNotExists); - }; - - info!("get_playing_session got called with {:?}", list); - - get_playing_session(rmcmessage, data).await -} \ No newline at end of file diff --git a/src/nex-implementation/matchmake_extension/mod.rs b/src/nex-implementation/matchmake_extension/mod.rs deleted file mode 100644 index ddca083..0000000 --- a/src/nex-implementation/matchmake_extension/mod.rs +++ /dev/null @@ -1,19 +0,0 @@ -mod method_get_playing_session; -mod method_auto_matchmake_with_param_postpone; -mod method_create_matchmake_session_with_param; - -use std::sync::Arc; -use tokio::sync::{RwLock}; -use crate::define_protocol; -use crate::protocols::matchmake_common::MatchmakeData; -use method_get_playing_session::get_playing_session_raw_params; -use method_auto_matchmake_with_param_postpone::auto_matchmake_with_param_postpone_raw_params; -use crate::protocols::matchmake_extension::method_create_matchmake_session_with_param::create_matchmake_session_with_param_raw_params; - -define_protocol!{ - 109(matchmake_data: Arc>) => { - 16 => get_playing_session_raw_params, - 38 => create_matchmake_session_with_param_raw_params, - 40 => auto_matchmake_with_param_postpone_raw_params - } -} \ No newline at end of file diff --git a/src/nex-implementation/mod.rs b/src/nex-implementation/mod.rs deleted file mode 100644 index 16bf0bf..0000000 --- a/src/nex-implementation/mod.rs +++ /dev/null @@ -1,118 +0,0 @@ -use std::env; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use log::warn; -use once_cell::sync::Lazy; -use tokio::sync::Mutex; -use crate::grpc; -use crate::rmc::message::RMCMessage; -use crate::rmc::response::{ErrorCode, RMCResponse}; - - -pub mod auth; -pub mod server; -pub mod secure; -pub mod matchmake_extension; -pub mod matchmake_common; -pub mod matchmake; -pub mod notification; -pub mod nat_traversal; - -static IS_MAINTENANCE: Lazy = Lazy::new(|| { - env::var("IS_MAINTENANCE") - .ok() - .map(|v| v.parse().expect("IS_MAINTENANCE should be a boolean value")) - .unwrap_or(false) -}); -static BYPASS_LEVEL: Lazy = Lazy::new(|| { - env::var("MAINTENANCE_BYPASS_MINIMUM_ACCESS_LEVEL") - .ok() - .map(|v| v.parse().expect("IS_MAINTENANCE should be a boolean value")) - .unwrap_or(3) -}); - - -pub fn block_if_maintenance<'a>(rmcmessage: &'a RMCMessage, _: &'a Arc , conn: &'a Arc>) -> Pin> + Send + 'a)>> { - Box::pin(async move { - let conn = conn.lock().await; - - if let Some(active_conn) = conn.active_connection_data.as_ref() { - if let Some(secure_conn) = active_conn.active_secure_connection_data.as_ref() { - if let Ok(mut client) = grpc::account::Client::new().await { - if let Ok(client_data) = client.get_user_data(secure_conn.pid).await{ - if client_data.access_level >= *BYPASS_LEVEL{ - return None; - } - } - } - } - } - - - warn!("login attempted whilest servers are in maintenance"); - - if *IS_MAINTENANCE { - Some(RMCResponse { - protocol_id: rmcmessage.protocol_id as u8, - response_result: rmcmessage.error_result_with_code(ErrorCode::RendezVous_GameServerMaintenance), - }) - } else { - None - } - }) -} - -#[macro_export] -macro_rules! define_protocol { - ($id:literal ($($varname:ident : $ty:ty),*) => {$($func_id:literal => $func:path),*} ) => { - #[allow(unused_parens)] - async fn protocol (rmcmessage: &crate::RMCMessage, socket: &::std::sync::Arc, connection: &::std::sync::Arc<::tokio::sync::Mutex>, $($varname : $ty),*) -> Option{ - if rmcmessage.protocol_id != $id{ - return None; - } - - let self_data: ( $( $ty ),* ) = ($( $varname ),*); - - let response_result = match rmcmessage.method_id{ - $( - $func_id => $func ( rmcmessage, socket, connection, self_data).await, - )* - _ => { - log::error!("invalid method id sent to protocol {}: {:?}", $id, rmcmessage.method_id); - return Some( - crate::rmc::response::RMCResponse{ - protocol_id: $id, - response_result: rmcmessage.error_result_with_code(crate::rmc::response::ErrorCode::Core_NotImplemented) - } - ); - } - }; - - Some(crate::rmc::response::RMCResponse{ - protocol_id: $id, - response_result - }) - } - #[allow(unused_parens)] - pub fn bound_protocol($($varname : $ty,)*) -> Box Fn(&'message_lifetime crate::RMCMessage, &'message_lifetime ::std::sync::Arc, &'message_lifetime ::std::sync::Arc<::tokio::sync::Mutex>) - -> ::std::pin::Pin> + Send + 'message_lifetime>> + Send + Sync>{ - Box::new( - move |v, s, cd| { - Box::pin({ - $( - let $varname = $varname.clone(); - )* - - async move { - $( - let $varname = $varname.clone(); - )* - protocol(v, s, cd, $($varname,)*).await - } - }) - } - ) - } - }; -} \ No newline at end of file diff --git a/src/nex-implementation/nat_traversal/method_report_nat_properties.rs b/src/nex-implementation/nat_traversal/method_report_nat_properties.rs deleted file mode 100644 index 7d01740..0000000 --- a/src/nex-implementation/nat_traversal/method_report_nat_properties.rs +++ /dev/null @@ -1,29 +0,0 @@ -use std::io::Cursor; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::{Mutex, RwLock}; -use tokio::time::sleep; -use crate::protocols::matchmake_common::MatchmakeData; -use crate::rmc::message::RMCMessage; -use crate::rmc::response::{ErrorCode, RMCResponseResult}; -use crate::rmc::structures::matchmake::CreateMatchmakeSessionParam; - -pub async fn report_nat_properties( - rmcmessage: &RMCMessage, - socket: &Arc, - connection_data: &Arc>, -) -> RMCResponseResult{ - sleep(Duration::from_millis(50)).await; - rmcmessage.success_with_data(Vec::new()) -} - -pub async fn report_nat_properties_raw_params( - rmcmessage: &RMCMessage, - socket: &Arc, - connection_data: &Arc>, - _: () -) -> RMCResponseResult{ - let mut reader = Cursor::new(&rmcmessage.rest_of_data); - - report_nat_properties(rmcmessage, socket, connection_data).await -} \ No newline at end of file diff --git a/src/nex-implementation/nat_traversal/mod.rs b/src/nex-implementation/nat_traversal/mod.rs deleted file mode 100644 index 48e59e6..0000000 --- a/src/nex-implementation/nat_traversal/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -mod method_report_nat_properties; - -use crate::define_protocol; -use crate::protocols::nat_traversal::method_report_nat_properties::report_nat_properties_raw_params; - -define_protocol!{ - 3() => { - 5 => report_nat_properties_raw_params - } -} \ No newline at end of file diff --git a/src/nex-implementation/notification/mod.rs b/src/nex-implementation/notification/mod.rs deleted file mode 100644 index 50cc992..0000000 --- a/src/nex-implementation/notification/mod.rs +++ /dev/null @@ -1,159 +0,0 @@ -use macros::RmcSerialize; -use rand::random; -use crate::prudp::packet::{PRUDPHeader, PRUDPPacket, PacketOption, TypesFlags}; -use crate::prudp::packet::flags::{NEED_ACK, RELIABLE}; -use crate::prudp::packet::types::DATA; -use crate::rmc::message::RMCMessage; -use crate::rmc::structures::RmcSerialize; - -#[derive(Debug, Eq, PartialEq, RmcSerialize)] -#[rmc_struct(0)] -pub struct Notification{ - pub pid_source: u32, - pub notif_type: u32, - pub param_1: u32, - pub param_2: u32, - pub str_param: String, - pub param_3: u32, -} - - -impl ConnectionData{ - pub async fn send_notification(&mut self, socket: &SocketData, notif: Notification){ - println!("sending notification"); - - let mut data = Vec::new(); - - notif.serialize(&mut data).expect("unable to write"); - - let message = RMCMessage{ - protocol_id: 14, - method_id: 1, - call_id: 1, - rest_of_data: data - }; - - println!("notif: {}", hex::encode(message.to_data())); - - - let mut prudp_packet = PRUDPPacket{ - header: PRUDPHeader{ - types_and_flags: TypesFlags::default().types(DATA).flags(NEED_ACK | RELIABLE), - source_port: socket.get_virual_port(), - destination_port: self.sock_addr.virtual_port, - ..Default::default() - }, - options: vec![ - PacketOption::FragmentId(0), - ], - payload: message.to_data(), - packet_signature: [0;16] - }; - - self.finish_and_send_packet_to(socket, prudp_packet).await; - } -} - -#[cfg(test)] -mod test{ - use std::io::Cursor; - use rand::random; - use crate::protocols::notification::Notification; - use crate::prudp::packet::{PRUDPHeader, PRUDPPacket, PacketOption, TypesFlags}; - use crate::prudp::packet::flags::{NEED_ACK, RELIABLE}; - use crate::prudp::packet::types::DATA; - use crate::rmc::message::RMCMessage; - use crate::rmc::structures::RmcSerialize; - - #[test] - fn test(){ - let data = hex::decode("ead001032900a1af62000000000000000000000000000000000000000000020100250000000e57238a6601000000001700000051b39957b90b00003661636851b3995701000001000000").unwrap(); - - - let packet = PRUDPPacket::new(&mut Cursor::new(data)).expect("invalid packet"); - - println!("{:?}", packet); - - let rmc = RMCMessage::new(&mut Cursor::new(packet.payload)).expect("invalid rmc message"); - - println!("{:?}", rmc); - - let notif = Notification::deserialize(&mut Cursor::new(rmc.rest_of_data)).expect("invalid notification"); - - println!("{:?}", notif); - } - #[test] - fn test2(){ - - let data = hex::decode("250000000e57b6801001000000001700000051b39957b90b0000248a5a9851b3995701000001000000").unwrap(); - //let packet = PRUDPPacket::new(&mut Cursor::new(data)).expect("invalid packet"); - - //println!("{:?}", packet); - - let rmc = RMCMessage::new(&mut Cursor::new(data)).expect("invalid rmc message"); - - println!("{:?}", rmc); - - let notif = Notification::deserialize(&mut Cursor::new(rmc.rest_of_data)).expect("invalid notification"); - - println!("{:?}", notif); - } - - #[test] - fn test_rmc_serialization(){ - let notif = Notification{ - pid_source: random(), - notif_type: random(), - param_1: random(), - param_2: random(), - str_param: "".to_string(), - param_3: random(), - }; - - let mut notif_data = Vec::new(); - - notif.serialize(&mut notif_data).unwrap(); - - let message = RMCMessage{ - protocol_id: 14, - method_id: 1, - call_id: random(), - rest_of_data: notif_data - }; - - let mut prudp_packet = PRUDPPacket{ - header: PRUDPHeader{ - ..Default::default() - }, - options: vec![ - PacketOption::FragmentId(0), - ], - payload: message.to_data(), - packet_signature: [0;16] - }; - - prudp_packet.set_sizes(); - - - - let mut packet_data: Vec = Vec::new(); - - prudp_packet.write_to(&mut packet_data).expect("what"); - - let packet_deserialized = PRUDPPacket::new(&mut Cursor::new(packet_data)).unwrap(); - - assert_eq!(prudp_packet, packet_deserialized); - - let message_deserialized = RMCMessage::new(&mut Cursor::new(packet_deserialized.payload)).unwrap(); - - assert_eq!(message, message_deserialized); - - let notification_deserialized = Notification::deserialize(&mut Cursor::new(message_deserialized.rest_of_data)).unwrap(); - - assert_eq!(notification_deserialized, notif); - - - - - } -} \ No newline at end of file diff --git a/src/nex-implementation/secure/method_register.rs b/src/nex-implementation/secure/method_register.rs deleted file mode 100644 index 0d1cd61..0000000 --- a/src/nex-implementation/secure/method_register.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::io::{Cursor, Write}; -use std::sync::Arc; -use bytemuck::bytes_of; -use tokio::sync::Mutex; -use crate::prudp::station_url::{nat_types, StationUrl}; -use crate::prudp::station_url::Type::PRUDPS; -use crate::prudp::station_url::UrlOptions::{Address, NatFiltering, NatMapping, NatType, Port, PrincipalID, RVConnectionID}; -use crate::rmc::message::RMCMessage; -use crate::rmc::response::{ErrorCode, RMCResponseResult}; -use crate::rmc::structures::qresult::QResult; -use crate::rmc::structures::RmcSerialize; - -type StringList = Vec; - -pub async fn register(rmcmessage: &RMCMessage, _station_urls: Vec, conn_data: &Arc>) -> RMCResponseResult{ - let locked = conn_data.lock().await; - let Some(active_connection_data) = locked.active_connection_data.as_ref() else { - return rmcmessage.error_result_with_code(ErrorCode::RendezVous_NotAuthenticated) - }; - - let Some(active_secure_connection_data) = active_connection_data.active_secure_connection_data.as_ref() else { - return rmcmessage.error_result_with_code(ErrorCode::RendezVous_NotAuthenticated) - }; - - let public_station = StationUrl{ - url_type: PRUDPS, - options: vec![ - RVConnectionID(active_connection_data.connection_id), - Address(*locked.sock_addr.regular_socket_addr.ip()), - Port(locked.sock_addr.regular_socket_addr.port()), - NatFiltering(0), - NatMapping(0), - NatType(nat_types::BEHIND_NAT), - PrincipalID(active_secure_connection_data.pid), - ] - }; - - - - let result = QResult::success(ErrorCode::Core_Unknown); - - let mut response = Vec::new(); - - result.serialize(&mut response).expect("unable to serialize result"); - response.write_all(bytes_of(&active_connection_data.connection_id)).expect("unable to serialize connection id"); - public_station.to_string().serialize(&mut response).expect("unable to serialize station id"); - - rmcmessage.success_with_data(response) -} - -pub async fn register_raw_params(rmcmessage: &RMCMessage, _: &Arc, conn_data: &Arc>, _: ()) -> RMCResponseResult{ - let mut reader = Cursor::new(&rmcmessage.rest_of_data); - - let Ok(station_urls) = StringList::deserialize(&mut reader) else { - return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument); - }; - - let Ok(station_urls): Result, _> = station_urls.iter().map(|c| StationUrl::try_from((&c) as &str)).collect() else { - return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument); - }; - - register(rmcmessage, station_urls, conn_data).await -} \ No newline at end of file diff --git a/src/nex-implementation/secure/method_send_report.rs b/src/nex-implementation/secure/method_send_report.rs deleted file mode 100644 index 8242371..0000000 --- a/src/nex-implementation/secure/method_send_report.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::io::Cursor; -use std::sync::Arc; -use log::error; -use tokio::sync::Mutex; -use crate::endianness::{IS_BIG_ENDIAN, ReadExtensions}; -use crate::rmc::message::RMCMessage; -use crate::rmc::response::{RMCResponseResult}; -use crate::rmc::response::ErrorCode::Core_InvalidArgument; -use crate::rmc::structures::{qbuffer, RmcSerialize}; -use crate::rmc::structures::qbuffer::QBuffer; - -pub async fn send_report(rmcmessage: &RMCMessage, report_id: u32, data: Vec) -> RMCResponseResult{ - let result = tokio::fs::write(format!("./reports/{}", report_id), data).await; - - match result{ - Ok(_) => {}, - Err(e) => error!("{}", e) - } - - rmcmessage.success_with_data(Vec::new()) -} - -pub async fn send_report_raw_params(rmcmessage: &RMCMessage, _: &Arc, _conn_data: &Arc>, _: ()) -> RMCResponseResult{ - let mut reader = Cursor::new(&rmcmessage.rest_of_data); - - let Ok(error_id) = reader.read_struct(IS_BIG_ENDIAN) else { - return rmcmessage.error_result_with_code(Core_InvalidArgument); - }; - - let Ok(QBuffer(data)) = QBuffer::deserialize(&mut reader) else { - return rmcmessage.error_result_with_code(Core_InvalidArgument); - }; - - send_report(rmcmessage, error_id, data).await -} \ No newline at end of file diff --git a/src/nex-implementation/secure/mod.rs b/src/nex-implementation/secure/mod.rs deleted file mode 100644 index b59cce1..0000000 --- a/src/nex-implementation/secure/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -mod method_register; -mod method_send_report; - -use crate::define_protocol; -use crate::protocols::secure::method_register::register_raw_params; -use crate::protocols::secure::method_send_report::send_report_raw_params; - -define_protocol!{ - 11() => { - 0x01 => register_raw_params, - 0x08 => send_report_raw_params - } -} \ No newline at end of file diff --git a/src/nex-implementation/server.rs b/src/nex-implementation/server.rs deleted file mode 100644 index dbf93de..0000000 --- a/src/nex-implementation/server.rs +++ /dev/null @@ -1,61 +0,0 @@ -use std::future::Future; -use std::io::Cursor; -use std::pin::Pin; -use std::sync::Arc; -use log::error; -use tokio::sync::Mutex; -use crate::prudp::packet::PRUDPPacket; -use crate::rmc::message::RMCMessage; -use crate::rmc::response::{RMCResponse, RMCResponseResult, send_response}; -use crate::rmc::response::ErrorCode::Core_NotImplemented; -use crate::web::DirectionalData::Incoming; -use crate::web::WEB_DATA; - -type ContainedProtocolList = Box<[Box Fn(&'a RMCMessage, &'a Arc, &'a Arc>) -> Pin> + Send + 'a>> + Send + Sync>]>; - -pub struct RMCProtocolServer(ContainedProtocolList); - -impl RMCProtocolServer{ - pub fn new(protocols: ContainedProtocolList) -> Arc{ - Arc::new(Self(protocols)) - } - - pub async fn process_message(&self, packet: PRUDPPacket, socket: Arc, connection: Arc>){ - let locked = connection.lock().await; - let addr = locked.sock_addr.regular_socket_addr; - drop(locked); - let mut web = WEB_DATA.lock().await; - web.data.push((addr, Incoming(hex::encode(&packet.payload)))); - drop(web); - - let Ok(rmc) = RMCMessage::new(&mut Cursor::new(&packet.payload)) else { - error!("error reading rmc message"); - return; - }; - - println!("got rmc message {},{}", rmc.protocol_id, rmc.method_id); - - for proto in &self.0 { - if let Some(response) = proto(&rmc, &socket, &connection).await { - if matches!(response.response_result, RMCResponseResult::Error {..}){ - error!("an rmc error occurred") - } - let mut locked = connection.lock().await; - send_response(&packet, &socket, &mut locked, response).await; - drop(locked); - return; - } - } - - error!("tried to send message to unimplemented protocol {} with method id {}", rmc.protocol_id, rmc.method_id); - let mut locked = connection.lock().await; - send_response(&packet, &socket, &mut locked, RMCResponse{ - protocol_id: rmc.protocol_id as u8, - response_result: RMCResponseResult::Error { - call_id: rmc.call_id, - error_code: Core_NotImplemented - } - }).await; - - } -} \ No newline at end of file diff --git a/src/nex/auth_handler.rs b/src/nex/auth_handler.rs index d0b5e2e..04f0f4f 100644 --- a/src/nex/auth_handler.rs +++ b/src/nex/auth_handler.rs @@ -97,7 +97,7 @@ impl Auth for AuthHandler { source_login_data.0, ticket.into(), connection_data, - self.build_name.to_owned(), + self.build_name.to_string() //format!("{}; Rust NEX Version {} by DJMrTV", self.build_name, env!("CARGO_PKG_VERSION")), )) } diff --git a/src/nex/matchmake.rs b/src/nex/matchmake.rs new file mode 100644 index 0000000..d486663 --- /dev/null +++ b/src/nex/matchmake.rs @@ -0,0 +1,340 @@ +use std::collections::HashMap; +use std::str::FromStr; +use std::sync::{Arc, Weak}; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering::{Relaxed, Release}; +use rand::random; +use tokio::sync::{Mutex, RwLock}; +use crate::kerberos::KerberosDateTime; +use crate::nex::user::User; +use crate::rmc::protocols::notifications::{NotificationEvent, RemoteNotification}; +use crate::rmc::protocols::notifications::notification_types::{HOST_CHANGED, OWNERSHIP_CHANGED}; +use crate::rmc::response::ErrorCode; +use crate::rmc::response::ErrorCode::{Core_InvalidArgument, RendezVous_SessionVoid}; +use crate::rmc::structures::matchmake::{Gathering, MatchmakeParam, MatchmakeSession, MatchmakeSessionSearchCriteria}; +use crate::rmc::structures::matchmake::gathering_flags::PERSISTENT_GATHERING; +use crate::rmc::structures::variant::Variant; + +pub struct MatchmakeManager{ + pub gid_counter: AtomicU32, + pub sessions: RwLock>>>, + pub rv_cid_counter: AtomicU32, + pub users: RwLock>> +} + +impl MatchmakeManager{ + pub fn next_gid(&self) -> u32{ + self.gid_counter.fetch_add(1, Relaxed) + } + + pub fn next_cid(&self) -> u32{ + self.rv_cid_counter.fetch_add(1, Relaxed) + } + + pub async fn get_session(&self, gid: u32) -> Result>, ErrorCode>{ + let sessions = self.sessions.read().await; + + let Some(session) = sessions.get(&gid) else { + return Err(RendezVous_SessionVoid); + }; + + let session = session.clone(); + drop(sessions); + + Ok(session) + } +} + + +#[derive(Default, Debug)] +pub struct ExtendedMatchmakeSession{ + pub session: MatchmakeSession, + pub connected_players: Vec>, +} + +fn read_bounds_string(str: &str) -> Option<(T,T)>{ + let bounds = str.split_once(",")?; + + Some((T::from_str(bounds.0).ok()?, T::from_str(bounds.1).ok()?)) +} + +fn check_bounds_str(compare: T, str: &str) -> Option{ + let bounds: (T, T) = read_bounds_string(str)?; + + Some(bounds.0 <= compare && compare <= bounds.1) +} + +pub async fn broadcast_notification>(players: &[T], notification_event: &NotificationEvent){ + for player in players{ + let player = player.as_ref(); + player.remote.process_notification_event(notification_event.clone()).await; + } +} + +impl ExtendedMatchmakeSession{ + pub fn get_active_players(&self) -> Vec>{ + self.connected_players.iter().filter_map(|u| u.upgrade()).collect() + } + + pub async fn broadcast_notification(&self, notification_event: &NotificationEvent){ + broadcast_notification(&self.get_active_players(), notification_event).await; + } + + pub async fn from_matchmake_session(gid: u32, session: MatchmakeSession, host: &Weak) -> Self{ + let Some(host) = host.upgrade() else{ + return Default::default(); + }; + + + let mm_session = MatchmakeSession{ + gathering: Gathering{ + self_gid: gid, + owner_pid: host.pid, + host_pid: host.pid, + ..session.gathering.clone() + }, + datetime: KerberosDateTime::now(), + session_key: (0..32).map(|_| random()).collect(), + matchmake_param: MatchmakeParam{ + params: vec![ + ("@SR".to_owned(), Variant::Bool(true)), + ("@GIR".to_owned(), Variant::SInt64(3)) + ] + }, + system_password_enabled: false, + ..session + }; + + Self{ + session: mm_session, + connected_players: Default::default() + } + } + + pub async fn add_players(&mut self, conns: &[Weak], join_msg: String) { + let Some(initiating_user) = conns[0].upgrade() else { + return + }; + + let initiating_pid = initiating_user.pid; + + let old_particip = self.connected_players.clone(); + for conn in conns { + self.connected_players.push(conn.clone()); + } + self.session.participation_count = self.connected_players.len() as u32; + + for other_connection in &conns[1..]{ + let Some(other_conn) = other_connection.upgrade() else { + continue; + }; + + + let other_pid = other_conn.pid; + /*if other_pid == self.session.gathering.owner_pid && + joining_pid == self.session.gathering.owner_pid{ + continue; + }*/ + + other_conn.remote.process_notification_event(NotificationEvent{ + pid_source: initiating_pid, + notif_type: 122000, + param_1: self.session.gathering.self_gid, + param_2: other_pid, + str_param: "".into(), + param_3: 0 + }).await; + } + + let list_of_connected_pids: Vec<_> = self.connected_players.iter().filter_map(|p| p.upgrade()).map(|p| p.pid).collect(); + + for other_connection in conns{ + let Some(other_conn) = other_connection.upgrade() else { + continue; + }; + + + let other_pid = other_conn.pid; + /*if other_pid == self.session.gathering.owner_pid && + joining_pid == self.session.gathering.owner_pid{ + continue; + }*/ + + for pid in &list_of_connected_pids { + other_conn.remote.process_notification_event(NotificationEvent { + pid_source: initiating_pid, + notif_type: 3001, + param_1: self.session.gathering.self_gid, + param_2: *pid, + str_param: join_msg.clone(), + param_3: self.connected_players.len() as _ + }).await; + } + } + + for old_conns in &old_particip{ + let Some(old_conns) = old_conns.upgrade() else { + continue; + }; + + let older_pid = old_conns.pid; + + + + initiating_user.remote.process_notification_event(NotificationEvent{ + pid_source: initiating_pid, + notif_type: 3001, + param_1: self.session.gathering.self_gid, + param_2: older_pid, + str_param: join_msg.clone(), + param_3: self.connected_players.len() as _ + }).await; + } + } + #[inline] + pub fn is_reachable(&self) -> bool{ + if self.session.gathering.flags & PERSISTENT_GATHERING != 0{ + if !self.connected_players.is_empty(){ + true + } else { + self.session.open_participation + } + } else { + !self.connected_players.is_empty() + } + } + #[inline] + pub fn is_joinable(&self) -> bool{ + self.is_reachable() && self.session.open_participation + } + + pub fn matches_criteria(&self, search_criteria: &MatchmakeSessionSearchCriteria) -> Result{ + // todo: implement the rest of the search criteria + + if search_criteria.vacant_only { + if (self.connected_players.len() as u16 + search_criteria.vacant_participants) > self.session.gathering.maximum_participants{ + return Ok(false); + } + } + + if search_criteria.exclude_locked{ + if !self.session.open_participation{ + return Ok(false); + } + } + + if search_criteria.exclude_system_password_set{ + if self.session.system_password_enabled{ + return Ok(false); + } + } + + if search_criteria.exclude_user_password_set{ + if self.session.user_password_enabled{ + return Ok(false); + } + } + + if !check_bounds_str(self.session.gathering.minimum_participants, &search_criteria.minimum_participants).ok_or(Core_InvalidArgument)? { + return Ok(false); + } + + if !check_bounds_str(self.session.gathering.maximum_participants, &search_criteria.maximum_participants).ok_or(Core_InvalidArgument)? { + return Ok(false); + } + + let game_mode: u32 = search_criteria.game_mode.parse().map_err(|_| Core_InvalidArgument)?; + + if self.session.gamemode != game_mode{ + return Ok(false); + } + + let mm_sys_type: u32 = search_criteria.matchmake_system_type.parse().map_err(|_| Core_InvalidArgument)?; + + if self.session.matchmake_system_type != mm_sys_type{ + return Ok(false); + } + + ; + + if search_criteria.attribs.get(0).map(|str| str.parse().ok()).flatten() != self.session.attributes.get(0).map(|v| *v){ + return Ok(false); + } + if search_criteria.attribs.get(2).map(|str| str.parse().ok()).flatten() != self.session.attributes.get(2).map(|v| *v){ + return Ok(false); + } + if search_criteria.attribs.get(3).map(|str| str.parse().ok()).flatten() != self.session.attributes.get(3).map(|v| *v){ + return Ok(false); + } + + Ok(true) + } + + pub async fn migrate_ownership(&mut self, initiator_pid: u32) -> Result<(), ErrorCode>{ + let players: Vec<_> = self.connected_players.iter().filter_map(|p| p.upgrade()).collect(); + + let Some(new_owner) = players.iter().find(|p| p.pid != self.session.gathering.owner_pid) else { + self.session.gathering.owner_pid = 0; + + return Ok(()); + }; + + self.session.gathering.owner_pid = new_owner.pid; + + self.broadcast_notification(&NotificationEvent{ + pid_source: initiator_pid, + notif_type: OWNERSHIP_CHANGED, + param_1: self.session.gathering.self_gid, + param_2: new_owner.pid, + ..Default::default() + }).await; + + Ok(()) + } + + pub async fn migrate_host(&mut self, initiator_pid: u32) -> Result<(), ErrorCode>{ + let players: Vec<_> = self.connected_players.iter().filter_map(|p| p.upgrade()).collect(); + + self.session.gathering.host_pid = self.session.gathering.owner_pid; + + self.broadcast_notification(&NotificationEvent{ + pid_source: initiator_pid, + notif_type: HOST_CHANGED, + param_1: self.session.gathering.self_gid, + ..Default::default() + }).await; + + Ok(()) + } + + pub async fn remove_player_from_session(&mut self, pid: u32, message: &str) -> Result<(), ErrorCode>{ + self.connected_players.retain(|u| u.upgrade().is_some_and(|u| u.pid != pid)); + + self.session.participation_count = (self.connected_players.len() & u32::MAX as usize) as u32; + + if pid == self.session.gathering.owner_pid { + self.migrate_ownership(pid).await?; + } + + if pid == self.session.gathering.host_pid { + self.migrate_host(pid).await?; + } + + // todo: support DisconnectChangeOwner + + // todo: finish the rest of this + + for player in self.connected_players.iter().filter_map(|p| p.upgrade()){ + player.remote.process_notification_event(NotificationEvent{ + notif_type: 3008, + pid_source: pid, + param_1: self.session.gathering.self_gid, + param_2: pid, + str_param: message.to_owned(), + .. Default::default() + }).await; + } + + Ok(()) + } +} \ No newline at end of file diff --git a/src/nex/mod.rs b/src/nex/mod.rs index 623d6d8..76791bc 100644 --- a/src/nex/mod.rs +++ b/src/nex/mod.rs @@ -1,3 +1,5 @@ pub mod account; pub mod auth_handler; -pub mod user; \ No newline at end of file +pub mod user; +pub mod remote_console; +pub mod matchmake; \ No newline at end of file diff --git a/src/nex/remote_console.rs b/src/nex/remote_console.rs new file mode 100644 index 0000000..3744aa1 --- /dev/null +++ b/src/nex/remote_console.rs @@ -0,0 +1,23 @@ +use macros::rmc_struct; +use crate::rmc::protocols::notifications::{Notification, NotificationEvent, RawNotification, RawNotificationInfo, RemoteNotification}; +use crate::rmc::protocols::nat_traversal::{NatTraversalConsole, RemoteNatTraversalConsole, RawNatTraversalConsoleInfo, RawNatTraversalConsole}; +use crate::define_rmc_proto; +use crate::nex::user::RemoteUserProtocol; + +define_rmc_proto!( + proto Console{ + Notification, + NatTraversalConsole + } +); +/* +#[rmc_struct(Console)] +pub struct TestRemoteConsole{ + pub remote: RemoteUserProtocol, +} + +impl Notification for TestRemoteConsole{ + async fn process_notification_event(&self, event: NotificationEvent) { + println!("NOTIF RECIEVED: {:?}", event); + } +}*/ \ No newline at end of file diff --git a/src/nex/user.rs b/src/nex/user.rs index 77ebfb6..019b2bd 100644 --- a/src/nex/user.rs +++ b/src/nex/user.rs @@ -1,45 +1,567 @@ -use std::net::{Ipv4Addr, SocketAddrV4}; -use macros::rmc_struct; +use std::io::ErrorKind::HostUnreachable; use crate::define_rmc_proto; -use crate::prudp::station_url::{nat_types, StationUrl}; -use crate::prudp::station_url::Type::PRUDPS; -use crate::prudp::station_url::UrlOptions::{Address, NatFiltering, NatMapping, NatType, Port, PrincipalID, RVConnectionID}; -use crate::rmc::protocols::secure::{RemoteAuth, RawAuthInfo, RawAuth, Auth}; +use crate::nex::matchmake::{ExtendedMatchmakeSession, MatchmakeManager}; +use crate::nex::remote_console::RemoteConsole; +use crate::prudp::sockaddr::PRUDPSockAddr; +use crate::prudp::station_url::Type::{PRUDP, PRUDPS}; +use crate::prudp::station_url::UrlOptions::{ + Address, NatFiltering, NatMapping, NatType, Platform, Port, PrincipalID, RVConnectionID, + StreamID, PMP, UPNP, +}; +use crate::prudp::station_url::{nat_types, StationUrl, Type}; +use crate::rmc::protocols::matchmake::{ + Matchmake, RawMatchmake, RawMatchmakeInfo, RemoteMatchmake, +}; +use crate::rmc::protocols::ranking::{Ranking, RawRanking, RawRankingInfo, RemoteRanking}; +use crate::rmc::protocols::matchmake_extension::{ + MatchmakeExtension, RawMatchmakeExtension, RawMatchmakeExtensionInfo, RemoteMatchmakeExtension, +}; +use crate::rmc::protocols::nat_traversal::{NatTraversal, RawNatTraversal, RawNatTraversalInfo, RemoteNatTraversal, RemoteNatTraversalConsole}; +use crate::rmc::protocols::secure::{RawSecure, RawSecureInfo, RemoteSecure, Secure}; +use crate::rmc::protocols::matchmake_ext::{MatchmakeExt, RawMatchmakeExt, RawMatchmakeExtInfo, RemoteMatchmakeExt}; use crate::rmc::response::ErrorCode; +use crate::rmc::structures::matchmake::{AutoMatchmakeParam, CreateMatchmakeSessionParam, JoinMatchmakeSessionParam, MatchmakeSession}; + use crate::rmc::structures::qresult::QResult; +use macros::rmc_struct; +use std::net::{Ipv4Addr, SocketAddrV4}; +use std::sync::{Arc, Weak}; +use log::{error, info}; +use rocket::http::ext::IntoCollection; +use tokio::sync::{Mutex, RwLock}; +use tonic::Code::InvalidArgument; +use crate::prudp::station_url::nat_types::PUBLIC; +use crate::rmc::protocols::notifications::{NotificationEvent, RemoteNotification}; +use crate::rmc::response::ErrorCode::{Core_Exception, Core_InvalidArgument, RendezVous_AccountExpired, RendezVous_SessionVoid}; define_rmc_proto!( proto UserProtocol{ - Auth + Secure, + MatchmakeExtension, + MatchmakeExt, + Matchmake, + NatTraversal, + Ranking } ); #[rmc_struct(UserProtocol)] pub struct User { pub pid: u32, - pub ip: SocketAddrV4, + pub ip: PRUDPSockAddr, + pub this: Weak, + pub remote: RemoteConsole, + pub station_url: RwLock>, + pub matchmake_manager: Arc, } -impl Auth for User{ - async fn register(&self, station_urls: Vec) -> Result<(QResult, u32, String), ErrorCode> { - let public_station = StationUrl{ - url_type: PRUDPS, - options: vec![ - RVConnectionID(0), - Address(*self.ip.ip()), - Port(self.ip.port()), - NatFiltering(0), - NatMapping(0), - NatType(nat_types::BEHIND_NAT), - PrincipalID(self.pid), - ] +impl Secure for User { + async fn register( + &self, + station_urls: Vec, + ) -> Result<(QResult, u32, StationUrl), ErrorCode> { + let cid = self.matchmake_manager.next_cid(); + + println!("{:?}", station_urls); + + let mut users = self.matchmake_manager.users.write().await; + users.insert(cid, self.this.clone()); + drop(users); + + let mut public_station: Option = None; + let mut private_station: Option = None; + + for station in station_urls { + let is_public = station.options.iter().any(|v| { + if let NatType(v) = v { + if *v & PUBLIC != 0 { + return true; + } + } + false + }); + + let Some(nat_filtering) = station.options.iter().find_map(|v| match v { + NatFiltering(v) => Some(v), + _ => None + }) else { + return Err(Core_Exception); + }; + + let Some(nat_mapping) = station.options.iter().find_map(|v| match v { + NatMapping(v) => Some(v), + _ => None + }) else { + return Err(Core_Exception); + }; + + if !is_public || (*nat_filtering == 0 && *nat_mapping == 0) { + private_station = Some(station.clone()); + } + + if is_public { + public_station = Some(station); + } + } + + let Some(mut private_station) = private_station else { + return Err(Core_Exception); }; + let mut public_station = if let Some(public_station) = public_station { + public_station + } else { + let mut public_station = private_station.clone(); + + public_station.options.retain(|v| { + match v { + Address(_) | Port(_) | NatFiltering(_) | NatMapping(_) | NatType(_) => false, + _ => true + } + }); + + public_station.options.push(Address(*self.ip.regular_socket_addr.ip())); + public_station.options.push(Port(self.ip.regular_socket_addr.port())); + public_station.options.push(NatFiltering(0)); + public_station.options.push(NatMapping(0)); + public_station.options.push(NatType(3)); + + public_station + }; + + let mut both = [&mut public_station, &mut private_station]; + + for station in both { + station.options.retain(|v| { + match v { + PrincipalID(_) | RVConnectionID(_) => false, + _ => true + } + }); + + station.options.push(PrincipalID(self.pid)); + station.options.push(RVConnectionID(cid)); + } + + + let mut lock = self.station_url.write().await; + + *lock = vec![ + public_station.clone(), + // private_station.clone() + ]; + + drop(lock); + let result = QResult::success(ErrorCode::Core_Unknown); - Ok((result, 0, public_station.to_string())) + let out = public_station.to_string(); + + println!("out: {}", out); + + Ok((result, cid, public_station)) + } + + async fn replace_url(&self, target_url: StationUrl, dest: StationUrl) -> Result<(), ErrorCode> { + let mut lock = self.station_url.write().await; + + let Some(target_addr) = target_url.options.iter().find(|v| matches!(v, Address(_))) else { + return Err(ErrorCode::Core_InvalidArgument); + }; + + let Some(target_port) = target_url.options.iter().find(|v| matches!(v, Port(_))) else { + return Err(ErrorCode::Core_InvalidArgument); + }; + + let Some(replacement_target) = lock.iter_mut().find(|url| { + url.options.iter().any(|o| o == target_addr) && + url.options.iter().any(|o| o == target_port) + }) else { + return Err(ErrorCode::Core_InvalidArgument); + }; + *replacement_target = dest; + + drop(lock); + + Ok(()) } } +impl MatchmakeExtension for User { + async fn close_participation(&self, gid: u32) -> Result<(), ErrorCode> { + let session = self.matchmake_manager.get_session(gid).await?; + + let mut session = session.lock().await; + + session.session.open_participation = false; + + Ok(()) + } + + async fn open_participation(&self, gid: u32) -> Result<(), ErrorCode> { + let session = self.matchmake_manager.get_session(gid).await?; + + let mut session = session.lock().await; + + session.session.open_participation = true; + + Ok(()) + } + + async fn get_playing_session(&self, pids: Vec) -> Result, ErrorCode> { + Ok(Vec::new()) + } + + async fn update_progress_score(&self, gid: u32, progress: u8) -> Result<(), ErrorCode> { + let session = self.matchmake_manager.get_session(gid).await?; + + let mut session = session.lock().await; + + session.session.progress_score = progress; + + Ok(()) + } + + async fn create_matchmake_session_with_param( + &self, + create_session_param: CreateMatchmakeSessionParam, + ) -> Result { + println!("{:?}", create_session_param); + + let gid = self.matchmake_manager.next_gid(); + + let mut new_session = ExtendedMatchmakeSession::from_matchmake_session( + gid, + create_session_param.matchmake_session, + &self.this.clone(), + ) + .await; + + let mut joining_players = vec![self.this.clone()]; + + let users = self.matchmake_manager.users.read().await; + + if let Ok(old_gathering) = self.matchmake_manager.get_session(create_session_param.gid_for_participation_check).await { + let old_gathering = old_gathering.lock().await; + + let players = old_gathering.connected_players.iter().filter_map(|v| v.upgrade()).filter(|u| create_session_param.additional_participants.iter().any(|p| *p == u.pid)); + for player in players { + joining_players.push(Arc::downgrade(&player)); + } + } + drop(users); + + new_session.session.participation_count = create_session_param.participation_count as u32; + new_session + .add_players(&joining_players, create_session_param.join_message) + .await; + + let session = new_session.session.clone(); + + let mut sessions = self.matchmake_manager.sessions.write().await; + sessions.insert(gid, Arc::new(Mutex::new(new_session))); + drop(sessions); + + Ok(session) + } + + async fn join_matchmake_session_with_param( + &self, + join_session_param: JoinMatchmakeSessionParam, + ) -> Result { + let session = self.matchmake_manager.get_session(join_session_param.gid).await?; + + let mut session = session.lock().await; + + session.connected_players.retain(|v| v.upgrade().is_some_and(|v| v.pid != self.pid)); + + let mut joining_players = vec![self.this.clone()]; + + let users = self.matchmake_manager.users.read().await; + + if let Ok(old_gathering) = self.matchmake_manager.get_session(join_session_param.gid_for_participation_check).await { + let old_gathering = old_gathering.lock().await; + + let players = old_gathering.connected_players.iter().filter_map(|v| v.upgrade()).filter(|u| join_session_param.additional_participants.iter().any(|p| *p == u.pid)); + for player in players { + joining_players.push(Arc::downgrade(&player)); + } + } + + drop(users); + + session + .add_players(&joining_players, join_session_param.join_message) + .await; + + let mm_session = session.session.clone(); + + Ok(mm_session) + } + + async fn auto_matchmake_with_param_postpone(&self, param: AutoMatchmakeParam) -> Result { + println!("{:?}", param); + + let mut joining_players = vec![self.this.clone()]; + + let users = self.matchmake_manager.users.read().await; + + if let Ok(old_gathering) = self.matchmake_manager.get_session(param.gid_for_participation_check).await { + let old_gathering = old_gathering.lock().await; + + let players = old_gathering.connected_players.iter().filter_map(|v| v.upgrade()).filter(|u| param.additional_participants.iter().any(|p| *p == u.pid)); + for player in players { + joining_players.push(Arc::downgrade(&player)); + } + } + + drop(users); + + let sessions = self.matchmake_manager.sessions.read().await; + for session in sessions.values() { + let mut session = session.lock().await; + + println!("checking session!"); + + if !session.is_joinable() { + continue; + } + + let mut bool_matched_criteria = false; + + for criteria in ¶m.search_criteria { + if session.matches_criteria(criteria)? { + bool_matched_criteria = true; + } + } + + if bool_matched_criteria { + session.add_players(&joining_players, param.join_message).await; + + return Ok(session.session.clone()); + } + } + + drop(sessions); + + println!("making new session!"); + + let AutoMatchmakeParam { + join_message, + participation_count, + gid_for_participation_check, + matchmake_session, + additional_participants, + .. + } = param; + + self.create_matchmake_session_with_param(CreateMatchmakeSessionParam { + join_message, + participation_count, + gid_for_participation_check, + create_matchmake_session_option: 0, + matchmake_session, + additional_participants, + }).await + } + + async fn find_matchmake_session_by_gathering_id_detail(&self, gid: u32) -> Result { + let session = self.matchmake_manager.get_session(gid).await?; + let session = session.lock().await; + + Ok(session.session.clone()) + } + + async fn modify_current_game_attribute(&self, gid: u32, attrib_index: u32, attrib_val: u32) -> Result<(), ErrorCode> { + let session = self.matchmake_manager.get_session(gid).await?; + let mut session = session.lock().await; + + session.session.attributes[attrib_index as usize] = attrib_val; + + Ok(()) + } +} + +impl Matchmake for User { + async fn unregister_gathering(&self, gid: u32) -> Result { + Ok(true) + } + async fn get_session_urls(&self, gid: u32) -> Result, ErrorCode> { + let session = self.matchmake_manager.get_session(gid).await?; + + let session = session.lock().await; + + let urls: Vec<_> = + session + .connected_players + .iter() + .filter_map(|v| v.upgrade()) + .filter(|u| u.pid == session.session.gathering.host_pid) + .map(|u| async move { + u.station_url.read().await.clone() + }) + .next() + .ok_or(ErrorCode::RendezVous_SessionClosed)? + .await; + + + println!("{:?}", urls); + + if urls.is_empty(){ + return Err(ErrorCode::RendezVous_NotParticipatedGathering) + } + + Ok(urls) + } + + async fn update_session_host(&self, gid: u32, change_session_owner: bool) -> Result<(), ErrorCode> { + let session = self.matchmake_manager.get_session(gid).await?; + let mut session = session.lock().await; + + session.session.gathering.host_pid = self.pid; + + for player in &session.connected_players { + let Some(player) = player.upgrade() else { + continue; + }; + + player.remote.process_notification_event(NotificationEvent { + notif_type: 110000, + pid_source: self.pid, + param_1: gid, + param_2: self.pid, + param_3: 0, + str_param: "".to_string(), + }).await; + } + + if change_session_owner { + session.session.gathering.owner_pid = self.pid; + + + for player in &session.connected_players { + let Some(player) = player.upgrade() else { + continue; + }; + + player.remote.process_notification_event(NotificationEvent { + notif_type: 4000, + pid_source: self.pid, + param_1: gid, + param_2: self.pid, + param_3: 0, + str_param: "".to_string(), + }).await; + } + } + + Ok(()) + } + + async fn migrate_gathering_ownership(&self, gid: u32, candidates: Vec, participants_only: bool) -> Result<(), ErrorCode> { + let session = self.matchmake_manager.get_session(gid).await?; + let mut session = session.lock().await; + + let candidate = candidates.get(0).ok_or(Core_InvalidArgument)?; + + session.session.gathering.owner_pid = *candidate; + + for player in &session.connected_players { + let Some(player) = player.upgrade() else { + continue; + }; + + player.remote.process_notification_event(NotificationEvent { + notif_type: 4000, + pid_source: self.pid, + param_1: gid, + param_2: *candidate, + param_3: 0, + str_param: "".to_string(), + }).await; + } + + Ok(()) + } +} + +impl MatchmakeExt for User { + async fn end_participation(&self, gid: u32, message: String) -> Result { + let session = self.matchmake_manager.get_session(gid).await?; + let mut session = session.lock().await; + + session.remove_player_from_session(self.pid, &message).await?; + + Ok(true) + } + + +} + +impl NatTraversal for User { + async fn report_nat_properties( + &self, + nat_mapping: u32, + nat_filtering: u32, + _rtt: u32, + ) -> Result<(), ErrorCode> { + let mut urls = self.station_url.write().await; + + for station_url in urls.iter_mut() { + station_url.options.retain(|o| match o { + NatMapping(_) | NatFiltering(_) => false, + _ => true + }); + + station_url.options.push(NatMapping(nat_mapping as u8)); + station_url.options.push(NatFiltering(nat_filtering as u8)); + } + + Ok(()) + } + + async fn report_nat_traversal_result(&self, cid: u32, result: bool, rtt: u32) -> Result<(), ErrorCode> { + Ok(()) + } + + async fn request_probe_initiation(&self, station_to_probe: String) -> Result<(), ErrorCode> { + info!("NO!"); + Err(RendezVous_AccountExpired) + } + + async fn request_probe_initialization_ext(&self, target_list: Vec, station_to_probe: String) -> Result<(), ErrorCode> { + let users = self.matchmake_manager.users.read().await; + + println!("requesting station probe for {:?} to {:?}", target_list, station_to_probe); + + for target in target_list { + let Ok(url) = StationUrl::try_from(target.as_ref()) else { + continue; + }; + + let Some(RVConnectionID(v)) = url.options.into_iter().find(|o| { matches!(o, &RVConnectionID(_)) }) else { + continue; + }; + + let Some(v) = users.get(&v) else { + continue; + }; + + let Some(user) = v.upgrade() else { + continue; + }; + + user.remote.request_probe_initiation(station_to_probe.clone()).await; + } + + info!("finished probing"); + + Ok(()) + } +} + +impl Ranking for User{ + +} \ No newline at end of file diff --git a/src/prudp/packet.rs b/src/prudp/packet.rs index 75dc211..ab72874 100644 --- a/src/prudp/packet.rs +++ b/src/prudp/packet.rs @@ -44,26 +44,27 @@ pub type Result = std::result::Result; pub struct TypesFlags(u16); impl TypesFlags { + #[inline] pub const fn get_types(self) -> u8 { (self.0 & 0x000F) as u8 } - + #[inline] pub const fn get_flags(self) -> u16 { (self.0 & 0xFFF0) >> 4 } - + #[inline] pub const fn types(self, val: u8) -> Self { Self((self.0 & 0xFFF0) | (val as u16 & 0x000F)) } - + #[inline] pub const fn flags(self, val: u16) -> Self { Self((self.0 & 0x000F) | ((val << 4) & 0xFFF0)) } - + #[inline] pub const fn set_flag(&mut self, val: u16){ self.0 |= (val & 0xFFF) << 4; } - + #[inline] pub const fn set_types(&mut self, val: u8){ self.0 |= val as u16 & 0x0F; } @@ -340,8 +341,7 @@ impl PRUDPPacket { options.push(PacketOption::from(option_id, &option_data)?); } - - trace!("reading payload"); + let mut payload = vec![0u8; header.payload_size as usize]; reader.read_exact(&mut payload)?; diff --git a/src/prudp/router.rs b/src/prudp/router.rs index 8621236..de2f249 100644 --- a/src/prudp/router.rs +++ b/src/prudp/router.rs @@ -47,8 +47,6 @@ impl Router { }, }; - trace!("got valid prudp packet from someone({}): \n{:?}", addr, packet); - let connection = packet.source_sockaddr(addr); @@ -63,11 +61,10 @@ impl Router { // Dont keep the locked structure for too long drop(endpoints); - - trace!("sending packet to endpoint"); + tokio::spawn(async move { - endpoint.recieve_packet(connection, packet).await + endpoint.receive_packet(connection, packet).await }); } } @@ -95,7 +92,7 @@ impl Router { } pub async fn new(addr: SocketAddrV4) -> io::Result<(Arc, JoinHandle<()>)>{ - trace!("starting router on {}", addr); + // trace!("starting router on {}", addr); let socket = Arc::new(UdpSocket::bind(addr).await?); diff --git a/src/prudp/socket.rs b/src/prudp/socket.rs index 5702c51..5bb08f9 100644 --- a/src/prudp/socket.rs +++ b/src/prudp/socket.rs @@ -1,12 +1,14 @@ +use crate::nex::account::Account; use crate::prudp::packet::flags::{ACK, HAS_SIZE, MULTI_ACK, NEED_ACK, RELIABLE}; use crate::prudp::packet::types::{CONNECT, DATA, DISCONNECT, PING, SYN}; -use crate::prudp::packet::PacketOption::{ConnectionSignature, FragmentId, InitialSequenceId, MaximumSubstreamId, SupportedFunctions}; +use crate::prudp::packet::PacketOption::{ + ConnectionSignature, FragmentId, InitialSequenceId, MaximumSubstreamId, SupportedFunctions, +}; use crate::prudp::packet::{PRUDPHeader, PRUDPPacket, PacketOption, TypesFlags, VirtualPort}; use crate::prudp::router::{Error, Router}; use crate::prudp::sockaddr::PRUDPSockAddr; -use crate::web::DirectionalData::Outgoing; -use crate::web::WEB_DATA; use async_trait::async_trait; +use chrono::NaiveTime; use hmac::digest::consts::U5; use log::info; use log::{error, trace, warn}; @@ -23,17 +25,18 @@ use std::net::SocketAddrV4; use std::ops::Deref; use std::pin::Pin; use std::sync::{Arc, Weak}; + +use std::time::Duration; use tokio::net::UdpSocket; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::{Mutex, RwLock}; +use tokio::time::{sleep, Instant}; use tokio_stream::Stream; -use crate::nex::account::Account; // due to the way this is designed crashing the router thread causes deadlock, sorry ;-; // (maybe i will fix that some day) /// PRUDP Socket for accepting connections to then send and recieve data from those clients - pub struct EncryptionPair { pub send: T, pub recv: T, @@ -48,11 +51,6 @@ impl EncryptionPair { } } -pub struct NewEncryptionPair { - pub send: E, - pub recv: E, -} - pub struct CommonConnection { pub user_id: u32, pub socket_addr: PRUDPSockAddr, @@ -62,29 +60,48 @@ pub struct CommonConnection { struct InternalConnection { common: Arc, + connections: Weak>>>>>, reliable_server_counter: u16, reliable_client_counter: u16, // maybe add connection id(need to see if its even needed) crypto_handler_instance: E, data_sender: Sender>, - socket: Arc + socket: Arc, + packet_queue: HashMap, + last_packet_time: Instant, } -impl Deref for InternalConnection{ +impl Deref for InternalConnection { type Target = CommonConnection; fn deref(&self) -> &Self::Target { &self.common } } -impl InternalConnection{ - fn next_server_count(&mut self) -> u16{ +impl InternalConnection { + fn next_server_count(&mut self) -> u16 { let prev_val = self.reliable_server_counter; let (val, _) = self.reliable_server_counter.overflowing_add(1); self.reliable_server_counter = val; - println!("{}", prev_val); + prev_val } + + #[inline] + async fn send_raw_packet(&self, mut prudp_packet: PRUDPPacket) { + prudp_packet.set_sizes(); + + let mut vec = Vec::new(); + + prudp_packet + .write_to(&mut vec) + .expect("somehow failed to convert backet to bytes"); + + self.socket + .send_to(&vec, self.socket_addr.regular_socket_addr) + .await + .expect("failed to send data back"); + } } pub struct ExternalConnection { @@ -93,9 +110,9 @@ pub struct ExternalConnection { } #[derive(Clone)] -pub struct SendingConnection{ +pub struct SendingConnection { common: Arc, - inernal: Weak> + internal: Weak>, } pub struct CommonSocket { @@ -121,8 +138,8 @@ pub struct ExternalSocket { internal: Weak, } -impl ExternalSocket{ - pub async fn connect(&mut self, addr: PRUDPSockAddr) -> Option{ +impl ExternalSocket { + pub async fn connect(&mut self, addr: PRUDPSockAddr) -> Option { let socket = self.internal.upgrade()?; socket.connect(addr).await; @@ -130,7 +147,7 @@ impl ExternalSocket{ self.connection_receiver.recv().await } - pub async fn accept(&mut self) -> Option{ + pub async fn accept(&mut self) -> Option { self.connection_receiver.recv().await } } @@ -153,7 +170,7 @@ impl Deref for InternalSocket { pub(super) trait AnyInternalSocket: Send + Sync + Deref + 'static { - async fn recieve_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket); + async fn receive_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket); async fn connect(&self, address: PRUDPSockAddr) -> Option<()>; } @@ -162,13 +179,15 @@ pub(super) trait AnyInternalConnection: Send + Sync + Deref + 'static { async fn send_data_packet(&mut self, data: Vec); + + async fn close_connection(&mut self); } #[async_trait] -impl AnyInternalConnection for InternalConnection{ +impl AnyInternalConnection for InternalConnection { async fn send_data_packet(&mut self, data: Vec) { - let mut packet = PRUDPPacket{ - header: PRUDPHeader{ + let mut packet = PRUDPPacket { + header: PRUDPHeader { sequence_id: self.next_server_count(), substream_id: 0, session_id: self.session_id, @@ -182,28 +201,74 @@ impl AnyInternalConnection for InternalConne ..Default::default() }; - self.crypto_handler_instance.encrypt_outgoing(0, &mut packet.payload[..]); + self.crypto_handler_instance + .encrypt_outgoing(0, &mut packet.payload[..]); packet.set_sizes(); self.crypto_handler_instance.sign_packet(&mut packet); - let mut vec = Vec::new(); + self.send_raw_packet(packet).await; + } - packet - .write_to(&mut vec) - .expect("somehow failed to convert backet to bytes"); + async fn close_connection(&mut self) { + // jon confirmed that this should be a safe way to dc a client - println!("{}", hex::encode(&vec)); + let mut packet = PRUDPPacket { + header: PRUDPHeader { + sequence_id: self.next_server_count(), + substream_id: 0, + session_id: self.session_id, + types_and_flags: TypesFlags::default().types(DISCONNECT), + destination_port: self.common.socket_addr.virtual_port, + source_port: self.server_port, + ..Default::default() + }, + payload: Vec::new(), + options: vec![FragmentId(0)], + ..Default::default() + }; - self.socket - .send_to(&vec, self.socket_addr.regular_socket_addr) - .await - .expect("failed to send data back"); + // no need for encryption the, the payload is empty + + packet.set_sizes(); + + self.crypto_handler_instance.sign_packet(&mut packet); + + self.send_raw_packet(packet).await; + + let Some(conns) = self.connections.upgrade() else { + // this is fine as it implies the server has already quit, thus meaning that we dont + // have to remove ourselves from the server + return; + }; + + let mut conns = conns.lock().await; + + conns.remove(&self.socket_addr); + + // the connection will now drop as soon as we leave this due to no longer having a permanent + // reference } } impl InternalSocket { + async fn get_connection( + &self, + addr: PRUDPSockAddr, + ) -> Option>>> { + let connections = self.internal_connections.lock().await; + let Some(conn) = connections.get(&addr) else { + error!("tried to send data on inactive connection!"); + return None; + }; + + let conn = conn.clone(); + drop(connections); + + Some(conn) + } + async fn send_packet_unbuffered(&self, dest: PRUDPSockAddr, mut packet: PRUDPPacket) { packet.set_sizes(); @@ -235,12 +300,12 @@ impl InternalSocket { // todo: refactor this to be more readable(low priority cause it doesnt change anything api wise) for options in &packet.options { match options { - SupportedFunctions(functions) => response - .options - .push(SupportedFunctions(*functions & 0x04)), - MaximumSubstreamId(max_substream) => response - .options - .push(MaximumSubstreamId(*max_substream)), + SupportedFunctions(functions) => { + response.options.push(SupportedFunctions(*functions & 0xFF)) + } + MaximumSubstreamId(max_substream) => { + response.options.push(MaximumSubstreamId(*max_substream)) + } _ => { /* ??? */ } } } @@ -251,41 +316,41 @@ impl InternalSocket { //println!("got syn: {:?}", response); - self.send_packet_unbuffered(address, response) - .await; + self.send_packet_unbuffered(address, response).await; } async fn connection_thread( - socket: Arc, - self_port: VirtualPort, - connection: Arc>>, - mut data_recv: Receiver> + connection: Weak>>, ) { //todo: handle stuff like resending packets if they arent acknowledged in here - while let Some(data) = data_recv.recv().await{ - let mut locked_conn = connection.lock().await; - let packet = PRUDPPacket{ - header: PRUDPHeader{ - sequence_id: locked_conn.next_server_count(), - substream_id: 0, - session_id: locked_conn.session_id, - types_and_flags: TypesFlags::default().types(DATA).flags(RELIABLE | NEED_ACK), - destination_port: locked_conn.common.socket_addr.virtual_port, - source_port: self_port, + + while let Some(conn) = connection.upgrade() { + let mut conn = conn.lock().await; + + if conn.last_packet_time < (Instant::now() - Duration::from_secs(5)) { + conn.send_raw_packet(PRUDPPacket { + header: PRUDPHeader { + sequence_id: 0, + substream_id: 0, + session_id: 0, + types_and_flags: TypesFlags::default().types(PING).flags(NEED_ACK), + destination_port: conn.common.socket_addr.virtual_port, + source_port: conn.server_port, + ..Default::default() + }, + payload: Vec::new(), + options: vec![], ..Default::default() - }, - payload: data, - options: vec![FragmentId(0)], - ..Default::default() - }; - - //packet. - - - - + }) + .await; + } + if conn.last_packet_time < (Instant::now() - Duration::from_secs(30)) { + conn.close_connection().await; + } + drop(conn); + sleep(Duration::from_secs(5)).await; } } @@ -294,12 +359,13 @@ impl InternalSocket { crypto_handler_instance: T::CryptoConnectionInstance, socket_addr: PRUDPSockAddr, session_id: u8, + is_instantiator: bool, ) { let common = Arc::new(CommonConnection { user_id: crypto_handler_instance.get_user_id(), socket_addr, session_id, - server_port: self.virtual_port + server_port: self.virtual_port, }); let (data_sender_from_client, data_receiver_from_client) = channel(16); @@ -307,10 +373,13 @@ impl InternalSocket { let internal = InternalConnection { common: common.clone(), crypto_handler_instance, - reliable_client_counter: 2, - reliable_server_counter: 1, + connections: Arc::downgrade(&self.internal_connections), + reliable_client_counter: if is_instantiator { 1 } else { 2 }, + reliable_server_counter: if is_instantiator { 2 } else { 1 }, data_sender: data_sender_from_client, - socket: self.socket.clone() + socket: self.socket.clone(), + packet_queue: Default::default(), + last_packet_time: Instant::now(), }; let internal = Arc::new(Mutex::new(internal)); @@ -318,24 +387,21 @@ impl InternalSocket { let dyn_internal: Arc> = internal.clone(); let external = ExternalConnection { - sending: SendingConnection{ + sending: SendingConnection { common, - inernal: Arc::downgrade(&dyn_internal) + internal: Arc::downgrade(&dyn_internal), }, data_receiver: data_receiver_from_client, - }; - - - - let mut connections = self.internal_connections.lock().await; connections.insert(socket_addr, internal.clone()); drop(connections); + tokio::spawn(Self::connection_thread(Arc::downgrade(&internal))); + self.connection_sender .send(external) .await @@ -385,7 +451,6 @@ impl InternalSocket { response.payload = return_data; - //let remote_signature = address.calculate_connection_signature(); response @@ -394,24 +459,22 @@ impl InternalSocket { for option in &packet.options { match option { - MaximumSubstreamId(max_substream) => response - .options - .push(MaximumSubstreamId(*max_substream)), - SupportedFunctions(funcs) => { - response.options.push(SupportedFunctions(*funcs)) + MaximumSubstreamId(max_substream) => { + response.options.push(MaximumSubstreamId(*max_substream)) } + SupportedFunctions(funcs) => response.options.push(SupportedFunctions(*funcs & 0xFF)), _ => { /* ? */ } } } - response.set_sizes(); crypto.sign_connect(&mut response); //println!("connect out: {:?}", response); - self.create_connection(crypto, address, session_id).await; + self.create_connection(crypto, address, session_id, false) + .await; self.send_packet_unbuffered(address, response).await; } @@ -419,44 +482,53 @@ impl InternalSocket { async fn handle_data(&self, address: PRUDPSockAddr, mut packet: PRUDPPacket) { info!("got data"); - if packet.header.types_and_flags.get_flags() & (NEED_ACK | RELIABLE) != (NEED_ACK | RELIABLE){ + if packet.header.types_and_flags.get_flags() & (NEED_ACK | RELIABLE) + != (NEED_ACK | RELIABLE) + { error!("invalid or unimplemented packet flags"); } let connections = self.internal_connections.lock().await; - let Some(conn) = connections.get(&address) else{ + let Some(conn) = connections.get(&address) else { error!("tried to send data on inactive connection!"); - return + return; }; + let conn = conn.clone(); drop(connections); + println!("recieved packed id: {}", packet.header.sequence_id); + let mut conn = conn.lock().await; - conn.crypto_handler_instance.decrypt_incoming(packet.header.substream_id, &mut packet.payload[..]); + conn.packet_queue.insert(packet.header.sequence_id, packet); - let mut data = Vec::new(); + let mut counter = conn.reliable_client_counter; - mem::swap(&mut data, &mut packet.payload); + while let Some(mut packet) = conn.packet_queue.remove(&counter) { + conn.crypto_handler_instance + .decrypt_incoming(packet.header.substream_id, &mut packet.payload[..]); - let mut response = packet.base_acknowledgement_packet(); - response.header.types_and_flags.set_flag(HAS_SIZE | ACK); - response.header.session_id = conn.session_id; + let mut response = packet.base_acknowledgement_packet(); + response.header.types_and_flags.set_flag(HAS_SIZE | ACK); + response.header.session_id = conn.session_id; - conn.crypto_handler_instance.sign_packet(&mut response); + conn.crypto_handler_instance.sign_packet(&mut response); - self.send_packet_unbuffered(address, response).await; - - conn.data_sender.send(data).await.ok(); + self.send_packet_unbuffered(address, response).await; + conn.data_sender.send(packet.payload).await.ok(); + conn.reliable_client_counter = conn.reliable_client_counter.overflowing_add(1).0; + counter = conn.reliable_client_counter; + } } - async fn handle_ping(&self, address: PRUDPSockAddr, packet: PRUDPPacket){ + async fn handle_ping(&self, address: PRUDPSockAddr, packet: PRUDPPacket) { let connections = self.internal_connections.lock().await; - let Some(conn) = connections.get(&address) else{ + let Some(conn) = connections.get(&address) else { error!("tried to send data on inactive connection!"); - return + return; }; let conn = conn.clone(); drop(connections); @@ -472,11 +544,11 @@ impl InternalSocket { self.send_packet_unbuffered(address, response).await; } - async fn handle_disconnect(&self, address: PRUDPSockAddr, packet: PRUDPPacket){ + async fn handle_disconnect(&self, address: PRUDPSockAddr, packet: PRUDPPacket) { let connections = self.internal_connections.lock().await; - let Some(conn) = connections.get(&address) else{ + let Some(conn) = connections.get(&address) else { error!("tried to send data on inactive connection!"); - return + return; }; let conn = conn.clone(); drop(connections); @@ -492,37 +564,53 @@ impl InternalSocket { self.send_packet_unbuffered(address, response.clone()).await; self.send_packet_unbuffered(address, response.clone()).await; self.send_packet_unbuffered(address, response).await; + + //self.internal_connections.lock().await; } } #[async_trait] impl AnyInternalSocket for InternalSocket { - async fn recieve_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket) { - // todo: handle acks + async fn receive_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket) { + // todo: handle acks and resending + + if let Some(conn) = self.get_connection(address).await { + let mut conn = conn.lock().await; + + // reset timeout + conn.last_packet_time = Instant::now(); + } + if (packet.header.types_and_flags.get_flags() & ACK) != 0 { info!("got ack"); - if packet.header.types_and_flags.get_types() == SYN || - packet.header.types_and_flags.get_types() == CONNECT{ - if packet.header.types_and_flags.get_types() == SYN{ + + if packet.header.types_and_flags.get_types() == SYN + || packet.header.types_and_flags.get_types() == CONNECT + { + if packet.header.types_and_flags.get_types() == SYN { println!("Syn: {:?}", packet); } - if packet.header.types_and_flags.get_types() == CONNECT{ + if packet.header.types_and_flags.get_types() == CONNECT { println!("Connect: {:?}", packet); } let sender = self.connection_establishment_data_sender.lock().await; info!("redirecting ack to active connection establishment code"); - if let Some(conn) = sender.as_ref(){ + if let Some(conn) = sender.as_ref() { if let Err(e) = conn.send(packet).await { - error!("error whilest sending data to connection establishment: {}", e); + error!( + "error whilest sending data to connection establishment: {}", + e + ); } } else { error!("got connection response without the active reciever being present"); } } + return; } @@ -555,8 +643,8 @@ impl AnyInternalSocket for InternalSocket { let remote_signature = address.calculate_connection_signature(); - let packet = PRUDPPacket{ - header: PRUDPHeader{ + let packet = PRUDPPacket { + header: PRUDPHeader { source_port: self.virtual_port, destination_port: address.virtual_port, types_and_flags: TypesFlags::default().types(SYN).flags(NEED_ACK), @@ -565,16 +653,14 @@ impl AnyInternalSocket for InternalSocket { options: vec![ SupportedFunctions(0x104), MaximumSubstreamId(0), - ConnectionSignature(remote_signature) + ConnectionSignature(remote_signature), ], ..Default::default() }; - - self.send_packet_unbuffered(address, packet).await; - let Some(syn_ack_packet) = recv.recv().await else{ + let Some(syn_ack_packet) = recv.recv().await else { error!("what"); return None; }; @@ -588,10 +674,8 @@ impl AnyInternalSocket for InternalSocket { return None; }; - - - let packet = PRUDPPacket{ - header: PRUDPHeader{ + let packet = PRUDPPacket { + header: PRUDPHeader { source_port: self.virtual_port, destination_port: address.virtual_port, types_and_flags: TypesFlags::default().types(CONNECT).flags(NEED_ACK), @@ -600,22 +684,24 @@ impl AnyInternalSocket for InternalSocket { options: vec![ SupportedFunctions(0x04), MaximumSubstreamId(0), - ConnectionSignature(remote_signature) + ConnectionSignature(remote_signature), ], ..Default::default() }; self.send_packet_unbuffered(address, packet).await; - let Some(connect_ack_packet) = recv.recv().await else{ + let Some(connect_ack_packet) = recv.recv().await else { error!("what"); return None; }; - let (_, crypt) = self.crypto_handler.instantiate(remote_signature, *own_signature, &[], 1)?; + let (_, crypt) = + self.crypto_handler + .instantiate(remote_signature, *own_signature, &[], 1)?; //todo: make this work for secure servers as well - self.create_connection(crypt, address, 0).await; + self.create_connection(crypt, address, 0, true).await; Some(()) } @@ -679,39 +765,54 @@ pub trait CryptoHandler: Send + Sync + 'static { fn sign_pre_handshake(&self, packet: &mut PRUDPPacket); } -impl Deref for ExternalConnection{ +impl Deref for ExternalConnection { type Target = SendingConnection; fn deref(&self) -> &Self::Target { &self.sending } } -impl Deref for SendingConnection{ +impl Deref for SendingConnection { type Target = CommonConnection; fn deref(&self) -> &Self::Target { &self.common } } -impl ExternalConnection{ - pub async fn recv(&mut self) -> Option>{ +impl ExternalConnection { + pub async fn recv(&mut self) -> Option> { self.data_receiver.recv().await } //todo: make this an actual result instead of an option - pub fn duplicate_sender(&self) -> SendingConnection{ + pub fn duplicate_sender(&self) -> SendingConnection { self.sending.clone() } } -impl SendingConnection{ +impl SendingConnection { pub async fn send(&self, data: Vec) -> Option<()> { - println!("{}", hex::encode(&data)); - let internal = self.inernal.upgrade()?; + let internal = self.internal.upgrade()?; let mut internal = internal.lock().await; internal.send_data_packet(data).await; Some(()) } -} \ No newline at end of file + + pub async fn close_connection(&self) { + let Some(internal) = self.internal.upgrade() else { + return; + }; + + let mut internal = internal.lock().await; + + internal.close_connection().await; + } +} + +impl Drop for InternalConnection { + fn drop(&mut self) { + println!("yatta"); + } +} diff --git a/src/prudp/station_url.rs b/src/prudp/station_url.rs index f981fe8..0e20d6c 100644 --- a/src/prudp/station_url.rs +++ b/src/prudp/station_url.rs @@ -1,9 +1,13 @@ use std::net::Ipv4Addr; use log::error; -use std::fmt::{Display, Formatter, Write}; +use std::fmt::{Debug, Display, Formatter, Write}; +use std::io::Read; +use rocket::delete; use crate::prudp::station_url::Type::{PRUDP, PRUDPS, UDP}; -use crate::prudp::station_url::UrlOptions::{Address, ConnectionID, NatFiltering, NatMapping, NatType, Platform, PMP, Port, PrincipalID, RVConnectionID, StreamID, StreamType, UPNP}; - +use crate::prudp::station_url::UrlOptions::{Address, ConnectionID, NatFiltering, NatMapping, NatType, Platform, PMP, Port, PrincipalID, RVConnectionID, StreamID, StreamType, UPNP, PID}; +use crate::rmc::structures::Error::StationUrlInvalid; +use crate::rmc::structures::RmcSerialize; +#[derive(Clone, Copy, PartialEq, Eq)] pub enum Type{ UDP, PRUDP, @@ -15,6 +19,7 @@ pub mod nat_types{ pub const PUBLIC: u8 = 2; } +#[derive(Clone, Eq, PartialEq)] pub enum UrlOptions { Address(Ipv4Addr), Port(u16), @@ -29,16 +34,18 @@ pub enum UrlOptions { RVConnectionID(u32), Platform(u8), PMP(u8), + PID(u32), } +#[derive(Clone, PartialEq, Eq)] pub struct StationUrl{ pub url_type: Type, pub options: Vec } impl StationUrl{ - fn read_options(options: &str) -> Option>{ + pub fn read_options(options: &str) -> Option>{ let mut options_out = Vec::new(); for option in options.split(';'){ @@ -75,12 +82,21 @@ impl StationUrl{ "RVCID" => { options_out.push(RVConnectionID(option_value.parse().ok()?)) } + "rvcid" => { + options_out.push(RVConnectionID(option_value.parse().ok()?)) + } "pl" => { options_out.push(Platform(option_value.parse().ok()?)) } "pmp" => { options_out.push(PMP(option_value.parse().ok()?)) - } + }, + "pid" => { + options_out.push(PID(option_value.parse().ok()?)) + }, + "PID" => { + options_out.push(PID(option_value.parse().ok()?)) + }, _ => { error!("unimplemented option type, skipping: {}", option_name); } @@ -143,10 +159,12 @@ impl<'a> Into for &'a StationUrl{ RVConnectionID(v) => write!(url, "RVCID={}", v).expect("failed to write"), Platform(v) => write!(url, "pl={}", v).expect("failed to write"), PMP(v) => write!(url, "pmp={}", v).expect("failed to write"), + PID(v) => write!(url, "PID={}", v).expect("failed to write"), } + write!(url, ";").expect("failed to write"); } - url + url[0..url.len()-1].into() } } @@ -158,4 +176,24 @@ impl Display for StationUrl{ } +} + +impl RmcSerialize for StationUrl{ + fn deserialize(reader: &mut dyn Read) -> crate::rmc::structures::Result { + let str = String::deserialize(reader)?; + + Self::try_from(str.as_str()).map_err(|_| StationUrlInvalid) + } + fn serialize(&self, writer: &mut dyn std::io::Write) -> crate::rmc::structures::Result<()> { + let str: String = self.into(); + + str.serialize(writer) + } +} + +impl Debug for StationUrl{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let str: String = self.into(); + f.write_str(&str) + } } \ No newline at end of file diff --git a/src/rmc/protocols/matchmake.rs b/src/rmc/protocols/matchmake.rs new file mode 100644 index 0000000..1c2d5e5 --- /dev/null +++ b/src/rmc/protocols/matchmake.rs @@ -0,0 +1,17 @@ +use macros::{method_id, rmc_proto}; +use crate::prudp::station_url::StationUrl; +use crate::rmc::response::ErrorCode; + +#[rmc_proto(21)] +pub trait Matchmake{ + #[method_id(2)] + async fn unregister_gathering(&self, gid: u32) -> Result; + #[method_id(41)] + async fn get_session_urls(&self, gid: u32) -> Result, ErrorCode>; + + #[method_id(42)] + async fn update_session_host(&self, gid: u32, change_owner: bool) -> Result<(), ErrorCode>; + + #[method_id(44)] + async fn migrate_gathering_ownership(&self, gid: u32, candidates: Vec, participants_only: bool) -> Result<(), ErrorCode>; +} \ No newline at end of file diff --git a/src/rmc/protocols/matchmake_ext.rs b/src/rmc/protocols/matchmake_ext.rs new file mode 100644 index 0000000..d492503 --- /dev/null +++ b/src/rmc/protocols/matchmake_ext.rs @@ -0,0 +1,9 @@ +use macros::{method_id, rmc_proto}; +use crate::prudp::station_url::StationUrl; +use crate::rmc::response::ErrorCode; + +#[rmc_proto(50)] +pub trait MatchmakeExt{ + #[method_id(1)] + async fn end_participation(&self, gid: u32, message: String) -> Result; +} \ No newline at end of file diff --git a/src/rmc/protocols/matchmake_extension.rs b/src/rmc/protocols/matchmake_extension.rs new file mode 100644 index 0000000..cd3f281 --- /dev/null +++ b/src/rmc/protocols/matchmake_extension.rs @@ -0,0 +1,32 @@ +use macros::{method_id, rmc_proto}; +use crate::rmc::response::ErrorCode; +use crate::rmc::structures::matchmake::{AutoMatchmakeParam, CreateMatchmakeSessionParam, JoinMatchmakeSessionParam, MatchmakeSession}; + +#[rmc_proto(109)] +pub trait MatchmakeExtension{ + #[method_id(1)] + async fn close_participation(&self, gid: u32) -> Result<(), ErrorCode>; + + #[method_id(2)] + async fn open_participation(&self, gid: u32) -> Result<(), ErrorCode>; + + #[method_id(8)] + async fn modify_current_game_attribute(&self, gid: u32, attrib_index: u32, attrib_val: u32) -> Result<(), ErrorCode>; + + #[method_id(16)] + async fn get_playing_session(&self, pids: Vec) -> Result, ErrorCode>; + + #[method_id(34)] + async fn update_progress_score(&self, gid: u32, progress: u8) -> Result<(), ErrorCode>; + #[method_id(38)] + async fn create_matchmake_session_with_param(&self, session: CreateMatchmakeSessionParam) -> Result; + + #[method_id(39)] + async fn join_matchmake_session_with_param(&self, session: JoinMatchmakeSessionParam) -> Result; + + #[method_id(40)] + async fn auto_matchmake_with_param_postpone(&self, session: AutoMatchmakeParam) -> Result; + + #[method_id(41)] + async fn find_matchmake_session_by_gathering_id_detail(&self, gid: u32) -> Result; +} \ No newline at end of file diff --git a/src/rmc/protocols/mod.rs b/src/rmc/protocols/mod.rs index fd09e63..53e6c40 100644 --- a/src/rmc/protocols/mod.rs +++ b/src/rmc/protocols/mod.rs @@ -2,6 +2,12 @@ pub mod auth; pub mod secure; +pub mod notifications; +pub mod matchmake; +pub mod matchmake_extension; +pub mod nat_traversal; +pub mod matchmake_ext; +pub mod ranking; use crate::prudp::socket::{ExternalConnection, SendingConnection}; use crate::rmc::message::RMCMessage; @@ -261,18 +267,20 @@ async fn handle_incoming( rest_of_data } = message; - info!("got rmc request, handeling it now..."); + info!("RMC REQUEST: Proto: {}; Method: {};", protocol_id, method_id); remote.rmc_call(&sending_conn, protocol_id, method_id, call_id, rest_of_data).await; } } + + info!("rmc disconnected") } pub fn new_rmc_gateway_connection(conn: ExternalConnection, create_internal: F) -> Arc where - F: FnOnce(RmcConnection) -> T, + F: FnOnce(RmcConnection) -> Arc, { let notify = Arc::new(Notify::new()); let incoming: Arc>> = Default::default(); @@ -285,8 +293,6 @@ where let exposed_object = (create_internal)(rmc_conn); - let exposed_object = Arc::new(exposed_object); - { let exposed_object = exposed_object.clone(); tokio::spawn(async move { diff --git a/src/rmc/protocols/nat_traversal.rs b/src/rmc/protocols/nat_traversal.rs new file mode 100644 index 0000000..328a651 --- /dev/null +++ b/src/rmc/protocols/nat_traversal.rs @@ -0,0 +1,24 @@ +use macros::{method_id, rmc_proto}; +use crate::rmc::response::ErrorCode; +use crate::rmc::structures::matchmake::{CreateMatchmakeSessionParam, MatchmakeSession}; + +#[rmc_proto(3)] +pub trait NatTraversal{ + #[method_id(2)] + async fn request_probe_initiation(&self, station_to_probe: String) -> Result<(),ErrorCode>; + + #[method_id(3)] + async fn request_probe_initialization_ext(&self, target_list: Vec, station_to_probe: String) -> Result<(),ErrorCode>; + + #[method_id(4)] + async fn report_nat_traversal_result(&self, cid: u32, result: bool, rtt: u32) -> Result<(),ErrorCode>; + + #[method_id(5)] + async fn report_nat_properties(&self, nat_mapping: u32, nat_filtering: u32, rtt: u32) -> Result<(),ErrorCode>; +} + +#[rmc_proto(3, NoReturn)] +pub trait NatTraversalConsole{ + #[method_id(2)] + async fn request_probe_initiation(&self, station_to_probe: String); +} \ No newline at end of file diff --git a/src/rmc/protocols/notifications.rs b/src/rmc/protocols/notifications.rs new file mode 100644 index 0000000..416238a --- /dev/null +++ b/src/rmc/protocols/notifications.rs @@ -0,0 +1,26 @@ +use macros::{method_id, rmc_proto, rmc_struct, RmcSerialize}; +use crate::rmc::response::ErrorCode; +use crate::rmc::structures::qresult::QResult; + +pub mod notification_types{ + pub const OWNERSHIP_CHANGED: u32 = 4000; + pub const HOST_CHANGED: u32 = 110000; +} + +#[derive(RmcSerialize, Debug, Default, Clone)] +#[rmc_struct(0)] +pub struct NotificationEvent{ + pub pid_source: u32, + pub notif_type: u32, + pub param_1: u32, + pub param_2: u32, + pub str_param: String, + pub param_3: u32, +} + +#[rmc_proto(14, NoReturn)] +pub trait Notification { + #[method_id(1)] + async fn process_notification_event(&self, event: NotificationEvent); +} + diff --git a/src/rmc/protocols/ranking.rs b/src/rmc/protocols/ranking.rs new file mode 100644 index 0000000..974af41 --- /dev/null +++ b/src/rmc/protocols/ranking.rs @@ -0,0 +1,5 @@ +use macros::{method_id, rmc_proto}; + +#[rmc_proto(112)] +pub trait Ranking{ +} \ No newline at end of file diff --git a/src/rmc/protocols/secure.rs b/src/rmc/protocols/secure.rs index fa11c35..39187bf 100644 --- a/src/rmc/protocols/secure.rs +++ b/src/rmc/protocols/secure.rs @@ -6,7 +6,9 @@ use crate::rmc::structures::connection_data::ConnectionData; use crate::rmc::structures::qresult::QResult; #[rmc_proto(11)] -pub trait Auth { +pub trait Secure { #[method_id(1)] - async fn register(&self, station_urls: Vec) -> Result<(QResult, u32, String), ErrorCode>; + async fn register(&self, station_urls: Vec) -> Result<(QResult, u32, StationUrl), ErrorCode>; + #[method_id(7)] + async fn replace_url(&self, target: StationUrl, dest: StationUrl) -> Result<(), ErrorCode>; } diff --git a/src/rmc/response.rs b/src/rmc/response.rs index f47ee66..f63c345 100644 --- a/src/rmc/response.rs +++ b/src/rmc/response.rs @@ -13,9 +13,6 @@ use crate::prudp::socket::{ExternalConnection, SendingConnection}; use crate::rmc::response::ErrorCode::Core_Exception; use crate::rmc::structures::qresult::ERROR_MASK; use crate::rmc::structures::RmcSerialize; -use crate::web::DirectionalData::{Incoming, Outgoing}; -use crate::web::WEB_DATA; - pub enum RMCResponseResult { Success { call_id: u32, @@ -154,8 +151,7 @@ pub async fn send_result( method_id: u32, call_id: u32, ) { - - println!("{}", hex::encode(result.clone().unwrap())); + let response_result = match result { Ok(v) => RMCResponseResult::Success { call_id, diff --git a/src/rmc/structures/matchmake.rs b/src/rmc/structures/matchmake.rs index f11b432..28e851b 100644 --- a/src/rmc/structures/matchmake.rs +++ b/src/rmc/structures/matchmake.rs @@ -1,6 +1,6 @@ -use macros::RmcSerialize; use crate::kerberos::KerberosDateTime; use crate::rmc::structures::variant::Variant; +use macros::RmcSerialize; // rmc structure #[derive(RmcSerialize, Debug, Clone, Default)] @@ -25,7 +25,6 @@ pub struct MatchmakeParam { pub params: Vec<(String, Variant)>, } - // rmc structure #[derive(RmcSerialize, Debug, Clone, Default)] #[rmc_struct(3)] @@ -92,5 +91,36 @@ pub struct CreateMatchmakeSessionParam { pub create_matchmake_session_option: u32, pub join_message: String, pub participation_count: u16, - +} + +#[derive(RmcSerialize, Debug, Clone)] +#[rmc_struct(0)] +pub struct MatchmakeBlockListParam { + option_flag: u32, +} + +#[derive(RmcSerialize, Debug, Clone)] +#[rmc_struct(0)] +pub struct JoinMatchmakeSessionParam { + pub gid: u32, + pub additional_participants: Vec, + pub gid_for_participation_check: u32, + pub join_matchmake_session_open: u32, + pub join_matchmake_session_behavior: u8, + pub user_password: String, + pub system_password: String, + pub join_message: String, + pub participation_count: u16, + //pub extra_participant: u16, + //pub block_list_param: MatchmakeBlockListParam +} + +pub mod gathering_flags { + pub const PERSISTENT_GATHERING: u32 = 0x1; + pub const DISCONNECT_CHANGE_OWNER: u32 = 0x10; + pub const PERSISTENT_GATHERING_LEAVE_PARTICIPATION: u32 = 0x40; + pub const PERSISTENT_GATHERING_ALLOW_ZERO_USERS: u32 = 0x80; + pub const PARTICIPANTS_CHANGE_OWNER: u32 = 0x200; + pub const VERBOSE_PARTICIPANTS: u32 = 0x400; + pub const VERBOSE_PARTICIPANTS_EX: u32 = 0x800; } diff --git a/src/rmc/structures/mod.rs b/src/rmc/structures/mod.rs index a9f9ab6..d1ed8b3 100644 --- a/src/rmc/structures/mod.rs +++ b/src/rmc/structures/mod.rs @@ -15,6 +15,8 @@ pub enum Error{ UnexpectedValue(u64), #[error("version mismatch: {0}")] VersionMismatch(u8), + #[error("an error occurred reading the station url")] + StationUrlInvalid } pub type Result = std::result::Result; diff --git a/src/rmc/structures/primitives.rs b/src/rmc/structures/primitives.rs index e8855f0..7c345d5 100644 --- a/src/rmc/structures/primitives.rs +++ b/src/rmc/structures/primitives.rs @@ -13,6 +13,16 @@ impl RmcSerialize for u8{ } } +impl RmcSerialize for i8{ + fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { + Ok(writer.write_all(bytes_of(self))?) + } + + fn deserialize(mut reader: &mut dyn Read) -> crate::rmc::structures::Result { + Ok(reader.read_struct(IS_BIG_ENDIAN)?) + } +} + impl RmcSerialize for u16{ fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { Ok(writer.write_all(bytes_of(self))?) @@ -23,6 +33,16 @@ impl RmcSerialize for u16{ } } +impl RmcSerialize for i16{ + fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { + Ok(writer.write_all(bytes_of(self))?) + } + + fn deserialize(mut reader: &mut dyn Read) -> crate::rmc::structures::Result { + Ok(reader.read_struct(IS_BIG_ENDIAN)?) + } +} + impl RmcSerialize for u32{ fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { Ok(writer.write_all(bytes_of(self))?) @@ -33,6 +53,16 @@ impl RmcSerialize for u32{ } } +impl RmcSerialize for i32{ + fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { + Ok(writer.write_all(bytes_of(self))?) + } + + fn deserialize(mut reader: &mut dyn Read) -> crate::rmc::structures::Result { + Ok(reader.read_struct(IS_BIG_ENDIAN)?) + } +} + impl RmcSerialize for u64{ fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { Ok(writer.write_all(bytes_of(self))?) diff --git a/src/web/mod.rs b/src/web/mod.rs index 3461257..ee43898 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -1,37 +1,97 @@ use std::net::SocketAddrV4; +use std::sync::Arc; +use async_trait::async_trait; use once_cell::sync::Lazy; -use rocket::{get, routes, Rocket}; +use rocket::{get, routes, Request, Rocket, State}; +use rocket::request::{FromRequest, Outcome}; use rocket::serde::json::Json; use tokio::task::JoinHandle; use serde::Serialize; use tokio::sync::Mutex; +use crate::nex::matchmake::MatchmakeManager; +use crate::rmc::protocols::HasRmcConnection; +use crate::rmc::protocols::notifications::NotificationEvent; -#[get("/")] -async fn server_data() -> Json { - Json(WEB_DATA.lock().await.clone()) +struct RnexApiAuth; + +#[async_trait] +impl<'r> FromRequest<'r> for RnexApiAuth{ + + type Error = (); + async fn from_request<'a>(request: &'r Request<'a>) -> Outcome { + Outcome::Success(RnexApiAuth) + } } -pub async fn start_web() -> JoinHandle<()>{ - tokio::spawn(async{ + +#[get("/gatherings")] +async fn gatherings(mmm: &State>) -> Json>{ + let matches = mmm.sessions.read().await; + + Json(matches.keys().map(|v| *v).collect()) +} + +#[get("/gathering//players")] +async fn players_in_match(mmm: &State>, gid: u32) -> Option>>{ + let mmm = mmm.sessions.read().await; + + let gathering = mmm.get(&gid)?; + + let gathering = gathering.clone(); + + drop(mmm); + + let gathering = gathering.lock().await; + + Some(Json(gathering.connected_players.iter().filter_map(|p| p.upgrade()).map(|p| p.pid).collect())) +} + +#[get("/player//disconnect")] +async fn disconnect_player(_auth: RnexApiAuth, mmm: &State>, pid: u32) -> Option<()>{ + // this doesnt work and is broken, there might be some other way to remotely close gatherings... + // also if anyone gets this working change it to POST cause the only reason its get is because + // that makes testing it easier + let mmm = mmm.users.read().await; + + for player in mmm.values().filter_map(|p| p.upgrade()).filter(|p| p.pid == pid) { + player.remote.get_connection().0.close_connection().await; + } + + + Some(()) +} + +#[get("/gathering//close")] +async fn close_gathering(_auth: RnexApiAuth, mmm: &State>, gid: u32) -> Option<()>{ + // this doesnt work and is broken, there might be some other way to remotely close gatherings... + // also if anyone gets this working change it to POST cause the only reason its get is because + // that makes testing it easier + let mmm = mmm.sessions.read().await; + + let gathering = mmm.get(&gid)?; + + let gathering = gathering.clone(); + + drop(mmm); + + let gathering = gathering.lock().await; + + gathering.broadcast_notification(&NotificationEvent{ + pid_source: gathering.session.gathering.owner_pid, + notif_type: 109000, + param_1: gathering.session.gathering.self_gid, + ..Default::default() + }).await; + + Some(()) +} + +pub async fn start_web(mgr: Arc) -> JoinHandle<()> { + tokio::spawn(async move { rocket::build() - .mount("/",routes![server_data]) + .mount("/", routes![gatherings, players_in_match, close_gathering, disconnect_player]) + .manage(mgr) .launch().await .expect("unable to start webserver"); }) -} -#[derive(Serialize, Clone)] -pub enum DirectionalData{ - Incoming(String), - Outgoing(String) -} - -#[derive(Serialize, Default, Clone)] -pub struct WebData{ - pub data: Vec<(SocketAddrV4, DirectionalData)> -} - -pub static WEB_DATA: Lazy> = Lazy::new(|| Mutex::new( - WebData{ - data: Vec::new(), - } -)); \ No newline at end of file +} \ No newline at end of file