Add EntryCrawler that uses readability lib
Actors delegating to actors baybeeee
src/actors/entry_crawler.rs (new file, 180 lines)
@@ -0,0 +1,180 @@
use std::fmt::{self, Display, Formatter};
use std::fs;
use std::path::Path;
use std::sync::Arc;

use bytes::Buf;
use feed_rs::parser;
use readability::extractor;
use reqwest::Client;
use sqlx::PgPool;
use tokio::sync::{broadcast, mpsc, Mutex};
use tracing::{info, instrument};
use url::Url;

use crate::config::Config;
use crate::models::entry::{update_entry, CreateEntry, Entry};
use crate::models::feed::{upsert_feed, CreateFeed, Feed};

/// The `EntryCrawler` actor fetches an entry url, extracts the content, and saves the content to
/// the file system and any associated metadata to the database.
///
/// It receives `EntryCrawlerMessage` messages via the `receiver` channel. It communicates back to
/// the sender of those messages via the `respond_to` channel on the `EntryCrawlerMessage`.
///
/// `EntryCrawler` should not be instantiated directly. Instead, use the `EntryCrawlerHandle`.
struct EntryCrawler {
    receiver: mpsc::Receiver<EntryCrawlerMessage>,
    pool: PgPool,
    client: Client,
    content_dir: String,
}

#[derive(Debug)]
enum EntryCrawlerMessage {
    Crawl {
        entry: Entry,
        respond_to: broadcast::Sender<EntryCrawlerHandleMessage>,
    },
}

impl Display for EntryCrawlerMessage {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            EntryCrawlerMessage::Crawl { entry, .. } => write!(f, "Crawl({})", entry.url),
        }
    }
}

/// An error type that enumerates possible failures during a crawl and is cloneable and can be sent
/// across threads (does not reference the originating Errors which are usually not cloneable).
#[derive(thiserror::Error, Debug, Clone)]
pub enum EntryCrawlerError {
    #[error("invalid entry url: {0}")]
    InvalidUrl(String),
    #[error("failed to fetch entry: {0}")]
    FetchError(String),
    #[error("failed to extract content for entry: {0}")]
    ExtractError(String),
    #[error("failed to create entry: {0}")]
    CreateEntryError(String),
    #[error("failed to save entry content: {0}")]
    SaveContentError(String),
}
pub type EntryCrawlerResult<T, E = EntryCrawlerError> = ::std::result::Result<T, E>;

impl EntryCrawler {
    fn new(
        receiver: mpsc::Receiver<EntryCrawlerMessage>,
        pool: PgPool,
        client: Client,
        content_dir: String,
    ) -> Self {
        EntryCrawler {
            receiver,
            pool,
            client,
            content_dir,
        }
    }

    #[instrument(skip_all, fields(entry = %entry.url))]
    async fn crawl_entry(&self, entry: Entry) -> EntryCrawlerResult<Entry> {
        info!("Fetching and parsing entry");
        let content_dir = Path::new(&self.content_dir);
        let url =
            Url::parse(&entry.url).map_err(|_| EntryCrawlerError::InvalidUrl(entry.url.clone()))?;
        let bytes = self
            .client
            .get(url.clone())
            .send()
            .await
            .map_err(|_| EntryCrawlerError::FetchError(entry.url.clone()))?
            .bytes()
            .await
            .map_err(|_| EntryCrawlerError::FetchError(entry.url.clone()))?;
        let article = extractor::extract(&mut bytes.reader(), &url)
            .map_err(|_| EntryCrawlerError::ExtractError(entry.url.clone()))?;
        let id = entry.entry_id;
        // TODO: update entry with scraped data
        // if let Some(date) = article.date {
        //     // prefer scraped date over rss feed date
        //     let mut updated_entry = entry.clone();
        //     updated_entry.published_at = date;
        //     entry = update_entry(&self.pool, updated_entry)
        //         .await
        //         .map_err(|_| EntryCrawlerError::CreateEntryError(entry.url.clone()))?;
        // };
        fs::write(content_dir.join(format!("{}.html", id)), article.content)
            .map_err(|_| EntryCrawlerError::SaveContentError(entry.url.clone()))?;
        fs::write(content_dir.join(format!("{}.txt", id)), article.text)
            .map_err(|_| EntryCrawlerError::SaveContentError(entry.url.clone()))?;
        Ok(entry)
    }

    #[instrument(skip_all, fields(msg = %msg))]
    async fn handle_message(&mut self, msg: EntryCrawlerMessage) {
        match msg {
            EntryCrawlerMessage::Crawl { entry, respond_to } => {
                let result = self.crawl_entry(entry).await;
                // ignore the result since the initiator may have cancelled waiting for the
                // response, and that is ok
                let _ = respond_to.send(EntryCrawlerHandleMessage::Entry(result));
            }
        }
    }

    #[instrument(skip_all)]
    async fn run(&mut self) {
        info!("starting entry crawler");
        while let Some(msg) = self.receiver.recv().await {
            self.handle_message(msg).await;
        }
    }
}

/// The `EntryCrawlerHandle` is used to initialize and communicate with an `EntryCrawler` actor.
///
/// The `EntryCrawler` actor fetches an entry url, extracts its content, and saves the content to
/// the file system and any associated metadata to the database. It runs as a separate
/// asynchronous task from the main web server and communicates via channels.
#[derive(Clone)]
pub struct EntryCrawlerHandle {
    sender: mpsc::Sender<EntryCrawlerMessage>,
}

/// The `EntryCrawlerHandleMessage` is the response to an `EntryCrawlerMessage` sent to the
/// `EntryCrawlerHandle`.
///
/// `EntryCrawlerHandleMessage::Entry` contains the result of crawling an entry url.
#[derive(Clone)]
pub enum EntryCrawlerHandleMessage {
    Entry(EntryCrawlerResult<Entry>),
}

impl EntryCrawlerHandle {
    /// Creates an async actor task that will listen for messages on the `sender` channel.
    pub fn new(pool: PgPool, client: Client, content_dir: String) -> Self {
        let (sender, receiver) = mpsc::channel(8);
        let mut crawler = EntryCrawler::new(receiver, pool, client, content_dir);
        tokio::spawn(async move { crawler.run().await });

        Self { sender }
    }

    /// Sends an `EntryCrawlerMessage::Crawl` message to the running `EntryCrawler` actor.
    ///
    /// Listen to the result of the crawl via the returned `broadcast::Receiver`.
    pub async fn crawl(&self, entry: Entry) -> broadcast::Receiver<EntryCrawlerHandleMessage> {
        let (sender, receiver) = broadcast::channel(8);
        let msg = EntryCrawlerMessage::Crawl {
            entry,
            respond_to: sender,
        };

        self.sender
            .send(msg)
            .await
            .expect("entry crawler task has died");
        receiver
    }
}
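A minimal usage sketch (not part of the diff) of how a caller might drive the new actor through its handle and watch for the result on the returned broadcast channel; the `pool`, `client`, `content_dir`, and `entry` values are assumed to come from the surrounding application state, as in the handler change further down:

    use reqwest::Client;
    use sqlx::PgPool;

    use crate::actors::entry_crawler::{EntryCrawlerHandle, EntryCrawlerHandleMessage};
    use crate::models::entry::Entry;

    // Hypothetical caller, for illustration only.
    async fn crawl_one(pool: PgPool, client: Client, content_dir: String, entry: Entry) {
        // Spawns the EntryCrawler task and keeps a cloneable handle to it.
        let handle = EntryCrawlerHandle::new(pool, client, content_dir);

        // Ask the actor to crawl one entry; it reports back on a broadcast channel.
        let mut receiver = handle.crawl(entry).await;
        while let Ok(EntryCrawlerHandleMessage::Entry(result)) = receiver.recv().await {
            match result {
                Ok(entry) => tracing::info!("crawled entry {}", entry.url),
                Err(error) => tracing::warn!("entry crawl failed: {}", error),
            }
        }
    }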
@@ -1,18 +1,21 @@
 use std::fmt::{self, Display, Formatter};
 
+use chrono::Utc;
 use feed_rs::parser;
 use reqwest::Client;
 use sqlx::PgPool;
 use tokio::sync::{broadcast, mpsc};
-use tracing::{info, instrument};
+use tracing::log::warn;
+use tracing::{info, info_span, instrument};
 use url::Url;
 
-use crate::models::entry::Entry;
+use crate::actors::entry_crawler::EntryCrawlerHandle;
+use crate::models::entry::{upsert_entries, CreateEntry, Entry};
 use crate::models::feed::{upsert_feed, CreateFeed, Feed};
 
 /// The `FeedCrawler` actor fetches a feed url, parses it, and saves it to the database.
 ///
 /// It receives `FeedCrawlerMessage` messages via the `receiver` channel. It communicates back to
 /// the sender of those messages via the `respond_to` channel on the `FeedCrawlerMessage`.
 ///
 /// `FeedCrawler` should not be instantiated directly. Instead, use the `FeedCrawlerHandle`.
@@ -20,6 +23,7 @@ struct FeedCrawler {
     receiver: mpsc::Receiver<FeedCrawlerMessage>,
     pool: PgPool,
     client: Client,
+    content_dir: String,
 }
 
 #[derive(Debug)]
@@ -38,7 +42,7 @@ impl Display for FeedCrawlerMessage {
     }
 }
 
 /// An error type that enumerates possible failures during a crawl and is cloneable and can be sent
 /// across threads (does not reference the originating Errors which are usually not cloneable).
 #[derive(thiserror::Error, Debug, Clone)]
 pub enum FeedCrawlerError {
@@ -48,15 +52,23 @@ pub enum FeedCrawlerError {
     ParseError(Url),
     #[error("failed to create feed: {0}")]
     CreateFeedError(Url),
+    #[error("failed to create feed entries: {0}")]
+    CreateFeedEntriesError(Url),
 }
 pub type FeedCrawlerResult<T, E = FeedCrawlerError> = ::std::result::Result<T, E>;
 
 impl FeedCrawler {
-    fn new(receiver: mpsc::Receiver<FeedCrawlerMessage>, pool: PgPool, client: Client) -> Self {
+    fn new(
+        receiver: mpsc::Receiver<FeedCrawlerMessage>,
+        pool: PgPool,
+        client: Client,
+        content_dir: String,
+    ) -> Self {
         FeedCrawler {
             receiver,
             pool,
             client,
+            content_dir,
         }
     }
 
@@ -87,6 +99,40 @@ impl FeedCrawler {
             .await
             .map_err(|_| FeedCrawlerError::CreateFeedError(url.clone()))?;
         info!(%feed.feed_id, "upserted feed");
 
+        let mut payload = Vec::with_capacity(parsed_feed.entries.len());
+        for entry in parsed_feed.entries {
+            let entry_span = info_span!("entry", id = entry.id);
+            let _entry_span_guard = entry_span.enter();
+            if let Some(link) = entry.links.get(0) {
+                // if no scraped or feed date is available, fallback to the current time
+                let published_at = entry.published.unwrap_or_else(Utc::now);
+                let entry = CreateEntry {
+                    title: entry.title.map(|t| t.content),
+                    url: link.href.clone(),
+                    description: entry.summary.map(|s| s.content),
+                    feed_id: feed.feed_id,
+                    published_at,
+                };
+                payload.push(entry);
+            } else {
+                warn!("Skipping feed entry with no links");
+            }
+        }
+        let entries = upsert_entries(&self.pool, payload)
+            .await
+            .map_err(|_| FeedCrawlerError::CreateFeedEntriesError(url.clone()))?;
+        info!("Created {} entries", entries.len());
+
+        for entry in entries {
+            let entry_crawler = EntryCrawlerHandle::new(
+                self.pool.clone(),
+                self.client.clone(),
+                self.content_dir.clone(),
+            );
+            // TODO: ignoring this receiver for the time being, pipe through events eventually
+            let _ = entry_crawler.crawl(entry).await;
+        }
         Ok(feed)
     }
 
@@ -124,7 +170,7 @@ pub struct FeedCrawlerHandle {
 /// `FeedCrawlerHandle`.
 ///
 /// `FeedCrawlerHandleMessage::Feed` contains the result of crawling a feed url.
-/// `FeedCrawlerHandleMessage::Entry` contains the result of crawling an entry url.
+/// `FeedCrawlerHandleMessage::Entry` contains the result of crawling an entry url within the feed.
 #[derive(Clone)]
 pub enum FeedCrawlerHandleMessage {
     Feed(FeedCrawlerResult<Feed>),
@@ -133,9 +179,9 @@ pub enum FeedCrawlerHandleMessage {
 
 impl FeedCrawlerHandle {
     /// Creates an async actor task that will listen for messages on the `sender` channel.
-    pub fn new(pool: PgPool, client: Client) -> Self {
+    pub fn new(pool: PgPool, client: Client, content_dir: String) -> Self {
         let (sender, receiver) = mpsc::channel(8);
-        let mut crawler = FeedCrawler::new(receiver, pool, client);
+        let mut crawler = FeedCrawler::new(receiver, pool, client, content_dir);
         tokio::spawn(async move { crawler.run().await });
 
         Self { sender }
@@ -1 +1,2 @@
+pub mod entry_crawler;
 pub mod feed_crawler;
@@ -17,6 +17,7 @@ use tokio_stream::StreamExt;
 use url::Url;
 
 use crate::actors::feed_crawler::{FeedCrawlerHandle, FeedCrawlerHandleMessage};
+use crate::config::Config;
 use crate::error::{Error, Result};
 use crate::models::entry::get_entries_for_feed;
 use crate::models::feed::{create_feed, delete_feed, get_feed, CreateFeed, FeedType};
@@ -108,11 +109,13 @@ impl IntoResponse for AddFeedError {
 pub async fn post(
     State(pool): State<PgPool>,
     State(crawls): State<Crawls>,
+    State(config): State<Config>,
     Form(add_feed): Form<AddFeed>,
 ) -> AddFeedResult<Response> {
     // TODO: store the client in axum state (as long as it can be used concurrently?)
     let client = Client::new();
-    let feed_crawler = FeedCrawlerHandle::new(pool.clone(), client.clone());
+    let feed_crawler =
+        FeedCrawlerHandle::new(pool.clone(), client.clone(), config.content_dir.clone());
 
     let feed = create_feed(
         &pool,
@@ -13,6 +13,7 @@ use crate::models::feed::get_feeds;
 use crate::models::entry::{update_entry, upsert_entries, CreateEntry};
 use crate::uuid::Base62Uuid;
 
+/// DEPRECATED: Use FeedCrawler instead, keeping this for reference until I set up scheduled jobs.
 /// For every feed in the database, fetches the feed, parses it, and saves new entries to the
 /// database.
 pub async fn crawl(pool: &PgPool) -> anyhow::Result<()> {
@@ -146,6 +146,37 @@ pub async fn create_entry(pool: &PgPool, payload: CreateEntry) -> Result<Entry>
     })
 }
 
+pub async fn upsert_entry(pool: &PgPool, payload: CreateEntry) -> Result<Entry> {
+    payload.validate()?;
+    sqlx::query_as!(
+        Entry,
+        "insert into entry (
+            title, url, description, feed_id, published_at
+        ) values (
+            $1, $2, $3, $4, $5
+        ) on conflict (url, feed_id) do update set
+            title = excluded.title,
+            description = excluded.description,
+            published_at = excluded.published_at
+        returning *",
+        payload.title,
+        payload.url,
+        payload.description,
+        payload.feed_id,
+        payload.published_at,
+    )
+    .fetch_one(pool)
+    .await
+    .map_err(|error| {
+        if let sqlx::error::Error::Database(ref psql_error) = error {
+            if psql_error.code().as_deref() == Some("23503") {
+                return Error::RelationNotFound("feed");
+            }
+        }
+        Error::Sqlx(error)
+    })
+}
+
 pub async fn create_entries(pool: &PgPool, payload: Vec<CreateEntry>) -> Result<Vec<Entry>> {
     let mut titles = Vec::with_capacity(payload.len());
     let mut urls = Vec::with_capacity(payload.len());
@@ -209,7 +240,10 @@ pub async fn upsert_entries(pool: &PgPool, payload: Vec<CreateEntry>) -> Result<
         "insert into entry (
             title, url, description, feed_id, published_at
         ) select * from unnest($1::text[], $2::text[], $3::text[], $4::uuid[], $5::timestamptz[])
-        on conflict do nothing
+        on conflict (url, feed_id) do update set
+            title = excluded.title,
+            description = excluded.description,
+            published_at = excluded.published_at
         returning *",
         titles.as_slice() as &[Option<String>],
         urls.as_slice(),
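The new `upsert_entry` mirrors the batch `upsert_entries` that `FeedCrawler` now calls, but for a single row; both `on conflict (url, feed_id)` clauses assume a unique index on `entry (url, feed_id)`. A minimal calling sketch (not part of the diff), with `pool` and `feed` assumed to come from the surrounding application:

    use chrono::Utc;
    use sqlx::PgPool;

    use crate::models::entry::{upsert_entry, CreateEntry};
    use crate::models::feed::Feed;

    // Hypothetical helper, for illustration only: insert-or-update one entry for a feed.
    async fn add_example_entry(pool: &PgPool, feed: &Feed) -> anyhow::Result<()> {
        let entry = upsert_entry(
            pool,
            CreateEntry {
                title: Some("Example post".to_string()),
                url: "https://example.com/post".to_string(),
                description: None,
                feed_id: feed.feed_id,
                published_at: Utc::now(),
            },
        )
        .await?;
        tracing::info!("upserted entry {}", entry.entry_id);
        Ok(())
    }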
@@ -94,6 +94,7 @@ impl Layout {
             head {
                 meta charset="utf-8";
                 title { (self.title) }
+                // TODO: vendor this before going to prod
                 script type="module" {
                     r#"import * as Turbo from 'https://cdn.skypack.dev/@hotwired/turbo';"#
                 }