Add world tables and columns, temporary backfill

Rather than adding a separate script, main() temporarily backfills the database from plugins.zip. Once the backfill has been run, that code will be removed.
This commit is contained in:
Tyler Hallada 2021-07-21 21:35:11 -04:00
parent 0e0fdfd59d
commit 4b333b3b99
9 changed files with 449 additions and 82 deletions

View File

@ -18,6 +18,18 @@ CREATE TABLE IF NOT EXISTS "plugin_worlds" (
CREATE UNIQUE INDEX "plugin_worlds_unique_plugin_id_and_world_id" ON "plugin_worlds" ("plugin_id", "world_id"); CREATE UNIQUE INDEX "plugin_worlds_unique_plugin_id_and_world_id" ON "plugin_worlds" ("plugin_id", "world_id");
CREATE INDEX "plugin_worlds_world_id" ON "plugin_worlds" ("world_id"); CREATE INDEX "plugin_worlds_world_id" ON "plugin_worlds" ("world_id");
DELETE FROM "plugin_cells";
DELETE FROM "plugins";
DELETE FROM "cells";
ALTER TABLE "cells" ADD COLUMN "world_id" INTEGER REFERENCES "worlds"(id); ALTER TABLE "cells" ADD COLUMN "world_id" INTEGER REFERENCES "worlds"(id);
CREATE UNIQUE INDEX "cells_unique_form_id_and_world_id" ON "cells" ("form_id", "world_id"); ALTER TABLE "cells" ADD COLUMN "master" VARCHAR(255) NOT NULL;
DROP INDEX "cells_unique_form_id"; CREATE UNIQUE INDEX "cells_unique_form_id_master_and_world_id" ON "cells" ("form_id", "master", "world_id");
DROP INDEX "cells_unique_form_id";
ALTER TABLE "plugins" ADD COLUMN "file_name" VARCHAR(255) NOT NULL;
ALTER TABLE "plugins" ADD COLUMN "file_path" TEXT NOT NULL;
ALTER TABLE "plugins" ALTER COLUMN "version" SET NOT NULL;
ALTER TABLE "plugins" ALTER COLUMN "masters" SET NOT NULL;
DROP INDEX "plugins_unique_name_and_file_id";
CREATE UNIQUE INDEX "plugins_unique_file_id_and_file_path" ON "plugins" ("file_id", "file_path");

View File

@ -7,8 +7,11 @@ use sqlx::postgres::PgPoolOptions;
use std::convert::TryInto; use std::convert::TryInto;
use std::env; use std::env;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::Read;
use std::io::Seek; use std::io::Seek;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::ops::Index;
use std::path::Path;
use std::process::Command; use std::process::Command;
use std::time::Duration; use std::time::Duration;
use tempfile::tempdir; use tempfile::tempdir;
@ -17,29 +20,45 @@ use tokio::time::sleep;
use tracing::{debug, info, info_span, warn}; use tracing::{debug, info, info_span, warn};
use unrar::Archive; use unrar::Archive;
use zip::write::{FileOptions, ZipWriter}; use zip::write::{FileOptions, ZipWriter};
use zip::ZipArchive;
mod models; mod models;
mod nexus_api; mod nexus_api;
mod nexus_scraper; mod nexus_scraper;
use models::cell;
use models::game; use models::game;
use models::plugin; use models::plugin;
use models::plugin_cell; use models::{cell, cell::UnsavedCell};
use models::{file, file::File}; use models::{file, file::File};
use models::{game_mod, game_mod::Mod}; use models::{game_mod, game_mod::Mod};
use models::{plugin_cell, plugin_cell::UnsavedPluginCell};
use models::{plugin_world, plugin_world::UnsavedPluginWorld};
use models::{world, world::UnsavedWorld};
use nexus_api::{GAME_ID, GAME_NAME}; use nexus_api::{GAME_ID, GAME_NAME};
async fn process_plugin<W>( fn get_local_form_id_and_master<'a>(
form_id: u32,
masters: &'a [&str],
file_name: &'a str,
) -> Result<(i32, &'a str)> {
let master_index = (form_id >> 24) as usize;
let local_form_id = (form_id & 0xFFFFFF).try_into()?;
if master_index >= masters.len() {
return Ok((local_form_id, file_name));
}
Ok((local_form_id, masters[master_index]))
}
async fn process_plugin(
plugin_buf: &mut [u8], plugin_buf: &mut [u8],
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
plugin_archive: &mut ZipWriter<W>, // plugin_archive: &mut ZipWriter<W>,
db_file: &File, db_file: &File,
mod_obj: &Mod, mod_obj: &Mod,
file_name: &str, file_path: &str,
) -> Result<()> ) -> Result<()>
where // where
W: std::io::Write + std::io::Seek, // W: std::io::Write + std::io::Seek,
{ {
if plugin_buf.len() == 0 { if plugin_buf.len() == 0 {
warn!("skipping processing of invalid empty plugin"); warn!("skipping processing of invalid empty plugin");
@ -48,55 +67,128 @@ where
info!(bytes = plugin_buf.len(), "parsing plugin"); info!(bytes = plugin_buf.len(), "parsing plugin");
match parse_plugin(&plugin_buf) { match parse_plugin(&plugin_buf) {
Ok(plugin) => { Ok(plugin) => {
info!(num_cells = plugin.cells.len(), "parse finished"); info!(
num_worlds = plugin.worlds.len(),
num_cells = plugin.cells.len(),
"parse finished"
);
let hash = seahash::hash(&plugin_buf); let hash = seahash::hash(&plugin_buf);
let file_name = Path::new(file_path)
.file_name()
.expect("plugin path ends in a valid file_name")
.to_string_lossy();
let plugin_row = plugin::insert( let plugin_row = plugin::insert(
&pool, &pool,
&db_file.name, &db_file.name,
hash as i64, hash as i64,
db_file.id, db_file.id,
Some(plugin.header.version as f64), plugin.header.version as f64,
plugin_buf.len() as i64, plugin_buf.len() as i64,
plugin.header.author, plugin.header.author,
plugin.header.description, plugin.header.description,
Some( &plugin
&plugin .header
.header .masters
.masters .iter()
.iter() .map(|s| s.to_string())
.map(|s| s.to_string()) .collect::<Vec<String>>(),
.collect::<Vec<String>>(), &file_name,
), file_path,
) )
.await?; .await?;
for cell in plugin.cells {
let cell_row = cell::insert( let worlds: Vec<UnsavedWorld> = plugin
&pool, .worlds
cell.form_id.try_into().unwrap(), .iter()
cell.x, .map(|world| {
cell.y, let (form_id, master) = get_local_form_id_and_master(
// TODO: fill in world_id here world.form_id,
None, &plugin.header.masters,
cell.is_persistent, &file_name,
) )
.await?; .expect("form_id to be a valid i32");
plugin_cell::insert(&pool, plugin_row.id, cell_row.id, cell.editor_id).await?; UnsavedWorld {
} form_id,
master: master.to_string(),
}
})
.collect();
let db_worlds = world::batched_insert(&pool, &worlds).await?;
let plugin_worlds: Vec<UnsavedPluginWorld> = db_worlds
.iter()
.zip(&plugin.worlds)
.map(|(db_world, plugin_world)| UnsavedPluginWorld {
plugin_id: plugin_row.id,
world_id: db_world.id,
editor_id: plugin_world.editor_id.clone(),
})
.collect();
plugin_world::batched_insert(&pool, &plugin_worlds).await?;
let cells: Vec<UnsavedCell> = plugin
.cells
.iter()
.map(|cell| {
let world_id = if let Some(world_form_id) = cell.world_form_id {
let (form_id, master) = get_local_form_id_and_master(
world_form_id,
&plugin.header.masters,
&file_name,
)
.expect("form_id to be valid i32");
Some(
db_worlds
.iter()
.find(|&world| world.form_id == form_id && world.master == master)
.expect("cell references world in the plugin worlds")
.id,
)
} else {
None
};
let (form_id, master) = get_local_form_id_and_master(
cell.form_id,
&plugin.header.masters,
&file_name,
)
.expect("form_id is a valid i32");
UnsavedCell {
form_id,
master: master.to_string(),
x: cell.x,
y: cell.y,
world_id,
is_persistent: cell.is_persistent,
}
})
.collect();
let db_cells = cell::batched_insert(&pool, &cells).await?;
let plugin_cells: Vec<UnsavedPluginCell> = db_cells
.iter()
.zip(&plugin.cells)
.map(|(db_cell, plugin_cell)| UnsavedPluginCell {
plugin_id: plugin_row.id,
cell_id: db_cell.id,
editor_id: plugin_cell.editor_id.clone(),
})
.collect();
plugin_cell::batched_insert(&pool, &plugin_cells).await?;
} }
Err(err) => { Err(err) => {
warn!(error = %err, "Failed to parse plugin, skipping plugin"); warn!(error = %err, "Failed to parse plugin, skipping plugin");
} }
} }
plugin_archive.start_file( // TODO: re-enable after db fix
format!( // plugin_archive.start_file(
"{}/{}/{}/{}", // format!(
GAME_NAME, mod_obj.nexus_mod_id, db_file.nexus_file_id, file_name // "{}/{}/{}/{}",
), // GAME_NAME, mod_obj.nexus_mod_id, db_file.nexus_file_id, file_path
FileOptions::default(), // ),
)?; // FileOptions::default(),
// )?;
let mut reader = std::io::Cursor::new(&plugin_buf); // let mut reader = std::io::Cursor::new(&plugin_buf);
std::io::copy(&mut reader, plugin_archive)?; // std::io::copy(&mut reader, plugin_archive)?;
Ok(()) Ok(())
} }
@ -128,6 +220,58 @@ pub async fn main() -> Result<()> {
let game = game::insert(&pool, GAME_NAME, GAME_ID as i32).await?; let game = game::insert(&pool, GAME_NAME, GAME_ID as i32).await?;
let client = reqwest::Client::new(); let client = reqwest::Client::new();
// DELETEME: just running this to clean up the existing database rows
let plugins_archive = std::fs::File::open("plugins.zip")?;
let mut plugins_archive = ZipArchive::new(plugins_archive)?;
let file_paths: Vec<String> = plugins_archive
.file_names()
.map(|s| s.to_string())
.collect();
for (i, file_name) in file_paths.iter().enumerate() {
info!("plugin: {:?} / {:?}. {}", i, file_paths.len(), file_name);
let file_path = Path::new(file_name);
let mut components = file_path.components();
let _game_name = components.next().expect("game directory");
let nexus_mod_id: i32 = components
.next()
.expect("mod_id directory")
.as_os_str()
.to_string_lossy()
.parse()?;
let nexus_file_id: i32 = components
.next()
.expect("file_id directory")
.as_os_str()
.to_string_lossy()
.parse()?;
let original_file_path: &Path = components.as_ref();
let original_file_path = original_file_path.to_string_lossy();
if let Some(db_mod) = game_mod::get_by_nexus_mod_id(&pool, nexus_mod_id).await? {
if let Some(db_file) = file::get_by_nexus_file_id(&pool, nexus_file_id).await? {
let mut plugin_file = plugins_archive.by_name(file_name)?;
let mut plugin_buf = Vec::new();
plugin_file.read_to_end(&mut plugin_buf)?;
info!(
nexus_mod_id,
nexus_file_id, %original_file_path, "processing plugin"
);
process_plugin(
&mut plugin_buf,
&pool,
&db_file,
&db_mod,
&original_file_path,
)
.await?;
} else {
warn!(nexus_file_id, "missing db file!");
}
} else {
warn!(nexus_mod_id, "missing db mod!");
}
}
return Ok(());
let mut page: i32 = 1; let mut page: i32 = 1;
let mut has_next_page = true; let mut has_next_page = true;
@ -140,7 +284,10 @@ pub async fn main() -> Result<()> {
has_next_page = scraped.has_next_page; has_next_page = scraped.has_next_page;
let mut mods = Vec::new(); let mut mods = Vec::new();
for scraped_mod in scraped.mods { for scraped_mod in scraped.mods {
if let None = game_mod::get_by_nexus_mod_id(&pool, scraped_mod.nexus_mod_id).await? { // TODO: this logic needs to change once I clean up the existing database rows
if let Some(game_mod) =
game_mod::get_by_nexus_mod_id(&pool, scraped_mod.nexus_mod_id).await?
{
mods.push( mods.push(
game_mod::insert( game_mod::insert(
&pool, &pool,
@ -313,20 +460,20 @@ pub async fn main() -> Result<()> {
.process() .process()
.expect("failed to extract"); .expect("failed to extract");
for file_name in plugin_file_paths.iter() { for file_path in plugin_file_paths.iter() {
info!( info!(
?file_name, ?file_path,
"processing uncompressed file from downloaded archive" "processing uncompressed file from downloaded archive"
); );
let mut plugin_buf = let mut plugin_buf =
std::fs::read(temp_dir.path().join(file_name))?; std::fs::read(temp_dir.path().join(file_path))?;
process_plugin( process_plugin(
&mut plugin_buf, &mut plugin_buf,
&pool, &pool,
&mut plugins_archive, // &mut plugins_archive,
&db_file, &db_file,
&db_mod, &db_mod,
file_name, file_path,
) )
.await?; .await?;
} }
@ -338,12 +485,12 @@ pub async fn main() -> Result<()> {
let mut file = tokio_file.try_clone().await?.into_std().await; let mut file = tokio_file.try_clone().await?.into_std().await;
let mut plugin_file_paths = Vec::new(); let mut plugin_file_paths = Vec::new();
for file_name in list_archive_files(&file)? { for file_path in list_archive_files(&file)? {
if file_name.ends_with(".esp") if file_path.ends_with(".esp")
|| file_name.ends_with(".esm") || file_path.ends_with(".esm")
|| file_name.ends_with(".esl") || file_path.ends_with(".esl")
{ {
plugin_file_paths.push(file_name); plugin_file_paths.push(file_path);
} }
} }
info!( info!(
@ -351,13 +498,13 @@ pub async fn main() -> Result<()> {
"listed plugins in downloaded archive" "listed plugins in downloaded archive"
); );
for file_name in plugin_file_paths.iter() { for file_path in plugin_file_paths.iter() {
let plugin_span = info_span!("plugin", name = ?file_name); let plugin_span = info_span!("plugin", name = ?file_path);
let plugin_span = plugin_span.enter(); let plugin_span = plugin_span.enter();
file.seek(SeekFrom::Start(0))?; file.seek(SeekFrom::Start(0))?;
let mut buf = Vec::default(); let mut buf = Vec::default();
info!("uncompressing plugin file from downloaded archive"); info!("uncompressing plugin file from downloaded archive");
match uncompress_archive_file(&mut file, &mut buf, file_name) { match uncompress_archive_file(&mut file, &mut buf, file_path) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(err) => { Err(err) => {
if kind.mime_type() == "application/zip" { if kind.mime_type() == "application/zip" {
@ -382,20 +529,20 @@ pub async fn main() -> Result<()> {
]) ])
.status()?; .status()?;
for file_name in plugin_file_paths.iter() { for file_path in plugin_file_paths.iter() {
let plugin_span = let plugin_span =
info_span!("plugin", name = ?file_name); info_span!("plugin", name = ?file_path);
let _plugin_span = plugin_span.enter(); let _plugin_span = plugin_span.enter();
info!("processing uncompressed file from downloaded archive"); info!("processing uncompressed file from downloaded archive");
let mut plugin_buf = let mut plugin_buf =
std::fs::read(extracted_path.join(file_name))?; std::fs::read(extracted_path.join(file_path))?;
process_plugin( process_plugin(
&mut plugin_buf, &mut plugin_buf,
&pool, &pool,
&mut plugins_archive, // &mut plugins_archive,
&db_file, &db_file,
&db_mod, &db_mod,
file_name, file_path,
) )
.await?; .await?;
} }
@ -406,12 +553,8 @@ pub async fn main() -> Result<()> {
} }
}?; }?;
process_plugin( process_plugin(
&mut buf, &mut buf, &pool, // &mut plugins_archive,
&pool, &db_file, &db_mod, file_path,
&mut plugins_archive,
&db_file,
&db_mod,
file_name,
) )
.await?; .await?;
} }

View File

@ -1,25 +1,39 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use tracing::instrument; use tracing::instrument;
#[derive(Debug, Serialize, Deserialize)] use super::BATCH_SIZE;
#[derive(Debug, Serialize, Deserialize, FromRow)]
pub struct Cell { pub struct Cell {
pub id: i32, pub id: i32,
pub form_id: i32, pub form_id: i32,
pub master: String,
pub x: Option<i32>, pub x: Option<i32>,
pub y: Option<i32>, pub y: Option<i32>,
// TODO: make this not nullable
pub world_id: Option<i32>, pub world_id: Option<i32>,
pub is_persistent: bool, pub is_persistent: bool,
pub updated_at: NaiveDateTime, pub updated_at: NaiveDateTime,
pub created_at: NaiveDateTime, pub created_at: NaiveDateTime,
} }
/// A cell record parsed out of a plugin file that has not yet been persisted
/// to the database (no `id` or timestamp columns yet).
#[derive(Debug, Serialize, Deserialize)]
pub struct UnsavedCell {
    /// Form ID local to the defining file (low 24 bits of the raw form ID).
    pub form_id: i32,
    /// File name of the plugin/master that defines this form.
    pub master: String,
    /// Grid coordinates; presumably `None` for interior cells — TODO confirm.
    pub x: Option<i32>,
    pub y: Option<i32>,
    /// Database id of the containing world, when the cell belongs to one.
    pub world_id: Option<i32>,
    pub is_persistent: bool,
}
#[instrument(level = "debug", skip(pool))] #[instrument(level = "debug", skip(pool))]
pub async fn insert( pub async fn insert(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
form_id: i32, form_id: i32,
master: &str,
x: Option<i32>, x: Option<i32>,
y: Option<i32>, y: Option<i32>,
world_id: Option<i32>, world_id: Option<i32>,
@ -28,13 +42,14 @@ pub async fn insert(
sqlx::query_as!( sqlx::query_as!(
Cell, Cell,
"INSERT INTO cells "INSERT INTO cells
(form_id, x, y, world_id, is_persistent, created_at, updated_at) (form_id, master, x, y, world_id, is_persistent, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, now(), now()) VALUES ($1, $2, $3, $4, $5, $6, now(), now())
ON CONFLICT (form_id, world_id) DO UPDATE ON CONFLICT (form_id, master, world_id) DO UPDATE
SET (x, y, is_persistent, updated_at) = SET (x, y, is_persistent, updated_at) =
(EXCLUDED.x, EXCLUDED.y, EXCLUDED.is_persistent, now()) (EXCLUDED.x, EXCLUDED.y, EXCLUDED.is_persistent, now())
RETURNING *", RETURNING *",
form_id, form_id,
master,
x, x,
y, y,
world_id, world_id,
@ -44,3 +59,48 @@ pub async fn insert(
.await .await
.context("Failed to insert cell") .context("Failed to insert cell")
} }
/// Inserts `cells` in batches of [`BATCH_SIZE`], returning every saved row.
///
/// Rows are upserted on the `(form_id, master, world_id)` unique index; on
/// conflict the positional fields are refreshed and `updated_at` is bumped so
/// `RETURNING *` still yields the row.
///
/// NOTE(review): in Postgres, NULLs are distinct in unique indexes, so rows
/// with `world_id = NULL` never hit the ON CONFLICT arm and may duplicate —
/// confirm this is acceptable until `world_id` becomes NOT NULL.
#[instrument(level = "debug", skip(pool))]
pub async fn batched_insert(
    pool: &sqlx::Pool<sqlx::Postgres>,
    cells: &[UnsavedCell],
) -> Result<Vec<Cell>> {
    // Every input row yields exactly one output row, so preallocate.
    let mut saved_cells = Vec::with_capacity(cells.len());
    for batch in cells.chunks(BATCH_SIZE) {
        // Column-major buffers feeding the UNNEST bulk insert below.
        let mut form_ids: Vec<i32> = Vec::with_capacity(batch.len());
        let mut masters: Vec<String> = Vec::with_capacity(batch.len());
        let mut xs: Vec<Option<i32>> = Vec::with_capacity(batch.len());
        let mut ys: Vec<Option<i32>> = Vec::with_capacity(batch.len());
        let mut world_ids: Vec<Option<i32>> = Vec::with_capacity(batch.len());
        let mut is_persistents: Vec<bool> = Vec::with_capacity(batch.len());
        // `chunks` already yields `&[UnsavedCell]`; iterate by reference
        // instead of `into_iter()` on the reference (clippy: into_iter_on_ref).
        for unsaved_cell in batch {
            form_ids.push(unsaved_cell.form_id);
            masters.push(unsaved_cell.master.clone());
            xs.push(unsaved_cell.x);
            ys.push(unsaved_cell.y);
            world_ids.push(unsaved_cell.world_id);
            is_persistents.push(unsaved_cell.is_persistent);
        }
        saved_cells.append(
            // sqlx doesn't understand arrays of Options with the query_as!
            // macro, so use the runtime-checked query_as function instead.
            &mut sqlx::query_as(
                r#"INSERT INTO cells (form_id, master, x, y, world_id, is_persistent, created_at, updated_at)
                SELECT *, now(), now() FROM UNNEST($1::int[], $2::text[], $3::int[], $4::int[], $5::int[], $6::bool[])
                ON CONFLICT (form_id, master, world_id) DO UPDATE
                SET (x, y, is_persistent, updated_at) =
                (EXCLUDED.x, EXCLUDED.y, EXCLUDED.is_persistent, now())
                RETURNING *"#,
            )
            .bind(&form_ids)
            .bind(&masters)
            .bind(&xs)
            .bind(&ys)
            .bind(&world_ids)
            .bind(&is_persistents)
            .fetch_all(pool)
            .await
            .context("Failed to insert cells")?,
        );
    }
    Ok(saved_cells)
}

View File

@ -20,6 +20,21 @@ pub struct File {
pub created_at: NaiveDateTime, pub created_at: NaiveDateTime,
} }
/// Looks up a `File` row by its Nexus file id.
///
/// Returns `Ok(None)` when no matching row exists.
/// NOTE(review): nothing in this view shows `nexus_file_id` is unique on
/// `files`; if multiple rows can share it, confirm which row should win.
#[instrument(level = "debug", skip(pool))]
pub async fn get_by_nexus_file_id(
    pool: &sqlx::Pool<sqlx::Postgres>,
    nexus_file_id: i32,
) -> Result<Option<File>> {
    sqlx::query_as!(
        File,
        "SELECT * FROM files WHERE nexus_file_id = $1",
        nexus_file_id,
    )
    .fetch_optional(pool)
    .await
    .context("Failed to get file")
}
#[instrument(level = "debug", skip(pool))] #[instrument(level = "debug", skip(pool))]
pub async fn insert( pub async fn insert(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,

View File

@ -6,3 +6,5 @@ pub mod plugin;
pub mod plugin_cell; pub mod plugin_cell;
pub mod plugin_world; pub mod plugin_world;
pub mod world; pub mod world;
/// Maximum number of rows sent per bulk `INSERT` by the `batched_insert`
/// helpers in this module tree.
pub const BATCH_SIZE: usize = 50;

View File

@ -9,11 +9,13 @@ pub struct Plugin {
pub name: String, pub name: String,
pub hash: i64, pub hash: i64,
pub file_id: i32, pub file_id: i32,
pub version: Option<f64>, pub version: f64,
pub size: i64, pub size: i64,
pub author: Option<String>, pub author: Option<String>,
pub description: Option<String>, pub description: Option<String>,
pub masters: Option<Vec<String>>, pub masters: Vec<String>,
pub file_name: String,
pub file_path: String,
pub updated_at: NaiveDateTime, pub updated_at: NaiveDateTime,
pub created_at: NaiveDateTime, pub created_at: NaiveDateTime,
} }
@ -24,20 +26,22 @@ pub async fn insert(
name: &str, name: &str,
hash: i64, hash: i64,
file_id: i32, file_id: i32,
version: Option<f64>, version: f64,
size: i64, size: i64,
author: Option<&str>, author: Option<&str>,
description: Option<&str>, description: Option<&str>,
masters: Option<&[String]>, masters: &[String],
file_name: &str,
file_path: &str,
) -> Result<Plugin> { ) -> Result<Plugin> {
sqlx::query_as!( sqlx::query_as!(
Plugin, Plugin,
"INSERT INTO plugins "INSERT INTO plugins
(name, hash, file_id, version, size, author, description, masters, created_at, updated_at) (name, hash, file_id, version, size, author, description, masters, file_name, file_path, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now(), now()) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, now(), now())
ON CONFLICT (file_id, name) DO UPDATE ON CONFLICT (file_id, file_path) DO UPDATE
SET (hash, version, author, description, masters, updated_at) = SET (name, hash, version, author, description, masters, file_name, updated_at) =
(EXCLUDED.hash, EXCLUDED.version, EXCLUDED.author, EXCLUDED.description, EXCLUDED.masters, now()) (EXCLUDED.name, EXCLUDED.hash, EXCLUDED.version, EXCLUDED.author, EXCLUDED.description, EXCLUDED.masters, EXCLUDED.file_name, now())
RETURNING *", RETURNING *",
name, name,
hash, hash,
@ -46,7 +50,9 @@ pub async fn insert(
size, size,
author, author,
description, description,
masters masters,
file_name,
file_path
) )
.fetch_one(pool) .fetch_one(pool)
.await .await

View File

@ -1,9 +1,12 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use tracing::instrument; use tracing::instrument;
#[derive(Debug, Serialize, Deserialize)] use super::BATCH_SIZE;
#[derive(Debug, Serialize, Deserialize, FromRow)]
pub struct PluginCell { pub struct PluginCell {
pub id: i32, pub id: i32,
pub plugin_id: i32, pub plugin_id: i32,
@ -13,6 +16,13 @@ pub struct PluginCell {
pub created_at: NaiveDateTime, pub created_at: NaiveDateTime,
} }
/// A plugin-to-cell join row that has not yet been persisted to the database.
#[derive(Debug, Serialize, Deserialize)]
pub struct UnsavedPluginCell {
    pub plugin_id: i32,
    pub cell_id: i32,
    /// Editor ID the plugin assigns to the cell, when present.
    pub editor_id: Option<String>,
}
#[instrument(level = "debug", skip(pool))] #[instrument(level = "debug", skip(pool))]
pub async fn insert( pub async fn insert(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
@ -36,3 +46,38 @@ pub async fn insert(
.await .await
.context("Failed to insert plugin_cell") .context("Failed to insert plugin_cell")
} }
/// Inserts `plugin_cells` join rows in batches of [`BATCH_SIZE`], returning
/// every saved row.
///
/// Rows are upserted on `(plugin_id, cell_id)`; on conflict `editor_id` is
/// refreshed and `updated_at` bumped so `RETURNING *` still yields the row.
#[instrument(level = "debug", skip(pool))]
pub async fn batched_insert(
    pool: &sqlx::Pool<sqlx::Postgres>,
    plugin_cells: &[UnsavedPluginCell],
) -> Result<Vec<PluginCell>> {
    // Every input row yields exactly one output row, so preallocate.
    let mut saved_plugin_cells = Vec::with_capacity(plugin_cells.len());
    for batch in plugin_cells.chunks(BATCH_SIZE) {
        // Column-major buffers feeding the UNNEST bulk insert below.
        let mut plugin_ids: Vec<i32> = Vec::with_capacity(batch.len());
        let mut cell_ids: Vec<i32> = Vec::with_capacity(batch.len());
        let mut editor_ids: Vec<Option<String>> = Vec::with_capacity(batch.len());
        // Iterate the slice by reference (clippy: into_iter_on_ref).
        for unsaved_plugin_cell in batch {
            plugin_ids.push(unsaved_plugin_cell.plugin_id);
            cell_ids.push(unsaved_plugin_cell.cell_id);
            // Option<String> is Clone; `.as_ref().map(|s| s.clone())` was a
            // long-hand spelling of the same thing.
            editor_ids.push(unsaved_plugin_cell.editor_id.clone());
        }
        saved_plugin_cells.append(
            // sqlx doesn't understand arrays of Options with the query_as!
            // macro, so use the runtime-checked query_as function instead.
            &mut sqlx::query_as(
                r#"INSERT INTO plugin_cells (plugin_id, cell_id, editor_id, created_at, updated_at)
                SELECT *, now(), now() FROM UNNEST($1::int[], $2::int[], $3::text[])
                ON CONFLICT (plugin_id, cell_id) DO UPDATE
                SET (editor_id, updated_at) = (EXCLUDED.editor_id, now())
                RETURNING *"#,
            )
            .bind(&plugin_ids)
            .bind(&cell_ids)
            .bind(&editor_ids)
            .fetch_all(pool)
            .await
            .context("Failed to insert plugin_cells")?,
        );
    }
    Ok(saved_plugin_cells)
}

View File

@ -3,6 +3,8 @@ use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing::instrument; use tracing::instrument;
use super::BATCH_SIZE;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct PluginWorld { pub struct PluginWorld {
pub id: i32, pub id: i32,
@ -13,6 +15,13 @@ pub struct PluginWorld {
pub created_at: NaiveDateTime, pub created_at: NaiveDateTime,
} }
/// A plugin-to-world join row that has not yet been persisted to the database.
#[derive(Debug, Serialize, Deserialize)]
pub struct UnsavedPluginWorld {
    pub plugin_id: i32,
    pub world_id: i32,
    /// Editor ID the plugin assigns to the world.
    pub editor_id: String,
}
#[instrument(level = "debug", skip(pool))] #[instrument(level = "debug", skip(pool))]
pub async fn insert( pub async fn insert(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
@ -36,3 +45,38 @@ pub async fn insert(
.await .await
.context("Failed to insert plugin_world") .context("Failed to insert plugin_world")
} }
/// Inserts `plugin_worlds` join rows in batches of [`BATCH_SIZE`], returning
/// every saved row.
///
/// Rows are upserted on `(plugin_id, world_id)`; on conflict `editor_id` is
/// refreshed and `updated_at` bumped so `RETURNING *` still yields the row.
#[instrument(level = "debug", skip(pool))]
pub async fn batched_insert(
    pool: &sqlx::Pool<sqlx::Postgres>,
    plugin_worlds: &[UnsavedPluginWorld],
) -> Result<Vec<PluginWorld>> {
    // Every input row yields exactly one output row, so preallocate.
    let mut saved_plugin_worlds = Vec::with_capacity(plugin_worlds.len());
    for batch in plugin_worlds.chunks(BATCH_SIZE) {
        // Column-major buffers feeding the UNNEST bulk insert below.
        let mut plugin_ids: Vec<i32> = Vec::with_capacity(batch.len());
        let mut world_ids: Vec<i32> = Vec::with_capacity(batch.len());
        let mut editor_ids: Vec<String> = Vec::with_capacity(batch.len());
        // Iterate the slice by reference (clippy: into_iter_on_ref).
        for unsaved_plugin_world in batch {
            plugin_ids.push(unsaved_plugin_world.plugin_id);
            world_ids.push(unsaved_plugin_world.world_id);
            editor_ids.push(unsaved_plugin_world.editor_id.clone());
        }
        saved_plugin_worlds.append(
            &mut sqlx::query_as!(
                PluginWorld,
                r#"INSERT INTO plugin_worlds (plugin_id, world_id, editor_id, created_at, updated_at)
                SELECT *, now(), now() FROM UNNEST($1::int[], $2::int[], $3::text[])
                ON CONFLICT (plugin_id, world_id) DO UPDATE
                SET (editor_id, updated_at) = (EXCLUDED.editor_id, now())
                RETURNING *"#,
                &plugin_ids,
                &world_ids,
                &editor_ids,
            )
            .fetch_all(pool)
            .await
            .context("Failed to insert plugin_worlds")?,
        );
    }
    Ok(saved_plugin_worlds)
}

View File

@ -3,6 +3,8 @@ use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing::instrument; use tracing::instrument;
use super::BATCH_SIZE;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct World { pub struct World {
pub id: i32, pub id: i32,
@ -12,6 +14,12 @@ pub struct World {
pub created_at: NaiveDateTime, pub created_at: NaiveDateTime,
} }
/// A world record parsed from a plugin that has not yet been persisted to the
/// database (no `id` or timestamp columns yet).
#[derive(Debug, Serialize, Deserialize)]
pub struct UnsavedWorld {
    /// Form ID local to the defining file (low 24 bits of the raw form ID).
    pub form_id: i32,
    /// File name of the plugin/master that defines this world.
    pub master: String,
}
#[instrument(level = "debug", skip(pool))] #[instrument(level = "debug", skip(pool))]
pub async fn insert( pub async fn insert(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
@ -33,3 +41,35 @@ pub async fn insert(
.await .await
.context("Failed to insert world") .context("Failed to insert world")
} }
/// Inserts `worlds` in batches of [`BATCH_SIZE`], returning every saved row.
///
/// Rows are upserted on the `(form_id, master)` unique index; on conflict only
/// `updated_at` is bumped so the existing row is still yielded by `RETURNING`.
#[instrument(level = "debug", skip(pool))]
pub async fn batched_insert(
    pool: &sqlx::Pool<sqlx::Postgres>,
    worlds: &[UnsavedWorld],
) -> Result<Vec<World>> {
    // Every input row yields exactly one output row, so preallocate.
    let mut saved_worlds = Vec::with_capacity(worlds.len());
    for batch in worlds.chunks(BATCH_SIZE) {
        // Column-major buffers feeding the UNNEST bulk insert below.
        let mut form_ids: Vec<i32> = Vec::with_capacity(batch.len());
        let mut masters: Vec<String> = Vec::with_capacity(batch.len());
        // Iterate the slice by reference (clippy: into_iter_on_ref).
        for unsaved_world in batch {
            form_ids.push(unsaved_world.form_id);
            masters.push(unsaved_world.master.clone());
        }
        saved_worlds.append(
            &mut sqlx::query_as!(
                World,
                r#"INSERT INTO worlds (form_id, master, created_at, updated_at)
                SELECT *, now(), now() FROM UNNEST($1::int[], $2::text[])
                ON CONFLICT (form_id, master) DO UPDATE
                SET updated_at = now()
                RETURNING *"#,
                &form_ids,
                &masters
            )
            .fetch_all(pool)
            .await
            .context("Failed to insert worlds")?,
        );
    }
    Ok(saved_worlds)
}