From 2f39be4152e9ef22977510a0f1b9d06905955c7c Mon Sep 17 00:00:00 2001
From: Tyler Hallada
Date: Tue, 29 Aug 2023 00:35:19 -0400
Subject: [PATCH] Finish implementing OPML importer

Now with progress messages!
---
 frontend/css/styles.css       | 20 +++++++++++--
 src/actors/crawl_scheduler.rs | 47 +++++++++++++++++-------------
 src/actors/feed_crawler.rs    | 25 ++++++++++------
 src/actors/importer.rs        | 32 +++++++++++++--------
 src/handlers/feeds.rs         |  3 +-
 src/handlers/import.rs        | 54 ++++++++++++++++++++++++++++++-----
 src/models/feed.rs            |  2 +-
 src/partials/entry_link.rs    | 14 +++++++++
 src/partials/entry_list.rs    |  8 ++----
 src/partials/mod.rs           |  1 +
 10 files changed, 148 insertions(+), 58 deletions(-)
 create mode 100644 src/partials/entry_link.rs

diff --git a/frontend/css/styles.css b/frontend/css/styles.css
index e66128c..b98ec08 100644
--- a/frontend/css/styles.css
+++ b/frontend/css/styles.css
@@ -43,15 +43,15 @@ ul.entries {
   font-size: 16px;
 }
 
-ul.entries li {
+li.entry {
   margin-bottom: 8px;
 }
 
-ul.entries li a {
+a.entry-link {
   text-decoration: none;
 }
 
-ul.entries li em.domain {
+em.entry-link-domain {
   margin-left: 8px;
   color: rgba(0, 0, 0, 0.75);
 }
@@ -152,6 +152,20 @@ form.feed-form .form-grid button {
   grid-column: 3 / 4;
 }
 
+ul#add-feed-messages {
+  list-style: none;
+  padding: 0;
+  margin: 0;
+  overflow-x: hidden;
+  white-space: nowrap;
+}
+
+ul#add-feed-messages li {
+  overflow: hidden;
+  white-space: nowrap;
+  text-overflow: ellipsis;
+}
+
 /* Feed */
 
 header.feed-header {
diff --git a/src/actors/crawl_scheduler.rs b/src/actors/crawl_scheduler.rs
index 69a9f2f..4213477 100644
--- a/src/actors/crawl_scheduler.rs
+++ b/src/actors/crawl_scheduler.rs
@@ -123,7 +123,11 @@ impl CrawlScheduler {
         let crawl_interval = Duration::from_secs(feed.crawl_interval_minutes as u64 * 60);
         let mut interval = tokio::time::interval(crawl_interval);
         if let Some(last_crawled_at) = feed.last_crawled_at {
+            dbg!(last_crawled_at);
+            dbg!(Utc::now());
             if let Ok(duration_since_last_crawl) = (Utc::now() - last_crawled_at).to_std() {
+                dbg!(duration_since_last_crawl);
+                dbg!(crawl_interval);
                 if duration_since_last_crawl < crawl_interval {
                     info!(
                         "last crawled at {:?}, crawling again in {:?}",
@@ -145,27 +149,27 @@
         );
         tokio::spawn(async move {
             loop {
-                debug!("spawned crawler for feed");
                 interval.tick().await;
-                debug!("tick!");
                 let mut receiver = feed_crawler.crawl(feed.feed_id).await;
-                match receiver.recv().await {
-                    Ok(FeedCrawlerHandleMessage::Feed(Ok(feed))) => {
-                        let crawl_interval =
-                            Duration::from_secs(feed.crawl_interval_minutes as u64 * 60);
-                        interval = interval_at(Instant::now() + crawl_interval, crawl_interval);
-                        info!(
-                            minutes = feed.crawl_interval_minutes,
-                            "updated crawl interval"
-                        );
-                        let _ = respond_to.send(CrawlSchedulerHandleMessage::FeedCrawler(
-                            FeedCrawlerHandleMessage::Feed(Ok(feed)),
-                        ));
+                while let Ok(msg) = receiver.recv().await {
+                    match msg {
+                        FeedCrawlerHandleMessage::Feed(Ok(feed)) => {
+                            let crawl_interval =
+                                Duration::from_secs(feed.crawl_interval_minutes as u64 * 60);
+                            interval = interval_at(Instant::now() + crawl_interval, crawl_interval);
+                            info!(
+                                minutes = feed.crawl_interval_minutes,
+                                "updated crawl interval"
+                            );
+                            let _ = respond_to.send(CrawlSchedulerHandleMessage::FeedCrawler(
+                                FeedCrawlerHandleMessage::Feed(Ok(feed)),
+                            ));
+                        }
+                        result => {
+                            let _ =
+                                respond_to.send(CrawlSchedulerHandleMessage::FeedCrawler(result));
+                        }
                     }
-                    Ok(result) => {
-                        let _ = respond_to.send(CrawlSchedulerHandleMessage::FeedCrawler(result));
-                    }
-                    _ => {}
                 }
             }
         });
@@ -229,7 +233,7 @@ pub struct CrawlSchedulerHandle {
 /// `CrawlSchedulerHandle`.
 ///
 /// `CrawlSchedulerHandleMessage::Feed` contains the result of crawling a feed url.
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub enum CrawlSchedulerHandleMessage {
     Bootstrap(CrawlSchedulerResult<()>),
     Schedule(CrawlSchedulerResult<()>),
@@ -268,7 +272,10 @@ impl CrawlSchedulerHandle {
     /// Sends a `CrawlSchedulerMessage::Schedule` message to the running `CrawlScheduler` actor.
     ///
     /// Listen to the result of the scheduling via the returned `broadcast::Receiver`.
-    pub async fn schedule(&self, feed_id: Uuid) -> broadcast::Receiver<CrawlSchedulerHandleMessage> {
+    pub async fn schedule(
+        &self,
+        feed_id: Uuid,
+    ) -> broadcast::Receiver<CrawlSchedulerHandleMessage> {
         let (sender, receiver) = broadcast::channel(8);
         let msg = CrawlSchedulerMessage::Schedule {
             feed_id,
diff --git a/src/actors/feed_crawler.rs b/src/actors/feed_crawler.rs
index f2770f2..face3d1 100644
--- a/src/actors/feed_crawler.rs
+++ b/src/actors/feed_crawler.rs
@@ -14,7 +14,9 @@ use tracing::{debug, error, info, info_span, instrument, warn};
 use url::Url;
 use uuid::Uuid;
 
-use crate::actors::entry_crawler::EntryCrawlerHandle;
+use crate::actors::entry_crawler::{
+    EntryCrawlerHandle, EntryCrawlerHandleMessage, EntryCrawlerResult,
+};
 use crate::domain_locks::DomainLocks;
 use crate::models::entry::{CreateEntry, Entry};
 use crate::models::feed::{Feed, MAX_CRAWL_INTERVAL_MINUTES, MIN_CRAWL_INTERVAL_MINUTES};
@@ -87,7 +89,11 @@ impl FeedCrawler {
     }
 
     #[instrument(skip_all, fields(feed_id = %feed_id))]
-    async fn crawl_feed(&self, feed_id: Uuid) -> FeedCrawlerResult<Feed> {
+    async fn crawl_feed(
+        &self,
+        feed_id: Uuid,
+        respond_to: broadcast::Sender<FeedCrawlerHandleMessage>,
+    ) -> FeedCrawlerResult<Feed> {
         let mut feed = Feed::get(&self.pool, feed_id)
             .await
             .map_err(|_| FeedCrawlerError::GetFeedError(Base62Uuid::from(feed_id)))?;
@@ -159,6 +165,7 @@ impl FeedCrawler {
             return Ok(feed);
         } else if !resp.status().is_success() {
             warn!("feed returned non-successful status");
+            feed.last_crawled_at = Some(Utc::now());
             feed.last_crawl_error = resp.status().canonical_reason().map(|s| s.to_string());
             let feed = feed
                 .save(&self.pool)
@@ -172,7 +179,7 @@ impl FeedCrawler {
             .bytes()
             .await
             .map_err(|_| FeedCrawlerError::FetchError(url.clone()))?;
-        
+
         let parsed_feed =
             parser::parse(&bytes[..]).map_err(|_| FeedCrawlerError::ParseError(url.clone()))?;
         info!("parsed feed");
@@ -246,8 +253,10 @@ impl FeedCrawler {
                 self.domain_locks.clone(),
                 self.content_dir.clone(),
             );
-            // TODO: ignoring this receiver for the time being, pipe through events eventually
-            let _ = entry_crawler.crawl(entry).await;
+            let mut entry_receiver = entry_crawler.crawl(entry).await;
+            while let Ok(EntryCrawlerHandleMessage::Entry(result)) = entry_receiver.recv().await {
+                let _ = respond_to.send(FeedCrawlerHandleMessage::Entry(result));
+            }
         }
         Ok(feed)
     }
@@ -259,7 +268,7 @@ impl FeedCrawler {
                 feed_id,
                 respond_to,
             } => {
-                let result = self.crawl_feed(feed_id).await;
+                let result = self.crawl_feed(feed_id, respond_to.clone()).await;
                 if let Err(error) = &result {
                     match Feed::update_crawl_error(&self.pool, feed_id, format!("{}", error)).await
                     {
@@ -298,10 +307,10 @@ pub struct FeedCrawlerHandle {
 ///
 /// `FeedCrawlerHandleMessage::Feed` contains the result of crawling a feed url.
 /// `FeedCrawlerHandleMessage::Entry` contains the result of crawling an entry url within the feed.
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub enum FeedCrawlerHandleMessage {
     Feed(FeedCrawlerResult<Feed>),
-    Entry(FeedCrawlerResult<Entry>),
+    Entry(EntryCrawlerResult<Entry>),
 }
 
 impl FeedCrawlerHandle {
diff --git a/src/actors/importer.rs b/src/actors/importer.rs
index fa9853e..a5a940e 100644
--- a/src/actors/importer.rs
+++ b/src/actors/importer.rs
@@ -8,8 +8,10 @@ use opml::OPML;
 use sqlx::PgPool;
 use tokio::sync::{broadcast, mpsc};
 use tracing::{debug, error, instrument};
+use uuid::Uuid;
 
 use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleMessage};
+use crate::actors::feed_crawler::FeedCrawlerHandleMessage;
 use crate::models::feed::{Feed, UpsertFeed};
 use crate::uuid::Base62Uuid;
 
@@ -46,6 +48,17 @@ impl Display for ImporterMessage {
     }
 }
 
+async fn listen_to_crawl(
+    feed_id: Uuid,
+    crawl_scheduler: CrawlSchedulerHandle,
+    respond_to: broadcast::Sender<ImporterHandleMessage>,
+) {
+    let mut receiver = crawl_scheduler.schedule(feed_id).await;
+    while let Ok(msg) = receiver.recv().await {
+        let _ = respond_to.send(ImporterHandleMessage::CrawlScheduler(msg));
+    }
+}
+
 /// An error type that enumerates possible failures during a crawl and is cloneable and can be sent
 /// across threads (does not reference the originating Errors which are usually not cloneable).
 #[derive(thiserror::Error, Debug, Clone)]
@@ -80,7 +93,6 @@ impl Importer {
     ) -> ImporterResult<()> {
         let document = OPML::from_reader(&mut Cursor::new(bytes))
             .map_err(|_| ImporterError::InvalidOPML(file_name.unwrap_or(import_id.to_string())))?;
-        let mut receivers = Vec::new();
         for url in Self::gather_feed_urls(document.body.outlines) {
             let feed = Feed::upsert(
                 &self.pool,
@@ -91,19 +103,15 @@ impl Importer {
             )
             .await
             .map_err(|_| ImporterError::CreateFeedError(url))?;
-            if feed.updated_at.is_some() {
-                receivers.push(self.crawl_scheduler.schedule(feed.feed_id).await);
+            if feed.updated_at.is_none() {
+                tokio::spawn(listen_to_crawl(
+                    feed.feed_id,
+                    self.crawl_scheduler.clone(),
+                    respond_to.clone(),
+                ));
             }
         }
 
-        let mut future_recvs: FuturesUnordered<_> =
-            receivers.iter_mut().map(|rx| rx.recv()).collect();
-
-        while let Some(result) = future_recvs.next().await {
-            if let Ok(crawl_scheduler_msg) = result {
-                let _ = respond_to.send(ImporterHandleMessage::CrawlScheduler(crawl_scheduler_msg));
-            }
-        }
-
         Ok(())
     }
@@ -161,7 +169,7 @@ pub struct ImporterHandle {
 ///
 /// `ImporterHandleMessage::Import` contains the result of importing the OPML file.
 #[allow(clippy::large_enum_variant)]
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub enum ImporterHandleMessage {
     // TODO: send stats of import or forward crawler messages?
     Import(ImporterResult<()>),
diff --git a/src/handlers/feeds.rs b/src/handlers/feeds.rs
index 3c2b7ca..24ec26b 100644
--- a/src/handlers/feeds.rs
+++ b/src/handlers/feeds.rs
@@ -38,13 +38,14 @@ pub async fn get(State(pool): State<PgPool>, layout: Layout) -> Result<Response>
                     button type="submit" { "Add Feed" }
                 }
             }
-            form action="/import/opml" method="post" enctype="mulipart/form-data" class="feed-form" {
+            form action="/import/opml" method="post" enctype="multipart/form-data" class="feed-form" {
                 div class="form-grid" {
                     label for="opml" { "OPML: " }
                    input type="file" id="opml" name="opml" required="true" accept="text/x-opml,application/xml,text/xml";
                     button type="submit" { "Import Feeds" }
                 }
             }
+            ul id="add-feed-messages" {}
         }
     }
 }))
diff --git a/src/handlers/import.rs b/src/handlers/import.rs
index 8426d84..d3c631b 100644
--- a/src/handlers/import.rs
+++ b/src/handlers/import.rs
@@ -12,6 +12,7 @@ use crate::actors::crawl_scheduler::CrawlSchedulerHandleMessage;
 use crate::actors::feed_crawler::FeedCrawlerHandleMessage;
 use crate::actors::importer::{ImporterHandle, ImporterHandleMessage};
 use crate::error::{Error, Result};
+use crate::partials::entry_link::entry_link;
 use crate::partials::feed_link::feed_link;
 use crate::state::Imports;
 use crate::turbo_stream::TurboStream;
@@ -23,7 +24,10 @@ pub async fn opml(
     mut multipart: Multipart,
 ) -> Result<Response> {
     dbg!("opml handler");
-    if let Some(field) = multipart.next_field().await.map_err(|err| { dbg!(&err); err })? {
+    if let Some(field) = multipart.next_field().await.map_err(|err| {
+        dbg!(&err);
+        err
+    })? {
         let import_id = Base62Uuid::new();
         dbg!(&import_id);
         let file_name = field.file_name().map(|s| s.to_string());
@@ -43,9 +47,9 @@ pub async fn opml(
         TurboStream(
             html! {
                 turbo-stream-source src=(import_stream) id="import-stream" {}
-                turbo-stream action="append" target="feeds" {
+                turbo-stream action="append" target="add-feed-messages" {
                     template {
-                        li id=(import_html_id) { "Importing..." }
+                        li { "Uploading file..." }
                     }
                 }
                 turbo-stream action="remove" target="no-feeds";
@@ -75,10 +79,21 @@ pub async fn stream(
         Ok(ImporterHandleMessage::Import(Ok(_))) => Ok::<Event, String>(
             Event::default().data(
                 html! {
-                    turbo-stream action="remove" target="import-stream" {}
-                    turbo-stream action="replace" target=(import_html_id) {
+                    turbo-stream action="append" target="add-feed-messages" {
+                        template { li { "Importing..." } }
+                    }
+                }
+                .into_string(),
+            ),
+        ),
+        Ok(ImporterHandleMessage::CrawlScheduler(CrawlSchedulerHandleMessage::FeedCrawler(
+            FeedCrawlerHandleMessage::Entry(Ok(entry)),
+        ))) => Ok::<Event, String>(
+            Event::default().data(
+                html! {
+                    turbo-stream action="append" target="add-feed-messages" {
                         template {
-                            li id=(import_html_id) { "Done importing" }
+                            li { "Imported: " (entry_link(entry)) }
                         }
                     }
                 }
                 .into_string(),
             ),
         ),
@@ -90,6 +105,12 @@ pub async fn stream(
         Ok(ImporterHandleMessage::CrawlScheduler(CrawlSchedulerHandleMessage::FeedCrawler(
             FeedCrawlerHandleMessage::Feed(Ok(feed)),
         ))) => Ok::<Event, String>(
             Event::default().data(
                 html! {
+                    turbo-stream action="remove" target="import-stream" {}
+                    turbo-stream action="append" target="add-feed-messages" {
+                        template {
+                            li { "Finished import." }
+                        }
+                    }
                     turbo-stream action="prepend" target="feeds" {
                         template {
                             li id=(format!("feed-{}", feed.feed_id)) { (feed_link(&feed, false)) }
@@ -104,7 +125,21 @@ pub async fn stream(
         ))) => Ok::<Event, String>(
             Event::default().data(
                 html! {
-                    turbo-stream action="prepend" target="feeds" {
+                    turbo-stream action="append" target="add-feed-messages" {
+                        template {
+                            li { span class="error" { (error) } }
+                        }
+                    }
+                }
+                .into_string(),
+            ),
+        ),
+        Ok(ImporterHandleMessage::CrawlScheduler(CrawlSchedulerHandleMessage::FeedCrawler(
+            FeedCrawlerHandleMessage::Entry(Err(error)),
+        ))) => Ok::<Event, String>(
+            Event::default().data(
+                html! {
+                    turbo-stream action="append" target="add-feed-messages" {
                         template {
                             li { span class="error" { (error) } }
                         }
@@ -116,6 +151,11 @@ pub async fn stream(
         Ok(ImporterHandleMessage::Import(Err(error))) => Ok(Event::default().data(
             html! {
                 turbo-stream action="remove" target="import-stream" {}
+                turbo-stream action="append" target="add-feed-messages" {
+                    template {
+                        li { span class="error" { (error) } }
+                    }
+                }
                 turbo-stream action="replace" target=(import_html_id) {
                     template {
                         li id=(import_html_id) { span class="error" { (error) } }
diff --git a/src/models/feed.rs b/src/models/feed.rs
index 2f52827..6c910a8 100644
--- a/src/models/feed.rs
+++ b/src/models/feed.rs
@@ -444,7 +444,7 @@ impl Feed {
             r#"insert into feed (
                 title, url, type, description
             ) values (
-                $1, $2, $3, $4
+                $1, $2, COALESCE($3, 'unknown'::feed_type), $4
             ) on conflict (url) do update set
                 title = excluded.title,
                 url = excluded.url,
diff --git a/src/partials/entry_link.rs b/src/partials/entry_link.rs
new file mode 100644
index 0000000..4d206e5
--- /dev/null
+++ b/src/partials/entry_link.rs
@@ -0,0 +1,14 @@
+use maud::{html, Markup};
+
+use crate::models::entry::Entry;
+use crate::utils::get_domain;
+use crate::uuid::Base62Uuid;
+
+pub fn entry_link(entry: Entry) -> Markup {
+    let title = entry.title.unwrap_or_else(|| "Untitled".to_string());
+    let url = format!("/entry/{}", Base62Uuid::from(entry.entry_id));
+    let domain = get_domain(&entry.url).unwrap_or_default();
+    html! {
+        a href=(url) class="entry-link" { (title) } em class="entry-link-domain" { (domain) }
+    }
+}
diff --git a/src/partials/entry_list.rs b/src/partials/entry_list.rs
index 3c7cac0..23bcd90 100644
--- a/src/partials/entry_list.rs
+++ b/src/partials/entry_list.rs
@@ -1,17 +1,13 @@
 use maud::{html, Markup};
 
 use crate::models::entry::Entry;
-use crate::utils::get_domain;
-use crate::uuid::Base62Uuid;
+use crate::partials::entry_link::entry_link;
 
 pub fn entry_list(entries: Vec<Entry>) -> Markup {
     html! {
         ul class="entries" {
             @for entry in entries {
-                @let title = entry.title.unwrap_or_else(|| "Untitled".to_string());
-                @let url = format!("/entry/{}", Base62Uuid::from(entry.entry_id));
-                @let domain = get_domain(&entry.url).unwrap_or_default();
-                li { a href=(url) { (title) } em class="domain" { (domain) }}
+                li class="entry" { (entry_link(entry)) }
             }
         }
     }
diff --git a/src/partials/mod.rs b/src/partials/mod.rs
index 063d2a2..98b8441 100644
--- a/src/partials/mod.rs
+++ b/src/partials/mod.rs
@@ -1,3 +1,4 @@
+pub mod entry_link;
 pub mod entry_list;
 pub mod feed_link;
 pub mod header;
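
Note on the pattern this patch relies on: progress messages travel through a chain of
tokio::sync::broadcast channels, with each actor republishing what it hears from the
stage above it (entry crawler -> feed crawler -> crawl scheduler -> importer -> SSE
handler), exactly as the new listen_to_crawl helper does. The sketch below is a
minimal, self-contained illustration of that one forwarding step; UpstreamMessage,
DownstreamMessage, and forward are hypothetical stand-ins, not types from this
codebase.

// assumes tokio = { version = "1", features = ["sync", "macros", "rt-multi-thread"] }
use tokio::sync::broadcast;

// Stand-ins for message enums like CrawlSchedulerHandleMessage and
// ImporterHandleMessage; only the forwarding mechanics match the patch.
#[derive(Debug, Clone)]
enum UpstreamMessage {
    Progress(String),
}

#[derive(Debug, Clone)]
enum DownstreamMessage {
    Upstream(UpstreamMessage),
}

// Mirrors listen_to_crawl: drain one receiver and rebroadcast each message,
// wrapped in the downstream enum, until the upstream sender is dropped.
async fn forward(
    mut upstream: broadcast::Receiver<UpstreamMessage>,
    downstream: broadcast::Sender<DownstreamMessage>,
) {
    // recv() returns Err(RecvError::Closed) once all senders are gone.
    while let Ok(msg) = upstream.recv().await {
        // send() only fails when no receivers remain; safe to ignore here.
        let _ = downstream.send(DownstreamMessage::Upstream(msg));
    }
}

#[tokio::main]
async fn main() {
    let (upstream_tx, upstream_rx) = broadcast::channel(8);
    let (downstream_tx, mut downstream_rx) = broadcast::channel(8);

    // Spawned fire-and-forget, like the importer's per-feed listener tasks.
    tokio::spawn(forward(upstream_rx, downstream_tx));

    upstream_tx
        .send(UpstreamMessage::Progress("crawled feed".into()))
        .unwrap();
    drop(upstream_tx); // closes the channel, ending the forwarding loop

    while let Ok(msg) = downstream_rx.recv().await {
        println!("{:?}", msg); // Upstream(Progress("crawled feed"))
    }
}

Because every handle clones a broadcast::Sender, any number of listeners (the SSE
stream here, or future consumers) can subscribe without the actors knowing about
them, and a stage whose receivers have all gone away simply sees its sends fail and
ignores them.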