From 384f5abca5edf11ba5de85df3ad7db897fb5af45 Mon Sep 17 00:00:00 2001 From: DJMrTV Date: Mon, 12 May 2025 10:28:54 +0200 Subject: [PATCH] feat: private battles --- Cargo.lock | 25 ++ Cargo.toml | 1 + macros/src/protos.rs | 69 ++++- src/main.rs | 63 ++-- src/nex-implementation/server.rs | 2 - src/nex/matchmake.rs | 122 ++++++++ src/nex/mod.rs | 4 +- src/nex/remote_console.rs | 23 ++ src/nex/user.rs | 378 +++++++++++++++++++++-- src/prudp/socket.rs | 51 +-- src/prudp/station_url.rs | 50 ++- src/rmc/protocols/matchmake.rs | 11 + src/rmc/protocols/matchmake_extension.rs | 20 ++ src/rmc/protocols/mod.rs | 10 +- src/rmc/protocols/nat_traversal.rs | 15 + src/rmc/protocols/notifications.rs | 21 ++ src/rmc/protocols/secure.rs | 6 +- src/rmc/response.rs | 3 +- src/rmc/structures/matchmake.rs | 23 +- src/rmc/structures/mod.rs | 2 + src/rmc/structures/primitives.rs | 30 ++ 21 files changed, 832 insertions(+), 97 deletions(-) create mode 100644 src/nex/matchmake.rs create mode 100644 src/nex/remote_console.rs create mode 100644 src/rmc/protocols/matchmake.rs create mode 100644 src/rmc/protocols/matchmake_extension.rs create mode 100644 src/rmc/protocols/nat_traversal.rs create mode 100644 src/rmc/protocols/notifications.rs diff --git a/Cargo.lock b/Cargo.lock index 7fabb1b..08dd9dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -521,6 +521,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -543,12 +544,34 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -570,6 +593,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1866,6 +1890,7 @@ dependencies = [ "bytemuck", "chrono", "dotenv", + "futures", "hex", "hmac", "log", diff --git a/Cargo.toml b/Cargo.toml index edbb65b..9e95990 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ serde = { version = "1.0.217", features = ["derive"] } async-trait = "0.1.86" paste = "1.0.15" typenum = "1.18.0" +futures = "0.3.31" [build-dependencies] tonic-build = "0.12.3" diff --git a/macros/src/protos.rs b/macros/src/protos.rs index 554df22..32d04c3 100644 --- a/macros/src/protos.rs +++ b/macros/src/protos.rs @@ -1,7 +1,7 @@ use proc_macro2::{Ident, Span, TokenStream, TokenTree}; use quote::{quote, ToTokens}; -use syn::{LitInt, ReturnType, Token, Type}; -use syn::token::{Brace, Paren, Semi}; +use syn::{LitInt, LitStr, ReturnType, Token, Type}; +use syn::token::{Brace, Bracket, Paren, Semi}; pub struct ProtoMethodData{ pub id: LitInt, @@ -61,9 +61,11 @@ impl RmcProtocolData{ quote!{ &self, data: ::std::vec::Vec }.to_tokens(tokens); }); - quote!{ - -> ::core::result::Result, ErrorCode> - }.to_tokens(tokens); + if self.has_returns { + quote! { + -> ::core::result::Result, ErrorCode> + }.to_tokens(tokens); + } Brace::default().surround(tokens, |tokens|{ quote! { let mut cursor = ::std::io::Cursor::new(data); }.to_tokens(tokens); @@ -73,10 +75,27 @@ impl RmcProtocolData{ let Ok(#param_name) = <#param_type as crate::rmc::structures::RmcSerialize>::deserialize( &mut cursor - ) else { - return Err(crate::rmc::response::ErrorCode::Core_InvalidArgument); - }; - }.to_tokens(tokens) + ) else + }.to_tokens(tokens); + + let error_msg = LitStr::new(&format!("an error occurred whilest deserializing {}", param_name), Span::call_site()); + + if self.has_returns { + quote! { + { + log::error!(#error_msg); + return Err(crate::rmc::response::ErrorCode::Core_InvalidArgument); + }; + }.to_tokens(tokens) + } else { + quote! { + { + log::error!(#error_msg); + return; + }; + }.to_tokens(tokens) + } + } quote!{ @@ -129,13 +148,28 @@ impl RmcProtocolData{ let raw_name = Ident::new(&format!("raw_{}", name), name.span()); + quote!{ #id => self.#raw_name(data).await, }.to_tokens(tokens); } quote!{ - _ => Err(crate::rmc::response::ErrorCode::Core_NotImplemented) + v => }.to_tokens(tokens); + + + + Brace::default().surround(tokens, |tokens|{ + quote!{ + log::error!("(protocol {})unimplemented method id called on protocol: {}", #id, v); + }.to_tokens(tokens); + if self.has_returns { + quote! { + Err(crate::rmc::response::ErrorCode::Core_NotImplemented) + }.to_tokens(tokens); + } + }); + }); Semi::default().to_tokens(tokens); @@ -155,7 +189,7 @@ impl RmcProtocolData{ }); quote!{ - impl RawAuth for T{} + impl #raw_name for T{} }.to_tokens(tokens); } @@ -218,8 +252,17 @@ impl RmcProtocolData{ &#param_name, &mut cursor ) - ).ok_or(crate::rmc::response::ErrorCode::Core_InvalidArgument)?; - }.to_tokens(tokens) + ).ok_or(crate::rmc::response::ErrorCode::Core_InvalidArgument) + }.to_tokens(tokens); + if self.has_returns { + quote! { + ?; + }.to_tokens(tokens) + } else { + quote! { + ; + }.to_tokens(tokens) + } } quote!{ diff --git a/src/main.rs b/src/main.rs index 08f3b87..e71b062 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,8 @@ use crate::nex::account::Account; use crate::nex::auth_handler::{AuthHandler, RemoteAuthClientProtocol}; +use crate::nex::remote_console::RemoteConsole; +use crate::nex::user::{RemoteUserProtocol, User}; use crate::prudp::packet::VirtualPort; use crate::prudp::router::Router; use crate::prudp::secure::Secure; @@ -18,10 +20,12 @@ use crate::rmc::protocols::auth::Auth; use crate::rmc::protocols::auth::RawAuth; use crate::rmc::protocols::auth::RawAuthInfo; use crate::rmc::protocols::auth::RemoteAuth; -use crate::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote}; +use crate::rmc::protocols::matchmake_extension::RemoteMatchmakeExtension; +use crate::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote, RemoteInstantiatable}; use crate::rmc::response::ErrorCode; use crate::rmc::structures::any::Any; use crate::rmc::structures::connection_data::ConnectionData; +use crate::rmc::structures::matchmake::{CreateMatchmakeSessionParam, Gathering, MatchmakeParam, MatchmakeSession}; use crate::rmc::structures::qresult::QResult; use chrono::{Local, SecondsFormat}; use log::{error, info}; @@ -35,10 +39,14 @@ use std::marker::PhantomData; use std::net::{Ipv4Addr, SocketAddrV4}; use std::ops::{BitAnd, BitOr}; use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; use std::{env, fs}; +use std::sync::atomic::AtomicU32; use tokio::task::JoinHandle; -use crate::nex::user::User; +use crate::kerberos::KerberosDateTime; +use crate::nex::matchmake::MatchmakeManager; +use crate::rmc::protocols::secure::RemoteSecure; mod endianness; mod prudp; @@ -284,10 +292,12 @@ async fn start_auth() -> JoinHandle<()> { info!("new connected user!"); - let _ = new_rmc_gateway_connection(conn, |_| AuthHandler { - destination_server_acct: &SECURE_SERVER_ACCOUNT, - build_name: "branch:origin/project/wup-agmj build:3_8_15_2004_0", - station_url: &SECURE_STATION_URL, + let _ = new_rmc_gateway_connection(conn, |_| { + Arc::new(AuthHandler { + destination_server_acct: &SECURE_SERVER_ACCOUNT, + build_name: "branch:origin/project/wup-agmj build:3_8_15_2004_0", + station_url: &SECURE_STATION_URL, + }) }); } }) @@ -295,6 +305,13 @@ async fn start_auth() -> JoinHandle<()> { async fn start_secure() -> JoinHandle<()> { tokio::spawn(async { + let mmm = Arc::new(MatchmakeManager{ + gid_counter: AtomicU32::new(1), + sessions: Default::default(), + users: Default::default(), + rv_cid_counter: AtomicU32::new(1), + }); + let (router_secure, _) = Router::new(SocketAddrV4::new(*OWN_IP_PRIVATE, *SECURE_SERVER_PORT)) .await @@ -304,6 +321,7 @@ async fn start_secure() -> JoinHandle<()> { .add_socket( VirtualPort::new(1, 10), Secure("6f599f81", &SECURE_SERVER_ACCOUNT), + //Unsecure("6f599f81"), ) .await .expect("unable to add socket"); @@ -318,19 +336,25 @@ async fn start_secure() -> JoinHandle<()> { info!("new connected user on secure :D!"); - let ip = conn.socket_addr.regular_socket_addr; + let ip = conn.socket_addr; let pid = conn.user_id; - let _ = new_rmc_gateway_connection(conn, |_| User { - ip, - pid + let _ = new_rmc_gateway_connection(conn, |r| { + Arc::new_cyclic(|w| User { + ip, + pid, + this: w.clone(), + remote: RemoteConsole::new(r), + station_url: Default::default(), + matchmake_manager: mmm.clone() + }) }); } }) } async fn start_test() { - let addr = SocketAddrV4::new(*OWN_IP_PRIVATE, *AUTH_SERVER_PORT); + let addr = SocketAddrV4::new(*OWN_IP_PRIVATE, *SECURE_SERVER_PORT); let virt_addr = VirtualPort::new(1, 10); let prudp_addr = PRUDPSockAddr::new(addr, virt_addr); @@ -346,15 +370,12 @@ async fn start_test() { let conn = socket_secure.connect(prudp_addr).await.unwrap(); - let remote = - new_rmc_gateway_connection(conn, |r| OnlyRemote::::new(r)); + let remote = new_rmc_gateway_connection(conn, |r| { + Arc::new(OnlyRemote::::new(r)) + }); - let v = remote - .login_ex("1469690705".to_string(), Any::default()) - .await - .unwrap(); - - println!("got it"); + tokio::time::sleep(Duration::from_secs(1)).await; + let urls = vec!["prudp:/address=192.168.178.45;port=60146;Pl=2;natf=0;natm=0;pmp=0;sid=15;upnp=0".to_owned()]; } async fn start_servers() { @@ -364,10 +385,12 @@ async fn start_servers() { let secure_server = start_secure().await; //let web_server = web::start_web().await; - //tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; //start_test().await; + + #[cfg(feature = "auth")] auth_server.await.expect("auth server crashed"); #[cfg(feature = "secure")] diff --git a/src/nex-implementation/server.rs b/src/nex-implementation/server.rs index dbf93de..b864e60 100644 --- a/src/nex-implementation/server.rs +++ b/src/nex-implementation/server.rs @@ -32,8 +32,6 @@ impl RMCProtocolServer{ error!("error reading rmc message"); return; }; - - println!("got rmc message {},{}", rmc.protocol_id, rmc.method_id); for proto in &self.0 { if let Some(response) = proto(&rmc, &socket, &connection).await { diff --git a/src/nex/matchmake.rs b/src/nex/matchmake.rs new file mode 100644 index 0000000..272ee0c --- /dev/null +++ b/src/nex/matchmake.rs @@ -0,0 +1,122 @@ +use std::collections::HashMap; +use std::sync::{Arc, Weak}; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering::{Relaxed, Release}; +use rand::random; +use tokio::sync::{Mutex, RwLock}; +use crate::kerberos::KerberosDateTime; +use crate::nex::user::User; +use crate::rmc::protocols::notifications::{NotificationEvent, RemoteNotification}; +use crate::rmc::structures::matchmake::{Gathering, MatchmakeParam, MatchmakeSession}; +use crate::rmc::structures::variant::Variant; + +pub struct MatchmakeManager{ + pub gid_counter: AtomicU32, + pub sessions: RwLock>>>, + pub rv_cid_counter: AtomicU32, + pub users: RwLock>> +} + +impl MatchmakeManager{ + pub fn next_gid(&self) -> u32{ + self.gid_counter.fetch_add(1, Relaxed) + } + + pub fn next_cid(&self) -> u32{ + self.rv_cid_counter.fetch_add(1, Relaxed) + } +} + + +#[derive(Default, Debug)] +pub struct ExtendedMatchmakeSession{ + pub session: MatchmakeSession, + pub connected_players: Vec>, +} + +impl ExtendedMatchmakeSession{ + pub async fn from_matchmake_session(gid: u32, session: MatchmakeSession, host: &Weak) -> Self{ + let Some(host) = host.upgrade() else{ + return Default::default(); + }; + + + let mm_session = MatchmakeSession{ + gathering: Gathering{ + self_gid: 1, + owner_pid: host.pid, + host_pid: host.pid, + ..session.gathering.clone() + }, + datetime: KerberosDateTime::now(), + session_key: vec![16, 118, 112, 238, 158, 122, 106, 219, 196, 238, 34, 21, 228, 127, 137, 75, 198, 215, 192, 113, 84, 157, 53, 144, 210, 99, 233, 179, 232, 113, 203, 64],//(0..32).map(|_| random()).collect(), + matchmake_param: MatchmakeParam{ + params: vec![ + ("@SR".to_owned(), Variant::Bool(true)), + ("@GIR".to_owned(), Variant::SInt64(3)) + ] + }, + system_password_enabled: false, + ..session + }; + + Self{ + session: mm_session, + connected_players: Default::default() + } + } + + pub async fn add_player(&mut self, conn: Weak, join_msg: String) { + let Some(arc_conn) = conn.upgrade() else { + return + }; + + let joining_pid = arc_conn.pid; + + let old_particip = self.connected_players.clone(); + + self.connected_players.push(conn); + self.session.participation_count = self.connected_players.len() as u32; + + + for other_connection in &self.connected_players{ + let Some(other_conn) = other_connection.upgrade() else { + continue; + }; + + + let other_pid = other_conn.pid; + /*if other_pid == self.session.gathering.owner_pid && + joining_pid == self.session.gathering.owner_pid{ + continue; + }*/ + + other_conn.remote.process_notification_event(NotificationEvent{ + pid_source: joining_pid, + notif_type: 3001, + param_1: self.session.gathering.self_gid, + param_2: other_pid, + str_param: join_msg.clone(), + param_3: self.connected_players.len() as _ + }).await; + } + + for old_conns in &old_particip{ + let Some(old_conns) = old_conns.upgrade() else { + continue; + }; + + + let older_pid = old_conns.pid; + + arc_conn.remote.process_notification_event(NotificationEvent{ + pid_source: joining_pid, + notif_type: 3001, + param_1: self.session.gathering.self_gid, + param_2: older_pid, + str_param: join_msg.clone(), + param_3: self.connected_players.len() as _ + }).await; + } + } +} \ No newline at end of file diff --git a/src/nex/mod.rs b/src/nex/mod.rs index 623d6d8..76791bc 100644 --- a/src/nex/mod.rs +++ b/src/nex/mod.rs @@ -1,3 +1,5 @@ pub mod account; pub mod auth_handler; -pub mod user; \ No newline at end of file +pub mod user; +pub mod remote_console; +pub mod matchmake; \ No newline at end of file diff --git a/src/nex/remote_console.rs b/src/nex/remote_console.rs new file mode 100644 index 0000000..8868359 --- /dev/null +++ b/src/nex/remote_console.rs @@ -0,0 +1,23 @@ +use macros::rmc_struct; +use crate::rmc::protocols::notifications::{Notification, NotificationEvent, RawNotification, RawNotificationInfo, RemoteNotification}; +use crate::rmc::protocols::nat_traversal::{NatTraversal, RemoteNatTraversal, RawNatTraversalInfo, RawNatTraversal}; +use crate::define_rmc_proto; +use crate::nex::user::RemoteUserProtocol; + +define_rmc_proto!( + proto Console{ + Notification, + NatTraversal + } +); +/* +#[rmc_struct(Console)] +pub struct TestRemoteConsole{ + pub remote: RemoteUserProtocol, +} + +impl Notification for TestRemoteConsole{ + async fn process_notification_event(&self, event: NotificationEvent) { + println!("NOTIF RECIEVED: {:?}", event); + } +}*/ \ No newline at end of file diff --git a/src/nex/user.rs b/src/nex/user.rs index 77ebfb6..0e21e99 100644 --- a/src/nex/user.rs +++ b/src/nex/user.rs @@ -1,45 +1,379 @@ -use std::net::{Ipv4Addr, SocketAddrV4}; -use macros::rmc_struct; +use std::io::ErrorKind::HostUnreachable; use crate::define_rmc_proto; -use crate::prudp::station_url::{nat_types, StationUrl}; -use crate::prudp::station_url::Type::PRUDPS; -use crate::prudp::station_url::UrlOptions::{Address, NatFiltering, NatMapping, NatType, Port, PrincipalID, RVConnectionID}; -use crate::rmc::protocols::secure::{RemoteAuth, RawAuthInfo, RawAuth, Auth}; +use crate::nex::matchmake::{ExtendedMatchmakeSession, MatchmakeManager}; +use crate::nex::remote_console::RemoteConsole; +use crate::prudp::sockaddr::PRUDPSockAddr; +use crate::prudp::station_url::Type::{PRUDP, PRUDPS}; +use crate::prudp::station_url::UrlOptions::{ + Address, NatFiltering, NatMapping, NatType, Platform, Port, PrincipalID, RVConnectionID, + StreamID, PMP, UPNP, +}; +use crate::prudp::station_url::{nat_types, StationUrl, Type}; +use crate::rmc::protocols::matchmake::{ + Matchmake, RawMatchmake, RawMatchmakeInfo, RemoteMatchmake, +}; +use crate::rmc::protocols::matchmake_extension::{ + MatchmakeExtension, RawMatchmakeExtension, RawMatchmakeExtensionInfo, RemoteMatchmakeExtension, +}; +use crate::rmc::protocols::nat_traversal::{ + NatTraversal, RawNatTraversal, RawNatTraversalInfo, RemoteNatTraversal, +}; +use crate::rmc::protocols::secure::{RawSecure, RawSecureInfo, RemoteSecure, Secure}; use crate::rmc::response::ErrorCode; +use crate::rmc::structures::matchmake::{AutoMatchmakeParam, CreateMatchmakeSessionParam, JoinMatchmakeSessionParam, MatchmakeSession}; use crate::rmc::structures::qresult::QResult; +use macros::rmc_struct; +use std::net::{Ipv4Addr, SocketAddrV4}; +use std::sync::{Arc, Weak}; +use log::{error, info}; +use rocket::http::ext::IntoCollection; +use tokio::sync::{Mutex, RwLock}; +use crate::prudp::station_url::nat_types::PUBLIC; +use crate::rmc::response::ErrorCode::{Core_Exception, Core_InvalidArgument, RendezVous_AccountExpired, RendezVous_SessionVoid}; define_rmc_proto!( proto UserProtocol{ - Auth + Secure, + MatchmakeExtension, + Matchmake, + NatTraversal } ); #[rmc_struct(UserProtocol)] pub struct User { pub pid: u32, - pub ip: SocketAddrV4, + pub ip: PRUDPSockAddr, + pub this: Weak, + pub remote: RemoteConsole, + pub station_url: RwLock>, + pub matchmake_manager: Arc, } -impl Auth for User{ - async fn register(&self, station_urls: Vec) -> Result<(QResult, u32, String), ErrorCode> { - let public_station = StationUrl{ - url_type: PRUDPS, - options: vec![ - RVConnectionID(0), - Address(*self.ip.ip()), - Port(self.ip.port()), - NatFiltering(0), - NatMapping(0), - NatType(nat_types::BEHIND_NAT), - PrincipalID(self.pid), - ] +impl Secure for User { + async fn register( + &self, + station_urls: Vec, + ) -> Result<(QResult, u32, StationUrl), ErrorCode> { + let cid = self.matchmake_manager.next_cid(); + + println!("{:?}", station_urls); + + let mut users = self.matchmake_manager.users.write().await; + users.insert(cid, self.this.clone()); + drop(users); + + let mut public_station: Option = None; + let mut private_station: Option = None; + + for station in station_urls{ + let is_public = station.options.iter().any(|v| { + if let NatType(v) = v { + if *v & PUBLIC != 0 { + return true; + } + } + false + }); + + let Some(nat_filtering) = station.options.iter().find_map(|v| match v { + NatFiltering(v) => Some(v), + _ => None + }) else { + return Err(Core_Exception); + }; + + let Some(nat_mapping) = station.options.iter().find_map(|v| match v { + NatMapping(v) => Some(v), + _ => None + }) else { + return Err(Core_Exception); + }; + + if !is_public || (*nat_filtering == 0 && *nat_mapping == 0){ + private_station = Some(station.clone()); + } + + if is_public{ + public_station = Some(station); + } + } + + let Some(mut private_station) = private_station else { + return Err(Core_Exception); }; + let mut public_station = if let Some(public_station) = public_station{ + public_station + } else { + let mut public_station = private_station.clone(); + + public_station.options.retain(|v| { + match v { + Address(_) | Port(_) | NatFiltering(_) | NatMapping(_) | NatType(_) => false, + _ => true + } + }); + + public_station.options.push(Address(*self.ip.regular_socket_addr.ip())); + public_station.options.push(Port(self.ip.regular_socket_addr.port())); + public_station.options.push(NatFiltering(0)); + public_station.options.push(NatMapping(0)); + public_station.options.push(NatType(3)); + + public_station + }; + + let mut both = [&mut public_station, &mut private_station]; + + for station in both{ + station.options.retain(|v| { + match v { + PrincipalID(_) | RVConnectionID(_) => false, + _ => true + } + }); + + station.options.push(PrincipalID(self.pid)); + station.options.push(RVConnectionID(cid)); + } + + + let mut lock = self.station_url.write().await; + *lock = vec![ + public_station.clone(), + private_station + ]; + drop(lock); + let result = QResult::success(ErrorCode::Core_Unknown); - Ok((result, 0, public_station.to_string())) + let out = public_station.to_string(); + + println!("out: {}", out); + + Ok((result, cid, public_station)) + } + + async fn replace_url(&self, target_url: StationUrl, dest: StationUrl) -> Result<(), ErrorCode> { + let mut lock = self.station_url.write().await; + + let Some(target_addr) = target_url.options.iter().find(|v| matches!(v, Address(_))) else{ + return Err(ErrorCode::Core_InvalidArgument); + }; + + let Some(target_port) = target_url.options.iter().find(|v| matches!(v, Port(_))) else{ + return Err(ErrorCode::Core_InvalidArgument); + }; + + let Some(replacement_target) = lock.iter_mut().find(|url| { + url.options.iter().any(|o| o == target_addr) && + url.options.iter().any(|o| o == target_port) + }) else { + return Err(ErrorCode::Core_InvalidArgument); + }; + *replacement_target = dest; + + drop(lock); + + Ok(()) } } +impl MatchmakeExtension for User { + async fn get_playing_session(&self, pids: Vec) -> Result, ErrorCode> { + Ok(Vec::new()) + } + + async fn update_progress_score(&self, gid: u32, progress: u8) -> Result<(), ErrorCode> { + let mut sessions = self.matchmake_manager.sessions.read().await; + + let Some(session) = sessions.get(&gid) else { + return Err(RendezVous_SessionVoid); + }; + + let session = session.clone(); + drop(sessions); + + let mut session = session.lock().await; + + session.session.progress_score = progress; + + Ok(()) + } + + async fn create_matchmake_session_with_param( + &self, + session: CreateMatchmakeSessionParam, + ) -> Result { + println!("{:?}", session); + + let gid = self.matchmake_manager.next_gid(); + + let mut new_session = ExtendedMatchmakeSession::from_matchmake_session( + gid, + session.matchmake_session, + &self.this.clone(), + ) + .await; + + new_session.session.participation_count = session.participation_count as u32; + new_session + .add_player(self.this.clone(), session.join_message) + .await; + + let session = new_session.session.clone(); + + let mut sessions = self.matchmake_manager.sessions.write().await; + sessions.insert(gid, Arc::new(Mutex::new(new_session))); + drop(sessions); + + Ok(session) + } + + async fn join_matchmake_session_with_param( + &self, + join_session_param: JoinMatchmakeSessionParam, + ) -> Result { + let mut sessions = self.matchmake_manager.sessions.read().await; + + let Some(session) = sessions.get(&join_session_param.gid) else { + return Err(ErrorCode::RendezVous_SessionVoid); + }; + + let session = session.clone(); + drop(sessions); + + let mut session = session.lock().await; + + session.connected_players.retain(|v| v.upgrade().is_some_and(|v| v.pid != self.pid)); + + session + .add_player(self.this.clone(), join_session_param.join_message) + .await; + + let mm_session = session.session.clone(); + + Ok(mm_session) + } + + async fn auto_matchmake_with_param_postpone(&self, session: AutoMatchmakeParam) -> Result { + println!("{:?}", session.search_criteria); + + let AutoMatchmakeParam{ + join_message, + participation_count, + gid_for_participation_check, + matchmake_session, + additional_participants, + .. + } = session; + + self.create_matchmake_session_with_param(CreateMatchmakeSessionParam{ + join_message, + participation_count, + gid_for_participation_check, + create_matchmake_session_option: 0, + matchmake_session, + additional_participants + }).await + } +} + +impl Matchmake for User { + async fn unregister_gathering(&self, gid: u32) -> Result { + Ok(true) + } + async fn get_session_urls(&self, gid: u32) -> Result, ErrorCode> { + let sessions = self.matchmake_manager.sessions.read().await; + + let Some(session) = sessions.get(&gid) else { + return Err(ErrorCode::RendezVous_SessionVoid); + }; + + let session = session.clone(); + + drop(sessions); + + let session = session.lock().await; + + let urls: Vec<_> = + session + .connected_players + .iter() + .filter_map(|v| v.upgrade()) + .filter(|u| u.pid == session.session.gathering.host_pid) + .map(|u| async move { + u.station_url.read().await.clone() + }) + .next() + .ok_or(ErrorCode::RendezVous_SessionClosed)? + .await; + println!("{:?}", urls); + + Ok(urls) + } +} + +impl NatTraversal for User { + async fn report_nat_properties( + &self, + nat_mapping: u32, + nat_filtering: u32, + _rtt: u32, + ) -> Result<(), ErrorCode> { + + let mut urls = self.station_url.write().await; + + for station_url in urls.iter_mut() { + station_url.options.retain(|o| match o { + NatMapping(_) | NatFiltering(_) => false, + _ => true + }); + + station_url.options.push(NatMapping(nat_mapping as u8)); + station_url.options.push(NatFiltering(nat_filtering as u8)); + } + + Ok(()) + } + + async fn request_probe_initiation(&self, station_to_probe: String) -> Result<(), ErrorCode> { + info!("NO!"); + Err(RendezVous_AccountExpired) + } + + async fn request_probe_initialization_ext(&self, target_list: Vec, station_to_probe: String) -> Result<(), ErrorCode> { + let users = self.matchmake_manager.users.read().await; + + println!("requesting station probe for {:?} to {:?}", target_list, station_to_probe); + + for target in target_list{ + let Ok(url) = StationUrl::try_from(target.as_ref()) else{ + continue; + }; + + let Some(RVConnectionID(v)) = url.options.into_iter().find(|o| { matches!(o, &RVConnectionID(_)) }) else{ + continue; + }; + + let Some(v) = users.get(&v) else{ + continue; + }; + + let Some(user) = v.upgrade() else { + continue; + }; + + if let Err(e) = user.remote.request_probe_initiation(station_to_probe.clone()).await{ + error!("error whilest probing"); + } + } + + info!("finished probing"); + + Ok(()) + } +} diff --git a/src/prudp/socket.rs b/src/prudp/socket.rs index 5702c51..404cffe 100644 --- a/src/prudp/socket.rs +++ b/src/prudp/socket.rs @@ -67,7 +67,8 @@ struct InternalConnection { // maybe add connection id(need to see if its even needed) crypto_handler_instance: E, data_sender: Sender>, - socket: Arc + socket: Arc, + packet_queue: HashMap, } impl Deref for InternalConnection{ @@ -82,7 +83,7 @@ impl InternalConnection{ let prev_val = self.reliable_server_counter; let (val, _) = self.reliable_server_counter.overflowing_add(1); self.reliable_server_counter = val; - println!("{}", prev_val); + prev_val } } @@ -193,9 +194,7 @@ impl AnyInternalConnection for InternalConne packet .write_to(&mut vec) .expect("somehow failed to convert backet to bytes"); - - println!("{}", hex::encode(&vec)); - + self.socket .send_to(&vec, self.socket_addr.regular_socket_addr) .await @@ -294,6 +293,7 @@ impl InternalSocket { crypto_handler_instance: T::CryptoConnectionInstance, socket_addr: PRUDPSockAddr, session_id: u8, + is_instantiator: bool, ) { let common = Arc::new(CommonConnection { user_id: crypto_handler_instance.get_user_id(), @@ -307,10 +307,11 @@ impl InternalSocket { let internal = InternalConnection { common: common.clone(), crypto_handler_instance, - reliable_client_counter: 2, - reliable_server_counter: 1, + reliable_client_counter: if is_instantiator { 1 } else { 2 } , + reliable_server_counter: if is_instantiator { 2 } else { 1 }, data_sender: data_sender_from_client, - socket: self.socket.clone() + socket: self.socket.clone(), + packet_queue: Default::default() }; let internal = Arc::new(Mutex::new(internal)); @@ -326,10 +327,6 @@ impl InternalSocket { }; - - - - let mut connections = self.internal_connections.lock().await; connections.insert(socket_addr, internal.clone()); @@ -411,7 +408,7 @@ impl InternalSocket { //println!("connect out: {:?}", response); - self.create_connection(crypto, address, session_id).await; + self.create_connection(crypto, address, session_id, false).await; self.send_packet_unbuffered(address, response).await; } @@ -428,28 +425,34 @@ impl InternalSocket { error!("tried to send data on inactive connection!"); return }; + let conn = conn.clone(); drop(connections); + println!("recieved packed id: {}", packet.header.sequence_id); + let mut conn = conn.lock().await; - conn.crypto_handler_instance.decrypt_incoming(packet.header.substream_id, &mut packet.payload[..]); + conn.packet_queue.insert(packet.header.sequence_id, packet); - let mut data = Vec::new(); + let mut counter = conn.reliable_client_counter; - mem::swap(&mut data, &mut packet.payload); + while let Some(mut packet) = conn.packet_queue.remove(&counter) { + conn.crypto_handler_instance.decrypt_incoming(packet.header.substream_id, &mut packet.payload[..]); - let mut response = packet.base_acknowledgement_packet(); - response.header.types_and_flags.set_flag(HAS_SIZE | ACK); - response.header.session_id = conn.session_id; + let mut response = packet.base_acknowledgement_packet(); + response.header.types_and_flags.set_flag(HAS_SIZE | ACK); + response.header.session_id = conn.session_id; - conn.crypto_handler_instance.sign_packet(&mut response); + conn.crypto_handler_instance.sign_packet(&mut response); - self.send_packet_unbuffered(address, response).await; - - conn.data_sender.send(data).await.ok(); + self.send_packet_unbuffered(address, response).await; + conn.data_sender.send(packet.payload).await.ok(); + conn.reliable_client_counter = conn.reliable_client_counter.overflowing_add(1).0; + counter = conn.reliable_client_counter; + } } async fn handle_ping(&self, address: PRUDPSockAddr, packet: PRUDPPacket){ @@ -615,7 +618,7 @@ impl AnyInternalSocket for InternalSocket { let (_, crypt) = self.crypto_handler.instantiate(remote_signature, *own_signature, &[], 1)?; //todo: make this work for secure servers as well - self.create_connection(crypt, address, 0).await; + self.create_connection(crypt, address, 0, true).await; Some(()) } diff --git a/src/prudp/station_url.rs b/src/prudp/station_url.rs index f981fe8..0e20d6c 100644 --- a/src/prudp/station_url.rs +++ b/src/prudp/station_url.rs @@ -1,9 +1,13 @@ use std::net::Ipv4Addr; use log::error; -use std::fmt::{Display, Formatter, Write}; +use std::fmt::{Debug, Display, Formatter, Write}; +use std::io::Read; +use rocket::delete; use crate::prudp::station_url::Type::{PRUDP, PRUDPS, UDP}; -use crate::prudp::station_url::UrlOptions::{Address, ConnectionID, NatFiltering, NatMapping, NatType, Platform, PMP, Port, PrincipalID, RVConnectionID, StreamID, StreamType, UPNP}; - +use crate::prudp::station_url::UrlOptions::{Address, ConnectionID, NatFiltering, NatMapping, NatType, Platform, PMP, Port, PrincipalID, RVConnectionID, StreamID, StreamType, UPNP, PID}; +use crate::rmc::structures::Error::StationUrlInvalid; +use crate::rmc::structures::RmcSerialize; +#[derive(Clone, Copy, PartialEq, Eq)] pub enum Type{ UDP, PRUDP, @@ -15,6 +19,7 @@ pub mod nat_types{ pub const PUBLIC: u8 = 2; } +#[derive(Clone, Eq, PartialEq)] pub enum UrlOptions { Address(Ipv4Addr), Port(u16), @@ -29,16 +34,18 @@ pub enum UrlOptions { RVConnectionID(u32), Platform(u8), PMP(u8), + PID(u32), } +#[derive(Clone, PartialEq, Eq)] pub struct StationUrl{ pub url_type: Type, pub options: Vec } impl StationUrl{ - fn read_options(options: &str) -> Option>{ + pub fn read_options(options: &str) -> Option>{ let mut options_out = Vec::new(); for option in options.split(';'){ @@ -75,12 +82,21 @@ impl StationUrl{ "RVCID" => { options_out.push(RVConnectionID(option_value.parse().ok()?)) } + "rvcid" => { + options_out.push(RVConnectionID(option_value.parse().ok()?)) + } "pl" => { options_out.push(Platform(option_value.parse().ok()?)) } "pmp" => { options_out.push(PMP(option_value.parse().ok()?)) - } + }, + "pid" => { + options_out.push(PID(option_value.parse().ok()?)) + }, + "PID" => { + options_out.push(PID(option_value.parse().ok()?)) + }, _ => { error!("unimplemented option type, skipping: {}", option_name); } @@ -143,10 +159,12 @@ impl<'a> Into for &'a StationUrl{ RVConnectionID(v) => write!(url, "RVCID={}", v).expect("failed to write"), Platform(v) => write!(url, "pl={}", v).expect("failed to write"), PMP(v) => write!(url, "pmp={}", v).expect("failed to write"), + PID(v) => write!(url, "PID={}", v).expect("failed to write"), } + write!(url, ";").expect("failed to write"); } - url + url[0..url.len()-1].into() } } @@ -158,4 +176,24 @@ impl Display for StationUrl{ } +} + +impl RmcSerialize for StationUrl{ + fn deserialize(reader: &mut dyn Read) -> crate::rmc::structures::Result { + let str = String::deserialize(reader)?; + + Self::try_from(str.as_str()).map_err(|_| StationUrlInvalid) + } + fn serialize(&self, writer: &mut dyn std::io::Write) -> crate::rmc::structures::Result<()> { + let str: String = self.into(); + + str.serialize(writer) + } +} + +impl Debug for StationUrl{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let str: String = self.into(); + f.write_str(&str) + } } \ No newline at end of file diff --git a/src/rmc/protocols/matchmake.rs b/src/rmc/protocols/matchmake.rs new file mode 100644 index 0000000..5e2b23a --- /dev/null +++ b/src/rmc/protocols/matchmake.rs @@ -0,0 +1,11 @@ +use macros::{method_id, rmc_proto}; +use crate::prudp::station_url::StationUrl; +use crate::rmc::response::ErrorCode; + +#[rmc_proto(21)] +pub trait Matchmake{ + #[method_id(2)] + async fn unregister_gathering(&self, gid: u32) -> Result; + #[method_id(41)] + async fn get_session_urls(&self, gid: u32) -> Result, ErrorCode>; +} \ No newline at end of file diff --git a/src/rmc/protocols/matchmake_extension.rs b/src/rmc/protocols/matchmake_extension.rs new file mode 100644 index 0000000..6d0a2b9 --- /dev/null +++ b/src/rmc/protocols/matchmake_extension.rs @@ -0,0 +1,20 @@ +use macros::{method_id, rmc_proto}; +use crate::rmc::response::ErrorCode; +use crate::rmc::structures::matchmake::{AutoMatchmakeParam, CreateMatchmakeSessionParam, JoinMatchmakeSessionParam, MatchmakeSession}; + +#[rmc_proto(109)] +pub trait MatchmakeExtension{ + #[method_id(16)] + async fn get_playing_session(&self, pids: Vec) -> Result, ErrorCode>; + + #[method_id(34)] + async fn update_progress_score(&self, gid: u32, progress: u8) -> Result<(), ErrorCode>; + #[method_id(38)] + async fn create_matchmake_session_with_param(&self, session: CreateMatchmakeSessionParam) -> Result; + + #[method_id(39)] + async fn join_matchmake_session_with_param(&self, session: JoinMatchmakeSessionParam) -> Result; + + #[method_id(40)] + async fn auto_matchmake_with_param_postpone(&self, session: AutoMatchmakeParam) -> Result; +} \ No newline at end of file diff --git a/src/rmc/protocols/mod.rs b/src/rmc/protocols/mod.rs index fd09e63..eb3c44e 100644 --- a/src/rmc/protocols/mod.rs +++ b/src/rmc/protocols/mod.rs @@ -2,6 +2,10 @@ pub mod auth; pub mod secure; +pub mod notifications; +pub mod matchmake; +pub mod matchmake_extension; +pub mod nat_traversal; use crate::prudp::socket::{ExternalConnection, SendingConnection}; use crate::rmc::message::RMCMessage; @@ -261,7 +265,7 @@ async fn handle_incoming( rest_of_data } = message; - info!("got rmc request, handeling it now..."); + info!("RMC REQUEST: Proto: {}; Method: {};", protocol_id, method_id); remote.rmc_call(&sending_conn, protocol_id, method_id, call_id, rest_of_data).await; @@ -272,7 +276,7 @@ async fn handle_incoming( pub fn new_rmc_gateway_connection(conn: ExternalConnection, create_internal: F) -> Arc where - F: FnOnce(RmcConnection) -> T, + F: FnOnce(RmcConnection) -> Arc, { let notify = Arc::new(Notify::new()); let incoming: Arc>> = Default::default(); @@ -285,8 +289,6 @@ where let exposed_object = (create_internal)(rmc_conn); - let exposed_object = Arc::new(exposed_object); - { let exposed_object = exposed_object.clone(); tokio::spawn(async move { diff --git a/src/rmc/protocols/nat_traversal.rs b/src/rmc/protocols/nat_traversal.rs new file mode 100644 index 0000000..9dd488a --- /dev/null +++ b/src/rmc/protocols/nat_traversal.rs @@ -0,0 +1,15 @@ +use macros::{method_id, rmc_proto}; +use crate::rmc::response::ErrorCode; +use crate::rmc::structures::matchmake::{CreateMatchmakeSessionParam, MatchmakeSession}; + +#[rmc_proto(3)] +pub trait NatTraversal{ + #[method_id(2)] + async fn request_probe_initiation(&self, station_to_probe: String) -> Result<(),ErrorCode>; + + #[method_id(3)] + async fn request_probe_initialization_ext(&self, target_list: Vec, station_to_probe: String) -> Result<(),ErrorCode>; + + #[method_id(5)] + async fn report_nat_properties(&self, nat_mapping: u32, nat_filtering: u32, rtt: u32) -> Result<(),ErrorCode>; +} \ No newline at end of file diff --git a/src/rmc/protocols/notifications.rs b/src/rmc/protocols/notifications.rs new file mode 100644 index 0000000..daed6f0 --- /dev/null +++ b/src/rmc/protocols/notifications.rs @@ -0,0 +1,21 @@ +use macros::{method_id, rmc_proto, rmc_struct, RmcSerialize}; +use crate::rmc::response::ErrorCode; +use crate::rmc::structures::qresult::QResult; + +#[derive(RmcSerialize, Debug)] +#[rmc_struct(0)] +pub struct NotificationEvent{ + pub pid_source: u32, + pub notif_type: u32, + pub param_1: u32, + pub param_2: u32, + pub str_param: String, + pub param_3: u32, +} + +#[rmc_proto(14, NoReturn)] +pub trait Notification { + #[method_id(1)] + async fn process_notification_event(&self, event: NotificationEvent); +} + diff --git a/src/rmc/protocols/secure.rs b/src/rmc/protocols/secure.rs index fa11c35..39187bf 100644 --- a/src/rmc/protocols/secure.rs +++ b/src/rmc/protocols/secure.rs @@ -6,7 +6,9 @@ use crate::rmc::structures::connection_data::ConnectionData; use crate::rmc::structures::qresult::QResult; #[rmc_proto(11)] -pub trait Auth { +pub trait Secure { #[method_id(1)] - async fn register(&self, station_urls: Vec) -> Result<(QResult, u32, String), ErrorCode>; + async fn register(&self, station_urls: Vec) -> Result<(QResult, u32, StationUrl), ErrorCode>; + #[method_id(7)] + async fn replace_url(&self, target: StationUrl, dest: StationUrl) -> Result<(), ErrorCode>; } diff --git a/src/rmc/response.rs b/src/rmc/response.rs index f47ee66..26bc47b 100644 --- a/src/rmc/response.rs +++ b/src/rmc/response.rs @@ -154,8 +154,7 @@ pub async fn send_result( method_id: u32, call_id: u32, ) { - - println!("{}", hex::encode(result.clone().unwrap())); + let response_result = match result { Ok(v) => RMCResponseResult::Success { call_id, diff --git a/src/rmc/structures/matchmake.rs b/src/rmc/structures/matchmake.rs index f11b432..f3c4c03 100644 --- a/src/rmc/structures/matchmake.rs +++ b/src/rmc/structures/matchmake.rs @@ -92,5 +92,26 @@ pub struct CreateMatchmakeSessionParam { pub create_matchmake_session_option: u32, pub join_message: String, pub participation_count: u16, - } + +#[derive(RmcSerialize, Debug, Clone)] +#[rmc_struct(0)] +pub struct MatchmakeBlockListParam { + option_flag: u32 +} + +#[derive(RmcSerialize, Debug, Clone)] +#[rmc_struct(0)] +pub struct JoinMatchmakeSessionParam { + pub gid: u32, + pub additional_participants: Vec, + pub gid_for_participation_check: u32, + pub join_matchmake_session_open: u32, + pub join_matchmake_session_behavior: u8, + pub user_password: String, + pub system_password: String, + pub join_message: String, + pub participation_count: u16, + //pub extra_participant: u16, + //pub block_list_param: MatchmakeBlockListParam +} \ No newline at end of file diff --git a/src/rmc/structures/mod.rs b/src/rmc/structures/mod.rs index a9f9ab6..d1ed8b3 100644 --- a/src/rmc/structures/mod.rs +++ b/src/rmc/structures/mod.rs @@ -15,6 +15,8 @@ pub enum Error{ UnexpectedValue(u64), #[error("version mismatch: {0}")] VersionMismatch(u8), + #[error("an error occurred reading the station url")] + StationUrlInvalid } pub type Result = std::result::Result; diff --git a/src/rmc/structures/primitives.rs b/src/rmc/structures/primitives.rs index e8855f0..7c345d5 100644 --- a/src/rmc/structures/primitives.rs +++ b/src/rmc/structures/primitives.rs @@ -13,6 +13,16 @@ impl RmcSerialize for u8{ } } +impl RmcSerialize for i8{ + fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { + Ok(writer.write_all(bytes_of(self))?) + } + + fn deserialize(mut reader: &mut dyn Read) -> crate::rmc::structures::Result { + Ok(reader.read_struct(IS_BIG_ENDIAN)?) + } +} + impl RmcSerialize for u16{ fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { Ok(writer.write_all(bytes_of(self))?) @@ -23,6 +33,16 @@ impl RmcSerialize for u16{ } } +impl RmcSerialize for i16{ + fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { + Ok(writer.write_all(bytes_of(self))?) + } + + fn deserialize(mut reader: &mut dyn Read) -> crate::rmc::structures::Result { + Ok(reader.read_struct(IS_BIG_ENDIAN)?) + } +} + impl RmcSerialize for u32{ fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { Ok(writer.write_all(bytes_of(self))?) @@ -33,6 +53,16 @@ impl RmcSerialize for u32{ } } +impl RmcSerialize for i32{ + fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { + Ok(writer.write_all(bytes_of(self))?) + } + + fn deserialize(mut reader: &mut dyn Read) -> crate::rmc::structures::Result { + Ok(reader.read_struct(IS_BIG_ENDIAN)?) + } +} + impl RmcSerialize for u64{ fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { Ok(writer.write_all(bytes_of(self))?)