diff --git a/proxy-common/src/lib.rs b/proxy-common/src/lib.rs index fae29df..93c16c8 100644 --- a/proxy-common/src/lib.rs +++ b/proxy-common/src/lib.rs @@ -161,6 +161,8 @@ pub async fn setup_edge_node_connection( .unwrap(), ) .await; + + println!("{:?}", param.self_public); //leave the inner object floating so that it gets destroyed once we disconnect new_rmc_gateway_connection(conn, move |r| { Arc::new(OnRemoteDrop::::new( diff --git a/prudpv0/src/lib.rs b/prudpv0/src/lib.rs index 0df222a..444899a 100644 --- a/prudpv0/src/lib.rs +++ b/prudpv0/src/lib.rs @@ -44,8 +44,6 @@ cfg_if::cfg_if! { //implementations, e.g. secure and insecure(this also includes special cases like friends) async fn start_proxy(param: ProxyStartupParam) { - setup_edge_node_connection(¶m, || abort()); - info!("creating cryptography instance"); let mut crypto = Arc::new(T::new()); info!("binding to socket"); diff --git a/prudpv0/src/packet.rs b/prudpv0/src/packet.rs index d7b6705..f55ff18 100644 --- a/prudpv0/src/packet.rs +++ b/prudpv0/src/packet.rs @@ -6,7 +6,7 @@ use rnex_core::prudp::{ types_flags::{ self, TypesFlags, flags::{HAS_SIZE, NEED_ACK}, - types::{CONNECT, DATA, SYN}, + types::{CONNECT, DATA, PING, SYN}, }, virtual_port::VirtualPort, }; @@ -313,3 +313,41 @@ pub fn new_data_packet( packet.0 } + +pub fn new_ping_packet( + flags: u16, + source: VirtualPort, + destination: VirtualPort, + sequence_id: u16, + session_id: u8, + crypto_instance: &mut impl CryptoInstance, + crypto: &impl Crypto, +) -> Vec { + let type_flags = TypesFlags::default().types(PING).flags(flags); + + let vec = vec![0; precalc_size(type_flags, 0)]; + let mut packet = PRUDPV0Packet::new(vec); + + let packet_signature = crypto_instance.generate_signature(type_flags, &[]); + + let header = packet.header_mut().expect("packet malformed in creation"); + + *header = PRUDPV0Header { + destination, + source, + packet_signature, + sequence_id, + session_id, + type_flags, + }; + + *packet.checksum_mut().expect("packet malformed in creation") = crypto.calculate_checksum( + packet + .checksummed_data() + .expect("packet malformed in creation"), + ); + + info!("header: {:?}", packet.header()); + + packet.0 +} diff --git a/prudpv0/src/server.rs b/prudpv0/src/server.rs index bd4b492..87e5fbd 100644 --- a/prudpv0/src/server.rs +++ b/prudpv0/src/server.rs @@ -19,7 +19,7 @@ use rnex_core::{ types_flags::{ TypesFlags, flags::{ACK, HAS_SIZE, NEED_ACK, RELIABLE}, - types::{CONNECT, DATA, SYN}, + types::{CONNECT, DATA, PING, SYN}, }, virtual_port::VirtualPort, }, @@ -36,8 +36,8 @@ use tokio::{ use crate::{ crypto::{Crypto, CryptoInstance}, packet::{ - PRUDPV0Header, PRUDPV0Packet, new_connect_packet, new_data_packet, new_syn_packet, - precalc_size, + PRUDPV0Header, PRUDPV0Packet, new_connect_packet, new_data_packet, new_ping_packet, + new_syn_packet, precalc_size, }, }; @@ -167,7 +167,31 @@ impl Server { } async fn timeout_thread(self: Arc, conn: Arc>) { loop { - sleep(Duration::from_secs(5)); + sleep(Duration::from_secs(3)); + let mut inner = conn.inner.lock().await; + if (Instant::now() - inner.last_action).as_secs() > 5 { + let packet = new_ping_packet( + NEED_ACK, + self.param.virtual_port, + conn.addr.virtual_port, + 0, + conn.session_id, + &mut inner.crypto_instance, + &self.crypto, + ); + + self.socket + .send_to(&packet, conn.addr.regular_socket_addr) + .await; + } + + if (Instant::now() - conn.inner.lock().await.last_action).as_secs() > 15 { + let mut conns = self.connections.write().await; + conns.remove(&conn.addr); + drop(conns); + } + + drop(inner); } } async fn handle_syn(self: Arc, packet: PRUDPV0Packet>, addr: PRUDPSockAddr) { @@ -311,11 +335,38 @@ impl Server { } drop(conn); } - async fn process_packet<'a>( - self: Arc, - packet: PRUDPV0Packet>, - addr: SocketAddrV4, - ) { + + async fn handle_ping(self: Arc, mut packet: PRUDPV0Packet>, addr: PRUDPSockAddr) { + let header = packet.header().unwrap(); + + let Some(conn) = self.get_connection(addr).await else { + warn!("ping on inactive connection: {:?}", addr); + return; + }; + let mut inner = conn.inner.lock().await; + let packet = new_ping_packet( + ACK, + self.param.virtual_port, + addr.virtual_port, + header.sequence_id, + header.session_id, + &mut inner.crypto_instance, + &self.crypto, + ); + drop(inner); + drop(conn); + + self.socket.send_to(&packet, addr.regular_socket_addr).await; + } + + async fn get_connection(&self, addr: PRUDPSockAddr) -> Option>> { + let rd = self.connections.read().await; + let res = rd.get(&addr).cloned(); + drop(rd); + res + } + + async fn process_packet(self: Arc, packet: PRUDPV0Packet>, addr: SocketAddrV4) { if !packet.check_checksum(&self.crypto) { warn!("invalid checksum from: {}", addr); return; @@ -332,7 +383,11 @@ impl Server { info!("got ack(acks are ignored for now)"); return; } - + if let Some(conn) = self.get_connection(addr).await { + let mut inner = conn.inner.lock().await; + inner.last_action = Instant::now(); + drop(inner); + }; println!("{:?}", header); match header.type_flags.get_types() { SYN => { @@ -344,6 +399,9 @@ impl Server { DATA => { self.handle_data(packet, addr).await; } + PING => { + self.handle_ping(packet, addr).await; + } v => { println!("unimplemented packed type: {}", v); } @@ -376,7 +434,7 @@ impl Server { } } pub async fn new(param: ProxyStartupParam) -> Self { - let socket = UdpSocket::bind(SocketAddrV4::new(*OWN_IP_PRIVATE, *SERVER_PORT)) + let socket = UdpSocket::bind(param.self_private) .await .expect("unable to bind socket"); Self {