use std::time::Duration; use axum::extract::{Path, State}; use axum::http::StatusCode; use axum::response::sse::{Event, KeepAlive}; use axum::response::{IntoResponse, Redirect, Response, Sse}; use axum::Form; use feed_rs::parser; use maud::html; use serde::Deserialize; use serde_with::{serde_as, NoneAsEmptyString}; use sqlx::PgPool; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleMessage}; use crate::actors::feed_crawler::FeedCrawlerHandleMessage; use crate::error::{Error, Result}; use crate::models::entry::Entry; use crate::models::feed::{CreateFeed, Feed}; use crate::partials::{entry_list::entry_list, feed_link::feed_link, layout::Layout}; use crate::state::Crawls; use crate::turbo_stream::TurboStream; use crate::uuid::Base62Uuid; pub async fn get( Path(id): Path, State(pool): State, layout: Layout, ) -> Result { let feed = Feed::get(&pool, id.as_uuid()).await?; let entries = Entry::get_all_for_feed(&pool, feed.feed_id, Default::default()).await?; let delete_url = format!("/feed/{}/delete", id); Ok(layout.render(html! { header class="feed-header" { h2 { (feed.title.unwrap_or_else(|| "Untitled Feed".to_string())) } button class="edit-feed" { "✏️ Edit feed" } form action=(delete_url) method="post" { button type="submit" class="remove-feed" data-controller="remove-feed" { "❌ Remove feed" } } } @if let Some(description) = feed.description { p { (description) } } (entry_list(entries)) })) } #[serde_as] #[derive(Deserialize)] pub struct AddFeed { url: String, #[serde_as(as = "NoneAsEmptyString")] title: Option, #[serde_as(as = "NoneAsEmptyString")] description: Option, } #[derive(thiserror::Error, Debug)] pub enum AddFeedError { #[error("invalid feed url: {0}")] InvalidUrl(String, #[source] url::ParseError), #[error("failed to fetch feed: {0}")] FetchError(String, #[source] reqwest::Error), #[error("failed to parse feed: {0}")] ParseError(String, #[source] parser::ParseFeedError), #[error("failed to create feed: {0}")] CreateFeedError(String, #[source] Error), #[error("feed already exists: {0}")] FeedAlreadyExists(String, #[source] Error), } pub type AddFeedResult = ::std::result::Result; impl AddFeedError { fn status_code(&self) -> StatusCode { use AddFeedError::*; match self { InvalidUrl(..) | FetchError(..) | ParseError(..) | FeedAlreadyExists(..) => { StatusCode::UNPROCESSABLE_ENTITY } CreateFeedError(..) => StatusCode::INTERNAL_SERVER_ERROR, } } } impl IntoResponse for AddFeedError { fn into_response(self) -> Response { ( self.status_code(), TurboStream( html! { turbo-stream action="append" target="feeds" { template { li { span class="error" { (self) } } } } } .into_string(), ), ) .into_response() } } pub async fn post( State(pool): State, State(crawls): State, State(crawl_scheduler): State, Form(add_feed): Form, ) -> AddFeedResult { let feed = Feed::create( &pool, CreateFeed { title: add_feed.title, url: add_feed.url.clone(), description: add_feed.description, }, ) .await .map_err(|err| { if let Error::Sqlx(sqlx::error::Error::Database(db_error)) = &err { if let Some(code) = db_error.code() { if let Some(constraint) = db_error.constraint() { if code == "23505" && constraint == "feed_url_idx" { return AddFeedError::FeedAlreadyExists(add_feed.url.clone(), err); } } } } AddFeedError::CreateFeedError(add_feed.url.clone(), err) })?; let receiver = crawl_scheduler.schedule(feed.feed_id).await; { let mut crawls = crawls.lock().await; crawls.insert(feed.feed_id, receiver); } let feed_id = format!("feed-{}", Base62Uuid::from(feed.feed_id)); let feed_stream = format!("/feed/{}/stream", Base62Uuid::from(feed.feed_id)); Ok(( StatusCode::CREATED, TurboStream( html! { turbo-stream-source src=(feed_stream) id="feed-stream" {} turbo-stream action="append" target="feeds" { template { li id=(feed_id) { (feed_link(&feed, true)) } } } turbo-stream action="remove" target="no-feeds"; } .into_string(), ), ) .into_response()) } pub async fn stream( Path(id): Path, State(crawls): State, ) -> Result { let receiver = { let mut crawls = crawls.lock().await; crawls.remove(&id.as_uuid()) } .ok_or_else(|| Error::NotFound("feed stream", id.as_uuid()))?; let stream = BroadcastStream::new(receiver); let feed_id = format!("feed-{}", id); let stream = stream.map(move |msg| match msg { Ok(CrawlSchedulerHandleMessage::FeedCrawler(FeedCrawlerHandleMessage::Feed(Ok(feed)))) => { Ok::( Event::default().data( html! { turbo-stream action="remove" target="feed-stream" {} turbo-stream action="replace" target=(feed_id) { template { li id=(feed_id) { (feed_link(&feed, false)) } } } } .into_string(), ), ) } Ok(CrawlSchedulerHandleMessage::FeedCrawler(FeedCrawlerHandleMessage::Feed(Err( error, )))) => Ok(Event::default().data( html! { turbo-stream action="remove" target="feed-stream" {} turbo-stream action="replace" target=(feed_id) { template { li id=(feed_id) { span class="error" { (error) } } } } } .into_string(), )), // TODO: these Entry messages are not yet sent, need to handle them better Ok(CrawlSchedulerHandleMessage::FeedCrawler(FeedCrawlerHandleMessage::Entry(Ok(_)))) => { Ok(Event::default().data( html! { turbo-stream action="remove" target="feed-stream" {} turbo-stream action="replace" target=(feed_id) { template { li id=(feed_id) { "fetched entry" } } } } .into_string(), )) } Ok(CrawlSchedulerHandleMessage::FeedCrawler(FeedCrawlerHandleMessage::Entry(Err( error, )))) => Ok(Event::default().data( html! { turbo-stream action="remove" target="feed-stream" {} turbo-stream action="replace" target=(feed_id) { template { li id=(feed_id) { span class="error" { (error) } } } } } .into_string(), )), _ => Ok(Event::default()), }); Ok(Sse::new(stream).keep_alive( KeepAlive::new() .interval(Duration::from_secs(15)) .text("keep-alive-text"), )) } pub async fn delete(State(pool): State, Path(id): Path) -> Result { Feed::delete(&pool, id.as_uuid()).await?; Ok(Redirect::to("/feeds")) }