Add an async actor FeedCrawler for fetching feed details

Currently, this allows the browser to subscribe to the response of the
asynchronous crawl after they add a new feed.

Eventually I will also use this in the main scheduled crawls. Right now,
it only upserts feed metadata based on the parsed feed.
This commit is contained in:
Tyler Hallada 2023-07-09 21:18:19 -04:00
parent 8f4db1d8d9
commit f13c7e5e70
14 changed files with 405 additions and 58 deletions

View File

@ -41,7 +41,7 @@ tokio-stream = { version = "0.1", features = ["sync"] }
tower = "0.4"
tower-livereload = "0.8"
tower-http = { version = "0.4", features = ["trace", "fs"] }
tracing = { version = "0.1", features = ["valuable"] }
tracing = { version = "0.1", features = ["valuable", "attributes"] }
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.3", features = ["serde"] }

160
src/actors/feed_crawler.rs Normal file
View File

@ -0,0 +1,160 @@
use std::fmt::{self, Display, Formatter};
use feed_rs::parser;
use reqwest::Client;
use sqlx::PgPool;
use tokio::sync::{broadcast, mpsc};
use tracing::{info, instrument};
use url::Url;
use crate::models::entry::Entry;
use crate::models::feed::{upsert_feed, CreateFeed, Feed};
/// The `FeedCrawler` actor fetches a feed url, parses it, and saves it to the database.
///
/// It receives `FeedCrawlerMessage` messages via the `receiver` channel. It communicates back to
/// the sender of those messages via the `respond_to` channel on the `FeedCrawlerMessage`.
///
/// `FeedCrawler` should not be instantiated directly. Instead, use the `FeedCrawlerHandle`.
struct FeedCrawler {
receiver: mpsc::Receiver<FeedCrawlerMessage>,
pool: PgPool,
client: Client,
}
#[derive(Debug)]
enum FeedCrawlerMessage {
Crawl {
url: Url,
respond_to: broadcast::Sender<FeedCrawlerHandleMessage>,
},
}
impl Display for FeedCrawlerMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
FeedCrawlerMessage::Crawl { url, .. } => write!(f, "Crawl({})", url),
}
}
}
/// An error type that enumerates possible failures during a crawl and is cloneable and can be sent
/// across threads (does not reference the originating Errors which are usually not cloneable).
#[derive(thiserror::Error, Debug, Clone)]
pub enum FeedCrawlerError {
#[error("failed to fetch feed: {0}")]
FetchError(Url),
#[error("failed to parse feed: {0}")]
ParseError(Url),
#[error("failed to create feed: {0}")]
CreateFeedError(Url),
}
pub type FeedCrawlerResult<T, E = FeedCrawlerError> = ::std::result::Result<T, E>;
impl FeedCrawler {
fn new(receiver: mpsc::Receiver<FeedCrawlerMessage>, pool: PgPool, client: Client) -> Self {
FeedCrawler {
receiver,
pool,
client,
}
}
#[instrument(skip_all, fields(url = %url))]
async fn crawl_feed(&self, url: Url) -> FeedCrawlerResult<Feed> {
let bytes = self
.client
.get(url.clone())
.send()
.await
.map_err(|_| FeedCrawlerError::FetchError(url.clone()))?
.bytes()
.await
.map_err(|_| FeedCrawlerError::FetchError(url.clone()))?;
info!("fetched feed");
let parsed_feed =
parser::parse(&bytes[..]).map_err(|_| FeedCrawlerError::ParseError(url.clone()))?;
info!("parsed feed");
let feed = upsert_feed(
&self.pool,
CreateFeed {
title: parsed_feed.title.map(|text| text.content),
url: url.to_string(),
feed_type: parsed_feed.feed_type.into(),
description: parsed_feed.description.map(|text| text.content),
},
)
.await
.map_err(|_| FeedCrawlerError::CreateFeedError(url.clone()))?;
info!(%feed.feed_id, "upserted feed");
Ok(feed)
}
#[instrument(skip_all, fields(msg = %msg))]
async fn handle_message(&mut self, msg: FeedCrawlerMessage) {
match msg {
FeedCrawlerMessage::Crawl { url, respond_to } => {
let result = self.crawl_feed(url).await;
// ignore the result since the initiator may have cancelled waiting for the
// response, and that is ok
let _ = respond_to.send(FeedCrawlerHandleMessage::Feed(result));
}
}
}
#[instrument(skip_all)]
async fn run(&mut self) {
info!("starting feed crawler");
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg).await;
}
}
}
/// The `FeedCrawlerHandle` is used to initialize and communicate with a `FeedCrawler` actor.
///
/// The `FeedCrawler` actor fetches a feed url, parses it, and saves it to the database. It runs as
/// a separate asynchronous task from the main web server and communicates via channels.
#[derive(Clone)]
pub struct FeedCrawlerHandle {
sender: mpsc::Sender<FeedCrawlerMessage>,
}
/// The `FeedCrawlerHandleMessage` is the response to a `FeedCrawlerMessage` sent to the
/// `FeedCrawlerHandle`.
///
/// `FeedCrawlerHandleMessage::Feed` contains the result of crawling a feed url.
/// `FeedCrawlerHandleMessage::Entry` contains the result of crawling an entry url.
#[derive(Clone)]
pub enum FeedCrawlerHandleMessage {
Feed(FeedCrawlerResult<Feed>),
Entry(FeedCrawlerResult<Entry>),
}
impl FeedCrawlerHandle {
/// Creates an async actor task that will listen for messages on the `sender` channel.
pub fn new(pool: PgPool, client: Client) -> Self {
let (sender, receiver) = mpsc::channel(8);
let mut crawler = FeedCrawler::new(receiver, pool, client);
tokio::spawn(async move { crawler.run().await });
Self { sender }
}
/// Sends a `FeedCrawlerMessage::Crawl` message to the running `FeedCrawler` actor.
///
/// Listen to the result of the crawl via the returned `broadcast::Receiver`.
pub async fn crawl(&self, url: Url) -> broadcast::Receiver<FeedCrawlerHandleMessage> {
let (sender, receiver) = broadcast::channel(8);
let msg = FeedCrawlerMessage::Crawl {
url,
respond_to: sender,
};
self.sender
.send(msg)
.await
.expect("feed crawler task has died");
receiver
}
}

1
src/actors/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod feed_crawler;

View File

@ -1,8 +1,8 @@
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::Json;
use tracing::error;
use serde_with::DisplayFromStr;
use tracing::error;
use uuid::Uuid;
use validator::ValidationErrors;
@ -31,6 +31,9 @@ pub enum Error {
#[error("referenced {0} not found")]
RelationNotFound(&'static str),
#[error("an internal server error occurred")]
InternalServerError,
}
pub type Result<T, E = Error> = ::std::result::Result<T, E>;
@ -72,7 +75,9 @@ impl Error {
match self {
NotFound(_, _) => StatusCode::NOT_FOUND,
Sqlx(_) | Anyhow(_) | Reqwest(_) => StatusCode::INTERNAL_SERVER_ERROR,
InternalServerError | Sqlx(_) | Anyhow(_) | Reqwest(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
InvalidEntity(_) | RelationNotFound(_) => StatusCode::UNPROCESSABLE_ENTITY,
}
}

View File

@ -1,6 +1,9 @@
use std::time::Duration;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response, Redirect};
use axum::response::sse::{Event, KeepAlive};
use axum::response::{IntoResponse, Redirect, Response, Sse};
use axum::Form;
use feed_rs::parser;
use maud::html;
@ -8,13 +11,19 @@ use reqwest::Client;
use serde::Deserialize;
use serde_with::{serde_as, NoneAsEmptyString};
use sqlx::PgPool;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use url::Url;
use crate::actors::feed_crawler::{FeedCrawlerHandle, FeedCrawlerHandleMessage};
use crate::error::{Error, Result};
use crate::models::entry::get_entries_for_feed;
use crate::models::feed::{create_feed, get_feed, CreateFeed, delete_feed};
use crate::models::feed::{create_feed, delete_feed, get_feed, CreateFeed, FeedType};
use crate::partials::{entry_list::entry_list, feed_link::feed_link, layout::Layout};
use crate::uuid::Base62Uuid;
use crate::state::Crawls;
use crate::turbo_stream::TurboStream;
use crate::uuid::Base62Uuid;
pub async fn get(
Path(id): Path<Base62Uuid>,
@ -51,12 +60,16 @@ pub struct AddFeed {
#[derive(thiserror::Error, Debug)]
pub enum AddFeedError {
#[error("invalid feed url: {0}")]
InvalidUrl(String, #[source] url::ParseError),
#[error("failed to fetch feed: {0}")]
FetchError(String, #[source] reqwest::Error),
#[error("failed to parse feed: {0}")]
ParseError(String, #[source] parser::ParseFeedError),
#[error("failed to create feed: {0}")]
CreateFeedError(String, #[source] Error),
#[error("feed already exists: {0}")]
FeedAlreadyExists(String, #[source] Error),
}
pub type AddFeedResult<T, E = AddFeedError> = ::std::result::Result<T, E>;
@ -65,7 +78,9 @@ impl AddFeedError {
use AddFeedError::*;
match self {
FetchError(..) | ParseError(..) => StatusCode::UNPROCESSABLE_ENTITY,
InvalidUrl(..) | FetchError(..) | ParseError(..) | FeedAlreadyExists(..) => {
StatusCode::UNPROCESSABLE_ENTITY
}
CreateFeedError(..) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
@ -92,41 +107,56 @@ impl IntoResponse for AddFeedError {
pub async fn post(
State(pool): State<PgPool>,
State(crawls): State<Crawls>,
Form(add_feed): Form<AddFeed>,
) -> AddFeedResult<Response> {
// TODO: store the client in axum state (as long as it can be used concurrently?)
let client = Client::new();
let bytes = client
.get(&add_feed.url)
.send()
.await
.map_err(|err| AddFeedError::FetchError(add_feed.url.clone(), err))?
.bytes()
.await
.map_err(|err| AddFeedError::FetchError(add_feed.url.clone(), err))?;
let parsed_feed = parser::parse(&bytes[..])
.map_err(|err| AddFeedError::ParseError(add_feed.url.clone(), err))?;
let feed_crawler = FeedCrawlerHandle::new(pool.clone(), client.clone());
let feed = create_feed(
&pool,
CreateFeed {
title: add_feed
.title
.map_or_else(|| parsed_feed.title.map(|text| text.content), Some),
title: add_feed.title,
url: add_feed.url.clone(),
feed_type: parsed_feed.feed_type.into(),
description: add_feed
.description
.map_or_else(|| parsed_feed.description.map(|text| text.content), Some),
feed_type: FeedType::Rss, // eh, get rid of this
description: add_feed.description,
},
)
.await
.map_err(|err| AddFeedError::CreateFeedError(add_feed.url.clone(), err))?;
.map_err(|err| {
if let Error::Sqlx(sqlx::error::Error::Database(db_error)) = &err {
if let Some(code) = db_error.code() {
if let Some(constraint) = db_error.constraint() {
if code == "23505" && constraint == "feed_url_idx" {
return AddFeedError::FeedAlreadyExists(add_feed.url.clone(), err);
}
}
}
}
AddFeedError::CreateFeedError(add_feed.url.clone(), err)
})?;
let url: Url = Url::parse(&add_feed.url)
.map_err(|err| AddFeedError::InvalidUrl(add_feed.url.clone(), err))?;
let receiver = feed_crawler.crawl(url).await;
{
let mut crawls = crawls.lock().map_err(|_| {
AddFeedError::CreateFeedError(add_feed.url.clone(), Error::InternalServerError)
})?;
crawls.insert(feed.feed_id, receiver);
}
let feed_id = format!("feed-{}", Base62Uuid::from(feed.feed_id));
let feed_stream = format!("/feed/{}/stream", Base62Uuid::from(feed.feed_id));
Ok((
StatusCode::CREATED,
TurboStream(
html! {
turbo-stream-source src=(feed_stream) id="feed-stream" {}
turbo-stream action="append" target="feeds" {
template {
li { (feed_link(&feed)) }
li id=(feed_id) { (feed_link(&feed, true)) }
}
}
}
@ -136,10 +166,76 @@ pub async fn post(
.into_response())
}
pub async fn delete(
State(pool): State<PgPool>,
pub async fn stream(
Path(id): Path<Base62Uuid>,
) -> Result<Redirect> {
State(crawls): State<Crawls>,
) -> Result<impl IntoResponse> {
let receiver = {
let mut crawls = crawls.lock().expect("crawls lock poisoned");
crawls.remove(&id.as_uuid())
}
.ok_or_else(|| Error::NotFound("feed stream", id.as_uuid()))?;
let stream = BroadcastStream::new(receiver);
let feed_id = format!("feed-{}", id);
let stream = stream.map(move |msg| match msg {
Ok(FeedCrawlerHandleMessage::Feed(Ok(feed))) => Ok::<Event, String>(
Event::default().data(
html! {
turbo-stream action="remove" target="feed-stream" {}
turbo-stream action="replace" target=(feed_id) {
template {
li id=(feed_id) { (feed_link(&feed, false)) }
}
}
}
.into_string(),
),
),
Ok(FeedCrawlerHandleMessage::Feed(Err(error))) => Ok(Event::default().data(
html! {
turbo-stream action="remove" target="feed-stream" {}
turbo-stream action="replace" target=(feed_id) {
template {
li id=(feed_id) { span class="error" { (error) } }
}
}
}
.into_string(),
)),
// TODO: these Entry messages are not yet sent, need to handle them better
Ok(FeedCrawlerHandleMessage::Entry(Ok(_))) => Ok(Event::default().data(
html! {
turbo-stream action="remove" target="feed-stream" {}
turbo-stream action="replace" target=(feed_id) {
template {
li id=(feed_id) { "fetched entry" }
}
}
}
.into_string(),
)),
Ok(FeedCrawlerHandleMessage::Entry(Err(error))) => Ok(Event::default().data(
html! {
turbo-stream action="remove" target="feed-stream" {}
turbo-stream action="replace" target=(feed_id) {
template {
li id=(feed_id) { span class="error" { (error) } }
}
}
}
.into_string(),
)),
Err(BroadcastStreamRecvError::Lagged(_)) => Ok(Event::default()),
});
Ok(Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keep-alive-text"),
))
}
pub async fn delete(State(pool): State<PgPool>, Path(id): Path<Base62Uuid>) -> Result<Redirect> {
delete_feed(&pool, id.as_uuid()).await?;
Ok(Redirect::to("/feeds"))
}

View File

@ -14,7 +14,7 @@ pub async fn get(State(pool): State<PgPool>, layout: Layout) -> Result<Response>
div class="feeds" {
ul id="feeds" {
@for feed in feeds {
li { (feed_link(&feed)) }
li { (feed_link(&feed, false)) }
}
}
div class="add-feed" {

View File

@ -4,6 +4,7 @@ use std::time::Duration;
use ansi_to_html::convert_escaped;
use axum::extract::State;
use axum::response::sse::KeepAlive;
use axum::response::{
sse::{Event, Sse},
Response,
@ -44,7 +45,7 @@ pub async fn stream(
))
});
Sse::new(log_stream).keep_alive(
axum::response::sse::KeepAlive::new()
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keep-alive-text"),
)

View File

@ -1,3 +1,4 @@
pub mod actors;
pub mod config;
pub mod error;
pub mod handlers;

View File

@ -1,4 +1,9 @@
use std::{path::Path, net::SocketAddr};
use std::{
collections::HashMap,
net::SocketAddr,
path::Path,
sync::{Arc, Mutex},
};
use anyhow::Result;
use axum::{
@ -12,7 +17,7 @@ use notify::Watcher;
use sqlx::postgres::PgPoolOptions;
use tokio::sync::watch::channel;
use tower::ServiceBuilder;
use tower_http::{trace::TraceLayer, services::ServeDir};
use tower_http::{services::ServeDir, trace::TraceLayer};
use tower_livereload::LiveReloadLayer;
use tracing::debug;
@ -38,6 +43,8 @@ async fn main() -> Result<()> {
let (log_sender, log_receiver) = channel::<Bytes>(Bytes::new());
let _guards = init_tracing(&config, log_sender)?;
let crawls = Arc::new(Mutex::new(HashMap::new()));
let pool = PgPoolOptions::new()
.max_connections(config.database_max_connections)
.connect(&config.database_url)
@ -57,6 +64,7 @@ async fn main() -> Result<()> {
.route("/feeds", get(handlers::feeds::get))
.route("/feed", post(handlers::feed::post))
.route("/feed/:id", get(handlers::feed::get))
.route("/feed/:id/stream", get(handlers::feed::stream))
.route("/feed/:id/delete", post(handlers::feed::delete))
.route("/entry/:id", get(handlers::entry::get))
.route("/log", get(handlers::log::get))
@ -66,6 +74,7 @@ async fn main() -> Result<()> {
pool,
config,
log_receiver,
crawls,
})
.layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()));
@ -74,10 +83,7 @@ async fn main() -> Result<()> {
let livereload = LiveReloadLayer::new();
let reloader = livereload.reloader();
let mut watcher = notify::recommended_watcher(move |_| reloader.reload())?;
watcher.watch(
Path::new("static"),
notify::RecursiveMode::Recursive,
)?;
watcher.watch(Path::new("static"), notify::RecursiveMode::Recursive)?;
app = app.layer(livereload);
serve(app, addr).await?;
} else {

View File

@ -8,7 +8,7 @@ use crate::error::{Error, Result};
const DEFAULT_ENTRIES_PAGE_SIZE: i64 = 50;
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Entry {
pub entry_id: Uuid,
pub title: Option<String>,
@ -51,10 +51,7 @@ pub struct GetEntriesOptions {
pub limit: Option<i64>,
}
pub async fn get_entries(
pool: &PgPool,
options: GetEntriesOptions,
) -> sqlx::Result<Vec<Entry>> {
pub async fn get_entries(pool: &PgPool, options: GetEntriesOptions) -> sqlx::Result<Vec<Entry>> {
if let Some(published_before) = options.published_before {
sqlx::query_as!(
Entry,
@ -81,7 +78,6 @@ pub async fn get_entries(
)
.fetch_all(pool)
.await
}
}
@ -120,7 +116,6 @@ pub async fn get_entries_for_feed(
)
.fetch_all(pool)
.await
}
}
@ -266,8 +261,11 @@ pub async fn update_entry(pool: &PgPool, payload: Entry) -> Result<Entry> {
}
pub async fn delete_entry(pool: &PgPool, entry_id: Uuid) -> Result<()> {
sqlx::query!("update entry set deleted_at = now() where entry_id = $1", entry_id)
.execute(pool)
.await?;
sqlx::query!(
"update entry set deleted_at = now() where entry_id = $1",
entry_id
)
.execute(pool)
.await?;
Ok(())
}

View File

@ -37,7 +37,7 @@ impl From<feed_rs::model::FeedType> for FeedType {
}
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Feed {
pub feed_id: Uuid,
pub title: Option<String>,
@ -135,9 +135,44 @@ pub async fn create_feed(pool: &PgPool, payload: CreateFeed) -> Result<Feed> {
.await?)
}
pub async fn upsert_feed(pool: &PgPool, payload: CreateFeed) -> Result<Feed> {
payload.validate()?;
Ok(sqlx::query_as!(
Feed,
r#"insert into feed (
title, url, type, description
) values (
$1, $2, $3, $4
) on conflict (url) do update set
title = excluded.title,
url = excluded.url,
type = excluded.type,
description = excluded.description
returning
feed_id,
title,
url,
type as "feed_type: FeedType",
description,
created_at,
updated_at,
deleted_at
"#,
payload.title,
payload.url,
payload.feed_type as FeedType,
payload.description
)
.fetch_one(pool)
.await?)
}
pub async fn delete_feed(pool: &PgPool, feed_id: Uuid) -> Result<()> {
sqlx::query!("update feed set deleted_at = now() where feed_id = $1", feed_id)
.execute(pool)
.await?;
sqlx::query!(
"update feed set deleted_at = now() where feed_id = $1",
feed_id
)
.execute(pool)
.await?;
Ok(())
}

View File

@ -3,8 +3,14 @@ use maud::{html, Markup};
use crate::models::feed::Feed;
use crate::uuid::Base62Uuid;
pub fn feed_link(feed: &Feed) -> Markup {
let title = feed.title.clone().unwrap_or_else(|| "Untitled Feed".to_string());
pub fn feed_link(feed: &Feed, pending_crawl: bool) -> Markup {
let title = feed.title.clone().unwrap_or_else(|| {
if pending_crawl {
"Crawling feed...".to_string()
} else {
"Untitled Feed".to_string()
}
});
let feed_url = format!("/feed/{}", Base62Uuid::from(feed.feed_id));
html! {
a href=(feed_url) { (title) }

View File

@ -1,16 +1,34 @@
use tokio::sync::watch::Receiver;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::{broadcast, watch};
use axum::extract::FromRef;
use bytes::Bytes;
use sqlx::PgPool;
use uuid::Uuid;
use crate::actors::feed_crawler::FeedCrawlerHandleMessage;
use crate::config::Config;
/// A map of feed IDs to a channel receiver for the active `FeedCrawler` running a crawl for that
/// feed.
///
/// Currently, the only purpose of this is to keep track of active crawls so that axum handlers can
/// subscribe to the result of the crawl via the receiver channel which are then sent to end-users
/// as a stream of server-sent events.
///
/// This map should only contain crawls that have just been created but not yet subscribed to.
/// Entries are only added when a user adds a feed in the UI and entries are removed by the same
/// user once a server-sent event connection is established.
pub type Crawls = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<FeedCrawlerHandleMessage>>>>;
#[derive(Clone)]
pub struct AppState {
pub pool: PgPool,
pub config: Config,
pub log_receiver: Receiver<Bytes>,
pub log_receiver: watch::Receiver<Bytes>,
pub crawls: Crawls,
}
impl FromRef<AppState> for PgPool {
@ -25,8 +43,14 @@ impl FromRef<AppState> for Config {
}
}
impl FromRef<AppState> for Receiver<Bytes> {
impl FromRef<AppState> for watch::Receiver<Bytes> {
fn from_ref(state: &AppState) -> Self {
state.log_receiver.clone()
}
}
impl FromRef<AppState> for Crawls {
fn from_ref(state: &AppState) -> Self {
state.crawls.clone()
}
}

View File

@ -1,21 +1,35 @@
use std::fmt::{Display, Formatter, self};
use std::fmt::{self, Display, Formatter};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
const BASE62_CHARS: &[u8] = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
/// A wrapper around a UUID (from `uuid::Uuid`) that serializes to a Base62 string.
///
/// Database rows have a UUID primary key, but they are encoded in Base62 to be shorter and more
/// URL-friendly for the frontend.
#[derive(Debug, Serialize, Deserialize)]
pub struct Base62Uuid(
#[serde(deserialize_with = "uuid_from_base62_str")]
#[serde(serialize_with = "uuid_to_base62_str")]
Uuid
Uuid,
);
impl Base62Uuid {
pub fn as_uuid(&self) -> Uuid {
self.0
}
pub fn new() -> Self {
Self(Uuid::new_v4())
}
}
impl Default for Base62Uuid {
fn default() -> Self {
Self::new()
}
}
impl From<Uuid> for Base62Uuid {