From b95656325509849a3453d0786040ca3de31aca6b Mon Sep 17 00:00:00 2001 From: Maple Date: Sat, 31 Jan 2026 20:57:19 +0100 Subject: [PATCH] add disconnection --- prudpv0/src/packet.rs | 37 ++++++++++++++++++++++++++- prudpv0/src/server.rs | 58 ++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 91 insertions(+), 4 deletions(-) diff --git a/prudpv0/src/packet.rs b/prudpv0/src/packet.rs index f55ff18..aa68194 100644 --- a/prudpv0/src/packet.rs +++ b/prudpv0/src/packet.rs @@ -6,7 +6,7 @@ use rnex_core::prudp::{ types_flags::{ self, TypesFlags, flags::{HAS_SIZE, NEED_ACK}, - types::{CONNECT, DATA, PING, SYN}, + types::{CONNECT, DATA, DISCONNECT, PING, SYN}, }, virtual_port::VirtualPort, }; @@ -347,6 +347,41 @@ pub fn new_ping_packet( .expect("packet malformed in creation"), ); + packet.0 +} +pub fn new_disconnect_packet( + flags: u16, + source: VirtualPort, + destination: VirtualPort, + sequence_id: u16, + session_id: u8, + crypto_instance: &mut impl CryptoInstance, + crypto: &impl Crypto, +) -> Vec { + let type_flags = TypesFlags::default().types(DISCONNECT).flags(flags); + + let vec = vec![0; precalc_size(type_flags, 0)]; + let mut packet = PRUDPV0Packet::new(vec); + + let packet_signature = crypto_instance.generate_signature(type_flags, &[]); + + let header = packet.header_mut().expect("packet malformed in creation"); + + *header = PRUDPV0Header { + destination, + source, + packet_signature, + sequence_id, + session_id, + type_flags, + }; + + *packet.checksum_mut().expect("packet malformed in creation") = crypto.calculate_checksum( + packet + .checksummed_data() + .expect("packet malformed in creation"), + ); + info!("header: {:?}", packet.header()); packet.0 diff --git a/prudpv0/src/server.rs b/prudpv0/src/server.rs index 3788478..a1e561c 100644 --- a/prudpv0/src/server.rs +++ b/prudpv0/src/server.rs @@ -19,7 +19,7 @@ use rnex_core::{ types_flags::{ TypesFlags, flags::{ACK, HAS_SIZE, NEED_ACK, RELIABLE}, - types::{CONNECT, DATA, PING, SYN}, + types::{CONNECT, DATA, DISCONNECT, PING, SYN}, }, virtual_port::VirtualPort, }, @@ -36,8 +36,8 @@ use tokio::{ use crate::{ crypto::{Crypto, CryptoInstance}, packet::{ - PRUDPV0Header, PRUDPV0Packet, new_connect_packet, new_data_packet, new_ping_packet, - new_syn_packet, precalc_size, + PRUDPV0Header, PRUDPV0Packet, new_connect_packet, new_data_packet, new_disconnect_packet, + new_ping_packet, new_syn_packet, precalc_size, }, }; @@ -170,6 +170,7 @@ impl Server { sleep(Duration::from_secs(3)); let mut inner = conn.inner.lock().await; if (Instant::now() - inner.last_action).as_secs() > 5 { + warn!("connection exceeded silence limit, sending ping"); let packet = new_ping_packet( NEED_ACK, self.param.virtual_port, @@ -186,6 +187,27 @@ impl Server { } if (Instant::now() - conn.inner.lock().await.last_action).as_secs() > 15 { + warn!("client timed out..."); + + let packet = new_disconnect_packet( + 0, + self.param.virtual_port, + conn.addr.virtual_port, + 0, + conn.session_id, + &mut inner.crypto_instance, + &self.crypto, + ); + + self.socket + .send_to(&packet, conn.addr.regular_socket_addr) + .await; + self.socket + .send_to(&packet, conn.addr.regular_socket_addr) + .await; + self.socket + .send_to(&packet, conn.addr.regular_socket_addr) + .await; let mut conns = self.connections.write().await; conns.remove(&conn.addr); drop(conns); @@ -358,7 +380,34 @@ impl Server { self.socket.send_to(&packet, addr.regular_socket_addr).await; } + async fn handle_disconnect( + self: Arc, + mut packet: PRUDPV0Packet>, + addr: PRUDPSockAddr, + ) { + let header = packet.header().unwrap(); + let Some(conn) = self.get_connection(addr).await else { + warn!("ping on inactive connection: {:?}", addr); + return; + }; + let mut inner = conn.inner.lock().await; + let packet = new_disconnect_packet( + ACK, + self.param.virtual_port, + addr.virtual_port, + header.sequence_id, + header.session_id, + &mut inner.crypto_instance, + &self.crypto, + ); + drop(inner); + drop(conn); + + self.socket.send_to(&packet, addr.regular_socket_addr).await; + self.socket.send_to(&packet, addr.regular_socket_addr).await; + self.socket.send_to(&packet, addr.regular_socket_addr).await; + } async fn get_connection(&self, addr: PRUDPSockAddr) -> Option>> { let rd = self.connections.read().await; let res = rd.get(&addr).cloned(); @@ -402,6 +451,9 @@ impl Server { PING => { self.handle_ping(packet, addr).await; } + DISCONNECT => { + self.handle_disconnect(packet, addr).await; + } v => { println!("unimplemented packed type: {}", v); }