change to using different connections
This commit is contained in:
parent
f12904909a
commit
11b0393e6a
10 changed files with 86 additions and 736 deletions
|
|
@ -87,5 +87,5 @@ name = "backend_server_secure"
|
||||||
path = "src/executables/backend_server_secure.rs"
|
path = "src/executables/backend_server_secure.rs"
|
||||||
|
|
||||||
[[bin]]
|
[[bin]]
|
||||||
name = "control_server"
|
name = "edge_node_holder_server"
|
||||||
path = "src/executables/control_server.rs"
|
path = "src/executables/edge_node_holder_server.rs"
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
use rust_nex::reggie::{RemoteController, UnitPacketRead, WebStreamSocket};
|
use rust_nex::reggie::{RemoteEdgeNodeHolder, UnitPacketRead};
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use rustls::client::danger::HandshakeSignatureValid;
|
use rustls::client::danger::HandshakeSignatureValid;
|
||||||
|
|
@ -11,7 +11,6 @@ use rustls::{
|
||||||
};
|
};
|
||||||
use rustls_pki_types::PrivateKeyDer;
|
use rustls_pki_types::PrivateKeyDer;
|
||||||
use rust_nex::common::setup;
|
use rust_nex::common::setup;
|
||||||
use rust_nex::reggie::{get_configured_tls_acceptor, TestStruct, ROOT_TRUST_ANCHOR, SELF_CERT, SELF_KEY};
|
|
||||||
use std::borrow::ToOwned;
|
use std::borrow::ToOwned;
|
||||||
use std::{env, fs};
|
use std::{env, fs};
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
|
|
@ -19,18 +18,18 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use macros::{method_id, rmc_proto, rmc_struct};
|
use macros::{method_id, rmc_proto, rmc_struct};
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
use tokio::net::{TcpListener, TcpSocket};
|
use tokio::net::{TcpListener, TcpSocket, TcpStream};
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
use tokio_rustls::TlsAcceptor;
|
use tokio_rustls::TlsAcceptor;
|
||||||
use rust_nex::define_rmc_proto;
|
use rust_nex::define_rmc_proto;
|
||||||
use rust_nex::executables::common::{OWN_IP_PRIVATE, SECURE_SERVER_ACCOUNT, SERVER_PORT};
|
use rust_nex::executables::common::{OWN_IP_PRIVATE, SECURE_EDGE_NODE_HOLDER, SECURE_SERVER_ACCOUNT, SERVER_PORT};
|
||||||
use rust_nex::nex::auth_handler::AuthHandler;
|
use rust_nex::nex::auth_handler::AuthHandler;
|
||||||
use rust_nex::reggie::ServerCluster::Auth;
|
use rust_nex::reggie::EdgeNodeHolderConnectOption::DontRegister;
|
||||||
use rust_nex::reggie::ServerType::Backend;
|
|
||||||
use rust_nex::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote};
|
use rust_nex::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote};
|
||||||
use rust_nex::rmc::response::ErrorCode;
|
use rust_nex::rmc::response::ErrorCode;
|
||||||
use rust_nex::rmc::structures::RmcSerialize;
|
use rust_nex::rmc::structures::RmcSerialize;
|
||||||
use rust_nex::rnex_proxy_common::ConnectionInitData;
|
use rust_nex::rnex_proxy_common::ConnectionInitData;
|
||||||
|
use rust_nex::util::SplittableBufferConnection;
|
||||||
|
|
||||||
pub static SECURE_PROXY_ADDR: Lazy<Ipv4Addr> = Lazy::new(|| {
|
pub static SECURE_PROXY_ADDR: Lazy<Ipv4Addr> = Lazy::new(|| {
|
||||||
env::var("SECURE_PROXY_ADDR")
|
env::var("SECURE_PROXY_ADDR")
|
||||||
|
|
@ -46,41 +45,25 @@ pub static SECURE_PROXY_PORT: Lazy<u16> = Lazy::new(|| {
|
||||||
.unwrap_or(10000)
|
.unwrap_or(10000)
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
setup();
|
setup();
|
||||||
|
|
||||||
let conn = rust_nex::reggie::rmc_connect_to(
|
let conn = TcpStream::connect(&*SECURE_EDGE_NODE_HOLDER).await.unwrap();
|
||||||
"agmp-control.spfn.net",
|
|
||||||
Backend{
|
|
||||||
name: "agmp-auth-1.spfn.net".to_string(),
|
|
||||||
cluster: Auth
|
|
||||||
},
|
|
||||||
|r| Arc::new(OnlyRemote::<RemoteController>::new(r))
|
|
||||||
).await;
|
|
||||||
let conn = conn.unwrap();
|
|
||||||
|
|
||||||
|
let conn: SplittableBufferConnection = conn.into();
|
||||||
|
|
||||||
let acceptor = get_configured_tls_acceptor().await;
|
conn.send(DontRegister.to_data()).await;
|
||||||
|
|
||||||
|
let conn = new_rmc_gateway_connection(conn, |r| Arc::new(OnlyRemote::<RemoteEdgeNodeHolder>::new(r)));
|
||||||
|
|
||||||
let listen = TcpListener::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)).await.unwrap();
|
let listen = TcpListener::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)).await.unwrap();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
while let Ok((stream, addr)) = listen.accept().await {
|
while let Ok((mut stream, addr)) = listen.accept().await {
|
||||||
let Ok(websocket) = tokio_tungstenite::accept_async(stream).await else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
let stream = WebStreamSocket::new(websocket);
|
|
||||||
|
|
||||||
let mut stream = match acceptor.accept(stream).await {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(e) => {
|
|
||||||
error!("an error ocurred whilest accepting tls connection: {:?}", e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let buffer = match stream.read_buffer().await{
|
let buffer = match stream.read_buffer().await{
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
|
||||||
|
|
@ -1,41 +1,27 @@
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
use rust_nex::rmc::structures::RmcSerialize;
|
use rust_nex::rmc::structures::RmcSerialize;
|
||||||
use rust_nex::reggie::{RemoteController, UnitPacketRead, WebStreamSocket};
|
use rust_nex::reggie::{RemoteEdgeNodeHolder, UnitPacketRead};
|
||||||
use std::net::SocketAddrV4;
|
use std::net::SocketAddrV4;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::AtomicU32;
|
use std::sync::atomic::AtomicU32;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
use rust_nex::common::setup;
|
use rust_nex::common::setup;
|
||||||
use rust_nex::executables::common::{OWN_IP_PRIVATE, SERVER_PORT};
|
use rust_nex::executables::common::{OWN_IP_PRIVATE, SECURE_EDGE_NODE_HOLDER, SERVER_PORT};
|
||||||
use rust_nex::nex::matchmake::MatchmakeManager;
|
use rust_nex::nex::matchmake::MatchmakeManager;
|
||||||
use rust_nex::nex::remote_console::RemoteConsole;
|
use rust_nex::nex::remote_console::RemoteConsole;
|
||||||
use rust_nex::nex::user::User;
|
use rust_nex::nex::user::User;
|
||||||
use rust_nex::reggie::get_configured_tls_acceptor;
|
use rust_nex::reggie::EdgeNodeHolderConnectOption::DontRegister;
|
||||||
use rust_nex::reggie::ServerCluster::Secure;
|
|
||||||
use rust_nex::reggie::ServerType::Backend;
|
|
||||||
use rust_nex::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote};
|
use rust_nex::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote};
|
||||||
use rust_nex::rnex_proxy_common::ConnectionInitData;
|
use rust_nex::rnex_proxy_common::ConnectionInitData;
|
||||||
use rust_nex::rmc::protocols::RemoteInstantiatable;
|
use rust_nex::rmc::protocols::RemoteInstantiatable;
|
||||||
|
use rust_nex::util::SplittableBufferConnection;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
setup();
|
setup();
|
||||||
|
|
||||||
let conn = rust_nex::reggie::rmc_connect_to(
|
|
||||||
"agmp-control.spfn.net",
|
|
||||||
Backend{
|
|
||||||
name: "agmp-secure-1.spfn.net".to_string(),
|
|
||||||
cluster: Secure
|
|
||||||
},
|
|
||||||
|r| Arc::new(OnlyRemote::<RemoteController>::new(r))
|
|
||||||
).await;
|
|
||||||
let conn = conn.unwrap();
|
|
||||||
|
|
||||||
let acceptor = get_configured_tls_acceptor().await;
|
|
||||||
|
|
||||||
let listen = TcpListener::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)).await.unwrap();
|
let listen = TcpListener::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)).await.unwrap();
|
||||||
|
|
||||||
let mmm = Arc::new(MatchmakeManager{
|
let mmm = Arc::new(MatchmakeManager{
|
||||||
|
|
@ -49,21 +35,7 @@ async fn main() {
|
||||||
|
|
||||||
MatchmakeManager::initialize_garbage_collect_thread(weak_mmm).await;
|
MatchmakeManager::initialize_garbage_collect_thread(weak_mmm).await;
|
||||||
|
|
||||||
while let Ok((stream, addr)) = listen.accept().await {
|
while let Ok((mut stream, addr)) = listen.accept().await {
|
||||||
let Ok(websocket) = tokio_tungstenite::accept_async(stream).await else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
let stream = WebStreamSocket::new(websocket);
|
|
||||||
|
|
||||||
let mut stream = match acceptor.accept(stream).await {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(e) => {
|
|
||||||
error!("an error ocurred whilest accepting tls connection: {:?}", e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let buffer = match stream.read_buffer().await{
|
let buffer = match stream.read_buffer().await{
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
|
||||||
|
|
@ -40,4 +40,17 @@ pub static AUTH_SERVER_ACCOUNT: Lazy<Account> =
|
||||||
pub static SECURE_SERVER_ACCOUNT: Lazy<Account> =
|
pub static SECURE_SERVER_ACCOUNT: Lazy<Account> =
|
||||||
Lazy::new(|| Account::new(2, "Quazal Rendez-Vous", &KERBEROS_SERVER_PASSWORD));
|
Lazy::new(|| Account::new(2, "Quazal Rendez-Vous", &KERBEROS_SERVER_PASSWORD));
|
||||||
|
|
||||||
|
pub static SECURE_EDGE_NODE_HOLDER: Lazy<SocketAddrV4> = Lazy::new(||{
|
||||||
|
env::var("SECURE_EDGE_NODE_HOLDER")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.expect("SECURE_EDGE_NODE_HOLDER not set")
|
||||||
|
});
|
||||||
|
|
||||||
|
pub static FORWARD_DESTINATION: Lazy<SocketAddrV4> =
|
||||||
|
Lazy::new(||
|
||||||
|
env::var("FORWARD_DESTINATION")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.expect("SECURE_EDGE_NODE_HOLDER not set")
|
||||||
|
);
|
||||||
|
|
|
||||||
|
|
@ -1,215 +0,0 @@
|
||||||
use std::future::Future;
|
|
||||||
use rust_nex::rmc::protocols::{LocalNoProto, RmcCallable};
|
|
||||||
use rust_nex::rmc::structures::RmcSerialize;
|
|
||||||
use std::io::Cursor;
|
|
||||||
use std::net::{Ipv4Addr, SocketAddrV4};
|
|
||||||
use macros::rmc_struct;
|
|
||||||
use rust_nex::common::setup;
|
|
||||||
use rust_nex::prudp::station_url::StationUrl;
|
|
||||||
use rust_nex::reggie::{get_configured_tls_acceptor, ControllerManagement, RemoteProxy, ServerCluster, ServerType, TestStruct, WebStreamSocket};
|
|
||||||
use rust_nex::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote};
|
|
||||||
use rust_nex::rmc::response::ErrorCode;
|
|
||||||
use rust_nex::reggie::UnitPacketRead;
|
|
||||||
use std::sync::{Arc, Weak};
|
|
||||||
use log::error;
|
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
use rand::random;
|
|
||||||
use tokio::net::TcpListener;
|
|
||||||
use tokio::sync::RwLock;
|
|
||||||
use tokio::task;
|
|
||||||
use tungstenite::client;
|
|
||||||
use rust_nex::executables::common::KERBEROS_SERVER_PASSWORD;
|
|
||||||
use rust_nex::nex::account::Account;
|
|
||||||
use rust_nex::rmc::response::ErrorCode::{Core_Exception, Core_InvalidIndex};
|
|
||||||
use rust_nex::rmc::protocols::RemoteInstantiatable;
|
|
||||||
use rust_nex::util::SendingBufferConnection;
|
|
||||||
use rust_nex::reggie::LocalController;
|
|
||||||
use rust_nex::reggie::RemoteProxyManagement;
|
|
||||||
|
|
||||||
pub static AUTH_SERVER_ACCOUNT: Lazy<Account> =
|
|
||||||
Lazy::new(|| Account::new(1, "Quazal Authentication", &KERBEROS_SERVER_PASSWORD));
|
|
||||||
pub static SECURE_SERVER_ACCOUNT: Lazy<Account> =
|
|
||||||
Lazy::new(|| Account::new(2, "Quazal Rendez-Vous", &KERBEROS_SERVER_PASSWORD));
|
|
||||||
|
|
||||||
#[rmc_struct(Controller)]
|
|
||||||
struct ServerController {
|
|
||||||
insecure_proxies: RwLock<Vec<Weak<Proxy>>>,
|
|
||||||
insecure_backend_url: RwLock<String>,
|
|
||||||
secure_proxies: RwLock<Vec<Weak<Proxy>>>,
|
|
||||||
secure_backend_url: RwLock<String>,
|
|
||||||
account: Account
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServerController{
|
|
||||||
async fn update_urls(&self, cluster: ServerCluster){
|
|
||||||
let url = match cluster{
|
|
||||||
ServerCluster::Auth => {
|
|
||||||
self.insecure_backend_url.read().await
|
|
||||||
}
|
|
||||||
ServerCluster::Secure => {
|
|
||||||
self.secure_backend_url.read().await
|
|
||||||
}
|
|
||||||
}.clone();
|
|
||||||
|
|
||||||
let read_lock = match cluster{
|
|
||||||
ServerCluster::Auth => {
|
|
||||||
self.insecure_proxies.read().await
|
|
||||||
}
|
|
||||||
ServerCluster::Secure => {
|
|
||||||
self.secure_proxies.read().await
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
for proxy in read_lock.iter().filter_map(|v| v.upgrade()){
|
|
||||||
if let Err(e) = proxy.proxy.update_url(url.clone()).await {
|
|
||||||
error!("error whilest updating proxy url: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Proxy{
|
|
||||||
proxy: RemoteProxy,
|
|
||||||
ip: SocketAddrV4,
|
|
||||||
controller: Arc<ServerController>
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RmcCallable for Proxy{
|
|
||||||
fn rmc_call(&self, responder: &SendingBufferConnection, protocol_id: u16, method_id: u32, call_id: u32, rest: Vec<u8>) -> impl Future<Output=()> + Send {
|
|
||||||
self.controller.rmc_call(responder, protocol_id, method_id, call_id, rest)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
impl ControllerManagement for ServerController {
|
|
||||||
async fn get_secure_proxy_url(&self) -> Result<String, ErrorCode> {
|
|
||||||
let proxy = self.secure_proxies.write().await;
|
|
||||||
|
|
||||||
let proxies = proxy.iter().filter_map(|v| v.upgrade());
|
|
||||||
|
|
||||||
let idx: usize = random::<usize>() % proxy.len();
|
|
||||||
// do not switch this to using regular array indexing i specifically wrote it like this as
|
|
||||||
// to have absolutely now way of panicking, we cant have the control server panicking after
|
|
||||||
// all
|
|
||||||
let Some(proxy) = proxies.clone().nth(idx).or_else(|| proxies.clone().nth(0)) else {
|
|
||||||
return Err(Core_InvalidIndex);
|
|
||||||
};
|
|
||||||
|
|
||||||
let station_url = format!(
|
|
||||||
"prudps:/PID=2;sid=1;stream=10;type=2;address={};port={};CID=1",
|
|
||||||
proxy.ip.ip(), proxy.ip.port()
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(station_url)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_secure_account(&self) -> Result<Account, ErrorCode> {
|
|
||||||
Ok(self.account.clone())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() {
|
|
||||||
setup();
|
|
||||||
|
|
||||||
let socket = TcpListener::bind("0.0.0.0:10003").await.unwrap();
|
|
||||||
|
|
||||||
let acceptor = get_configured_tls_acceptor().await;
|
|
||||||
|
|
||||||
let server_controller = Arc::new(ServerController {
|
|
||||||
account: SECURE_SERVER_ACCOUNT.clone(),
|
|
||||||
secure_proxies: Default::default(),
|
|
||||||
secure_backend_url: Default::default(),
|
|
||||||
insecure_backend_url: Default::default(),
|
|
||||||
insecure_proxies: Default::default(),
|
|
||||||
});
|
|
||||||
|
|
||||||
while let Ok((stream, _sock_addr)) = socket.accept().await {
|
|
||||||
let Ok(websocket) = tokio_tungstenite::accept_async(stream).await else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
let stream = WebStreamSocket::new(websocket);
|
|
||||||
|
|
||||||
let Ok(mut stream) = acceptor.accept(stream).await else{
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
let server_controller = server_controller.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let server_controller = server_controller;
|
|
||||||
let Ok(server_type) = stream.read_buffer().await else {
|
|
||||||
error!("failed to read server type");
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
let Ok(server_type) = ServerType::deserialize(&mut Cursor::new(server_type)) else {
|
|
||||||
error!("failed to read server type");
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
match server_type {
|
|
||||||
ServerType::Proxy{
|
|
||||||
addr,
|
|
||||||
cluster
|
|
||||||
} => {
|
|
||||||
|
|
||||||
let mut write_lock = match cluster{
|
|
||||||
ServerCluster::Auth => {
|
|
||||||
server_controller.insecure_proxies.write().await
|
|
||||||
}
|
|
||||||
ServerCluster::Secure => {
|
|
||||||
server_controller.secure_proxies.write().await
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let server_controller_internal = server_controller.clone();
|
|
||||||
|
|
||||||
let remo = new_rmc_gateway_connection(stream.into(), move |r|
|
|
||||||
Arc::new(Proxy {
|
|
||||||
proxy: RemoteProxy::new(r),
|
|
||||||
ip: addr,
|
|
||||||
controller: server_controller_internal
|
|
||||||
}));
|
|
||||||
|
|
||||||
write_lock.push(Arc::downgrade(&remo));
|
|
||||||
|
|
||||||
let url = match cluster{
|
|
||||||
ServerCluster::Auth => {
|
|
||||||
server_controller.insecure_backend_url.read().await
|
|
||||||
}
|
|
||||||
ServerCluster::Secure => {
|
|
||||||
server_controller.secure_backend_url.read().await
|
|
||||||
}
|
|
||||||
}.clone();
|
|
||||||
|
|
||||||
if let Err(e) = remo.proxy.update_url(url.clone()).await {
|
|
||||||
error!("error whilest updating proxy url: {:?}", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
ServerType::Backend{
|
|
||||||
name,
|
|
||||||
cluster
|
|
||||||
} => {
|
|
||||||
let mut url = match cluster{
|
|
||||||
ServerCluster::Auth => {
|
|
||||||
server_controller.insecure_backend_url.write().await
|
|
||||||
}
|
|
||||||
ServerCluster::Secure => {
|
|
||||||
server_controller.secure_backend_url.write().await
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
*url = name;
|
|
||||||
drop(url);
|
|
||||||
|
|
||||||
server_controller.update_urls(cluster).await;
|
|
||||||
|
|
||||||
new_rmc_gateway_connection(stream.into(), |_| server_controller);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
6
src/executables/edge_node_holder_server.rs
Normal file
6
src/executables/edge_node_holder_server.rs
Normal file
|
|
@ -0,0 +1,6 @@
|
||||||
|
use rust_nex::common::setup;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
setup();
|
||||||
|
}
|
||||||
|
|
@ -1,9 +1,9 @@
|
||||||
|
|
||||||
use rust_nex::reggie::{tls_connect_to, LocalProxy};
|
use rust_nex::reggie::RemoteEdgeNodeHolder;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::ffi::CStr;
|
use std::ffi::CStr;
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
use std::net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream};
|
use std::net::{Ipv4Addr, SocketAddrV4};
|
||||||
use std::sync::{Arc, OnceLock};
|
use std::sync::{Arc, OnceLock};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use bytemuck::{Pod, Zeroable};
|
use bytemuck::{Pod, Zeroable};
|
||||||
|
|
@ -18,65 +18,38 @@ use rsa::pkcs1::EncodeRsaPublicKey;
|
||||||
use rsa::pss::BlindedSigningKey;
|
use rsa::pss::BlindedSigningKey;
|
||||||
use rsa::signature::{RandomizedSigner, SignatureEncoding};
|
use rsa::signature::{RandomizedSigner, SignatureEncoding};
|
||||||
use sha2::Sha256;
|
use sha2::Sha256;
|
||||||
use tokio::net::TcpSocket;
|
use tokio::net::{TcpSocket, TcpStream};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use rust_nex::common::setup;
|
use rust_nex::common::setup;
|
||||||
use rust_nex::executables::common::{OWN_IP_PRIVATE, OWN_IP_PUBLIC, SERVER_PORT};
|
use rust_nex::executables::common::{FORWARD_DESTINATION, OWN_IP_PRIVATE, OWN_IP_PUBLIC, SECURE_EDGE_NODE_HOLDER, SERVER_PORT};
|
||||||
use rust_nex::prudp::packet::VirtualPort;
|
use rust_nex::prudp::packet::VirtualPort;
|
||||||
use rust_nex::prudp::router::Router;
|
use rust_nex::prudp::router::Router;
|
||||||
use rust_nex::prudp::station_url::StationUrl;
|
use rust_nex::prudp::station_url::StationUrl;
|
||||||
use rust_nex::prudp::unsecure::Unsecure;
|
use rust_nex::prudp::unsecure::Unsecure;
|
||||||
use rust_nex::reggie::{establish_tls_connection_to, ProxyManagement, UnitPacketRead, UnitPacketWrite};
|
use rust_nex::reggie::{UnitPacketRead, UnitPacketWrite};
|
||||||
use rust_nex::reggie::ServerCluster::Auth;
|
use rust_nex::reggie::EdgeNodeHolderConnectOption::{DontRegister, Register};
|
||||||
use rust_nex::reggie::ServerType::Proxy;
|
use rust_nex::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote};
|
||||||
use rust_nex::rmc::protocols::OnlyRemote;
|
|
||||||
use rust_nex::rmc::response::ErrorCode;
|
use rust_nex::rmc::response::ErrorCode;
|
||||||
use rust_nex::rmc::structures::RmcSerialize;
|
use rust_nex::rmc::structures::RmcSerialize;
|
||||||
use rust_nex::rnex_proxy_common::ConnectionInitData;
|
use rust_nex::rnex_proxy_common::ConnectionInitData;
|
||||||
|
use rust_nex::util::SplittableBufferConnection;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static FORWARD_DESTINATION: Lazy<String> =
|
|
||||||
Lazy::new(|| env::var("FORWARD_DESTINATION").expect("no forward destination given"));
|
|
||||||
static FORWARD_DESTINATION_NAME: Lazy<String> =
|
|
||||||
Lazy::new(|| env::var("FORWARD_DESTINATION_NAME").expect("no forward destination name given"));
|
|
||||||
|
|
||||||
#[rmc_struct(Proxy)]
|
|
||||||
#[derive(Default)]
|
|
||||||
struct DestinationHolder{
|
|
||||||
url: RwLock<String>
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ProxyManagement for DestinationHolder{
|
|
||||||
async fn update_url(&self, new_url: String) -> Result<(), ErrorCode> {
|
|
||||||
println!("updating url");
|
|
||||||
|
|
||||||
let mut url = self.url.write().await;
|
|
||||||
|
|
||||||
*url = new_url;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
setup();
|
setup();
|
||||||
|
|
||||||
let conn =
|
let conn = tokio::net::TcpStream::connect(&*SECURE_EDGE_NODE_HOLDER).await.unwrap();
|
||||||
rust_nex::reggie::rmc_connect_to(
|
|
||||||
"agmp-control.spfn.net",
|
|
||||||
Proxy {
|
|
||||||
addr: SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT),
|
|
||||||
cluster: Auth
|
|
||||||
},
|
|
||||||
|r| Arc::new(DestinationHolder::default())
|
|
||||||
).await;
|
|
||||||
let dest_holder = conn.unwrap();
|
|
||||||
|
|
||||||
|
let conn: SplittableBufferConnection = conn.into();
|
||||||
|
|
||||||
|
conn.send(Register(SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT).to_string()).to_data()).await;
|
||||||
|
|
||||||
|
let conn = new_rmc_gateway_connection(conn, |r| Arc::new(OnlyRemote::<RemoteEdgeNodeHolder>::new(r)));
|
||||||
|
|
||||||
let (router_secure, _) = Router::new(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT))
|
let (router_secure, _) = Router::new(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT))
|
||||||
.await
|
.await
|
||||||
|
|
@ -97,18 +70,9 @@ async fn main() {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
let dest_holder = dest_holder.clone();
|
|
||||||
|
|
||||||
task::spawn(async move {
|
task::spawn(async move {
|
||||||
let dest = dest_holder.url.read().await;
|
|
||||||
|
|
||||||
if *dest == ""{
|
|
||||||
warn!("no destination set yet but connection attempted");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut stream
|
let mut stream
|
||||||
= match tls_connect_to(&dest).await {
|
= match TcpStream::connect(*FORWARD_DESTINATION).await {
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("unable to connect: {}", e);
|
error!("unable to connect: {}", e);
|
||||||
|
|
|
||||||
|
|
@ -11,58 +11,32 @@ use tokio::time::sleep;
|
||||||
use tokio_rustls::client::TlsStream;
|
use tokio_rustls::client::TlsStream;
|
||||||
use tokio_tungstenite::MaybeTlsStream;
|
use tokio_tungstenite::MaybeTlsStream;
|
||||||
use rust_nex::common::setup;
|
use rust_nex::common::setup;
|
||||||
use rust_nex::executables::common::{OWN_IP_PRIVATE, OWN_IP_PUBLIC, SERVER_PORT};
|
use rust_nex::executables::common::{AUTH_SERVER_ACCOUNT, FORWARD_DESTINATION, OWN_IP_PRIVATE, OWN_IP_PUBLIC, SECURE_EDGE_NODE_HOLDER, SECURE_SERVER_ACCOUNT, SERVER_PORT};
|
||||||
use rust_nex::prudp::packet::VirtualPort;
|
use rust_nex::prudp::packet::VirtualPort;
|
||||||
use rust_nex::prudp::router::Router;
|
use rust_nex::prudp::router::Router;
|
||||||
use rust_nex::prudp::secure::Secure;
|
use rust_nex::prudp::secure::Secure;
|
||||||
use rust_nex::prudp::unsecure::Unsecure;
|
use rust_nex::prudp::unsecure::Unsecure;
|
||||||
use rust_nex::reggie::{establish_tls_connection_to, tls_connect_to, ConnectError, ProxyManagement, RemoteController, WebStreamSocket};
|
use rust_nex::reggie::EdgeNodeHolderConnectOption::{DontRegister, Register};
|
||||||
use rust_nex::rmc::response::ErrorCode;
|
use rust_nex::rmc::response::ErrorCode;
|
||||||
use rust_nex::rnex_proxy_common::ConnectionInitData;
|
use rust_nex::rnex_proxy_common::ConnectionInitData;
|
||||||
use rust_nex::reggie::ServerCluster::Auth;
|
use rust_nex::reggie::{RemoteEdgeNodeHolder, UnitPacketWrite};
|
||||||
use rust_nex::reggie::ServerType::Proxy;
|
|
||||||
use rust_nex::reggie::UnitPacketWrite;
|
|
||||||
use rust_nex::rmc::structures::RmcSerialize;
|
use rust_nex::rmc::structures::RmcSerialize;
|
||||||
use rust_nex::reggie::UnitPacketRead;
|
use rust_nex::reggie::UnitPacketRead;
|
||||||
use rust_nex::rmc::protocols::RemoteInstantiatable;
|
use rust_nex::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote, RemoteInstantiatable};
|
||||||
use rust_nex::reggie::LocalProxy;
|
use rust_nex::util::SplittableBufferConnection;
|
||||||
use rust_nex::reggie::RemoteControllerManagement;
|
|
||||||
|
|
||||||
|
|
||||||
#[rmc_struct(Proxy)]
|
|
||||||
struct DestinationHolder{
|
|
||||||
url: RwLock<String>,
|
|
||||||
controller: RemoteController
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ProxyManagement for DestinationHolder{
|
|
||||||
async fn update_url(&self, new_url: String) -> Result<(), ErrorCode> {
|
|
||||||
let mut url = self.url.write().await;
|
|
||||||
|
|
||||||
*url = new_url;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
setup();
|
setup();
|
||||||
|
|
||||||
let conn =
|
let conn = tokio::net::TcpStream::connect(&*SECURE_EDGE_NODE_HOLDER).await.unwrap();
|
||||||
rust_nex::reggie::rmc_connect_to(
|
|
||||||
"agmp-control.spfn.net",
|
let conn: SplittableBufferConnection = conn.into();
|
||||||
Proxy {
|
|
||||||
addr: SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT),
|
conn.send(Register(SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT).to_string()).to_data()).await;
|
||||||
cluster: Auth
|
|
||||||
},
|
let conn = new_rmc_gateway_connection(conn, |r| Arc::new(OnlyRemote::<RemoteEdgeNodeHolder>::new(r)));
|
||||||
|r| Arc::new(DestinationHolder{
|
|
||||||
url: Default::default(),
|
|
||||||
controller: RemoteController::new(r)
|
|
||||||
})
|
|
||||||
).await;
|
|
||||||
let dest_holder = conn.unwrap();
|
|
||||||
|
|
||||||
|
|
||||||
let (router_secure, _) = Router::new(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT))
|
let (router_secure, _) = Router::new(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT))
|
||||||
|
|
@ -72,7 +46,7 @@ async fn main() {
|
||||||
let mut socket_secure = router_secure
|
let mut socket_secure = router_secure
|
||||||
.add_socket(VirtualPort::new(1, 10), Secure(
|
.add_socket(VirtualPort::new(1, 10), Secure(
|
||||||
"6f599f81",
|
"6f599f81",
|
||||||
dest_holder.controller.get_secure_account().await.unwrap()
|
AUTH_SERVER_ACCOUNT.clone()
|
||||||
))
|
))
|
||||||
.await
|
.await
|
||||||
.expect("unable to add socket");
|
.expect("unable to add socket");
|
||||||
|
|
@ -85,18 +59,9 @@ async fn main() {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
let dest_holder = dest_holder.clone();
|
|
||||||
|
|
||||||
task::spawn(async move {
|
task::spawn(async move {
|
||||||
let dest = dest_holder.url.read().await;
|
|
||||||
|
|
||||||
if *dest == ""{
|
|
||||||
warn!("no destination set yet but connection attempted");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut stream
|
let mut stream
|
||||||
= match tls_connect_to(&dest).await {
|
= match TcpStream::connect(*FORWARD_DESTINATION).await {
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("unable to connect: {}", e);
|
error!("unable to connect: {}", e);
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::hash::{DefaultHasher, Hasher};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use crate::grpc::account;
|
use crate::grpc::account;
|
||||||
use crate::kerberos::{derive_key, KerberosDateTime, Ticket};
|
use crate::kerberos::{derive_key, KerberosDateTime, Ticket};
|
||||||
|
|
@ -10,7 +11,7 @@ use crate::rmc::structures::connection_data::ConnectionData;
|
||||||
use crate::rmc::structures::qresult::QResult;
|
use crate::rmc::structures::qresult::QResult;
|
||||||
use crate::{define_rmc_proto, kerberos};
|
use crate::{define_rmc_proto, kerberos};
|
||||||
use macros::rmc_struct;
|
use macros::rmc_struct;
|
||||||
use crate::reggie::{RemoteController, RemoteControllerManagement};
|
use crate::reggie::{RemoteEdgeNodeHolder, RemoteEdgeNodeManagement};
|
||||||
use crate::rmc::protocols::OnlyRemote;
|
use crate::rmc::protocols::OnlyRemote;
|
||||||
|
|
||||||
define_rmc_proto!(
|
define_rmc_proto!(
|
||||||
|
|
@ -24,7 +25,7 @@ pub struct AuthHandler {
|
||||||
pub destination_server_acct: &'static Account,
|
pub destination_server_acct: &'static Account,
|
||||||
pub build_name: &'static str,
|
pub build_name: &'static str,
|
||||||
//pub station_url: &'static str,
|
//pub station_url: &'static str,
|
||||||
pub control_server: Arc<OnlyRemote<RemoteController>>,
|
pub control_server: Arc<OnlyRemote<RemoteEdgeNodeHolder>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn generate_ticket(
|
pub fn generate_ticket(
|
||||||
|
|
@ -86,8 +87,12 @@ impl Auth for AuthHandler {
|
||||||
let ticket = generate_ticket(source_login_data, destination_login_data);
|
let ticket = generate_ticket(source_login_data, destination_login_data);
|
||||||
|
|
||||||
let result = QResult::success(Core_Unknown);
|
let result = QResult::success(Core_Unknown);
|
||||||
|
|
||||||
|
let mut hasher = DefaultHasher::new();
|
||||||
|
|
||||||
|
hasher.write(name.as_bytes());
|
||||||
|
|
||||||
let Ok(addr) = self.control_server.get_secure_proxy_url().await else {
|
let Ok(addr) = self.control_server.get_url(hasher.finish()).await else {
|
||||||
return Err(ErrorCode::Core_Exception);
|
return Err(ErrorCode::Core_Exception);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
361
src/reggie.rs
361
src/reggie.rs
|
|
@ -1,4 +1,5 @@
|
||||||
use std::{env, fs, io};
|
use std::{env, fs, io};
|
||||||
|
use std::hash::Hash;
|
||||||
use std::io::{Error, ErrorKind};
|
use std::io::{Error, ErrorKind};
|
||||||
use std::net::{SocketAddrV4, ToSocketAddrs};
|
use std::net::{SocketAddrV4, ToSocketAddrs};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
|
@ -26,73 +27,6 @@ use crate::rmc::protocols::{new_rmc_gateway_connection, OnlyRemote, RmcCallable,
|
||||||
use crate::rmc::response::ErrorCode;
|
use crate::rmc::response::ErrorCode;
|
||||||
use crate::rmc::structures::RmcSerialize;
|
use crate::rmc::structures::RmcSerialize;
|
||||||
|
|
||||||
pub static SERVER_NAME: Lazy<String> = Lazy::new(|| {
|
|
||||||
env::var("REGGIE_SERVER_NAME").expect("no server name specified")
|
|
||||||
});
|
|
||||||
|
|
||||||
pub static SELF_CERT: Lazy<CertificateDer<'static>> = Lazy::new(|| CertificateDer::from(fs::read(&format!("/opt/reggie/certs/{}.crt", SERVER_NAME.as_str())).expect("failed to read self cpub ertificate")));
|
|
||||||
pub static ROOT_CA: Lazy<CertificateDer<'static>> = Lazy::new(|| CertificateDer::from(fs::read("/opt/reggie/certs/CA.crt").expect("failed to read root certipub ficate")));
|
|
||||||
pub static SELF_KEY: Lazy<PrivateKeyDer<'static>> = Lazy::new(|| PrivateKeyDer::try_from(fs::read(&format!("/opt/reggie/certs/{}.key", SERVER_NAME.as_str())).expect("failed to read self pub key")).expect("failed to read self key"));
|
|
||||||
pub static ROOT_TRUST_ANCHOR: Lazy<TrustAnchor<'static>> = Lazy::new(|| anchor_from_trusted_cert(&*ROOT_CA).expect("unable to create root ca trust anchor"));
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
pub fn get_root_store() -> RootCertStore {
|
|
||||||
RootCertStore {
|
|
||||||
roots: vec![
|
|
||||||
ROOT_TRUST_ANCHOR.clone()
|
|
||||||
],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_root_cert_verifier() -> RootCertStore {
|
|
||||||
RootCertStore {
|
|
||||||
roots: vec![
|
|
||||||
ROOT_TRUST_ANCHOR.clone()
|
|
||||||
],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
pub async fn get_configured_tls_acceptor() -> TlsAcceptor{
|
|
||||||
let store = get_root_store();
|
|
||||||
|
|
||||||
let cert_verifier = WebPkiClientVerifier::builder(store.into())
|
|
||||||
.build()
|
|
||||||
.expect("unable to build cert verifier");
|
|
||||||
|
|
||||||
let config = ServerConfig::builder()
|
|
||||||
//.with_no_client_auth()
|
|
||||||
.with_client_cert_verifier(cert_verifier)
|
|
||||||
.with_single_cert(vec![
|
|
||||||
SELF_CERT.clone(),
|
|
||||||
ROOT_CA.clone()
|
|
||||||
], SELF_KEY.clone_key())
|
|
||||||
.expect("unable to create server config");
|
|
||||||
|
|
||||||
TlsAcceptor::from(Arc::new(config))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_configured_tls_connector() -> TlsConnector{
|
|
||||||
let store = get_root_store();
|
|
||||||
|
|
||||||
let cert_verifier = WebPkiServerVerifier::builder(store.into())
|
|
||||||
.build()
|
|
||||||
.expect("unable to build cert verifier");
|
|
||||||
|
|
||||||
let config = ClientConfig::builder()
|
|
||||||
//.with_root_certificates(get_root_store())
|
|
||||||
.with_webpki_verifier(cert_verifier)
|
|
||||||
.with_client_auth_cert(vec![
|
|
||||||
SELF_CERT.clone(),
|
|
||||||
ROOT_CA.clone()
|
|
||||||
], SELF_KEY.clone_key())
|
|
||||||
.expect("unable to create client config");
|
|
||||||
|
|
||||||
|
|
||||||
TlsConnector::from(Arc::new(config))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait UnitPacketRead: AsyncRead + Unpin{
|
pub trait UnitPacketRead: AsyncRead + Unpin{
|
||||||
async fn read_buffer(&mut self) -> Result<Vec<u8>, io::Error>{
|
async fn read_buffer(&mut self) -> Result<Vec<u8>, io::Error>{
|
||||||
let mut len_raw: [u8; 4] = [0; 4];
|
let mut len_raw: [u8; 4] = [0; 4];
|
||||||
|
|
@ -126,300 +60,23 @@ pub trait UnitPacketWrite: AsyncWrite + Unpin{
|
||||||
|
|
||||||
impl<T: AsyncWrite + Unpin> UnitPacketWrite for T{}
|
impl<T: AsyncWrite + Unpin> UnitPacketWrite for T{}
|
||||||
|
|
||||||
pub async fn establish_tls_connection_to(address: &str, server_name: &str) -> TlsStream<TcpStream>{
|
|
||||||
let connector = get_configured_tls_connector().await;
|
|
||||||
|
|
||||||
let stream = TcpStream::connect((address, 80u16).to_socket_addrs().unwrap().next().unwrap()).await.unwrap();
|
|
||||||
|
|
||||||
let stream = connector.connect(ServerName::try_from(server_name.to_owned()).unwrap(), stream).await
|
|
||||||
.expect("unable to connect via tls");
|
|
||||||
|
|
||||||
stream
|
|
||||||
}
|
|
||||||
|
|
||||||
#[rmc_proto(1)]
|
|
||||||
pub trait RmcTestProto{
|
|
||||||
#[method_id(1)]
|
|
||||||
async fn test(&self) -> Result<String, ErrorCode>;
|
|
||||||
}
|
|
||||||
|
|
||||||
define_rmc_proto!(
|
|
||||||
proto TestProto{
|
|
||||||
RmcTestProto
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
#[rmc_struct(TestProto)]
|
|
||||||
pub struct TestStruct;
|
|
||||||
|
|
||||||
impl RmcTestProto for TestStruct{
|
|
||||||
async fn test(&self) -> Result<String, ErrorCode> {
|
|
||||||
Ok("heya".into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
pub struct WebStreamSocket<T: AsyncRead + AsyncWrite + Unpin> {
|
|
||||||
socket: WebSocketStream<T>,
|
|
||||||
incoming_buffer: Vec<u8>,
|
|
||||||
finished_reading: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsyncRead + AsyncWrite + Unpin> WebStreamSocket<T> {
|
|
||||||
pub fn new(socket: WebSocketStream<T>) -> Self{
|
|
||||||
Self{
|
|
||||||
incoming_buffer: Default::default(),
|
|
||||||
socket,
|
|
||||||
finished_reading: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsyncRead + AsyncWrite + Unpin> AsyncWrite for WebStreamSocket<T> {
|
|
||||||
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, Error>> {
|
|
||||||
let this = &mut self.get_mut().socket;
|
|
||||||
|
|
||||||
let msg = Message::binary(buf.to_vec());
|
|
||||||
|
|
||||||
match this.poll_ready_unpin(cx) {
|
|
||||||
Poll::Pending => return Poll::Pending,
|
|
||||||
Poll::Ready(Err(e)) => return Poll::Ready(Err(Error::new(ErrorKind::Other, e))),
|
|
||||||
Poll::Ready(Ok(())) => {
|
|
||||||
// continue on
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let Err(e) = this.start_send_unpin(msg) else {
|
|
||||||
return Poll::Ready(Ok(buf.len()));
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
Poll::Ready(Err(Error::new(ErrorKind::Other, e)))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
|
|
||||||
let this = &mut self.get_mut().socket;
|
|
||||||
|
|
||||||
match this.poll_flush_unpin(cx) {
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
Poll::Ready(Err(e)) => Poll::Ready(Err(Error::new(ErrorKind::Other, e))),
|
|
||||||
Poll::Ready(Ok(())) => Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
|
|
||||||
let this = &mut self.get_mut().socket;
|
|
||||||
|
|
||||||
match this.poll_close_unpin(cx) {
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
Poll::Ready(Err(e)) => Poll::Ready(Err(Error::new(ErrorKind::Other, e))),
|
|
||||||
Poll::Ready(Ok(())) => Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsyncRead + AsyncWrite + Unpin> AsyncRead for WebStreamSocket<T> {
|
|
||||||
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
|
|
||||||
let Self {
|
|
||||||
incoming_buffer,
|
|
||||||
socket,
|
|
||||||
finished_reading
|
|
||||||
} = &mut self.get_mut();
|
|
||||||
|
|
||||||
if !*finished_reading {
|
|
||||||
match socket.poll_next_unpin(cx) {
|
|
||||||
Poll::Ready(Some(Ok(msg))) => {
|
|
||||||
let Message::Binary(data) = msg else {
|
|
||||||
return Poll::Ready(Err(Error::new(ErrorKind::InvalidData, "got non binary data when trying to emulate stream")));
|
|
||||||
};
|
|
||||||
|
|
||||||
incoming_buffer.extend_from_slice(&data);
|
|
||||||
}
|
|
||||||
Poll::Ready(Some(Err(e))) if incoming_buffer.is_empty() => {
|
|
||||||
return Poll::Ready(Err(Error::new(ErrorKind::Other, e)));
|
|
||||||
}
|
|
||||||
Poll::Ready(None) if incoming_buffer.is_empty() => {
|
|
||||||
*finished_reading = true;
|
|
||||||
}
|
|
||||||
Poll::Pending if incoming_buffer.is_empty() => {
|
|
||||||
return Poll::Pending
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if !incoming_buffer.is_empty(){
|
|
||||||
let read_ammount = buf.remaining();
|
|
||||||
|
|
||||||
let ammount_taken = read_ammount.min(incoming_buffer.len());
|
|
||||||
|
|
||||||
buf.put_slice(&incoming_buffer[0..ammount_taken]);
|
|
||||||
|
|
||||||
*incoming_buffer = (&incoming_buffer.get(ammount_taken..).unwrap_or(&[])).to_vec();
|
|
||||||
}
|
|
||||||
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
|
|
||||||
|
|
||||||
/*if buf.remaining() == 0{
|
|
||||||
|
|
||||||
|
|
||||||
return Poll::Ready(Ok(()));
|
|
||||||
}
|
|
||||||
|
|
||||||
match socket.poll_next_unpin(cx) {
|
|
||||||
Poll::Ready(Some(Ok(msg))) => {
|
|
||||||
let Message::Binary(data) = msg else {
|
|
||||||
return Poll::Ready(Err(Error::new(ErrorKind::InvalidData, "got non binary data when trying to emulate stream")));
|
|
||||||
};
|
|
||||||
|
|
||||||
if data.len() <= buf.remaining() {
|
|
||||||
// if no data remains there is no reason to store anything
|
|
||||||
buf.put_slice(&data);
|
|
||||||
} else {
|
|
||||||
let read_ammount = buf.remaining();
|
|
||||||
|
|
||||||
let ammount_taken = read_ammount.min(data.len());
|
|
||||||
|
|
||||||
buf.put_slice(&data[..ammount_taken]);
|
|
||||||
|
|
||||||
*incoming_buffer = data[ammount_taken..].to_vec();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
Poll::Ready(Some(Err(e))) => Poll::Ready(Err(Error::new(ErrorKind::Other, e))),
|
|
||||||
// EOF
|
|
||||||
Poll::Ready(None) => Poll::Ready(Ok(())),
|
|
||||||
Poll::Pending => Poll::Pending
|
|
||||||
}*/
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
|
||||||
pub enum ConnectError{
|
|
||||||
#[error(transparent)]
|
|
||||||
Tungstenite(#[from] tungstenite::error::Error),
|
|
||||||
#[error(transparent)]
|
|
||||||
DataSendError(#[from] io::Error),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn tls_connect_to(url: &str) -> Result<TlsStream<WebStreamSocket<MaybeTlsStream<TcpStream>>>, ConnectError>{
|
|
||||||
let (stream, _)= connect_async(format!("ws://{}/", url)).await?;
|
|
||||||
|
|
||||||
let webstreamsocket = WebStreamSocket::new(stream);
|
|
||||||
|
|
||||||
let connector = get_configured_tls_connector().await;
|
|
||||||
|
|
||||||
let connection = connector.connect(ServerName::try_from(url.to_string()).unwrap(), webstreamsocket).await?;
|
|
||||||
|
|
||||||
Ok(connection)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn rmc_connect_to<T: RmcCallable + Sync + Send + 'static, U: RmcSerialize, F>(url: &str, init_data: U, create_func: F) -> Result<Arc<T>, ConnectError>
|
|
||||||
where
|
|
||||||
F: FnOnce(RmcConnection) -> Arc<T>{
|
|
||||||
let mut connection = tls_connect_to(url).await?;
|
|
||||||
|
|
||||||
connection.send_buffer(&init_data.to_data()).await?;
|
|
||||||
|
|
||||||
let rmc = new_rmc_gateway_connection(connection.into(), create_func);
|
|
||||||
|
|
||||||
Ok(rmc)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test(){
|
|
||||||
setup();
|
|
||||||
|
|
||||||
let socket = connect_async("ws://192.168.178.120:12345/").await;
|
|
||||||
let (stream, resp) = socket.unwrap();
|
|
||||||
|
|
||||||
let mut webstreamsocket = WebStreamSocket::new(stream);
|
|
||||||
|
|
||||||
let connector = get_configured_tls_connector().await;
|
|
||||||
|
|
||||||
let connection = connector.connect(ServerName::try_from("agmp-tv.spfn.net").unwrap(), webstreamsocket).await.unwrap();
|
|
||||||
|
|
||||||
let rmc = new_rmc_gateway_connection(connection.into(), |r| {
|
|
||||||
Arc::new(OnlyRemote::<RemoteTestProto>::new(r))
|
|
||||||
});
|
|
||||||
|
|
||||||
println!("{:?}", rmc.test().await);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_server(){
|
|
||||||
setup();
|
|
||||||
|
|
||||||
let socket = TcpListener::bind("192.168.178.120:12345").await.unwrap();
|
|
||||||
|
|
||||||
let acceptor = get_configured_tls_acceptor().await;
|
|
||||||
|
|
||||||
while let Ok((stream, _sock_addr)) = socket.accept().await{
|
|
||||||
let websocket = tokio_tungstenite::accept_async(stream).await.unwrap();
|
|
||||||
|
|
||||||
let webstreamsocket = WebStreamSocket::new(websocket);
|
|
||||||
|
|
||||||
let stream = acceptor.accept(webstreamsocket).await.unwrap();
|
|
||||||
|
|
||||||
new_rmc_gateway_connection(stream.into(), |_| {
|
|
||||||
Arc::new(
|
|
||||||
TestStruct
|
|
||||||
)
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#[rmc_proto(1)]
|
#[rmc_proto(1)]
|
||||||
pub trait ProxyManagement {
|
pub trait EdgeNodeManagement {
|
||||||
#[method_id(1)]
|
#[method_id(1)]
|
||||||
async fn update_url(&self, url: String) -> Result<(), ErrorCode>;
|
async fn get_url(&self, seed: u64) -> Result<String, ErrorCode>;
|
||||||
}
|
}
|
||||||
|
|
||||||
define_rmc_proto!(
|
define_rmc_proto!(
|
||||||
proto Proxy{
|
proto EdgeNodeHolder{
|
||||||
ProxyManagement
|
EdgeNodeManagement
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
#[rmc_proto(2)]
|
#[derive(RmcSerialize, Debug)]
|
||||||
pub trait ControllerManagement {
|
|
||||||
#[method_id(1)]
|
|
||||||
async fn get_secure_proxy_url(&self) -> Result<String, ErrorCode>;
|
|
||||||
|
|
||||||
#[method_id(2)]
|
|
||||||
async fn get_secure_account(&self) -> Result<Account, ErrorCode>;
|
|
||||||
}
|
|
||||||
|
|
||||||
define_rmc_proto!(
|
|
||||||
proto Controller{
|
|
||||||
ControllerManagement
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
#[derive(RmcSerialize)]
|
|
||||||
#[repr(u32)]
|
#[repr(u32)]
|
||||||
pub enum ServerCluster{
|
pub enum EdgeNodeHolderConnectOption{
|
||||||
Auth = 0,
|
DontRegister = 0,
|
||||||
Secure = 1
|
Register(String) = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(RmcSerialize)]
|
|
||||||
#[repr(u32)]
|
|
||||||
pub enum ServerType{
|
|
||||||
Proxy{
|
|
||||||
addr: SocketAddrV4,
|
|
||||||
cluster: ServerCluster
|
|
||||||
} = 1,
|
|
||||||
Backend{
|
|
||||||
name: String,
|
|
||||||
cluster: ServerCluster
|
|
||||||
} = 2,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue