Migrate file storage to S3

This commit is contained in:
red binder 2026-03-28 21:49:40 +00:00 committed by Maple
commit bb0c06faf4
10 changed files with 181 additions and 165 deletions

View file

@@ -1,15 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO files (key, data)\n VALUES ($1, $2)\n ON CONFLICT (key)\n DO UPDATE SET\n key = EXCLUDED.key,\n data = EXCLUDED.data;\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Bytea"
]
},
"nullable": []
},
"hash": "08d43b2c58526766fe48ad002daa78fa04f3a3ef3895d0f72ae9469619a31178"
}

View file

@@ -1,28 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO files_wup (\n file_key,\n task_id,\n boss_app_ids,\n supported_countries,\n supported_languages,\n attributes,\n creator_user,\n name,\n type,\n hash,\n size,\n notify_on_new,\n notify_led,\n condition_played,\n auto_delete,\n created,\n updated\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,\n NOW() AT TIME ZONE 'UTC',\n NOW() AT TIME ZONE 'UTC'\n )\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text",
"TextArray",
"TextArray",
"TextArray",
"Jsonb",
"Text",
"Text",
"Text",
"Text",
"Int8",
"TextArray",
"Bool",
"Int8",
"Bool"
]
},
"nullable": []
},
"hash": "696db3dcba46ed502e8714b71c78735a64bab16b7c5c08d7667faf673bbaf5c8"
}

View file

@@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO files_wup (\n file_key, task_id, boss_app_ids, supported_countries,\n supported_languages, attributes, creator_user, name,\n type, hash, size, notify_on_new, notify_led,\n condition_played, auto_delete, created, updated\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,\n NOW() AT TIME ZONE 'UTC',\n NOW() AT TIME ZONE 'UTC'\n )\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text",
"TextArray",
"TextArray",
"TextArray",
"Jsonb",
"Text",
"Text",
"Text",
"Text",
"Int8",
"TextArray",
"Bool",
"Int8",
"Bool"
]
},
"nullable": []
},
"hash": "700baac8452c3a36b7193f44c06c0fc1b0faed7d05ae5d9d50b0ff66d9a9c625"
}

View file

@@ -1,22 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "SELECT data FROM files WHERE key = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "data",
"type_info": "Bytea"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
},
"hash": "c059d43146d4dc58ca32bdebd55b7734a3f4a3c3cac5a1525a84e8121b6a9c9c"
}

View file

