Add CrawlScheduler actor and shared HTTP client with ETag/Last-Modified conditional request headers

2023-07-15 21:40:31 -04:00
parent 02d5cb9976
commit d17f909312
14 changed files with 494 additions and 109 deletions

View File

@@ -0,0 +1,284 @@
use std::fmt::{self, Display, Formatter};
use std::time::Duration;
use chrono::Utc;
use reqwest::Client;
use sqlx::PgPool;
use tokio::sync::{broadcast, mpsc};
use tokio::time::{interval_at, Instant};
use tracing::{debug, error, info, instrument};
use uuid::Uuid;
use crate::actors::feed_crawler::{FeedCrawlerError, FeedCrawlerHandle, FeedCrawlerHandleMessage};
use crate::domain_locks::DomainLocks;
use crate::models::feed::{Feed, GetFeedsOptions};
struct CrawlScheduler {
receiver: mpsc::Receiver<CrawlSchedulerMessage>,
pool: PgPool,
client: Client,
domain_locks: DomainLocks,
content_dir: String,
}
#[derive(Debug)]
enum CrawlSchedulerMessage {
Schedule {
feed_id: Uuid,
respond_to: broadcast::Sender<CrawlSchedulerHandleMessage>,
},
Bootstrap {
respond_to: broadcast::Sender<CrawlSchedulerHandleMessage>,
},
}
impl Display for CrawlSchedulerMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
CrawlSchedulerMessage::Schedule { feed_id, .. } => write!(f, "Schedule({})", feed_id),
CrawlSchedulerMessage::Bootstrap { .. } => write!(f, "Bootstrap"),
}
}
}
/// An error type that enumerates possible failures during a crawl. It is cloneable and can be
/// sent across threads because it stores error messages as strings rather than referencing the
/// originating errors (which are usually not cloneable).
#[derive(thiserror::Error, Debug, Clone)]
pub enum CrawlSchedulerError {
#[error("failed to fetch feed from database: {0}")]
FetchFeedError(String),
#[error("failed to fetch feeds from database: {0}")]
FetchFeedsError(String),
#[error("failed to crawl feed: {0}")]
FeedCrawlerError(FeedCrawlerError),
}
pub type CrawlSchedulerResult<T, E = CrawlSchedulerError> = ::std::result::Result<T, E>;
impl CrawlScheduler {
fn new(
receiver: mpsc::Receiver<CrawlSchedulerMessage>,
pool: PgPool,
client: Client,
domain_locks: DomainLocks,
content_dir: String,
) -> Self {
CrawlScheduler {
receiver,
pool,
client,
domain_locks,
content_dir,
}
}
#[instrument(skip_all)]
async fn bootstrap(
&self,
respond_to: broadcast::Sender<CrawlSchedulerHandleMessage>,
) -> CrawlSchedulerResult<()> {
debug!("scheduling crawlers");
let mut options = GetFeedsOptions::default();
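// Page through every feed in the database, keyset-paginating on `created_at`
// via `options.before`, and spawn a crawl loop for each one.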
loop {
info!("fetching feeds before: {:?}", options.before);
let feeds = match Feed::get_all(&self.pool, options.clone()).await {
Err(err) => {
return Err(CrawlSchedulerError::FetchFeedsError(err.to_string()));
}
Ok(feeds) if feeds.is_empty() => {
info!("no more feeds found");
break;
}
Ok(feeds) => feeds,
};
info!("found {} feeds", feeds.len());
options.before = feeds.last().map(|f| f.created_at);
for feed in feeds.into_iter() {
self.spawn_crawler_loop(feed, respond_to.clone());
}
}
debug!("done scheduling crawlers");
Ok(())
}
#[instrument(skip_all, fields(feed_id = %feed_id))]
async fn schedule(
&self,
feed_id: Uuid,
respond_to: broadcast::Sender<CrawlSchedulerHandleMessage>,
) -> CrawlSchedulerResult<()> {
let feed = Feed::get(&self.pool, feed_id)
.await
.map_err(|err| CrawlSchedulerError::FetchFeedError(err.to_string()))?;
self.spawn_crawler_loop(feed, respond_to);
Ok(())
}
#[instrument(skip_all, fields(feed_id = %feed.feed_id))]
fn spawn_crawler_loop(
&self,
feed: Feed,
respond_to: broadcast::Sender<CrawlSchedulerHandleMessage>,
) {
let crawl_interval = Duration::from_secs(feed.crawl_interval_minutes as u64 * 60);
let mut interval = tokio::time::interval(crawl_interval);
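// If the feed was crawled recently, delay the first tick so the next crawl
// happens one full `crawl_interval` after `last_crawled_at` instead of immediately.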
if let Some(last_crawled_at) = feed.last_crawled_at {
if let Ok(duration_since_last_crawl) = (Utc::now() - last_crawled_at).to_std() {
if duration_since_last_crawl < crawl_interval {
info!(
"last crawled at {:?}, crawling again in {:?}",
last_crawled_at,
crawl_interval - duration_since_last_crawl
);
interval = interval_at(
Instant::now() + (crawl_interval - duration_since_last_crawl),
crawl_interval,
);
}
}
}
let feed_crawler = FeedCrawlerHandle::new(
self.pool.clone(),
self.client.clone(),
self.domain_locks.clone(),
self.content_dir.clone(),
);
tokio::spawn(async move {
debug!("spawned crawler for feed");
loop {
interval.tick().await;
debug!("tick!");
let mut receiver = feed_crawler.crawl(feed.feed_id).await;
match receiver.recv().await {
Ok(FeedCrawlerHandleMessage::Feed(Ok(feed))) => {
let crawl_interval =
Duration::from_secs(feed.crawl_interval_minutes as u64 * 60);
interval = interval_at(Instant::now() + crawl_interval, crawl_interval);
info!(
minutes = feed.crawl_interval_minutes,
"updated crawl interval"
);
let _ = respond_to.send(CrawlSchedulerHandleMessage::FeedCrawler(
FeedCrawlerHandleMessage::Feed(Ok(feed)),
));
}
Ok(result) => {
let _ = respond_to.send(CrawlSchedulerHandleMessage::FeedCrawler(result));
}
_ => {}
}
}
});
}
#[instrument(skip_all, fields(msg = %msg))]
async fn handle_message(&mut self, msg: CrawlSchedulerMessage) {
match msg {
CrawlSchedulerMessage::Bootstrap { respond_to } => {
let result = self.bootstrap(respond_to.clone()).await;
if let Err(err) = &result {
error!("failed to bootstrap: {}", err);
}
// ignore the result since the initiator may have cancelled waiting for the
// response, and that is ok
let _ = respond_to.send(CrawlSchedulerHandleMessage::Bootstrap(result));
}
CrawlSchedulerMessage::Schedule {
feed_id,
respond_to,
} => {
let result = self.schedule(feed_id, respond_to.clone()).await;
if let Err(err) = &result {
error!("failed to schedule: {}", err);
}
// ignore the result since the initiator may have cancelled waiting for the
// response, and that is ok
let _ = respond_to.send(CrawlSchedulerHandleMessage::Schedule(result));
}
}
}
#[instrument(skip_all)]
async fn run(&mut self) {
debug!("starting crawl scheduler");
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg).await;
}
}
}
/// The `CrawlSchedulerHandle` is used to initialize and communicate with a `CrawlScheduler` actor.
///
/// Spawns an async task separate from the main web server that fetches all feeds from the database
/// and then spawns a long-lived async task for each feed that repeatedly crawls the feed at the
/// interval specified by each feed's `crawl_interval_minutes`.
///
/// Initially, each feed is crawled immediately unless its `last_crawled_at` timestamp in the
/// database is more recent than the current time minus its crawl interval, in which case the
/// first crawl is scheduled far enough in the future to keep crawls one interval apart.
///
/// After each crawl, the interval may be updated based on the result of the crawl.
#[derive(Clone)]
pub struct CrawlSchedulerHandle {
sender: mpsc::Sender<CrawlSchedulerMessage>,
}
/// The `CrawlSchedulerHandleMessage` is the response to a `CrawlSchedulerMessage` sent to the
/// `CrawlSchedulerHandle`.
///
/// `CrawlSchedulerHandleMessage::FeedCrawler` wraps the result of each feed crawl, forwarded
/// from the `FeedCrawler` actor.
#[derive(Clone)]
pub enum CrawlSchedulerHandleMessage {
Bootstrap(CrawlSchedulerResult<()>),
Schedule(CrawlSchedulerResult<()>),
FeedCrawler(FeedCrawlerHandleMessage),
}
impl CrawlSchedulerHandle {
/// Creates an async actor task that will listen for messages on the `sender` channel.
pub fn new(
pool: PgPool,
client: Client,
domain_locks: DomainLocks,
content_dir: String,
) -> Self {
let (sender, receiver) = mpsc::channel(8);
let mut scheduler = CrawlScheduler::new(receiver, pool, client, domain_locks, content_dir);
tokio::spawn(async move { scheduler.run().await });
Self { sender }
}
/// Sends a `CrawlSchedulerMessage::Bootstrap` message to the running `CrawlScheduler` actor.
///
/// Listen to the result of the scheduling via the returned `broadcast::Receiver`.
pub async fn bootstrap(&self) -> broadcast::Receiver<CrawlSchedulerHandleMessage> {
let (sender, receiver) = broadcast::channel(8);
let msg = CrawlSchedulerMessage::Bootstrap { respond_to: sender };
self.sender
.send(msg)
.await
.expect("crawl scheduler task has died");
receiver
}
/// Sends a `CrawlSchedulerMessage::Schedule` message to the running `CrawlScheduler` actor.
///
/// Listen to the result of the scheduling via the returned `broadcast::Receiver`.
pub async fn schedule(&self, feed_id: Uuid) -> broadcast::Receiver<CrawlSchedulerHandleMessage> {
let (sender, receiver) = broadcast::channel(8);
let msg = CrawlSchedulerMessage::Schedule {
feed_id,
respond_to: sender,
};
self.sender
.send(msg)
.await
.expect("crawl scheduler task has died");
receiver
}
}
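For context, a minimal sketch of how this handle might be wired up at startup. The wiring below is hypothetical and not part of this commit: the function name, the 30-second timeout, and `DomainLocks::new()` are assumptions.

use std::time::Duration;

use reqwest::Client;
use sqlx::PgPool;

use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleMessage};
use crate::domain_locks::DomainLocks;

// Hypothetical startup wiring; the real call site is outside this diff.
async fn start_crawl_scheduler(pool: PgPool, content_dir: String) {
    // One shared client so every crawler reuses the same connection pool.
    let client = Client::builder()
        .timeout(Duration::from_secs(30)) // assumed timeout
        .build()
        .expect("failed to build http client");
    let domain_locks = DomainLocks::new(); // assumed constructor

    let scheduler = CrawlSchedulerHandle::new(pool, client, domain_locks, content_dir);

    // Bootstrap spawns a long-lived crawl loop for every feed in the database.
    let mut receiver = scheduler.bootstrap().await;
    tokio::spawn(async move {
        while let Ok(msg) = receiver.recv().await {
            match msg {
                CrawlSchedulerHandleMessage::Bootstrap(result) => {
                    tracing::info!(ok = result.is_ok(), "bootstrap finished");
                }
                // Per-feed crawl results are forwarded after every tick.
                CrawlSchedulerHandleMessage::FeedCrawler(_) => {}
                CrawlSchedulerHandleMessage::Schedule(_) => {}
            }
        }
    });
}

Because `respond_to` is a `broadcast::Sender` and the actor ignores send errors, a caller that stops listening does not break the scheduler.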

View File

@@ -7,7 +7,7 @@ use readability::extractor;
use reqwest::Client;
use sqlx::PgPool;
use tokio::sync::{broadcast, mpsc};
use tracing::{info, instrument};
use tracing::{debug, info, instrument};
use url::Url;
use crate::domain_locks::DomainLocks;
@@ -80,7 +80,7 @@ impl EntryCrawler {
#[instrument(skip_all, fields(entry = %entry.url))]
async fn crawl_entry(&self, entry: Entry) -> EntryCrawlerResult<Entry> {
info!("Fetching and parsing entry");
info!("starting fetch");
let content_dir = Path::new(&self.content_dir);
let url =
Url::parse(&entry.url).map_err(|_| EntryCrawlerError::InvalidUrl(entry.url.clone()))?;
@@ -136,7 +136,7 @@ impl EntryCrawler {
#[instrument(skip_all)]
async fn run(&mut self) {
info!("starting entry crawler");
debug!("starting entry crawler");
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg).await;
}

View File

@@ -3,11 +3,14 @@ use std::fmt::{self, Display, Formatter};
use chrono::{Duration, Utc};
use feed_rs::parser;
use reqwest::Client;
use reqwest::StatusCode;
use reqwest::{
header::{self, HeaderMap},
Client,
};
use sqlx::PgPool;
use tokio::sync::{broadcast, mpsc};
use tracing::log::warn;
use tracing::{error, info, info_span, instrument};
use tracing::{debug, error, info, info_span, instrument, warn};
use url::Url;
use uuid::Uuid;
@@ -94,20 +97,82 @@ impl FeedCrawler {
let domain = url
.domain()
.ok_or(FeedCrawlerError::InvalidUrl(feed.url.clone()))?;
let bytes = self
let mut headers = HeaderMap::new();
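// Send the validators saved from the previous crawl so the server can answer
// with 304 Not Modified when the feed has not changed.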
if let Some(etag) = &feed.etag_header {
if let Ok(etag) = etag.parse() {
headers.insert(header::IF_NONE_MATCH, etag);
} else {
warn!(%etag, "failed to parse saved etag header");
}
}
if let Some(last_modified) = &feed.last_modified_header {
if let Ok(last_modified) = last_modified.parse() {
headers.insert(header::IF_MODIFIED_SINCE, last_modified);
} else {
warn!(
%last_modified,
"failed to parse saved last_modified header",
);
}
}
info!(url=%url, "starting fetch");
let resp = self
.domain_locks
.run_request(domain, async {
self.client
.get(url.clone())
.headers(headers)
.send()
.await
.map_err(|_| FeedCrawlerError::FetchError(url.clone()))?
.bytes()
.await
.map_err(|_| FeedCrawlerError::FetchError(url.clone()))
})
.await?;
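// Save the validators returned by the server so the next crawl can send them
// back as If-None-Match / If-Modified-Since.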
let headers = resp.headers();
if let Some(etag) = headers.get(header::ETAG) {
if let Ok(etag) = etag.to_str() {
feed.etag_header = Some(etag.to_string());
} else {
warn!(?etag, "failed to convert response etag header to string");
}
}
if let Some(last_modified) = headers.get(header::LAST_MODIFIED) {
if let Ok(last_modified) = last_modified.to_str() {
feed.last_modified_header = Some(last_modified.to_string());
} else {
warn!(
?last_modified,
"failed to convert response last_modified header to string",
);
}
}
info!(url=%url, "fetched feed");
if resp.status() == StatusCode::NOT_MODIFIED {
info!("feed returned not modified status");
feed.last_crawled_at = Some(Utc::now());
feed.last_crawl_error = None;
let feed = feed
.save(&self.pool)
.await
.map_err(|_| FeedCrawlerError::CreateFeedError(url.clone()))?;
info!("updated feed in db");
return Ok(feed);
} else if !resp.status().is_success() {
warn!("feed returned non-successful status");
feed.last_crawl_error = resp.status().canonical_reason().map(|s| s.to_string());
let feed = feed
.save(&self.pool)
.await
.map_err(|_| FeedCrawlerError::CreateFeedError(url.clone()))?;
info!("updated feed in db");
return Ok(feed);
}
let bytes = resp
.bytes()
.await
.map_err(|_| FeedCrawlerError::FetchError(url.clone()))?;
let parsed_feed =
parser::parse(&bytes[..]).map_err(|_| FeedCrawlerError::ParseError(url.clone()))?;
info!("parsed feed");
@@ -131,15 +196,16 @@ impl FeedCrawler {
Ordering::Greater => {
feed.crawl_interval_minutes =
i32::min(feed.crawl_interval_minutes * 2, MAX_CRAWL_INTERVAL_MINUTES);
},
}
Ordering::Less => {
feed.crawl_interval_minutes =
i32::max(feed.crawl_interval_minutes / 2, MIN_CRAWL_INTERVAL_MINUTES);
},
Ordering::Equal => {},
}
Ordering::Equal => {}
}
}
}
feed.last_entry_published_at = last_entry_published_at;
let feed = feed
.save(&self.pool)
.await
@@ -162,7 +228,7 @@ impl FeedCrawler {
};
payload.push(entry);
} else {
warn!("Skipping feed entry with no links");
warn!("skipping feed entry with no links");
}
}
let entries = Entry::bulk_upsert(&self.pool, payload)
@@ -195,12 +261,13 @@ impl FeedCrawler {
} => {
let result = self.crawl_feed(feed_id).await;
if let Err(error) = &result {
match Feed::update_crawl_error(&self.pool, feed_id, format!("{}", error)).await {
match Feed::update_crawl_error(&self.pool, feed_id, format!("{}", error)).await
{
Ok(_) => info!("updated feed last_crawl_error"),
Err(e) => error!("failed to update feed last_crawl_error: {}", e),
}
}
// ignore the result since the initiator may have cancelled waiting for the
// response, and that is ok
let _ = respond_to.send(FeedCrawlerHandleMessage::Feed(result));
@@ -210,7 +277,7 @@ impl FeedCrawler {
#[instrument(skip_all)]
async fn run(&mut self) {
info!("starting feed crawler");
debug!("starting feed crawler");
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg).await;
}
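Taken together, the changes above implement HTTP conditional requests for feed fetches. Below is a minimal standalone sketch of the same pattern with reqwest; the helper function and its arguments are illustrative and not part of this commit.

use reqwest::{header, Client, StatusCode};

// Illustrative helper: `saved_etag` / `saved_last_modified` stand in for the
// values persisted on the feed row between crawls.
async fn fetch_if_changed(
    client: &Client,
    url: &str,
    saved_etag: Option<&str>,
    saved_last_modified: Option<&str>,
) -> reqwest::Result<Option<Vec<u8>>> {
    let mut request = client.get(url);
    if let Some(etag) = saved_etag {
        request = request.header(header::IF_NONE_MATCH, etag);
    }
    if let Some(last_modified) = saved_last_modified {
        request = request.header(header::IF_MODIFIED_SINCE, last_modified);
    }
    let resp = request.send().await?;
    if resp.status() == StatusCode::NOT_MODIFIED {
        // Nothing changed since the saved validators; skip downloading and parsing.
        return Ok(None);
    }
    // New ETag / Last-Modified values would be read from resp.headers() here
    // and persisted for the next crawl.
    Ok(Some(resp.bytes().await?.to_vec()))
}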

View File

@@ -1,2 +1,3 @@
pub mod crawl_scheduler;
pub mod entry_crawler;
pub mod feed_crawler;