Merge pull request #1 from Splatfestival-Network/feat/proxy

Feat/proxy
This commit is contained in:
DJMrTV 2025-06-13 12:36:52 +02:00 committed by GitHub
commit 79e021751c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 1487 additions and 455 deletions

912
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,5 +1,5 @@
[package]
name = "splatoon-server-rust"
name = "rust-nex"
version = "0.1.0"
edition = "2021"
@ -25,8 +25,8 @@ simplelog = "0.12.2"
chrono = "0.4.39"
log = "0.4.25"
anyhow = "1.0.95"
rand = "0.9.0-beta.3"
rustls = "^0.23.21"
rand = "0.8.5"
hmac = "0.12.1"
md-5 = "^0.10.6"
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread", "net", "sync", "fs"] }
@ -45,6 +45,15 @@ futures = "0.3.31"
reqwest = "0.12.18"
json = "0.12.4"
ctrlc = "3.4.7"
rsa = "0.9.8"
sha2 = "0.10.9"
chacha20 = "0.9.1"
rustls = "0.23.27"
rustls-pki-types = "1.12.0"
rustls-webpki = "0.103.3"
tokio-rustls = "0.26.2"
@ -57,3 +66,19 @@ default = ["secure", "auth"]
secure = []
auth = []
[[bin]]
name = "proxy_insecure"
path = "src/executables/proxy_insecure.rs"
[[bin]]
name = "proxy_secure"
path = "src/executables/proxy_secure.rs"
[[bin]]
name = "backend_server_insecure"
path = "src/executables/backend_server_insecure.rs"
[[bin]]
name = "backend_server_secure"
path = "src/executables/backend_server_secure.rs"

View file

