diff --git a/prudpv0/src/server.rs b/prudpv0/src/server.rs index 34c7647..a336abb 100644 --- a/prudpv0/src/server.rs +++ b/prudpv0/src/server.rs @@ -106,50 +106,55 @@ impl Server { .expect("packet malformed in creation"), );*/ let mut inner = conn.inner.lock().await; - let seq = inner.server_packet_counter; - let packet = new_data_packet( - NEED_ACK | RELIABLE, - self.param.virtual_port, - conn.addr.virtual_port, - data, - inner.server_packet_counter, - conn.session_id, - 0, - &mut inner.crypto_instance, - &self.crypto, - ); - inner.server_packet_counter += 1; + let pieces = data.chunks(1000); + let max_piece = pieces.len() - 1; + let mut frag_num = 1; + for (i, piece) in pieces.enumerate() { + let seq = inner.server_packet_counter; + let packet = new_data_packet( + NEED_ACK | RELIABLE, + (&self).param.virtual_port, + conn.addr.virtual_port, + piece, + inner.server_packet_counter, + conn.session_id, + if i == max_piece { 0 } else { frag_num }, + &mut inner.crypto_instance, + &(&self).crypto, + ); + inner.server_packet_counter += 1; - let packet = Arc::new(packet); - let packet_ref = Arc::downgrade(&packet); + let packet = Arc::new(packet); + let packet_ref = Arc::downgrade(&packet); - inner.unacknowledged_packets.insert(seq, packet); + inner.unacknowledged_packets.insert(seq, packet); + let conn = Arc::downgrade(&conn); + let this = Arc::downgrade(&self); + + spawn(async move { + for n in 0..5 { + let Some(data) = packet_ref.upgrade() else { + return; + }; + let Some(conn) = conn.upgrade() else { + return; + }; + let Some(this) = this.upgrade() else { + return; + }; + info!("send attempt {}", n); + + self.socket + .send_to(&data, conn.addr.regular_socket_addr) + .await; + + break; + } + }); + frag_num += 1; + } drop(inner); - - let conn = Arc::downgrade(&conn); - let this = Arc::downgrade(&self); - - spawn(async move { - for n in 0..5 { - let Some(data) = packet_ref.upgrade() else { - return; - }; - let Some(conn) = conn.upgrade() else { - return; - }; - let Some(this) = this.upgrade() else { - return; - }; - info!("send attempt {}", n); - - self.socket - .send_to(&data, conn.addr.regular_socket_addr) - .await; - - break; - } - }); } async fn connection_thread( self: Arc,