diff --git a/src/actors/feed_crawler.rs b/src/actors/feed_crawler.rs index 4a03bdb..d4b1158 100644 --- a/src/actors/feed_crawler.rs +++ b/src/actors/feed_crawler.rs @@ -1,19 +1,20 @@ +use std::cmp::Ordering; use std::fmt::{self, Display, Formatter}; -use chrono::Utc; +use chrono::{Duration, Utc}; use feed_rs::parser; use reqwest::Client; use sqlx::PgPool; use tokio::sync::{broadcast, mpsc}; use tracing::log::warn; -use tracing::{info, info_span, instrument}; +use tracing::{error, info, info_span, instrument}; use url::Url; use uuid::Uuid; use crate::actors::entry_crawler::EntryCrawlerHandle; use crate::domain_locks::DomainLocks; use crate::models::entry::{CreateEntry, Entry}; -use crate::models::feed::Feed; +use crate::models::feed::{Feed, MAX_CRAWL_INTERVAL_MINUTES, MIN_CRAWL_INTERVAL_MINUTES}; use crate::uuid::Base62Uuid; /// The `FeedCrawler` actor fetches a feed url, parses it, and saves it to the database. @@ -88,8 +89,8 @@ impl FeedCrawler { .await .map_err(|_| FeedCrawlerError::GetFeedError(Base62Uuid::from(feed_id)))?; info!("got feed from db"); - let url = Url::parse(&feed.url) - .map_err(|_| FeedCrawlerError::InvalidUrl(feed.url.clone()))?; + let url = + Url::parse(&feed.url).map_err(|_| FeedCrawlerError::InvalidUrl(feed.url.clone()))?; let domain = url .domain() .ok_or(FeedCrawlerError::InvalidUrl(feed.url.clone()))?; @@ -113,12 +114,32 @@ impl FeedCrawler { feed.url = url.to_string(); feed.feed_type = parsed_feed.feed_type.into(); feed.last_crawled_at = Some(Utc::now()); + feed.last_crawl_error = None; if let Some(title) = parsed_feed.title { feed.title = Some(title.content); } if let Some(description) = parsed_feed.description { feed.description = Some(description.content); } + let last_entry_published_at = parsed_feed.entries.iter().filter_map(|e| e.published).max(); + if let Some(prev_last_entry_published_at) = feed.last_entry_published_at { + if let Some(published_at) = last_entry_published_at { + let time_since_last_entry = published_at - prev_last_entry_published_at; + match time_since_last_entry + .cmp(&Duration::minutes(feed.crawl_interval_minutes.into())) + { + Ordering::Greater => { + feed.crawl_interval_minutes = + i32::max(feed.crawl_interval_minutes * 2, MAX_CRAWL_INTERVAL_MINUTES); + }, + Ordering::Less => { + feed.crawl_interval_minutes = + i32::max(feed.crawl_interval_minutes / 2, MIN_CRAWL_INTERVAL_MINUTES); + }, + Ordering::Equal => {}, + } + } + } let feed = feed .save(&self.pool) .await @@ -173,6 +194,13 @@ impl FeedCrawler { respond_to, } => { let result = self.crawl_feed(feed_id).await; + if let Err(error) = &result { + match Feed::update_crawl_error(&self.pool, feed_id, format!("{}", error)).await { + Ok(_) => info!("updated feed last_crawl_error"), + Err(e) => error!("failed to update feed last_crawl_error: {}", e), + } + } + // ignore the result since the initiator may have cancelled waiting for the // response, and that is ok let _ = respond_to.send(FeedCrawlerHandleMessage::Feed(result)); @@ -227,10 +255,7 @@ impl FeedCrawlerHandle { /// Sends a `FeedCrawlerMessage::Crawl` message to the running `FeedCrawler` actor. /// /// Listen to the result of the crawl via the returned `broadcast::Receiver`. - pub async fn crawl( - &self, - feed_id: Uuid, - ) -> broadcast::Receiver { + pub async fn crawl(&self, feed_id: Uuid) -> broadcast::Receiver { let (sender, receiver) = broadcast::channel(8); let msg = FeedCrawlerMessage::Crawl { feed_id, diff --git a/src/models/feed.rs b/src/models/feed.rs index ab4f77a..909c974 100644 --- a/src/models/feed.rs +++ b/src/models/feed.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use sqlx::{FromRow, PgPool}; +use sqlx::{FromRow, PgPool, postgres::PgQueryResult}; use uuid::Uuid; use validator::Validate; @@ -47,6 +47,9 @@ impl From for FeedType { } } +pub const MIN_CRAWL_INTERVAL_MINUTES: i32 = 1; +pub const MAX_CRAWL_INTERVAL_MINUTES: i32 = 5040; + #[derive(Debug, Serialize, Deserialize, Clone, FromRow)] pub struct Feed { pub feed_id: Uuid, @@ -276,6 +279,18 @@ impl Feed { Ok(()) } + pub async fn update_crawl_error(pool: &PgPool, feed_id: Uuid, last_crawl_error: String) -> Result { + Ok(sqlx::query!( + r#"update feed set + last_crawl_error = $2 + where feed_id = $1"#, + feed_id, + last_crawl_error, + ) + .execute(pool) + .await?) + } + pub async fn save(&self, pool: &PgPool) -> Result { Ok(sqlx::query_as!( Feed,