From 98193a58d880ffe5ae8c3359565cded1b39ecee0 Mon Sep 17 00:00:00 2001 From: DJMrTV Date: Sun, 29 Jun 2025 11:40:42 +0200 Subject: [PATCH] feat: a lot of things(i lost track) --- Cargo.lock | 54 +++ Cargo.toml | 9 +- macros/src/lib.rs | 432 ++++++++++++++------- macros/src/protos.rs | 6 +- src/executables/backend_server_insecure.rs | 28 +- src/executables/backend_server_secure.rs | 16 +- src/executables/common.rs | 63 ++- src/executables/control_server.rs | 209 ++++++++++ src/executables/proxy_insecure.rs | 68 +++- src/executables/proxy_secure.rs | 90 +++-- src/grpc/account.rs | 11 - src/lib.rs | 8 + src/main.rs | 4 +- src/nex/account.rs | 7 +- src/nex/auth_handler.rs | 24 +- src/nex/matchmake.rs | 6 +- src/nex/remote_console.rs | 4 +- src/nex/user.rs | 27 +- src/prudp/packet.rs | 2 +- src/prudp/router.rs | 4 +- src/prudp/secure.rs | 5 +- src/prudp/socket.rs | 31 +- src/prudp/station_url.rs | 1 - src/reggie.rs | 228 ++++++++++- src/rmc/protocols/matchmake_ext.rs | 1 - src/rmc/protocols/mod.rs | 68 +++- src/rmc/protocols/nat_traversal.rs | 1 - src/rmc/protocols/notifications.rs | 2 - src/rmc/protocols/ranking.rs | 2 +- src/rmc/protocols/secure.rs | 2 - src/rmc/response.rs | 12 +- src/rmc/structures/buffer.rs | 2 +- src/rmc/structures/connection_data.rs | 5 +- src/rmc/structures/list.rs | 27 ++ src/rmc/structures/mod.rs | 4 +- src/rmc/structures/networking.rs | 1 + src/rmc/structures/primitives.rs | 9 + src/rmc/structures/qbuffer.rs | 1 - src/rnex_proxy_common.rs | 1 - src/util.rs | 87 +++-- src/versions.rs | 2 +- src/web/mod.rs | 7 +- 42 files changed, 1206 insertions(+), 365 deletions(-) create mode 100644 src/executables/control_server.rs diff --git a/Cargo.lock b/Cargo.lock index e420127..fcc53fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -435,6 +435,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "data-encoding" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" + [[package]] name = "der" version = "0.7.10" @@ -2239,8 +2245,10 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-stream", + "tokio-tungstenite", "tonic", "tonic-build", + "tungstenite", "typenum", "v_byte_macros", ] @@ -2428,6 +2436,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.9" @@ -2772,6 +2791,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "489a59b6730eda1b0171fcfda8b121f4bee2b35cba8645ca35c5f7ba3eb736c1" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.15" @@ -3002,6 +3033,23 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eadc29d668c91fcc564941132e17b28a7ceb2f3ebf0b9dae3e03fd7a6748eb0d" +dependencies = [ + "bytes", + "data-encoding", + "http 1.3.1", + "httparse", + "log", + "rand 0.9.1", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.18.0" @@ -3056,6 +3104,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" diff --git a/Cargo.toml b/Cargo.toml index 9a393b1..e5c2164 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,8 @@ rustls = "0.23.27" rustls-pki-types = "1.12.0" rustls-webpki = "0.103.3" tokio-rustls = "0.26.2" +tokio-tungstenite = "0.27.0" +tungstenite = "0.27.0" @@ -65,6 +67,7 @@ tonic-build = "0.12.3" default = ["secure", "auth"] secure = [] auth = [] +no_tls = [] [[bin]] name = "proxy_insecure" @@ -81,4 +84,8 @@ path = "src/executables/backend_server_insecure.rs" [[bin]] name = "backend_server_secure" -path = "src/executables/backend_server_secure.rs" \ No newline at end of file +path = "src/executables/backend_server_secure.rs" + +[[bin]] +name = "control_server" +path = "src/executables/control_server.rs" \ No newline at end of file diff --git a/macros/src/lib.rs b/macros/src/lib.rs index 835ea6b..ad77595 100644 --- a/macros/src/lib.rs +++ b/macros/src/lib.rs @@ -2,128 +2,74 @@ mod protos; extern crate proc_macro; -use proc_macro2::{Ident, Literal, Span, TokenTree}; +use crate::protos::{ProtoMethodData, RmcProtocolData}; use proc_macro::TokenStream; -use std::iter::FromIterator; -use std::mem; -use syn::{parse_macro_input, DeriveInput, Data, PathSegment, TraitItem, FieldsNamed, Fields, Visibility, Type, TypePath, Path, ImplItem, ImplItemConst, Expr, ExprLit, Lit, TypeParamBound, TraitBound, TraitBoundModifier, LitInt, Token, FnArg, Receiver, PatType, Pat, TypeInfer, TypeReference, TraitItemFn, Signature, Block, Stmt, Local, LocalInit, LitStr, PathArguments, ReturnType}; -use quote::{quote, ToTokens, TokenStreamExt}; -use syn::buffer::TokenBuffer; -use syn::parse::{Parse, ParseBuffer, ParseStream}; +use proc_macro2::{Ident, Literal, Span}; +use quote::{quote, TokenStreamExt}; +use syn::parse::{Parse, ParseStream}; use syn::punctuated::Punctuated; use syn::spanned::Spanned; -use syn::token::Comma; -use syn::Visibility::Public; -use crate::protos::{ProtoMethodData, RmcProtocolData}; +use syn::{ + parse_macro_input, Attribute, Data, DataStruct, DeriveInput, Fields, FnArg, LitInt, Pat, Token, + TraitItem, +}; -fn self_referece_type() -> Type { - Type::Reference( - TypeReference { - and_token: Default::default(), - lifetime: None, - mutability: None, - elem: Box::new(Type::Path( - TypePath { - qself: None, - path: Path { - leading_colon: None, - segments: { - let mut punct = Punctuated::new(); - - punct.push_value(PathSegment{ - ident: Ident::new("Self", Span::call_site()), - arguments: PathArguments::None - }); - - punct - } - } - } - )) - } - ) -} - -struct ProtoInputParams{ +struct ProtoInputParams { proto_num: LitInt, - properties: Option<(Token![,], Punctuated)> + properties: Option<(Token![,], Punctuated)>, } -impl Parse for ProtoInputParams{ +impl Parse for ProtoInputParams { fn parse(input: ParseStream) -> syn::Result { let proto_num = input.parse()?; - if let Some(seperator) = input.parse()?{ + if let Some(seperator) = input.parse()? { let mut punctuated = Punctuated::new(); loop { - punctuated.push_value( - input.parse()? - ); + punctuated.push_value(input.parse()?); if let Some(punct) = input.parse()? { punctuated.push_punct(punct); } else { - return Ok( - Self{ - proto_num, - properties: Some((seperator, punctuated)) - } - ) + return Ok(Self { + proto_num, + properties: Some((seperator, punctuated)), + }); } } } else { - Ok( - Self{ - proto_num, - properties: None - } - ) + Ok(Self { + proto_num, + properties: None, + }) } } } -fn single_ident_path(ident: Ident) -> Path{ - Path{ - segments: { - let mut punc = Punctuated::new(); - punc.push(PathSegment::from(ident)); - punc - }, - leading_colon: None, - } -} - - -#[proc_macro_derive(RmcSerialize, attributes(extends, rmc_struct))] -pub fn rmc_serialize(input: TokenStream) -> TokenStream { - let derive_input = parse_macro_input!(input as DeriveInput); - - let struct_attr = derive_input.attrs.iter() - .find(|a| a.path().segments.len() == 1 && - a.path().segments.first().is_some_and(|p| p.ident.to_string() == "rmc_struct")); - - let Data::Struct(s) = derive_input.data else { - panic!("rmc struct type MUST be a struct"); - }; - - // generate base data - +fn gen_serialize_data_struct( + s: DataStruct, + struct_attr: Option<&Attribute>, +) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) { let serialize_base_content = { let mut serialize_content = quote! {}; - for f in &s.fields{ - if f.attrs.iter() - .any(|a| a.path().segments.len() == 1 && - a.path().segments.first().is_some_and(|p| p.ident.to_string() == "extends")){ + for f in &s.fields { + if f.attrs.iter().any(|a| { + a.path().segments.len() == 1 + && a.path() + .segments + .first() + .is_some_and(|p| p.ident.to_string() == "extends") + }) { continue; } let ident = f.ident.as_ref().unwrap(); - serialize_content.append_all(quote!{ + serialize_content.append_all(quote! { self.#ident.serialize(writer)?; }) } - quote!{ + quote! { #serialize_content Ok(()) @@ -135,10 +81,10 @@ pub fn rmc_serialize(input: TokenStream) -> TokenStream { for f in &s.fields { let ident = f.ident.as_ref().unwrap(); - structure_content.append_all(quote!{#ident, }); + structure_content.append_all(quote! {#ident, }); } - quote!{ + quote! { Ok(Self{ #structure_content }) @@ -148,22 +94,26 @@ pub fn rmc_serialize(input: TokenStream) -> TokenStream { let deserialize_base_content = { let mut deserialize_content = quote! {}; - for f in &s.fields{ - if f.attrs.iter() - .any(|a| a.path().segments.len() == 1 && - a.path().segments.first().is_some_and(|p| p.ident.to_string() == "extends")){ + for f in &s.fields { + if f.attrs.iter().any(|a| { + a.path().segments.len() == 1 + && a.path() + .segments + .first() + .is_some_and(|p| p.ident.to_string() == "extends") + }) { continue; } let ident = f.ident.as_ref().unwrap(); let ty = &f.ty; - deserialize_content.append_all(quote!{ + deserialize_content.append_all(quote! { let #ident = <#ty> :: deserialize(reader)?; }) } - quote!{ + quote! { #deserialize_content #struct_ctor } @@ -171,15 +121,19 @@ pub fn rmc_serialize(input: TokenStream) -> TokenStream { // generate base with extends stuff - let serialize_base_content = if let Some(attr) = struct_attr{ + let serialize_base_content = if let Some(attr) = struct_attr { let version: Literal = attr.parse_args().expect("has to be a literal"); let pre_inner = if let Some(f) = s.fields.iter().find(|f| { - f.attrs.iter() - .any(|a| a.path().segments.len() == 1 && - a.path().segments.first().is_some_and(|p| p.ident.to_string() == "extends")) - }){ - let ident= f.ident.as_ref().unwrap(); + f.attrs.iter().any(|a| { + a.path().segments.len() == 1 + && a.path() + .segments + .first() + .is_some_and(|p| p.ident.to_string() == "extends") + }) + }) { + let ident = f.ident.as_ref().unwrap(); quote! { self.#ident.serialize(writer)?; } @@ -199,16 +153,20 @@ pub fn rmc_serialize(input: TokenStream) -> TokenStream { serialize_base_content }; - let deserialize_base_content = if let Some(attr) = struct_attr{ + let deserialize_base_content = if let Some(attr) = struct_attr { let version: Literal = attr.parse_args().expect("has to be a literal"); let pre_inner = if let Some(f) = s.fields.iter().find(|f| { - f.attrs.iter() - .any(|a| a.path().segments.len() == 1 && - a.path().segments.first().is_some_and(|p| p.ident.to_string() == "extends")) - }){ - let ident= f.ident.as_ref().unwrap(); - let ty= &f.ty; + f.attrs.iter().any(|a| { + a.path().segments.len() == 1 + && a.path() + .segments + .first() + .is_some_and(|p| p.ident.to_string() == "extends") + }) + }) { + let ident = f.ident.as_ref().unwrap(); + let ty = &f.ty; quote! { let #ident = <#ty> :: deserialize(reader)?; } @@ -226,6 +184,183 @@ pub fn rmc_serialize(input: TokenStream) -> TokenStream { deserialize_base_content }; + (serialize_base_content, deserialize_base_content) +} + +#[proc_macro_derive(RmcSerialize, attributes(extends, rmc_struct))] +pub fn rmc_serialize(input: TokenStream) -> TokenStream { + let derive_input = parse_macro_input!(input as DeriveInput); + + let struct_attr = derive_input.attrs.iter().find(|a| { + a.path().segments.len() == 1 + && a.path() + .segments + .first() + .is_some_and(|p| p.ident.to_string() == "rmc_struct") + }); + let repr_attr = derive_input.attrs.iter().find(|a| { + a.path().segments.len() == 1 + && a.path() + .segments + .first() + .is_some_and(|p| p.ident.to_string() == "repr") + }); + + /*let Data::Struct(s) = derive_input.data else { + panic!("rmc struct type MUST be a struct"); + };*/ + + let (serialize_base_content, deserialize_base_content) = match derive_input.data { + Data::Struct(s) => gen_serialize_data_struct(s, struct_attr), + Data::Enum(e) => { + let Some(repr_attr) = repr_attr else { + panic!("missing repr attribute"); + }; + + let ty: Ident = repr_attr.parse_args().unwrap(); + + let mut inner_match_de = quote! {}; + let mut inner_match_se = quote! {}; + + for variant in e.variants { + let Some((_, val)) = variant.discriminant else { + panic!("missing discriminant"); + }; + + let field_data_de = match &variant.fields { + Fields::Named(v) => { + let mut base = quote! {}; + for field in v.named.iter() { + let ty = &field.ty; + let name = &field.ident; + + base.append_all(quote!{ + #name: <#ty as rust_nex::rmc::structures::RmcSerialize>::deserialize(reader)?, + }); + } + + quote! {{#base}} + } + Fields::Unnamed(n) => { + let mut base = quote! {}; + + for field in n.unnamed.iter() { + let ty = &field.ty; + + base.append_all(quote!{ + <#ty as rust_nex::rmc::structures::RmcSerialize>::deserialize(reader)?, + }); + } + + quote! {(#base)} + } + Fields::Unit => { + quote! {} + } + }; + + let mut se_with_fields = quote! { + <#ty as rust_nex::rmc::structures::RmcSerialize>::serialize(&#val, writer)?; + }; + + match &variant.fields { + Fields::Named(v) => { + for field in v.named.iter() { + let ty = &field.ty; + let name = &field.ident; + + se_with_fields.append_all(quote!{ + <#ty as rust_nex::rmc::structures::RmcSerialize>::serialize(#name ,writer)?; + }); + } + } + Fields::Unnamed(n) => { + for (i, field) in n.unnamed.iter().enumerate() { + let ty = &field.ty; + + let ident = Ident::new(&format!("val_{}", i), Span::call_site()); + + se_with_fields.append_all(quote!{ + <#ty as rust_nex::rmc::structures::RmcSerialize>::serialize(#ident, writer)?; + }); + } + } + Fields::Unit => {} + }; + + let field_match_se = match &variant.fields { + Fields::Named(v) => { + let mut base = quote! {}; + + for field in v.named.iter() { + let name = &field.ident; + + base.append_all(quote! { + #name, + }); + } + + quote! {{#base}} + } + Fields::Unnamed(n) => { + let mut base = quote! {}; + + for (i, _field) in n.unnamed.iter().enumerate() { + let ident = Ident::new(&format!("val_{}", i), Span::call_site()); + + base.append_all(quote! { + #ident, + }); + } + + quote! {(#base)} + } + Fields::Unit => { + quote! {} + } + }; + + let name = variant.ident; + + inner_match_de.append_all(quote! { + #val => Self::#name #field_data_de, + }); + + inner_match_se.append_all(quote! { + Self::#name #field_match_se => { + #se_with_fields + }, + }); + } + + let serialize_base_content = quote! { + match self{ + #inner_match_se + }; + + + + Ok(()) + }; + + let deserialize_base_content = quote! { + let val: Self = match <#ty as rust_nex::rmc::structures::RmcSerialize>::deserialize(reader)?{ + #inner_match_de + v => return Err(rust_nex::rmc::structures::Error::UnexpectedValue(v as _)) + }; + + Ok(val) + }; + + (serialize_base_content, deserialize_base_content) + } + Data::Union(_) => { + unimplemented!() + } + }; + + // generate base data + let ident = derive_input.ident; let tokens = quote! { @@ -255,7 +390,7 @@ pub fn rmc_serialize(input: TokenStream) -> TokenStream { /// [`macro@method_id`] attribute. /// /// You can also specify to have the protocol to be non-returning by adding a second parameter to -/// the attribute which is just `NoReturn` e.g. ` #[rmc_proto(1, NoReturn)]` +/// the attribute which is just `NoReturn` e.g. `#[rmc_proto(1, NoReturn)]` /// /// Example /// ``` @@ -270,34 +405,39 @@ pub fn rmc_serialize(input: TokenStream) -> TokenStream { /// } /// ``` #[proc_macro_attribute] -pub fn rmc_proto(attr: TokenStream, input: TokenStream) -> TokenStream{ +pub fn rmc_proto(attr: TokenStream, input: TokenStream) -> TokenStream { + let params = parse_macro_input!(attr as ProtoInputParams); - let mut params = parse_macro_input!(attr as ProtoInputParams); - - let ProtoInputParams{ + let ProtoInputParams { proto_num, - properties + properties, } = params; - let no_return_data = properties.is_some_and(|p| p.1.iter().any(|i|{ - i.to_string() == "NoReturn" - })); + let no_return_data = + properties.is_some_and(|p| p.1.iter().any(|i| i.to_string() == "NoReturn")); - let mut input = parse_macro_input!(input as syn::ItemTrait); + let input = parse_macro_input!(input as syn::ItemTrait); // gigantic ass struct initializer (to summarize this gets all of the data) - let raw_data = RmcProtocolData{ + let raw_data = RmcProtocolData { has_returns: !no_return_data, name: input.ident.clone(), id: proto_num, methods: input .items .iter() - .filter_map(|v| match v{ TraitItem::Fn(v) => Some(v), _ => None }) - .map(|func|{ - let Some(attr) = func.attrs.iter() - .find(|a| a.path().segments.last().is_some_and(|s| s.ident.to_string() == "method_id")) else { - panic!( "every function inside of an rmc protocol must have a method id"); + .filter_map(|v| match v { + TraitItem::Fn(v) => Some(v), + _ => None, + }) + .map(|func| { + let Some(attr) = func.attrs.iter().find(|a| { + a.path() + .segments + .last() + .is_some_and(|s| s.ident.to_string() == "method_id") + }) else { + panic!("every function inside of an rmc protocol must have a method id"); }; let Ok(id): Result = attr.parse_args() else { @@ -314,26 +454,30 @@ pub fn rmc_proto(attr: TokenStream, input: TokenStream) -> TokenStream{ panic!("what"); }; let Pat::Ident(i) = &*t.pat else { - panic!("unable to handle non identifier patterns as parameter bindings"); + panic!( + "unable to handle non identifier patterns as parameter bindings" + ); }; (i.ident.clone(), t.ty.as_ref().clone()) - }).collect(); + }) + .collect(); - ProtoMethodData{ + ProtoMethodData { id, name: func.sig.ident.clone(), parameters: funcs, - ret_val: func.sig.output.clone() + ret_val: func.sig.output.clone(), } - }).collect() - + }) + .collect(), }; - quote!{ + quote! { #input #raw_data - }.into() + } + .into() } /// Used to specify the method id of methods when making rmc protocols. @@ -342,25 +486,25 @@ pub fn rmc_proto(attr: TokenStream, input: TokenStream) -> TokenStream{ /// Note: This attribute doesn't do anything by itself and just returns the thing it was attached to /// unchanged. #[proc_macro_attribute] -pub fn method_id(_attr: TokenStream, input: TokenStream) -> TokenStream{ +pub fn method_id(_attr: TokenStream, input: TokenStream) -> TokenStream { // this attribute doesnt do anything by itself, see `rmc_proto` input } - - #[proc_macro_attribute] -pub fn rmc_struct(attr: TokenStream, input: TokenStream) -> TokenStream{ - let mut type_data = parse_macro_input!(input as DeriveInput); +pub fn rmc_struct(attr: TokenStream, input: TokenStream) -> TokenStream { + let type_data = parse_macro_input!(input as DeriveInput); let mut ident = parse_macro_input!(attr as syn::Path); let last_token = ident.segments.last_mut().expect("empty path?"); - last_token.ident = Ident::new(&("Local".to_owned() + &last_token.ident.to_string()), last_token.span()); - + last_token.ident = Ident::new( + &("Local".to_owned() + &last_token.ident.to_string()), + last_token.span(), + ); let struct_name = &type_data.ident; - let out = quote!{ + let out = quote! { #type_data impl #ident for #struct_name{ @@ -378,7 +522,7 @@ pub fn rmc_struct(attr: TokenStream, input: TokenStream) -> TokenStream{ } #[proc_macro_attribute] -pub fn connection(_attr: TokenStream, input: TokenStream) -> TokenStream{ +pub fn connection(_attr: TokenStream, input: TokenStream) -> TokenStream { // this attribute doesnt do anything by itself, see `rmc_struct` input -} \ No newline at end of file +} diff --git a/macros/src/protos.rs b/macros/src/protos.rs index 8efedb4..14ec2f6 100644 --- a/macros/src/protos.rs +++ b/macros/src/protos.rs @@ -1,7 +1,7 @@ -use proc_macro2::{Ident, Span, TokenStream, TokenTree}; +use proc_macro2::{Ident, Span, TokenStream}; use quote::{quote, ToTokens}; -use syn::{LitInt, LitStr, ReturnType, Token, Type}; -use syn::token::{Brace, Bracket, Paren, Semi}; +use syn::{LitInt, LitStr, ReturnType, Type}; +use syn::token::{Brace, Paren, Semi}; pub struct ProtoMethodData{ pub id: LitInt, diff --git a/src/executables/backend_server_insecure.rs b/src/executables/backend_server_insecure.rs index 6e7f3a3..db65844 100644 --- a/src/executables/backend_server_insecure.rs +++ b/src/executables/backend_server_insecure.rs @@ -23,9 +23,11 @@ use tokio::net::{TcpListener, TcpSocket}; use tokio::task; use tokio_rustls::TlsAcceptor; use rust_nex::define_rmc_proto; -use rust_nex::executables::common::{OWN_IP_PRIVATE, SECURE_SERVER_ACCOUNT, SERVER_PORT}; +use rust_nex::executables::common::{RemoteController, OWN_IP_PRIVATE, SECURE_SERVER_ACCOUNT, SERVER_PORT}; +use rust_nex::executables::common::ServerCluster::{Auth, Secure}; +use rust_nex::executables::common::ServerType::Backend; use rust_nex::nex::auth_handler::AuthHandler; -use rust_nex::rmc::protocols::new_rmc_gateway_connection; +use rust_nex::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote}; use rust_nex::rmc::response::ErrorCode; use rust_nex::rmc::structures::RmcSerialize; use rust_nex::rnex_proxy_common::ConnectionInitData; @@ -44,17 +46,21 @@ pub static SECURE_PROXY_PORT: Lazy = Lazy::new(|| { .unwrap_or(10000) }); -static SECURE_STATION_URL: Lazy = Lazy::new(|| { - format!( - "prudps:/PID=2;sid=1;stream=10;type=2;address={};port={};CID=1", - *SECURE_PROXY_ADDR, *SECURE_PROXY_PORT - ) -}); - #[tokio::main] async fn main() { setup(); + let conn = rust_nex::reggie::rmc_connect_to( + "agmp-control.spfn.net", + Backend{ + name: "agmp-auth-1.spfn.net".to_string(), + cluster: Auth + }, + |r| Arc::new(OnlyRemote::::new(r)) + ).await; + let conn = conn.unwrap(); + + let acceptor = get_configured_tls_acceptor().await; let listen = TcpListener::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)).await.unwrap(); @@ -84,14 +90,14 @@ async fn main() { continue; } }; - + let controller = conn.clone(); task::spawn(async move { info!("connection to secure backend established"); new_rmc_gateway_connection(stream.into(), |_| { Arc::new(AuthHandler { destination_server_acct: &SECURE_SERVER_ACCOUNT, build_name: "branch:origin/project/wup-agmj build:3_8_15_2004_0", - station_url: &SECURE_STATION_URL, + control_server: controller }) }); }); diff --git a/src/executables/backend_server_secure.rs b/src/executables/backend_server_secure.rs index 5744e8c..eac3784 100644 --- a/src/executables/backend_server_secure.rs +++ b/src/executables/backend_server_secure.rs @@ -8,12 +8,14 @@ use log::{error, info}; use tokio::net::TcpListener; use tokio::task; use rust_nex::common::setup; -use rust_nex::executables::common::{OWN_IP_PRIVATE, SERVER_PORT}; +use rust_nex::executables::common::{RemoteController, RemoteControllerManagement, OWN_IP_PRIVATE, SERVER_PORT}; +use rust_nex::executables::common::ServerCluster::Secure; +use rust_nex::executables::common::ServerType::Backend; use rust_nex::nex::matchmake::MatchmakeManager; use rust_nex::nex::remote_console::RemoteConsole; use rust_nex::nex::user::User; use rust_nex::reggie::get_configured_tls_acceptor; -use rust_nex::rmc::protocols::new_rmc_gateway_connection; +use rust_nex::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote}; use rust_nex::rnex_proxy_common::ConnectionInitData; use rust_nex::rmc::protocols::RemoteInstantiatable; @@ -22,6 +24,16 @@ use rust_nex::rmc::protocols::RemoteInstantiatable; async fn main() { setup(); + let conn = rust_nex::reggie::rmc_connect_to( + "agmp-control.spfn.net", + Backend{ + name: "agmp-secure-1.spfn.net".to_string(), + cluster: Secure + }, + |r| Arc::new(OnlyRemote::::new(r)) + ).await; + let conn = conn.unwrap(); + let acceptor = get_configured_tls_acceptor().await; let listen = TcpListener::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)).await.unwrap(); diff --git a/src/executables/common.rs b/src/executables/common.rs index 0b81da8..c8ed07f 100644 --- a/src/executables/common.rs +++ b/src/executables/common.rs @@ -1,7 +1,12 @@ use std::env; -use std::net::Ipv4Addr; +use std::net::{Ipv4Addr, SocketAddrV4}; +use macros::{method_id, rmc_proto, RmcSerialize}; use once_cell::sync::Lazy; +use tonic::transport::Server; +use rust_nex::define_rmc_proto; +use rust_nex::prudp::station_url::StationUrl; use crate::nex::account::Account; +use crate::rmc::response::ErrorCode; pub static OWN_IP_PRIVATE: Lazy = Lazy::new(|| { env::var("SERVER_IP") @@ -10,6 +15,13 @@ pub static OWN_IP_PRIVATE: Lazy = Lazy::new(|| { .expect("no private ip specified") }); +pub static OWN_IP_PUBLIC: Lazy = Lazy::new(|| { + env::var("SERVER_IP_PUBLIC") + .ok() + .and_then(|s| s.parse().ok()) + .expect("no private ip specified") +}); + pub static SERVER_PORT: Lazy = Lazy::new(|| { env::var("SERVER_PORT") .ok() @@ -27,3 +39,52 @@ pub static AUTH_SERVER_ACCOUNT: Lazy = Lazy::new(|| Account::new(1, "Quazal Authentication", &KERBEROS_SERVER_PASSWORD)); pub static SECURE_SERVER_ACCOUNT: Lazy = Lazy::new(|| Account::new(2, "Quazal Rendez-Vous", &KERBEROS_SERVER_PASSWORD)); + + +#[rmc_proto(1)] +pub trait ProxyManagement { + #[method_id(1)] + async fn update_url(&self, url: String) -> Result<(), ErrorCode>; +} + +define_rmc_proto!( + proto Proxy{ + ProxyManagement + } +); + +#[rmc_proto(2)] +pub trait ControllerManagement { + #[method_id(1)] + async fn get_secure_proxy_url(&self) -> Result; + + #[method_id(2)] + async fn get_secure_account(&self) -> Result; +} + +define_rmc_proto!( + proto Controller{ + ControllerManagement + } +); + +#[derive(RmcSerialize)] +#[repr(u32)] +pub enum ServerCluster{ + Auth = 0, + Secure = 1 +} + +#[derive(RmcSerialize)] +#[repr(u32)] +pub enum ServerType{ + Proxy{ + addr: SocketAddrV4, + cluster: ServerCluster + } = 1, + Backend{ + name: String, + cluster: ServerCluster + } = 2, +} + diff --git a/src/executables/control_server.rs b/src/executables/control_server.rs new file mode 100644 index 0000000..238539d --- /dev/null +++ b/src/executables/control_server.rs @@ -0,0 +1,209 @@ +use std::future::Future; +use rust_nex::rmc::protocols::{LocalNoProto, RmcCallable}; +use rust_nex::rmc::structures::RmcSerialize; +use std::io::Cursor; +use std::net::{Ipv4Addr, SocketAddrV4}; +use macros::rmc_struct; +use rust_nex::common::setup; +use rust_nex::executables::common::{ControllerManagement, LocalController, RemoteProxy, RemoteProxyManagement, ServerCluster, ServerType, KERBEROS_SERVER_PASSWORD}; +use rust_nex::prudp::station_url::StationUrl; +use rust_nex::reggie::{get_configured_tls_acceptor, TestStruct, WebStreamSocket}; +use rust_nex::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote}; +use rust_nex::rmc::response::ErrorCode; +use rust_nex::reggie::UnitPacketRead; +use std::sync::{Arc, Weak}; +use log::error; +use once_cell::sync::Lazy; +use rand::random; +use tokio::net::TcpListener; +use tokio::sync::RwLock; +use tokio::task; +use tungstenite::client; +use rust_nex::nex::account::Account; +use rust_nex::rmc::response::ErrorCode::{Core_Exception, Core_InvalidIndex}; +use rust_nex::rmc::protocols::RemoteInstantiatable; +use rust_nex::util::SendingBufferConnection; + +pub static AUTH_SERVER_ACCOUNT: Lazy = + Lazy::new(|| Account::new(1, "Quazal Authentication", &KERBEROS_SERVER_PASSWORD)); +pub static SECURE_SERVER_ACCOUNT: Lazy = + Lazy::new(|| Account::new(2, "Quazal Rendez-Vous", &KERBEROS_SERVER_PASSWORD)); + +#[rmc_struct(Controller)] +struct ServerController { + insecure_proxies: RwLock>>, + insecure_backend_url: RwLock, + secure_proxies: RwLock>>, + secure_backend_url: RwLock, + account: Account +} + +impl ServerController{ + async fn update_urls(&self, cluster: ServerCluster){ + let url = match cluster{ + ServerCluster::Auth => { + self.insecure_backend_url.read().await + } + ServerCluster::Secure => { + self.secure_backend_url.read().await + } + }.clone(); + + let read_lock = match cluster{ + ServerCluster::Auth => { + self.insecure_proxies.read().await + } + ServerCluster::Secure => { + self.secure_proxies.read().await + } + }; + + for proxy in read_lock.iter().filter_map(|v| v.upgrade()){ + if let Err(e) = proxy.proxy.update_url(url.clone()).await { + error!("error whilest updating proxy url: {:?}", e); + } + } + } +} + +struct Proxy{ + proxy: RemoteProxy, + ip: SocketAddrV4, + controller: Arc +} + +impl RmcCallable for Proxy{ + fn rmc_call(&self, responder: &SendingBufferConnection, protocol_id: u16, method_id: u32, call_id: u32, rest: Vec) -> impl Future + Send { + self.controller.rmc_call(responder, protocol_id, method_id, call_id, rest) + } +} + + +impl ControllerManagement for ServerController { + async fn get_secure_proxy_url(&self) -> Result { + let proxy = self.secure_proxies.write().await; + + let proxies = proxy.iter().filter_map(|v| v.upgrade()); + + let idx: usize = random::() % proxy.len(); + // do not switch this to using regular array indexing i specifically wrote it like this as + // to have absolutely now way of panicking, we cant have the control server panicking after + // all + let Some(proxy) = proxies.clone().nth(idx).or_else(|| proxies.clone().nth(0)) else { + return Err(Core_InvalidIndex); + }; + + let station_url = format!( + "prudps:/PID=2;sid=1;stream=10;type=2;address={};port={};CID=1", + proxy.ip.ip(), proxy.ip.port() + ); + + Ok(station_url) + } + + async fn get_secure_account(&self) -> Result { + Ok(self.account.clone()) + } +} + + + +#[tokio::main] +async fn main() { + setup(); + + let socket = TcpListener::bind("0.0.0.0:10003").await.unwrap(); + + let acceptor = get_configured_tls_acceptor().await; + + let server_controller = Arc::new(ServerController { + account: SECURE_SERVER_ACCOUNT.clone(), + secure_proxies: Default::default(), + secure_backend_url: Default::default(), + insecure_backend_url: Default::default(), + insecure_proxies: Default::default(), + }); + + while let Ok((stream, _sock_addr)) = socket.accept().await { + let websocket = tokio_tungstenite::accept_async(stream).await.unwrap(); + + let stream = WebStreamSocket::new(websocket); + + let mut stream = acceptor.accept(stream).await.unwrap(); + let server_controller = server_controller.clone(); + tokio::spawn(async move { + let server_controller = server_controller; + let Ok(server_type) = stream.read_buffer().await else { + error!("failed to read server type"); + return; + }; + + let Ok(server_type) = ServerType::deserialize(&mut Cursor::new(server_type)) else { + error!("failed to read server type"); + return; + }; + + match server_type { + ServerType::Proxy{ + addr, + cluster + } => { + + let mut write_lock = match cluster{ + ServerCluster::Auth => { + server_controller.insecure_proxies.write().await + } + ServerCluster::Secure => { + server_controller.secure_proxies.write().await + } + }; + + let server_controller_internal = server_controller.clone(); + + let remo = new_rmc_gateway_connection(stream.into(), move |r| + Arc::new(Proxy { + proxy: RemoteProxy::new(r), + ip: addr, + controller: server_controller_internal + })); + + write_lock.push(Arc::downgrade(&remo)); + + let url = match cluster{ + ServerCluster::Auth => { + server_controller.insecure_backend_url.read().await + } + ServerCluster::Secure => { + server_controller.secure_backend_url.read().await + } + }.clone(); + + if let Err(e) = remo.proxy.update_url(url.clone()).await { + error!("error whilest updating proxy url: {:?}", e); + } + + } + ServerType::Backend{ + name, + cluster + } => { + let mut url = match cluster{ + ServerCluster::Auth => { + server_controller.insecure_backend_url.write().await + } + ServerCluster::Secure => { + server_controller.secure_backend_url.write().await + } + }; + + *url = name; + drop(url); + + server_controller.update_urls(cluster).await; + + new_rmc_gateway_connection(stream.into(), |_| server_controller); + } + } + }); + } +} diff --git a/src/executables/proxy_insecure.rs b/src/executables/proxy_insecure.rs index 5860f13..f5a1741 100644 --- a/src/executables/proxy_insecure.rs +++ b/src/executables/proxy_insecure.rs @@ -1,13 +1,17 @@ +use rust_nex::executables::common::{LocalProxy, ProxyManagement, RemoteController, OWN_IP_PUBLIC}; use std::env; use std::ffi::CStr; use std::io::{Read, Write}; use std::net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream}; +use std::sync::{Arc, OnceLock}; +use std::time::Duration; use bytemuck::{Pod, Zeroable}; use chacha20::{ChaCha20, Key}; use chacha20::cipher::{Iv, KeyIvInit, StreamCipher}; -use log::error; +use log::{error, warn}; +use macros::rmc_struct; use once_cell::sync::Lazy; use rsa::pkcs8::{DecodePrivateKey, DecodePublicKey, Document}; use rsa::{BigUint, Pkcs1v15Encrypt, RsaPrivateKey, RsaPublicKey}; @@ -16,13 +20,20 @@ use rsa::pss::BlindedSigningKey; use rsa::signature::{RandomizedSigner, SignatureEncoding}; use sha2::Sha256; use tokio::net::TcpSocket; +use tokio::sync::RwLock; use tokio::task; +use tokio::time::sleep; use rust_nex::common::setup; use rust_nex::executables::common::{OWN_IP_PRIVATE, SERVER_PORT}; +use rust_nex::executables::common::ServerCluster::Auth; +use rust_nex::executables::common::ServerType::{Backend, Proxy}; use rust_nex::prudp::packet::VirtualPort; use rust_nex::prudp::router::Router; +use rust_nex::prudp::station_url::StationUrl; use rust_nex::prudp::unsecure::Unsecure; use rust_nex::reggie::{establish_tls_connection_to, UnitPacketRead, UnitPacketWrite}; +use rust_nex::rmc::protocols::OnlyRemote; +use rust_nex::rmc::response::ErrorCode; use rust_nex::rmc::structures::RmcSerialize; use rust_nex::rnex_proxy_common::ConnectionInitData; @@ -33,30 +44,41 @@ static FORWARD_DESTINATION: Lazy = static FORWARD_DESTINATION_NAME: Lazy = Lazy::new(|| env::var("FORWARD_DESTINATION_NAME").expect("no forward destination name given")); -static RSA_PRIVKEY: Lazy = Lazy::new(|| { - let path = env::var("RSA_PRIVKEY") - .expect("RSA_PRIVKEY not set"); +#[rmc_struct(Proxy)] +#[derive(Default)] +struct DestinationHolder{ + url: RwLock +} - RsaPrivateKey::read_pkcs8_pem_file(&path) - .expect("unable to read private key") -}); +impl ProxyManagement for DestinationHolder{ + async fn update_url(&self, new_url: String) -> Result<(), ErrorCode> { + println!("updating url"); -static RSA_PUBKEY: Lazy = Lazy::new(|| { - RSA_PRIVKEY.to_public_key() -}); + let mut url = self.url.write().await; -static PUBKEY_ENCODED: Lazy = Lazy::new(|| { - RSA_PUBKEY.to_pkcs1_der().expect("unable to convert pubkey to der") -}); + *url = new_url; + + Ok(()) + } +} -static RSA_SIGNKEY: Lazy> = Lazy::new(|| - BlindedSigningKey::::new(RSA_PRIVKEY.clone()) -); #[tokio::main] async fn main() { setup(); + let conn = + rust_nex::reggie::rmc_connect_to( + "agmp-control.spfn.net", + Proxy { + addr: SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT), + cluster: Auth + }, + |r| Arc::new(DestinationHolder::default()) + ).await; + let dest_holder = conn.unwrap(); + + let (router_secure, _) = Router::new(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)) .await .expect("unable to start router"); @@ -76,9 +98,18 @@ async fn main() { return; }; + let dest_holder = dest_holder.clone(); + task::spawn(async move { + let dest = dest_holder.url.read().await; + + if *dest == ""{ + warn!("no destination set yet but connection attempted"); + return; + } + let mut stream - = establish_tls_connection_to(FORWARD_DESTINATION.as_str(), FORWARD_DESTINATION_NAME.as_str()).await; + = establish_tls_connection_to(&dest, &dest).await; if let Err(e) = stream.send_buffer(&ConnectionInitData{ prudpsock_addr: conn.socket_addr, @@ -113,6 +144,9 @@ async fn main() { return; } }, + _ = sleep(Duration::from_secs(10)) => { + conn.send([0,0,0,0,0].to_vec()).await; + } } } }); diff --git a/src/executables/proxy_secure.rs b/src/executables/proxy_secure.rs index 2fea0ac..7b2a1b3 100644 --- a/src/executables/proxy_secure.rs +++ b/src/executables/proxy_secure.rs @@ -1,43 +1,65 @@ - - -use std::env; -use std::ffi::CStr; -use std::io::{Read, Write}; -use std::net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream}; -use bytemuck::{Pod, Zeroable}; -use chacha20::{ChaCha20, Key}; -use chacha20::cipher::{Iv, KeyIvInit, StreamCipher}; -use log::error; -use once_cell::sync::Lazy; -use rsa::pkcs8::{DecodePrivateKey, DecodePublicKey, Document}; -use rsa::{BigUint, Pkcs1v15Encrypt, RsaPrivateKey, RsaPublicKey}; -use rsa::pkcs1::EncodeRsaPublicKey; -use rsa::pss::BlindedSigningKey; -use rsa::signature::{RandomizedSigner, SignatureEncoding}; -use sha2::Sha256; -use tokio::net::TcpSocket; +use std::net::SocketAddrV4; +use std::sync::Arc; +use std::time::Duration; +use futures::future::Remote; +use log::{error, warn}; +use macros::rmc_struct; +use tokio::sync::RwLock; use tokio::task; +use tokio::time::sleep; use rust_nex::common::setup; -use rust_nex::executables::common::{OWN_IP_PRIVATE, SECURE_SERVER_ACCOUNT, SERVER_PORT}; +use rust_nex::executables::common::{ProxyManagement, RemoteController, RemoteControllerManagement, OWN_IP_PRIVATE, OWN_IP_PUBLIC, SECURE_SERVER_ACCOUNT, SERVER_PORT}; +use rust_nex::executables::common::ServerCluster::Auth; +use rust_nex::executables::common::ServerType::Proxy; use rust_nex::prudp::packet::VirtualPort; use rust_nex::prudp::router::Router; use rust_nex::prudp::secure::Secure; use rust_nex::prudp::unsecure::Unsecure; -use rust_nex::reggie::{establish_tls_connection_to, UnitPacketRead, UnitPacketWrite}; -use rust_nex::rmc::structures::RmcSerialize; +use rust_nex::reggie::establish_tls_connection_to; +use rust_nex::rmc::response::ErrorCode; use rust_nex::rnex_proxy_common::ConnectionInitData; +use rust_nex::executables::common::LocalProxy; +use rust_nex::reggie::UnitPacketWrite; +use rust_nex::rmc::structures::RmcSerialize; +use rust_nex::reggie::UnitPacketRead; +use rust_nex::rmc::protocols::RemoteInstantiatable; +#[rmc_struct(Proxy)] +struct DestinationHolder{ + url: RwLock, + controller: RemoteController +} +impl ProxyManagement for DestinationHolder{ + async fn update_url(&self, new_url: String) -> Result<(), ErrorCode> { + let mut url = self.url.write().await; + + *url = new_url; + + Ok(()) + } +} -static FORWARD_DESTINATION: Lazy = - Lazy::new(|| env::var("FORWARD_DESTINATION").expect("no forward destination given")); -static FORWARD_DESTINATION_NAME: Lazy = - Lazy::new(|| env::var("FORWARD_DESTINATION_NAME").expect("no forward destination name given")); #[tokio::main] async fn main() { setup(); + let conn = + rust_nex::reggie::rmc_connect_to( + "agmp-control.spfn.net", + Proxy { + addr: SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT), + cluster: Auth + }, + |r| Arc::new(DestinationHolder{ + url: Default::default(), + controller: RemoteController::new(r) + }) + ).await; + let dest_holder = conn.unwrap(); + + let (router_secure, _) = Router::new(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)) .await .expect("unable to start router"); @@ -45,7 +67,7 @@ async fn main() { let mut socket_secure = router_secure .add_socket(VirtualPort::new(1, 10), Secure( "6f599f81", - &SECURE_SERVER_ACCOUNT + dest_holder.controller.get_secure_account().await.unwrap() )) .await .expect("unable to add socket"); @@ -58,9 +80,18 @@ async fn main() { return; }; + let dest_holder = dest_holder.clone(); + task::spawn(async move { + let dest = dest_holder.url.read().await; + + if *dest == ""{ + warn!("no destination set yet but connection attempted"); + return; + } + let mut stream - = establish_tls_connection_to(FORWARD_DESTINATION.as_str(), FORWARD_DESTINATION_NAME.as_str()).await; + = establish_tls_connection_to(&dest, &dest).await; if let Err(e) = stream.send_buffer(&ConnectionInitData{ prudpsock_addr: conn.socket_addr, @@ -70,6 +101,8 @@ async fn main() { return; }; + + loop { tokio::select! { data = conn.recv() => { @@ -95,6 +128,9 @@ async fn main() { return; } }, + _ = sleep(Duration::from_secs(10)) => { + conn.send([0,0,0,0,0].to_vec()).await; + } } } }); diff --git a/src/grpc/account.rs b/src/grpc/account.rs index 4ec4752..d369501 100644 --- a/src/grpc/account.rs +++ b/src/grpc/account.rs @@ -1,23 +1,12 @@ use std::{env, result}; use std::array::TryFromSliceError; -use std::net::{Ipv4Addr}; use std::str::FromStr; use json::{object, JsonValue}; use once_cell::sync::Lazy; use reqwest::{Body, Method, Url}; use reqwest::header::HeaderValue; -use rocket::serde::json::Json; -use serde::Serialize; use thiserror::Error; -use tonic::metadata::{Ascii, MetadataValue}; -use tonic::{Request, transport}; -use tonic::codegen::InterceptedService; -use tonic::transport::Channel; use crate::grpc::account::Error::SomethingHappened; -use crate::grpc::InterceptorFunc; -use crate::grpc::protobufs::account::account_client::AccountClient; -use crate::grpc::protobufs::account::{GetNexPasswordRequest, GetUserDataRequest, GetUserDataResponse}; - static API_KEY: Lazy = Lazy::new(||{ let key = env::var("ACCOUNT_GQL_API_KEY") .expect("no graphql ip specified"); diff --git a/src/lib.rs b/src/lib.rs index fb941c0..53553c2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,11 @@ +#![allow(dead_code)] +// rnex makes extensive use of async functions in public traits +// this is however fine because these traits should never(and i mean NEVER) be used dynamically +#![allow(async_fn_in_trait)] +//#![warn(missing_docs)] + + + extern crate self as rust_nex; pub mod endianness; diff --git a/src/main.rs b/src/main.rs index 34350ba..338b9d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] -#![warn(missing_docs)] +#![allow(async_fn_in_trait)] +//#![warn(missing_docs)] //! # Splatoon RNEX server //! @@ -63,6 +64,7 @@ mod versions; mod web; pub mod reggie; pub mod util; +pub mod common; diff --git a/src/nex/account.rs b/src/nex/account.rs index f18dbe8..75c53ce 100644 --- a/src/nex/account.rs +++ b/src/nex/account.rs @@ -1,10 +1,11 @@ +use macros::RmcSerialize; - +#[derive(RmcSerialize)] +#[derive(Clone)] pub struct Account{ pub pid: u32, - pub username: Box, + pub username: String, pub kerbros_password: [u8; 16], - } impl Account{ diff --git a/src/nex/auth_handler.rs b/src/nex/auth_handler.rs index 04f0f4f..a2ba86d 100644 --- a/src/nex/auth_handler.rs +++ b/src/nex/auth_handler.rs @@ -1,3 +1,6 @@ +use crate::executables::common::RemoteControllerManagement; +use std::sync::Arc; +use rust_nex::executables::common::RemoteController; use crate::grpc::account; use crate::kerberos::{derive_key, KerberosDateTime, Ticket}; use crate::nex::account::Account; @@ -7,9 +10,9 @@ use crate::rmc::response::ErrorCode::Core_Unknown; use crate::rmc::structures::any::Any; use crate::rmc::structures::connection_data::ConnectionData; use crate::rmc::structures::qresult::QResult; -use crate::rmc::structures::RmcSerialize; -use crate::{define_rmc_proto, kerberos, rmc}; +use crate::{define_rmc_proto, kerberos}; use macros::rmc_struct; +use crate::rmc::protocols::OnlyRemote; define_rmc_proto!( proto AuthClientProtocol{ @@ -21,7 +24,8 @@ define_rmc_proto!( pub struct AuthHandler { pub destination_server_acct: &'static Account, pub build_name: &'static str, - pub station_url: &'static str, + //pub station_url: &'static str, + pub control_server: Arc>, } pub fn generate_ticket( @@ -56,14 +60,14 @@ async fn get_login_data_by_pid(pid: u32) -> Option<(u32, [u8; 16])> { } impl Auth for AuthHandler { - async fn login(&self, name: String) -> Result<(), ErrorCode> { + async fn login(&self, _name: String) -> Result<(), ErrorCode> { todo!() } async fn login_ex( &self, name: String, - extra_data: Any, + _extra_data: Any, ) -> Result<(QResult, u32, Vec, ConnectionData, String), ErrorCode> { let Ok(pid) = name.parse() else { return Err(ErrorCode::Core_InvalidArgument); @@ -83,9 +87,13 @@ impl Auth for AuthHandler { let ticket = generate_ticket(source_login_data, destination_login_data); let result = QResult::success(Core_Unknown); + + let Ok(addr) = self.control_server.get_secure_proxy_url().await else { + return Err(ErrorCode::Core_Exception); + }; let connection_data = ConnectionData { - station_url: self.station_url.to_string(), + station_url: addr, special_station_url: "".to_string(), //date_time: KerberosDateTime::new(1,1,1,1,1,1), date_time: KerberosDateTime::now(), @@ -126,11 +134,11 @@ impl Auth for AuthHandler { Ok((result, ticket.into())) } - async fn get_pid(&self, username: String) -> Result { + async fn get_pid(&self, _username: String) -> Result { Err(ErrorCode::Core_Exception) } - async fn get_name(&self, pid: u32) -> Result { + async fn get_name(&self, _pid: u32) -> Result { Err(ErrorCode::Core_Exception) } } diff --git a/src/nex/matchmake.rs b/src/nex/matchmake.rs index d82dce5..bba55f5 100644 --- a/src/nex/matchmake.rs +++ b/src/nex/matchmake.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::{Arc, Weak}; use std::sync::atomic::AtomicU32; -use std::sync::atomic::Ordering::{Relaxed, Release}; +use std::sync::atomic::Ordering::Relaxed; use std::time::Duration; use log::info; use rand::random; @@ -50,7 +50,7 @@ impl MatchmakeManager{ async fn garbage_collect(&self){ info!("running rnex garbage collector over all sessions and users"); - let mut idx = 0; + let idx = 0; let mut to_be_deleted_gids = Vec::new(); @@ -64,7 +64,7 @@ impl MatchmakeManager{ session_pair }{ - let mut session = session.lock().await; + let session = session.lock().await; if !session.is_reachable(){ to_be_deleted_gids.push(gid); diff --git a/src/nex/remote_console.rs b/src/nex/remote_console.rs index 3744aa1..3d3cb49 100644 --- a/src/nex/remote_console.rs +++ b/src/nex/remote_console.rs @@ -1,8 +1,6 @@ -use macros::rmc_struct; -use crate::rmc::protocols::notifications::{Notification, NotificationEvent, RawNotification, RawNotificationInfo, RemoteNotification}; +use crate::rmc::protocols::notifications::{Notification, RawNotification, RawNotificationInfo, RemoteNotification}; use crate::rmc::protocols::nat_traversal::{NatTraversalConsole, RemoteNatTraversalConsole, RawNatTraversalConsoleInfo, RawNatTraversalConsole}; use crate::define_rmc_proto; -use crate::nex::user::RemoteUserProtocol; define_rmc_proto!( proto Console{ diff --git a/src/nex/user.rs b/src/nex/user.rs index 019b2bd..e166a80 100644 --- a/src/nex/user.rs +++ b/src/nex/user.rs @@ -1,14 +1,12 @@ -use std::io::ErrorKind::HostUnreachable; use crate::define_rmc_proto; use crate::nex::matchmake::{ExtendedMatchmakeSession, MatchmakeManager}; use crate::nex::remote_console::RemoteConsole; use crate::prudp::sockaddr::PRUDPSockAddr; -use crate::prudp::station_url::Type::{PRUDP, PRUDPS}; use crate::prudp::station_url::UrlOptions::{ - Address, NatFiltering, NatMapping, NatType, Platform, Port, PrincipalID, RVConnectionID, - StreamID, PMP, UPNP, + Address, NatFiltering, NatMapping, NatType, Port, PrincipalID, RVConnectionID, + }; -use crate::prudp::station_url::{nat_types, StationUrl, Type}; +use crate::prudp::station_url::{StationUrl}; use crate::rmc::protocols::matchmake::{ Matchmake, RawMatchmake, RawMatchmakeInfo, RemoteMatchmake, }; @@ -24,15 +22,12 @@ use crate::rmc::structures::matchmake::{AutoMatchmakeParam, CreateMatchmakeSessi use crate::rmc::structures::qresult::QResult; use macros::rmc_struct; -use std::net::{Ipv4Addr, SocketAddrV4}; use std::sync::{Arc, Weak}; -use log::{error, info}; -use rocket::http::ext::IntoCollection; +use log::info; use tokio::sync::{Mutex, RwLock}; -use tonic::Code::InvalidArgument; use crate::prudp::station_url::nat_types::PUBLIC; use crate::rmc::protocols::notifications::{NotificationEvent, RemoteNotification}; -use crate::rmc::response::ErrorCode::{Core_Exception, Core_InvalidArgument, RendezVous_AccountExpired, RendezVous_SessionVoid}; +use crate::rmc::response::ErrorCode::{Core_Exception, Core_InvalidArgument, RendezVous_AccountExpired}; define_rmc_proto!( proto UserProtocol{ @@ -129,7 +124,7 @@ impl Secure for User { public_station }; - let mut both = [&mut public_station, &mut private_station]; + let both = [&mut public_station, &mut private_station]; for station in both { station.options.retain(|v| { @@ -208,7 +203,7 @@ impl MatchmakeExtension for User { Ok(()) } - async fn get_playing_session(&self, pids: Vec) -> Result, ErrorCode> { + async fn get_playing_session(&self, _pids: Vec) -> Result, ErrorCode> { Ok(Vec::new()) } @@ -385,7 +380,7 @@ impl MatchmakeExtension for User { } impl Matchmake for User { - async fn unregister_gathering(&self, gid: u32) -> Result { + async fn unregister_gathering(&self, _gid: u32) -> Result { Ok(true) } async fn get_session_urls(&self, gid: u32) -> Result, ErrorCode> { @@ -460,7 +455,7 @@ impl Matchmake for User { Ok(()) } - async fn migrate_gathering_ownership(&self, gid: u32, candidates: Vec, participants_only: bool) -> Result<(), ErrorCode> { + async fn migrate_gathering_ownership(&self, gid: u32, candidates: Vec, _participants_only: bool) -> Result<(), ErrorCode> { let session = self.matchmake_manager.get_session(gid).await?; let mut session = session.lock().await; @@ -522,11 +517,11 @@ impl NatTraversal for User { Ok(()) } - async fn report_nat_traversal_result(&self, cid: u32, result: bool, rtt: u32) -> Result<(), ErrorCode> { + async fn report_nat_traversal_result(&self, _cid: u32, _result: bool, _rtt: u32) -> Result<(), ErrorCode> { Ok(()) } - async fn request_probe_initiation(&self, station_to_probe: String) -> Result<(), ErrorCode> { + async fn request_probe_initiation(&self, _station_to_probe: String) -> Result<(), ErrorCode> { info!("NO!"); Err(RendezVous_AccountExpired) } diff --git a/src/prudp/packet.rs b/src/prudp/packet.rs index 2bcd9c9..b66167c 100644 --- a/src/prudp/packet.rs +++ b/src/prudp/packet.rs @@ -9,7 +9,7 @@ use std::io::{Cursor, Read, Seek, Write}; use std::net::SocketAddrV4; use bytemuck::{Pod, Zeroable}; use hmac::{Hmac, Mac}; -use log::{error, trace, warn}; +use log::{error, warn}; use md5::{Md5, Digest}; use thiserror::Error; use v_byte_macros::{SwapEndian}; diff --git a/src/prudp/router.rs b/src/prudp/router.rs index ce58b90..3aa8bdd 100644 --- a/src/prudp/router.rs +++ b/src/prudp/router.rs @@ -5,11 +5,11 @@ use tokio::net::UdpSocket; use std::net::{SocketAddr, SocketAddrV4}; use std::net::SocketAddr::V4; use std::sync::{Arc, Weak}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool}; use std::time::Duration; use tokio::task::JoinHandle; use once_cell::sync::Lazy; -use log::{error, info, trace}; +use log::{error, info}; use thiserror::Error; use tokio::select; use tokio::sync::RwLock; diff --git a/src/prudp/secure.rs b/src/prudp/secure.rs index ecdd70a..e0438d7 100644 --- a/src/prudp/secure.rs +++ b/src/prudp/secure.rs @@ -10,7 +10,6 @@ use crate::kerberos::{derive_key, TicketInternalData}; use crate::nex::account::Account; use crate::prudp::packet::PRUDPV1Packet; use crate::prudp::socket::{CryptoHandler, CryptoHandlerConnectionInstance, EncryptionPair}; -use crate::prudp::unsecure::UnsecureInstance; use crate::rmc::structures::RmcSerialize; pub fn read_secure_connection_data(data: &[u8], act: &Account) -> Option<([u8; 32], u32, u32)>{ @@ -103,7 +102,7 @@ pub fn generate_secure_encryption_pairs(mut session_key: [u8; 32], count: u8) -> } -pub struct Secure(pub &'static str, pub &'static Account); +pub struct Secure(pub &'static str, pub Account); pub struct SecureInstance { @@ -186,7 +185,7 @@ impl CryptoHandlerConnectionInstance for SecureInstance { packet.calculate_and_assign_signature(self.access_key, Some(self.session_key), Some(self.self_signature)); } - fn verify_packet(&self, packet: &PRUDPV1Packet) -> bool { + fn verify_packet(&self, _packet: &PRUDPV1Packet) -> bool { true } } \ No newline at end of file diff --git a/src/prudp/socket.rs b/src/prudp/socket.rs index 20b4244..0455bd7 100644 --- a/src/prudp/socket.rs +++ b/src/prudp/socket.rs @@ -1,37 +1,24 @@ -use crate::nex::account::Account; use crate::prudp::packet::flags::{ACK, HAS_SIZE, MULTI_ACK, NEED_ACK, RELIABLE}; use crate::prudp::packet::types::{CONNECT, DATA, DISCONNECT, PING, SYN}; use crate::prudp::packet::PacketOption::{ - ConnectionSignature, FragmentId, InitialSequenceId, MaximumSubstreamId, SupportedFunctions, + ConnectionSignature, FragmentId, MaximumSubstreamId, SupportedFunctions, }; -use crate::prudp::packet::{PRUDPV1Header, PRUDPV1Packet, PacketOption, TypesFlags, VirtualPort}; -use crate::prudp::router::{Error, Router}; +use crate::prudp::packet::{PRUDPV1Header, PRUDPV1Packet, TypesFlags, VirtualPort}; use crate::prudp::sockaddr::PRUDPSockAddr; use async_trait::async_trait; -use chrono::NaiveTime; -use hmac::digest::consts::U5; use log::info; -use log::{error, trace, warn}; -use once_cell::sync::Lazy; -use rand::random; -use rc4::{Key, KeyInit, Rc4, StreamCipher}; -use rocket::http::hyper::body::HttpBody; -use std::collections::{BTreeMap, HashMap, VecDeque}; -use std::fmt::{Debug, Formatter}; -use std::future::Future; +use log::error; +use rc4::StreamCipher; +use std::collections::{BTreeMap, HashMap}; use std::marker::PhantomData; -use std::mem; -use std::net::SocketAddrV4; use std::ops::Deref; -use std::pin::Pin; use std::sync::{Arc, Weak}; use std::time::Duration; use tokio::net::UdpSocket; use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::Mutex; use tokio::time::{sleep, Instant}; -use tokio_stream::Stream; // due to the way this is designed crashing the router thread causes deadlock, sorry ;-; // (maybe i will fix that some day) @@ -479,7 +466,7 @@ impl InternalSocket { self.send_packet_unbuffered(address, response).await; } - async fn handle_data(&self, address: PRUDPSockAddr, mut packet: PRUDPV1Packet) { + async fn handle_data(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet) { info!("got data"); if packet.header.types_and_flags.get_flags() & (NEED_ACK | RELIABLE) @@ -533,7 +520,7 @@ impl InternalSocket { let conn = conn.clone(); drop(connections); - let mut conn = conn.lock().await; + let conn = conn.lock().await; let mut response = packet.base_acknowledgement_packet(); response.header.types_and_flags.set_flag(HAS_SIZE | ACK); @@ -553,7 +540,7 @@ impl InternalSocket { let conn = conn.clone(); drop(connections); - let mut conn = conn.lock().await; + let conn = conn.lock().await; let mut response = packet.base_acknowledgement_packet(); response.header.types_and_flags.set_flag(HAS_SIZE | ACK); diff --git a/src/prudp/station_url.rs b/src/prudp/station_url.rs index 0e20d6c..441c1b3 100644 --- a/src/prudp/station_url.rs +++ b/src/prudp/station_url.rs @@ -2,7 +2,6 @@ use std::net::Ipv4Addr; use log::error; use std::fmt::{Debug, Display, Formatter, Write}; use std::io::Read; -use rocket::delete; use crate::prudp::station_url::Type::{PRUDP, PRUDPS, UDP}; use crate::prudp::station_url::UrlOptions::{Address, ConnectionID, NatFiltering, NatMapping, NatType, Platform, PMP, Port, PrincipalID, RVConnectionID, StreamID, StreamType, UPNP, PID}; use crate::rmc::structures::Error::StationUrlInvalid; diff --git a/src/reggie.rs b/src/reggie.rs index 9985ec9..d3bd711 100644 --- a/src/reggie.rs +++ b/src/reggie.rs @@ -1,18 +1,26 @@ use std::{env, fs, io}; +use std::io::{Error, ErrorKind}; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; +use futures::{SinkExt, StreamExt}; use macros::{method_id, rmc_proto, rmc_struct}; use once_cell::sync::Lazy; use rustls::{ClientConfig, RootCertStore, ServerConfig}; use rustls::client::WebPkiServerVerifier; use rustls::server::WebPkiClientVerifier; use rustls_pki_types::{CertificateDer, PrivateKeyDer, ServerName, TrustAnchor}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use tokio::net::TcpStream; +use thiserror::Error; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; +use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::{TlsAcceptor, TlsConnector}; use tokio_rustls::client::TlsStream; +use tokio_tungstenite::{connect_async, WebSocketStream}; +use tokio_tungstenite::tungstenite::Message; use webpki::anchor_from_trusted_cert; +use rust_nex::common::setup; use crate::define_rmc_proto; -use crate::endianness::IS_BIG_ENDIAN; +use crate::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote, RmcCallable, RmcConnection}; use crate::rmc::response::ErrorCode; use crate::rmc::structures::RmcSerialize; @@ -116,12 +124,12 @@ pub trait UnitPacketWrite: AsyncWrite + Unpin{ impl UnitPacketWrite for T{} -pub async fn establish_tls_connection_to(address: &str, server_name: &'static str) -> TlsStream{ +pub async fn establish_tls_connection_to(address: &str, server_name: &str) -> TlsStream{ let connector = get_configured_tls_connector().await; let stream = TcpStream::connect(address).await.unwrap(); - let stream = connector.connect(ServerName::try_from(server_name).unwrap(), stream).await + let stream = connector.connect(ServerName::try_from(server_name.to_owned()).unwrap(), stream).await .expect("unable to connect via tls"); stream @@ -147,3 +155,213 @@ impl RmcTestProto for TestStruct{ Ok("heya".into()) } } + + +pub struct WebStreamSocket { + socket: WebSocketStream, + incoming_buffer: Vec, + finished_reading: bool, +} + +impl WebStreamSocket { + pub fn new(socket: WebSocketStream) -> Self{ + Self{ + incoming_buffer: Default::default(), + socket, + finished_reading: false, + } + } +} + +impl AsyncWrite for WebStreamSocket { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + let this = &mut self.get_mut().socket; + + let msg = Message::binary(buf.to_vec()); + + match this.poll_ready_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(Error::new(ErrorKind::Other, e))), + Poll::Ready(Ok(())) => { + // continue on + } + } + + let Err(e) = this.start_send_unpin(msg) else { + return Poll::Ready(Ok(buf.len())); + }; + + + Poll::Ready(Err(Error::new(ErrorKind::Other, e))) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = &mut self.get_mut().socket; + + match this.poll_flush_unpin(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(Error::new(ErrorKind::Other, e))), + Poll::Ready(Ok(())) => Poll::Ready(Ok(())) + } + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = &mut self.get_mut().socket; + + match this.poll_close_unpin(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(Error::new(ErrorKind::Other, e))), + Poll::Ready(Ok(())) => Poll::Ready(Ok(())) + } + } +} + +impl AsyncRead for WebStreamSocket { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let Self { + incoming_buffer, + socket, + finished_reading + } = &mut self.get_mut(); + + if !*finished_reading { + match socket.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => { + let Message::Binary(data) = msg else { + return Poll::Ready(Err(Error::new(ErrorKind::InvalidData, "got non binary data when trying to emulate stream"))); + }; + + incoming_buffer.extend_from_slice(&data); + } + Poll::Ready(Some(Err(e))) if incoming_buffer.is_empty() => { + return Poll::Ready(Err(Error::new(ErrorKind::Other, e))); + } + Poll::Ready(None) if incoming_buffer.is_empty() => { + *finished_reading = true; + } + Poll::Pending if incoming_buffer.is_empty() => { + return Poll::Pending + } + _ => {} + } + } + + + + if !incoming_buffer.is_empty(){ + let read_ammount = buf.remaining(); + + let ammount_taken = read_ammount.min(incoming_buffer.len()); + + buf.put_slice(&incoming_buffer[0..ammount_taken]); + + *incoming_buffer = (&incoming_buffer.get(ammount_taken..).unwrap_or(&[])).to_vec(); + } + + Poll::Ready(Ok(())) + + + /*if buf.remaining() == 0{ + + + return Poll::Ready(Ok(())); + } + + match socket.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => { + let Message::Binary(data) = msg else { + return Poll::Ready(Err(Error::new(ErrorKind::InvalidData, "got non binary data when trying to emulate stream"))); + }; + + if data.len() <= buf.remaining() { + // if no data remains there is no reason to store anything + buf.put_slice(&data); + } else { + let read_ammount = buf.remaining(); + + let ammount_taken = read_ammount.min(data.len()); + + buf.put_slice(&data[..ammount_taken]); + + *incoming_buffer = data[ammount_taken..].to_vec(); + } + + + Poll::Ready(Ok(())) + } + Poll::Ready(Some(Err(e))) => Poll::Ready(Err(Error::new(ErrorKind::Other, e))), + // EOF + Poll::Ready(None) => Poll::Ready(Ok(())), + Poll::Pending => Poll::Pending + }*/ + } +} + +#[derive(Error, Debug)] +pub enum ConnectError{ + #[error(transparent)] + Tungstenite(#[from] tungstenite::error::Error), + #[error(transparent)] + DataSendError(#[from] io::Error), +} + +pub async fn rmc_connect_to(url: &str, init_data: U, create_func: F) -> Result, ConnectError> + where + F: FnOnce(RmcConnection) -> Arc{ + let (stream, _)= connect_async(format!("ws://{}/", url)).await?; + + let webstreamsocket = WebStreamSocket::new(stream); + + let connector = get_configured_tls_connector().await; + + let mut connection = connector.connect(ServerName::try_from(url.to_string()).unwrap(), webstreamsocket).await.unwrap(); + + connection.send_buffer(&init_data.to_data()).await?; + + let rmc = new_rmc_gateway_connection(connection.into(), create_func); + + Ok(rmc) +} + +#[tokio::test] +async fn test(){ + setup(); + + let socket = connect_async("ws://192.168.178.120:12345/").await; + let (stream, resp) = socket.unwrap(); + + let mut webstreamsocket = WebStreamSocket::new(stream); + + let connector = get_configured_tls_connector().await; + + let connection = connector.connect(ServerName::try_from("agmp-tv.spfn.net").unwrap(), webstreamsocket).await.unwrap(); + + let rmc = new_rmc_gateway_connection(connection.into(), |r| { + Arc::new(OnlyRemote::::new(r)) + }); + + println!("{:?}", rmc.test().await); +} + +#[tokio::test] +async fn test_server(){ + setup(); + + let socket = TcpListener::bind("192.168.178.120:12345").await.unwrap(); + + let acceptor = get_configured_tls_acceptor().await; + + while let Ok((stream, _sock_addr)) = socket.accept().await{ + let websocket = tokio_tungstenite::accept_async(stream).await.unwrap(); + + let webstreamsocket = WebStreamSocket::new(websocket); + + let stream = acceptor.accept(webstreamsocket).await.unwrap(); + + new_rmc_gateway_connection(stream.into(), |_| { + Arc::new( + TestStruct + ) + }); + } +} diff --git a/src/rmc/protocols/matchmake_ext.rs b/src/rmc/protocols/matchmake_ext.rs index d492503..b795d3d 100644 --- a/src/rmc/protocols/matchmake_ext.rs +++ b/src/rmc/protocols/matchmake_ext.rs @@ -1,5 +1,4 @@ use macros::{method_id, rmc_proto}; -use crate::prudp::station_url::StationUrl; use crate::rmc::response::ErrorCode; #[rmc_proto(50)] diff --git a/src/rmc/protocols/mod.rs b/src/rmc/protocols/mod.rs index 6c9b1c3..b206b44 100644 --- a/src/rmc/protocols/mod.rs +++ b/src/rmc/protocols/mod.rs @@ -10,28 +10,21 @@ pub mod matchmake_ext; pub mod ranking; use crate::util::{SendingBufferConnection, SplittableBufferConnection}; -use crate::prudp::socket::{ExternalConnection, SendingConnection}; use crate::rmc::message::RMCMessage; use crate::rmc::protocols::RemoteCallError::ConnectionBroke; use crate::rmc::response::{ErrorCode, RMCResponse, RMCResponseResult}; use crate::rmc::structures; -use crate::rmc::structures::connection_data::ConnectionData; -use crate::rmc::structures::matchmake::AutoMatchmakeParam; -use crate::rmc::structures::{Error, RmcSerialize}; -use async_trait::async_trait; -use chrono::TimeDelta; +use crate::rmc::structures::RmcSerialize; use log::{error, info}; -use macros::method_id; -use macros::{rmc_proto, rmc_struct}; -use paste::paste; use std::collections::HashMap; +use std::future::Future; use std::io::Cursor; -use std::ops::{Add, Deref}; -use std::sync::{Arc, Condvar}; +use std::ops::Deref; +use std::sync::Arc; use std::time::Duration; use thiserror::Error; use tokio::sync::{Mutex, Notify}; -use tokio::time::{sleep_until, Instant}; +use tokio::time::{sleep, sleep_until, Instant}; use crate::result::ResultExtension; #[derive(Error, Debug)] @@ -74,6 +67,10 @@ impl RmcConnection { Ok(()) } + + pub async fn disconnect(&self){ + self.0.disconnect().await; + } } impl RmcResponseReceiver { @@ -173,6 +170,9 @@ macro_rules! define_rmc_proto { fn new(conn: rust_nex::rmc::protocols::RmcConnection) -> Self{ Self(conn) } + async fn disconnect(&self){ + self.0.disconnect().await; + } } impl rust_nex::rmc::protocols::HasRmcConnection for []{ @@ -192,11 +192,11 @@ macro_rules! define_rmc_proto { impl RmcCallable for () { async fn rmc_call( &self, - remote_response_connection: &SendingBufferConnection, - protocol_id: u16, - method_id: u32, - call_id: u32, - rest: Vec, + _remote_response_connection: &SendingBufferConnection, + _protocol_id: u16, + _method_id: u32, + _call_id: u32, + _rest: Vec, ) { //todo: maybe reply with not implemented(?) } @@ -204,6 +204,7 @@ impl RmcCallable for () { pub trait RemoteInstantiatable{ fn new(conn: RmcConnection) -> Self; + async fn disconnect(&self); } pub struct OnlyRemote(T); @@ -220,10 +221,15 @@ impl OnlyRemote{ pub fn new(conn: RmcConnection) -> Self{ Self(T::new(conn)) } + + pub async fn disconnect(&self) { + self.0.disconnect().await; + } } impl RmcCallable for OnlyRemote{ - fn rmc_call(&self, responder: &SendingBufferConnection, protocol_id: u16, method_id: u32, call_id: u32, rest: Vec) -> impl std::future::Future + Send { + fn rmc_call(&self, _responder: &SendingBufferConnection, _protocol_id: u16, _method_id: u32, _call_id: u32, _rest: Vec) -> impl Future + Send { + // maybe respond with not implemented or something async{} } } @@ -243,6 +249,12 @@ async fn handle_incoming( return }; + // protocol 0 is hardcoded to be the no protocol protocol aka keepalive protocol + if *proto_id == 0{ + println!("got keepalive"); + continue; + } + if (proto_id & 0x80) == 0{ let Some(response) = RMCResponse::new(&mut Cursor::new(v)).display_err_or_some() else { error!("ending rmc gateway."); @@ -292,6 +304,8 @@ where let rmc_conn = RmcConnection(sending_conn, response_recv); + let sending_conn = conn.duplicate_sender(); + let exposed_object = (create_internal)(rmc_conn); { @@ -304,7 +318,25 @@ where incoming ).await; }); + + + tokio::spawn(async move { + while sending_conn.is_alive(){ + sending_conn.send([0,0,0,0,0].to_vec()).await; + sleep(Duration::from_secs(10)).await; + } + }); } exposed_object } + +impl RmcCallable for Arc{ + fn rmc_call(&self, responder: &SendingBufferConnection, protocol_id: u16, method_id: u32, call_id: u32, rest: Vec) -> impl Future + Send { + self.as_ref().rmc_call(responder, protocol_id, method_id, call_id, rest) + } +} + +define_rmc_proto! { + proto NoProto{} +} \ No newline at end of file diff --git a/src/rmc/protocols/nat_traversal.rs b/src/rmc/protocols/nat_traversal.rs index 328a651..222e62c 100644 --- a/src/rmc/protocols/nat_traversal.rs +++ b/src/rmc/protocols/nat_traversal.rs @@ -1,6 +1,5 @@ use macros::{method_id, rmc_proto}; use crate::rmc::response::ErrorCode; -use crate::rmc::structures::matchmake::{CreateMatchmakeSessionParam, MatchmakeSession}; #[rmc_proto(3)] pub trait NatTraversal{ diff --git a/src/rmc/protocols/notifications.rs b/src/rmc/protocols/notifications.rs index 416238a..164bf83 100644 --- a/src/rmc/protocols/notifications.rs +++ b/src/rmc/protocols/notifications.rs @@ -1,6 +1,4 @@ use macros::{method_id, rmc_proto, rmc_struct, RmcSerialize}; -use crate::rmc::response::ErrorCode; -use crate::rmc::structures::qresult::QResult; pub mod notification_types{ pub const OWNERSHIP_CHANGED: u32 = 4000; diff --git a/src/rmc/protocols/ranking.rs b/src/rmc/protocols/ranking.rs index 974af41..298922f 100644 --- a/src/rmc/protocols/ranking.rs +++ b/src/rmc/protocols/ranking.rs @@ -1,4 +1,4 @@ -use macros::{method_id, rmc_proto}; +use macros::rmc_proto; #[rmc_proto(112)] pub trait Ranking{ diff --git a/src/rmc/protocols/secure.rs b/src/rmc/protocols/secure.rs index 39187bf..3071c3a 100644 --- a/src/rmc/protocols/secure.rs +++ b/src/rmc/protocols/secure.rs @@ -1,8 +1,6 @@ use macros::{method_id, rmc_proto}; use crate::prudp::station_url::StationUrl; use crate::rmc::response::ErrorCode; -use crate::rmc::structures::any::Any; -use crate::rmc::structures::connection_data::ConnectionData; use crate::rmc::structures::qresult::QResult; #[rmc_proto(11)] diff --git a/src/rmc/response.rs b/src/rmc/response.rs index c49552f..dcdebc6 100644 --- a/src/rmc/response.rs +++ b/src/rmc/response.rs @@ -1,3 +1,7 @@ +// i seriously dont know why the compiler is complaining about unused parentheses in the repr +// attributes but this gets it to not complain anymore +#![allow(unused_parens)] + use std::io; use std::io::{Read, Seek, Write}; use std::mem::transmute; @@ -5,14 +9,8 @@ use bytemuck::bytes_of; use log::error; use v_byte_macros::EnumTryInto; use crate::endianness::{ReadExtensions, IS_BIG_ENDIAN}; -use crate::prudp::packet::{PRUDPV1Packet}; -use crate::prudp::packet::flags::{NEED_ACK, RELIABLE}; -use crate::prudp::packet::PacketOption::FragmentId; -use crate::prudp::packet::types::DATA; -use crate::prudp::socket::{ExternalConnection, SendingConnection}; use crate::rmc::response::ErrorCode::Core_Exception; use crate::rmc::structures::qresult::ERROR_MASK; -use crate::rmc::structures::RmcSerialize; use crate::util::SendingBufferConnection; pub enum RMCResponseResult { @@ -72,7 +70,7 @@ impl RMCResponse { error_code: { match ErrorCode::try_from(error_code){ Ok(v) => v, - Err(e) => { + Err(()) => { error!("invalid error code {:#010x}", error_code); Core_Exception } diff --git a/src/rmc/structures/buffer.rs b/src/rmc/structures/buffer.rs index 5370cf5..5db53cf 100644 --- a/src/rmc/structures/buffer.rs +++ b/src/rmc/structures/buffer.rs @@ -18,7 +18,7 @@ impl<'a> RmcSerialize for &'a [u8]{ } } -impl<'a> RmcSerialize for Box<[u8]>{ +impl RmcSerialize for Box<[u8]>{ fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { (&self[..]).serialize(writer) } diff --git a/src/rmc/structures/connection_data.rs b/src/rmc/structures/connection_data.rs index 1adbb20..a949d91 100644 --- a/src/rmc/structures/connection_data.rs +++ b/src/rmc/structures/connection_data.rs @@ -1,8 +1,7 @@ -use std::io::{Read, Write}; -use bytemuck::bytes_of; + use macros::RmcSerialize; use crate::kerberos::KerberosDateTime; -use crate::rmc::structures::{rmc_struct, RmcSerialize}; +use crate::rmc::structures::RmcSerialize; #[derive(Debug, RmcSerialize)] #[rmc_struct(1)] diff --git a/src/rmc/structures/list.rs b/src/rmc/structures/list.rs index 6f6b456..870be65 100644 --- a/src/rmc/structures/list.rs +++ b/src/rmc/structures/list.rs @@ -1,5 +1,8 @@ +use std::array::from_fn; use std::io::{Read, Write}; +use std::mem::MaybeUninit; use bytemuck::bytes_of; +use serde::Serialize; use crate::endianness::{IS_BIG_ENDIAN, ReadExtensions}; use crate::rmc::structures::RmcSerialize; @@ -31,3 +34,27 @@ impl RmcSerialize for Vec{ Ok(vec) } } + +impl RmcSerialize for [T; LEN]{ + fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { + for i in 0..LEN{ + self[i].serialize(writer)?; + } + + Ok(()) + } + + fn deserialize(reader: &mut dyn Read) -> crate::rmc::structures::Result { + let mut arr = [const { MaybeUninit::::uninit() }; LEN]; + + for i in 0..LEN{ + arr[i] = MaybeUninit::new(T::deserialize(reader)?); + } + + // all of the elements are now initialized so it is safe to assume they are initialized + + let arr = arr.map(|v| unsafe{ v.assume_init() }); + + Ok(arr) + } +} diff --git a/src/rmc/structures/mod.rs b/src/rmc/structures/mod.rs index d6c765c..fd6d61d 100644 --- a/src/rmc/structures/mod.rs +++ b/src/rmc/structures/mod.rs @@ -35,9 +35,9 @@ pub mod variant; pub mod ranking; mod networking; -pub trait RmcSerialize: Sized{ +pub trait RmcSerialize{ fn serialize(&self, writer: &mut dyn Write) -> Result<()>; - fn deserialize(reader: &mut dyn Read) -> Result; + fn deserialize(reader: &mut dyn Read) -> Result where Self: Sized; fn to_data(&self) -> Vec{ let mut data = Vec::new(); diff --git a/src/rmc/structures/networking.rs b/src/rmc/structures/networking.rs index 2218ae9..9a11c5a 100644 --- a/src/rmc/structures/networking.rs +++ b/src/rmc/structures/networking.rs @@ -19,6 +19,7 @@ impl RmcSerialize for SocketAddrV4{ } } + impl RmcSerialize for VirtualPort{ fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { self.0.serialize(writer)?; diff --git a/src/rmc/structures/primitives.rs b/src/rmc/structures/primitives.rs index 7c345d5..a834d9a 100644 --- a/src/rmc/structures/primitives.rs +++ b/src/rmc/structures/primitives.rs @@ -229,4 +229,13 @@ impl RmcSerialize for Box{ + fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> { + self.as_ref().serialize(writer) + } + fn deserialize(reader: &mut dyn Read) -> crate::rmc::structures::Result { + T::deserialize(reader).map(Box::new) + } } \ No newline at end of file diff --git a/src/rmc/structures/qbuffer.rs b/src/rmc/structures/qbuffer.rs index f87c49e..bb6c235 100644 --- a/src/rmc/structures/qbuffer.rs +++ b/src/rmc/structures/qbuffer.rs @@ -2,7 +2,6 @@ use std::io::{Read, Write}; use bytemuck::bytes_of; use crate::endianness::{IS_BIG_ENDIAN, ReadExtensions}; use crate::rmc::structures::{Result, RmcSerialize}; -use crate::rmc::structures::qresult::QResult; #[derive(Debug)] diff --git a/src/rnex_proxy_common.rs b/src/rnex_proxy_common.rs index f04cf3e..c300aa0 100644 --- a/src/rnex_proxy_common.rs +++ b/src/rnex_proxy_common.rs @@ -1,5 +1,4 @@ use macros::RmcSerialize; -use crate::kerberos::KerberosDateTime; use crate::prudp::sockaddr::PRUDPSockAddr; #[derive(Debug, RmcSerialize)] diff --git a/src/util.rs b/src/util.rs index 7556fc2..04c8400 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,14 +1,14 @@ -use std::cell::UnsafeCell; -use std::marker::PhantomData; use std::ops::Deref; -use log::error; -use tokio::io::{AsyncRead, AsyncWrite}; +use std::sync::Arc; +use log::{error, info}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::Notify; use tokio::task; use rust_nex::reggie::{UnitPacketRead, UnitPacketWrite}; #[derive(Clone)] -pub struct SendingBufferConnection(Sender>); +pub struct SendingBufferConnection(Sender>, Arc); pub struct SplittableBufferConnection(SendingBufferConnection, Receiver>); @@ -39,41 +39,53 @@ impl SplittableBufferConnection { let (outside_send, inside_recv) = channel::>(10); let (inside_send, outside_recv) = channel::>(10); - task::spawn(async move { - let sender = inside_send; - let mut recver = inside_recv; - let mut stream = stream; - loop { - tokio::select! { - data = recver.recv() => { - let Some(data) = data else { - break; - }; + let notify = Arc::new(Notify::new()); - if let Err(e) = stream.send_buffer(&data[..]).await{ - error!("error sending data to backend: {}", e); - break; - } - }, - data = stream.read_buffer() => { - let data = match data{ - Ok(d) => d, - Err(e) => { - error!("error reveiving data from backend: {}", e); + { + let notify = notify.clone(); + task::spawn(async move { + let sender = inside_send; + let mut recver = inside_recv; + let mut stream = stream; + + + loop { + tokio::select! { + data = recver.recv() => { + let Some(data) = data else { + break; + }; + + if let Err(e) = stream.send_buffer(&data[..]).await{ + error!("error sending data to backend: {}", e); break; } - }; + }, + data = stream.read_buffer() => { + let data = match data{ + Ok(d) => d, + Err(e) => { + error!("error reveiving data from backend: {}", e); + break; + } + }; - if let Err(e) = sender.send(data).await{ - error!("a send error occurred {}", e); - return; + if let Err(e) = sender.send(data).await{ + error!("a send error occurred {}", e); + return; + } + }, + _ = notify.notified() => { + info!("shutting down connection"); + break; } - }, + } } - } - }); + stream.shutdown().await; + }); + } - Self(SendingBufferConnection(outside_send), outside_recv) + Self(SendingBufferConnection(outside_send, notify), outside_recv) } } @@ -81,6 +93,15 @@ impl SendingBufferConnection{ pub async fn send(&self, buffer: Vec) -> Option<()>{ self.0.send(buffer).await.ok() } + pub fn is_alive(&self) -> bool{ + !self.0.is_closed() + } + pub async fn disconnect(&self) { + while !self.0.is_closed() { + self.1.notify_waiters(); + tokio::task::yield_now().await; + } + } } impl SplittableBufferConnection{ diff --git a/src/versions.rs b/src/versions.rs index 0fa85fb..7b5ec30 100644 --- a/src/versions.rs +++ b/src/versions.rs @@ -1,6 +1,6 @@ use std::marker::PhantomData; use std::ops::{BitAnd, BitOr}; -use typenum::{Cmp, IsEqual, IsLess, IsLessOrEqual, Unsigned, U1, U2, U3}; +use typenum::{Cmp, IsEqual, IsLess, IsLessOrEqual, Unsigned}; /// This trait represents a version at compile time trait Version{ diff --git a/src/web/mod.rs b/src/web/mod.rs index 16a661b..cd2bcf2 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -1,15 +1,10 @@ -use std::net::SocketAddrV4; use std::sync::Arc; use async_trait::async_trait; -use once_cell::sync::Lazy; -use rocket::{get, routes, Request, Rocket, State}; +use rocket::{get, routes, Request, State}; use rocket::request::{FromRequest, Outcome}; use rocket::serde::json::Json; use tokio::task::JoinHandle; -use serde::Serialize; -use tokio::sync::Mutex; use crate::nex::matchmake::MatchmakeManager; -use crate::rmc::protocols::HasRmcConnection; use crate::rmc::protocols::notifications::NotificationEvent; struct RnexApiAuth;