Add crawl metadata to feed & improve model interface
@@ -8,11 +8,13 @@ use tokio::sync::{broadcast, mpsc};
 use tracing::log::warn;
 use tracing::{info, info_span, instrument};
 use url::Url;
+use uuid::Uuid;
 
 use crate::actors::entry_crawler::EntryCrawlerHandle;
 use crate::domain_locks::DomainLocks;
-use crate::models::entry::{upsert_entries, CreateEntry, Entry};
-use crate::models::feed::{upsert_feed, CreateFeed, Feed};
+use crate::models::entry::{CreateEntry, Entry};
+use crate::models::feed::Feed;
+use crate::uuid::Base62Uuid;
 
 /// The `FeedCrawler` actor fetches a feed url, parses it, and saves it to the database.
 ///
@@ -31,7 +33,7 @@ struct FeedCrawler {
 #[derive(Debug)]
 enum FeedCrawlerMessage {
     Crawl {
-        url: Url,
+        feed_id: Uuid,
         respond_to: broadcast::Sender<FeedCrawlerHandleMessage>,
     },
 }
@@ -39,7 +41,7 @@ enum FeedCrawlerMessage {
 impl Display for FeedCrawlerMessage {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         match self {
-            FeedCrawlerMessage::Crawl { url, .. } => write!(f, "Crawl({})", url),
+            FeedCrawlerMessage::Crawl { feed_id, .. } => write!(f, "Crawl({})", feed_id),
         }
     }
 }
@@ -49,11 +51,13 @@ impl Display for FeedCrawlerMessage {
 #[derive(thiserror::Error, Debug, Clone)]
 pub enum FeedCrawlerError {
     #[error("invalid feed url: {0}")]
-    InvalidUrl(Url),
+    InvalidUrl(String),
     #[error("failed to fetch feed: {0}")]
     FetchError(Url),
     #[error("failed to parse feed: {0}")]
     ParseError(Url),
+    #[error("failed to find feed in database: {0}")]
+    GetFeedError(Base62Uuid),
     #[error("failed to create feed: {0}")]
     CreateFeedError(Url),
     #[error("failed to create feed entries: {0}")]
@@ -78,11 +82,17 @@ impl FeedCrawler {
         }
     }
 
-    #[instrument(skip_all, fields(url = %url))]
-    async fn crawl_feed(&self, url: Url) -> FeedCrawlerResult<Feed> {
+    #[instrument(skip_all, fields(feed_id = %feed_id))]
+    async fn crawl_feed(&self, feed_id: Uuid) -> FeedCrawlerResult<Feed> {
+        let mut feed = Feed::get(&self.pool, feed_id)
+            .await
+            .map_err(|_| FeedCrawlerError::GetFeedError(Base62Uuid::from(feed_id)))?;
+        info!("got feed from db");
+        let url = Url::parse(&feed.url)
+            .map_err(|_| FeedCrawlerError::InvalidUrl(feed.url.clone()))?;
         let domain = url
             .domain()
-            .ok_or(FeedCrawlerError::InvalidUrl(url.clone()))?;
+            .ok_or(FeedCrawlerError::InvalidUrl(feed.url.clone()))?;
         let bytes = self
             .domain_locks
             .run_request(domain, async {
@@ -96,22 +106,24 @@ impl FeedCrawler {
                     .map_err(|_| FeedCrawlerError::FetchError(url.clone()))
             })
             .await?;
-        info!("fetched feed");
+        info!(url=%url, "fetched feed");
         let parsed_feed =
             parser::parse(&bytes[..]).map_err(|_| FeedCrawlerError::ParseError(url.clone()))?;
         info!("parsed feed");
-        let feed = upsert_feed(
-            &self.pool,
-            CreateFeed {
-                title: parsed_feed.title.map(|text| text.content),
-                url: url.to_string(),
-                feed_type: parsed_feed.feed_type.into(),
-                description: parsed_feed.description.map(|text| text.content),
-            },
-        )
-        .await
-        .map_err(|_| FeedCrawlerError::CreateFeedError(url.clone()))?;
-        info!(%feed.feed_id, "upserted feed");
+        feed.url = url.to_string();
+        feed.feed_type = parsed_feed.feed_type.into();
+        feed.last_crawled_at = Some(Utc::now());
+        if let Some(title) = parsed_feed.title {
+            feed.title = Some(title.content);
+        }
+        if let Some(description) = parsed_feed.description {
+            feed.description = Some(description.content);
+        }
+        let feed = feed
+            .save(&self.pool)
+            .await
+            .map_err(|_| FeedCrawlerError::CreateFeedError(url.clone()))?;
+        info!("updated feed in db");
 
         let mut payload = Vec::with_capacity(parsed_feed.entries.len());
         for entry in parsed_feed.entries {
@@ -132,7 +144,7 @@ impl FeedCrawler {
                 warn!("Skipping feed entry with no links");
             }
         }
-        let entries = upsert_entries(&self.pool, payload)
+        let entries = Entry::bulk_upsert(&self.pool, payload)
            .await
            .map_err(|_| FeedCrawlerError::CreateFeedEntriesError(url.clone()))?;
        let (new, updated) = entries
@@ -156,8 +168,11 @@ impl FeedCrawler {
     #[instrument(skip_all, fields(msg = %msg))]
     async fn handle_message(&mut self, msg: FeedCrawlerMessage) {
         match msg {
-            FeedCrawlerMessage::Crawl { url, respond_to } => {
-                let result = self.crawl_feed(url).await;
+            FeedCrawlerMessage::Crawl {
+                feed_id,
+                respond_to,
+            } => {
+                let result = self.crawl_feed(feed_id).await;
                 // ignore the result since the initiator may have cancelled waiting for the
                 // response, and that is ok
                 let _ = respond_to.send(FeedCrawlerHandleMessage::Feed(result));
@@ -212,10 +227,13 @@ impl FeedCrawlerHandle {
     /// Sends a `FeedCrawlerMessage::Crawl` message to the running `FeedCrawler` actor.
     ///
     /// Listen to the result of the crawl via the returned `broadcast::Receiver`.
-    pub async fn crawl(&self, url: Url) -> broadcast::Receiver<FeedCrawlerHandleMessage> {
+    pub async fn crawl(
+        &self,
+        feed_id: Uuid,
+    ) -> broadcast::Receiver<FeedCrawlerHandleMessage> {
         let (sender, receiver) = broadcast::channel(8);
         let msg = FeedCrawlerMessage::Crawl {
-            url,
+            feed_id,
             respond_to: sender,
         };
 
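
Two sketches follow for context; neither is part of the commit itself.

First, the caller's side: `crawl` now takes a feed id rather than a URL, so the feed row must exist before a crawl is scheduled. A minimal caller sketch, assuming this file lives at `crate::actors::feed_crawler` and that `FeedCrawlerHandleMessage` is `Clone` (which the broadcast channel requires); the `crawl_and_wait` helper is hypothetical:

use tracing::{info, warn};
use uuid::Uuid;

use crate::actors::feed_crawler::{FeedCrawlerHandle, FeedCrawlerHandleMessage};

// Hypothetical caller: schedule a crawl for an already-inserted feed row
// and wait for the actor's broadcast reply.
async fn crawl_and_wait(crawler: &FeedCrawlerHandle, feed_id: Uuid) {
    let mut receiver = crawler.crawl(feed_id).await;
    match receiver.recv().await {
        Ok(FeedCrawlerHandleMessage::Feed(Ok(feed))) => info!(%feed.feed_id, "crawl finished"),
        Ok(FeedCrawlerHandleMessage::Feed(Err(error))) => warn!(%error, "crawl failed"),
        // Other handle messages, or a closed/lagged channel; the actor
        // deliberately ignores send errors, so dropping the receiver is fine.
        _ => {}
    }
}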
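Second, the model interface implied by the title: the free functions `upsert_feed` and `upsert_entries` give way to methods on the model types. The signatures below are inferred from the call sites in this diff; the `sqlx::PgPool` pool type, the `sqlx::Result` error type, and the stub bodies are assumptions, since the models module is not shown in this commit:

use sqlx::PgPool;
use uuid::Uuid;

use crate::models::entry::{CreateEntry, Entry};
use crate::models::feed::Feed;

// Signatures inferred from Feed::get / feed.save / Entry::bulk_upsert as
// called in the diff above; bodies are placeholders, not the real queries.
impl Feed {
    pub async fn get(pool: &PgPool, feed_id: Uuid) -> sqlx::Result<Feed> {
        todo!("SELECT the feed row WHERE feed_id = $1")
    }

    pub async fn save(self, pool: &PgPool) -> sqlx::Result<Feed> {
        todo!("UPDATE the row from the struct's fields and return it")
    }
}

impl Entry {
    pub async fn bulk_upsert(pool: &PgPool, entries: Vec<CreateEntry>) -> sqlx::Result<Vec<Entry>> {
        todo!("INSERT .. ON CONFLICT DO UPDATE for the whole batch")
    }
}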