Finish implementing OPML importer
Now with progress messages!
This commit is contained in:
@@ -123,7 +123,11 @@ impl CrawlScheduler {
|
||||
let crawl_interval = Duration::from_secs(feed.crawl_interval_minutes as u64 * 60);
|
||||
let mut interval = tokio::time::interval(crawl_interval);
|
||||
if let Some(last_crawled_at) = feed.last_crawled_at {
|
||||
dbg!(last_crawled_at);
|
||||
dbg!(Utc::now());
|
||||
if let Ok(duration_since_last_crawl) = (Utc::now() - last_crawled_at).to_std() {
|
||||
dbg!(duration_since_last_crawl);
|
||||
dbg!(crawl_interval);
|
||||
if duration_since_last_crawl < crawl_interval {
|
||||
info!(
|
||||
"last crawled at {:?}, crawling again in {:?}",
|
||||
@@ -145,27 +149,27 @@ impl CrawlScheduler {
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
debug!("spawned crawler for feed");
|
||||
interval.tick().await;
|
||||
debug!("tick!");
|
||||
let mut receiver = feed_crawler.crawl(feed.feed_id).await;
|
||||
match receiver.recv().await {
|
||||
Ok(FeedCrawlerHandleMessage::Feed(Ok(feed))) => {
|
||||
let crawl_interval =
|
||||
Duration::from_secs(feed.crawl_interval_minutes as u64 * 60);
|
||||
interval = interval_at(Instant::now() + crawl_interval, crawl_interval);
|
||||
info!(
|
||||
minutes = feed.crawl_interval_minutes,
|
||||
"updated crawl interval"
|
||||
);
|
||||
let _ = respond_to.send(CrawlSchedulerHandleMessage::FeedCrawler(
|
||||
FeedCrawlerHandleMessage::Feed(Ok(feed)),
|
||||
));
|
||||
while let Ok(msg) = receiver.recv().await {
|
||||
match msg {
|
||||
FeedCrawlerHandleMessage::Feed(Ok(feed)) => {
|
||||
let crawl_interval =
|
||||
Duration::from_secs(feed.crawl_interval_minutes as u64 * 60);
|
||||
interval = interval_at(Instant::now() + crawl_interval, crawl_interval);
|
||||
info!(
|
||||
minutes = feed.crawl_interval_minutes,
|
||||
"updated crawl interval"
|
||||
);
|
||||
let _ = respond_to.send(CrawlSchedulerHandleMessage::FeedCrawler(
|
||||
FeedCrawlerHandleMessage::Feed(Ok(feed)),
|
||||
));
|
||||
}
|
||||
result => {
|
||||
let _ =
|
||||
respond_to.send(CrawlSchedulerHandleMessage::FeedCrawler(result));
|
||||
}
|
||||
}
|
||||
Ok(result) => {
|
||||
let _ = respond_to.send(CrawlSchedulerHandleMessage::FeedCrawler(result));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -229,7 +233,7 @@ pub struct CrawlSchedulerHandle {
|
||||
/// `CrawlSchedulerHandle`.
|
||||
///
|
||||
/// `CrawlSchedulerHandleMessage::Feed` contains the result of crawling a feed url.
|
||||
#[derive(Clone)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum CrawlSchedulerHandleMessage {
|
||||
Bootstrap(CrawlSchedulerResult<()>),
|
||||
Schedule(CrawlSchedulerResult<()>),
|
||||
@@ -268,7 +272,10 @@ impl CrawlSchedulerHandle {
|
||||
/// Sends a `CrawlSchedulerMessage::Schedule` message to the running `CrawlScheduler` actor.
|
||||
///
|
||||
/// Listen to the result of the scheduling via the returned `broadcast::Receiver`.
|
||||
pub async fn schedule(&self, feed_id: Uuid) -> broadcast::Receiver<CrawlSchedulerHandleMessage> {
|
||||
pub async fn schedule(
|
||||
&self,
|
||||
feed_id: Uuid,
|
||||
) -> broadcast::Receiver<CrawlSchedulerHandleMessage> {
|
||||
let (sender, receiver) = broadcast::channel(8);
|
||||
let msg = CrawlSchedulerMessage::Schedule {
|
||||
feed_id,
|
||||
|
||||
@@ -14,7 +14,9 @@ use tracing::{debug, error, info, info_span, instrument, warn};
|
||||
use url::Url;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::actors::entry_crawler::EntryCrawlerHandle;
|
||||
use crate::actors::entry_crawler::{
|
||||
EntryCrawlerHandle, EntryCrawlerHandleMessage, EntryCrawlerResult,
|
||||
};
|
||||
use crate::domain_locks::DomainLocks;
|
||||
use crate::models::entry::{CreateEntry, Entry};
|
||||
use crate::models::feed::{Feed, MAX_CRAWL_INTERVAL_MINUTES, MIN_CRAWL_INTERVAL_MINUTES};
|
||||
@@ -87,7 +89,11 @@ impl FeedCrawler {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(feed_id = %feed_id))]
|
||||
async fn crawl_feed(&self, feed_id: Uuid) -> FeedCrawlerResult<Feed> {
|
||||
async fn crawl_feed(
|
||||
&self,
|
||||
feed_id: Uuid,
|
||||
respond_to: broadcast::Sender<FeedCrawlerHandleMessage>,
|
||||
) -> FeedCrawlerResult<Feed> {
|
||||
let mut feed = Feed::get(&self.pool, feed_id)
|
||||
.await
|
||||
.map_err(|_| FeedCrawlerError::GetFeedError(Base62Uuid::from(feed_id)))?;
|
||||
@@ -159,6 +165,7 @@ impl FeedCrawler {
|
||||
return Ok(feed);
|
||||
} else if !resp.status().is_success() {
|
||||
warn!("feed returned non-successful status");
|
||||
feed.last_crawled_at = Some(Utc::now());
|
||||
feed.last_crawl_error = resp.status().canonical_reason().map(|s| s.to_string());
|
||||
let feed = feed
|
||||
.save(&self.pool)
|
||||
@@ -172,7 +179,7 @@ impl FeedCrawler {
|
||||
.bytes()
|
||||
.await
|
||||
.map_err(|_| FeedCrawlerError::FetchError(url.clone()))?;
|
||||
|
||||
|
||||
let parsed_feed =
|
||||
parser::parse(&bytes[..]).map_err(|_| FeedCrawlerError::ParseError(url.clone()))?;
|
||||
info!("parsed feed");
|
||||
@@ -246,8 +253,10 @@ impl FeedCrawler {
|
||||
self.domain_locks.clone(),
|
||||
self.content_dir.clone(),
|
||||
);
|
||||
// TODO: ignoring this receiver for the time being, pipe through events eventually
|
||||
let _ = entry_crawler.crawl(entry).await;
|
||||
let mut entry_receiver = entry_crawler.crawl(entry).await;
|
||||
while let Ok(EntryCrawlerHandleMessage::Entry(result)) = entry_receiver.recv().await {
|
||||
let _ = respond_to.send(FeedCrawlerHandleMessage::Entry(result));
|
||||
}
|
||||
}
|
||||
Ok(feed)
|
||||
}
|
||||
@@ -259,7 +268,7 @@ impl FeedCrawler {
|
||||
feed_id,
|
||||
respond_to,
|
||||
} => {
|
||||
let result = self.crawl_feed(feed_id).await;
|
||||
let result = self.crawl_feed(feed_id, respond_to.clone()).await;
|
||||
if let Err(error) = &result {
|
||||
match Feed::update_crawl_error(&self.pool, feed_id, format!("{}", error)).await
|
||||
{
|
||||
@@ -298,10 +307,10 @@ pub struct FeedCrawlerHandle {
|
||||
///
|
||||
/// `FeedCrawlerHandleMessage::Feed` contains the result of crawling a feed url.
|
||||
/// `FeedCrawlerHandleMessage::Entry` contains the result of crawling an entry url within the feed.
|
||||
#[derive(Clone)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum FeedCrawlerHandleMessage {
|
||||
Feed(FeedCrawlerResult<Feed>),
|
||||
Entry(FeedCrawlerResult<Entry>),
|
||||
Entry(EntryCrawlerResult<Entry>),
|
||||
}
|
||||
|
||||
impl FeedCrawlerHandle {
|
||||
|
||||
@@ -8,8 +8,10 @@ use opml::OPML;
|
||||
use sqlx::PgPool;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
use tracing::{debug, error, instrument};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleMessage};
|
||||
use crate::actors::feed_crawler::FeedCrawlerHandleMessage;
|
||||
use crate::models::feed::{Feed, UpsertFeed};
|
||||
use crate::uuid::Base62Uuid;
|
||||
|
||||
@@ -46,6 +48,17 @@ impl Display for ImporterMessage {
|
||||
}
|
||||
}
|
||||
|
||||
async fn listen_to_crawl(
|
||||
feed_id: Uuid,
|
||||
crawl_scheduler: CrawlSchedulerHandle,
|
||||
respond_to: broadcast::Sender<ImporterHandleMessage>,
|
||||
) {
|
||||
let mut receiver = crawl_scheduler.schedule(feed_id).await;
|
||||
while let Ok(msg) = receiver.recv().await {
|
||||
let _ = respond_to.send(ImporterHandleMessage::CrawlScheduler(msg));
|
||||
}
|
||||
}
|
||||
|
||||
/// An error type that enumerates possible failures during a crawl and is cloneable and can be sent
|
||||
/// across threads (does not reference the originating Errors which are usually not cloneable).
|
||||
#[derive(thiserror::Error, Debug, Clone)]
|
||||
@@ -80,7 +93,6 @@ impl Importer {
|
||||
) -> ImporterResult<()> {
|
||||
let document = OPML::from_reader(&mut Cursor::new(bytes))
|
||||
.map_err(|_| ImporterError::InvalidOPML(file_name.unwrap_or(import_id.to_string())))?;
|
||||
let mut receivers = Vec::new();
|
||||
for url in Self::gather_feed_urls(document.body.outlines) {
|
||||
let feed = Feed::upsert(
|
||||
&self.pool,
|
||||
@@ -91,19 +103,15 @@ impl Importer {
|
||||
)
|
||||
.await
|
||||
.map_err(|_| ImporterError::CreateFeedError(url))?;
|
||||
if feed.updated_at.is_some() {
|
||||
receivers.push(self.crawl_scheduler.schedule(feed.feed_id).await);
|
||||
if feed.updated_at.is_none() {
|
||||
tokio::spawn(listen_to_crawl(
|
||||
feed.feed_id,
|
||||
self.crawl_scheduler.clone(),
|
||||
respond_to.clone(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let mut future_recvs: FuturesUnordered<_> =
|
||||
receivers.iter_mut().map(|rx| rx.recv()).collect();
|
||||
|
||||
while let Some(result) = future_recvs.next().await {
|
||||
if let Ok(crawl_scheduler_msg) = result {
|
||||
let _ = respond_to.send(ImporterHandleMessage::CrawlScheduler(crawl_scheduler_msg));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -161,7 +169,7 @@ pub struct ImporterHandle {
|
||||
///
|
||||
/// `ImporterHandleMessage::Import` contains the result of importing the OPML file.
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Clone)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ImporterHandleMessage {
|
||||
// TODO: send stats of import or forward crawler messages?
|
||||
Import(ImporterResult<()>),
|
||||
|
||||
Reference in New Issue
Block a user