@@ -24,3 +24,7 @@ rand = "0.8"
anyhow = "1.0"
base64 = "0.21"
reqwest = { version = "0.11", features = ["json", "rustls-tls"] }
aws-config = { version = "1.1", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.17"
tokio = { version = "1.50.0", features = ["full"] }
log = "0.4.29"

View file

@@ -1,8 +1,3 @@
CREATE TABLE files (
key TEXT PRIMARY KEY,
data BYTEA NOT NULL
);
CREATE SEQUENCE tasks_data_id_seq START WITH 1 INCREMENT BY 1;
CREATE TABLE tasks (

View file

@@ -8,6 +8,9 @@ use crate::Pool;
use crate::admin_auth::AdminAuth;
use crate::models::file_wup::{is_valid_countries, is_valid_file_notify_conditions, is_valid_file_type, is_valid_languages};
use crate::{crypto::wiiu::encrypt_wiiu, models::file_wup::FileWUPAttributes};
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::primitives::ByteStream;
use log::{error, info, warn};
#[derive(Deserialize)]
pub struct UploadedWUP {
@@ -27,94 +30,132 @@ pub struct UploadedWUP {
}
#[rocket::post("/api/v1/upl_wup", data = "<input>")]
pub async fn upload_file_wup(pool: &State<Pool>, input: Json<UploadedWUP>, auth: AdminAuth) -> Result<(), Status> {
pub async fn upload_file_wup(
pool: &State<Pool>,
s3: &State<S3Client>,
input: Json<UploadedWUP>,
auth: AdminAuth
) -> Result<(), Status> {
let pool = pool.inner();
let s3 = s3.inner();
let data = input.into_inner();
let admin_username = auth.0;
info!("starting WUP upload for task: {} by user: {}", data.task_id, admin_username);
let bucket_name = env::var("S3_BUCKET_NAME").unwrap_or_else(|_| "boss-files".to_string());
let aes_key = match env::var("AES_KEY") {
Ok(key) => key,
Err(_) => return Err(Status::InternalServerError),
Err(_) => {
error!("environment variable AES_KEY not found");
return Err(Status::InternalServerError);
}
};
let hmac_key = match env::var("HMAC_KEY") {
Ok(key) => key,
Err(_) => return Err(Status::InternalServerError),
Err(_) => {
error!("environment variable HMAC_KEY not found");
return Err(Status::InternalServerError);
}
};
let data_bytes = match base64::decode(data.data) {
let data_bytes = match base64::decode(&data.data) {
Ok(data) => data,
Err(_) => return Err(Status::BadRequest)
Err(e) => {
warn!("failed to decode base64 data: {}", e);
return Err(Status::BadRequest);
}
};
if data.name.is_none() && !data.name_equals_data_id {
warn!("validation failed: name is None and name_equals_data_id is false");
return Err(Status::BadRequest);
}
let name = data.name.clone().unwrap_or_default();
if data.name == None && !data.name_equals_data_id { return Err(Status::BadRequest) };
let name = data.name.clone().unwrap();
for id in &data.boss_app_ids { if id.len() != 16 { return Err(Status::BadRequest); }; };
//if data.boss_app_id.len() != 16 { return Err(Status::BadRequest) };
if !is_valid_countries(&data.supported_countries) { return Err(Status::BadRequest) };
if !is_valid_languages(&data.supported_languages) { return Err(Status::BadRequest) };
if !is_valid_file_type(&data.r#type) { return Err(Status::BadRequest) };
if !is_valid_file_notify_conditions(&data.notify_on_new) { return Err(Status::BadRequest) };
if data_bytes.len() == 0 { return Err(Status::BadRequest) };
for id in &data.boss_app_ids {
if id.len() != 16 {
warn!("validation failed: boss_app_id {} length is not 16", id);
return Err(Status::BadRequest);
};
}
if !is_valid_countries(&data.supported_countries) {
warn!("validation failed: Invalid supported_countries");
return Err(Status::BadRequest);
}
if !is_valid_languages(&data.supported_languages) {
warn!("validation failed: Invalid supported_languages");
return Err(Status::BadRequest);
}
if !is_valid_file_type(&data.r#type) {
warn!("validation failed: Invalid file type {}", data.r#type);
return Err(Status::BadRequest);
}
if !is_valid_file_notify_conditions(&data.notify_on_new) {
warn!("validation failed: Invalid notify_on_new conditions");
return Err(Status::BadRequest);
}
if data_bytes.is_empty() {
warn!("validation failed: data_bytes is empty");
return Err(Status::BadRequest);
}
let attributes = match data.attributes {
Some(attr) => attr,
None => FileWUPAttributes{
None => FileWUPAttributes {
attribute1: "".to_string(),
attribute2: "".to_string(),
attribute3: "".to_string(),
description: "".to_string(),
}
};
let attributes = match to_value(attributes) {
let attributes_json = match to_value(attributes) {
Ok(attr) => attr,
Err(_) => return Err(Status::InternalServerError),
Err(e) => {
error!("failed to serialize attributes to JSON: {}", e);
return Err(Status::InternalServerError);
}
};
let encrypted_data = match &data_bytes[0..4] == b"boss" {
true => data_bytes,
false => match encrypt_wiiu(&data_bytes, &aes_key.as_bytes(), &hmac_key.as_bytes()) {
true => {
info!("data already contains BOSS header, skipping encryption");
data_bytes
},
false => match encrypt_wiiu(&data_bytes, aes_key.as_bytes(), hmac_key.as_bytes()) {
Ok(data) => data,
Err(_) => return Err(Status::InternalServerError)
Err(e) => {
error!("encryption failed: {:?}", e);
return Err(Status::InternalServerError);
}
},
};
let mut hasher = Md5::new();
hasher.update(encrypted_data.clone());
let hash_bytes = &hasher.finalize()[..];
let hash = hex::encode(hash_bytes);
//let file_key = format!("{}/{}/{}", data.boss_app_id, data.task_id, hash);
hasher.update(&encrypted_data);
let hash = hex::encode(hasher.finalize());
let file_key = format!("{}/{}", data.task_id, hash);
let _ = sqlx::query!(
r#"
INSERT INTO files (key, data)
VALUES ($1, $2)
ON CONFLICT (key)
DO UPDATE SET
key = EXCLUDED.key,
data = EXCLUDED.data;
"#,
file_key,
encrypted_data,
)
.execute(pool)
.await;
info!("uploading file to S3: bucket={}, key={}", bucket_name, file_key);
s3.put_object()
.bucket(bucket_name)
.key(&file_key)
.body(ByteStream::from(encrypted_data.clone()))
.send()
.await
.map_err(|e| {
error!("S3 PutObject failed: {:?}", e);
Status::InternalServerError
})?;
// Set old file to deleted if it previously existed
let _ = sqlx::query!(
// Database Operations
info!("updating database records for task {}", data.task_id);
let update_res = sqlx::query!(
r#"
UPDATE files_wup
SET deleted = TRUE,
@@ -128,37 +169,19 @@ pub async fn upload_file_wup(pool: &State<Pool>, input: Json<UploadedWUP>, auth:
.execute(pool)
.await;
let file_name = match data.name.clone() {
Some(name) => {
if data.name_equals_data_id {
"".to_string()
} else {
name
}
},
None => "".to_string(),
};
if let Err(e) = update_res {
error!("database UPDATE failed: {}", e);
}
let file_name = if data.name_equals_data_id { "".to_string() } else { name };
let result = sqlx::query!(
r#"
INSERT INTO files_wup (
file_key,
task_id,
boss_app_ids,
supported_countries,
supported_languages,
attributes,
creator_user,
name,
type,
hash,
size,
notify_on_new,
notify_led,
condition_played,
auto_delete,
created,
updated
file_key, task_id, boss_app_ids, supported_countries,
supported_languages, attributes, creator_user, name,
type, hash, size, notify_on_new, notify_led,
condition_played, auto_delete, created, updated
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
NOW() AT TIME ZONE 'UTC',
@@ -170,7 +193,7 @@ pub async fn upload_file_wup(pool: &State<Pool>, input: Json<UploadedWUP>, auth:
&data.boss_app_ids,
&data.supported_countries,
&data.supported_languages,
attributes,
attributes_json,
admin_username,
file_name,
data.r#type,
@@ -185,7 +208,13 @@ pub async fn upload_file_wup(pool: &State<Pool>, input: Json<UploadedWUP>, auth:
.await;
match result {
Ok(_) => Ok(()),
Err(_) => Err(Status::InternalServerError)
Ok(_) => {
info!("successfully processed WUP upload for task: {}", data.task_id);
Ok(())
},
Err(e) => {
error!("database INSERT failed: {}", e);
Err(Status::InternalServerError)
}
}
}

View file

@@ -5,6 +5,8 @@ use rocket::{routes, catchers, Request};
use rocket::http::{Method, ContentType, Status};
use rocket::response::content::RawXml;
use rocket_cors::{AllowedOrigins, CorsOptions};
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::config::{Builder, Credentials, Region};
mod models;
mod services;
@@ -40,6 +42,17 @@ async fn launch() -> _ {
dotenv().ok();
let database_url = env::var("DATABASE_URL").expect("No database URL set");
let config = aws_config::load_from_env().await;
let s3_endpoint = env::var("S3_ENDPOINT").expect("S3_ENDPOINT must be set");
let access_key = env::var("S3_ACCESS_KEY").expect("S3_ACCESS_KEY must be set");
let secret_key = env::var("S3_SECRET_KEY").expect("s3 secret key must be set");
let s3_config = Builder::from(&config)
.endpoint_url(s3_endpoint)
.credentials_provider(Credentials::new(access_key, secret_key, None, None, "static"))
.force_path_style(true)
.build();
let s3_client = S3Client::from_conf(s3_config);
let pool = PgPoolOptions::new()
.max_connections(5)
@@ -59,6 +72,7 @@ async fn launch() -> _ {
rocket::build()
.attach(cors.to_cors().unwrap())
.manage(pool)
.manage(s3_client)
.mount("/", routes![
services::nppl::policylist,
services::nppl::policylist_consoletype,

View file

@@ -4,6 +4,8 @@ use rocket::{Data, State};
use rocket::response::{Response, Responder};
use crate::Pool;
use crate::database::get_wup_task_file_by_data_id;
use std::env;
use aws_sdk_s3::Client as S3Client;
#[derive(Responder)]
pub struct DataResponder<T> {
@@ -27,8 +29,10 @@ impl<'r, 'o: 'r, T: Responder<'r, 'o>> DataResponder<T> {
}
#[rocket::get("/p01/data/1/<boss_app_id>/<data_id>/<file_hash>")]
pub async fn data(pool: &State<Pool>, boss_app_id: String, data_id: i64, file_hash: String) -> Result<DataResponder<Vec<u8>>, Status> {
pub async fn data(pool: &State<Pool>, s3: &State<S3Client>,boss_app_id: String, data_id: i64, file_hash: String) -> Result<DataResponder<Vec<u8>>, Status> {
let pool = pool.inner();
let s3 = s3.inner();
let bucket_name = env::var("S3_BUCKET_NAME").unwrap_or_else(|_| "bossdata".to_string());
let file_wup = get_wup_task_file_by_data_id(pool, data_id).await;
@@ -37,20 +41,25 @@ pub async fn data(pool: &State<Pool>, boss_app_id: String, data_id: i64, file_ha
None => return Err(Status::NotFound),
};
if file_wup.hash != file_hash || !file_wup.boss_app_ids.contains(&boss_app_id) { return Err(Status::NotFound); }
if file_wup.hash != file_hash || !file_wup.boss_app_ids.contains(&boss_app_id) {
return Err(Status::NotFound);
}
let file = sqlx::query_scalar!(
"SELECT data FROM files WHERE key = $1",
file_wup.file_key,
)
.fetch_optional(pool)
.await;
let s3_output = s3.get_object()
.bucket(bucket_name)
.key(&file_wup.file_key)
.send()
.await;
let file = match file {
Ok(Some(file)) => { file },
Ok(None) => return Err(Status::NotFound),
let object = match s3_output {
Ok(output) => output,
Err(_) => return Err(Status::NotFound),
};
Ok(DataResponder::new(file, file_wup.size.to_string()))
let data = object.body.collect().await
.map_err(|_| Status::InternalServerError)?
.into_bytes()
.to_vec();
Ok(DataResponder::new(data, file_wup.size.to_string()))
}

View file

@@ -10,6 +10,8 @@ use crate::Pool;
// TODO:
// - Use database for policy lists for easier modification if needed
// Do we really need to though? Policylists are usually static and if they need updating a commit is warranted anyways
#[derive(Serialize)]
enum TaskLevel {
STOPPED,