feat & chore: clean up and push current progress on splatfest matchmaking

This commit is contained in:
DJMrTV 2025-05-14 09:52:24 +02:00
commit 7703aafe3c
32 changed files with 436 additions and 1181 deletions

View file

@ -1,26 +0,0 @@
use std::io::Cursor;
use std::sync::Arc;
use log::error;
use tokio::sync::Mutex;
use crate::protocols::auth::AuthProtocolConfig;
use crate::rmc::message::RMCMessage;
use crate::rmc::response::{ErrorCode, RMCResponseResult};
use crate::rmc::structures::RmcSerialize;
pub async fn login(rmcmessage: &RMCMessage, _name: &str) -> RMCResponseResult{
rmcmessage.error_result_with_code(ErrorCode::Core_NotImplemented)
}
pub async fn login_raw_params(rmcmessage: &RMCMessage, _: &Arc<SocketData>, _: &Arc<Mutex<ConnectionData>>, _data: AuthProtocolConfig) -> RMCResponseResult{
let mut reader = Cursor::new(&rmcmessage.rest_of_data);
let Ok(str) = String::deserialize(&mut reader) else {
error!("error reading packet");
return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument);
};
login(rmcmessage, &str).await
}

View file

@ -1,81 +0,0 @@
use std::io::{Cursor, Write};
use std::sync::Arc;
use bytemuck::bytes_of;
use log::{error};
use tokio::sync::Mutex;
use crate::grpc::account;
use crate::kerberos::KerberosDateTime;
use crate::protocols::auth::AuthProtocolConfig;
use crate::protocols::auth::ticket_generation::generate_ticket;
use crate::rmc;
use crate::rmc::message::RMCMessage;
use crate::rmc::response::{ErrorCode, RMCResponseResult};
use crate::rmc::structures::{RmcSerialize};
use crate::rmc::structures::any::Any;
use crate::rmc::structures::qresult::QResult;
pub async fn login_ex(rmcmessage: &RMCMessage, proto_data: AuthProtocolConfig, pid: u32) -> RMCResponseResult{
// todo: figure out how the AuthenticationInfo struct works, parse it and validate login info
let Ok(mut client) = account::Client::new().await else {
return rmcmessage.error_result_with_code(ErrorCode::Core_Exception);
};
let Ok(passwd) = client.get_nex_password(pid).await else{
return rmcmessage.error_result_with_code(ErrorCode::Core_Exception);
};
let source_login_data = (pid, passwd);
let destination_login_data = proto_data.secure_server_account.get_login_data();
let ticket = generate_ticket(source_login_data, destination_login_data);
let result = QResult::success(ErrorCode::Core_Unknown);
let connection_data = rmc::structures::connection_data::ConnectionData{
station_url: proto_data.station_url,
special_station_url: "",
date_time: KerberosDateTime::now(),
special_protocols: Vec::new()
};
let mut response: Vec<u8> = Vec::new();
result.serialize(&mut response).expect("failed serializing result");
response.write_all(bytes_of(&source_login_data.0)).expect("failed writing pid");
ticket.serialize(&mut response).expect("failed serializing ticket");
connection_data.serialize(&mut response).expect("failed writing connection data");
proto_data.build_name.serialize(&mut response).expect("failed writing build name");
return rmcmessage.success_with_data(response);
}
pub async fn login_ex_raw_params(rmcmessage: &RMCMessage, _: &Arc<SocketData>, _: &Arc<Mutex<ConnectionData>>, data: AuthProtocolConfig) -> RMCResponseResult{
let mut reader = Cursor::new(&rmcmessage.rest_of_data);
let Ok(str) = String::deserialize(&mut reader) else {
error!("error reading packet");
return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument);
};
let Ok(any) = Any::deserialize(&mut reader) else {
error!("error reading packet");
return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument);
};
match any.name.as_ref(){
"AuthenticationInfo" => {
}
v => {
error!("error reading packet: invalid structure type: {}", v);
return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument);
}
}
let Ok(pid) = str.parse() else {
return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument);
};
login_ex(rmcmessage, data, pid).await
}

View file

@ -1,51 +0,0 @@
use std::io::Cursor;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::endianness::{IS_BIG_ENDIAN, ReadExtensions};
use crate::protocols::auth::{AuthProtocolConfig, get_login_data_by_pid};
use crate::protocols::auth::ticket_generation::generate_ticket;
use crate::rmc::message::RMCMessage;
use crate::rmc::response::{ErrorCode, RMCResponseResult};
use crate::rmc::response::ErrorCode::Core_Unknown;
use crate::rmc::structures::qresult::QResult;
use crate::rmc::structures::RmcSerialize;
pub async fn request_ticket(rmcmessage: &RMCMessage, data: AuthProtocolConfig, source_pid: u32, destination_pid: u32) -> RMCResponseResult{
let Some(source_login_data) = get_login_data_by_pid(source_pid).await else {
return rmcmessage.error_result_with_code(ErrorCode::Core_Exception);
};
let desgination_login_data = if destination_pid == data.secure_server_account.pid{
data.secure_server_account.get_login_data()
} else {
let Some(login) = get_login_data_by_pid(destination_pid).await else {
return rmcmessage.error_result_with_code(ErrorCode::Core_Exception);
};
login
};
let result = QResult::success(Core_Unknown);
let ticket = generate_ticket(source_login_data, desgination_login_data);
let mut response: Vec<u8> = Vec::new();
result.serialize(&mut response).expect("failed serializing result");
ticket.serialize(&mut response).expect("failed serializing ticket");
rmcmessage.success_with_data(response)
}
pub async fn request_ticket_raw_params(rmcmessage: &RMCMessage, _: &Arc<SocketData>, _: &Arc<Mutex<ConnectionData>>, data: AuthProtocolConfig) -> RMCResponseResult{
let mut reader = Cursor::new(&rmcmessage.rest_of_data);
let Ok(source_pid) = reader.read_struct(IS_BIG_ENDIAN) else {
return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument);
};
let Ok(destination_pid) = reader.read_struct(IS_BIG_ENDIAN) else {
return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument);
};
request_ticket(rmcmessage, data, source_pid, destination_pid).await
}

View file

@ -1,38 +0,0 @@
mod method_login_ex;
mod method_login;
mod ticket_generation;
mod method_request_ticket;
use crate::define_protocol;
use crate::grpc::account;
use crate::nex::account::Account;
use crate::protocols::auth::method_login::login_raw_params;
use crate::protocols::auth::method_login_ex::login_ex_raw_params;
use crate::protocols::auth::method_request_ticket::request_ticket_raw_params;
#[derive(Copy, Clone)]
pub struct AuthProtocolConfig {
pub secure_server_account: &'static Account,
pub build_name: &'static str,
pub station_url: &'static str
}
define_protocol!{
10(proto_data: AuthProtocolConfig) => {
0x01 => login_raw_params,
0x02 => login_ex_raw_params,
0x03 => request_ticket_raw_params
}
}
async fn get_login_data_by_pid(pid: u32) -> Option<(u32, [u8; 16])> {
let Ok(mut client) = account::Client::new().await else {
return None
};
let Ok(passwd) = client.get_nex_password(pid).await else{
return None
};
Some((pid, passwd))
}

View file

@ -1,19 +0,0 @@
use crate::kerberos;
use crate::kerberos::{derive_key, Ticket};
pub fn generate_ticket(source_act_login_data: (u32, [u8;16]), dest_act_login_data: (u32, [u8;16])) -> Box<[u8]>{
let source_key = derive_key(source_act_login_data.0, source_act_login_data.1);
let dest_key = derive_key(dest_act_login_data.0, dest_act_login_data.1);
let internal_data = kerberos::TicketInternalData::new(source_act_login_data.0);
let encrypted_inner = internal_data.encrypt(dest_key);
let encrypted_session_ticket = Ticket{
pid: dest_act_login_data.0,
session_key: internal_data.session_key,
}.encrypt(source_key, &encrypted_inner);
encrypted_session_ticket
}

View file

@ -1,35 +0,0 @@
use std::io::Cursor;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use crate::protocols::matchmake_common::MatchmakeData;
use crate::rmc::message::RMCMessage;
use crate::rmc::response::{ErrorCode, RMCResponseResult};
use crate::rmc::structures::qresult::QResult;
use crate::rmc::structures::RmcSerialize;
pub async fn unregister_gathering(rmcmessage: &RMCMessage, gid: u32, data: Arc<RwLock<MatchmakeData>>) -> RMCResponseResult{
let mut rd = data.write().await;
rd.matchmake_sessions.remove(&gid);
let result = QResult::success(ErrorCode::Core_Unknown);
let mut response = Vec::new();
result.serialize(&mut response).expect("aaa");
rmcmessage.success_with_data(response)
}
pub async fn unregister_gathering_raw_params(rmcmessage: &RMCMessage, _: &Arc<SocketData>, _: &Arc<Mutex<ConnectionData>>, data: Arc<RwLock<MatchmakeData>>) -> RMCResponseResult{
let mut reader = Cursor::new(&rmcmessage.rest_of_data);
let Ok(gid) = u32::deserialize(&mut reader) else {
return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument);
};
unregister_gathering(rmcmessage, gid, data).await
}

View file

@ -1,13 +0,0 @@
mod method_unregister_gathering;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::define_protocol;
use crate::protocols::matchmake::method_unregister_gathering::unregister_gathering_raw_params;
use crate::protocols::matchmake_common::MatchmakeData;
define_protocol!{
21(matchmake_data: Arc<RwLock<MatchmakeData>>) => {
2 => unregister_gathering_raw_params
}
}

View file

@ -1,136 +0,0 @@
use std::collections::{BTreeMap};
use std::sync::Arc;
use log::error;
use rand::random;
use tokio::sync::{Mutex, RwLock};
use crate::kerberos::KerberosDateTime;
use crate::protocols::notification::Notification;
use crate::rmc::structures::matchmake::{Gathering, MatchmakeParam, MatchmakeSession};
use crate::rmc::structures::variant::Variant;
#[derive(Default, Debug)]
pub struct ExtendedMatchmakeSession{
pub session: MatchmakeSession,
pub connected_players: Vec<Arc<Mutex<ConnectionData>>>,
}
pub struct MatchmakeData{
pub(crate) matchmake_sessions: BTreeMap<u32, Arc<Mutex<ExtendedMatchmakeSession>>>
}
impl ExtendedMatchmakeSession{
pub async fn from_matchmake_session(session: MatchmakeSession, host: &Mutex<ConnectionData>) -> Self{
let host = host.lock().await;
let ConnectionData{
active_connection_data,
..
} = &*host;
let Some(active_connection_data) = active_connection_data else{
return Default::default();
};
let ActiveConnectionData{
active_secure_connection_data,
..
} = active_connection_data;
let Some(active_secure_connection_data) = active_secure_connection_data else{
return Default::default();
};
let mm_session = MatchmakeSession{
gathering: Gathering{
self_gid: 1,
owner_pid: active_secure_connection_data.pid,
host_pid: active_secure_connection_data.pid,
..session.gathering.clone()
},
datetime: KerberosDateTime::now(),
session_key: (0..32).map(|_| random()).collect(),
matchmake_param: MatchmakeParam{
params: vec![
("@SR".to_owned(), Variant::Bool(true)),
("@GIR".to_owned(), Variant::SInt64(3))
]
},
system_password_enabled: false,
..session
};
Self{
session: mm_session,
connected_players: Default::default()
}
}
pub async fn add_player(&mut self, socket: &SocketData, conn: Arc<Mutex<ConnectionData>>, join_msg: String) {
let locked = conn.lock().await;
let Some(joining_pid) = locked.active_connection_data.as_ref()
.map(|c|
c.active_secure_connection_data.as_ref()
.map(|c| c.pid)
).flatten() else {
error!("tried to add player without secure connection");
return
};
drop(locked);
self.connected_players.push(conn);
self.session.participation_count = self.connected_players.len() as u32;
for other_connection in &self.connected_players{
let mut conn = other_connection.lock().await;
let Some(other_pid) = conn.active_connection_data.as_ref()
.map(|c|
c.active_secure_connection_data.as_ref()
.map(|c| c.pid
)
).flatten() else {
error!("tried to send connection notification to player secure connection");
return
};
/*if other_pid == self.session.gathering.owner_pid &&
joining_pid == self.session.gathering.owner_pid{
continue;
}*/
conn.send_notification(socket, Notification{
pid_source: joining_pid,
notif_type: 3001,
param_1: self.session.gathering.self_gid,
param_2: other_pid,
str_param: join_msg.clone(),
param_3: self.session.participation_count
}).await;
}
}
}
pub async fn add_matchmake_session(mm_data: Arc<RwLock<MatchmakeData>>,session: ExtendedMatchmakeSession) -> Arc<Mutex<ExtendedMatchmakeSession>> {
let gid = session.session.gathering.self_gid;
let mut mm_data = mm_data.write().await;
let session = Arc::new(Mutex::new(session));
mm_data.matchmake_sessions.insert(gid, session.clone());
session
}
impl MatchmakeData {
pub async fn try_find_session_with_criteria(&self, ) -> Option<Arc<Mutex<ExtendedMatchmakeSession>>>{
None
}
}

View file

@ -1,98 +0,0 @@
use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
use chrono::SecondsFormat::Millis;
use log::info;
use rand::random;
use tokio::sync::{Mutex, RwLock};
use tokio::time::sleep;
use crate::protocols::matchmake_common::{ExtendedMatchmakeSession, MatchmakeData};
use crate::rmc::message::RMCMessage;
use crate::rmc::response::{ErrorCode, RMCResponseResult};
use crate::rmc::structures::matchmake::{AutoMatchmakeParam, MatchmakeSession};
use crate::rmc::structures::RmcSerialize;
pub async fn auto_matchmake_with_param_postpone(
rmcmessage: &RMCMessage,
conn: &Arc<Mutex<ConnectionData>>,
socket: &Arc<SocketData>,
mm_data: Arc<RwLock<MatchmakeData>>,
auto_matchmake_param: AutoMatchmakeParam
) -> RMCResponseResult{
//println!("auto_matchmake_with_param_postpone: {:?}", auto_matchmake_param);
let locked_conn = conn.lock().await;
let Some(secure_conn) =
locked_conn.active_connection_data.as_ref().map(|a| a.active_secure_connection_data.as_ref()).flatten() else {
return rmcmessage.error_result_with_code(ErrorCode::Core_Exception);
};
let pid = secure_conn.pid;
drop(locked_conn);
let mm_data_read = mm_data.read().await;
//todo: there is a bit of a race condition here, i dont have any idea on how to fix it though...
let session = if let Some(session) = mm_data_read.try_find_session_with_criteria().await{
session
} else {
// drop it first so that we dont cause a deadlock, also drop it right here so we dont hold
// up anything else unnescesarily
drop(mm_data_read);
let session =
ExtendedMatchmakeSession::from_matchmake_session(auto_matchmake_param.matchmake_session, &conn).await;
let gid = session.session.gathering.self_gid;
let mut mm_data = mm_data.write().await;
let session = Arc::new(Mutex::new(session));
mm_data.matchmake_sessions.insert(gid, session.clone());
session
};
let mut locked_session = session.lock().await;
//todo: refactor so that this works
{
let session = session.clone();
let socket = socket.clone();
let connection = conn.clone();
let join_msg = auto_matchmake_param.join_message.clone();
tokio::spawn(async move{
sleep(Duration::from_millis(500)).await;
println!("adding player");
let mut session = session.lock().await;
session.add_player(&socket, connection, join_msg).await;
});
}
info!("new session: {:?}", locked_session);
let mut response = Vec::new();
locked_session.session.serialize(&mut response).expect("unable to serialize matchmake session");
rmcmessage.success_with_data(response)
}
pub async fn auto_matchmake_with_param_postpone_raw_params(
rmcmessage: &RMCMessage,
socket: &Arc<SocketData>,
connection_data: &Arc<Mutex<ConnectionData>>,
data: Arc<RwLock<MatchmakeData>>
) -> RMCResponseResult{
let mut reader = Cursor::new(&rmcmessage.rest_of_data);
let Ok(matchmake_param) = AutoMatchmakeParam::deserialize(&mut reader) else {
return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument);
};
auto_matchmake_with_param_postpone(rmcmessage, connection_data, socket, data, matchmake_param).await
}

