preliminary DB support
This commit is contained in:
parent
3b103e12d4
commit
ee798408f0
6 changed files with 131 additions and 3 deletions
22
Cargo.lock
generated
22
Cargo.lock
generated
|
|
@ -2628,6 +2628,7 @@ checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6"
|
|||
dependencies = [
|
||||
"base64",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"crc",
|
||||
"crossbeam-queue",
|
||||
"either",
|
||||
|
|
@ -2648,6 +2649,9 @@ dependencies = [
|
|||
"sha2",
|
||||
"smallvec",
|
||||
"thiserror",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
"url",
|
||||
]
|
||||
|
|
@ -2686,6 +2690,7 @@ dependencies = [
|
|||
"sqlx-postgres",
|
||||
"sqlx-sqlite",
|
||||
"syn 2.0.104",
|
||||
"tokio",
|
||||
"url",
|
||||
]
|
||||
|
||||
|
|
@ -2700,6 +2705,7 @@ dependencies = [
|
|||
"bitflags",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"crc",
|
||||
"digest",
|
||||
"dotenvy",
|
||||
|
|
@ -2727,6 +2733,7 @@ dependencies = [
|
|||
"sqlx-core",
|
||||
"stringprep",
|
||||
"thiserror",
|
||||
"time",
|
||||
"tracing",
|
||||
"whoami",
|
||||
]
|
||||
|
|
@ -2741,6 +2748,7 @@ dependencies = [
|
|||
"base64",
|
||||
"bitflags",
|
||||
"byteorder",
|
||||
"chrono",
|
||||
"crc",
|
||||
"dotenvy",
|
||||
"etcetera",
|
||||
|
|
@ -2764,6 +2772,7 @@ dependencies = [
|
|||
"sqlx-core",
|
||||
"stringprep",
|
||||
"thiserror",
|
||||
"time",
|
||||
"tracing",
|
||||
"whoami",
|
||||
]
|
||||
|
|
@ -2775,6 +2784,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea"
|
||||
dependencies = [
|
||||
"atoi",
|
||||
"chrono",
|
||||
"flume",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
|
|
@ -2788,6 +2798,7 @@ dependencies = [
|
|||
"serde_urlencoded",
|
||||
"sqlx-core",
|
||||
"thiserror",
|
||||
"time",
|
||||
"tracing",
|
||||
"url",
|
||||
]
|
||||
|
|
@ -2983,6 +2994,17 @@ dependencies = [
|
|||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-stream"
|
||||
version = "0.1.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-tungstenite"
|
||||
version = "0.28.0"
|
||||
|
|
|
|||
38
rnex-core/.sqlx/query-039d134f0d1b681b55b1394a710b44a630eb066846c777c462b8c9341e26b696.json
generated
Normal file
38
rnex-core/.sqlx/query-039d134f0d1b681b55b1394a710b44a630eb066846c777c462b8c9341e26b696.json
generated
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO datastore.objects (\n owner, size, name, data_type, meta_binary,\n permission, permission_recipients,\n delete_permission, delete_permission_recipients,\n flag, period, refer_data_id, tags,\n persistence_slot_id, extra_data, creation_date, update_date\n ) VALUES (\n $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17\n ) RETURNING data_id\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "data_id",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int4",
|
||||
"Int4",
|
||||
"Text",
|
||||
"Int4",
|
||||
"Bytea",
|
||||
"Int4",
|
||||
"Int4Array",
|
||||
"Int4",
|
||||
"Int4Array",
|
||||
"Int4",
|
||||
"Int4",
|
||||
"Int8",
|
||||
"TextArray",
|
||||
"Int4",
|
||||
"TextArray",
|
||||
"Timestamp",
|
||||
"Timestamp"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "039d134f0d1b681b55b1394a710b44a630eb066846c777c462b8c9341e26b696"
|
||||
}
|
||||
|
|
@ -28,7 +28,7 @@ anyhow = "1.0.100"
|
|||
ureq = { version = "3.1.4", features = [ "json" ] }
|
||||
serde = { version = "1.0.228", features = [ "derive" ] }
|
||||
serde_json = "1.0.149"
|
||||
sqlx = { version = "0.8.6", optional = true }
|
||||
sqlx = { version = "0.8.6", optional = true, features = ["postgres", "runtime-tokio", "chrono", "time"] }
|
||||
aws-sdk-s3 = { version = "1.129.0", optional = true }
|
||||
aws-config = { version = "1.8.15", optional = true }
|
||||
base64 = "0.22.1"
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
use cfg_if::cfg_if;
|
||||
use rnex_core::common::setup;
|
||||
use rnex_core::executables::common::DB_POOL;
|
||||
use rnex_core::executables::common::new_simple_backend;
|
||||
use rnex_core::executables::friends_backend::start_friends_backend;
|
||||
use rnex_core::nex::matchmake::MatchmakeManager;
|
||||
|
|
@ -8,6 +9,7 @@ use rnex_core::nex::user::User;
|
|||
use rnex_core::rmc::protocols::{RemoteDisconnectable, RmcPureRemoteObject};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU32;
|
||||
use sqlx::PgPool;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
|
|
@ -16,6 +18,15 @@ async fn main() {
|
|||
cfg_if! {
|
||||
if #[cfg(feature = "friends")]{
|
||||
start_friends_backend().await;
|
||||
} else if #[cfg(feature = "datastore")] {
|
||||
let database_url = std::env::var("RNEX_DATASTORE_DATABASE_URL")
|
||||
.expect("RNEX_DATASTORE_DATABASE_URL must be set");
|
||||
|
||||
let pool = PgPool::connect(&database_url)
|
||||
.await
|
||||
.expect("Failed to create pool");
|
||||
|
||||
DB_POOL.set(pool).expect("failed to set global DB_POOL");
|
||||
} else {
|
||||
use rnex_core::executables::regular_backend;
|
||||
regular_backend::start_regular_backend().await
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ use std::net::{Ipv4Addr, SocketAddrV4};
|
|||
use std::sync::Arc;
|
||||
use tokio::net::TcpListener;
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::OnceLock;
|
||||
use sqlx::postgres::PgPool;
|
||||
|
||||
use log::error;
|
||||
use std::error::Error;
|
||||
|
|
@ -17,6 +19,16 @@ use crate::reggie::UnitPacketRead;
|
|||
|
||||
const IP_REQ_SERVICE_URL: &str = "https://ipinfo.io/ip";
|
||||
|
||||
pub static RNEX_DATASTORE_DATABASE_URL: LazyLock<String> = LazyLock::new(|| {
|
||||
std::env::var("RNEX_DATASTORE_DATABASE_URL")
|
||||
.expect("RNEX_DATASTORE_DATABASE_URL must be set")
|
||||
});
|
||||
|
||||
pub static DB_POOL: OnceLock<PgPool> = OnceLock::new();
|
||||
|
||||
pub fn get_db() -> &'static PgPool {
|
||||
DB_POOL.get().expect("db_pool not initialized")
|
||||
}
|
||||
pub static RNEX_DATASTORE_S3_ENDPOINT: LazyLock<String> = LazyLock::new(|| {
|
||||
std::env::var("RNEX_DATASTORE_S3_ENDPOINT")
|
||||
.expect("RNEX_DATASTORE_S3_ENDPOINT must be set")
|
||||
|
|
|
|||
|
|
@ -2,6 +2,9 @@ use crate::define_rmc_proto;
|
|||
use macros::rmc_struct;
|
||||
use rnex_core::prudp::socket_addr::PRUDPSockAddr;
|
||||
use std::sync::{Weak};
|
||||
use chrono::Utc;
|
||||
use sqlx::types::time;
|
||||
use sqlx::types::time::PrimitiveDateTime;
|
||||
use rnex_core::PID;
|
||||
use rnex_core::nex::remote_console::RemoteConsole;
|
||||
use rnex_core::nex::s3presigner::S3Presigner;
|
||||
|
|
@ -10,7 +13,7 @@ use rnex_core::rmc::protocols::secure::{Secure, RawSecure, RawSecureInfo, Remote
|
|||
use rnex_core::rmc::protocols::datastore::{CompletePostParam, GetMetaInfo, GetMetaParam, KeyValue, RateCustomRankingParam};
|
||||
use rnex_core::rmc::protocols::datastore::{DataStore, RawDataStore, RawDataStoreInfo, RemoteDataStore, PreparePostParam, ReqPostInfo};
|
||||
use crate::nex::user::User;
|
||||
use rnex_core::executables::common::{RNEX_DATASTORE_S3_BUCKET, RNEX_DATASTORE_S3_ENDPOINT};
|
||||
use rnex_core::executables::common::{RNEX_DATASTORE_S3_BUCKET, RNEX_DATASTORE_S3_ENDPOINT, get_db};
|
||||
|
||||
impl DataStore for User {
|
||||
async fn get_meta(&self, metaparam: GetMetaParam) -> Result<GetMetaInfo, ErrorCode> {
|
||||
|
|
@ -22,7 +25,49 @@ impl DataStore for User {
|
|||
}
|
||||
|
||||
async fn prepare_post_object(&self, postparam: PreparePostParam) -> Result<ReqPostInfo, ErrorCode> {
|
||||
let data_id: u64 = 9400001;
|
||||
// Prepare your arrays first (Postgres needs i32, not u32)
|
||||
let recipient_ids: Vec<i32> = postparam.permission.recipient_ids.iter().map(|&id| id as i32).collect();
|
||||
let del_recipient_ids: Vec<i32> = postparam.del_permission.recipient_ids.iter().map(|&id| id as i32).collect();
|
||||
let now = time::OffsetDateTime::now_utc();
|
||||
|
||||
let row = sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO datastore.objects (
|
||||
owner, size, name, data_type, meta_binary,
|
||||
permission, permission_recipients,
|
||||
delete_permission, delete_permission_recipients,
|
||||
flag, period, refer_data_id, tags,
|
||||
persistence_slot_id, extra_data, creation_date, update_date
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17
|
||||
) RETURNING data_id
|
||||
"#,
|
||||
self.pid as i32,
|
||||
postparam.size as i32,
|
||||
postparam.name,
|
||||
postparam.data_type as i32,
|
||||
&postparam.meta_binary.0, // unwrap QBuffer to &[u8]
|
||||
postparam.permission.permission as i32,
|
||||
&recipient_ids,
|
||||
postparam.del_permission.permission as i32,
|
||||
&del_recipient_ids,
|
||||
postparam.flag as i32,
|
||||
postparam.period as i32,
|
||||
postparam.refer_data_id as i64,
|
||||
&postparam.tags,
|
||||
postparam.persistence_init_param.persistence_slot_id as i32,
|
||||
&postparam.extra_data,
|
||||
time::PrimitiveDateTime::new(now.date(), now.time()),
|
||||
time::PrimitiveDateTime::new(now.date(), now.time())
|
||||
)
|
||||
.fetch_one(get_db())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
log::error!("DB Error: {:?}", e);
|
||||
ErrorCode::DataStore_SystemFileError
|
||||
})?;
|
||||
|
||||
let data_id = row.data_id as u64;
|
||||
let presigner = S3Presigner::new(
|
||||
&format!("https://{}", *RNEX_DATASTORE_S3_ENDPOINT),
|
||||
format!("{}", *RNEX_DATASTORE_S3_BUCKET)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue