From a9fffa8d2d9275da22b0fb2cbe9f406c11035739 Mon Sep 17 00:00:00 2001 From: Maple Date: Wed, 30 Jul 2025 22:15:31 +0200 Subject: [PATCH] actually implement edge node holder server --- src/executables/edge_node_holder_server.rs | 86 ++++++++++++++++++++++ src/executables/proxy_insecure.rs | 2 +- src/executables/proxy_secure.rs | 2 +- src/nex/auth_handler.rs | 10 ++- src/reggie.rs | 4 +- 5 files changed, 99 insertions(+), 5 deletions(-) diff --git a/src/executables/edge_node_holder_server.rs b/src/executables/edge_node_holder_server.rs index f9ea39c..021dbb1 100644 --- a/src/executables/edge_node_holder_server.rs +++ b/src/executables/edge_node_holder_server.rs @@ -1,6 +1,92 @@ +use std::io::Cursor; +use std::net::SocketAddrV4; +use std::sync::{Arc, Weak}; +use log::error; +use macros::rmc_struct; +use tokio::net::TcpListener; +use tokio::sync::RwLock; use rust_nex::common::setup; +use rust_nex::executables::common::{OWN_IP_PRIVATE, SERVER_PORT}; +use rust_nex::reggie::{EdgeNodeHolderConnectOption, EdgeNodeManagement, LocalEdgeNodeHolder}; +use rust_nex::rmc::protocols::new_rmc_gateway_connection; +use rust_nex::rmc::response::ErrorCode; +use rust_nex::util::SplittableBufferConnection; +use rust_nex::rmc::structures::RmcSerialize; + +#[rmc_struct(EdgeNodeHolder)] +struct EdgeNode{ + data_holder: Arc, + address: SocketAddrV4 +} + +impl EdgeNodeManagement for EdgeNode{ + async fn get_url(&self, seed: u64) -> Result { + self.data_holder.get_url(seed).await + } +} + +#[rmc_struct(EdgeNodeHolder)] +#[derive(Default)] +struct DataHolder{ + edge_nodes: RwLock>> +} + +impl EdgeNodeManagement for DataHolder{ + async fn get_url(&self, seed: u64) -> Result { + let nodes = self.edge_nodes.read().await; + + let nodes: Vec<_> = nodes.iter().filter_map(|n| n.upgrade()).collect(); + + // avoid a devide by zero + if nodes.len() == 0{ + return Err(ErrorCode::Core_InvalidIndex); + }; + + let node = &nodes[seed as usize % nodes.len()]; + + Ok(node.address) + } +} #[tokio::main] async fn main() { setup(); + + let listen = TcpListener::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)).await.unwrap(); + + let holder: Arc = Default::default(); + + while let Ok((mut stream, addr)) = listen.accept().await { + let mut conn: SplittableBufferConnection = stream.into(); + + let Some(data) = conn.recv().await else { + continue; + }; + + let Ok(data) = EdgeNodeHolderConnectOption::deserialize(&mut Cursor::new(data)) else { + continue; + }; + + let holder = holder.clone(); + + match data{ + EdgeNodeHolderConnectOption::DontRegister => { + + new_rmc_gateway_connection(conn, |_| holder); + }, + EdgeNodeHolderConnectOption::Register(address) => { + let edge_node = EdgeNode{ + address, + data_holder: holder.clone() + }; + + let node = new_rmc_gateway_connection(conn, move |_| Arc::new(edge_node)); + + let mut nodes = holder.edge_nodes.write().await; + nodes.push(Arc::downgrade(&node)); + } + } + + + } } \ No newline at end of file diff --git a/src/executables/proxy_insecure.rs b/src/executables/proxy_insecure.rs index e3ba789..1232dc3 100644 --- a/src/executables/proxy_insecure.rs +++ b/src/executables/proxy_insecure.rs @@ -47,7 +47,7 @@ async fn main() { let conn: SplittableBufferConnection = conn.into(); - conn.send(Register(SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT).to_string()).to_data()).await; + conn.send(Register(SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT)).to_data()).await; let conn = new_rmc_gateway_connection(conn, |r| Arc::new(OnlyRemote::::new(r))); diff --git a/src/executables/proxy_secure.rs b/src/executables/proxy_secure.rs index 115e8ee..33e07fb 100644 --- a/src/executables/proxy_secure.rs +++ b/src/executables/proxy_secure.rs @@ -33,7 +33,7 @@ async fn main() { let conn: SplittableBufferConnection = conn.into(); - conn.send(Register(SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT).to_string()).to_data()).await; + conn.send(Register(SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT)).to_data()).await; let conn = new_rmc_gateway_connection(conn, |r| Arc::new(OnlyRemote::::new(r))); diff --git a/src/nex/auth_handler.rs b/src/nex/auth_handler.rs index 6de0385..ca7f970 100644 --- a/src/nex/auth_handler.rs +++ b/src/nex/auth_handler.rs @@ -1,4 +1,5 @@ use std::hash::{DefaultHasher, Hasher}; +use std::net::SocketAddrV4; use std::sync::Arc; use crate::grpc::account; use crate::kerberos::{derive_key, KerberosDateTime, Ticket}; @@ -59,6 +60,13 @@ async fn get_login_data_by_pid(pid: u32) -> Option<(u32, [u8; 16])> { Some((pid, passwd)) } +fn station_url_from_sock_addr(sock_addr: SocketAddrV4) -> String{ + format!( + "prudps:/PID=2;sid=1;stream=10;type=2;address={};port={};CID=1", + sock_addr.ip(), sock_addr.port() + ) +} + impl Auth for AuthHandler { async fn login(&self, _name: String) -> Result<(), ErrorCode> { todo!() @@ -97,7 +105,7 @@ impl Auth for AuthHandler { }; let connection_data = ConnectionData { - station_url: addr, + station_url: station_url_from_sock_addr(addr), special_station_url: "".to_string(), //date_time: KerberosDateTime::new(1,1,1,1,1,1), date_time: KerberosDateTime::now(), diff --git a/src/reggie.rs b/src/reggie.rs index 1215307..f57ebb0 100644 --- a/src/reggie.rs +++ b/src/reggie.rs @@ -65,7 +65,7 @@ impl UnitPacketWrite for T{} #[rmc_proto(1)] pub trait EdgeNodeManagement { #[method_id(1)] - async fn get_url(&self, seed: u64) -> Result; + async fn get_url(&self, seed: u64) -> Result; } define_rmc_proto!( @@ -78,5 +78,5 @@ define_rmc_proto!( #[repr(u32)] pub enum EdgeNodeHolderConnectOption{ DontRegister = 0, - Register(String) = 1 + Register(SocketAddrV4) = 1 }