feat: implement timeouts
This commit is contained in:
parent
8f40e95480
commit
e7d0a17500
6 changed files with 248 additions and 124 deletions
|
|
@ -409,6 +409,10 @@ impl Matchmake for User {
|
||||||
|
|
||||||
println!("{:?}", urls);
|
println!("{:?}", urls);
|
||||||
|
|
||||||
|
if urls.is_empty(){
|
||||||
|
return Err(ErrorCode::RendezVous_NotParticipatedGathering)
|
||||||
|
}
|
||||||
|
|
||||||
Ok(urls)
|
Ok(urls)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -44,26 +44,27 @@ pub type Result<T> = std::result::Result<T, Error>;
|
||||||
pub struct TypesFlags(u16);
|
pub struct TypesFlags(u16);
|
||||||
|
|
||||||
impl TypesFlags {
|
impl TypesFlags {
|
||||||
|
#[inline]
|
||||||
pub const fn get_types(self) -> u8 {
|
pub const fn get_types(self) -> u8 {
|
||||||
(self.0 & 0x000F) as u8
|
(self.0 & 0x000F) as u8
|
||||||
}
|
}
|
||||||
|
#[inline]
|
||||||
pub const fn get_flags(self) -> u16 {
|
pub const fn get_flags(self) -> u16 {
|
||||||
(self.0 & 0xFFF0) >> 4
|
(self.0 & 0xFFF0) >> 4
|
||||||
}
|
}
|
||||||
|
#[inline]
|
||||||
pub const fn types(self, val: u8) -> Self {
|
pub const fn types(self, val: u8) -> Self {
|
||||||
Self((self.0 & 0xFFF0) | (val as u16 & 0x000F))
|
Self((self.0 & 0xFFF0) | (val as u16 & 0x000F))
|
||||||
}
|
}
|
||||||
|
#[inline]
|
||||||
pub const fn flags(self, val: u16) -> Self {
|
pub const fn flags(self, val: u16) -> Self {
|
||||||
Self((self.0 & 0x000F) | ((val << 4) & 0xFFF0))
|
Self((self.0 & 0x000F) | ((val << 4) & 0xFFF0))
|
||||||
}
|
}
|
||||||
|
#[inline]
|
||||||
pub const fn set_flag(&mut self, val: u16){
|
pub const fn set_flag(&mut self, val: u16){
|
||||||
self.0 |= (val & 0xFFF) << 4;
|
self.0 |= (val & 0xFFF) << 4;
|
||||||
}
|
}
|
||||||
|
#[inline]
|
||||||
pub const fn set_types(&mut self, val: u8){
|
pub const fn set_types(&mut self, val: u8){
|
||||||
self.0 |= val as u16 & 0x0F;
|
self.0 |= val as u16 & 0x0F;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,7 @@ impl Router {
|
||||||
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
endpoint.recieve_packet(connection, packet).await
|
endpoint.receive_packet(connection, packet).await
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,14 @@
|
||||||
|
use crate::nex::account::Account;
|
||||||
use crate::prudp::packet::flags::{ACK, HAS_SIZE, MULTI_ACK, NEED_ACK, RELIABLE};
|
use crate::prudp::packet::flags::{ACK, HAS_SIZE, MULTI_ACK, NEED_ACK, RELIABLE};
|
||||||
use crate::prudp::packet::types::{CONNECT, DATA, DISCONNECT, PING, SYN};
|
use crate::prudp::packet::types::{CONNECT, DATA, DISCONNECT, PING, SYN};
|
||||||
use crate::prudp::packet::PacketOption::{ConnectionSignature, FragmentId, InitialSequenceId, MaximumSubstreamId, SupportedFunctions};
|
use crate::prudp::packet::PacketOption::{
|
||||||
|
ConnectionSignature, FragmentId, InitialSequenceId, MaximumSubstreamId, SupportedFunctions,
|
||||||
|
};
|
||||||
use crate::prudp::packet::{PRUDPHeader, PRUDPPacket, PacketOption, TypesFlags, VirtualPort};
|
use crate::prudp::packet::{PRUDPHeader, PRUDPPacket, PacketOption, TypesFlags, VirtualPort};
|
||||||
use crate::prudp::router::{Error, Router};
|
use crate::prudp::router::{Error, Router};
|
||||||
use crate::prudp::sockaddr::PRUDPSockAddr;
|
use crate::prudp::sockaddr::PRUDPSockAddr;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
use chrono::NaiveTime;
|
||||||
use hmac::digest::consts::U5;
|
use hmac::digest::consts::U5;
|
||||||
use log::info;
|
use log::info;
|
||||||
use log::{error, trace, warn};
|
use log::{error, trace, warn};
|
||||||
|
|
@ -21,17 +25,18 @@ use std::net::SocketAddrV4;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
|
|
||||||
|
use std::time::Duration;
|
||||||
use tokio::net::UdpSocket;
|
use tokio::net::UdpSocket;
|
||||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use tokio::sync::{Mutex, RwLock};
|
use tokio::sync::{Mutex, RwLock};
|
||||||
|
use tokio::time::{sleep, Instant};
|
||||||
use tokio_stream::Stream;
|
use tokio_stream::Stream;
|
||||||
use crate::nex::account::Account;
|
|
||||||
// due to the way this is designed crashing the router thread causes deadlock, sorry ;-;
|
// due to the way this is designed crashing the router thread causes deadlock, sorry ;-;
|
||||||
// (maybe i will fix that some day)
|
// (maybe i will fix that some day)
|
||||||
|
|
||||||
/// PRUDP Socket for accepting connections to then send and recieve data from those clients
|
/// PRUDP Socket for accepting connections to then send and recieve data from those clients
|
||||||
|
|
||||||
|
|
||||||
pub struct EncryptionPair<T: StreamCipher + Send> {
|
pub struct EncryptionPair<T: StreamCipher + Send> {
|
||||||
pub send: T,
|
pub send: T,
|
||||||
pub recv: T,
|
pub recv: T,
|
||||||
|
|
@ -46,11 +51,6 @@ impl<T: StreamCipher + Send> EncryptionPair<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NewEncryptionPair<E: StreamCipher> {
|
|
||||||
pub send: E,
|
|
||||||
pub recv: E,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct CommonConnection {
|
pub struct CommonConnection {
|
||||||
pub user_id: u32,
|
pub user_id: u32,
|
||||||
pub socket_addr: PRUDPSockAddr,
|
pub socket_addr: PRUDPSockAddr,
|
||||||
|
|
@ -60,6 +60,7 @@ pub struct CommonConnection {
|
||||||
|
|
||||||
struct InternalConnection<E: CryptoHandlerConnectionInstance> {
|
struct InternalConnection<E: CryptoHandlerConnectionInstance> {
|
||||||
common: Arc<CommonConnection>,
|
common: Arc<CommonConnection>,
|
||||||
|
connections: Weak<Mutex<BTreeMap<PRUDPSockAddr, Arc<Mutex<InternalConnection<E>>>>>>,
|
||||||
reliable_server_counter: u16,
|
reliable_server_counter: u16,
|
||||||
reliable_client_counter: u16,
|
reliable_client_counter: u16,
|
||||||
// maybe add connection id(need to see if its even needed)
|
// maybe add connection id(need to see if its even needed)
|
||||||
|
|
@ -67,23 +68,40 @@ struct InternalConnection<E: CryptoHandlerConnectionInstance> {
|
||||||
data_sender: Sender<Vec<u8>>,
|
data_sender: Sender<Vec<u8>>,
|
||||||
socket: Arc<UdpSocket>,
|
socket: Arc<UdpSocket>,
|
||||||
packet_queue: HashMap<u16, PRUDPPacket>,
|
packet_queue: HashMap<u16, PRUDPPacket>,
|
||||||
|
last_packet_time: Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E: CryptoHandlerConnectionInstance> Deref for InternalConnection<E>{
|
impl<E: CryptoHandlerConnectionInstance> Deref for InternalConnection<E> {
|
||||||
type Target = CommonConnection;
|
type Target = CommonConnection;
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
&self.common
|
&self.common
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E: CryptoHandlerConnectionInstance> InternalConnection<E>{
|
impl<E: CryptoHandlerConnectionInstance> InternalConnection<E> {
|
||||||
fn next_server_count(&mut self) -> u16{
|
fn next_server_count(&mut self) -> u16 {
|
||||||
let prev_val = self.reliable_server_counter;
|
let prev_val = self.reliable_server_counter;
|
||||||
let (val, _) = self.reliable_server_counter.overflowing_add(1);
|
let (val, _) = self.reliable_server_counter.overflowing_add(1);
|
||||||
self.reliable_server_counter = val;
|
self.reliable_server_counter = val;
|
||||||
|
|
||||||
prev_val
|
prev_val
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
async fn send_raw_packet(&self, mut prudp_packet: PRUDPPacket) {
|
||||||
|
prudp_packet.set_sizes();
|
||||||
|
|
||||||
|
let mut vec = Vec::new();
|
||||||
|
|
||||||
|
prudp_packet
|
||||||
|
.write_to(&mut vec)
|
||||||
|
.expect("somehow failed to convert backet to bytes");
|
||||||
|
|
||||||
|
self.socket
|
||||||
|
.send_to(&vec, self.socket_addr.regular_socket_addr)
|
||||||
|
.await
|
||||||
|
.expect("failed to send data back");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ExternalConnection {
|
pub struct ExternalConnection {
|
||||||
|
|
@ -92,9 +110,9 @@ pub struct ExternalConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct SendingConnection{
|
pub struct SendingConnection {
|
||||||
common: Arc<CommonConnection>,
|
common: Arc<CommonConnection>,
|
||||||
inernal: Weak<Mutex<dyn AnyInternalConnection>>
|
internal: Weak<Mutex<dyn AnyInternalConnection>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct CommonSocket {
|
pub struct CommonSocket {
|
||||||
|
|
@ -120,8 +138,8 @@ pub struct ExternalSocket {
|
||||||
internal: Weak<dyn AnyInternalSocket>,
|
internal: Weak<dyn AnyInternalSocket>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ExternalSocket{
|
impl ExternalSocket {
|
||||||
pub async fn connect(&mut self, addr: PRUDPSockAddr) -> Option<ExternalConnection>{
|
pub async fn connect(&mut self, addr: PRUDPSockAddr) -> Option<ExternalConnection> {
|
||||||
let socket = self.internal.upgrade()?;
|
let socket = self.internal.upgrade()?;
|
||||||
|
|
||||||
socket.connect(addr).await;
|
socket.connect(addr).await;
|
||||||
|
|
@ -129,7 +147,7 @@ impl ExternalSocket{
|
||||||
self.connection_receiver.recv().await
|
self.connection_receiver.recv().await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn accept(&mut self) -> Option<ExternalConnection>{
|
pub async fn accept(&mut self) -> Option<ExternalConnection> {
|
||||||
self.connection_receiver.recv().await
|
self.connection_receiver.recv().await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -152,7 +170,7 @@ impl<T: CryptoHandler> Deref for InternalSocket<T> {
|
||||||
pub(super) trait AnyInternalSocket:
|
pub(super) trait AnyInternalSocket:
|
||||||
Send + Sync + Deref<Target = CommonSocket> + 'static
|
Send + Sync + Deref<Target = CommonSocket> + 'static
|
||||||
{
|
{
|
||||||
async fn recieve_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket);
|
async fn receive_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket);
|
||||||
async fn connect(&self, address: PRUDPSockAddr) -> Option<()>;
|
async fn connect(&self, address: PRUDPSockAddr) -> Option<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -161,13 +179,15 @@ pub(super) trait AnyInternalConnection:
|
||||||
Send + Sync + Deref<Target = CommonConnection> + 'static
|
Send + Sync + Deref<Target = CommonConnection> + 'static
|
||||||
{
|
{
|
||||||
async fn send_data_packet(&mut self, data: Vec<u8>);
|
async fn send_data_packet(&mut self, data: Vec<u8>);
|
||||||
|
|
||||||
|
async fn close_connection(&mut self);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<T: CryptoHandlerConnectionInstance> AnyInternalConnection for InternalConnection<T>{
|
impl<T: CryptoHandlerConnectionInstance> AnyInternalConnection for InternalConnection<T> {
|
||||||
async fn send_data_packet(&mut self, data: Vec<u8>) {
|
async fn send_data_packet(&mut self, data: Vec<u8>) {
|
||||||
let mut packet = PRUDPPacket{
|
let mut packet = PRUDPPacket {
|
||||||
header: PRUDPHeader{
|
header: PRUDPHeader {
|
||||||
sequence_id: self.next_server_count(),
|
sequence_id: self.next_server_count(),
|
||||||
substream_id: 0,
|
substream_id: 0,
|
||||||
session_id: self.session_id,
|
session_id: self.session_id,
|
||||||
|
|
@ -181,26 +201,74 @@ impl<T: CryptoHandlerConnectionInstance> AnyInternalConnection for InternalConne
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
self.crypto_handler_instance.encrypt_outgoing(0, &mut packet.payload[..]);
|
self.crypto_handler_instance
|
||||||
|
.encrypt_outgoing(0, &mut packet.payload[..]);
|
||||||
|
|
||||||
packet.set_sizes();
|
packet.set_sizes();
|
||||||
|
|
||||||
self.crypto_handler_instance.sign_packet(&mut packet);
|
self.crypto_handler_instance.sign_packet(&mut packet);
|
||||||
|
|
||||||
let mut vec = Vec::new();
|
self.send_raw_packet(packet).await;
|
||||||
|
}
|
||||||
|
|
||||||
packet
|
async fn close_connection(&mut self) {
|
||||||
.write_to(&mut vec)
|
// jon confirmed that this should be a safe way to dc a client
|
||||||
.expect("somehow failed to convert backet to bytes");
|
|
||||||
|
|
||||||
self.socket
|
let mut packet = PRUDPPacket {
|
||||||
.send_to(&vec, self.socket_addr.regular_socket_addr)
|
header: PRUDPHeader {
|
||||||
.await
|
sequence_id: self.next_server_count(),
|
||||||
.expect("failed to send data back");
|
substream_id: 0,
|
||||||
|
session_id: self.session_id,
|
||||||
|
types_and_flags: TypesFlags::default().types(DISCONNECT),
|
||||||
|
destination_port: self.common.socket_addr.virtual_port,
|
||||||
|
source_port: self.server_port,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
payload: Vec::new(),
|
||||||
|
options: vec![FragmentId(0)],
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
// no need for encryption the, the payload is empty
|
||||||
|
|
||||||
|
packet.set_sizes();
|
||||||
|
|
||||||
|
self.crypto_handler_instance.sign_packet(&mut packet);
|
||||||
|
|
||||||
|
self.send_raw_packet(packet).await;
|
||||||
|
|
||||||
|
let Some(conns) = self.connections.upgrade() else {
|
||||||
|
// this is fine as it implies the server has already quit, thus meaning that we dont
|
||||||
|
// have to remove ourselves from the server
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut conns = conns.lock().await;
|
||||||
|
|
||||||
|
conns.remove(&self.socket_addr);
|
||||||
|
|
||||||
|
// the connection will now drop as soon as we leave this due to no longer having a permanent
|
||||||
|
// reference
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: CryptoHandler> InternalSocket<T> {
|
impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
|
async fn get_connection(
|
||||||
|
&self,
|
||||||
|
addr: PRUDPSockAddr,
|
||||||
|
) -> Option<Arc<Mutex<InternalConnection<T::CryptoConnectionInstance>>>> {
|
||||||
|
let connections = self.internal_connections.lock().await;
|
||||||
|
let Some(conn) = connections.get(&addr) else {
|
||||||
|
error!("tried to send data on inactive connection!");
|
||||||
|
return None;
|
||||||
|
};
|
||||||
|
|
||||||
|
let conn = conn.clone();
|
||||||
|
drop(connections);
|
||||||
|
|
||||||
|
Some(conn)
|
||||||
|
}
|
||||||
|
|
||||||
async fn send_packet_unbuffered(&self, dest: PRUDPSockAddr, mut packet: PRUDPPacket) {
|
async fn send_packet_unbuffered(&self, dest: PRUDPSockAddr, mut packet: PRUDPPacket) {
|
||||||
packet.set_sizes();
|
packet.set_sizes();
|
||||||
|
|
||||||
|
|
@ -232,12 +300,12 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
// todo: refactor this to be more readable(low priority cause it doesnt change anything api wise)
|
// todo: refactor this to be more readable(low priority cause it doesnt change anything api wise)
|
||||||
for options in &packet.options {
|
for options in &packet.options {
|
||||||
match options {
|
match options {
|
||||||
SupportedFunctions(functions) => response
|
SupportedFunctions(functions) => {
|
||||||
.options
|
response.options.push(SupportedFunctions(*functions & 0x04))
|
||||||
.push(SupportedFunctions(*functions & 0x04)),
|
}
|
||||||
MaximumSubstreamId(max_substream) => response
|
MaximumSubstreamId(max_substream) => {
|
||||||
.options
|
response.options.push(MaximumSubstreamId(*max_substream))
|
||||||
.push(MaximumSubstreamId(*max_substream)),
|
}
|
||||||
_ => { /* ??? */ }
|
_ => { /* ??? */ }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -248,41 +316,41 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
|
|
||||||
//println!("got syn: {:?}", response);
|
//println!("got syn: {:?}", response);
|
||||||
|
|
||||||
self.send_packet_unbuffered(address, response)
|
self.send_packet_unbuffered(address, response).await;
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn connection_thread(
|
async fn connection_thread(
|
||||||
socket: Arc<UdpSocket>,
|
connection: Weak<Mutex<InternalConnection<T::CryptoConnectionInstance>>>,
|
||||||
self_port: VirtualPort,
|
|
||||||
connection: Arc<Mutex<InternalConnection<T::CryptoConnectionInstance>>>,
|
|
||||||
mut data_recv: Receiver<Vec<u8>>
|
|
||||||
) {
|
) {
|
||||||
//todo: handle stuff like resending packets if they arent acknowledged in here
|
//todo: handle stuff like resending packets if they arent acknowledged in here
|
||||||
while let Some(data) = data_recv.recv().await{
|
|
||||||
let mut locked_conn = connection.lock().await;
|
while let Some(conn) = connection.upgrade() {
|
||||||
let packet = PRUDPPacket{
|
let mut conn = conn.lock().await;
|
||||||
header: PRUDPHeader{
|
|
||||||
sequence_id: locked_conn.next_server_count(),
|
if conn.last_packet_time < (Instant::now() - Duration::from_secs(5)) {
|
||||||
substream_id: 0,
|
conn.send_raw_packet(PRUDPPacket {
|
||||||
session_id: locked_conn.session_id,
|
header: PRUDPHeader {
|
||||||
types_and_flags: TypesFlags::default().types(DATA).flags(RELIABLE | NEED_ACK),
|
sequence_id: 0,
|
||||||
destination_port: locked_conn.common.socket_addr.virtual_port,
|
substream_id: 0,
|
||||||
source_port: self_port,
|
session_id: 0,
|
||||||
|
types_and_flags: TypesFlags::default().types(PING).flags(NEED_ACK),
|
||||||
|
destination_port: conn.common.socket_addr.virtual_port,
|
||||||
|
source_port: conn.server_port,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
payload: Vec::new(),
|
||||||
|
options: vec![],
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
})
|
||||||
payload: data,
|
.await;
|
||||||
options: vec![FragmentId(0)],
|
}
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
//packet.
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if conn.last_packet_time < (Instant::now() - Duration::from_secs(30)) {
|
||||||
|
conn.close_connection().await;
|
||||||
|
}
|
||||||
|
drop(conn);
|
||||||
|
|
||||||
|
sleep(Duration::from_secs(5)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -297,7 +365,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
user_id: crypto_handler_instance.get_user_id(),
|
user_id: crypto_handler_instance.get_user_id(),
|
||||||
socket_addr,
|
socket_addr,
|
||||||
session_id,
|
session_id,
|
||||||
server_port: self.virtual_port
|
server_port: self.virtual_port,
|
||||||
});
|
});
|
||||||
|
|
||||||
let (data_sender_from_client, data_receiver_from_client) = channel(16);
|
let (data_sender_from_client, data_receiver_from_client) = channel(16);
|
||||||
|
|
@ -305,11 +373,13 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
let internal = InternalConnection {
|
let internal = InternalConnection {
|
||||||
common: common.clone(),
|
common: common.clone(),
|
||||||
crypto_handler_instance,
|
crypto_handler_instance,
|
||||||
reliable_client_counter: if is_instantiator { 1 } else { 2 } ,
|
connections: Arc::downgrade(&self.internal_connections),
|
||||||
|
reliable_client_counter: if is_instantiator { 1 } else { 2 },
|
||||||
reliable_server_counter: if is_instantiator { 2 } else { 1 },
|
reliable_server_counter: if is_instantiator { 2 } else { 1 },
|
||||||
data_sender: data_sender_from_client,
|
data_sender: data_sender_from_client,
|
||||||
socket: self.socket.clone(),
|
socket: self.socket.clone(),
|
||||||
packet_queue: Default::default()
|
packet_queue: Default::default(),
|
||||||
|
last_packet_time: Instant::now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let internal = Arc::new(Mutex::new(internal));
|
let internal = Arc::new(Mutex::new(internal));
|
||||||
|
|
@ -317,12 +387,11 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
let dyn_internal: Arc<Mutex<dyn AnyInternalConnection>> = internal.clone();
|
let dyn_internal: Arc<Mutex<dyn AnyInternalConnection>> = internal.clone();
|
||||||
|
|
||||||
let external = ExternalConnection {
|
let external = ExternalConnection {
|
||||||
sending: SendingConnection{
|
sending: SendingConnection {
|
||||||
common,
|
common,
|
||||||
inernal: Arc::downgrade(&dyn_internal)
|
internal: Arc::downgrade(&dyn_internal),
|
||||||
},
|
},
|
||||||
data_receiver: data_receiver_from_client,
|
data_receiver: data_receiver_from_client,
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut connections = self.internal_connections.lock().await;
|
let mut connections = self.internal_connections.lock().await;
|
||||||
|
|
@ -331,6 +400,8 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
|
|
||||||
drop(connections);
|
drop(connections);
|
||||||
|
|
||||||
|
tokio::spawn(Self::connection_thread(Arc::downgrade(&internal)));
|
||||||
|
|
||||||
self.connection_sender
|
self.connection_sender
|
||||||
.send(external)
|
.send(external)
|
||||||
.await
|
.await
|
||||||
|
|
@ -380,7 +451,6 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
|
|
||||||
response.payload = return_data;
|
response.payload = return_data;
|
||||||
|
|
||||||
|
|
||||||
//let remote_signature = address.calculate_connection_signature();
|
//let remote_signature = address.calculate_connection_signature();
|
||||||
|
|
||||||
response
|
response
|
||||||
|
|
@ -389,24 +459,22 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
|
|
||||||
for option in &packet.options {
|
for option in &packet.options {
|
||||||
match option {
|
match option {
|
||||||
MaximumSubstreamId(max_substream) => response
|
MaximumSubstreamId(max_substream) => {
|
||||||
.options
|
response.options.push(MaximumSubstreamId(*max_substream))
|
||||||
.push(MaximumSubstreamId(*max_substream)),
|
|
||||||
SupportedFunctions(funcs) => {
|
|
||||||
response.options.push(SupportedFunctions(*funcs))
|
|
||||||
}
|
}
|
||||||
|
SupportedFunctions(funcs) => response.options.push(SupportedFunctions(*funcs)),
|
||||||
_ => { /* ? */ }
|
_ => { /* ? */ }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
response.set_sizes();
|
response.set_sizes();
|
||||||
|
|
||||||
crypto.sign_connect(&mut response);
|
crypto.sign_connect(&mut response);
|
||||||
|
|
||||||
//println!("connect out: {:?}", response);
|
//println!("connect out: {:?}", response);
|
||||||
|
|
||||||
self.create_connection(crypto, address, session_id, false).await;
|
self.create_connection(crypto, address, session_id, false)
|
||||||
|
.await;
|
||||||
|
|
||||||
self.send_packet_unbuffered(address, response).await;
|
self.send_packet_unbuffered(address, response).await;
|
||||||
}
|
}
|
||||||
|
|
@ -414,14 +482,16 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
async fn handle_data(&self, address: PRUDPSockAddr, mut packet: PRUDPPacket) {
|
async fn handle_data(&self, address: PRUDPSockAddr, mut packet: PRUDPPacket) {
|
||||||
info!("got data");
|
info!("got data");
|
||||||
|
|
||||||
if packet.header.types_and_flags.get_flags() & (NEED_ACK | RELIABLE) != (NEED_ACK | RELIABLE){
|
if packet.header.types_and_flags.get_flags() & (NEED_ACK | RELIABLE)
|
||||||
|
!= (NEED_ACK | RELIABLE)
|
||||||
|
{
|
||||||
error!("invalid or unimplemented packet flags");
|
error!("invalid or unimplemented packet flags");
|
||||||
}
|
}
|
||||||
|
|
||||||
let connections = self.internal_connections.lock().await;
|
let connections = self.internal_connections.lock().await;
|
||||||
let Some(conn) = connections.get(&address) else{
|
let Some(conn) = connections.get(&address) else {
|
||||||
error!("tried to send data on inactive connection!");
|
error!("tried to send data on inactive connection!");
|
||||||
return
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
let conn = conn.clone();
|
let conn = conn.clone();
|
||||||
|
|
@ -436,7 +506,8 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
let mut counter = conn.reliable_client_counter;
|
let mut counter = conn.reliable_client_counter;
|
||||||
|
|
||||||
while let Some(mut packet) = conn.packet_queue.remove(&counter) {
|
while let Some(mut packet) = conn.packet_queue.remove(&counter) {
|
||||||
conn.crypto_handler_instance.decrypt_incoming(packet.header.substream_id, &mut packet.payload[..]);
|
conn.crypto_handler_instance
|
||||||
|
.decrypt_incoming(packet.header.substream_id, &mut packet.payload[..]);
|
||||||
|
|
||||||
let mut response = packet.base_acknowledgement_packet();
|
let mut response = packet.base_acknowledgement_packet();
|
||||||
response.header.types_and_flags.set_flag(HAS_SIZE | ACK);
|
response.header.types_and_flags.set_flag(HAS_SIZE | ACK);
|
||||||
|
|
@ -453,11 +524,11 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_ping(&self, address: PRUDPSockAddr, packet: PRUDPPacket){
|
async fn handle_ping(&self, address: PRUDPSockAddr, packet: PRUDPPacket) {
|
||||||
let connections = self.internal_connections.lock().await;
|
let connections = self.internal_connections.lock().await;
|
||||||
let Some(conn) = connections.get(&address) else{
|
let Some(conn) = connections.get(&address) else {
|
||||||
error!("tried to send data on inactive connection!");
|
error!("tried to send data on inactive connection!");
|
||||||
return
|
return;
|
||||||
};
|
};
|
||||||
let conn = conn.clone();
|
let conn = conn.clone();
|
||||||
drop(connections);
|
drop(connections);
|
||||||
|
|
@ -473,11 +544,11 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
self.send_packet_unbuffered(address, response).await;
|
self.send_packet_unbuffered(address, response).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_disconnect(&self, address: PRUDPSockAddr, packet: PRUDPPacket){
|
async fn handle_disconnect(&self, address: PRUDPSockAddr, packet: PRUDPPacket) {
|
||||||
let connections = self.internal_connections.lock().await;
|
let connections = self.internal_connections.lock().await;
|
||||||
let Some(conn) = connections.get(&address) else{
|
let Some(conn) = connections.get(&address) else {
|
||||||
error!("tried to send data on inactive connection!");
|
error!("tried to send data on inactive connection!");
|
||||||
return
|
return;
|
||||||
};
|
};
|
||||||
let conn = conn.clone();
|
let conn = conn.clone();
|
||||||
drop(connections);
|
drop(connections);
|
||||||
|
|
@ -493,37 +564,53 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
self.send_packet_unbuffered(address, response.clone()).await;
|
self.send_packet_unbuffered(address, response.clone()).await;
|
||||||
self.send_packet_unbuffered(address, response.clone()).await;
|
self.send_packet_unbuffered(address, response.clone()).await;
|
||||||
self.send_packet_unbuffered(address, response).await;
|
self.send_packet_unbuffered(address, response).await;
|
||||||
|
|
||||||
|
//self.internal_connections.lock().await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
|
impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
|
||||||
async fn recieve_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket) {
|
async fn receive_packet(&self, address: PRUDPSockAddr, packet: PRUDPPacket) {
|
||||||
// todo: handle acks
|
// todo: handle acks and resending
|
||||||
|
|
||||||
|
if let Some(conn) = self.get_connection(address).await {
|
||||||
|
let mut conn = conn.lock().await;
|
||||||
|
|
||||||
|
// reset timeout
|
||||||
|
conn.last_packet_time = Instant::now();
|
||||||
|
}
|
||||||
|
|
||||||
if (packet.header.types_and_flags.get_flags() & ACK) != 0 {
|
if (packet.header.types_and_flags.get_flags() & ACK) != 0 {
|
||||||
info!("got ack");
|
info!("got ack");
|
||||||
if packet.header.types_and_flags.get_types() == SYN ||
|
|
||||||
packet.header.types_and_flags.get_types() == CONNECT{
|
|
||||||
|
|
||||||
if packet.header.types_and_flags.get_types() == SYN{
|
|
||||||
|
if packet.header.types_and_flags.get_types() == SYN
|
||||||
|
|| packet.header.types_and_flags.get_types() == CONNECT
|
||||||
|
{
|
||||||
|
if packet.header.types_and_flags.get_types() == SYN {
|
||||||
println!("Syn: {:?}", packet);
|
println!("Syn: {:?}", packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
if packet.header.types_and_flags.get_types() == CONNECT{
|
if packet.header.types_and_flags.get_types() == CONNECT {
|
||||||
println!("Connect: {:?}", packet);
|
println!("Connect: {:?}", packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
let sender = self.connection_establishment_data_sender.lock().await;
|
let sender = self.connection_establishment_data_sender.lock().await;
|
||||||
info!("redirecting ack to active connection establishment code");
|
info!("redirecting ack to active connection establishment code");
|
||||||
|
|
||||||
if let Some(conn) = sender.as_ref(){
|
if let Some(conn) = sender.as_ref() {
|
||||||
if let Err(e) = conn.send(packet).await {
|
if let Err(e) = conn.send(packet).await {
|
||||||
error!("error whilest sending data to connection establishment: {}", e);
|
error!(
|
||||||
|
"error whilest sending data to connection establishment: {}",
|
||||||
|
e
|
||||||
|
);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
error!("got connection response without the active reciever being present");
|
error!("got connection response without the active reciever being present");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -556,8 +643,8 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
|
||||||
|
|
||||||
let remote_signature = address.calculate_connection_signature();
|
let remote_signature = address.calculate_connection_signature();
|
||||||
|
|
||||||
let packet = PRUDPPacket{
|
let packet = PRUDPPacket {
|
||||||
header: PRUDPHeader{
|
header: PRUDPHeader {
|
||||||
source_port: self.virtual_port,
|
source_port: self.virtual_port,
|
||||||
destination_port: address.virtual_port,
|
destination_port: address.virtual_port,
|
||||||
types_and_flags: TypesFlags::default().types(SYN).flags(NEED_ACK),
|
types_and_flags: TypesFlags::default().types(SYN).flags(NEED_ACK),
|
||||||
|
|
@ -566,16 +653,14 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
|
||||||
options: vec![
|
options: vec![
|
||||||
SupportedFunctions(0x104),
|
SupportedFunctions(0x104),
|
||||||
MaximumSubstreamId(0),
|
MaximumSubstreamId(0),
|
||||||
ConnectionSignature(remote_signature)
|
ConnectionSignature(remote_signature),
|
||||||
],
|
],
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
self.send_packet_unbuffered(address, packet).await;
|
self.send_packet_unbuffered(address, packet).await;
|
||||||
|
|
||||||
let Some(syn_ack_packet) = recv.recv().await else{
|
let Some(syn_ack_packet) = recv.recv().await else {
|
||||||
error!("what");
|
error!("what");
|
||||||
return None;
|
return None;
|
||||||
};
|
};
|
||||||
|
|
@ -589,10 +674,8 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
|
||||||
return None;
|
return None;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let packet = PRUDPPacket {
|
||||||
|
header: PRUDPHeader {
|
||||||
let packet = PRUDPPacket{
|
|
||||||
header: PRUDPHeader{
|
|
||||||
source_port: self.virtual_port,
|
source_port: self.virtual_port,
|
||||||
destination_port: address.virtual_port,
|
destination_port: address.virtual_port,
|
||||||
types_and_flags: TypesFlags::default().types(CONNECT).flags(NEED_ACK),
|
types_and_flags: TypesFlags::default().types(CONNECT).flags(NEED_ACK),
|
||||||
|
|
@ -601,19 +684,21 @@ impl<T: CryptoHandler> AnyInternalSocket for InternalSocket<T> {
|
||||||
options: vec![
|
options: vec![
|
||||||
SupportedFunctions(0x04),
|
SupportedFunctions(0x04),
|
||||||
MaximumSubstreamId(0),
|
MaximumSubstreamId(0),
|
||||||
ConnectionSignature(remote_signature)
|
ConnectionSignature(remote_signature),
|
||||||
],
|
],
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send_packet_unbuffered(address, packet).await;
|
self.send_packet_unbuffered(address, packet).await;
|
||||||
|
|
||||||
let Some(connect_ack_packet) = recv.recv().await else{
|
let Some(connect_ack_packet) = recv.recv().await else {
|
||||||
error!("what");
|
error!("what");
|
||||||
return None;
|
return None;
|
||||||
};
|
};
|
||||||
|
|
||||||
let (_, crypt) = self.crypto_handler.instantiate(remote_signature, *own_signature, &[], 1)?;
|
let (_, crypt) =
|
||||||
|
self.crypto_handler
|
||||||
|
.instantiate(remote_signature, *own_signature, &[], 1)?;
|
||||||
|
|
||||||
//todo: make this work for secure servers as well
|
//todo: make this work for secure servers as well
|
||||||
self.create_connection(crypt, address, 0, true).await;
|
self.create_connection(crypt, address, 0, true).await;
|
||||||
|
|
@ -680,38 +765,54 @@ pub trait CryptoHandler: Send + Sync + 'static {
|
||||||
fn sign_pre_handshake(&self, packet: &mut PRUDPPacket);
|
fn sign_pre_handshake(&self, packet: &mut PRUDPPacket);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Deref for ExternalConnection{
|
impl Deref for ExternalConnection {
|
||||||
type Target = SendingConnection;
|
type Target = SendingConnection;
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
&self.sending
|
&self.sending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Deref for SendingConnection{
|
impl Deref for SendingConnection {
|
||||||
type Target = CommonConnection;
|
type Target = CommonConnection;
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
&self.common
|
&self.common
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ExternalConnection{
|
impl ExternalConnection {
|
||||||
pub async fn recv(&mut self) -> Option<Vec<u8>>{
|
pub async fn recv(&mut self) -> Option<Vec<u8>> {
|
||||||
self.data_receiver.recv().await
|
self.data_receiver.recv().await
|
||||||
}
|
}
|
||||||
//todo: make this an actual result instead of an option
|
//todo: make this an actual result instead of an option
|
||||||
|
|
||||||
pub fn duplicate_sender(&self) -> SendingConnection{
|
pub fn duplicate_sender(&self) -> SendingConnection {
|
||||||
self.sending.clone()
|
self.sending.clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SendingConnection{
|
impl SendingConnection {
|
||||||
pub async fn send(&self, data: Vec<u8>) -> Option<()> {
|
pub async fn send(&self, data: Vec<u8>) -> Option<()> {
|
||||||
let internal = self.inernal.upgrade()?;
|
let internal = self.internal.upgrade()?;
|
||||||
|
|
||||||
let mut internal = internal.lock().await;
|
let mut internal = internal.lock().await;
|
||||||
|
|
||||||
internal.send_data_packet(data).await;
|
internal.send_data_packet(data).await;
|
||||||
Some(())
|
Some(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn close_connection(&self) {
|
||||||
|
let Some(internal) = self.internal.upgrade() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut internal = internal.lock().await;
|
||||||
|
|
||||||
|
internal.close_connection().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: CryptoHandlerConnectionInstance> Drop for InternalConnection<E> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
println!("yatta");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -274,6 +274,8 @@ async fn handle_incoming<T: RmcCallable + Send + Sync + 'static>(
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!("rmc disconnected")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_rmc_gateway_connection<T: RmcCallable + Sync + Send + 'static,F>(conn: ExternalConnection, create_internal: F) -> Arc<T>
|
pub fn new_rmc_gateway_connection<T: RmcCallable + Sync + Send + 'static,F>(conn: ExternalConnection, create_internal: F) -> Arc<T>
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ use tokio::task::JoinHandle;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use crate::nex::matchmake::MatchmakeManager;
|
use crate::nex::matchmake::MatchmakeManager;
|
||||||
|
use crate::rmc::protocols::HasRmcConnection;
|
||||||
use crate::rmc::protocols::notifications::NotificationEvent;
|
use crate::rmc::protocols::notifications::NotificationEvent;
|
||||||
|
|
||||||
struct RnexApiAuth;
|
struct RnexApiAuth;
|
||||||
|
|
@ -45,6 +46,21 @@ async fn players_in_match(mmm: &State<Arc<MatchmakeManager>>, gid: u32) -> Optio
|
||||||
Some(Json(gathering.connected_players.iter().filter_map(|p| p.upgrade()).map(|p| p.pid).collect()))
|
Some(Json(gathering.connected_players.iter().filter_map(|p| p.upgrade()).map(|p| p.pid).collect()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[get("/player/<pid>/disconnect")]
|
||||||
|
async fn disconnect_player(_auth: RnexApiAuth, mmm: &State<Arc<MatchmakeManager>>, pid: u32) -> Option<()>{
|
||||||
|
// this doesnt work and is broken, there might be some other way to remotely close gatherings...
|
||||||
|
// also if anyone gets this working change it to POST cause the only reason its get is because
|
||||||
|
// that makes testing it easier
|
||||||
|
let mmm = mmm.users.read().await;
|
||||||
|
|
||||||
|
for player in mmm.values().filter_map(|p| p.upgrade()).filter(|p| p.pid == pid) {
|
||||||
|
player.remote.get_connection().0.close_connection().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Some(())
|
||||||
|
}
|
||||||
|
|
||||||
#[get("/gathering/<gid>/close")]
|
#[get("/gathering/<gid>/close")]
|
||||||
async fn close_gathering(_auth: RnexApiAuth, mmm: &State<Arc<MatchmakeManager>>, gid: u32) -> Option<()>{
|
async fn close_gathering(_auth: RnexApiAuth, mmm: &State<Arc<MatchmakeManager>>, gid: u32) -> Option<()>{
|
||||||
// this doesnt work and is broken, there might be some other way to remotely close gatherings...
|
// this doesnt work and is broken, there might be some other way to remotely close gatherings...
|
||||||
|
|
@ -73,7 +89,7 @@ async fn close_gathering(_auth: RnexApiAuth, mmm: &State<Arc<MatchmakeManager>>,
|
||||||
pub async fn start_web(mgr: Arc<MatchmakeManager>) -> JoinHandle<()> {
|
pub async fn start_web(mgr: Arc<MatchmakeManager>) -> JoinHandle<()> {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
rocket::build()
|
rocket::build()
|
||||||
.mount("/", routes![gatherings, players_in_match, close_gathering])
|
.mount("/", routes![gatherings, players_in_match, close_gathering, disconnect_player])
|
||||||
.manage(mgr)
|
.manage(mgr)
|
||||||
.launch().await
|
.launch().await
|
||||||
.expect("unable to start webserver");
|
.expect("unable to start webserver");
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue