dont lock up the connection whilest sending fragments
This commit is contained in:
parent
9275f3b09f
commit
dc6307cfcf
3 changed files with 947 additions and 677 deletions
|
|
@ -16,6 +16,7 @@ use std::io::Cursor;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
|
use tokio::spawn;
|
||||||
use v_byte_helpers::ReadExtensions;
|
use v_byte_helpers::ReadExtensions;
|
||||||
use v_byte_helpers::little_endian::read_u16;
|
use v_byte_helpers::little_endian::read_u16;
|
||||||
|
|
||||||
|
|
@ -68,6 +69,33 @@ impl<E: CryptoHandlerConnectionInstance> InternalConnection<E> {
|
||||||
prev_val
|
prev_val
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn close_connection(&mut self) {
|
||||||
|
let mut packet = PRUDPV1Packet {
|
||||||
|
header: PRUDPV1Header {
|
||||||
|
sequence_id: self.next_server_count(),
|
||||||
|
substream_id: 0,
|
||||||
|
session_id: self.session_id,
|
||||||
|
types_and_flags: TypesFlags::default().types(DISCONNECT),
|
||||||
|
destination_port: self.common.socket_addr.virtual_port,
|
||||||
|
source_port: self.server_port,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
payload: Vec::new(),
|
||||||
|
options: vec![FragmentId(0)],
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
// no need for encryption the, the payload is empty
|
||||||
|
|
||||||
|
packet.set_sizes();
|
||||||
|
|
||||||
|
self.crypto_handler_instance.sign_packet(&mut packet);
|
||||||
|
|
||||||
|
self.send_raw_packet(&packet).await;
|
||||||
|
|
||||||
|
self.delete_connection().await;
|
||||||
|
}
|
||||||
|
|
||||||
/// Sends a raw packet to a given client on the connection
|
/// Sends a raw packet to a given client on the connection
|
||||||
///
|
///
|
||||||
/// a raw packet is one which does not get processed any further(other than to send it
|
/// a raw packet is one which does not get processed any further(other than to send it
|
||||||
|
|
@ -103,7 +131,7 @@ pub struct ExternalConnection {
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct SendingConnection {
|
pub struct SendingConnection {
|
||||||
common: Arc<CommonConnection>,
|
common: Arc<CommonConnection>,
|
||||||
internal: Weak<Mutex<dyn AnyInternalConnection>>,
|
internal: Weak<dyn AnyInternalConnection>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct CommonSocket {
|
pub struct CommonSocket {
|
||||||
|
|
@ -166,80 +194,67 @@ pub(super) trait AnyInternalSocket:
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub(super) trait AnyInternalConnection:
|
pub(super) trait AnyInternalConnection: Send + Sync + 'static {
|
||||||
Send + Sync + Deref<Target = CommonConnection> + 'static
|
async fn send_data_packet(&self, data: Vec<u8>);
|
||||||
{
|
|
||||||
async fn send_data_packet(&mut self, data: Vec<u8>);
|
|
||||||
|
|
||||||
async fn close_connection(&mut self);
|
async fn close_connection(&self);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<T: CryptoHandlerConnectionInstance> AnyInternalConnection for InternalConnection<T> {
|
impl<T: CryptoHandlerConnectionInstance> AnyInternalConnection for Mutex<InternalConnection<T>> {
|
||||||
async fn send_data_packet(&mut self, data: Vec<u8>) {
|
async fn send_data_packet(&self, data: Vec<u8>) {
|
||||||
let pieces = data.chunks(600);
|
let pieces = data.chunks(600);
|
||||||
let max_piece = pieces.len() - 1;
|
let max_piece = pieces.len() - 1;
|
||||||
let mut piece_num = 1;
|
let mut piece_num = 1;
|
||||||
for (i, piece) in pieces.enumerate() {
|
let mut locked = self.lock().await;
|
||||||
let mut packet = PRUDPV1Packet {
|
let packets: Vec<_> = pieces
|
||||||
header: PRUDPV1Header {
|
.enumerate()
|
||||||
sequence_id: self.next_server_count(),
|
.map(|(i, piece)| {
|
||||||
substream_id: 0,
|
let mut packet = PRUDPV1Packet {
|
||||||
session_id: self.session_id,
|
header: PRUDPV1Header {
|
||||||
types_and_flags: TypesFlags::default().types(DATA).flags(RELIABLE | NEED_ACK),
|
sequence_id: locked.next_server_count(),
|
||||||
destination_port: self.common.socket_addr.virtual_port,
|
substream_id: 0,
|
||||||
source_port: self.server_port,
|
session_id: locked.session_id,
|
||||||
|
types_and_flags: TypesFlags::default()
|
||||||
|
.types(DATA)
|
||||||
|
.flags(RELIABLE | NEED_ACK),
|
||||||
|
destination_port: locked.common.socket_addr.virtual_port,
|
||||||
|
source_port: locked.server_port,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
payload: piece.to_owned(),
|
||||||
|
options: vec![FragmentId(if i == max_piece { 0 } else { piece_num })],
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
};
|
||||||
payload: piece.to_owned(),
|
|
||||||
options: vec![FragmentId(if i == max_piece { 0 } else { piece_num })],
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
self.crypto_handler_instance
|
locked
|
||||||
.encrypt_outgoing(0, &mut packet.payload[..]);
|
.crypto_handler_instance
|
||||||
|
.encrypt_outgoing(0, &mut packet.payload[..]);
|
||||||
|
|
||||||
packet.set_sizes();
|
packet.set_sizes();
|
||||||
|
|
||||||
self.crypto_handler_instance.sign_packet(&mut packet);
|
locked.crypto_handler_instance.sign_packet(&mut packet);
|
||||||
|
|
||||||
self.send_raw_packet(&packet).await;
|
piece_num += 1;
|
||||||
|
packet
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
drop(locked);
|
||||||
|
|
||||||
self.unacknowleged_packets.push((Instant::now(), packet));
|
for packet in packets {
|
||||||
|
let mut locked = self.lock().await;
|
||||||
|
locked.send_raw_packet(&packet).await;
|
||||||
|
|
||||||
|
locked.unacknowleged_packets.push((Instant::now(), packet));
|
||||||
|
drop(locked);
|
||||||
sleep(Duration::from_secs(16)).await;
|
sleep(Duration::from_secs(16)).await;
|
||||||
|
|
||||||
piece_num += 1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn close_connection(&mut self) {
|
async fn close_connection(&self) {
|
||||||
// jon confirmed that this should be a safe way to dc a client
|
let mut locked = self.lock().await;
|
||||||
|
|
||||||
let mut packet = PRUDPV1Packet {
|
locked.close_connection().await;
|
||||||
header: PRUDPV1Header {
|
|
||||||
sequence_id: self.next_server_count(),
|
|
||||||
substream_id: 0,
|
|
||||||
session_id: self.session_id,
|
|
||||||
types_and_flags: TypesFlags::default().types(DISCONNECT),
|
|
||||||
destination_port: self.common.socket_addr.virtual_port,
|
|
||||||
source_port: self.server_port,
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
payload: Vec::new(),
|
|
||||||
options: vec![FragmentId(0)],
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
// no need for encryption the, the payload is empty
|
|
||||||
|
|
||||||
packet.set_sizes();
|
|
||||||
|
|
||||||
self.crypto_handler_instance.sign_packet(&mut packet);
|
|
||||||
|
|
||||||
self.send_raw_packet(&packet).await;
|
|
||||||
|
|
||||||
self.delete_connection().await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -403,7 +418,7 @@ impl<T: CryptoHandler> InternalSocket<T> {
|
||||||
|
|
||||||
let internal = Arc::new(Mutex::new(internal));
|
let internal = Arc::new(Mutex::new(internal));
|
||||||
|
|
||||||
let dyn_internal: Arc<Mutex<dyn AnyInternalConnection>> = internal.clone();
|
let dyn_internal: Arc<dyn AnyInternalConnection> = internal.clone();
|
||||||
|
|
||||||
let external = ExternalConnection {
|
let external = ExternalConnection {
|
||||||
sending: SendingConnection {
|
sending: SendingConnection {
|
||||||
|
|
@ -884,10 +899,9 @@ impl ExternalConnection {
|
||||||
impl SendingConnection {
|
impl SendingConnection {
|
||||||
pub async fn send(&self, data: Vec<u8>) -> Option<()> {
|
pub async fn send(&self, data: Vec<u8>) -> Option<()> {
|
||||||
let internal = self.internal.upgrade()?;
|
let internal = self.internal.upgrade()?;
|
||||||
|
spawn(async move {
|
||||||
let mut internal = internal.lock().await;
|
internal.send_data_packet(data).await;
|
||||||
|
});
|
||||||
internal.send_data_packet(data).await;
|
|
||||||
Some(())
|
Some(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -896,8 +910,6 @@ impl SendingConnection {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut internal = internal.lock().await;
|
|
||||||
|
|
||||||
internal.close_connection().await;
|
internal.close_connection().await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ const IP_REQ_SERVICE_URL: &str = "https://ipinfo.io/ip";
|
||||||
|
|
||||||
cfg_if! {
|
cfg_if! {
|
||||||
if #[cfg(feature = "datastore")] {
|
if #[cfg(feature = "datastore")] {
|
||||||
|
use std::sync::{LazyLock, OnceLock};
|
||||||
pub static RNEX_DATASTORE_DATABASE_URL: LazyLock<String> = LazyLock::new(|| {
|
pub static RNEX_DATASTORE_DATABASE_URL: LazyLock<String> = LazyLock::new(|| {
|
||||||
std::env::var("RNEX_DATASTORE_DATABASE_URL")
|
std::env::var("RNEX_DATASTORE_DATABASE_URL")
|
||||||
.expect("RNEX_DATASTORE_DATABASE_URL must be set")
|
.expect("RNEX_DATASTORE_DATABASE_URL must be set")
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load diff
Loading…
Add table
Add a link
Reference in a new issue