@ -189,7 +189,7 @@ pub fn rmc_serialize(input: TokenStream) -> TokenStream {
quote! {
#pre_inner
crate::rmc::structures::rmc_struct::write_struct(writer, #version, |mut writer|{
rust_nex::rmc::structures::rmc_struct::write_struct(writer, #version, |mut writer|{
#serialize_base_content
})?;
@ -218,7 +218,7 @@ pub fn rmc_serialize(input: TokenStream) -> TokenStream {
quote! {
#pre_inner
Ok(crate::rmc::structures::rmc_struct::read_struct(reader, #version, move |mut reader|{
Ok(rust_nex::rmc::structures::rmc_struct::read_struct(reader, #version, move |mut reader|{
#deserialize_base_content
})?)
}
@ -229,14 +229,14 @@ pub fn rmc_serialize(input: TokenStream) -> TokenStream {
let ident = derive_input.ident;
let tokens = quote! {
impl crate::rmc::structures::RmcSerialize for #ident{
fn serialize(&self, writer: &mut dyn ::std::io::Write) -> crate::rmc::structures::Result<()>{
impl rust_nex::rmc::structures::RmcSerialize for #ident{
fn serialize(&self, writer: &mut dyn ::std::io::Write) -> rust_nex::rmc::structures::Result<()>{
#serialize_base_content
}
fn deserialize(reader: &mut dyn ::std::io::Read) -> crate::rmc::structures::Result<Self>{
fn deserialize(reader: &mut dyn ::std::io::Read) -> rust_nex::rmc::structures::Result<Self>{
#deserialize_base_content
}
}
@ -367,8 +367,8 @@ pub fn rmc_struct(attr: TokenStream, input: TokenStream) -> TokenStream{
}
impl crate::rmc::protocols::RmcCallable for #struct_name{
async fn rmc_call(&self, remote_response_connection: &crate::prudp::socket::SendingConnection, protocol_id: u16, method_id: u32, call_id: u32, rest: Vec<u8>){
impl rust_nex::rmc::protocols::RmcCallable for #struct_name{
async fn rmc_call(&self, remote_response_connection: &rust_nex::util::SendingBufferConnection, protocol_id: u16, method_id: u32, call_id: u32, rest: Vec<u8>){
<Self as #ident>::rmc_call(self, remote_response_connection, protocol_id, method_id, call_id, rest).await;
}
}

View file

@ -73,7 +73,7 @@ impl RmcProtocolData{
for (param_name, param_type) in parameters{
quote!{
let Ok(#param_name) =
<#param_type as crate::rmc::structures::RmcSerialize>::deserialize(
<#param_type as rust_nex::rmc::structures::RmcSerialize>::deserialize(
&mut cursor
) else
}.to_tokens(tokens);
@ -84,7 +84,7 @@ impl RmcProtocolData{
quote! {
{
log::error!(#error_msg);
return Err(crate::rmc::response::ErrorCode::Core_InvalidArgument);
return Err(rust_nex::rmc::response::ErrorCode::Core_InvalidArgument);
};
}.to_tokens(tokens)
} else {
@ -116,7 +116,7 @@ impl RmcProtocolData{
quote!{
let retval = retval?;
let mut vec = Vec::new();
crate::rmc::structures::RmcSerialize::serialize(&retval, &mut vec).ok();
rust_nex::rmc::structures::RmcSerialize::serialize(&retval, &mut vec).ok();
Ok(vec)
}.to_tokens(tokens);
}
@ -126,7 +126,7 @@ impl RmcProtocolData{
quote!{
async fn rmc_call_proto(
&self,
remote_response_connection: &crate::prudp::socket::SendingConnection,
remote_response_connection: &rust_nex::util::SendingBufferConnection,
method_id: u32,
call_id: u32,
data: Vec<u8>,
@ -165,7 +165,7 @@ impl RmcProtocolData{
}.to_tokens(tokens);
if self.has_returns {
quote! {
Err(crate::rmc::response::ErrorCode::Core_NotImplemented)
Err(rust_nex::rmc::response::ErrorCode::Core_NotImplemented)
}.to_tokens(tokens);
}
});
@ -176,7 +176,7 @@ impl RmcProtocolData{
if *has_returns{
quote!{
crate::rmc::response::send_result(
rust_nex::rmc::response::send_result(
remote_response_connection,
ret,
#id,
@ -209,7 +209,7 @@ impl RmcProtocolData{
// boilerplate tokens which all raw traits need
quote!{
#[doc(hidden)]
pub trait #remote_name: crate::rmc::protocols::HasRmcConnection
pub trait #remote_name: rust_nex::rmc::protocols::HasRmcConnection
}.to_tokens(tokens);
// generate the body of the raw protocol trait
@ -247,12 +247,12 @@ impl RmcProtocolData{
for (param_name, param_type) in parameters{
quote!{
crate::result::ResultExtension::display_err_or_some(
<#param_type as crate::rmc::structures::RmcSerialize>::serialize(
rust_nex::result::ResultExtension::display_err_or_some(
<#param_type as rust_nex::rmc::structures::RmcSerialize>::serialize(
&#param_name,
&mut cursor
)
).ok_or(crate::rmc::response::ErrorCode::Core_InvalidArgument)
).ok_or(rust_nex::rmc::response::ErrorCode::Core_InvalidArgument)
}.to_tokens(tokens);
if self.has_returns {
quote! {
@ -268,25 +268,25 @@ impl RmcProtocolData{
quote!{
let call_id = rand::random();
let message = crate::rmc::message::RMCMessage{
let message = rust_nex::rmc::message::RMCMessage{
call_id,
method_id: #method_id,
protocol_id: #proto_id,
rest_of_data: send_data
};
let rmc_conn = <Self as crate::rmc::protocols::HasRmcConnection>::get_connection(self);
let rmc_conn = <Self as rust_nex::rmc::protocols::HasRmcConnection>::get_connection(self);
}.to_tokens(tokens);
if *has_returns{
quote!{
crate::result::ResultExtension::display_err_or_some(
rust_nex::result::ResultExtension::display_err_or_some(
rmc_conn.make_raw_call(&message).await
).ok_or(crate::rmc::response::ErrorCode::Core_Exception)
).ok_or(rust_nex::rmc::response::ErrorCode::Core_Exception)
}.to_tokens(tokens);
} else {
quote!{
crate::result::ResultExtension::display_err_or_some(
rust_nex::result::ResultExtension::display_err_or_some(
rmc_conn.make_raw_call_no_response(&message).await
);
}.to_tokens(tokens);

37
src/common.rs Normal file
View file

@ -0,0 +1,37 @@
use std::fs;
use std::fs::File;
use chrono::{Local, SecondsFormat};
use log::LevelFilter;
use simplelog::{ColorChoice, CombinedLogger, Config, TermLogger, TerminalMode, WriteLogger};
pub fn setup(){
CombinedLogger::init(vec![
TermLogger::new(
LevelFilter::Info,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Auto,
),
WriteLogger::new(LevelFilter::max(), Config::default(), {
fs::create_dir_all("log").unwrap();
let date = Local::now().to_rfc3339_opts(SecondsFormat::Secs, false);
// this fixes windows being windows
let date = date.replace(":", "-");
let filename = format!("{}.log", date);
if cfg!(windows) {
File::create(format!("log\\{}", filename)).unwrap()
} else {
File::create(format!("log/{}", filename)).unwrap()
}
}),
])
.unwrap();
/*ctrlc::set_handler(||{
FORCE_EXIT.call_once_force(|_|{
println!("attempting exit");
});
}).unwrap();*/
dotenv::dotenv().ok();
}

View file

@ -0,0 +1,100 @@
use rust_nex::reggie::UnitPacketRead;
use log::{error, info};
use once_cell::sync::Lazy;
use rustls::client::danger::HandshakeSignatureValid;
use rustls::pki_types::{CertificateDer, TrustAnchor, UnixTime};
use rustls::server::danger::{ClientCertVerified, ClientCertVerifier};
use rustls::server::{ClientCertVerifierBuilder, WebPkiClientVerifier};
use rustls::{
DigitallySignedStruct, DistinguishedName, Error, RootCertStore, ServerConfig, ServerConnection,
SignatureScheme,
};
use rustls_pki_types::PrivateKeyDer;
use rust_nex::common::setup;
use rust_nex::reggie::{get_configured_tls_acceptor, TestStruct, ROOT_TRUST_ANCHOR, SELF_CERT, SELF_KEY};
use std::borrow::ToOwned;
use std::{env, fs};
use std::io::Cursor;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use macros::{method_id, rmc_proto, rmc_struct};
use tokio::io::AsyncReadExt;
use tokio::net::{TcpListener, TcpSocket};
use tokio::task;
use tokio_rustls::TlsAcceptor;
use rust_nex::define_rmc_proto;
use rust_nex::executables::common::{OWN_IP_PRIVATE, SECURE_SERVER_ACCOUNT, SERVER_PORT};
use rust_nex::nex::auth_handler::AuthHandler;
use rust_nex::rmc::protocols::new_rmc_gateway_connection;
use rust_nex::rmc::response::ErrorCode;
use rust_nex::rmc::structures::RmcSerialize;
use rust_nex::rnex_proxy_common::ConnectionInitData;
pub static SECURE_PROXY_ADDR: Lazy<Ipv4Addr> = Lazy::new(|| {
env::var("SECURE_PROXY_ADDR")
.ok()
.and_then(|s| s.parse().ok())
.expect("no secure proxy ip specified")
});
pub static SECURE_PROXY_PORT: Lazy<u16> = Lazy::new(|| {
env::var("SECURE_PROXY_PORT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(10000)
});
static SECURE_STATION_URL: Lazy<String> = Lazy::new(|| {
format!(
"prudps:/PID=2;sid=1;stream=10;type=2;address={};port={};CID=1",
*SECURE_PROXY_ADDR, *SECURE_PROXY_PORT
)
});
#[tokio::main]
async fn main() {
setup();
let acceptor = get_configured_tls_acceptor().await;
let listen = TcpListener::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)).await.unwrap();
while let Ok((stream, addr)) = listen.accept().await {
let mut stream = match acceptor.accept(stream).await {
Ok(v) => v,
Err(e) => {
error!("an error ocurred whilest accepting tls connection: {:?}", e);
continue;
}
};
let buffer = match stream.read_buffer().await{
Ok(v) => v,
Err(e) => {
error!("an error ocurred whilest reading connection data buffer: {:?}", e);
continue;
}
};
let user_connection_data = ConnectionInitData::deserialize(&mut Cursor::new(buffer));
let user_connection_data = match user_connection_data{
Ok(v) => v,
Err(e) => {
error!("an error ocurred whilest reading connection data: {:?}", e);
continue;
}
};
task::spawn(async move {
info!("connection to secure backend established");
new_rmc_gateway_connection(stream.into(), |_| {
Arc::new(AuthHandler {
destination_server_acct: &SECURE_SERVER_ACCOUNT,
build_name: "branch:origin/project/wup-agmj build:3_8_15_2004_0",
station_url: &SECURE_STATION_URL,
})
});
});
}
}

View file

@ -0,0 +1,84 @@
use std::io::Cursor;
use rust_nex::rmc::structures::RmcSerialize;
use rust_nex::reggie::UnitPacketRead;
use std::net::SocketAddrV4;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use log::{error, info};
use tokio::net::TcpListener;
use tokio::task;
use rust_nex::common::setup;
use rust_nex::executables::common::{OWN_IP_PRIVATE, SERVER_PORT};
use rust_nex::nex::matchmake::MatchmakeManager;
use rust_nex::nex::remote_console::RemoteConsole;
use rust_nex::nex::user::User;
use rust_nex::reggie::get_configured_tls_acceptor;
use rust_nex::rmc::protocols::new_rmc_gateway_connection;
use rust_nex::rnex_proxy_common::ConnectionInitData;
use rust_nex::rmc::protocols::RemoteInstantiatable;
#[tokio::main]
async fn main() {
setup();
let acceptor = get_configured_tls_acceptor().await;
let listen = TcpListener::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)).await.unwrap();
let mmm = Arc::new(MatchmakeManager{
gid_counter: AtomicU32::new(1),
sessions: Default::default(),
users: Default::default(),
rv_cid_counter: AtomicU32::new(1),
});
let weak_mmm = Arc::downgrade(&mmm);
MatchmakeManager::initialize_garbage_collect_thread(weak_mmm).await;
while let Ok((stream, addr)) = listen.accept().await {
let mut stream = match acceptor.accept(stream).await {
Ok(v) => v,
Err(e) => {
error!("an error ocurred whilest accepting tls connection: {:?}", e);
continue;
}
};
let buffer = match stream.read_buffer().await{
Ok(v) => v,
Err(e) => {
error!("an error ocurred whilest reading connection data buffer: {:?}", e);
continue;
}
};
let user_connection_data = ConnectionInitData::deserialize(&mut Cursor::new(buffer));
let user_connection_data = match user_connection_data{
Ok(v) => v,
Err(e) => {
error!("an error ocurred whilest reading connection data: {:?}", e);
continue;
}
};
let mmm = mmm.clone();
task::spawn(async move {
info!("connection to secure backend established");
new_rmc_gateway_connection(stream.into(), |r| {
Arc::new_cyclic(|this| User{
this: this.clone(),
ip: user_connection_data.prudpsock_addr,
pid: user_connection_data.pid,
remote: RemoteConsole::new(r),
matchmake_manager: mmm,
station_url: Default::default()
})
});
});
}
}

29
src/executables/common.rs Normal file
View file

@ -0,0 +1,29 @@
use std::env;
use std::net::Ipv4Addr;
use once_cell::sync::Lazy;
use crate::nex::account::Account;
pub static OWN_IP_PRIVATE: Lazy<Ipv4Addr> = Lazy::new(|| {
env::var("SERVER_IP")
.ok()
.and_then(|s| s.parse().ok())
.expect("no private ip specified")
});
pub static SERVER_PORT: Lazy<u16> = Lazy::new(|| {
env::var("SERVER_PORT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(10000)
});
pub static KERBEROS_SERVER_PASSWORD: Lazy<String> = Lazy::new(|| {
env::var("AUTH_SERVER_PASSWORD")
.ok()
.unwrap_or("password".to_owned())
});
pub static AUTH_SERVER_ACCOUNT: Lazy<Account> =
Lazy::new(|| Account::new(1, "Quazal Authentication", &KERBEROS_SERVER_PASSWORD));
pub static SECURE_SERVER_ACCOUNT: Lazy<Account> =
Lazy::new(|| Account::new(2, "Quazal Rendez-Vous", &KERBEROS_SERVER_PASSWORD));

1
src/executables/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod common;

View file

@ -0,0 +1,120 @@
use std::env;
use std::ffi::CStr;
use std::io::{Read, Write};
use std::net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream};
use bytemuck::{Pod, Zeroable};
use chacha20::{ChaCha20, Key};
use chacha20::cipher::{Iv, KeyIvInit, StreamCipher};
use log::error;
use once_cell::sync::Lazy;
use rsa::pkcs8::{DecodePrivateKey, DecodePublicKey, Document};
use rsa::{BigUint, Pkcs1v15Encrypt, RsaPrivateKey, RsaPublicKey};
use rsa::pkcs1::EncodeRsaPublicKey;
use rsa::pss::BlindedSigningKey;
use rsa::signature::{RandomizedSigner, SignatureEncoding};
use sha2::Sha256;
use tokio::net::TcpSocket;
use tokio::task;
use rust_nex::common::setup;
use rust_nex::executables::common::{OWN_IP_PRIVATE, SERVER_PORT};
use rust_nex::prudp::packet::VirtualPort;
use rust_nex::prudp::router::Router;
use rust_nex::prudp::unsecure::Unsecure;
use rust_nex::reggie::{establish_tls_connection_to, UnitPacketRead, UnitPacketWrite};
use rust_nex::rmc::structures::RmcSerialize;
use rust_nex::rnex_proxy_common::ConnectionInitData;
static FORWARD_DESTINATION: Lazy<String> =
Lazy::new(|| env::var("FORWARD_DESTINATION").expect("no forward destination given"));
static FORWARD_DESTINATION_NAME: Lazy<String> =
Lazy::new(|| env::var("FORWARD_DESTINATION_NAME").expect("no forward destination name given"));
static RSA_PRIVKEY: Lazy<RsaPrivateKey> = Lazy::new(|| {
let path = env::var("RSA_PRIVKEY")
.expect("RSA_PRIVKEY not set");
RsaPrivateKey::read_pkcs8_pem_file(&path)
.expect("unable to read private key")
});
static RSA_PUBKEY: Lazy<RsaPublicKey> = Lazy::new(|| {
RSA_PRIVKEY.to_public_key()
});
static PUBKEY_ENCODED: Lazy<Document> = Lazy::new(|| {
RSA_PUBKEY.to_pkcs1_der().expect("unable to convert pubkey to der")
});
static RSA_SIGNKEY: Lazy<BlindedSigningKey<Sha256>> = Lazy::new(||
BlindedSigningKey::<Sha256>::new(RSA_PRIVKEY.clone())
);
#[tokio::main]
async fn main() {
setup();
let (router_secure, _) = Router::new(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT))
.await
.expect("unable to start router");
let mut socket_secure = router_secure
.add_socket(VirtualPort::new(1, 10), Unsecure(
"6f599f81"
))
.await
.expect("unable to add socket");
// let conn = socket_secure.connect(auth_sockaddr).await.unwrap();
loop {
let Some(mut conn) = socket_secure.accept().await else {
error!("server crashed");
return;
};
task::spawn(async move {
let mut stream
= establish_tls_connection_to(FORWARD_DESTINATION.as_str(), FORWARD_DESTINATION_NAME.as_str()).await;
if let Err(e) = stream.send_buffer(&ConnectionInitData{
prudpsock_addr: conn.socket_addr,
pid: conn.user_id
}.to_data()).await{
error!("error connecting to backend: {}", e);
return;
};
loop {
tokio::select! {
data = conn.recv() => {
let Some(data) = data else {
break;
};
if let Err(e) = stream.send_buffer(&data[..]).await{
error!("error sending data to backend: {}", e);
break;
}
},
data = stream.read_buffer() => {
let data = match data{
Ok(d) => d,
Err(e) => {
error!("error reveiving data from backend: {}", e);
break;
}
};
if conn.send(data).await == None{
return;
}
},
}
}
});
}
}

View file

@ -0,0 +1,102 @@
use std::env;
use std::ffi::CStr;
use std::io::{Read, Write};
use std::net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream};
use bytemuck::{Pod, Zeroable};
use chacha20::{ChaCha20, Key};
use chacha20::cipher::{Iv, KeyIvInit, StreamCipher};
use log::error;
use once_cell::sync::Lazy;
use rsa::pkcs8::{DecodePrivateKey, DecodePublicKey, Document};
use rsa::{BigUint, Pkcs1v15Encrypt, RsaPrivateKey, RsaPublicKey};
use rsa::pkcs1::EncodeRsaPublicKey;
use rsa::pss::BlindedSigningKey;
use rsa::signature::{RandomizedSigner, SignatureEncoding};
use sha2::Sha256;
use tokio::net::TcpSocket;
use tokio::task;
use rust_nex::common::setup;
use rust_nex::executables::common::{OWN_IP_PRIVATE, SECURE_SERVER_ACCOUNT, SERVER_PORT};
use rust_nex::prudp::packet::VirtualPort;
use rust_nex::prudp::router::Router;
use rust_nex::prudp::secure::Secure;
use rust_nex::prudp::unsecure::Unsecure;
use rust_nex::reggie::{establish_tls_connection_to, UnitPacketRead, UnitPacketWrite};
use rust_nex::rmc::structures::RmcSerialize;
use rust_nex::rnex_proxy_common::ConnectionInitData;
static FORWARD_DESTINATION: Lazy<String> =
Lazy::new(|| env::var("FORWARD_DESTINATION").expect("no forward destination given"));
static FORWARD_DESTINATION_NAME: Lazy<String> =
Lazy::new(|| env::var("FORWARD_DESTINATION_NAME").expect("no forward destination name given"));
#[tokio::main]
async fn main() {
setup();
let (router_secure, _) = Router::new(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT))
.await
.expect("unable to start router");
let mut socket_secure = router_secure
.add_socket(VirtualPort::new(1, 10), Secure(
"6f599f81",
&SECURE_SERVER_ACCOUNT
))
.await
.expect("unable to add socket");
// let conn = socket_secure.connect(auth_sockaddr).await.unwrap();
loop {
let Some(mut conn) = socket_secure.accept().await else {
error!("server crashed");
return;
};
task::spawn(async move {
let mut stream
= establish_tls_connection_to(FORWARD_DESTINATION.as_str(), FORWARD_DESTINATION_NAME.as_str()).await;
if let Err(e) = stream.send_buffer(&ConnectionInitData{
prudpsock_addr: conn.socket_addr,
pid: conn.user_id
}.to_data()).await{
error!("error connecting to backend: {}", e);
return;
};
loop {
tokio::select! {
data = conn.recv() => {
let Some(data) = data else {
break;
};
if let Err(e) = stream.send_buffer(&data[..]).await{
error!("error sending data to backend: {}", e);
break;
}
},
data = stream.read_buffer() => {
let data = match data{
Ok(d) => d,
Err(e) => {
error!("error reveiving data from backend: {}", e);
break;
}
};
if conn.send(data).await == None{
return;
}
},
}
}
});
}
}

View file

@ -20,7 +20,7 @@ use crate::grpc::protobufs::account::{GetNexPasswordRequest, GetUserDataRequest,
static API_KEY: Lazy<String> = Lazy::new(||{
let key = env::var("ACCOUNT_GQL_API_KEY")
.expect("no public ip specified");
.expect("no graphql ip specified");
key
});
@ -29,7 +29,7 @@ static CLIENT_URI: Lazy<String> = Lazy::new(||{
env::var("ACCOUNT_GQL_URL")
.ok()
.and_then(|s| s.parse().ok())
.expect("no public ip specified")
.expect("no graphql ip specified")
});

View file

@ -1,11 +1,18 @@
mod endianness;
mod prudp;
extern crate self as rust_nex;
pub mod endianness;
pub mod prudp;
pub mod rmc;
//mod protocols;
mod grpc;
mod kerberos;
mod nex;
mod result;
mod versions;
mod web;
pub mod grpc;
pub mod kerberos;
pub mod nex;
pub mod result;
pub mod versions;
pub mod web;
pub mod common;
pub mod reggie;
pub mod rnex_proxy_common;
pub mod util;
pub mod executables;

View file

@ -7,6 +7,8 @@
//! also the first and only current usage of rnex, expect this and rnex to be split into seperate
//! repos soon.
extern crate self as rust_nex;
use crate::nex::account::Account;
use crate::nex::auth_handler::{AuthHandler, RemoteAuthClientProtocol};
use crate::nex::remote_console::RemoteConsole;
@ -59,6 +61,10 @@ mod nex;
mod result;
mod versions;
mod web;
pub mod reggie;
pub mod util;
static KERBEROS_SERVER_PASSWORD: Lazy<String> = Lazy::new(|| {
env::var("AUTH_SERVER_PASSWORD")
@ -88,7 +94,7 @@ static OWN_IP_PRIVATE: Lazy<Ipv4Addr> = Lazy::new(|| {
env::var("SERVER_IP")
.ok()
.and_then(|s| s.parse().ok())
.expect("no public ip specified")
.expect("no private ip specified")
});
static OWN_IP_PUBLIC: Lazy<String> =
@ -135,7 +141,7 @@ async fn main() {
dotenv::dotenv().ok();
start_servers().await;
//start_servers().await;
}
/*
@ -278,7 +284,7 @@ async fn start_secure_server() -> SecureServer{
socket,
}
}*/
/*
async fn start_auth() -> JoinHandle<()> {
tokio::spawn(async {
let (router_secure, _) = Router::new(SocketAddrV4::new(*OWN_IP_PRIVATE, *AUTH_SERVER_PORT))
@ -414,3 +420,4 @@ async fn start_servers() {
#[cfg(feature = "secure")]
secure_server.await.expect("auth server crashed");
}
*/

View file

@ -144,7 +144,7 @@ impl Debug for VirtualPort {
#[repr(C)]
#[derive(Debug, Copy, Clone, Pod, Zeroable, SwapEndian, Eq, PartialEq)]
pub struct PRUDPHeader {
pub struct PRUDPV1Header {
pub magic: [u8; 2],
pub version: u8,
pub packet_specific_size: u8,
@ -157,7 +157,7 @@ pub struct PRUDPHeader {
pub sequence_id: u16,
}
impl Default for PRUDPHeader{
impl Default for PRUDPV1Header {
fn default() -> Self {
Self{
magic: [0xEA, 0xD0],
@ -239,8 +239,8 @@ impl PacketOption{
}
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct PRUDPPacket {
pub header: PRUDPHeader,
pub struct PRUDPV1Packet {
pub header: PRUDPV1Header,
pub packet_signature: [u8; 16],
pub payload: Vec<u8>,
pub options: Vec<PacketOption>,
@ -277,9 +277,9 @@ impl Into<u8> for OptionId {
}
}
impl PRUDPPacket {
impl PRUDPV1Packet {
pub fn new(reader: &mut (impl Read + Seek)) -> Result<Self> {
let header: PRUDPHeader = reader.read_struct(IS_BIG_ENDIAN)?;
let header: PRUDPV1Header = reader.read_struct(IS_BIG_ENDIAN)?;
if header.magic[0] != 0xEA ||
header.magic[1] != 0xD0 {
@ -372,7 +372,7 @@ impl PRUDPPacket {
Self{
header: PRUDPHeader{
header: PRUDPV1Header {
types_and_flags: flags,
sequence_id: self.header.sequence_id,
substream_id: self.header.substream_id,
@ -444,7 +444,7 @@ impl PRUDPPacket {
pub fn base_response_packet(&self) -> Self {
Self {
header: PRUDPHeader {
header: PRUDPV1Header {
magic: [0xEA, 0xD0],
types_and_flags: TypesFlags(0),
destination_port: self.header.source_port,
@ -481,10 +481,10 @@ impl PRUDPPacket {
mod test {
use crate::prudp::packet::flags::{NEED_ACK, RELIABLE};
use crate::prudp::packet::types::DATA;
use super::{OptionId, PacketOption, PRUDPHeader, TypesFlags, VirtualPort};
use super::{OptionId, PacketOption, PRUDPV1Header, TypesFlags, VirtualPort};
#[test]
fn size_test() {
assert_eq!(size_of::<PRUDPHeader>(), 14);
assert_eq!(size_of::<PRUDPV1Header>(), 14);
}
#[test]
@ -511,7 +511,7 @@ mod test {
#[test]
fn header_read(){
let header = PRUDPHeader{
let header = PRUDPV1Header {
version: 0,
destination_port: VirtualPort(0),
substream_id: 0,

View file

@ -15,7 +15,7 @@ use tokio::select;
use tokio::sync::RwLock;
use tokio::time::sleep;
use crate::prudp::socket::{new_socket_pair, AnyInternalSocket, CryptoHandler, ExternalSocket};
use crate::prudp::packet::{PRUDPPacket, VirtualPort};
use crate::prudp::packet::{PRUDPV1Packet, VirtualPort};
use crate::prudp::router::Error::VirtualPortTaken;
static SERVER_DATAGRAMS: Lazy<u8> = Lazy::new(||{
@ -31,6 +31,7 @@ pub struct Router {
socket: Arc<UdpSocket>,
_no_outside_construction: PhantomData<()>
}
#[derive(Debug, Error)]
pub enum Error{
#[error("tried to register socket to a port which is already taken (port: {0})")]
@ -43,7 +44,7 @@ impl Router {
let mut stream = Cursor::new(&udp_message);
while stream.position() as usize != udp_message.len() {
let packet = match PRUDPPacket::new(&mut stream){
let packet = match PRUDPV1Packet::new(&mut stream){
Ok(p) => p,
Err(e) => {
error!("Somebody({}) is fucking with the servers or their connection is bad (reason: {})", addr, e);
@ -155,7 +156,7 @@ impl Router {
}
// returns Some(()) i
pub(crate) async fn add_socket<E: CryptoHandler>(&self, virtual_port: VirtualPort, encryption: E)
pub async fn add_socket<E: CryptoHandler>(&self, virtual_port: VirtualPort, encryption: E)
-> Result<ExternalSocket, Error>{
let mut endpoints = self.endpoints.write().await;

View file

@ -8,7 +8,7 @@ use typenum::U5;
use crate::endianness::{IS_BIG_ENDIAN, ReadExtensions};
use crate::kerberos::{derive_key, TicketInternalData};
use crate::nex::account::Account;
use crate::prudp::packet::PRUDPPacket;
use crate::prudp::packet::PRUDPV1Packet;
use crate::prudp::socket::{CryptoHandler, CryptoHandlerConnectionInstance, EncryptionPair};
use crate::prudp::unsecure::UnsecureInstance;
use crate::rmc::structures::RmcSerialize;
@ -150,7 +150,7 @@ impl CryptoHandler for Secure {
))
}
fn sign_pre_handshake(&self, packet: &mut PRUDPPacket) {
fn sign_pre_handshake(&self, packet: &mut PRUDPV1Packet) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.0, None, None);
}
@ -176,17 +176,17 @@ impl CryptoHandlerConnectionInstance for SecureInstance {
self.pid
}
fn sign_connect(&self, packet: &mut PRUDPPacket) {
fn sign_connect(&self, packet: &mut PRUDPV1Packet) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.access_key, None, Some(self.self_signature));
}
fn sign_packet(&self, packet: &mut PRUDPPacket) {
fn sign_packet(&self, packet: &mut PRUDPV1Packet) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.access_key, Some(self.session_key), Some(self.self_signature));
}
fn verify_packet(&self, packet: &PRUDPPacket) -> bool {
fn verify_packet(&self, packet: &PRUDPV1Packet) -> bool {
true
}
}

View file

@ -1,11 +1,13 @@
use std::io::Write;
use std::net::SocketAddrV4;
use hmac::{Hmac, Mac};
use macros::RmcSerialize;
use crate::prudp::packet::VirtualPort;
type Md5Hmac = Hmac<md5::Md5>;
#[derive(Eq, PartialEq, Hash, Debug, Copy, Clone, Ord, PartialOrd)]
#[derive(Eq, PartialEq, Hash, Debug, Copy, Clone, Ord, PartialOrd, RmcSerialize)]
#[rmc_struct(0)]
pub struct PRUDPSockAddr{
pub regular_socket_addr: SocketAddrV4,
pub virtual_port: VirtualPort

View file

@ -4,7 +4,7 @@ use crate::prudp::packet::types::{CONNECT, DATA, DISCONNECT, PING, SYN};
use crate::prudp::packet::PacketOption::{
ConnectionSignature, FragmentId, InitialSequenceId, MaximumSubstreamId, SupportedFunctions,
};
use crate::prudp::packet::{PRUDPHeader, PRUDPPacket, PacketOption, TypesFlags, VirtualPort};
use crate::prudp::packet::{PRUDPV1Header, PRUDPV1Packet, PacketOption, TypesFlags, VirtualPort};
use crate::prudp::router::{Error, Router};
use crate::prudp::sockaddr::PRUDPSockAddr;
use async_trait::async_trait;
@ -67,7 +67,7 @@ struct InternalConnection<E: CryptoHandlerConnectionInstance> {
crypto_handler_instance: E,
data_sender: Sender<Vec<u8>>,
socket: Arc<UdpSocket>,
packet_queue: HashMap<u16, PRUDPPacket>,
packet_queue: HashMap<u16, PRUDPV1Packet>,
last_packet_time: Instant,
}
@ -88,7 +88,7 @@ impl<E: CryptoHandlerConnectionInstance> InternalConnection<E> {
}
#[inline]
async fn send_raw_packet(&self, mut prudp_packet: PRUDPPacket) {
async fn send_raw_packet(&self, mut prudp_packet: PRUDPV1Packet) {
prudp_packet.set_sizes();
let mut vec = Vec::new();
@ -128,7 +128,7 @@ pub(super) struct InternalSocket<T: CryptoHandler> {
internal_connections: Arc<
Mutex<BTreeMap<PRUDPSockAddr, Arc<Mutex<InternalConnection<T::CryptoConnectionInstance>>>>>,
>,
connection_establishment_data_sender: Mutex<Option<Sender<PRUDPPacket>>>,
connection_establishment_data_sender: Mutex<Option<Sender<PRUDPV1Packet>>>,
connection_sender: Sender<ExternalConnection>,
}
@ -170,7 +170,7 @@ impl<T: CryptoHandler> Deref for InternalSocket<T> {
pub(super) trait AnyInternalSocket:
Send + Sync + Deref<Target = CommonSocket> + 'static
{
async fn receive_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket);
async fn receive_packet(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet);
async fn connect(&self, address: PRUDPSockAddr) -> Option<()>;
}
@ -186,8 +186,8 @@ pub(super) trait AnyInternalConnection:
#[async_trait]
impl<T: CryptoHandlerConnectionInstance> AnyInternalConnection for InternalConnection<T> {
async fn send_data_packet(&mut self, data: Vec<u8>) {
let mut packet = PRUDPPacket {
header: PRUDPHeader {
let mut packet = PRUDPV1Packet {
header: PRUDPV1Header {
sequence_id: self.next_server_count(),
substream_id: 0,
session_id: self.session_id,
@ -214,8 +214,8 @@ impl<T: CryptoHandlerConnectionInstance> AnyInternalConnection for InternalConne
async fn close_connection(&mut self) {
// jon confirmed that this should be a safe way to dc a client
let mut packet = PRUDPPacket {
header: PRUDPHeader {
let mut packet = PRUDPV1Packet {
header: PRUDPV1Header {
sequence_id: self.next_server_count(),
substream_id: 0,
session_id: self.session_id,
@ -269,7 +269,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
Some(conn)
}
async fn send_packet_unbuffered(&self, dest: PRUDPSockAddr, mut packet: PRUDPPacket) {
async fn send_packet_unbuffered(&self, dest: PRUDPSockAddr, mut packet: PRUDPV1Packet) {
packet.set_sizes();
let mut vec = Vec::new();
@ -284,7 +284,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
.expect("failed to send data back");
}
async fn handle_syn(&self, address: PRUDPSockAddr, packet: PRUDPPacket) {
async fn handle_syn(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet) {
info!("got syn");
let mut response = packet.base_response_packet();
@ -328,8 +328,8 @@ impl<T: CryptoHandler> InternalSocket<T> {
let mut conn = conn.lock().await;
if conn.last_packet_time < (Instant::now() - Duration::from_secs(5)) {
conn.send_raw_packet(PRUDPPacket {
header: PRUDPHeader {
conn.send_raw_packet(PRUDPV1Packet {
header: PRUDPV1Header {
sequence_id: 0,
substream_id: 0,
session_id: 0,
@ -408,7 +408,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
.expect("connection to external socket lost");
}
async fn handle_connect(&self, address: PRUDPSockAddr, packet: PRUDPPacket) {
async fn handle_connect(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet) {
info!("got connect");
let Some(MaximumSubstreamId(max_substream)) = packet
.options
@ -479,7 +479,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
self.send_packet_unbuffered(address, response).await;
}
async fn handle_data(&self, address: PRUDPSockAddr, mut packet: PRUDPPacket) {
async fn handle_data(&self, address: PRUDPSockAddr, mut packet: PRUDPV1Packet) {
info!("got data");
if packet.header.types_and_flags.get_flags() & (NEED_ACK | RELIABLE)
@ -524,7 +524,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
}
}
async fn handle_ping(&self, address: PRUDPSockAddr, packet: PRUDPPacket) {
async fn handle_ping(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet) {
let connections = self.internal_connections.lock().await;
let Some(conn) = connections.get(&address) else {
error!("tried to send data on inactive connection!");
@ -544,7 +544,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
self.send_packet_unbuffered(address, response).await;
}
async fn handle_disconnect(&self, address: PRUDPSockAddr, packet: PRUDPPacket) {
async fn handle_disconnect(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet) {
let connections = self.internal_connections.lock().await;
let Some(conn) = connections.get(&address) else {
error!("tried to send data on inactive connection!");
@ -571,7 +571,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
#[async_trait]
impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
async fn receive_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket) {
async fn receive_packet(&self, address: PRUDPSockAddr, packet: PRUDPV1Packet) {
// todo: handle acks and resending
if let Some(conn) = self.get_connection(address).await {
@ -643,8 +643,8 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
let remote_signature = address.calculate_connection_signature();
let packet = PRUDPPacket {
header: PRUDPHeader {
let packet = PRUDPV1Packet {
header: PRUDPV1Header {
source_port: self.virtual_port,
destination_port: address.virtual_port,
types_and_flags: TypesFlags::default().types(SYN).flags(NEED_ACK),
@ -674,8 +674,8 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
return None;
};
let packet = PRUDPPacket {
header: PRUDPHeader {
let packet = PRUDPV1Packet {
header: PRUDPV1Header {
source_port: self.virtual_port,
destination_port: address.virtual_port,
types_and_flags: TypesFlags::default().types(CONNECT).flags(NEED_ACK),
@ -746,9 +746,9 @@ pub trait CryptoHandlerConnectionInstance: Send + Sync + 'static {
fn encrypt_outgoing(&mut self, substream: u8, data: &mut [u8]);
fn get_user_id(&self) -> u32;
fn sign_connect(&self, packet: &mut PRUDPPacket);
fn sign_packet(&self, packet: &mut PRUDPPacket);
fn verify_packet(&self, packet: &PRUDPPacket) -> bool;
fn sign_connect(&self, packet: &mut PRUDPV1Packet);
fn sign_packet(&self, packet: &mut PRUDPV1Packet);
fn verify_packet(&self, packet: &PRUDPV1Packet) -> bool;
}
pub trait CryptoHandler: Send + Sync + 'static {
@ -762,7 +762,7 @@ pub trait CryptoHandler: Send + Sync + 'static {
substream_count: u8,
) -> Option<(Vec<u8>, Self::CryptoConnectionInstance)>;
fn sign_pre_handshake(&self, packet: &mut PRUDPPacket);
fn sign_pre_handshake(&self, packet: &mut PRUDPV1Packet);
}
impl Deref for ExternalConnection {
@ -813,6 +813,12 @@ impl SendingConnection {
impl<E: CryptoHandlerConnectionInstance> Drop for InternalConnection<E> {
fn drop(&mut self) {
println!("yatta");
println!("yatta(internal conn)");
}
}
impl Drop for CommonConnection {
fn drop(&mut self) {
println!("yatta(common conn)");
}
}

View file

@ -1,7 +1,7 @@
use once_cell::sync::Lazy;
use rc4::{Key, KeyInit, Rc4, StreamCipher};
use typenum::U5;
use crate::prudp::packet::PRUDPPacket;
use crate::prudp::packet::PRUDPV1Packet;
use crate::prudp::socket::{CryptoHandler, CryptoHandlerConnectionInstance, EncryptionPair};
pub struct Unsecure(pub &'static str);
@ -43,7 +43,7 @@ impl CryptoHandler for Unsecure {
))
}
fn sign_pre_handshake(&self, packet: &mut PRUDPPacket) {
fn sign_pre_handshake(&self, packet: &mut PRUDPV1Packet) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.0, None, None);
}
@ -68,17 +68,17 @@ impl CryptoHandlerConnectionInstance for UnsecureInstance {
0
}
fn sign_connect(&self, packet: &mut PRUDPPacket) {
fn sign_connect(&self, packet: &mut PRUDPV1Packet) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.key, None, Some(self.self_signature));
}
fn sign_packet(&self, packet: &mut PRUDPPacket) {
fn sign_packet(&self, packet: &mut PRUDPV1Packet) {
packet.set_sizes();
packet.calculate_and_assign_signature(self.key, None, Some(self.self_signature));
}
fn verify_packet(&self, packet: &PRUDPPacket) -> bool {
fn verify_packet(&self, packet: &PRUDPV1Packet) -> bool {
true
}
}

149
src/reggie.rs Normal file
View file

@ -0,0 +1,149 @@
use std::{env, fs, io};
use std::sync::Arc;
use macros::{method_id, rmc_proto, rmc_struct};
use once_cell::sync::Lazy;
use rustls::{ClientConfig, RootCertStore, ServerConfig};
use rustls::client::WebPkiServerVerifier;
use rustls::server::WebPkiClientVerifier;
use rustls_pki_types::{CertificateDer, PrivateKeyDer, ServerName, TrustAnchor};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_rustls::{TlsAcceptor, TlsConnector};
use tokio_rustls::client::TlsStream;
use webpki::anchor_from_trusted_cert;
use crate::define_rmc_proto;
use crate::endianness::IS_BIG_ENDIAN;
use crate::rmc::response::ErrorCode;
use crate::rmc::structures::RmcSerialize;
pub static SERVER_NAME: Lazy<String> = Lazy::new(|| {
env::var("REGGIE_SERVER_NAME").expect("no server name specified")
});
pub static SELF_CERT: Lazy<CertificateDer<'static>> = Lazy::new(|| CertificateDer::from(fs::read(&format!("/opt/reggie/certs/{}.crt", SERVER_NAME.as_str())).expect("failed to read self cpub ertificate")));
pub static ROOT_CA: Lazy<CertificateDer<'static>> = Lazy::new(|| CertificateDer::from(fs::read("/opt/reggie/certs/CA.crt").expect("failed to read root certipub ficate")));
pub static SELF_KEY: Lazy<PrivateKeyDer<'static>> = Lazy::new(|| PrivateKeyDer::try_from(fs::read(&format!("/opt/reggie/certs/{}.key", SERVER_NAME.as_str())).expect("failed to read self pub key")).expect("failed to read self key"));
pub static ROOT_TRUST_ANCHOR: Lazy<TrustAnchor<'static>> = Lazy::new(|| anchor_from_trusted_cert(&*ROOT_CA).expect("unable to create root ca trust anchor"));
pub fn get_root_store() -> RootCertStore {
RootCertStore {
roots: vec![
ROOT_TRUST_ANCHOR.clone()
],
}
}
pub fn get_root_cert_verifier() -> RootCertStore {
RootCertStore {
roots: vec![
ROOT_TRUST_ANCHOR.clone()
],
}
}
pub async fn get_configured_tls_acceptor() -> TlsAcceptor{
let store = get_root_store();
let cert_verifier = WebPkiClientVerifier::builder(store.into())
.build()
.expect("unable to build cert verifier");
let config = ServerConfig::builder()
//.with_no_client_auth()
.with_client_cert_verifier(cert_verifier)
.with_single_cert(vec![
SELF_CERT.clone(),
ROOT_CA.clone()
], SELF_KEY.clone_key())
.expect("unable to create server config");
TlsAcceptor::from(Arc::new(config))
}
pub async fn get_configured_tls_connector() -> TlsConnector{
let store = get_root_store();
let cert_verifier = WebPkiServerVerifier::builder(store.into())
.build()
.expect("unable to build cert verifier");
let config = ClientConfig::builder()
//.with_root_certificates(get_root_store())
.with_webpki_verifier(cert_verifier)
.with_client_auth_cert(vec![
SELF_CERT.clone(),
ROOT_CA.clone()
], SELF_KEY.clone_key())
.expect("unable to create client config");
TlsConnector::from(Arc::new(config))
}
pub trait UnitPacketRead: AsyncRead + Unpin{
async fn read_buffer(&mut self) -> Result<Vec<u8>, io::Error>{
let mut len_raw: [u8; 4] = [0; 4];
self.read_exact(&mut len_raw).await?;
let len = u32::from_le_bytes(len_raw);
let mut vec = vec![0u8; len as _];
self.read_exact(&mut vec).await?;
Ok(vec)
}
}
impl<T: AsyncRead + Unpin> UnitPacketRead for T{}
pub trait UnitPacketWrite: AsyncWrite + Unpin{
async fn send_buffer(&mut self, data: &[u8]) -> Result<(), io::Error> {
let mut dest_data = Vec::new();
data.serialize(&mut dest_data).expect("ran out of memory or something");
self.write_all(&dest_data[..]).await?;
self.flush().await?;
Ok(())
}
}
impl<T: AsyncWrite + Unpin> UnitPacketWrite for T{}
pub async fn establish_tls_connection_to(address: &str, server_name: &'static str) -> TlsStream<TcpStream>{
let connector = get_configured_tls_connector().await;
let stream = TcpStream::connect(address).await.unwrap();
let stream = connector.connect(ServerName::try_from(server_name).unwrap(), stream).await
.expect("unable to connect via tls");
stream
}
#[rmc_proto(1)]
pub trait RmcTestProto{
#[method_id(1)]
async fn test(&self) -> Result<String, ErrorCode>;
}
define_rmc_proto!(
proto TestProto{
RmcTestProto
}
);
#[rmc_struct(TestProto)]
pub struct TestStruct;
impl RmcTestProto for TestStruct{
async fn test(&self) -> Result<String, ErrorCode> {
Ok("heya".into())
}
}

View file

@ -9,6 +9,7 @@ pub mod nat_traversal;
pub mod matchmake_ext;
pub mod ranking;
use crate::util::{SendingBufferConnection, SplittableBufferConnection};
use crate::prudp::socket::{ExternalConnection, SendingConnection};
use crate::rmc::message::RMCMessage;
use crate::rmc::protocols::RemoteCallError::ConnectionBroke;
@ -45,7 +46,7 @@ pub enum RemoteCallError {
InvalidResponse(#[from] structures::Error),
}
pub struct RmcConnection(pub SendingConnection, pub RmcResponseReceiver);
pub struct RmcConnection(pub SendingBufferConnection, pub RmcResponseReceiver);
pub struct RmcResponseReceiver(Arc<Notify>, Arc<Mutex<HashMap<u32, RMCResponse>>>);
@ -141,7 +142,7 @@ pub trait RmcCallable {
//type Remote: RemoteObject;
fn rmc_call(
&self,
responder: &SendingConnection,
responder: &SendingBufferConnection,
protocol_id: u16,
method_id: u32,
call_id: u32,
@ -156,7 +157,7 @@ macro_rules! define_rmc_proto {
}) => {
paste::paste!{
pub trait [<Local $name>]: std::any::Any $( + [<Raw $protocol>] + $protocol)* {
async fn rmc_call(&self, remote_response_connection: &crate::prudp::socket::SendingConnection, protocol_id: u16, method_id: u32, call_id: u32, rest: Vec<u8>){
async fn rmc_call(&self, remote_response_connection: &rust_nex::util::SendingBufferConnection, protocol_id: u16, method_id: u32, call_id: u32, rest: Vec<u8>){
match protocol_id{
$(
[<Raw $protocol Info>]::PROTOCOL_ID => <Self as [<Raw $protocol>]>::rmc_call_proto(self, remote_response_connection, method_id, call_id, rest).await,
@ -166,16 +167,16 @@ macro_rules! define_rmc_proto {
}
}
pub struct [<Remote $name>](crate::rmc::protocols::RmcConnection);
pub struct [<Remote $name>](rust_nex::rmc::protocols::RmcConnection);
impl crate::rmc::protocols::RemoteInstantiatable for [<Remote $name>]{
fn new(conn: crate::rmc::protocols::RmcConnection) -> Self{
impl rust_nex::rmc::protocols::RemoteInstantiatable for [<Remote $name>]{
fn new(conn: rust_nex::rmc::protocols::RmcConnection) -> Self{
Self(conn)
}
}
impl crate::rmc::protocols::HasRmcConnection for [<Remote $name>]{
fn get_connection(&self) -> &crate::rmc::protocols::RmcConnection{
impl rust_nex::rmc::protocols::HasRmcConnection for [<Remote $name>]{
fn get_connection(&self) -> &rust_nex::rmc::protocols::RmcConnection{
&self.0
}
}
@ -191,7 +192,7 @@ macro_rules! define_rmc_proto {
impl RmcCallable for () {
async fn rmc_call(
&self,
remote_response_connection: &crate::prudp::socket::SendingConnection,
remote_response_connection: &SendingBufferConnection,
protocol_id: u16,
method_id: u32,
call_id: u32,
@ -222,13 +223,13 @@ impl<T: RemoteInstantiatable> OnlyRemote<T>{
}
impl<T: RemoteInstantiatable> RmcCallable for OnlyRemote<T>{
fn rmc_call(&self, responder: &SendingConnection, protocol_id: u16, method_id: u32, call_id: u32, rest: Vec<u8>) -> impl std::future::Future<Output = ()> + Send {
fn rmc_call(&self, responder: &SendingBufferConnection, protocol_id: u16, method_id: u32, call_id: u32, rest: Vec<u8>) -> impl std::future::Future<Output = ()> + Send {
async{}
}
}
async fn handle_incoming<T: RmcCallable + Send + Sync + 'static>(
mut connection: ExternalConnection,
mut connection: SplittableBufferConnection,
remote: Arc<T>,
notify: Arc<Notify>,
incoming: Arc<Mutex<HashMap<u32, RMCResponse>>>,
@ -278,7 +279,7 @@ async fn handle_incoming<T: RmcCallable + Send + Sync + 'static>(
info!("rmc disconnected")
}
pub fn new_rmc_gateway_connection<T: RmcCallable + Sync + Send + 'static,F>(conn: ExternalConnection, create_internal: F) -> Arc<T>
pub fn new_rmc_gateway_connection<T: RmcCallable + Sync + Send + 'static,F>(conn: SplittableBufferConnection, create_internal: F) -> Arc<T>
where
F: FnOnce(RmcConnection) -> Arc<T>,
{

View file

@ -5,7 +5,7 @@ use bytemuck::bytes_of;
use log::error;
use v_byte_macros::EnumTryInto;
use crate::endianness::{ReadExtensions, IS_BIG_ENDIAN};
use crate::prudp::packet::{PRUDPPacket};
use crate::prudp::packet::{PRUDPV1Packet};
use crate::prudp::packet::flags::{NEED_ACK, RELIABLE};
use crate::prudp::packet::PacketOption::FragmentId;
use crate::prudp::packet::types::DATA;
@ -13,6 +13,8 @@ use crate::prudp::socket::{ExternalConnection, SendingConnection};
use crate::rmc::response::ErrorCode::Core_Exception;
use crate::rmc::structures::qresult::ERROR_MASK;
use crate::rmc::structures::RmcSerialize;
use crate::util::SendingBufferConnection;
pub enum RMCResponseResult {
Success {
call_id: u32,
@ -145,7 +147,7 @@ pub fn generate_response(protocol_id: u8, response: RMCResponseResult) -> io::Re
}
pub async fn send_result(
connection: &SendingConnection,
connection: &SendingBufferConnection,
result: Result<Vec<u8>, ErrorCode>,
protocol_id: u8,
method_id: u32,
@ -173,7 +175,7 @@ pub async fn send_result(
send_response(connection, response).await
}
pub async fn send_response(connection: &SendingConnection, rmcresponse: RMCResponse) {
pub async fn send_response(connection: &SendingBufferConnection, rmcresponse: RMCResponse) {
connection.send(rmcresponse.to_data()).await;
}

View file

@ -33,10 +33,19 @@ pub mod primitives;
pub mod matchmake;
pub mod variant;
pub mod ranking;
mod networking;
pub trait RmcSerialize: Sized{
fn serialize(&self, writer: &mut dyn Write) -> Result<()>;
fn deserialize(reader: &mut dyn Read) -> Result<Self>;
fn to_data(&self) -> Vec<u8>{
let mut data = Vec::new();
self.serialize(&mut data).expect("out of memory or something");
data
}
}
impl RmcSerialize for (){
@ -46,4 +55,6 @@ impl RmcSerialize for (){
fn deserialize(reader: &mut dyn Read) -> Result<Self> {
Ok(())
}
}

View file

@ -0,0 +1,32 @@
use std::io::{Read, Write};
use std::net::{Ipv4Addr, SocketAddrV4};
use crate::prudp::packet::VirtualPort;
use crate::rmc::structures::RmcSerialize;
impl RmcSerialize for SocketAddrV4{
fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> {
self.ip().to_bits().serialize(writer)?;
self.port().serialize(writer)?;
Ok(())
}
fn deserialize(reader: &mut dyn Read) -> crate::rmc::structures::Result<Self> {
let ip = u32::deserialize(reader)?;
let port = u16::deserialize(reader)?;
Ok(SocketAddrV4::new(Ipv4Addr::from_bits(ip), port))
}
}
impl RmcSerialize for VirtualPort{
fn serialize(&self, writer: &mut dyn Write) -> crate::rmc::structures::Result<()> {
self.0.serialize(writer)?;
Ok(())
}
fn deserialize(reader: &mut dyn Read) -> crate::rmc::structures::Result<Self> {
Ok(Self(u8::deserialize(reader)?))
}
}

12
src/rnex_proxy_common.rs Normal file
View file

@ -0,0 +1,12 @@
use macros::RmcSerialize;
use crate::kerberos::KerberosDateTime;
use crate::prudp::sockaddr::PRUDPSockAddr;
#[derive(Debug, RmcSerialize)]
#[rmc_struct(0)]
pub struct ConnectionInitData{
pub prudpsock_addr: PRUDPSockAddr,
pub pid: u32,
}

94
src/util.rs Normal file
View file

@ -0,0 +1,94 @@
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::Deref;
use log::error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::task;
use rust_nex::reggie::{UnitPacketRead, UnitPacketWrite};
#[derive(Clone)]
pub struct SendingBufferConnection(Sender<Vec<u8>>);
pub struct SplittableBufferConnection(SendingBufferConnection, Receiver<Vec<u8>>);
impl AsRef<SendingBufferConnection> for SplittableBufferConnection{
fn as_ref(&self) -> &SendingBufferConnection {
&self.0
}
}
impl Deref for SplittableBufferConnection{
type Target = SendingBufferConnection;
fn deref(&self) -> &Self::Target {
self.as_ref()
}
}
impl<T: Send + Unpin + AsyncWrite + AsyncRead + 'static> From<T> for SplittableBufferConnection{
fn from(value: T) -> Self {
Self::new(value)
}
}
impl SplittableBufferConnection {
fn new<T: Send + Unpin + AsyncWrite + AsyncRead + 'static>(stream: T) -> Self {
let (outside_send, inside_recv) = channel::<Vec<u8>>(10);
let (inside_send, outside_recv) = channel::<Vec<u8>>(10);
task::spawn(async move {
let sender = inside_send;
let mut recver = inside_recv;
let mut stream = stream;
loop {
tokio::select! {
data = recver.recv() => {
let Some(data) = data else {
break;
};
if let Err(e) = stream.send_buffer(&data[..]).await{
error!("error sending data to backend: {}", e);
break;
}
},
data = stream.read_buffer() => {
let data = match data{
Ok(d) => d,
Err(e) => {
error!("error reveiving data from backend: {}", e);
break;
}
};
if let Err(e) = sender.send(data).await{
error!("a send error occurred {}", e);
return;
}
},
}
}
});
Self(SendingBufferConnection(outside_send), outside_recv)
}
}
impl SendingBufferConnection{
pub async fn send(&self, buffer: Vec<u8>) -> Option<()>{
self.0.send(buffer).await.ok()
}
}
impl SplittableBufferConnection{
pub async fn recv(&mut self) -> Option<Vec<u8>>{
self.1.recv().await
}
pub fn duplicate_sender(&self) -> SendingBufferConnection{
self.0.clone()
}
}

View file

@ -45,7 +45,7 @@ async fn players_in_match(mmm: &State<Arc<MatchmakeManager>>, gid: u32) -> Optio
Some(Json(gathering.connected_players.iter().filter_map(|p| p.upgrade()).map(|p| p.pid).collect()))
}
/*
#[get("/player/<pid>/disconnect")]
async fn disconnect_player(_auth: RnexApiAuth, mmm: &State<Arc<MatchmakeManager>>, pid: u32) -> Option<()>{
// this doesnt work and is broken, there might be some other way to remotely close gatherings...
@ -59,7 +59,7 @@ async fn disconnect_player(_auth: RnexApiAuth, mmm: &State<Arc<MatchmakeManager>
Some(())
}
}*/
#[get("/gathering/<gid>/close")]
async fn close_gathering(_auth: RnexApiAuth, mmm: &State<Arc<MatchmakeManager>>, gid: u32) -> Option<()>{
@ -89,7 +89,7 @@ async fn close_gathering(_auth: RnexApiAuth, mmm: &State<Arc<MatchmakeManager>>,
pub async fn start_web(mgr: Arc<MatchmakeManager>) -> JoinHandle<()> {
tokio::spawn(async move {
rocket::build()
.mount("/", routes![gatherings, players_in_match, close_gathering, disconnect_player])
.mount("/", routes![gatherings, players_in_match, close_gathering])
.manage(mgr)
.launch().await
.expect("unable to start webserver");