Log spans, track size & undownloadable files

This commit is contained in:
Tyler Hallada 2021-07-11 22:49:29 -04:00
parent 792e78391c
commit b609e7059a
10 changed files with 104 additions and 47 deletions

View File

@ -30,7 +30,9 @@ CREATE TABLE IF NOT EXISTS "files" (
"category" VARCHAR(255), "category" VARCHAR(255),
"version" VARCHAR(255), "version" VARCHAR(255),
"mod_version" VARCHAR(255), "mod_version" VARCHAR(255),
"size" BIGINT NOT NULL,
"uploaded_at" timestamp(3) NOT NULL, "uploaded_at" timestamp(3) NOT NULL,
"has_download_link" BOOLEAN NOT NULL DEFAULT true,
"created_at" timestamp(3) NOT NULL, "created_at" timestamp(3) NOT NULL,
"updated_at" timestamp(3) NOT NULL "updated_at" timestamp(3) NOT NULL
); );
@ -43,6 +45,7 @@ CREATE TABLE IF NOT EXISTS "plugins" (
"hash" BIGINT NOT NULL, "hash" BIGINT NOT NULL,
"file_id" INTEGER REFERENCES "files"(id) NOT NULL, "file_id" INTEGER REFERENCES "files"(id) NOT NULL,
"version" FLOAT, "version" FLOAT,
"size" BIGINT NOT NULL,
"author" TEXT, "author" TEXT,
"description" TEXT, "description" TEXT,
"masters" VARCHAR(255)[], "masters" VARCHAR(255)[],

View File

@ -1,6 +1,7 @@
use anyhow::Result; use anyhow::Result;
use compress_tools::{list_archive_files, uncompress_archive_file}; use compress_tools::{list_archive_files, uncompress_archive_file};
use dotenv::dotenv; use dotenv::dotenv;
use reqwest::StatusCode;
use skyrim_cell_dump::parse_plugin; use skyrim_cell_dump::parse_plugin;
use sqlx::postgres::PgPoolOptions; use sqlx::postgres::PgPoolOptions;
use std::convert::TryInto; use std::convert::TryInto;
@ -12,7 +13,7 @@ use std::time::Duration;
use tempfile::tempdir; use tempfile::tempdir;
use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::time::sleep; use tokio::time::sleep;
use tracing::{debug, error, info, instrument, warn}; use tracing::{debug, error, info, info_span, warn};
use unrar::Archive; use unrar::Archive;
use zip::write::{FileOptions, ZipWriter}; use zip::write::{FileOptions, ZipWriter};
@ -20,15 +21,14 @@ mod models;
mod nexus_api; mod nexus_api;
mod nexus_scraper; mod nexus_scraper;
use models::cell::insert_cell; use models::cell;
use models::file::{insert_file, File}; use models::game;
use models::game::insert_game; use models::plugin;
use models::game_mod::{get_mod_by_nexus_mod_id, insert_mod, Mod}; use models::plugin_cell;
use models::plugin::insert_plugin; use models::{file, file::File};
use models::plugin_cell::insert_plugin_cell; use models::{game_mod, game_mod::Mod};
use nexus_api::{GAME_ID, GAME_NAME}; use nexus_api::{GAME_ID, GAME_NAME};
#[instrument(level = "debug", skip(plugin_buf, pool, plugin_archive, db_file, mod_obj), fields(name = ?mod_obj.name, id = mod_obj.nexus_mod_id))]
async fn process_plugin<W>( async fn process_plugin<W>(
plugin_buf: &mut [u8], plugin_buf: &mut [u8],
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
@ -40,15 +40,17 @@ async fn process_plugin<W>(
where where
W: std::io::Write + std::io::Seek, W: std::io::Write + std::io::Seek,
{ {
info!(bytes = plugin_buf.len(), "parsing plugin");
let plugin = parse_plugin(&plugin_buf)?; let plugin = parse_plugin(&plugin_buf)?;
info!(file_name, num_cells = plugin.cells.len(), "parsed plugin"); info!(num_cells = plugin.cells.len(), "parse finished");
let hash = seahash::hash(&plugin_buf); let hash = seahash::hash(&plugin_buf);
let plugin_row = insert_plugin( let plugin_row = plugin::insert(
&pool, &pool,
&db_file.name, &db_file.name,
hash as i64, hash as i64,
db_file.id, db_file.id,
Some(plugin.header.version as f64), Some(plugin.header.version as f64),
plugin_buf.len() as i64,
plugin.header.author, plugin.header.author,
plugin.header.description, plugin.header.description,
Some( Some(
@ -62,7 +64,7 @@ where
) )
.await?; .await?;
for cell in plugin.cells { for cell in plugin.cells {
let cell_row = insert_cell( let cell_row = cell::insert(
&pool, &pool,
cell.form_id.try_into().unwrap(), cell.form_id.try_into().unwrap(),
cell.x, cell.x,
@ -70,7 +72,7 @@ where
cell.is_persistent, cell.is_persistent,
) )
.await?; .await?;
insert_plugin_cell(&pool, plugin_row.id, cell_row.id, cell.editor_id).await?; plugin_cell::insert(&pool, plugin_row.id, cell_row.id, cell.editor_id).await?;
} }
plugin_archive.start_file( plugin_archive.start_file(
format!( format!(
@ -110,22 +112,24 @@ pub async fn main() -> Result<()> {
.max_connections(5) .max_connections(5)
.connect(&env::var("DATABASE_URL")?) .connect(&env::var("DATABASE_URL")?)
.await?; .await?;
let game = insert_game(&pool, GAME_NAME, GAME_ID as i32).await?; let game = game::insert(&pool, GAME_NAME, GAME_ID as i32).await?;
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let mut page: i32 = 1; let mut page: i32 = 1;
let mut has_next_page = true; let mut has_next_page = true;
while has_next_page { while has_next_page {
let page_span = info_span!("page", page);
let _page_span = page_span.enter();
let mod_list_resp = nexus_scraper::get_mod_list_page(&client, page).await?; let mod_list_resp = nexus_scraper::get_mod_list_page(&client, page).await?;
let scraped = mod_list_resp.scrape_mods()?; let scraped = mod_list_resp.scrape_mods()?;
has_next_page = scraped.has_next_page; has_next_page = scraped.has_next_page;
let mut mods = Vec::new(); let mut mods = Vec::new();
for scraped_mod in scraped.mods { for scraped_mod in scraped.mods {
if let None = get_mod_by_nexus_mod_id(&pool, scraped_mod.nexus_mod_id).await? { if let None = game_mod::get_by_nexus_mod_id(&pool, scraped_mod.nexus_mod_id).await? {
mods.push( mods.push(
insert_mod( game_mod::insert(
&pool, &pool,
scraped_mod.name, scraped_mod.name,
scraped_mod.nexus_mod_id, scraped_mod.nexus_mod_id,
@ -140,27 +144,26 @@ pub async fn main() -> Result<()> {
} }
for db_mod in mods { for db_mod in mods {
info!( let mod_span = info_span!("mod", name = ?&db_mod.name, id = &db_mod.nexus_mod_id);
mod_name = ?&db_mod.name, let _mod_span = mod_span.enter();
mod_id = ?&db_mod.nexus_mod_id,
"fetching files for mod"
);
let files_resp = nexus_api::files::get(&client, db_mod.nexus_mod_id).await?; let files_resp = nexus_api::files::get(&client, db_mod.nexus_mod_id).await?;
// TODO: download other files than just MAIN files
// let files = files.into_iter().filter(|file| {
// if let Some(category_name) = file.get("category_name") {
// category_name.as_str() == Some("MAIN")
// } else {
// false
// }
// });
if let Some(duration) = files_resp.wait { if let Some(duration) = files_resp.wait {
debug!(?duration, "sleeping"); debug!(?duration, "sleeping");
sleep(duration).await; sleep(duration).await;
} }
for api_file in files_resp.files()? { // Filter out replaced/deleted files (indicated by null category)
let db_file = insert_file( let files = files_resp
.files()?
.into_iter()
.filter(|file| file.category.is_some());
for api_file in files {
let file_span =
info_span!("file", name = &api_file.file_name, id = &api_file.file_id);
let _file_span = file_span.enter();
let db_file = file::insert(
&pool, &pool,
api_file.name, api_file.name,
api_file.file_name, api_file.file_name,
@ -169,6 +172,7 @@ pub async fn main() -> Result<()> {
api_file.category, api_file.category,
api_file.version, api_file.version,
api_file.mod_version, api_file.mod_version,
api_file.size,
api_file.uploaded_at, api_file.uploaded_at,
) )
.await?; .await?;
@ -177,8 +181,23 @@ pub async fn main() -> Result<()> {
let download_link_resp = let download_link_resp =
nexus_api::download_link::get(&client, db_mod.nexus_mod_id, api_file.file_id) nexus_api::download_link::get(&client, db_mod.nexus_mod_id, api_file.file_id)
.await?; .await;
if let Err(err) = &download_link_resp {
if let Some(reqwest_err) = err.downcast_ref::<reqwest::Error>() {
if reqwest_err.status() == Some(StatusCode::NOT_FOUND) {
warn!(
status = ?reqwest_err.status(),
file_id = api_file.file_id,
"failed to get download link for file"
);
file::update_has_download_link(&pool, db_file.id, false).await?;
continue;
}
}
}
let download_link_resp = download_link_resp?;
let mut tokio_file = download_link_resp.download_file(&client).await?; let mut tokio_file = download_link_resp.download_file(&client).await?;
info!(bytes = api_file.size, "download finished");
initialize_plugins_archive(db_mod.nexus_mod_id, db_file.nexus_file_id)?; initialize_plugins_archive(db_mod.nexus_mod_id, db_file.nexus_file_id)?;
let mut plugins_archive = ZipWriter::new_append( let mut plugins_archive = ZipWriter::new_append(
@ -215,11 +234,10 @@ pub async fn main() -> Result<()> {
); );
for file_name in plugin_file_paths.iter() { for file_name in plugin_file_paths.iter() {
let plugin_span = info_span!("plugin", name = ?file_name);
let _plugin_span = plugin_span.enter();
file.seek(SeekFrom::Start(0))?; file.seek(SeekFrom::Start(0))?;
info!( info!("attempting to uncompress plugin file from downloaded archive");
?file_name,
"attempting to uncompress file from downloaded archive"
);
let mut buf = Vec::default(); let mut buf = Vec::default();
match uncompress_archive_file(&mut file, &mut buf, file_name) { match uncompress_archive_file(&mut file, &mut buf, file_name) {
Ok(_) => { Ok(_) => {

View File

@ -15,7 +15,7 @@ pub struct Cell {
} }
#[instrument(level = "debug", skip(pool))] #[instrument(level = "debug", skip(pool))]
pub async fn insert_cell( pub async fn insert(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
form_id: i32, form_id: i32,
x: Option<i32>, x: Option<i32>,

View File

@ -13,13 +13,15 @@ pub struct File {
pub category: Option<String>, pub category: Option<String>,
pub version: Option<String>, pub version: Option<String>,
pub mod_version: Option<String>, pub mod_version: Option<String>,
pub size: i64,
pub uploaded_at: NaiveDateTime, pub uploaded_at: NaiveDateTime,
pub has_download_link: bool,
pub updated_at: NaiveDateTime, pub updated_at: NaiveDateTime,
pub created_at: NaiveDateTime, pub created_at: NaiveDateTime,
} }
#[instrument(level = "debug", skip(pool))] #[instrument(level = "debug", skip(pool))]
pub async fn insert_file( pub async fn insert(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
name: &str, name: &str,
file_name: &str, file_name: &str,
@ -28,13 +30,14 @@ pub async fn insert_file(
category: Option<&str>, category: Option<&str>,
version: Option<&str>, version: Option<&str>,
mod_version: Option<&str>, mod_version: Option<&str>,
size: i64,
uploaded_at: NaiveDateTime, uploaded_at: NaiveDateTime,
) -> Result<File> { ) -> Result<File> {
sqlx::query_as!( sqlx::query_as!(
File, File,
"INSERT INTO files "INSERT INTO files
(name, file_name, nexus_file_id, mod_id, category, version, mod_version, uploaded_at, created_at, updated_at) (name, file_name, nexus_file_id, mod_id, category, version, mod_version, size, uploaded_at, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now(), now()) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, now(), now())
ON CONFLICT (mod_id, nexus_file_id) DO UPDATE ON CONFLICT (mod_id, nexus_file_id) DO UPDATE
SET (name, file_name, category, version, mod_version, uploaded_at, updated_at) = SET (name, file_name, category, version, mod_version, uploaded_at, updated_at) =
(EXCLUDED.name, EXCLUDED.file_name, EXCLUDED.category, EXCLUDED.version, EXCLUDED.mod_version, EXCLUDED.uploaded_at, now()) (EXCLUDED.name, EXCLUDED.file_name, EXCLUDED.category, EXCLUDED.version, EXCLUDED.mod_version, EXCLUDED.uploaded_at, now())
@ -46,9 +49,30 @@ pub async fn insert_file(
category, category,
version, version,
mod_version, mod_version,
size,
uploaded_at uploaded_at
) )
.fetch_one(pool) .fetch_one(pool)
.await .await
.context("Failed to insert file") .context("Failed to insert file")
} }
#[instrument(level = "debug", skip(pool))]
pub async fn update_has_download_link(
pool: &sqlx::Pool<sqlx::Postgres>,
id: i32,
has_download_link: bool,
) -> Result<File> {
sqlx::query_as!(
File,
"UPDATE files
SET has_download_link = $2
WHERE id = $1
RETURNING *",
id,
has_download_link,
)
.fetch_one(pool)
.await
.context("Failed to update file")
}

View File

@ -13,7 +13,7 @@ pub struct Game {
} }
#[instrument(level = "debug", skip(pool))] #[instrument(level = "debug", skip(pool))]
pub async fn insert_game( pub async fn insert(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
name: &str, name: &str,
nexus_game_id: i32, nexus_game_id: i32,

View File

@ -17,7 +17,7 @@ pub struct Mod {
} }
#[instrument(level = "debug", skip(pool))] #[instrument(level = "debug", skip(pool))]
pub async fn get_mod_by_nexus_mod_id( pub async fn get_by_nexus_mod_id(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
nexus_mod_id: i32, nexus_mod_id: i32,
) -> Result<Option<Mod>> { ) -> Result<Option<Mod>> {
@ -32,7 +32,7 @@ pub async fn get_mod_by_nexus_mod_id(
} }
#[instrument(level = "debug", skip(pool))] #[instrument(level = "debug", skip(pool))]
pub async fn insert_mod( pub async fn insert(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
name: &str, name: &str,
nexus_mod_id: i32, nexus_mod_id: i32,

View File

@ -10,6 +10,7 @@ pub struct Plugin {
pub hash: i64, pub hash: i64,
pub file_id: i32, pub file_id: i32,
pub version: Option<f64>, pub version: Option<f64>,
pub size: i64,
pub author: Option<String>, pub author: Option<String>,
pub description: Option<String>, pub description: Option<String>,
pub masters: Option<Vec<String>>, pub masters: Option<Vec<String>>,
@ -18,12 +19,13 @@ pub struct Plugin {
} }
#[instrument(level = "debug", skip(pool))] #[instrument(level = "debug", skip(pool))]
pub async fn insert_plugin( pub async fn insert(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
name: &str, name: &str,
hash: i64, hash: i64,
file_id: i32, file_id: i32,
version: Option<f64>, version: Option<f64>,
size: i64,
author: Option<&str>, author: Option<&str>,
description: Option<&str>, description: Option<&str>,
masters: Option<&[String]>, masters: Option<&[String]>,
@ -31,8 +33,8 @@ pub async fn insert_plugin(
sqlx::query_as!( sqlx::query_as!(
Plugin, Plugin,
"INSERT INTO plugins "INSERT INTO plugins
(name, hash, file_id, version, author, description, masters, created_at, updated_at) (name, hash, file_id, version, size, author, description, masters, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, now(), now()) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now(), now())
ON CONFLICT (file_id, name) DO UPDATE ON CONFLICT (file_id, name) DO UPDATE
SET (hash, version, author, description, masters, updated_at) = SET (hash, version, author, description, masters, updated_at) =
(EXCLUDED.hash, EXCLUDED.version, EXCLUDED.author, EXCLUDED.description, EXCLUDED.masters, now()) (EXCLUDED.hash, EXCLUDED.version, EXCLUDED.author, EXCLUDED.description, EXCLUDED.masters, now())
@ -41,6 +43,7 @@ pub async fn insert_plugin(
hash, hash,
file_id, file_id,
version, version,
size,
author, author,
description, description,
masters masters

View File

@ -13,7 +13,7 @@ pub struct PluginCell {
} }
#[instrument(level = "debug", skip(pool))] #[instrument(level = "debug", skip(pool))]
pub async fn insert_plugin_cell( pub async fn insert(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
plugin_id: i32, plugin_id: i32,
cell_id: i32, cell_id: i32,

View File

@ -61,7 +61,7 @@ impl DownloadLinkResponse {
.send() .send()
.await? .await?
.error_for_status()?; .error_for_status()?;
info!(status = %res.status(), "downloaded file from nexus"); info!(status = %res.status(), "downloading file from nexus");
// See: https://github.com/benkay86/async-applied/blob/master/reqwest-tokio-compat/src/main.rs // See: https://github.com/benkay86/async-applied/blob/master/reqwest-tokio-compat/src/main.rs
let mut byte_stream = res let mut byte_stream = res

View File

@ -19,6 +19,7 @@ pub struct ApiFile<'a> {
pub category: Option<&'a str>, pub category: Option<&'a str>,
pub version: Option<&'a str>, pub version: Option<&'a str>,
pub mod_version: Option<&'a str>, pub mod_version: Option<&'a str>,
pub size: i64,
pub uploaded_at: NaiveDateTime, pub uploaded_at: NaiveDateTime,
} }
@ -84,6 +85,13 @@ impl FilesResponse {
.get("mod_version") .get("mod_version")
.ok_or_else(|| anyhow!("Missing mod_version key in file in API response"))? .ok_or_else(|| anyhow!("Missing mod_version key in file in API response"))?
.as_str(); .as_str();
let size = file
.get("size_in_bytes")
.ok_or_else(|| anyhow!("Missing size_in_bytes key in file in API response"))?
.as_i64()
.ok_or_else(|| {
anyhow!("size_in_bytes value in API response file is not a number")
})?;
let uploaded_timestamp = file let uploaded_timestamp = file
.get("uploaded_timestamp") .get("uploaded_timestamp")
.ok_or_else(|| { .ok_or_else(|| {
@ -102,6 +110,7 @@ impl FilesResponse {
category, category,
version, version,
mod_version, mod_version,
size,
uploaded_at, uploaded_at,
}) })
}) })