Batch insert mods, add lifetimes to batch inserts

Using `&str`s prevents unneeded cloning everywhere, but unfortunately borrowed types don't work with the compile-time sqlx macros (see launchbadge/sqlx#280), so the batched inserts now use the runtime-checked `query_as` with `.bind()` instead.
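For illustration, a minimal sketch of the limitation (not code from this repo; the `items` table and all names below are hypothetical): the compile-time `query_as!` macro rejects array binds of borrowed types such as `Vec<&str>`, while the runtime-checked `query_as` accepts them through `.bind()`, trading away compile-time query verification:

    use anyhow::{Context, Result};

    #[derive(Debug, sqlx::FromRow)]
    struct Item {
        id: i32,
        name: String,
    }

    async fn batch_insert_names(pool: &sqlx::PgPool, names: Vec<&str>) -> Result<Vec<Item>> {
        // Does not compile: query_as! can't bind a borrowed text[] like Vec<&str>.
        // sqlx::query_as!(
        //     Item,
        //     "INSERT INTO items (name) SELECT * FROM UNNEST($1::text[]) RETURNING *",
        //     &names,
        // )

        // The dynamically checked variant takes the same SQL but binds at runtime,
        // so borrowed arrays work; the row type comes from the FromRow derive.
        sqlx::query_as("INSERT INTO items (name) SELECT * FROM UNNEST($1::text[]) RETURNING *")
            .bind(&names)
            .fetch_all(pool)
            .await
            .context("Failed to insert items")
    }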
Tyler Hallada 2021-07-21 23:20:51 -04:00
parent 3fe09a8f8f
commit f337917c99
6 changed files with 139 additions and 59 deletions

src/main.rs

@@ -27,7 +27,10 @@ use models::game;
 use models::plugin;
 use models::{cell, cell::UnsavedCell};
 use models::{file, file::File};
-use models::{game_mod, game_mod::Mod};
+use models::{
+    game_mod,
+    game_mod::{Mod, UnsavedMod},
+};
 use models::{plugin_cell, plugin_cell::UnsavedPluginCell};
 use models::{plugin_world, plugin_world::UnsavedPluginWorld};
 use models::{world, world::UnsavedWorld};
@@ -104,10 +107,7 @@ where
                &file_name,
            )
            .expect("form_id to be a valid i32");
-            UnsavedWorld {
-                form_id,
-                master: master.to_string(),
-            }
+            UnsavedWorld { form_id, master }
        })
        .collect();
    let db_worlds = world::batched_insert(&pool, &worlds).await?;
@@ -117,7 +117,7 @@ where
        .map(|(db_world, plugin_world)| UnsavedPluginWorld {
            plugin_id: plugin_row.id,
            world_id: db_world.id,
-            editor_id: plugin_world.editor_id.clone(),
+            editor_id: &plugin_world.editor_id,
        })
        .collect();
    plugin_world::batched_insert(&pool, &plugin_worlds).await?;
@@ -151,7 +151,7 @@ where
            .expect("form_id is a valid i32");
            UnsavedCell {
                form_id,
-                master: master.to_string(),
+                master,
                x: cell.x,
                y: cell.y,
                world_id,
@@ -166,7 +166,7 @@ where
        .map(|(db_cell, plugin_cell)| UnsavedPluginCell {
            plugin_id: plugin_row.id,
            cell_id: db_cell.id,
-            editor_id: plugin_cell.editor_id.clone(),
+            editor_id: plugin_cell.editor_id.as_ref().map(|id| id.as_ref()),
        })
        .collect();
    plugin_cell::batched_insert(&pool, &plugin_cells).await?;
@@ -213,7 +213,7 @@ pub async fn main() -> Result<()> {
        .max_connections(5)
        .connect(&env::var("DATABASE_URL")?)
        .await?;
-    let _game = game::insert(&pool, GAME_NAME, GAME_ID as i32).await?;
+    let game = game::insert(&pool, GAME_NAME, GAME_ID as i32).await?;
    let client = reqwest::Client::new();

    let mut page: i32 = 1;
@@ -226,7 +226,7 @@ pub async fn main() -> Result<()> {
        let scraped = mod_list_resp.scrape_mods()?;
        has_next_page = scraped.has_next_page;
-        let mods = game_mod::bulk_get_by_nexus_mod_id(
+        let present_mods = game_mod::bulk_get_present_nexus_mod_ids(
            &pool,
            &scraped
                .mods
@@ -235,6 +235,21 @@ pub async fn main() -> Result<()> {
                .collect::<Vec<i32>>(),
        )
        .await?;
+        let mods_to_create: Vec<UnsavedMod> = scraped
+            .mods
+            .iter()
+            .filter(|scraped_mod| !present_mods.contains(&scraped_mod.nexus_mod_id))
+            .map(|scraped_mod| UnsavedMod {
+                name: scraped_mod.name,
+                nexus_mod_id: scraped_mod.nexus_mod_id,
+                author: scraped_mod.author,
+                category: scraped_mod.category,
+                description: scraped_mod.desc,
+                game_id: game.id,
+            })
+            .collect();
+        let mods = game_mod::batched_insert(&pool, &mods_to_create).await?;

        for db_mod in mods {
            let mod_span = info_span!("mod", name = ?&db_mod.name, id = &db_mod.nexus_mod_id);
@@ -501,15 +516,12 @@ pub async fn main() -> Result<()> {
                    plugins_archive.finish()?;
                    debug!(duration = ?download_link_resp.wait, "sleeping");
                    sleep(download_link_resp.wait).await;
-                    break;
                }
-                break;
            }
            page += 1;
            debug!(?page, ?has_next_page, "sleeping 1 second");
            sleep(Duration::from_secs(1)).await;
-            break;
        }
        Ok(())
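A quick illustration of the borrow that makes the new `UnsavedMod` flow possible (hypothetical names, not from this repo): the owned scraped data outlives the borrowed structs built from it, so nothing needs cloning before the database layer:

    #[derive(Debug)]
    struct UnsavedThing<'a> {
        name: &'a str,
    }

    fn main() {
        // The owned Strings live in `scraped` for the whole scope...
        let scraped: Vec<String> = vec!["first".to_string(), "second".to_string()];
        // ...so the borrowed structs can point into them instead of cloning.
        let unsaved: Vec<UnsavedThing> = scraped
            .iter()
            .map(|name| UnsavedThing { name: name.as_str() })
            .collect();
        assert_eq!(unsaved[1].name, "second");
    }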

src/models/cell.rs

@@ -19,10 +19,10 @@ pub struct Cell {
    pub created_at: NaiveDateTime,
 }

-#[derive(Debug, Serialize, Deserialize)]
-pub struct UnsavedCell {
+#[derive(Debug)]
+pub struct UnsavedCell<'a> {
     pub form_id: i32,
-    pub master: String,
+    pub master: &'a str,
     pub x: Option<i32>,
     pub y: Option<i32>,
     pub world_id: Option<i32>,
@@ -61,21 +61,21 @@
 }

 #[instrument(level = "debug", skip(pool))]
-pub async fn batched_insert(
+pub async fn batched_insert<'a>(
     pool: &sqlx::Pool<sqlx::Postgres>,
-    cells: &[UnsavedCell],
+    cells: &[UnsavedCell<'a>],
 ) -> Result<Vec<Cell>> {
     let mut saved_cells = vec![];
     for batch in cells.chunks(BATCH_SIZE) {
         let mut form_ids: Vec<i32> = vec![];
-        let mut masters: Vec<String> = vec![];
+        let mut masters: Vec<&str> = vec![];
         let mut xs: Vec<Option<i32>> = vec![];
         let mut ys: Vec<Option<i32>> = vec![];
         let mut world_ids: Vec<Option<i32>> = vec![];
         let mut is_persistents: Vec<bool> = vec![];
         batch.into_iter().for_each(|unsaved_cell| {
             form_ids.push(unsaved_cell.form_id);
-            masters.push(unsaved_cell.master.clone());
+            masters.push(unsaved_cell.master);
             xs.push(unsaved_cell.x);
             ys.push(unsaved_cell.y);
             world_ids.push(unsaved_cell.world_id);

src/models/game_mod.rs

@@ -1,9 +1,12 @@
 use anyhow::{Context, Result};
 use chrono::NaiveDateTime;
 use serde::{Deserialize, Serialize};
+use sqlx::FromRow;
 use tracing::instrument;

+use super::BATCH_SIZE;
+
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, FromRow)]
 pub struct Mod {
     pub id: i32,
     pub name: String,
@@ -16,6 +19,16 @@ pub struct Mod {
     pub created_at: NaiveDateTime,
 }

+#[derive(Debug)]
+pub struct UnsavedMod<'a> {
+    pub name: &'a str,
+    pub nexus_mod_id: i32,
+    pub author: &'a str,
+    pub category: &'a str,
+    pub description: Option<&'a str>,
+    pub game_id: i32,
+}
+
 #[instrument(level = "debug", skip(pool))]
 pub async fn get_by_nexus_mod_id(
     pool: &sqlx::Pool<sqlx::Postgres>,
@@ -32,18 +45,25 @@ pub async fn get_by_nexus_mod_id(
 }

 #[instrument(level = "debug", skip(pool))]
-pub async fn bulk_get_by_nexus_mod_id(
+pub async fn bulk_get_present_nexus_mod_ids(
     pool: &sqlx::Pool<sqlx::Postgres>,
     nexus_mod_ids: &[i32],
-) -> Result<Vec<Mod>> {
-    sqlx::query_as!(
-        Mod,
-        "SELECT * FROM mods WHERE nexus_mod_id = ANY($1::int[])",
+) -> Result<Vec<i32>> {
+    struct Row {
+        nexus_mod_id: i32,
+    }
+    Ok(sqlx::query_as!(
+        Row,
+        "SELECT nexus_mod_id FROM mods WHERE nexus_mod_id = ANY($1::int[])",
         nexus_mod_ids,
     )
     .fetch_all(pool)
     .await
-    .context("Failed to get mods")
+    .context("Failed to get mods")?
+    .into_iter()
+    .map(|row| row.nexus_mod_id)
+    .collect())
 }

 #[instrument(level = "debug", skip(pool))]
@@ -76,3 +96,50 @@ pub async fn insert(
     .await
     .context("Failed to insert or update mod")
 }
+
+#[instrument(level = "debug", skip(pool))]
+pub async fn batched_insert<'a>(
+    pool: &sqlx::Pool<sqlx::Postgres>,
+    mods: &[UnsavedMod<'a>],
+) -> Result<Vec<Mod>> {
+    let mut saved_mods = vec![];
+    for batch in mods.chunks(BATCH_SIZE) {
+        let mut names: Vec<&str> = vec![];
+        let mut nexus_mod_ids: Vec<i32> = vec![];
+        let mut authors: Vec<&str> = vec![];
+        let mut categories: Vec<&str> = vec![];
+        let mut descriptions: Vec<Option<&str>> = vec![];
+        let mut game_ids: Vec<i32> = vec![];
+        batch.into_iter().for_each(|unsaved_mod| {
+            names.push(unsaved_mod.name);
+            nexus_mod_ids.push(unsaved_mod.nexus_mod_id);
+            authors.push(unsaved_mod.author);
+            categories.push(unsaved_mod.category);
+            descriptions.push(unsaved_mod.description);
+            game_ids.push(unsaved_mod.game_id);
+        });
+        saved_mods.append(
+            // sqlx doesn't understand arrays of Options with the query_as! macro
+            &mut sqlx::query_as(
+                r#"INSERT INTO mods
+                    (name, nexus_mod_id, author, category, description, game_id, created_at, updated_at)
+                SELECT *, now(), now()
+                FROM UNNEST($1::text[], $2::int[], $3::text[], $4::text[], $5::text[], $6::int[])
+                ON CONFLICT (game_id, nexus_mod_id) DO UPDATE
+                SET (name, author, category, description, updated_at) =
+                    (EXCLUDED.name, EXCLUDED.author, EXCLUDED.category, EXCLUDED.description, now())
+                RETURNING *"#,
+            )
+            .bind(&names)
+            .bind(&nexus_mod_ids)
+            .bind(&authors)
+            .bind(&categories)
+            .bind(&descriptions)
+            .bind(&game_ids)
+            .fetch_all(pool)
+            .await
+            .context("Failed to insert mods")?,
+        );
+    }
+    Ok(saved_mods)
+}
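The comment in this hunk ("sqlx doesn't understand arrays of Options with the query_as! macro") is the other half of the motivation for the runtime-checked query. A standalone sketch of what does work, assuming only a reachable `DATABASE_URL`; the query itself is illustrative: binding a `Vec<Option<&str>>` as `text[]` succeeds with `query_as`, and NULL elements round-trip as `None`:

    use anyhow::{Context, Result};
    use sqlx::postgres::PgPoolOptions;

    async fn optional_text_roundtrip() -> Result<()> {
        let pool = PgPoolOptions::new()
            .max_connections(1)
            .connect(&std::env::var("DATABASE_URL")?)
            .await?;
        // An array of optional text values, like the mod descriptions above.
        let descriptions: Vec<Option<&str>> = vec![Some("a description"), None];
        let rows: Vec<(Option<String>,)> = sqlx::query_as("SELECT * FROM UNNEST($1::text[])")
            .bind(&descriptions)
            .fetch_all(&pool)
            .await
            .context("Failed to unnest optional text")?;
        assert_eq!(rows[1].0, None); // the NULL element comes back as None
        Ok(())
    }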

src/models/plugin_cell.rs

@@ -16,11 +16,11 @@ pub struct PluginCell {
    pub created_at: NaiveDateTime,
 }

-#[derive(Debug, Serialize, Deserialize)]
-pub struct UnsavedPluginCell {
+#[derive(Debug)]
+pub struct UnsavedPluginCell<'a> {
     pub plugin_id: i32,
     pub cell_id: i32,
-    pub editor_id: Option<String>,
+    pub editor_id: Option<&'a str>,
 }

 #[instrument(level = "debug", skip(pool))]
@@ -48,19 +48,19 @@ pub async fn insert(
 }

 #[instrument(level = "debug", skip(pool))]
-pub async fn batched_insert(
+pub async fn batched_insert<'a>(
     pool: &sqlx::Pool<sqlx::Postgres>,
-    plugin_cells: &[UnsavedPluginCell],
+    plugin_cells: &[UnsavedPluginCell<'a>],
 ) -> Result<Vec<PluginCell>> {
     let mut saved_plugin_cells = vec![];
     for batch in plugin_cells.chunks(BATCH_SIZE) {
         let mut plugin_ids: Vec<i32> = vec![];
         let mut cell_ids: Vec<i32> = vec![];
-        let mut editor_ids: Vec<Option<String>> = vec![];
+        let mut editor_ids: Vec<Option<&str>> = vec![];
         batch.into_iter().for_each(|unsaved_plugin_cell| {
             plugin_ids.push(unsaved_plugin_cell.plugin_id);
             cell_ids.push(unsaved_plugin_cell.cell_id);
-            editor_ids.push(unsaved_plugin_cell.editor_id.as_ref().map(|s| s.clone()));
+            editor_ids.push(unsaved_plugin_cell.editor_id);
         });
         saved_plugin_cells.append(
             // sqlx doesn't understand arrays of Options with the query_as! macro

src/models/plugin_world.rs

@@ -1,11 +1,12 @@
 use anyhow::{Context, Result};
 use chrono::NaiveDateTime;
 use serde::{Deserialize, Serialize};
+use sqlx::FromRow;
 use tracing::instrument;

 use super::BATCH_SIZE;

-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, FromRow)]
 pub struct PluginWorld {
     pub id: i32,
     pub plugin_id: i32,
@@ -15,11 +16,11 @@ pub struct PluginWorld {
     pub created_at: NaiveDateTime,
 }

-#[derive(Debug, Serialize, Deserialize)]
-pub struct UnsavedPluginWorld {
+#[derive(Debug)]
+pub struct UnsavedPluginWorld<'a> {
     pub plugin_id: i32,
     pub world_id: i32,
-    pub editor_id: String,
+    pub editor_id: &'a str,
 }

 #[instrument(level = "debug", skip(pool))]
@@ -47,32 +48,31 @@ pub async fn insert(
 }

 #[instrument(level = "debug", skip(pool))]
-pub async fn batched_insert(
+pub async fn batched_insert<'a>(
     pool: &sqlx::Pool<sqlx::Postgres>,
-    plugin_worlds: &[UnsavedPluginWorld],
+    plugin_worlds: &[UnsavedPluginWorld<'a>],
 ) -> Result<Vec<PluginWorld>> {
     let mut saved_plugin_worlds = vec![];
     for batch in plugin_worlds.chunks(BATCH_SIZE) {
         let mut plugin_ids: Vec<i32> = vec![];
         let mut world_ids: Vec<i32> = vec![];
-        let mut editor_ids: Vec<String> = vec![];
+        let mut editor_ids: Vec<&str> = vec![];
         batch.into_iter().for_each(|unsaved_plugin_world| {
             plugin_ids.push(unsaved_plugin_world.plugin_id);
             world_ids.push(unsaved_plugin_world.world_id);
             editor_ids.push(unsaved_plugin_world.editor_id.clone());
         });
         saved_plugin_worlds.append(
-            &mut sqlx::query_as!(
-                PluginWorld,
+            &mut sqlx::query_as(
                 r#"INSERT INTO plugin_worlds (plugin_id, world_id, editor_id, created_at, updated_at)
                 SELECT *, now(), now() FROM UNNEST($1::int[], $2::int[], $3::text[])
                 ON CONFLICT (plugin_id, world_id) DO UPDATE
                 SET (editor_id, updated_at) = (EXCLUDED.editor_id, now())
-                RETURNING *"#,
-                &plugin_ids,
-                &world_ids,
-                &editor_ids,
+                RETURNING *"#
             )
+            .bind(&plugin_ids)
+            .bind(&world_ids)
+            .bind(&editor_ids)
             .fetch_all(pool)
             .await
             .context("Failed to insert plugin_worlds")?,

src/models/world.rs

@@ -1,11 +1,12 @@
 use anyhow::{Context, Result};
 use chrono::NaiveDateTime;
 use serde::{Deserialize, Serialize};
+use sqlx::FromRow;
 use tracing::instrument;

 use super::BATCH_SIZE;

-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, FromRow)]
 pub struct World {
     pub id: i32,
     pub form_id: i32,
@@ -14,10 +15,10 @@ pub struct World {
     pub created_at: NaiveDateTime,
 }

-#[derive(Debug, Serialize, Deserialize)]
-pub struct UnsavedWorld {
+#[derive(Debug)]
+pub struct UnsavedWorld<'a> {
     pub form_id: i32,
-    pub master: String,
+    pub master: &'a str,
 }

 #[instrument(level = "debug", skip(pool))]
@@ -43,29 +44,29 @@ pub async fn insert(
 }

 #[instrument(level = "debug", skip(pool))]
-pub async fn batched_insert(
+pub async fn batched_insert<'a>(
     pool: &sqlx::Pool<sqlx::Postgres>,
-    worlds: &[UnsavedWorld],
+    worlds: &[UnsavedWorld<'a>],
 ) -> Result<Vec<World>> {
     let mut saved_worlds = vec![];
     for batch in worlds.chunks(BATCH_SIZE) {
         let mut form_ids: Vec<i32> = vec![];
-        let mut masters: Vec<String> = vec![];
+        let mut masters: Vec<&str> = vec![];
         batch.into_iter().for_each(|unsaved_world| {
             form_ids.push(unsaved_world.form_id);
-            masters.push(unsaved_world.master.clone());
+            masters.push(unsaved_world.master);
         });
         saved_worlds.append(
-            &mut sqlx::query_as!(
-                World,
+            // cannot use macro with types that have lifetimes: https://github.com/launchbadge/sqlx/issues/280
+            &mut sqlx::query_as(
                 r#"INSERT INTO worlds (form_id, master, created_at, updated_at)
                 SELECT *, now(), now() FROM UNNEST($1::int[], $2::text[])
                 ON CONFLICT (form_id, master) DO UPDATE
                 SET updated_at = now()
                 RETURNING *"#,
-                &form_ids,
-                &masters
             )
+            .bind(&form_ids)
+            .bind(&masters)
             .fetch_all(pool)
             .await
             .context("Failed to insert worlds")?,