actually implement edge node holder server
This commit is contained in:
parent
11b0393e6a
commit
a9fffa8d2d
5 changed files with 99 additions and 5 deletions
|
|
@ -1,6 +1,92 @@
|
|||
use std::io::Cursor;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::sync::{Arc, Weak};
|
||||
use log::error;
|
||||
use macros::rmc_struct;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::RwLock;
|
||||
use rust_nex::common::setup;
|
||||
use rust_nex::executables::common::{OWN_IP_PRIVATE, SERVER_PORT};
|
||||
use rust_nex::reggie::{EdgeNodeHolderConnectOption, EdgeNodeManagement, LocalEdgeNodeHolder};
|
||||
use rust_nex::rmc::protocols::new_rmc_gateway_connection;
|
||||
use rust_nex::rmc::response::ErrorCode;
|
||||
use rust_nex::util::SplittableBufferConnection;
|
||||
use rust_nex::rmc::structures::RmcSerialize;
|
||||
|
||||
#[rmc_struct(EdgeNodeHolder)]
|
||||
struct EdgeNode{
|
||||
data_holder: Arc<DataHolder>,
|
||||
address: SocketAddrV4
|
||||
}
|
||||
|
||||
impl EdgeNodeManagement for EdgeNode{
|
||||
async fn get_url(&self, seed: u64) -> Result<SocketAddrV4, ErrorCode> {
|
||||
self.data_holder.get_url(seed).await
|
||||
}
|
||||
}
|
||||
|
||||
#[rmc_struct(EdgeNodeHolder)]
|
||||
#[derive(Default)]
|
||||
struct DataHolder{
|
||||
edge_nodes: RwLock<Vec<Weak<EdgeNode>>>
|
||||
}
|
||||
|
||||
impl EdgeNodeManagement for DataHolder{
|
||||
async fn get_url(&self, seed: u64) -> Result<SocketAddrV4, ErrorCode> {
|
||||
let nodes = self.edge_nodes.read().await;
|
||||
|
||||
let nodes: Vec<_> = nodes.iter().filter_map(|n| n.upgrade()).collect();
|
||||
|
||||
// avoid a devide by zero
|
||||
if nodes.len() == 0{
|
||||
return Err(ErrorCode::Core_InvalidIndex);
|
||||
};
|
||||
|
||||
let node = &nodes[seed as usize % nodes.len()];
|
||||
|
||||
Ok(node.address)
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
setup();
|
||||
|
||||
let listen = TcpListener::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)).await.unwrap();
|
||||
|
||||
let holder: Arc<DataHolder> = Default::default();
|
||||
|
||||
while let Ok((mut stream, addr)) = listen.accept().await {
|
||||
let mut conn: SplittableBufferConnection = stream.into();
|
||||
|
||||
let Some(data) = conn.recv().await else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Ok(data) = EdgeNodeHolderConnectOption::deserialize(&mut Cursor::new(data)) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let holder = holder.clone();
|
||||
|
||||
match data{
|
||||
EdgeNodeHolderConnectOption::DontRegister => {
|
||||
|
||||
new_rmc_gateway_connection(conn, |_| holder);
|
||||
},
|
||||
EdgeNodeHolderConnectOption::Register(address) => {
|
||||
let edge_node = EdgeNode{
|
||||
address,
|
||||
data_holder: holder.clone()
|
||||
};
|
||||
|
||||
let node = new_rmc_gateway_connection(conn, move |_| Arc::new(edge_node));
|
||||
|
||||
let mut nodes = holder.edge_nodes.write().await;
|
||||
nodes.push(Arc::downgrade(&node));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -47,7 +47,7 @@ async fn main() {
|
|||
|
||||
let conn: SplittableBufferConnection = conn.into();
|
||||
|
||||
conn.send(Register(SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT).to_string()).to_data()).await;
|
||||
conn.send(Register(SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT)).to_data()).await;
|
||||
|
||||
let conn = new_rmc_gateway_connection(conn, |r| Arc::new(OnlyRemote::<RemoteEdgeNodeHolder>::new(r)));
|
||||
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ async fn main() {
|
|||
|
||||
let conn: SplittableBufferConnection = conn.into();
|
||||
|
||||
conn.send(Register(SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT).to_string()).to_data()).await;
|
||||
conn.send(Register(SocketAddrV4::new(*OWN_IP_PUBLIC, *SERVER_PORT)).to_data()).await;
|
||||
|
||||
let conn = new_rmc_gateway_connection(conn, |r| Arc::new(OnlyRemote::<RemoteEdgeNodeHolder>::new(r)));
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
use std::hash::{DefaultHasher, Hasher};
|
||||
use std::net::SocketAddrV4;
|
||||
use std::sync::Arc;
|
||||
use crate::grpc::account;
|
||||
use crate::kerberos::{derive_key, KerberosDateTime, Ticket};
|
||||
|
|
@ -59,6 +60,13 @@ async fn get_login_data_by_pid(pid: u32) -> Option<(u32, [u8; 16])> {
|
|||
Some((pid, passwd))
|
||||
}
|
||||
|
||||
fn station_url_from_sock_addr(sock_addr: SocketAddrV4) -> String{
|
||||
format!(
|
||||
"prudps:/PID=2;sid=1;stream=10;type=2;address={};port={};CID=1",
|
||||
sock_addr.ip(), sock_addr.port()
|
||||
)
|
||||
}
|
||||
|
||||
impl Auth for AuthHandler {
|
||||
async fn login(&self, _name: String) -> Result<(), ErrorCode> {
|
||||
todo!()
|
||||
|
|
@ -97,7 +105,7 @@ impl Auth for AuthHandler {
|
|||
};
|
||||
|
||||
let connection_data = ConnectionData {
|
||||
station_url: addr,
|
||||
station_url: station_url_from_sock_addr(addr),
|
||||
special_station_url: "".to_string(),
|
||||
//date_time: KerberosDateTime::new(1,1,1,1,1,1),
|
||||
date_time: KerberosDateTime::now(),
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ impl<T: AsyncWrite + Unpin> UnitPacketWrite for T{}
|
|||
#[rmc_proto(1)]
|
||||
pub trait EdgeNodeManagement {
|
||||
#[method_id(1)]
|
||||
async fn get_url(&self, seed: u64) -> Result<String, ErrorCode>;
|
||||
async fn get_url(&self, seed: u64) -> Result<SocketAddrV4, ErrorCode>;
|
||||
}
|
||||
|
||||
define_rmc_proto!(
|
||||
|
|
@ -78,5 +78,5 @@ define_rmc_proto!(
|
|||
#[repr(u32)]
|
||||
pub enum EdgeNodeHolderConnectOption{
|
||||
DontRegister = 0,
|
||||
Register(String) = 1
|
||||
Register(SocketAddrV4) = 1
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue