diff --git a/Cargo.lock b/Cargo.lock index 3e1d57d..8fd84b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2628,6 +2628,7 @@ checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6" dependencies = [ "base64", "bytes", + "chrono", "crc", "crossbeam-queue", "either", @@ -2648,6 +2649,9 @@ dependencies = [ "sha2", "smallvec", "thiserror", + "time", + "tokio", + "tokio-stream", "tracing", "url", ] @@ -2686,6 +2690,7 @@ dependencies = [ "sqlx-postgres", "sqlx-sqlite", "syn 2.0.104", + "tokio", "url", ] @@ -2700,6 +2705,7 @@ dependencies = [ "bitflags", "byteorder", "bytes", + "chrono", "crc", "digest", "dotenvy", @@ -2727,6 +2733,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror", + "time", "tracing", "whoami", ] @@ -2741,6 +2748,7 @@ dependencies = [ "base64", "bitflags", "byteorder", + "chrono", "crc", "dotenvy", "etcetera", @@ -2764,6 +2772,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror", + "time", "tracing", "whoami", ] @@ -2775,6 +2784,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea" dependencies = [ "atoi", + "chrono", "flume", "futures-channel", "futures-core", @@ -2788,6 +2798,7 @@ dependencies = [ "serde_urlencoded", "sqlx-core", "thiserror", + "time", "tracing", "url", ] @@ -2983,6 +2994,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.28.0" diff --git a/rnex-core/.sqlx/query-039d134f0d1b681b55b1394a710b44a630eb066846c777c462b8c9341e26b696.json b/rnex-core/.sqlx/query-039d134f0d1b681b55b1394a710b44a630eb066846c777c462b8c9341e26b696.json new file mode 100644 index 0000000..f91446a --- /dev/null +++ b/rnex-core/.sqlx/query-039d134f0d1b681b55b1394a710b44a630eb066846c777c462b8c9341e26b696.json @@ -0,0 +1,38 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO datastore.objects (\n owner, size, name, data_type, meta_binary,\n permission, permission_recipients,\n delete_permission, delete_permission_recipients,\n flag, period, refer_data_id, tags,\n persistence_slot_id, extra_data, creation_date, update_date\n ) VALUES (\n $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17\n ) RETURNING data_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "data_id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int4", + "Int4", + "Text", + "Int4", + "Bytea", + "Int4", + "Int4Array", + "Int4", + "Int4Array", + "Int4", + "Int4", + "Int8", + "TextArray", + "Int4", + "TextArray", + "Timestamp", + "Timestamp" + ] + }, + "nullable": [ + false + ] + }, + "hash": "039d134f0d1b681b55b1394a710b44a630eb066846c777c462b8c9341e26b696" +} diff --git a/rnex-core/Cargo.toml b/rnex-core/Cargo.toml index 07e5dd8..646b65e 100644 --- a/rnex-core/Cargo.toml +++ b/rnex-core/Cargo.toml @@ -28,7 +28,7 @@ anyhow = "1.0.100" ureq = { version = "3.1.4", features = [ "json" ] } serde = { version = "1.0.228", features = [ "derive" ] } serde_json = "1.0.149" -sqlx = { version = "0.8.6", optional = true } +sqlx = { version = "0.8.6", optional = true, features = ["postgres", "runtime-tokio", "chrono", "time"] } aws-sdk-s3 = { version = "1.129.0", optional = true } aws-config = { version = "1.8.15", optional = true } base64 = "0.22.1" diff --git a/rnex-core/src/executables/backend_server_secure.rs b/rnex-core/src/executables/backend_server_secure.rs index 91b83b3..a92ba9a 100644 --- a/rnex-core/src/executables/backend_server_secure.rs +++ b/rnex-core/src/executables/backend_server_secure.rs @@ -1,5 +1,6 @@ use cfg_if::cfg_if; use rnex_core::common::setup; +use rnex_core::executables::common::DB_POOL; use rnex_core::executables::common::new_simple_backend; use rnex_core::executables::friends_backend::start_friends_backend; use rnex_core::nex::matchmake::MatchmakeManager; @@ -8,6 +9,7 @@ use rnex_core::nex::user::User; use rnex_core::rmc::protocols::{RemoteDisconnectable, RmcPureRemoteObject}; use std::sync::Arc; use std::sync::atomic::AtomicU32; +use sqlx::PgPool; #[tokio::main] async fn main() { @@ -16,6 +18,15 @@ async fn main() { cfg_if! { if #[cfg(feature = "friends")]{ start_friends_backend().await; + } else if #[cfg(feature = "datastore")] { + let database_url = std::env::var("RNEX_DATASTORE_DATABASE_URL") + .expect("RNEX_DATASTORE_DATABASE_URL must be set"); + + let pool = PgPool::connect(&database_url) + .await + .expect("Failed to create pool"); + + DB_POOL.set(pool).expect("failed to set global DB_POOL"); } else { use rnex_core::executables::regular_backend; regular_backend::start_regular_backend().await diff --git a/rnex-core/src/executables/common.rs b/rnex-core/src/executables/common.rs index 08ba399..ec27f43 100644 --- a/rnex-core/src/executables/common.rs +++ b/rnex-core/src/executables/common.rs @@ -9,6 +9,8 @@ use std::net::{Ipv4Addr, SocketAddrV4}; use std::sync::Arc; use tokio::net::TcpListener; use std::sync::LazyLock; +use std::sync::OnceLock; +use sqlx::postgres::PgPool; use log::error; use std::error::Error; @@ -17,6 +19,16 @@ use crate::reggie::UnitPacketRead; const IP_REQ_SERVICE_URL: &str = "https://ipinfo.io/ip"; +pub static RNEX_DATASTORE_DATABASE_URL: LazyLock = LazyLock::new(|| { + std::env::var("RNEX_DATASTORE_DATABASE_URL") + .expect("RNEX_DATASTORE_DATABASE_URL must be set") +}); + +pub static DB_POOL: OnceLock = OnceLock::new(); + +pub fn get_db() -> &'static PgPool { + DB_POOL.get().expect("db_pool not initialized") +} pub static RNEX_DATASTORE_S3_ENDPOINT: LazyLock = LazyLock::new(|| { std::env::var("RNEX_DATASTORE_S3_ENDPOINT") .expect("RNEX_DATASTORE_S3_ENDPOINT must be set") diff --git a/rnex-core/src/nex/datastore.rs b/rnex-core/src/nex/datastore.rs index a2d8b8a..57635a7 100644 --- a/rnex-core/src/nex/datastore.rs +++ b/rnex-core/src/nex/datastore.rs @@ -2,6 +2,9 @@ use crate::define_rmc_proto; use macros::rmc_struct; use rnex_core::prudp::socket_addr::PRUDPSockAddr; use std::sync::{Weak}; +use chrono::Utc; +use sqlx::types::time; +use sqlx::types::time::PrimitiveDateTime; use rnex_core::PID; use rnex_core::nex::remote_console::RemoteConsole; use rnex_core::nex::s3presigner::S3Presigner; @@ -10,7 +13,7 @@ use rnex_core::rmc::protocols::secure::{Secure, RawSecure, RawSecureInfo, Remote use rnex_core::rmc::protocols::datastore::{CompletePostParam, GetMetaInfo, GetMetaParam, KeyValue, RateCustomRankingParam}; use rnex_core::rmc::protocols::datastore::{DataStore, RawDataStore, RawDataStoreInfo, RemoteDataStore, PreparePostParam, ReqPostInfo}; use crate::nex::user::User; -use rnex_core::executables::common::{RNEX_DATASTORE_S3_BUCKET, RNEX_DATASTORE_S3_ENDPOINT}; +use rnex_core::executables::common::{RNEX_DATASTORE_S3_BUCKET, RNEX_DATASTORE_S3_ENDPOINT, get_db}; impl DataStore for User { async fn get_meta(&self, metaparam: GetMetaParam) -> Result { @@ -22,7 +25,49 @@ impl DataStore for User { } async fn prepare_post_object(&self, postparam: PreparePostParam) -> Result { - let data_id: u64 = 9400001; + // Prepare your arrays first (Postgres needs i32, not u32) + let recipient_ids: Vec = postparam.permission.recipient_ids.iter().map(|&id| id as i32).collect(); + let del_recipient_ids: Vec = postparam.del_permission.recipient_ids.iter().map(|&id| id as i32).collect(); + let now = time::OffsetDateTime::now_utc(); + + let row = sqlx::query!( + r#" + INSERT INTO datastore.objects ( + owner, size, name, data_type, meta_binary, + permission, permission_recipients, + delete_permission, delete_permission_recipients, + flag, period, refer_data_id, tags, + persistence_slot_id, extra_data, creation_date, update_date + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17 + ) RETURNING data_id + "#, + self.pid as i32, + postparam.size as i32, + postparam.name, + postparam.data_type as i32, + &postparam.meta_binary.0, // unwrap QBuffer to &[u8] + postparam.permission.permission as i32, + &recipient_ids, + postparam.del_permission.permission as i32, + &del_recipient_ids, + postparam.flag as i32, + postparam.period as i32, + postparam.refer_data_id as i64, + &postparam.tags, + postparam.persistence_init_param.persistence_slot_id as i32, + &postparam.extra_data, + time::PrimitiveDateTime::new(now.date(), now.time()), + time::PrimitiveDateTime::new(now.date(), now.time()) + ) + .fetch_one(get_db()) + .await + .map_err(|e| { + log::error!("DB Error: {:?}", e); + ErrorCode::DataStore_SystemFileError + })?; + + let data_id = row.data_id as u64; let presigner = S3Presigner::new( &format!("https://{}", *RNEX_DATASTORE_S3_ENDPOINT), format!("{}", *RNEX_DATASTORE_S3_BUCKET)