use crate::define_rmc_proto; use macros::rmc_struct; use rnex_core::prudp::socket_addr::PRUDPSockAddr; use std::sync::{Weak}; use chrono::Utc; use sqlx::types::time; use sqlx::types::time::PrimitiveDateTime; use rnex_core::PID; use rnex_core::nex::remote_console::RemoteConsole; use rnex_core::nex::s3presigner::S3Presigner; use rnex_core::rmc::response::ErrorCode; use rnex_core::rmc::protocols::secure::{Secure, RawSecure, RawSecureInfo, RemoteSecure}; use rnex_core::rmc::protocols::datastore::{BufferQueueParam, CompletePostParam, DataStoreCustomRankingResult, DataStoreGetCustomRankingByDataIDParam, GetMetaInfo, GetMetaParam, KeyValue, Permission, PersistenceTarget, RateCustomRankingParam, RatingInfo, RatingInfoWithSlot}; use rnex_core::rmc::protocols::datastore::{DataStore, RawDataStore, RawDataStoreInfo, RemoteDataStore, PreparePostParam, ReqPostInfo}; use crate::nex::user::User; use rnex_core::executables::common::{RNEX_DATASTORE_S3_BUCKET, RNEX_DATASTORE_S3_ENDPOINT, get_db}; use rnex_core::rmc::structures::qbuffer::QBuffer; use sqlx::types::chrono::DateTime; use rnex_core::kerberos::KerberosDateTime; use rnex_core::rmc::structures::qresult::QResult; fn map_row_to_meta_info( row_data_id: i64, row_owner: i32, row_size: i32, row_name: String, row_data_type: i16, row_meta_binary: Vec, row_permission: i16, row_permission_recipients: Vec, row_delete_permission: i16, row_delete_permission_recipients: Vec, row_period: i16, row_refer_data_id: i64, row_flag: i32, row_tags: Vec, row_creation_date: chrono::NaiveDateTime, row_update_date: chrono::NaiveDateTime, ratings: Vec ) -> GetMetaInfo { GetMetaInfo { dataid: row_data_id as u64, owner: row_owner as u32, size: row_size as u32, name: row_name, data_type: row_data_type as u16, meta_binary: QBuffer(row_meta_binary), permission: Permission { permission: row_permission as u8, recipient_ids: row_permission_recipients.into_iter().map(|id| id as u32).collect(), }, del_permission: Permission { permission: row_delete_permission as u8, recipient_ids: row_delete_permission_recipients.into_iter().map(|id| id as u32).collect(), }, period: row_period as u16, status: 0, referred_count: 0, refer_dat_id: row_refer_data_id as u32, flag: row_flag as u32, tags: row_tags, expire_time: KerberosDateTime::from_u64(0x9C3F3E0000), created_time: KerberosDateTime::from_naive(row_creation_date), updated_time: KerberosDateTime::from_naive(row_update_date), referred_time: KerberosDateTime::from_naive(row_creation_date), ratings, } } async fn check_object_availability(data_id: u64, password: u64) -> Result<(), ErrorCode> { let row = sqlx::query!( r#" SELECT under_review, access_password FROM datastore.objects WHERE data_id = $1 AND upload_completed = TRUE AND deleted = FALSE "#, data_id as i64 ) .fetch_optional(get_db()) .await .map_err(|e| { eprintln!("Availability check DB error: {:?}", e); ErrorCode::DataStore_SystemFileError })? .ok_or(ErrorCode::DataStore_NotFound)?; let access_password = row.access_password as u64; if access_password != 0 && access_password != password { return Err(ErrorCode::DataStore_InvalidPassword); } if row.under_review { return Err(ErrorCode::DataStore_UnderReviewing); } Ok(()) } async fn get_object_ratings(data_id: u64, password: u64) -> Result, ErrorCode> { check_object_availability(data_id, password).await?; let rows = sqlx::query!( r#" SELECT slot, total_value, count, initial_value FROM datastore.object_ratings WHERE data_id = $1 "#, data_id as i64 ) .fetch_all(get_db()) .await .map_err(|e| { eprintln!("Ratings fetch error: {:?}", e); ErrorCode::DataStore_SystemFileError })?; let ratings = rows.into_iter().map(|row| { RatingInfoWithSlot { slot: row.slot as i8, rating: RatingInfo { total_value: row.total_value.unwrap_or(0), count: row.count as u32, initial_value: row.initial_value.unwrap_or(0), }, } }).collect(); Ok(ratings) } async fn get_object_info_by_data_id(data_id: u64, password: u64) -> Result { check_object_availability(data_id, password).await?; let row = sqlx::query!( r#"SELECT data_id, owner, size, name, data_type, meta_binary, permission, permission_recipients, delete_permission, delete_permission_recipients, period, refer_data_id, flag, tags, creation_date, update_date FROM datastore.objects WHERE data_id = $1"#, data_id as i64 ) .fetch_optional(get_db()) .await .map_err(|_| ErrorCode::DataStore_SystemFileError)? .ok_or(ErrorCode::DataStore_NotFound)?; let ratings = get_object_ratings(data_id, password).await?; Ok(map_row_to_meta_info( row.data_id, row.owner.unwrap_or(0), row.size.unwrap_or(0), row.name.unwrap_or_default(), row.data_type.unwrap_or(0) as i16, row.meta_binary.unwrap_or_default(), row.permission.unwrap_or(0) as i16, row.permission_recipients.unwrap_or_default().into_iter().map(|id| id as i64).collect(), row.delete_permission.unwrap_or(0) as i16, row.delete_permission_recipients.unwrap_or_default().into_iter().map(|id| id as i64).collect(), row.period.unwrap_or(0) as i16, row.refer_data_id.unwrap_or(0), row.flag.unwrap_or(0), row.tags.unwrap_or_default(), row.creation_date.map(|dt| chrono::NaiveDateTime::new( chrono::NaiveDate::from_ymd_opt(dt.year(), dt.month() as u32, dt.day() as u32).unwrap(), chrono::NaiveTime::from_hms_opt(dt.hour() as u32, dt.minute() as u32, dt.second() as u32).unwrap() )).unwrap_or_default(), row.update_date.map(|dt| chrono::NaiveDateTime::new( chrono::NaiveDate::from_ymd_opt(dt.year(), dt.month() as u32, dt.day() as u32).unwrap(), chrono::NaiveTime::from_hms_opt(dt.hour() as u32, dt.minute() as u32, dt.second() as u32).unwrap() )).unwrap_or_default(), ratings )) } async fn get_object_info_by_persistence_target(target: PersistenceTarget, password: u64) -> Result { let row = sqlx::query!( r#"SELECT data_id, owner, size, name, data_type, meta_binary, permission, permission_recipients, delete_permission, delete_permission_recipients, period, refer_data_id, flag, tags, creation_date, update_date, access_password, under_review FROM datastore.objects WHERE owner = $1 AND persistence_slot_id = $2 AND upload_completed = TRUE AND deleted = FALSE"#, target.owner as i32, target.persistence_slot_id as i16 ) .fetch_optional(get_db()) .await .map_err(|_| ErrorCode::DataStore_SystemFileError)? .ok_or(ErrorCode::DataStore_NotFound)?; let db_password = row.access_password as u64; if db_password != 0 && db_password != password { return Err(ErrorCode::DataStore_InvalidPassword); } if row.under_review { return Err(ErrorCode::DataStore_UnderReviewing); } let ratings = get_object_ratings(row.data_id as u64, password).await?; Ok(map_row_to_meta_info( row.data_id, row.owner.unwrap_or(0), row.size.unwrap_or(0), row.name.unwrap_or_default(), row.data_type.unwrap_or(0) as i16, row.meta_binary.unwrap_or_default(), row.permission.unwrap_or(0) as i16, row.permission_recipients.unwrap_or_default().into_iter().map(|id| id as i64).collect(), row.delete_permission.unwrap_or(0) as i16, row.delete_permission_recipients.unwrap_or_default().into_iter().map(|id| id as i64).collect(), row.period.unwrap_or(0) as i16, row.refer_data_id.unwrap_or(0), row.flag.unwrap_or(0), row.tags.unwrap_or_default(), row.creation_date.map(|dt| chrono::NaiveDateTime::new( chrono::NaiveDate::from_ymd_opt(dt.year(), dt.month() as u32, dt.day() as u32).unwrap(), chrono::NaiveTime::from_hms_opt(dt.hour() as u32, dt.minute() as u32, dt.second() as u32).unwrap() )).unwrap_or_default(), row.update_date.map(|dt| chrono::NaiveDateTime::new( chrono::NaiveDate::from_ymd_opt(dt.year(), dt.month() as u32, dt.day() as u32).unwrap(), chrono::NaiveTime::from_hms_opt(dt.hour() as u32, dt.minute() as u32, dt.second() as u32).unwrap() )).unwrap_or_default(), ratings )) } async fn get_buffer_queues_by_data_id_and_slot( data_id: u64, slot: u32 ) -> Result, ErrorCode> { check_object_availability(data_id, 0).await?; let rows = sqlx::query!( r#" SELECT buffer FROM datastore.buffer_queues WHERE data_id = $1 AND slot = $2 ORDER BY creation_date ASC "#, data_id as i64, slot as i32 ) .fetch_all(get_db()) .await .map_err(|e| { log::error!("Buffer queue fetch error: {:?}", e); ErrorCode::DataStore_SystemFileError })?; let buffer_queues = rows .into_iter() .map(|row| QBuffer(row.buffer)) .collect(); Ok(buffer_queues) } fn verify_object_permission( owner_id: u32, viewer_id: u32, permission: &Permission, ) -> Result<(), ErrorCode> { if owner_id == viewer_id { return Ok(()); } match permission.permission { 0 => Ok(()), // All can read 1 => Err(ErrorCode::DataStore_PermissionDenied), // Friends only, unimplemented 2 => { // Recipient IDs can read if permission.recipient_ids.contains(&viewer_id) { Ok(()) } else { Err(ErrorCode::DataStore_PermissionDenied) } } 3 => Err(ErrorCode::DataStore_PermissionDenied), // Owner only, redundant _ => Err(ErrorCode::DataStore_InvalidArgument), // ??? haxx0r } } fn filter_properties_by_result_option( meta_info: &mut GetMetaInfo, result_option: u8, ) { if (result_option & 0x01) == 0 { meta_info.meta_binary = QBuffer(Vec::new()); } if (result_option & 0x04) == 0 { meta_info.ratings = Vec::new(); } // No idea what the other things do. :shrug: } // Dawg... async fn get_custom_rankings_by_data_ids( application_id: u32, data_ids: Vec ) -> Vec { let mut results = Vec::with_capacity(data_ids.len()); let rows = sqlx::query!( r#" SELECT rankings.data_id, rankings.value FROM datastore.object_custom_rankings rankings JOIN UNNEST($1::bigint[]) WITH ORDINALITY AS rows(data_id, ord) ON rankings.data_id = rows.data_id AND rankings.application_id = $2 ORDER BY rows.ord "#, &data_ids.iter().map(|&id| id as i64).collect::>(), application_id as i32 ) .fetch_all(get_db()) .await; let rows = match rows { Ok(r) => r, Err(e) => { log::error!("Custom ranking query error: {:?}", e); return results; } }; for row in rows { let data_id = row.data_id as u64; let score = row.value.unwrap_or(0) as u32; if let Ok(meta) = get_object_info_by_data_id(data_id, 0).await { results.push(DataStoreCustomRankingResult { score, meta_info: meta, }); } else { log::warn!("Could not find metadata for ranked object {}", data_id); } } results } impl DataStore for User { async fn get_meta(&self, mut metaparam: GetMetaParam) -> Result { let mut meta_info = if metaparam.dataid != 0 { get_object_info_by_data_id(metaparam.dataid, metaparam.access_password).await? } else { get_object_info_by_persistence_target(metaparam.persistence_target, metaparam.access_password).await? }; let current_pid = self.pid; verify_object_permission(meta_info.owner, current_pid, &meta_info.permission)?; filter_properties_by_result_option(&mut meta_info, metaparam.result_option); Ok(meta_info) } async fn prepare_post_object(&self, postparam: PreparePostParam) -> Result { let recipient_ids: Vec = postparam.permission.recipient_ids.iter().map(|&id| id as i32).collect(); let del_recipient_ids: Vec = postparam.del_permission.recipient_ids.iter().map(|&id| id as i32).collect(); let now = time::OffsetDateTime::now_utc(); let row = sqlx::query!( r#" INSERT INTO datastore.objects ( owner, size, name, data_type, meta_binary, permission, permission_recipients, delete_permission, delete_permission_recipients, flag, period, refer_data_id, tags, persistence_slot_id, extra_data, creation_date, update_date ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17 ) RETURNING data_id "#, self.pid as i32, postparam.size as i32, postparam.name, postparam.data_type as i32, &postparam.meta_binary.0, postparam.permission.permission as i32, &recipient_ids, postparam.del_permission.permission as i32, &del_recipient_ids, postparam.flag as i32, postparam.period as i32, postparam.refer_data_id as i64, &postparam.tags, postparam.persistence_init_param.persistence_slot_id as i32, &postparam.extra_data, time::PrimitiveDateTime::new(now.date(), now.time()), time::PrimitiveDateTime::new(now.date(), now.time()) ) .fetch_one(get_db()) .await .map_err(|e| { log::error!("DB Error: {:?}", e); ErrorCode::DataStore_SystemFileError })?; let data_id = row.data_id as u64; let presigner = S3Presigner::new( &format!("https://{}", *RNEX_DATASTORE_S3_ENDPOINT), format!("{}", *RNEX_DATASTORE_S3_BUCKET) ).await; let key = format!("data/{}.bin", data_id); let (upload_url, fields) = presigner.generate_presigned_post(&key).await; let form_fields = fields.into_iter().map(|(k, v)| { KeyValue { key: k, value: v } }).collect(); Ok(ReqPostInfo { dataid: data_id, url: upload_url, request_headers: vec![], form_fields, root_ca_cert: vec![], }) } async fn complete_post_object(&self, completeparam: CompletePostParam) -> Result<(), ErrorCode> { let record = sqlx::query!( r#"SELECT owner, under_review FROM datastore.objects WHERE data_id = $1"#, completeparam.dataid as i64 ) .fetch_optional(get_db()) .await .map_err(|e| { eprintln!("select error: {:?}", e); ErrorCode::DataStore_SystemFileError })?; let record = record.ok_or(ErrorCode::DataStore_NotFound)?; if record.under_review { return Err(ErrorCode::DataStore_UnderReviewing); } if record.owner.unwrap_or(0) as u32 != self.pid { return Err(ErrorCode::DataStore_PermissionDenied); } if completeparam.success { sqlx::query!( r#"UPDATE datastore.objects SET upload_completed = true WHERE data_id = $1"#, completeparam.dataid as i64 ) .execute(get_db()) .await .map_err(|e| { eprintln!("update error: {:?}", e); ErrorCode::DataStore_SystemFileError })?; } else { return Err(ErrorCode::Transport_TemporaryServerError); } Ok(()) } async fn rate_custom_ranking(&self, rankingparam: Vec) -> Result<(), ErrorCode> { for param in rankingparam { let exists = sqlx::query_scalar!( r#"SELECT EXISTS(SELECT 1 FROM datastore.objects WHERE data_id = $1)"#, param.dataid as i64 ) .fetch_one(get_db()) .await .map_err(|_| ErrorCode::DataStore_SystemFileError)?; if !exists.unwrap_or(false) { return Err(ErrorCode::DataStore_NotFound); } sqlx::query!( r#" INSERT INTO datastore.object_custom_rankings (data_id, application_id, value) VALUES ($1, $2, $3) ON CONFLICT (data_id, application_id) DO UPDATE SET value = datastore.object_custom_rankings.value + EXCLUDED.value "#, param.dataid as i64, param.appid as i32, param.score as i32 ) .execute(get_db()) .await .map_err(|e| { log::error!("update/insert error: {:?}", e); ErrorCode::DataStore_SystemFileError })?; } Ok(()) } async fn get_application_config(&self, appid: u32) -> Result, ErrorCode> { const MAX_COURSE_UPLOADS: i32 = 100; let config = match appid { 0 => vec![ 0x00000001, 0x00000032, 0x00000096, 0x0000012c, 0x000001f4, 0x00000320, 0x00000514, 0x000007d0, 0x00000bb8, 0x00001388, MAX_COURSE_UPLOADS, 0x00000014, 0x0000001e, 0x00000028, 0x00000032, 0x0000003c, 0x00000046, 0x00000050, 0x0000005a, 0x00000064, 0x00000023, 0x0000004b, 0x00000023, 0x0000004b, 0x00000032, 0x00000000, 0x00000003, 0x00000003, 0x00000064, 0x00000006, 0x00000001, 0x00000060, 0x00000005, 0x00000060, 0x00000000, 0x000007e4, 0x00000001, 0x00000001, 0x0000000c, 0x00000000, ], 1 => vec![ 2, 1770179696, 1770179664, 1770179640, 1770180827, 1770180777, 1770180745, 1770177625, 1770177590, ], 2 => vec![0x000007df, 0x0000000c, 0x00000016, 0x00000005, 0x00000000], 10 => vec![35, 75, 96, 40, 5, 6], _ => { log::error!("unknown SMM app id: {}", appid); return Err(ErrorCode::DataStore_Unknown); } }; Ok(config) } async fn get_custom_ranking_by_data_id( &self, param: DataStoreGetCustomRankingByDataIDParam ) -> Result<(Vec, Vec), ErrorCode> { let mut ranking_results = get_custom_rankings_by_data_ids(param.application_id, param.data_id_list).await; let mut q_results = Vec::with_capacity(ranking_results.len()); for result in &mut ranking_results { if (param.result_option & 0x01) == 0 { result.meta_info.tags = Vec::new(); } if (param.result_option & 0x02) == 0 { result.meta_info.ratings = Vec::new(); } if (param.result_option & 0x04) == 0 { result.meta_info.meta_binary = QBuffer(Vec::new()); } if (param.result_option & 0x20) == 0 { result.score = 0; } q_results.push(QResult::success(ErrorCode::Core_Unknown)); } Ok((ranking_results, q_results)) } async fn get_buffer_queue(&self, param: BufferQueueParam) -> Result, ErrorCode> { // log::info!("GetBufferQueue: dataid={}, slot={}", param.dataid, param.slot); let buffers = get_buffer_queues_by_data_id_and_slot(param.dataid, param.slot).await?; Ok(buffers) } }