View file

@ -1,87 +0,0 @@
use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
use log::info;
use tokio::sync::{Mutex, RwLock};
use tokio::time::sleep;
use crate::protocols::matchmake_common::{add_matchmake_session, ExtendedMatchmakeSession, MatchmakeData};
use crate::protocols::matchmake_extension::method_auto_matchmake_with_param_postpone::auto_matchmake_with_param_postpone;
use crate::rmc::message::RMCMessage;
use crate::rmc::response::{ErrorCode, RMCResponseResult};
use crate::rmc::structures::matchmake::{AutoMatchmakeParam, CreateMatchmakeSessionParam};
use crate::rmc::structures::RmcSerialize;
pub async fn create_matchmake_session_with_param(
rmcmessage: &RMCMessage,
conn: &Arc<Mutex<ConnectionData>>,
socket: &Arc<SocketData>,
mm_data: Arc<RwLock<MatchmakeData>>,
create_matchmake_session: CreateMatchmakeSessionParam
) -> RMCResponseResult {
let mut session =
ExtendedMatchmakeSession::from_matchmake_session(create_matchmake_session.matchmake_session, &conn).await;
session.session.participation_count = create_matchmake_session.participation_count as u32;
let session = add_matchmake_session(mm_data, session).await;
let mut session = session.lock().await;
session.add_player(&socket, conn.clone(), create_matchmake_session.join_message).await;
let mut response = Vec::new();
session.session.serialize(&mut response).expect("unable to serialize session");
println!("{}", hex::encode(&response));
rmcmessage.success_with_data(response)
}
pub async fn create_matchmake_session_with_param_raw_params(
rmcmessage: &RMCMessage,
socket: &Arc<SocketData>,
connection_data: &Arc<Mutex<ConnectionData>>,
data: Arc<RwLock<MatchmakeData>>
) -> RMCResponseResult{
let mut reader = Cursor::new(&rmcmessage.rest_of_data);
let Ok(matchmake_param) = CreateMatchmakeSessionParam::deserialize(&mut reader) else {
return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument);
};
create_matchmake_session_with_param(rmcmessage, connection_data, socket, data, matchmake_param).await
}
#[cfg(test)]
mod test{
use std::io::Cursor;
use crate::prudp::packet::PRUDPPacket;
use crate::rmc::message::RMCMessage;
use crate::rmc::structures::matchmake::MatchmakeSession;
use crate::rmc::structures::RmcSerialize;
#[test]
fn test(){
let data = hex::decode("ead001030000a1af12001800050002010000000000000000000000000000000000").unwrap();
let packet = PRUDPPacket::new(&mut Cursor::new(data)).unwrap();
println!("{:?}", packet);
}
#[test]
fn test_2(){
let data = hex::decode("250000008e0100000001000000001700000051b39957b90b00000100000051b3995701000001000000").unwrap();
let msg = RMCMessage::new(&mut Cursor::new(data)).unwrap();
println!("{:?}", msg)
}
}

View file

@ -1,34 +0,0 @@
use std::io::Cursor;
use std::sync::Arc;
use log::info;
use tokio::sync::{Mutex, RwLock};
use crate::protocols::matchmake_common::MatchmakeData;
use crate::rmc::message::RMCMessage;
use crate::rmc::response::{ErrorCode, RMCResponseResult};
use crate::rmc::structures::RmcSerialize;
type PIDList = Vec<u32>;
async fn get_playing_session(rmcmessage: &RMCMessage, _data: Arc<RwLock<MatchmakeData>>) -> RMCResponseResult {
//todo: propperly implement this
let cheeseburger = PIDList::new();
let mut vec = Vec::new();
cheeseburger.serialize(&mut vec).expect("somehow unable to write cheeseburger");
rmcmessage.success_with_data(vec)
}
pub async fn get_playing_session_raw_params(rmcmessage: &RMCMessage, _: &Arc<SocketData>, _: &Arc<Mutex<ConnectionData>>, data: Arc<RwLock<MatchmakeData>>) -> RMCResponseResult{
let mut reader = Cursor::new(&rmcmessage.rest_of_data);
let Ok(list) = PIDList::deserialize(&mut reader) else {
return rmcmessage.error_result_with_code(ErrorCode::FPD_FriendNotExists);
};
info!("get_playing_session got called with {:?}", list);
get_playing_session(rmcmessage, data).await
}

View file

@ -1,19 +0,0 @@
mod method_get_playing_session;
mod method_auto_matchmake_with_param_postpone;
mod method_create_matchmake_session_with_param;
use std::sync::Arc;
use tokio::sync::{RwLock};
use crate::define_protocol;
use crate::protocols::matchmake_common::MatchmakeData;
use method_get_playing_session::get_playing_session_raw_params;
use method_auto_matchmake_with_param_postpone::auto_matchmake_with_param_postpone_raw_params;
use crate::protocols::matchmake_extension::method_create_matchmake_session_with_param::create_matchmake_session_with_param_raw_params;
define_protocol!{
109(matchmake_data: Arc<RwLock<MatchmakeData>>) => {
16 => get_playing_session_raw_params,
38 => create_matchmake_session_with_param_raw_params,
40 => auto_matchmake_with_param_postpone_raw_params
}
}

View file

@ -1,118 +0,0 @@
use std::env;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use log::warn;
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use crate::grpc;
use crate::rmc::message::RMCMessage;
use crate::rmc::response::{ErrorCode, RMCResponse};
pub mod auth;
pub mod server;
pub mod secure;
pub mod matchmake_extension;
pub mod matchmake_common;
pub mod matchmake;
pub mod notification;
pub mod nat_traversal;
static IS_MAINTENANCE: Lazy<bool> = Lazy::new(|| {
env::var("IS_MAINTENANCE")
.ok()
.map(|v| v.parse().expect("IS_MAINTENANCE should be a boolean value"))
.unwrap_or(false)
});
static BYPASS_LEVEL: Lazy<i32> = Lazy::new(|| {
env::var("MAINTENANCE_BYPASS_MINIMUM_ACCESS_LEVEL")
.ok()
.map(|v| v.parse().expect("IS_MAINTENANCE should be a boolean value"))
.unwrap_or(3)
});
pub fn block_if_maintenance<'a>(rmcmessage: &'a RMCMessage, _: &'a Arc<SocketData> , conn: &'a Arc<Mutex<ConnectionData>>) -> Pin<Box<(dyn Future<Output=Option<RMCResponse>> + Send + 'a)>> {
Box::pin(async move {
let conn = conn.lock().await;
if let Some(active_conn) = conn.active_connection_data.as_ref() {
if let Some(secure_conn) = active_conn.active_secure_connection_data.as_ref() {
if let Ok(mut client) = grpc::account::Client::new().await {
if let Ok(client_data) = client.get_user_data(secure_conn.pid).await{
if client_data.access_level >= *BYPASS_LEVEL{
return None;
}
}
}
}
}
warn!("login attempted whilest servers are in maintenance");
if *IS_MAINTENANCE {
Some(RMCResponse {
protocol_id: rmcmessage.protocol_id as u8,
response_result: rmcmessage.error_result_with_code(ErrorCode::RendezVous_GameServerMaintenance),
})
} else {
None
}
})
}
#[macro_export]
macro_rules! define_protocol {
($id:literal ($($varname:ident : $ty:ty),*) => {$($func_id:literal => $func:path),*} ) => {
#[allow(unused_parens)]
async fn protocol (rmcmessage: &crate::RMCMessage, socket: &::std::sync::Arc<crate::prudp::socket::SocketData>, connection: &::std::sync::Arc<::tokio::sync::Mutex<crate::protocols::ConnectionData>>, $($varname : $ty),*) -> Option<crate::rmc::response::RMCResponse>{
if rmcmessage.protocol_id != $id{
return None;
}
let self_data: ( $( $ty ),* ) = ($( $varname ),*);
let response_result = match rmcmessage.method_id{
$(
$func_id => $func ( rmcmessage, socket, connection, self_data).await,
)*
_ => {
log::error!("invalid method id sent to protocol {}: {:?}", $id, rmcmessage.method_id);
return Some(
crate::rmc::response::RMCResponse{
protocol_id: $id,
response_result: rmcmessage.error_result_with_code(crate::rmc::response::ErrorCode::Core_NotImplemented)
}
);
}
};
Some(crate::rmc::response::RMCResponse{
protocol_id: $id,
response_result
})
}
#[allow(unused_parens)]
pub fn bound_protocol($($varname : $ty,)*) -> Box<dyn for<'message_lifetime> Fn(&'message_lifetime crate::RMCMessage, &'message_lifetime ::std::sync::Arc<crate::prudp::socket::SocketData>, &'message_lifetime ::std::sync::Arc<::tokio::sync::Mutex<crate::protocols::ConnectionData>>)
-> ::std::pin::Pin<Box<dyn ::std::future::Future<Output = Option<crate::rmc::response::RMCResponse>> + Send + 'message_lifetime>> + Send + Sync>{
Box::new(
move |v, s, cd| {
Box::pin({
$(
let $varname = $varname.clone();
)*
async move {
$(
let $varname = $varname.clone();
)*
protocol(v, s, cd, $($varname,)*).await
}
})
}
)
}
};
}

View file

@ -1,29 +0,0 @@
use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tokio::time::sleep;
use crate::protocols::matchmake_common::MatchmakeData;
use crate::rmc::message::RMCMessage;
use crate::rmc::response::{ErrorCode, RMCResponseResult};
use crate::rmc::structures::matchmake::CreateMatchmakeSessionParam;
pub async fn report_nat_properties(
rmcmessage: &RMCMessage,
socket: &Arc<SocketData>,
connection_data: &Arc<Mutex<ConnectionData>>,
) -> RMCResponseResult{
sleep(Duration::from_millis(50)).await;
rmcmessage.success_with_data(Vec::new())
}
pub async fn report_nat_properties_raw_params(
rmcmessage: &RMCMessage,
socket: &Arc<SocketData>,
connection_data: &Arc<Mutex<ConnectionData>>,
_: ()
) -> RMCResponseResult{
let mut reader = Cursor::new(&rmcmessage.rest_of_data);
report_nat_properties(rmcmessage, socket, connection_data).await
}

View file

@ -1,10 +0,0 @@
mod method_report_nat_properties;
use crate::define_protocol;
use crate::protocols::nat_traversal::method_report_nat_properties::report_nat_properties_raw_params;
define_protocol!{
3() => {
5 => report_nat_properties_raw_params
}
}

View file

@ -1,159 +0,0 @@
use macros::RmcSerialize;
use rand::random;
use crate::prudp::packet::{PRUDPHeader, PRUDPPacket, PacketOption, TypesFlags};
use crate::prudp::packet::flags::{NEED_ACK, RELIABLE};
use crate::prudp::packet::types::DATA;
use crate::rmc::message::RMCMessage;
use crate::rmc::structures::RmcSerialize;
#[derive(Debug, Eq, PartialEq, RmcSerialize)]
#[rmc_struct(0)]
pub struct Notification{
pub pid_source: u32,
pub notif_type: u32,
pub param_1: u32,
pub param_2: u32,
pub str_param: String,
pub param_3: u32,
}
impl ConnectionData{
pub async fn send_notification(&mut self, socket: &SocketData, notif: Notification){
println!("sending notification");
let mut data = Vec::new();
notif.serialize(&mut data).expect("unable to write");
let message = RMCMessage{
protocol_id: 14,
method_id: 1,
call_id: 1,
rest_of_data: data
};
println!("notif: {}", hex::encode(message.to_data()));
let mut prudp_packet = PRUDPPacket{
header: PRUDPHeader{
types_and_flags: TypesFlags::default().types(DATA).flags(NEED_ACK | RELIABLE),
source_port: socket.get_virual_port(),
destination_port: self.sock_addr.virtual_port,
..Default::default()
},
options: vec![
PacketOption::FragmentId(0),
],
payload: message.to_data(),
packet_signature: [0;16]
};
self.finish_and_send_packet_to(socket, prudp_packet).await;
}
}
#[cfg(test)]
mod test{
use std::io::Cursor;
use rand::random;
use crate::protocols::notification::Notification;
use crate::prudp::packet::{PRUDPHeader, PRUDPPacket, PacketOption, TypesFlags};
use crate::prudp::packet::flags::{NEED_ACK, RELIABLE};
use crate::prudp::packet::types::DATA;
use crate::rmc::message::RMCMessage;
use crate::rmc::structures::RmcSerialize;
#[test]
fn test(){
let data = hex::decode("ead001032900a1af62000000000000000000000000000000000000000000020100250000000e57238a6601000000001700000051b39957b90b00003661636851b3995701000001000000").unwrap();
let packet = PRUDPPacket::new(&mut Cursor::new(data)).expect("invalid packet");
println!("{:?}", packet);
let rmc = RMCMessage::new(&mut Cursor::new(packet.payload)).expect("invalid rmc message");
println!("{:?}", rmc);
let notif = Notification::deserialize(&mut Cursor::new(rmc.rest_of_data)).expect("invalid notification");
println!("{:?}", notif);
}
#[test]
fn test2(){
let data = hex::decode("250000000e57b6801001000000001700000051b39957b90b0000248a5a9851b3995701000001000000").unwrap();
//let packet = PRUDPPacket::new(&mut Cursor::new(data)).expect("invalid packet");
//println!("{:?}", packet);
let rmc = RMCMessage::new(&mut Cursor::new(data)).expect("invalid rmc message");
println!("{:?}", rmc);
let notif = Notification::deserialize(&mut Cursor::new(rmc.rest_of_data)).expect("invalid notification");
println!("{:?}", notif);
}
#[test]
fn test_rmc_serialization(){
let notif = Notification{
pid_source: random(),
notif_type: random(),
param_1: random(),
param_2: random(),
str_param: "".to_string(),
param_3: random(),
};
let mut notif_data = Vec::new();
notif.serialize(&mut notif_data).unwrap();
let message = RMCMessage{
protocol_id: 14,
method_id: 1,
call_id: random(),
rest_of_data: notif_data
};
let mut prudp_packet = PRUDPPacket{
header: PRUDPHeader{
..Default::default()
},
options: vec![
PacketOption::FragmentId(0),
],
payload: message.to_data(),
packet_signature: [0;16]
};
prudp_packet.set_sizes();
let mut packet_data: Vec<u8> = Vec::new();
prudp_packet.write_to(&mut packet_data).expect("what");
let packet_deserialized = PRUDPPacket::new(&mut Cursor::new(packet_data)).unwrap();
assert_eq!(prudp_packet, packet_deserialized);
let message_deserialized = RMCMessage::new(&mut Cursor::new(packet_deserialized.payload)).unwrap();
assert_eq!(message, message_deserialized);
let notification_deserialized = Notification::deserialize(&mut Cursor::new(message_deserialized.rest_of_data)).unwrap();
assert_eq!(notification_deserialized, notif);
}
}

View file

@ -1,63 +0,0 @@
use std::io::{Cursor, Write};
use std::sync::Arc;
use bytemuck::bytes_of;
use tokio::sync::Mutex;
use crate::prudp::station_url::{nat_types, StationUrl};
use crate::prudp::station_url::Type::PRUDPS;
use crate::prudp::station_url::UrlOptions::{Address, NatFiltering, NatMapping, NatType, Port, PrincipalID, RVConnectionID};
use crate::rmc::message::RMCMessage;
use crate::rmc::response::{ErrorCode, RMCResponseResult};
use crate::rmc::structures::qresult::QResult;
use crate::rmc::structures::RmcSerialize;
type StringList = Vec<String>;
pub async fn register(rmcmessage: &RMCMessage, _station_urls: Vec<StationUrl>, conn_data: &Arc<Mutex<ConnectionData>>) -> RMCResponseResult{
let locked = conn_data.lock().await;
let Some(active_connection_data) = locked.active_connection_data.as_ref() else {
return rmcmessage.error_result_with_code(ErrorCode::RendezVous_NotAuthenticated)
};
let Some(active_secure_connection_data) = active_connection_data.active_secure_connection_data.as_ref() else {
return rmcmessage.error_result_with_code(ErrorCode::RendezVous_NotAuthenticated)
};
let public_station = StationUrl{
url_type: PRUDPS,
options: vec![
RVConnectionID(active_connection_data.connection_id),
Address(*locked.sock_addr.regular_socket_addr.ip()),
Port(locked.sock_addr.regular_socket_addr.port()),
NatFiltering(0),
NatMapping(0),
NatType(nat_types::BEHIND_NAT),
PrincipalID(active_secure_connection_data.pid),
]
};
let result = QResult::success(ErrorCode::Core_Unknown);
let mut response = Vec::new();
result.serialize(&mut response).expect("unable to serialize result");
response.write_all(bytes_of(&active_connection_data.connection_id)).expect("unable to serialize connection id");
public_station.to_string().serialize(&mut response).expect("unable to serialize station id");
rmcmessage.success_with_data(response)
}
pub async fn register_raw_params(rmcmessage: &RMCMessage, _: &Arc<SocketData>, conn_data: &Arc<Mutex<ConnectionData>>, _: ()) -> RMCResponseResult{
let mut reader = Cursor::new(&rmcmessage.rest_of_data);
let Ok(station_urls) = StringList::deserialize(&mut reader) else {
return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument);
};
let Ok(station_urls): Result<Vec<StationUrl>, _> = station_urls.iter().map(|c| StationUrl::try_from((&c) as &str)).collect() else {
return rmcmessage.error_result_with_code(ErrorCode::Core_InvalidArgument);
};
register(rmcmessage, station_urls, conn_data).await
}

View file

@ -1,35 +0,0 @@
use std::io::Cursor;
use std::sync::Arc;
use log::error;
use tokio::sync::Mutex;
use crate::endianness::{IS_BIG_ENDIAN, ReadExtensions};
use crate::rmc::message::RMCMessage;
use crate::rmc::response::{RMCResponseResult};
use crate::rmc::response::ErrorCode::Core_InvalidArgument;
use crate::rmc::structures::{qbuffer, RmcSerialize};
use crate::rmc::structures::qbuffer::QBuffer;
pub async fn send_report(rmcmessage: &RMCMessage, report_id: u32, data: Vec<u8>) -> RMCResponseResult{
let result = tokio::fs::write(format!("./reports/{}", report_id), data).await;
match result{
Ok(_) => {},
Err(e) => error!("{}", e)
}
rmcmessage.success_with_data(Vec::new())
}
pub async fn send_report_raw_params(rmcmessage: &RMCMessage, _: &Arc<SocketData>, _conn_data: &Arc<Mutex<ConnectionData>>, _: ()) -> RMCResponseResult{
let mut reader = Cursor::new(&rmcmessage.rest_of_data);
let Ok(error_id) = reader.read_struct(IS_BIG_ENDIAN) else {
return rmcmessage.error_result_with_code(Core_InvalidArgument);
};
let Ok(QBuffer(data)) = QBuffer::deserialize(&mut reader) else {
return rmcmessage.error_result_with_code(Core_InvalidArgument);
};
send_report(rmcmessage, error_id, data).await
}

View file

@ -1,13 +0,0 @@
mod method_register;
mod method_send_report;
use crate::define_protocol;
use crate::protocols::secure::method_register::register_raw_params;
use crate::protocols::secure::method_send_report::send_report_raw_params;
define_protocol!{
11() => {
0x01 => register_raw_params,
0x08 => send_report_raw_params
}
}

View file

@ -1,59 +0,0 @@
use std::future::Future;
use std::io::Cursor;
use std::pin::Pin;
use std::sync::Arc;
use log::error;
use tokio::sync::Mutex;
use crate::prudp::packet::PRUDPPacket;
use crate::rmc::message::RMCMessage;
use crate::rmc::response::{RMCResponse, RMCResponseResult, send_response};
use crate::rmc::response::ErrorCode::Core_NotImplemented;
use crate::web::DirectionalData::Incoming;
use crate::web::WEB_DATA;
type ContainedProtocolList = Box<[Box<dyn for<'a> Fn(&'a RMCMessage, &'a Arc<SocketData>, &'a Arc<Mutex<ConnectionData>>) -> Pin<Box<dyn Future<Output = Option<RMCResponse>> + Send + 'a>> + Send + Sync>]>;
pub struct RMCProtocolServer(ContainedProtocolList);
impl RMCProtocolServer{
pub fn new(protocols: ContainedProtocolList) -> Arc<Self>{
Arc::new(Self(protocols))
}
pub async fn process_message(&self, packet: PRUDPPacket, socket: Arc<SocketData>, connection: Arc<Mutex<ConnectionData>>){
let locked = connection.lock().await;
let addr = locked.sock_addr.regular_socket_addr;
drop(locked);
let mut web = WEB_DATA.lock().await;
web.data.push((addr, Incoming(hex::encode(&packet.payload))));
drop(web);
let Ok(rmc) = RMCMessage::new(&mut Cursor::new(&packet.payload)) else {
error!("error reading rmc message");
return;
};
for proto in &self.0 {
if let Some(response) = proto(&rmc, &socket, &connection).await {
if matches!(response.response_result, RMCResponseResult::Error {..}){
error!("an rmc error occurred")
}
let mut locked = connection.lock().await;
send_response(&packet, &socket, &mut locked, response).await;
drop(locked);
return;
}
}
error!("tried to send message to unimplemented protocol {} with method id {}", rmc.protocol_id, rmc.method_id);
let mut locked = connection.lock().await;
send_response(&packet, &socket, &mut locked, RMCResponse{
protocol_id: rmc.protocol_id as u8,
response_result: RMCResponseResult::Error {
call_id: rmc.call_id,
error_code: Core_NotImplemented
}
}).await;
}
}

View file

@ -97,7 +97,7 @@ impl Auth for AuthHandler {
source_login_data.0,
ticket.into(),
connection_data,
self.build_name.to_owned(),
format!("{}; Rust NEX Version {} by DJMrTV", self.build_name, env!("CARGO_PKG_VERSION")),
))
}

View file

@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering::{Relaxed, Release};
@ -7,7 +8,11 @@ use tokio::sync::{Mutex, RwLock};
use crate::kerberos::KerberosDateTime;
use crate::nex::user::User;
use crate::rmc::protocols::notifications::{NotificationEvent, RemoteNotification};
use crate::rmc::structures::matchmake::{Gathering, MatchmakeParam, MatchmakeSession};
use crate::rmc::protocols::notifications::notification_types::{HOST_CHANGED, OWNERSHIP_CHANGED};
use crate::rmc::response::ErrorCode;
use crate::rmc::response::ErrorCode::{Core_InvalidArgument, RendezVous_SessionVoid};
use crate::rmc::structures::matchmake::{Gathering, MatchmakeParam, MatchmakeSession, MatchmakeSessionSearchCriteria};
use crate::rmc::structures::matchmake::gathering_flags::PERSISTENT_GATHERING;
use crate::rmc::structures::variant::Variant;
pub struct MatchmakeManager{
@ -25,6 +30,19 @@ impl MatchmakeManager{
pub fn next_cid(&self) -> u32{
self.rv_cid_counter.fetch_add(1, Relaxed)
}
pub async fn get_session(&self, gid: u32) -> Result<Arc<Mutex<ExtendedMatchmakeSession>>, ErrorCode>{
let sessions = self.sessions.read().await;
let Some(session) = sessions.get(&gid) else {
return Err(RendezVous_SessionVoid);
};
let session = session.clone();
drop(sessions);
Ok(session)
}
}
@ -34,7 +52,34 @@ pub struct ExtendedMatchmakeSession{
pub connected_players: Vec<Weak<User>>,
}
fn read_bounds_string<T: FromStr>(str: &str) -> Option<(T,T)>{
let bounds = str.split_once(",")?;
Some((T::from_str(bounds.0).ok()?, T::from_str(bounds.1).ok()?))
}
fn check_bounds_str<T: FromStr + PartialOrd>(compare: T, str: &str) -> Option<bool>{
let bounds: (T, T) = read_bounds_string(str)?;
Some(bounds.0 <= compare && compare <= bounds.1)
}
pub async fn broadcast_notification<T: AsRef<User>>(players: &[T], notification_event: &NotificationEvent){
for player in players{
let player = player.as_ref();
player.remote.process_notification_event(notification_event.clone()).await;
}
}
impl ExtendedMatchmakeSession{
pub fn get_active_players(&self) -> Vec<Arc<User>>{
self.connected_players.iter().filter_map(|u| u.upgrade()).collect()
}
pub async fn broadcast_notification(&self, notification_event: &NotificationEvent){
broadcast_notification(&self.get_active_players(), notification_event).await;
}
pub async fn from_matchmake_session(gid: u32, session: MatchmakeSession, host: &Weak<User>) -> Self{
let Some(host) = host.upgrade() else{
return Default::default();
@ -66,18 +111,40 @@ impl ExtendedMatchmakeSession{
}
}
pub async fn add_player(&mut self, conn: Weak<User>, join_msg: String) {
let Some(arc_conn) = conn.upgrade() else {
pub async fn add_players(&mut self, conns: &[Weak<User>], join_msg: String) {
let Some(initiating_user) = conns[0].upgrade() else {
return
};
let joining_pid = arc_conn.pid;
let initiating_pid = initiating_user.pid;
let old_particip = self.connected_players.clone();
self.connected_players.push(conn);
for conn in conns {
self.connected_players.push(conn.clone());
}
self.session.participation_count = self.connected_players.len() as u32;
for other_connection in &self.connected_players[1..]{
let Some(other_conn) = other_connection.upgrade() else {
continue;
};
let other_pid = other_conn.pid;
/*if other_pid == self.session.gathering.owner_pid &&
joining_pid == self.session.gathering.owner_pid{
continue;
}*/
other_conn.remote.process_notification_event(NotificationEvent{
pid_source: initiating_pid,
notif_type: 122000,
param_1: self.session.gathering.self_gid,
param_2: other_pid,
str_param: "".into(),
param_3: 0
}).await;
}
for other_connection in &self.connected_players{
let Some(other_conn) = other_connection.upgrade() else {
@ -92,7 +159,7 @@ impl ExtendedMatchmakeSession{
}*/
other_conn.remote.process_notification_event(NotificationEvent{
pid_source: joining_pid,
pid_source: initiating_pid,
notif_type: 3001,
param_1: self.session.gathering.self_gid,
param_2: other_pid,
@ -109,8 +176,8 @@ impl ExtendedMatchmakeSession{
let older_pid = old_conns.pid;
arc_conn.remote.process_notification_event(NotificationEvent{
pid_source: joining_pid,
initiating_user.remote.process_notification_event(NotificationEvent{
pid_source: initiating_pid,
notif_type: 3001,
param_1: self.session.gathering.self_gid,
param_2: older_pid,
@ -119,4 +186,150 @@ impl ExtendedMatchmakeSession{
}).await;
}
}
#[inline]
pub fn is_reachable(&self) -> bool{
if self.session.gathering.flags & PERSISTENT_GATHERING != 0{
if !self.connected_players.is_empty(){
true
} else {
self.session.open_participation
}
} else {
!self.connected_players.is_empty()
}
}
#[inline]
pub fn is_joinable(&self) -> bool{
self.is_reachable() && self.session.open_participation
}
pub fn matches_criteria(&self, search_criteria: &MatchmakeSessionSearchCriteria) -> Result<bool, ErrorCode>{
// todo: implement the rest of the search criteria
if search_criteria.vacant_only {
if (self.connected_players.len() as u16 + search_criteria.vacant_participants) > self.session.gathering.maximum_participants{
return Ok(false);
}
}
if search_criteria.exclude_locked{
if !self.session.open_participation{
return Ok(false);
}
}
if search_criteria.exclude_system_password_set{
if self.session.system_password_enabled{
return Ok(false);
}
}
if search_criteria.exclude_user_password_set{
if self.session.user_password_enabled{
return Ok(false);
}
}
if !check_bounds_str(self.session.gathering.minimum_participants, &search_criteria.minimum_participants).ok_or(Core_InvalidArgument)? {
return Ok(false);
}
if !check_bounds_str(self.session.gathering.maximum_participants, &search_criteria.maximum_participants).ok_or(Core_InvalidArgument)? {
return Ok(false);
}
let game_mode: u32 = search_criteria.game_mode.parse().map_err(|_| Core_InvalidArgument)?;
if self.session.gamemode != game_mode{
return Ok(false);
}
let mm_sys_type: u32 = search_criteria.matchmake_system_type.parse().map_err(|_| Core_InvalidArgument)?;
if self.session.matchmake_system_type != mm_sys_type{
return Ok(false);
}
;
if search_criteria.attribs.get(0).map(|str| str.parse().ok()).flatten() != self.session.attributes.get(0).map(|v| *v){
return Ok(false);
}
if search_criteria.attribs.get(2).map(|str| str.parse().ok()).flatten() != self.session.attributes.get(2).map(|v| *v){
return Ok(false);
}
if search_criteria.attribs.get(3).map(|str| str.parse().ok()).flatten() != self.session.attributes.get(3).map(|v| *v){
return Ok(false);
}
Ok(true)
}
pub async fn migrate_ownership(&mut self, initiator_pid: u32) -> Result<(), ErrorCode>{
let players: Vec<_> = self.connected_players.iter().filter_map(|p| p.upgrade()).collect();
let Some(new_owner) = players.iter().find(|p| p.pid != self.session.gathering.owner_pid) else {
self.session.gathering.owner_pid = 0;
return Ok(());
};
self.session.gathering.owner_pid = new_owner.pid;
self.broadcast_notification(&NotificationEvent{
pid_source: initiator_pid,
notif_type: OWNERSHIP_CHANGED,
param_1: self.session.gathering.self_gid,
param_2: new_owner.pid,
..Default::default()
}).await;
Ok(())
}
pub async fn migrate_host(&mut self, initiator_pid: u32) -> Result<(), ErrorCode>{
let players: Vec<_> = self.connected_players.iter().filter_map(|p| p.upgrade()).collect();
self.session.gathering.host_pid = self.session.gathering.owner_pid;
self.broadcast_notification(&NotificationEvent{
pid_source: initiator_pid,
notif_type: HOST_CHANGED,
param_1: self.session.gathering.self_gid,
..Default::default()
}).await;
Ok(())
}
pub async fn remove_player_from_session(&mut self, pid: u32, message: &str) -> Result<(), ErrorCode>{
self.connected_players.retain(|u| u.upgrade().is_some_and(|u| u.pid != pid));
self.session.participation_count = (self.connected_players.len() & u32::MAX as usize) as u32;
if pid == self.session.gathering.owner_pid {
self.migrate_ownership(pid).await?;
}
if pid == self.session.gathering.host_pid {
self.migrate_host(pid).await?;
}
// todo: support DisconnectChangeOwner
// todo: finish the rest of this
for player in self.connected_players.iter().filter_map(|p| p.upgrade()){
player.remote.process_notification_event(NotificationEvent{
notif_type: 3008,
pid_source: pid,
param_1: self.session.gathering.self_gid,
param_2: pid,
str_param: message.to_owned(),
.. Default::default()
}).await;
}
Ok(())
}
}

View file

@ -19,8 +19,10 @@ use crate::rmc::protocols::nat_traversal::{
NatTraversal, RawNatTraversal, RawNatTraversalInfo, RemoteNatTraversal,
};
use crate::rmc::protocols::secure::{RawSecure, RawSecureInfo, RemoteSecure, Secure};
use crate::rmc::protocols::matchmake_ext::{MatchmakeExt, RawMatchmakeExt, RawMatchmakeExtInfo, RemoteMatchmakeExt};
use crate::rmc::response::ErrorCode;
use crate::rmc::structures::matchmake::{AutoMatchmakeParam, CreateMatchmakeSessionParam, JoinMatchmakeSessionParam, MatchmakeSession};
use crate::rmc::structures::qresult::QResult;
use macros::rmc_struct;
use std::net::{Ipv4Addr, SocketAddrV4};
@ -29,12 +31,14 @@ use log::{error, info};
use rocket::http::ext::IntoCollection;
use tokio::sync::{Mutex, RwLock};
use crate::prudp::station_url::nat_types::PUBLIC;
use crate::rmc::protocols::notifications::{NotificationEvent, RemoteNotification};
use crate::rmc::response::ErrorCode::{Core_Exception, Core_InvalidArgument, RendezVous_AccountExpired, RendezVous_SessionVoid};
define_rmc_proto!(
proto UserProtocol{
Secure,
MatchmakeExtension,
MatchmakeExt,
Matchmake,
NatTraversal
}
@ -140,10 +144,12 @@ impl Secure for User {
let mut lock = self.station_url.write().await;
*lock = vec![
public_station.clone(),
private_station
//private_station.clone()
];
drop(lock);
let result = QResult::success(ErrorCode::Core_Unknown);
@ -181,19 +187,22 @@ impl Secure for User {
}
impl MatchmakeExtension for User {
async fn close_participation(&self, gid: u32) -> Result<(), ErrorCode> {
let session = self.matchmake_manager.get_session(gid).await?;
let mut session = session.lock().await;
session.session.open_participation = false;
Ok(())
}
async fn get_playing_session(&self, pids: Vec<u32>) -> Result<Vec<()>, ErrorCode> {
Ok(Vec::new())
}
async fn update_progress_score(&self, gid: u32, progress: u8) -> Result<(), ErrorCode> {
let mut sessions = self.matchmake_manager.sessions.read().await;
let Some(session) = sessions.get(&gid) else {
return Err(RendezVous_SessionVoid);
};
let session = session.clone();
drop(sessions);
let session = self.matchmake_manager.get_session(gid).await?;
let mut session = session.lock().await;
@ -204,22 +213,34 @@ impl MatchmakeExtension for User {
async fn create_matchmake_session_with_param(
&self,
session: CreateMatchmakeSessionParam,
create_session_param: CreateMatchmakeSessionParam,
) -> Result<MatchmakeSession, ErrorCode> {
println!("{:?}", session);
println!("{:?}", create_session_param);
let gid = self.matchmake_manager.next_gid();
let mut new_session = ExtendedMatchmakeSession::from_matchmake_session(
gid,
session.matchmake_session,
create_session_param.matchmake_session,
&self.this.clone(),
)
.await;
new_session.session.participation_count = session.participation_count as u32;
let mut joining_players = vec![self.this.clone()];
let users = self.matchmake_manager.users.read().await;
for pid in create_session_param.additional_participants{
if let Some(user) = users.get(&pid){
joining_players.push(user.clone());
}
}
drop(users);
new_session.session.participation_count = create_session_param.participation_count as u32;
new_session
.add_player(self.this.clone(), session.join_message)
.add_players(&joining_players, create_session_param.join_message)
.await;
let session = new_session.session.clone();
@ -235,21 +256,26 @@ impl MatchmakeExtension for User {
&self,
join_session_param: JoinMatchmakeSessionParam,
) -> Result<MatchmakeSession, ErrorCode> {
let mut sessions = self.matchmake_manager.sessions.read().await;
let Some(session) = sessions.get(&join_session_param.gid) else {
return Err(ErrorCode::RendezVous_SessionVoid);
};
let session = session.clone();
drop(sessions);
let session = self.matchmake_manager.get_session(join_session_param.gid).await?;
let mut session = session.lock().await;
session.connected_players.retain(|v| v.upgrade().is_some_and(|v| v.pid != self.pid));
let mut joining_players = vec![self.this.clone()];
let users = self.matchmake_manager.users.read().await;
for pid in join_session_param.additional_participants{
if let Some(user) = users.get(&pid){
joining_players.push(user.clone());
}
}
drop(users);
session
.add_player(self.this.clone(), join_session_param.join_message)
.add_players(&joining_players, join_session_param.join_message)
.await;
let mm_session = session.session.clone();
@ -257,8 +283,51 @@ impl MatchmakeExtension for User {
Ok(mm_session)
}
async fn auto_matchmake_with_param_postpone(&self, session: AutoMatchmakeParam) -> Result<MatchmakeSession, ErrorCode> {
println!("{:?}", session.search_criteria);
async fn auto_matchmake_with_param_postpone(&self, param: AutoMatchmakeParam) -> Result<MatchmakeSession, ErrorCode> {
println!("{:?}", param);
let mut joining_players = vec![self.this.clone()];
let users = self.matchmake_manager.users.read().await;
for pid in &param.additional_participants{
if let Some(user) = users.get(pid){
joining_players.push(user.clone());
}
}
drop(users);
let sessions = self.matchmake_manager.sessions.read().await;
for session in sessions.values(){
let mut session = session.lock().await;
println!("checking session!");
if !session.is_joinable(){
continue;
}
let mut bool_matched_criteria = false;
for criteria in &param.search_criteria{
if session.matches_criteria(criteria)?{
bool_matched_criteria = true;
}
}
if bool_matched_criteria {
session.add_players(&joining_players, param.join_message).await;
return Ok(session.session.clone());
}
}
drop(sessions);
println!("making new session!");
let AutoMatchmakeParam{
join_message,
@ -267,7 +336,7 @@ impl MatchmakeExtension for User {
matchmake_session,
additional_participants,
..
} = session;
} = param;
self.create_matchmake_session_with_param(CreateMatchmakeSessionParam{
join_message,
@ -278,6 +347,13 @@ impl MatchmakeExtension for User {
additional_participants
}).await
}
async fn find_matchmake_session_by_gathering_id_detail(&self, gid: u32) -> Result<MatchmakeSession, ErrorCode> {
let session = self.matchmake_manager.get_session(gid).await?;
let session = session.lock().await;
Ok(session.session.clone())
}
}
impl Matchmake for User {
@ -285,15 +361,7 @@ impl Matchmake for User {
Ok(true)
}
async fn get_session_urls(&self, gid: u32) -> Result<Vec<StationUrl>, ErrorCode> {
let sessions = self.matchmake_manager.sessions.read().await;
let Some(session) = sessions.get(&gid) else {
return Err(ErrorCode::RendezVous_SessionVoid);
};
let session = session.clone();
drop(sessions);
let session = self.matchmake_manager.get_session(gid).await?;
let session = session.lock().await;
@ -315,6 +383,61 @@ impl Matchmake for User {
Ok(urls)
}
async fn update_session_host(&self, gid: u32, change_session_owner: bool) -> Result<(), ErrorCode> {
let session = self.matchmake_manager.get_session(gid).await?;
let mut session = session.lock().await;
session.session.gathering.host_pid = self.pid;
for player in &session.connected_players{
let Some(player) = player.upgrade() else {
continue;
};
player.remote.process_notification_event(NotificationEvent{
notif_type: 3008,
pid_source: self.pid,
param_1: gid,
param_2: self.pid,
param_3: 0,
str_param: "".to_string(),
}).await;
}
if change_session_owner{
session.session.gathering.owner_pid = self.pid;
for player in &session.connected_players{
let Some(player) = player.upgrade() else {
continue;
};
player.remote.process_notification_event(NotificationEvent{
notif_type: 4000,
pid_source: self.pid,
param_1: gid,
param_2: self.pid,
param_3: 0,
str_param: "".to_string(),
}).await;
}
}
Ok(())
}
}
impl MatchmakeExt for User {
async fn end_participation(&self, gid: u32, message: String) -> Result<bool, ErrorCode> {
let session = self.matchmake_manager.get_session(gid).await?;
let mut session = session.lock().await;
session.remove_player_from_session(self.pid, &message).await?;
Ok(true)
}
}
impl NatTraversal for User {
@ -340,6 +463,10 @@ impl NatTraversal for User {
Ok(())
}
async fn report_nat_traversal_result(&self, cid: u32, result: bool, rtt: u32) -> Result<(), ErrorCode> {
Ok(())
}
async fn request_probe_initiation(&self, station_to_probe: String) -> Result<(), ErrorCode> {
info!("NO!");
Err(RendezVous_AccountExpired)

View file

@ -340,8 +340,7 @@ impl PRUDPPacket {
options.push(PacketOption::from(option_id, &option_data)?);
}
trace!("reading payload");
let mut payload = vec![0u8; header.payload_size as usize];
reader.read_exact(&mut payload)?;

View file

@ -47,8 +47,6 @@ impl Router {
},
};
trace!("got valid prudp packet from someone({}): \n{:?}", addr, packet);
let connection = packet.source_sockaddr(addr);
@ -63,8 +61,7 @@ impl Router {
// Dont keep the locked structure for too long
drop(endpoints);
trace!("sending packet to endpoint");
tokio::spawn(async move {
endpoint.recieve_packet(connection, packet).await
@ -95,7 +92,7 @@ impl Router {
}
pub async fn new(addr: SocketAddrV4) -> io::Result<(Arc<Self>, JoinHandle<()>)>{
trace!("starting router on {}", addr);
// trace!("starting router on {}", addr);
let socket = Arc::new(UdpSocket::bind(addr).await?);

View file

@ -8,4 +8,7 @@ pub trait Matchmake{
async fn unregister_gathering(&self, gid: u32) -> Result<bool, ErrorCode>;
#[method_id(41)]
async fn get_session_urls(&self, gid: u32) -> Result<Vec<StationUrl>, ErrorCode>;
#[method_id(42)]
async fn update_session_host(&self, gid: u32, change_owner: bool) -> Result<(), ErrorCode>;
}

View file

@ -0,0 +1,9 @@
use macros::{method_id, rmc_proto};
use crate::prudp::station_url::StationUrl;
use crate::rmc::response::ErrorCode;
#[rmc_proto(50)]
pub trait MatchmakeExt{
#[method_id(1)]
async fn end_participation(&self, gid: u32, message: String) -> Result<bool, ErrorCode>;
}

View file

@ -4,6 +4,9 @@ use crate::rmc::structures::matchmake::{AutoMatchmakeParam, CreateMatchmakeSessi
#[rmc_proto(109)]
pub trait MatchmakeExtension{
#[method_id(1)]
async fn close_participation(&self, gid: u32) -> Result<(), ErrorCode>;
#[method_id(16)]
async fn get_playing_session(&self, pids: Vec<u32>) -> Result<Vec<()>, ErrorCode>;
@ -17,4 +20,7 @@ pub trait MatchmakeExtension{
#[method_id(40)]
async fn auto_matchmake_with_param_postpone(&self, session: AutoMatchmakeParam) -> Result<MatchmakeSession, ErrorCode>;
#[method_id(41)]
async fn find_matchmake_session_by_gathering_id_detail(&self, gid: u32) -> Result<MatchmakeSession, ErrorCode>;
}

View file

@ -6,6 +6,7 @@ pub mod notifications;
pub mod matchmake;
pub mod matchmake_extension;
pub mod nat_traversal;
pub mod matchmake_ext;
use crate::prudp::socket::{ExternalConnection, SendingConnection};
use crate::rmc::message::RMCMessage;

View file

@ -10,6 +10,15 @@ pub trait NatTraversal{
#[method_id(3)]
async fn request_probe_initialization_ext(&self, target_list: Vec<String>, station_to_probe: String) -> Result<(),ErrorCode>;
#[method_id(4)]
async fn report_nat_traversal_result(&self, cid: u32, result: bool, rtt: u32) -> Result<(),ErrorCode>;
#[method_id(5)]
async fn report_nat_properties(&self, nat_mapping: u32, nat_filtering: u32, rtt: u32) -> Result<(),ErrorCode>;
}
}
/*
#[rmc_proto(3, NoReturn)]
pub trait NatTraversalConsole{
#[method_id(2)]
async fn request_probe_initiation(&self, station_to_probe: String) -> Result<(),ErrorCode>;
}*/

View file

@ -2,7 +2,12 @@ use macros::{method_id, rmc_proto, rmc_struct, RmcSerialize};
use crate::rmc::response::ErrorCode;
use crate::rmc::structures::qresult::QResult;
#[derive(RmcSerialize, Debug)]
pub mod notification_types{
pub const OWNERSHIP_CHANGED: u32 = 4000;
pub const HOST_CHANGED: u32 = 110000;
}
#[derive(RmcSerialize, Debug, Default, Clone)]
#[rmc_struct(0)]
pub struct NotificationEvent{
pub pid_source: u32,

View file

@ -1,6 +1,6 @@
use macros::RmcSerialize;
use crate::kerberos::KerberosDateTime;
use crate::rmc::structures::variant::Variant;
use macros::RmcSerialize;
// rmc structure
#[derive(RmcSerialize, Debug, Clone, Default)]
@ -25,7 +25,6 @@ pub struct MatchmakeParam {
pub params: Vec<(String, Variant)>,
}
// rmc structure
#[derive(RmcSerialize, Debug, Clone, Default)]
#[rmc_struct(3)]
@ -97,7 +96,7 @@ pub struct CreateMatchmakeSessionParam {
#[derive(RmcSerialize, Debug, Clone)]
#[rmc_struct(0)]
pub struct MatchmakeBlockListParam {
option_flag: u32
option_flag: u32,
}
#[derive(RmcSerialize, Debug, Clone)]
@ -114,4 +113,14 @@ pub struct JoinMatchmakeSessionParam {
pub participation_count: u16,
//pub extra_participant: u16,
//pub block_list_param: MatchmakeBlockListParam
}
}
pub mod gathering_flags {
pub const PERSISTENT_GATHERING: u32 = 0x1;
pub const DISCONNECT_CHANGE_OWNER: u32 = 0x10;
pub const PERSISTENT_GATHERING_LEAVE_PARTICIPATION: u32 = 0x40;
pub const PERSISTENT_GATHERING_ALLOW_ZERO_USERS: u32 = 0x80;
pub const PARTICIPANTS_CHANGE_OWNER: u32 = 0x200;
pub const VERBOSE_PARTICIPANTS: u32 = 0x400;
pub const VERBOSE_PARTICIPANTS_EX: u32 = 0x800;
}