Replace hotwire with htmx
In the process, also improve the feedback from the import/add feed forms. I also rewrote the frontend code that replaces UTC timestamps with local time strings, swapping @hotwired/stimulus for vanilla JS.
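For reference, a minimal vanilla-JS sketch of that timestamp conversion, assuming (hypothetically) that the server renders UTC timestamps in <time datetime="..."> elements; the actual selector and markup in this codebase may differ:

    // Hypothetical sketch: localize server-rendered UTC timestamps.
    // Assumes <time datetime="..."> markup; adjust to the real templates.
    function localizeTimestamps(root) {
      for (const el of root.querySelectorAll('time[datetime]')) {
        const utc = new Date(el.getAttribute('datetime'));
        if (!Number.isNaN(utc.valueOf())) {
          el.textContent = utc.toLocaleString(); // viewer's local time
        }
      }
    }
    document.addEventListener('DOMContentLoaded', () => {
      localizeTimestamps(document);
      // htmx swaps in fragments after page load, so re-run on each swap too.
      document.body.addEventListener('htmx:afterSwap', (e) => localizeTimestamps(e.target));
    });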
@@ -5,11 +5,13 @@ use bytes::Bytes;
 use opml::OPML;
 use sqlx::PgPool;
 use tokio::sync::{broadcast, mpsc};
+use tokio::task::JoinSet;
 use tracing::{debug, error, instrument};
 use uuid::Uuid;
 
 use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleMessage};
-use crate::models::feed::{Feed, UpsertFeed};
+use crate::error::Error;
+use crate::models::feed::{Feed, CreateFeed};
 use crate::state::Imports;
 use crate::uuid::Base62Uuid;
 
@@ -51,11 +53,12 @@ async fn listen_to_crawl(
     feed_id: Uuid,
     crawl_scheduler: CrawlSchedulerHandle,
     respond_to: broadcast::Sender<ImporterHandleMessage>,
-) {
+) -> Uuid {
     let mut receiver = crawl_scheduler.schedule(feed_id).await;
     while let Ok(msg) = receiver.recv().await {
         let _ = respond_to.send(ImporterHandleMessage::CrawlScheduler(msg));
     }
+    feed_id
 }
 
 /// An error type that enumerates possible failures during a crawl and is cloneable and can be sent
@@ -95,25 +98,38 @@ impl Importer {
         let document = OPML::from_reader(&mut Cursor::new(bytes)).map_err(|_| {
             ImporterError::InvalidOPML(file_name.unwrap_or(Base62Uuid::from(import_id).to_string()))
         })?;
+        let mut crawls = JoinSet::new();
         for url in Self::gather_feed_urls(document.body.outlines) {
-            let feed = Feed::upsert(
+            dbg!(&url);
+            let feed = Feed::create(
                 &self.pool,
-                UpsertFeed {
+                CreateFeed {
                     url: url.clone(),
                     ..Default::default()
                 },
             )
-            .await
-            .map_err(|_| ImporterError::CreateFeedError(url))?;
-            if feed.updated_at.is_none() {
-                tokio::spawn(listen_to_crawl(
+            .await;
+            if let Err(Error::Sqlx(sqlx::error::Error::Database(err))) = feed {
+                if err.is_unique_violation() {
+                    dbg!("already imported", &url);
+                    let _ = respond_to.send(ImporterHandleMessage::AlreadyImported(url));
+                }
+            } else if let Ok(feed) = feed {
+                crawls.spawn(listen_to_crawl(
                     feed.feed_id,
                     self.crawl_scheduler.clone(),
                     respond_to.clone(),
                 ));
+            } else {
+                let _ = respond_to.send(ImporterHandleMessage::CreateFeedError(url));
+            }
         }
 
+        while let Some(feed_id) = crawls.join_next().await {
+            dbg!("done crawling feed", feed_id);
+        }
+        dbg!("done import_opml");
+
         Ok(())
     }
 
@@ -137,6 +153,7 @@ impl Importer {
                 bytes,
                 respond_to,
             } => {
+                dbg!("handle_message", import_id);
                 let result = self
                     .import_opml(import_id, file_name, bytes, respond_to.clone())
                     .await;
@@ -178,6 +195,8 @@ pub struct ImporterHandle {
 #[derive(Debug, Clone)]
 pub enum ImporterHandleMessage {
     Import(ImporterResult<()>),
+    CreateFeedError(String),
+    AlreadyImported(String),
     CrawlScheduler(CrawlSchedulerHandleMessage),
 }
 
@@ -200,6 +219,7 @@ impl ImporterHandle {
         file_name: Option<String>,
         bytes: Bytes,
     ) -> broadcast::Receiver<ImporterHandleMessage> {
+        dbg!(import_id, &file_name, bytes.len());
         let (sender, receiver) = broadcast::channel(8);
         let msg = ImporterMessage::Import {
             import_id,