Retry connect failures, write plugins to disk instead of zip archive

Writing to the zip was starting to take forever. It makes more sense to just use my big HD and then zip after I'm done downloading every file.
This commit is contained in:
Tyler Hallada 2021-07-26 19:31:25 -04:00
parent f62324d36c
commit 8a356ac7f5
6 changed files with 80 additions and 82 deletions

3
.gitignore vendored
View File

@ -1,3 +1,4 @@
/target
.env
plugins.zip
plugins.zip
plugins

View File

@ -8,18 +8,18 @@ use sqlx::postgres::PgPoolOptions;
use std::borrow::Borrow;
use std::convert::TryInto;
use std::env;
use std::fs::OpenOptions;
use std::io::Seek;
use std::io::SeekFrom;
use std::path::Path;
use std::process::Command;
use std::time::Duration;
use tempfile::tempdir;
use tokio::fs::create_dir_all;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::time::sleep;
use tracing::{debug, error, info, info_span, warn};
use unrar::Archive;
use zip::write::{FileOptions, ZipWriter};
mod models;
mod nexus_api;
@ -59,17 +59,13 @@ fn get_local_form_id_and_master<'a>(
Ok((local_form_id, masters[master_index]))
}
async fn process_plugin<W>(
async fn process_plugin(
plugin_buf: &mut [u8],
pool: &sqlx::Pool<sqlx::Postgres>,
plugin_archive: &mut ZipWriter<W>,
db_file: &File,
mod_obj: &Mod,
db_mod: &Mod,
file_path: &str,
) -> Result<()>
where
W: std::io::Write + std::io::Seek,
{
) -> Result<()> {
if plugin_buf.len() == 0 {
warn!("skipping processing of invalid empty plugin");
return Ok(());
@ -174,31 +170,19 @@ where
warn!(error = %err, "Failed to parse plugin, skipping plugin");
}
}
plugin_archive.start_file(
format!(
"{}/{}/{}/{}",
GAME_NAME, mod_obj.nexus_mod_id, db_file.nexus_file_id, file_path
),
FileOptions::default(),
)?;
let mut reader = std::io::Cursor::new(&plugin_buf);
std::io::copy(&mut reader, plugin_archive)?;
Ok(())
}
fn initialize_plugins_archive(mod_id: i32, file_id: i32) -> Result<()> {
let mut plugins_archive = ZipWriter::new(
OpenOptions::new()
.write(true)
.create(true)
.open("plugins.zip")?,
let plugin_path = format!(
"plugins/{}/{}/{}/{}",
GAME_NAME, db_mod.nexus_mod_id, db_file.nexus_file_id, file_path
);
plugins_archive.add_directory(
format!("{}/{}/{}", GAME_NAME, mod_id, file_id),
FileOptions::default(),
)?;
plugins_archive.finish()?;
let plugin_path = Path::new(&plugin_path);
if let Some(dir) = plugin_path.parent() {
create_dir_all(dir).await?;
}
let mut file = tokio::fs::File::create(plugin_path).await?;
info!(path = %plugin_path.display(), "saving plugin to disk");
file.write_all(&plugin_buf).await?;
Ok(())
}
@ -331,13 +315,11 @@ pub async fn main() -> Result<()> {
let mut tokio_file = download_link_resp.download_file(&client).await?;
info!(bytes = api_file.size, "download finished");
initialize_plugins_archive(db_mod.nexus_mod_id, db_file.nexus_file_id)?;
let mut plugins_archive = ZipWriter::new_append(
OpenOptions::new()
.read(true)
.write(true)
.open("plugins.zip")?,
)?;
create_dir_all(format!(
"plugins/{}/{}/{}",
GAME_NAME, db_mod.nexus_mod_id, db_file.nexus_file_id
))
.await?;
let mut initial_bytes = [0; 8];
tokio_file.seek(SeekFrom::Start(0)).await?;
@ -417,7 +399,6 @@ pub async fn main() -> Result<()> {
process_plugin(
&mut plugin_buf,
&pool,
&mut plugins_archive,
&db_file,
&db_mod,
&file_path.to_string_lossy(),
@ -498,7 +479,6 @@ pub async fn main() -> Result<()> {
process_plugin(
&mut plugin_buf,
&pool,
&mut plugins_archive,
&db_file,
&db_mod,
file_path,
@ -511,20 +491,11 @@ pub async fn main() -> Result<()> {
Err(err)
}
}?;
process_plugin(
&mut buf,
&pool,
&mut plugins_archive,
&db_file,
&db_mod,
file_path,
)
.await?;
process_plugin(&mut buf, &pool, &db_file, &db_mod, file_path).await?;
}
}
}
plugins_archive.finish()?;
debug!(duration = ?download_link_resp.wait, "sleeping");
sleep(download_link_resp.wait).await;
}

View File

@ -5,11 +5,10 @@ use serde_json::Value;
use std::{env, time::Duration};
use tempfile::tempfile;
use tokio::fs::File;
use tokio::time::sleep;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{info, instrument, warn};
use tracing::{info, instrument};
use super::{rate_limit_wait_duration, GAME_NAME, USER_AGENT};
use super::{rate_limit_wait_duration, warn_and_sleep, GAME_NAME, USER_AGENT};
pub struct DownloadLinkResponse {
pub wait: Duration,
@ -28,13 +27,17 @@ pub async fn get(client: &Client, mod_id: i32, file_id: i64) -> Result<DownloadL
.header("apikey", env::var("NEXUS_API_KEY")?)
.header("user-agent", USER_AGENT)
.send()
.await?
.error_for_status()
.await
{
Ok(res) => res,
Ok(res) => match res.error_for_status() {
Ok(res) => res,
Err(err) => {
warn_and_sleep("download_link::get", anyhow!(err), attempt).await;
continue;
}
},
Err(err) => {
warn!(error = %err, attempt, "Failed to get download link for file, trying again after 1 second");
sleep(std::time::Duration::from_secs(1)).await;
warn_and_sleep("download_link::get", anyhow!(err), attempt).await;
continue;
}
};
@ -69,13 +72,25 @@ impl DownloadLinkResponse {
pub async fn download_file(&self, client: &Client) -> Result<File> {
for attempt in 1..=3 {
let mut tokio_file = File::from_std(tempfile()?);
let res = client
let res = match client
.get(self.link()?)
.header("apikey", env::var("NEXUS_API_KEY")?)
.header("user-agent", USER_AGENT)
.send()
.await?
.error_for_status()?;
.await
{
Ok(res) => match res.error_for_status() {
Ok(res) => res,
Err(err) => {
warn_and_sleep("download_link::download_file", anyhow!(err), attempt).await;
continue;
}
},
Err(err) => {
warn_and_sleep("download_link::download_file", anyhow!(err), attempt).await;
continue;
}
};
info!(status = %res.status(), "downloading file from nexus");
// See: https://github.com/benkay86/async-applied/blob/master/reqwest-tokio-compat/src/main.rs
@ -90,8 +105,7 @@ impl DownloadLinkResponse {
return Ok(tokio_file);
}
Err(err) => {
warn!(error = %err, attempt, "Failed to download file, trying again after 1 second");
sleep(std::time::Duration::from_secs(1)).await;
warn_and_sleep("download_link::download_file", anyhow!(err), attempt).await
}
}
}

View File

@ -3,10 +3,9 @@ use chrono::NaiveDateTime;
use reqwest::Client;
use serde_json::Value;
use std::{env, time::Duration};
use tokio::time::sleep;
use tracing::{info, instrument, warn};
use tracing::{info, instrument};
use super::{rate_limit_wait_duration, GAME_NAME, USER_AGENT};
use super::{rate_limit_wait_duration, warn_and_sleep, GAME_NAME, USER_AGENT};
pub struct FilesResponse {
pub wait: Duration,
@ -37,13 +36,17 @@ pub async fn get(client: &Client, nexus_mod_id: i32) -> Result<FilesResponse> {
.header("apikey", env::var("NEXUS_API_KEY")?)
.header("user-agent", USER_AGENT)
.send()
.await?
.error_for_status()
.await
{
Ok(res) => res,
Ok(res) => match res.error_for_status() {
Ok(res) => res,
Err(err) => {
warn_and_sleep("files::get", anyhow!(err), attempt).await;
continue;
}
},
Err(err) => {
warn!(error = %err, attempt, "Failed to get files for mod, trying again after 1 second");
sleep(std::time::Duration::from_secs(1)).await;
warn_and_sleep("files::get", anyhow!(err), attempt).await;
continue;
}
};

View File

@ -2,11 +2,10 @@ use anyhow::{anyhow, Result};
use reqwest::Client;
use serde_json::Value;
use std::env;
use tokio::time::sleep;
use tracing::{info, instrument, warn};
use tracing::{info, instrument};
use super::files::ApiFile;
use super::USER_AGENT;
use super::{warn_and_sleep, USER_AGENT};
fn has_plugin(json: &Value) -> Result<bool> {
let node_type = json
@ -53,13 +52,17 @@ pub async fn contains_plugin(client: &Client, api_file: &ApiFile<'_>) -> Result<
.header("apikey", env::var("NEXUS_API_KEY")?)
.header("user-agent", USER_AGENT)
.send()
.await?
.error_for_status()
.await
{
Ok(res) => res,
Ok(res) => match res.error_for_status() {
Ok(res) => res,
Err(err) => {
warn_and_sleep("metadata::contains_plugin", anyhow!(err), attempt).await;
continue;
}
},
Err(err) => {
warn!(error = %err, attempt, "Failed to get metadata for file, trying again after 1 second");
sleep(std::time::Duration::from_secs(1)).await;
warn_and_sleep("metadata::contains_plugin", anyhow!(err), attempt).await;
continue;
}
};

View File

@ -3,7 +3,8 @@ use chrono::DateTime;
use chrono::Duration;
use chrono::Utc;
use reqwest::Response;
use tracing::info;
use tokio::time::sleep;
use tracing::{info, warn};
pub mod download_link;
pub mod files;
@ -51,3 +52,8 @@ pub fn rate_limit_wait_duration(res: &Response) -> Result<std::time::Duration> {
Ok(std::time::Duration::from_secs(1))
}
}
async fn warn_and_sleep(request_name: &str, err: anyhow::Error, attempt: i32) {
warn!(error = %err, attempt, "{} request failed, trying again after 1 second", request_name);
sleep(std::time::Duration::from_secs(1)).await;
}