Add more status columns to fix continuation of unprocessed files

Tyler Hallada 2021-07-29 22:55:34 -04:00
parent e1e7dd2e5d
commit 3ec7467571
5 changed files with 69 additions and 11 deletions

View File

@@ -0,0 +1 @@
+ALTER TABLE "files" ADD COLUMN "has_plugin" BOOLEAN NOT NULL DEFAULT true;

View File

@@ -0,0 +1 @@
+ALTER TABLE "files" ADD COLUMN "unable_to_extract_plugins" BOOLEAN NOT NULL DEFAULT false;
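Both columns default conservatively: existing rows are backfilled with has_plugin = true (a file is assumed to contain a plugin until its metadata or archive proves otherwise) and unable_to_extract_plugins = false, so the migrations alone do not mark any previously scraped file as skippable.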

View File

@@ -261,8 +261,8 @@ pub async fn main() -> Result<()> {
             Some(_) => true,
         });
-        let present_file_ids: HashSet<i32> =
-            file::get_nexus_file_ids_by_mod_id(&pool, db_mod.id)
+        let processed_file_ids: HashSet<i32> =
+            file::get_processed_nexus_file_ids_by_mod_id(&pool, db_mod.id)
                 .await?
                 .into_iter()
                 .collect();
@@ -272,8 +272,8 @@ pub async fn main() -> Result<()> {
                 info_span!("file", name = &api_file.file_name, id = &api_file.file_id);
             let _file_span = file_span.enter();
-            if present_file_ids.contains(&(api_file.file_id as i32)) {
-                info!("skipping file already present in database");
+            if processed_file_ids.contains(&(api_file.file_id as i32)) {
+                info!("skipping file already present and processed in database");
                 continue;
             }
             let db_file = file::insert(
@@ -297,6 +297,7 @@ pub async fn main() -> Result<()> {
                     checked_metadata = true;
                     if !contains_plugin {
                         info!("file metadata does not contain a plugin, skip downloading");
+                        file::update_has_plugin(&pool, db_file.id, false).await?;
                         continue;
                     }
                 } else {
@@ -342,6 +343,7 @@ pub async fn main() -> Result<()> {
             match tokio_file.read_exact(&mut initial_bytes).await {
                 Err(err) => {
                     warn!(error = %err, "failed to read initial bytes, skipping file");
+                    file::update_unable_to_extract_plugins(&pool, db_file.id, true).await?;
                     continue;
                 }
                 _ => {}
@@ -384,6 +386,8 @@ pub async fn main() -> Result<()> {
             } else {
                 if !checked_metadata {
                     warn!("failed to read archive and server has no metadata, skipping file");
+                    file::update_unable_to_extract_plugins(&pool, db_file.id, true)
+                        .await?;
                     continue;
                 } else {
                     error!("failed to read archive, but server had metadata");
@@ -434,6 +438,8 @@ pub async fn main() -> Result<()> {
                 Err(err) => {
                     if !checked_metadata {
                         warn!(error = %err, "failed to read archive and server has no metadata, skipping file");
+                        file::update_unable_to_extract_plugins(&pool, db_file.id, true)
+                            .await?;
                         continue;
                     } else {
                         error!(error = %err, "failed to read archive, but server had metadata");
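Taken together, the main.rs changes persist a reason whenever a file is skipped, so that get_processed_nexus_file_ids_by_mod_id (below) can treat it as done on the next run. A condensed sketch of the pattern, assuming the loop context above (the archive_read_failed flag is hypothetical shorthand for the several error branches):

    // Inside the per-file loop; `db_file` is the row inserted for this file.
    if !contains_plugin {
        // Metadata says there is no plugin, so record that fact; the file
        // then matches the processed check and is never downloaded again.
        file::update_has_plugin(&pool, db_file.id, false).await?;
        continue;
    }
    if archive_read_failed && !checked_metadata {
        // Hypothetical flag standing in for the read/extract failures above:
        // mark the file so the failure is recorded in the database.
        file::update_unable_to_extract_plugins(&pool, db_file.id, true).await?;
        continue;
    }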

View File

@@ -19,6 +19,8 @@ pub struct File {
     pub updated_at: NaiveDateTime,
     pub created_at: NaiveDateTime,
     pub downloaded_at: Option<NaiveDateTime>,
+    pub has_plugin: bool,
+    pub unable_to_extract_plugins: bool,
 }
 
 #[instrument(level = "debug", skip(pool))]
@@ -37,11 +39,19 @@ pub async fn get_by_nexus_file_id(
 }
 
 #[instrument(level = "debug", skip(pool))]
-pub async fn get_nexus_file_ids_by_mod_id(
+pub async fn get_processed_nexus_file_ids_by_mod_id(
     pool: &sqlx::Pool<sqlx::Postgres>,
     mod_id: i32,
 ) -> Result<Vec<i32>> {
-    sqlx::query!("SELECT nexus_file_id FROM files WHERE mod_id = $1", mod_id)
+    sqlx::query!(
+        "SELECT nexus_file_id FROM files
+            WHERE mod_id = $1 AND (
+                downloaded_at IS NOT NULL OR
+                has_plugin = false OR
+                has_download_link = false
+            )",
+        mod_id
+    )
     .map(|row| row.nexus_file_id)
     .fetch_all(pool)
     .await
@@ -119,3 +129,43 @@ pub async fn update_downloaded_at(pool: &sqlx::Pool<sqlx::Postgres>, id: i32) ->
     .await
     .context("Failed to update file")
 }
+
+#[instrument(level = "debug", skip(pool))]
+pub async fn update_has_plugin(
+    pool: &sqlx::Pool<sqlx::Postgres>,
+    id: i32,
+    has_plugin: bool,
+) -> Result<File> {
+    sqlx::query_as!(
+        File,
+        "UPDATE files
+            SET has_plugin = $2
+            WHERE id = $1
+            RETURNING *",
+        id,
+        has_plugin,
+    )
+    .fetch_one(pool)
+    .await
+    .context("Failed to update file")
+}
+
+#[instrument(level = "debug", skip(pool))]
+pub async fn update_unable_to_extract_plugins(
+    pool: &sqlx::Pool<sqlx::Postgres>,
+    id: i32,
+    unable_to_extract_plugins: bool,
+) -> Result<File> {
+    sqlx::query_as!(
+        File,
+        "UPDATE files
+            SET unable_to_extract_plugins = $2
+            WHERE id = $1
+            RETURNING *",
+        id,
+        unable_to_extract_plugins,
+    )
+    .fetch_one(pool)
+    .await
+    .context("Failed to update file")
+}
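Both UPDATE helpers use RETURNING * with query_as! so the caller gets the updated row back. A hypothetical call site, not taken from the diff:

    // Flip the flag and inspect the refreshed row.
    let updated = file::update_has_plugin(&pool, db_file.id, false).await?;
    assert!(!updated.has_plugin);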

View File

@@ -35,7 +35,7 @@ pub fn rate_limit_wait_duration(res: &Response) -> Result<std::time::Duration> {
         .expect("hourly reset in response headers");
     info!(daily_remaining, hourly_remaining, "rate limit check");
-    if daily_remaining == 0 && hourly_remaining == 0 {
+    if daily_remaining <= 1 && hourly_remaining <= 1 {
         let hourly_reset = hourly_reset.to_str()?.trim();
         let hourly_reset: DateTime<Utc> =
             (DateTime::parse_from_str(hourly_reset, "%Y-%m-%d %H:%M:%S %z")?
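Comparing with <= 1 instead of == 0 makes the wait trigger one request earlier, presumably leaving a request of headroom so the scraper pauses before the Nexus API starts rejecting calls outright.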