diff --git a/README.md b/README.md
index 2cd5be4..7b3d478 100644
--- a/README.md
+++ b/README.md
@@ -88,8 +88,3 @@ You can also build the binary in release mode for running in production with the
 This project also comes with a CLI binary which allows you to manipulate the
 database directly without needing to go through the REST API server. Run
 `cli --help` to see all of the available commands.
-
-## Running jobs
-
-To periodically fetch new items from all of the feeds execute the `cli crawl`
-command in a cronjob.
diff --git a/migrations/20230507201612_initial.sql b/migrations/20230507201612_initial.sql
index c0b9e80..cb1a879 100644
--- a/migrations/20230507201612_initial.sql
+++ b/migrations/20230507201612_initial.sql
@@ -31,14 +31,18 @@ $$ language plpgsql;
 -- over things like usernames and emails, without needing to remember to do case-conversion.
 create collation case_insensitive (provider = icu, locale = 'und-u-ks-level2', deterministic = false);
 
-create type feed_type as enum ('atom', 'rss');
+create type feed_type as enum ('atom', 'json', 'rss0', 'rss1', 'rss2', 'unknown');
 
 create table if not exists "feed" (
     feed_id uuid primary key default uuid_generate_v1mc(),
     title text,
     url varchar(2048) not null,
-    type feed_type not null,
-    description text,
+    type feed_type not null default 'unknown',
+    description text default null,
+    crawl_interval_minutes int not null default 180,
+    last_crawl_error text default null,
+    last_crawled_at timestamptz default null,
+    last_entry_published_at timestamptz default null,
     created_at timestamptz not null default now(),
     updated_at timestamptz,
     deleted_at timestamptz
diff --git a/src/actors/feed_crawler.rs b/src/actors/feed_crawler.rs
index cd44080..4a03bdb 100644
--- a/src/actors/feed_crawler.rs
+++ b/src/actors/feed_crawler.rs
@@ -8,11 +8,13 @@ use tokio::sync::{broadcast, mpsc};
 use tracing::log::warn;
 use tracing::{info, info_span, instrument};
 use url::Url;
+use uuid::Uuid;
 
 use crate::actors::entry_crawler::EntryCrawlerHandle;
 use crate::domain_locks::DomainLocks;
-use crate::models::entry::{upsert_entries, CreateEntry, Entry};
-use crate::models::feed::{upsert_feed, CreateFeed, Feed};
+use crate::models::entry::{CreateEntry, Entry};
+use crate::models::feed::Feed;
+use crate::uuid::Base62Uuid;
 
 /// The `FeedCrawler` actor fetches a feed url, parses it, and saves it to the database.
 ///
@@ -31,7 +33,7 @@ struct FeedCrawler {
 #[derive(Debug)]
 enum FeedCrawlerMessage {
     Crawl {
-        url: Url,
+        feed_id: Uuid,
         respond_to: broadcast::Sender<FeedCrawlerHandleMessage>,
     },
 }
@@ -39,7 +41,7 @@ enum FeedCrawlerMessage {
 impl Display for FeedCrawlerMessage {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         match self {
-            FeedCrawlerMessage::Crawl { url, .. } => write!(f, "Crawl({})", url),
+            FeedCrawlerMessage::Crawl { feed_id, ..
} => write!(f, "Crawl({})", feed_id), } } } @@ -49,11 +51,13 @@ impl Display for FeedCrawlerMessage { #[derive(thiserror::Error, Debug, Clone)] pub enum FeedCrawlerError { #[error("invalid feed url: {0}")] - InvalidUrl(Url), + InvalidUrl(String), #[error("failed to fetch feed: {0}")] FetchError(Url), #[error("failed to parse feed: {0}")] ParseError(Url), + #[error("failed to find feed in database: {0}")] + GetFeedError(Base62Uuid), #[error("failed to create feed: {0}")] CreateFeedError(Url), #[error("failed to create feed entries: {0}")] @@ -78,11 +82,17 @@ impl FeedCrawler { } } - #[instrument(skip_all, fields(url = %url))] - async fn crawl_feed(&self, url: Url) -> FeedCrawlerResult { + #[instrument(skip_all, fields(feed_id = %feed_id))] + async fn crawl_feed(&self, feed_id: Uuid) -> FeedCrawlerResult { + let mut feed = Feed::get(&self.pool, feed_id) + .await + .map_err(|_| FeedCrawlerError::GetFeedError(Base62Uuid::from(feed_id)))?; + info!("got feed from db"); + let url = Url::parse(&feed.url) + .map_err(|_| FeedCrawlerError::InvalidUrl(feed.url.clone()))?; let domain = url .domain() - .ok_or(FeedCrawlerError::InvalidUrl(url.clone()))?; + .ok_or(FeedCrawlerError::InvalidUrl(feed.url.clone()))?; let bytes = self .domain_locks .run_request(domain, async { @@ -96,22 +106,24 @@ impl FeedCrawler { .map_err(|_| FeedCrawlerError::FetchError(url.clone())) }) .await?; - info!("fetched feed"); + info!(url=%url, "fetched feed"); let parsed_feed = parser::parse(&bytes[..]).map_err(|_| FeedCrawlerError::ParseError(url.clone()))?; info!("parsed feed"); - let feed = upsert_feed( - &self.pool, - CreateFeed { - title: parsed_feed.title.map(|text| text.content), - url: url.to_string(), - feed_type: parsed_feed.feed_type.into(), - description: parsed_feed.description.map(|text| text.content), - }, - ) - .await - .map_err(|_| FeedCrawlerError::CreateFeedError(url.clone()))?; - info!(%feed.feed_id, "upserted feed"); + feed.url = url.to_string(); + feed.feed_type = parsed_feed.feed_type.into(); + feed.last_crawled_at = Some(Utc::now()); + if let Some(title) = parsed_feed.title { + feed.title = Some(title.content); + } + if let Some(description) = parsed_feed.description { + feed.description = Some(description.content); + } + let feed = feed + .save(&self.pool) + .await + .map_err(|_| FeedCrawlerError::CreateFeedError(url.clone()))?; + info!("updated feed in db"); let mut payload = Vec::with_capacity(parsed_feed.entries.len()); for entry in parsed_feed.entries { @@ -132,7 +144,7 @@ impl FeedCrawler { warn!("Skipping feed entry with no links"); } } - let entries = upsert_entries(&self.pool, payload) + let entries = Entry::bulk_upsert(&self.pool, payload) .await .map_err(|_| FeedCrawlerError::CreateFeedEntriesError(url.clone()))?; let (new, updated) = entries @@ -156,8 +168,11 @@ impl FeedCrawler { #[instrument(skip_all, fields(msg = %msg))] async fn handle_message(&mut self, msg: FeedCrawlerMessage) { match msg { - FeedCrawlerMessage::Crawl { url, respond_to } => { - let result = self.crawl_feed(url).await; + FeedCrawlerMessage::Crawl { + feed_id, + respond_to, + } => { + let result = self.crawl_feed(feed_id).await; // ignore the result since the initiator may have cancelled waiting for the // response, and that is ok let _ = respond_to.send(FeedCrawlerHandleMessage::Feed(result)); @@ -212,10 +227,13 @@ impl FeedCrawlerHandle { /// Sends a `FeedCrawlerMessage::Crawl` message to the running `FeedCrawler` actor. /// /// Listen to the result of the crawl via the returned `broadcast::Receiver`. 
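From the caller's side, listening on that receiver looks roughly like the sketch below. This is a minimal illustration, not part of the diff: the `crawl_and_wait` name is made up, the error handling is simplified, and it assumes `FeedCrawlerHandleMessage::Feed` (the variant the actor sends above) is the message carrying the crawl result, with any other broadcast messages ignored.

```rust
use lib::actors::feed_crawler::{FeedCrawlerHandle, FeedCrawlerHandleMessage};
use tokio::sync::broadcast::error::RecvError;
use uuid::Uuid;

// Hypothetical caller: request a crawl by feed_id, then block until the actor
// broadcasts the result for the feed itself.
async fn crawl_and_wait(feed_crawler: &FeedCrawlerHandle, feed_id: Uuid) {
    let mut receiver = feed_crawler.crawl(feed_id).await;
    loop {
        match receiver.recv().await {
            // the actor sends `FeedCrawlerHandleMessage::Feed` with the crawl result
            Ok(FeedCrawlerHandleMessage::Feed(Ok(feed))) => {
                println!("crawled feed {}", feed.feed_id);
                break;
            }
            Ok(FeedCrawlerHandleMessage::Feed(Err(error))) => {
                eprintln!("crawl failed: {}", error);
                break;
            }
            // ignore any other progress messages the handle may broadcast
            Ok(_) => continue,
            // the sender was dropped, or this receiver fell too far behind
            Err(RecvError::Closed) => break,
            Err(RecvError::Lagged(_)) => continue,
        }
    }
}
```

For comparison, the new CLI `crawl` subcommand further down discards this receiver (`let _ = feed_crawler.crawl(id).await;`), so it requests the crawl without waiting on the outcome.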
- pub async fn crawl(&self, url: Url) -> broadcast::Receiver { + pub async fn crawl( + &self, + feed_id: Uuid, + ) -> broadcast::Receiver { let (sender, receiver) = broadcast::channel(8); let msg = FeedCrawlerMessage::Crawl { - url, + feed_id, respond_to: sender, }; diff --git a/src/bin/cli.rs b/src/bin/cli.rs index eefad14..e5f1fd2 100644 --- a/src/bin/cli.rs +++ b/src/bin/cli.rs @@ -1,15 +1,17 @@ use anyhow::Result; -use clap::{Args, Parser, Subcommand}; use chrono::Utc; +use clap::{Args, Parser, Subcommand}; use dotenvy::dotenv; +use lib::actors::feed_crawler::FeedCrawlerHandle; +use lib::domain_locks::DomainLocks; +use reqwest::Client; use sqlx::postgres::PgPoolOptions; use std::env; use tracing::info; use uuid::Uuid; -use lib::jobs::crawl::crawl; -use lib::models::feed::{create_feed, delete_feed, CreateFeed, FeedType}; -use lib::models::entry::{create_entry, delete_entry, CreateEntry}; +use lib::models::entry::{Entry, CreateEntry}; +use lib::models::feed::{CreateFeed, Feed, FeedType}; use lib::uuid::Base62Uuid; /// CLI for crawlnicle @@ -23,14 +25,20 @@ struct Cli { #[derive(Subcommand)] enum Commands { - /// Fetches new entries from all feeds in the database - Crawl, + Crawl(CrawlFeed), AddFeed(AddFeed), DeleteFeed(DeleteFeed), AddEntry(AddEntry), DeleteEntry(DeleteEntry), } +#[derive(Args)] +/// Crawl a feed (get new entries) +struct CrawlFeed { + /// id of the feed to crawl + id: Uuid, +} + /// Add a feed to the database #[derive(Args)] struct AddFeed { @@ -94,12 +102,11 @@ pub async fn main() -> Result<()> { match cli.commands { Commands::AddFeed(args) => { - let feed = create_feed( + let feed = Feed::create( &pool, CreateFeed { title: args.title, url: args.url, - feed_type: args.feed_type, description: args.description, }, ) @@ -107,11 +114,11 @@ pub async fn main() -> Result<()> { info!("Created feed with id {}", Base62Uuid::from(feed.feed_id)); } Commands::DeleteFeed(args) => { - delete_feed(&pool, args.id).await?; + Feed::delete(&pool, args.id).await?; info!("Deleted feed with id {}", Base62Uuid::from(args.id)); } Commands::AddEntry(args) => { - let entry = create_entry( + let entry = Entry::create( &pool, CreateEntry { title: args.title, @@ -125,12 +132,22 @@ pub async fn main() -> Result<()> { info!("Created entry with id {}", Base62Uuid::from(entry.entry_id)); } Commands::DeleteEntry(args) => { - delete_entry(&pool, args.id).await?; + Entry::delete(&pool, args.id).await?; info!("Deleted entry with id {}", Base62Uuid::from(args.id)); } - Commands::Crawl => { - info!("Crawling..."); - crawl(&pool).await?; + Commands::Crawl(CrawlFeed { id }) => { + info!("Crawling feed {}...", Base62Uuid::from(id)); + let client = Client::new(); + // NOTE: this is not the same DomainLocks as the one used in the server so, if the + // server is running, it will *not* serialize same-domain requests with it. 
+ let domain_locks = DomainLocks::new(); + let feed_crawler = FeedCrawlerHandle::new( + pool.clone(), + client.clone(), + domain_locks.clone(), + env::var("CONTENT_DIR")?, + ); + let _ = feed_crawler.crawl(id).await; } } diff --git a/src/handlers/api/entries.rs b/src/handlers/api/entries.rs index 11ef05a..5c6ff2b 100644 --- a/src/handlers/api/entries.rs +++ b/src/handlers/api/entries.rs @@ -2,8 +2,8 @@ use axum::{extract::State, Json}; use sqlx::PgPool; use crate::error::Error; -use crate::models::entry::{get_entries, Entry, GetEntriesOptions}; +use crate::models::entry::Entry; pub async fn get(State(pool): State) -> Result>, Error> { - Ok(Json(get_entries(&pool, GetEntriesOptions::default()).await?)) + Ok(Json(Entry::get_all(&pool, Default::default()).await?)) } diff --git a/src/handlers/api/entry.rs b/src/handlers/api/entry.rs index bcb810f..349bdb3 100644 --- a/src/handlers/api/entry.rs +++ b/src/handlers/api/entry.rs @@ -5,19 +5,19 @@ use axum::{ use sqlx::PgPool; use crate::error::Error; -use crate::models::entry::{create_entry, get_entry, CreateEntry, Entry}; +use crate::models::entry::{CreateEntry, Entry}; use crate::uuid::Base62Uuid; pub async fn get( State(pool): State, Path(id): Path, ) -> Result, Error> { - Ok(Json(get_entry(&pool, id.as_uuid()).await?)) + Ok(Json(Entry::get(&pool, id.as_uuid()).await?)) } pub async fn post( State(pool): State, Json(payload): Json, ) -> Result, Error> { - Ok(Json(create_entry(&pool, payload).await?)) + Ok(Json(Entry::create(&pool, payload).await?)) } diff --git a/src/handlers/api/feed.rs b/src/handlers/api/feed.rs index db2b924..2c78f68 100644 --- a/src/handlers/api/feed.rs +++ b/src/handlers/api/feed.rs @@ -5,20 +5,20 @@ use axum::{ use sqlx::PgPool; use crate::error::{Error, Result}; -use crate::models::feed::{create_feed, delete_feed, get_feed, CreateFeed, Feed}; +use crate::models::feed::{CreateFeed, Feed}; use crate::uuid::Base62Uuid; pub async fn get(State(pool): State, Path(id): Path) -> Result> { - Ok(Json(get_feed(&pool, id.as_uuid()).await?)) + Ok(Json(Feed::get(&pool, id.as_uuid()).await?)) } pub async fn post( State(pool): State, Json(payload): Json, ) -> Result, Error> { - Ok(Json(create_feed(&pool, payload).await?)) + Ok(Json(Feed::create(&pool, payload).await?)) } pub async fn delete(State(pool): State, Path(id): Path) -> Result<()> { - delete_feed(&pool, id.as_uuid()).await + Feed::delete(&pool, id.as_uuid()).await } diff --git a/src/handlers/api/feeds.rs b/src/handlers/api/feeds.rs index 1507648..89e1fe0 100644 --- a/src/handlers/api/feeds.rs +++ b/src/handlers/api/feeds.rs @@ -2,8 +2,9 @@ use axum::{extract::State, Json}; use sqlx::PgPool; use crate::error::Error; -use crate::models::feed::{get_feeds, Feed}; +use crate::models::feed::Feed; pub async fn get(State(pool): State) -> Result>, Error> { - Ok(Json(get_feeds(&pool).await?)) + // TODO: pagination + Ok(Json(Feed::get_all(&pool).await?)) } diff --git a/src/handlers/entry.rs b/src/handlers/entry.rs index bb0ea64..51d60e5 100644 --- a/src/handlers/entry.rs +++ b/src/handlers/entry.rs @@ -7,7 +7,7 @@ use sqlx::PgPool; use crate::config::Config; use crate::error::Result; -use crate::models::entry::get_entry; +use crate::models::entry::Entry; use crate::partials::layout::Layout; use crate::uuid::Base62Uuid; @@ -17,7 +17,7 @@ pub async fn get( State(config): State, layout: Layout, ) -> Result { - let entry = get_entry(&pool, id.as_uuid()).await?; + let entry = Entry::get(&pool, id.as_uuid()).await?; let content_dir = std::path::Path::new(&config.content_dir); let content_path = 
content_dir.join(format!("{}.html", entry.entry_id)); Ok(layout.render(html! { diff --git a/src/handlers/feed.rs b/src/handlers/feed.rs index 753009c..a8eb0e8 100644 --- a/src/handlers/feed.rs +++ b/src/handlers/feed.rs @@ -20,8 +20,8 @@ use crate::actors::feed_crawler::{FeedCrawlerHandle, FeedCrawlerHandleMessage}; use crate::config::Config; use crate::domain_locks::DomainLocks; use crate::error::{Error, Result}; -use crate::models::entry::get_entries_for_feed; -use crate::models::feed::{create_feed, delete_feed, get_feed, CreateFeed, FeedType}; +use crate::models::entry::Entry; +use crate::models::feed::{CreateFeed, Feed}; use crate::partials::{entry_list::entry_list, feed_link::feed_link, layout::Layout}; use crate::state::Crawls; use crate::turbo_stream::TurboStream; @@ -32,8 +32,8 @@ pub async fn get( State(pool): State, layout: Layout, ) -> Result { - let feed = get_feed(&pool, id.as_uuid()).await?; - let entries = get_entries_for_feed(&pool, feed.feed_id, Default::default()).await?; + let feed = Feed::get(&pool, id.as_uuid()).await?; + let entries = Entry::get_all_for_feed(&pool, feed.feed_id, Default::default()).await?; let delete_url = format!("/feed/{}/delete", id); Ok(layout.render(html! { header class="feed-header" { @@ -123,12 +123,11 @@ pub async fn post( config.content_dir.clone(), ); - let feed = create_feed( + let feed = Feed::create( &pool, CreateFeed { title: add_feed.title, url: add_feed.url.clone(), - feed_type: FeedType::Rss, // eh, get rid of this description: add_feed.description, }, ) @@ -148,7 +147,7 @@ pub async fn post( let url: Url = Url::parse(&add_feed.url) .map_err(|err| AddFeedError::InvalidUrl(add_feed.url.clone(), err))?; - let receiver = feed_crawler.crawl(url).await; + let receiver = feed_crawler.crawl(feed.feed_id).await; { let mut crawls = crawls.lock().map_err(|_| { AddFeedError::CreateFeedError(add_feed.url.clone(), Error::InternalServerError) @@ -245,6 +244,6 @@ pub async fn stream( } pub async fn delete(State(pool): State, Path(id): Path) -> Result { - delete_feed(&pool, id.as_uuid()).await?; + Feed::delete(&pool, id.as_uuid()).await?; Ok(Redirect::to("/feeds")) } diff --git a/src/handlers/feeds.rs b/src/handlers/feeds.rs index 133c314..cbf1453 100644 --- a/src/handlers/feeds.rs +++ b/src/handlers/feeds.rs @@ -4,11 +4,12 @@ use maud::html; use sqlx::PgPool; use crate::error::Result; -use crate::models::feed::get_feeds; +use crate::models::feed::Feed; use crate::partials::{feed_link::feed_link, layout::Layout}; pub async fn get(State(pool): State, layout: Layout) -> Result { - let feeds = get_feeds(&pool).await?; + // TODO: pagination + let feeds = Feed::get_all(&pool).await?; Ok(layout.render(html! 
{ h2 { "Feeds" } div class="feeds" { diff --git a/src/handlers/home.rs b/src/handlers/home.rs index 247d3e7..da7a1f8 100644 --- a/src/handlers/home.rs +++ b/src/handlers/home.rs @@ -3,10 +3,10 @@ use axum::response::Response; use sqlx::PgPool; use crate::error::Result; -use crate::models::entry::{get_entries, GetEntriesOptions}; +use crate::models::entry::Entry; use crate::partials::{layout::Layout, entry_list::entry_list}; pub async fn get(State(pool): State, layout: Layout) -> Result { - let entries = get_entries(&pool, GetEntriesOptions::default()).await?; + let entries = Entry::get_all(&pool, Default::default()).await?; Ok(layout.render(entry_list(entries))) } diff --git a/src/jobs/crawl.rs b/src/jobs/crawl.rs deleted file mode 100644 index 421f99e..0000000 --- a/src/jobs/crawl.rs +++ /dev/null @@ -1,77 +0,0 @@ -use std::fs; -use std::env; -use std::path::Path; - -use article_scraper::ArticleScraper; -use chrono::Utc; -use feed_rs::parser; -use reqwest::{Client, Url}; -use sqlx::PgPool; -use tracing::{info, info_span, warn}; - -use crate::models::feed::get_feeds; -use crate::models::entry::{update_entry, upsert_entries, CreateEntry}; -use crate::uuid::Base62Uuid; - -/// DEPRECATED: Use FeedCrawler instead, keeping this for reference until I set up scheduled jobs. -/// For every feed in the database, fetches the feed, parses it, and saves new entries to the -/// database. -pub async fn crawl(pool: &PgPool) -> anyhow::Result<()> { - let scraper = ArticleScraper::new(None).await; - let client = Client::new(); - let content_dir = env::var("CONTENT_DIR")?; - let content_dir = Path::new(&content_dir); - let feeds = get_feeds(pool).await?; - for feed in feeds { - let feed_id_str: String = Base62Uuid::from(feed.feed_id).into(); - let feed_span = info_span!("feed", id = feed_id_str, url = feed.url.as_str()); - let _feed_span_guard = feed_span.enter(); - info!("Fetching feed"); - // TODO: handle these results - let bytes = client.get(feed.url).send().await?.bytes().await?; - info!("Parsing feed"); - let parsed_feed = parser::parse(&bytes[..])?; - let mut payload = Vec::with_capacity(parsed_feed.entries.len()); - for entry in parsed_feed.entries { - let entry_span = info_span!("entry", id = entry.id, title = entry.title.clone().map(|t| t.content)); - let _entry_span_guard = entry_span.enter(); - if let Some(link) = entry.links.get(0) { - // if no scraped or feed date is available, fallback to the current time - let published_at = entry.published.unwrap_or_else(Utc::now); - let entry = CreateEntry { - title: entry.title.map(|t| t.content), - url: link.href.clone(), - description: entry.summary.map(|s| s.content), - feed_id: feed.feed_id, - published_at, - }; - payload.push(entry); - } else { - warn!("Skipping feed entry with no links"); - } - } - let entries = upsert_entries(pool, payload).await?; - info!("Created {} entries", entries.len()); - - // TODO: figure out how to do this in parallel. ArticleScraper uses some libxml thing that - // doesn't implement Send so this isn't trivial. 
- for mut entry in entries { - info!("Fetching and parsing entry link: {}", entry.url); - if let Ok(article) = scraper.parse(&Url::parse(&entry.url)?, true, &client, None).await { - let id = entry.entry_id; - if let Some(date) = article.date { - // prefer scraped date over rss feed date - entry.published_at = date; - update_entry(pool, entry).await?; - }; - let html_content = article.get_content(); - if let Some(content) = html_content { - fs::write(content_dir.join(format!("{}.html", id)), content)?; - } - } else { - warn!("Failed to fetch article for entry: {:?}", &entry.url); - } - } - } - Ok(()) -} diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs deleted file mode 100644 index f274818..0000000 --- a/src/jobs/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod crawl; diff --git a/src/lib.rs b/src/lib.rs index 99192bc..213ef93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,6 @@ pub mod config; pub mod domain_locks; pub mod error; pub mod handlers; -pub mod jobs; pub mod log; pub mod models; pub mod partials; diff --git a/src/models/entry.rs b/src/models/entry.rs index 79d9c45..2561ff8 100644 --- a/src/models/entry.rs +++ b/src/models/entry.rs @@ -33,273 +33,275 @@ pub struct CreateEntry { pub published_at: DateTime, } -pub async fn get_entry(pool: &PgPool, entry_id: Uuid) -> Result { - sqlx::query_as!(Entry, "select * from entry where entry_id = $1", entry_id) - .fetch_one(pool) - .await - .map_err(|error| { - if let sqlx::error::Error::RowNotFound = error { - return Error::NotFound("entry", entry_id); - } - Error::Sqlx(error) - }) -} - #[derive(Default)] pub struct GetEntriesOptions { pub published_before: Option>, pub limit: Option, } -pub async fn get_entries(pool: &PgPool, options: GetEntriesOptions) -> sqlx::Result> { - if let Some(published_before) = options.published_before { +impl Entry { + pub async fn get(pool: &PgPool, entry_id: Uuid) -> Result { + sqlx::query_as!(Entry, "select * from entry where entry_id = $1", entry_id) + .fetch_one(pool) + .await + .map_err(|error| { + if let sqlx::error::Error::RowNotFound = error { + return Error::NotFound("entry", entry_id); + } + Error::Sqlx(error) + }) + } + + pub async fn get_all(pool: &PgPool, options: GetEntriesOptions) -> sqlx::Result> { + if let Some(published_before) = options.published_before { + sqlx::query_as!( + Entry, + "select * from entry + where deleted_at is null + and published_at < $1 + order by published_at desc + limit $2 + ", + published_before, + options.limit.unwrap_or(DEFAULT_ENTRIES_PAGE_SIZE) + ) + .fetch_all(pool) + .await + } else { + sqlx::query_as!( + Entry, + "select * from entry + where deleted_at is null + order by published_at desc + limit $1 + ", + options.limit.unwrap_or(DEFAULT_ENTRIES_PAGE_SIZE) + ) + .fetch_all(pool) + .await + } + } + + pub async fn get_all_for_feed( + pool: &PgPool, + feed_id: Uuid, + options: GetEntriesOptions, + ) -> sqlx::Result> { + if let Some(published_before) = options.published_before { + sqlx::query_as!( + Entry, + "select * from entry + where deleted_at is null + and feed_id = $1 + and published_at < $2 + order by published_at desc + limit $3 + ", + feed_id, + published_before, + options.limit.unwrap_or(DEFAULT_ENTRIES_PAGE_SIZE) + ) + .fetch_all(pool) + .await + } else { + sqlx::query_as!( + Entry, + "select * from entry + where deleted_at is null + and feed_id = $1 + order by published_at desc + limit $2 + ", + feed_id, + options.limit.unwrap_or(DEFAULT_ENTRIES_PAGE_SIZE) + ) + .fetch_all(pool) + .await + } + } + + pub async fn create(pool: &PgPool, payload: CreateEntry) 
-> Result { + payload.validate()?; sqlx::query_as!( Entry, - "select * from entry - where deleted_at is null - and published_at < $1 - order by published_at desc - limit $2 - ", - published_before, - options.limit.unwrap_or(DEFAULT_ENTRIES_PAGE_SIZE) + "insert into entry ( + title, url, description, feed_id, published_at + ) values ( + $1, $2, $3, $4, $5 + ) returning *", + payload.title, + payload.url, + payload.description, + payload.feed_id, + payload.published_at, + ) + .fetch_one(pool) + .await + .map_err(|error| { + if let sqlx::error::Error::Database(ref psql_error) = error { + if psql_error.code().as_deref() == Some("23503") { + return Error::RelationNotFound("feed"); + } + } + Error::Sqlx(error) + }) + } + + pub async fn upsert(pool: &PgPool, payload: CreateEntry) -> Result { + payload.validate()?; + sqlx::query_as!( + Entry, + "insert into entry ( + title, url, description, feed_id, published_at + ) values ( + $1, $2, $3, $4, $5 + ) on conflict (url, feed_id) do update set + title = excluded.title, + description = excluded.description, + published_at = excluded.published_at + returning *", + payload.title, + payload.url, + payload.description, + payload.feed_id, + payload.published_at, + ) + .fetch_one(pool) + .await + .map_err(|error| { + if let sqlx::error::Error::Database(ref psql_error) = error { + if psql_error.code().as_deref() == Some("23503") { + return Error::RelationNotFound("feed"); + } + } + Error::Sqlx(error) + }) + } + + pub async fn bulk_create(pool: &PgPool, payload: Vec) -> Result> { + let mut titles = Vec::with_capacity(payload.len()); + let mut urls = Vec::with_capacity(payload.len()); + let mut descriptions: Vec> = Vec::with_capacity(payload.len()); + let mut feed_ids = Vec::with_capacity(payload.len()); + let mut published_ats = Vec::with_capacity(payload.len()); + payload + .iter() + .map(|entry| { + titles.push(entry.title.clone()); + urls.push(entry.url.clone()); + descriptions.push(entry.description.clone()); + feed_ids.push(entry.feed_id); + published_ats.push(entry.published_at); + entry.validate() + }) + .collect::, ValidationErrors>>()?; + sqlx::query_as!( + Entry, + "insert into entry ( + title, url, description, feed_id, published_at + ) select * from unnest($1::text[], $2::text[], $3::text[], $4::uuid[], $5::timestamptz[]) + returning *", + titles.as_slice() as &[Option], + urls.as_slice(), + descriptions.as_slice() as &[Option], + feed_ids.as_slice(), + published_ats.as_slice(), ) .fetch_all(pool) .await - } else { + .map_err(|error| { + if let sqlx::error::Error::Database(ref psql_error) = error { + if psql_error.code().as_deref() == Some("23503") { + return Error::RelationNotFound("feed"); + } + } + Error::Sqlx(error) + }) + } + + pub async fn bulk_upsert(pool: &PgPool, payload: Vec) -> Result> { + let mut titles = Vec::with_capacity(payload.len()); + let mut urls = Vec::with_capacity(payload.len()); + let mut descriptions: Vec> = Vec::with_capacity(payload.len()); + let mut feed_ids = Vec::with_capacity(payload.len()); + let mut published_ats = Vec::with_capacity(payload.len()); + payload + .iter() + .map(|entry| { + titles.push(entry.title.clone()); + urls.push(entry.url.clone()); + descriptions.push(entry.description.clone()); + feed_ids.push(entry.feed_id); + published_ats.push(entry.published_at); + entry.validate() + }) + .collect::, ValidationErrors>>()?; sqlx::query_as!( Entry, - "select * from entry - where deleted_at is null - order by published_at desc - limit $1 - ", - options.limit.unwrap_or(DEFAULT_ENTRIES_PAGE_SIZE) + "insert into 
entry ( + title, url, description, feed_id, published_at + ) select * from unnest($1::text[], $2::text[], $3::text[], $4::uuid[], $5::timestamptz[]) + on conflict (url, feed_id) do update set + title = excluded.title, + description = excluded.description, + published_at = excluded.published_at + returning *", + titles.as_slice() as &[Option], + urls.as_slice(), + descriptions.as_slice() as &[Option], + feed_ids.as_slice(), + published_ats.as_slice(), ) .fetch_all(pool) .await + .map_err(|error| { + if let sqlx::error::Error::Database(ref psql_error) = error { + if psql_error.code().as_deref() == Some("23503") { + return Error::RelationNotFound("feed"); + } + } + Error::Sqlx(error) + }) + } + + pub async fn update(pool: &PgPool, payload: Entry) -> Result { + sqlx::query_as!( + Entry, + "update entry set + title = $2, + url = $3, + description = $4, + feed_id = $5, + published_at = $6 + where entry_id = $1 + returning * + ", + payload.entry_id, + payload.title, + payload.url, + payload.description, + payload.feed_id, + payload.published_at, + ) + .fetch_one(pool) + .await + .map_err(|error| { + if let sqlx::error::Error::Database(ref psql_error) = error { + if psql_error.code().as_deref() == Some("23503") { + return Error::RelationNotFound("feed"); + } + } + Error::Sqlx(error) + }) + } + + pub async fn delete(pool: &PgPool, entry_id: Uuid) -> Result<()> { + sqlx::query!( + "update entry set deleted_at = now() where entry_id = $1", + entry_id + ) + .execute(pool) + .await?; + Ok(()) } } - -pub async fn get_entries_for_feed( - pool: &PgPool, - feed_id: Uuid, - options: GetEntriesOptions, -) -> sqlx::Result> { - if let Some(published_before) = options.published_before { - sqlx::query_as!( - Entry, - "select * from entry - where deleted_at is null - and feed_id = $1 - and published_at < $2 - order by published_at desc - limit $3 - ", - feed_id, - published_before, - options.limit.unwrap_or(DEFAULT_ENTRIES_PAGE_SIZE) - ) - .fetch_all(pool) - .await - } else { - sqlx::query_as!( - Entry, - "select * from entry - where deleted_at is null - and feed_id = $1 - order by published_at desc - limit $2 - ", - feed_id, - options.limit.unwrap_or(DEFAULT_ENTRIES_PAGE_SIZE) - ) - .fetch_all(pool) - .await - } -} - -pub async fn create_entry(pool: &PgPool, payload: CreateEntry) -> Result { - payload.validate()?; - sqlx::query_as!( - Entry, - "insert into entry ( - title, url, description, feed_id, published_at - ) values ( - $1, $2, $3, $4, $5 - ) returning *", - payload.title, - payload.url, - payload.description, - payload.feed_id, - payload.published_at, - ) - .fetch_one(pool) - .await - .map_err(|error| { - if let sqlx::error::Error::Database(ref psql_error) = error { - if psql_error.code().as_deref() == Some("23503") { - return Error::RelationNotFound("feed"); - } - } - Error::Sqlx(error) - }) -} - -pub async fn upsert_entry(pool: &PgPool, payload: CreateEntry) -> Result { - payload.validate()?; - sqlx::query_as!( - Entry, - "insert into entry ( - title, url, description, feed_id, published_at - ) values ( - $1, $2, $3, $4, $5 - ) on conflict (url, feed_id) do update set - title = excluded.title, - description = excluded.description, - published_at = excluded.published_at - returning *", - payload.title, - payload.url, - payload.description, - payload.feed_id, - payload.published_at, - ) - .fetch_one(pool) - .await - .map_err(|error| { - if let sqlx::error::Error::Database(ref psql_error) = error { - if psql_error.code().as_deref() == Some("23503") { - return Error::RelationNotFound("feed"); - } - } - 
Error::Sqlx(error) - }) -} - -pub async fn create_entries(pool: &PgPool, payload: Vec) -> Result> { - let mut titles = Vec::with_capacity(payload.len()); - let mut urls = Vec::with_capacity(payload.len()); - let mut descriptions: Vec> = Vec::with_capacity(payload.len()); - let mut feed_ids = Vec::with_capacity(payload.len()); - let mut published_ats = Vec::with_capacity(payload.len()); - payload - .iter() - .map(|entry| { - titles.push(entry.title.clone()); - urls.push(entry.url.clone()); - descriptions.push(entry.description.clone()); - feed_ids.push(entry.feed_id); - published_ats.push(entry.published_at); - entry.validate() - }) - .collect::, ValidationErrors>>()?; - sqlx::query_as!( - Entry, - "insert into entry ( - title, url, description, feed_id, published_at - ) select * from unnest($1::text[], $2::text[], $3::text[], $4::uuid[], $5::timestamptz[]) - returning *", - titles.as_slice() as &[Option], - urls.as_slice(), - descriptions.as_slice() as &[Option], - feed_ids.as_slice(), - published_ats.as_slice(), - ) - .fetch_all(pool) - .await - .map_err(|error| { - if let sqlx::error::Error::Database(ref psql_error) = error { - if psql_error.code().as_deref() == Some("23503") { - return Error::RelationNotFound("feed"); - } - } - Error::Sqlx(error) - }) -} - -pub async fn upsert_entries(pool: &PgPool, payload: Vec) -> Result> { - let mut titles = Vec::with_capacity(payload.len()); - let mut urls = Vec::with_capacity(payload.len()); - let mut descriptions: Vec> = Vec::with_capacity(payload.len()); - let mut feed_ids = Vec::with_capacity(payload.len()); - let mut published_ats = Vec::with_capacity(payload.len()); - payload - .iter() - .map(|entry| { - titles.push(entry.title.clone()); - urls.push(entry.url.clone()); - descriptions.push(entry.description.clone()); - feed_ids.push(entry.feed_id); - published_ats.push(entry.published_at); - entry.validate() - }) - .collect::, ValidationErrors>>()?; - sqlx::query_as!( - Entry, - "insert into entry ( - title, url, description, feed_id, published_at - ) select * from unnest($1::text[], $2::text[], $3::text[], $4::uuid[], $5::timestamptz[]) - on conflict (url, feed_id) do update set - title = excluded.title, - description = excluded.description, - published_at = excluded.published_at - returning *", - titles.as_slice() as &[Option], - urls.as_slice(), - descriptions.as_slice() as &[Option], - feed_ids.as_slice(), - published_ats.as_slice(), - ) - .fetch_all(pool) - .await - .map_err(|error| { - if let sqlx::error::Error::Database(ref psql_error) = error { - if psql_error.code().as_deref() == Some("23503") { - return Error::RelationNotFound("feed"); - } - } - Error::Sqlx(error) - }) -} - -pub async fn update_entry(pool: &PgPool, payload: Entry) -> Result { - sqlx::query_as!( - Entry, - "update entry set - title = $2, - url = $3, - description = $4, - feed_id = $5, - published_at = $6 - where entry_id = $1 - returning * - ", - payload.entry_id, - payload.title, - payload.url, - payload.description, - payload.feed_id, - payload.published_at, - ) - .fetch_one(pool) - .await - .map_err(|error| { - if let sqlx::error::Error::Database(ref psql_error) = error { - if psql_error.code().as_deref() == Some("23503") { - return Error::RelationNotFound("feed"); - } - } - Error::Sqlx(error) - }) -} - -pub async fn delete_entry(pool: &PgPool, entry_id: Uuid) -> Result<()> { - sqlx::query!( - "update entry set deleted_at = now() where entry_id = $1", - entry_id - ) - .execute(pool) - .await?; - Ok(()) -} diff --git a/src/models/feed.rs b/src/models/feed.rs index 
812979b..ab4f77a 100644 --- a/src/models/feed.rs +++ b/src/models/feed.rs @@ -2,18 +2,22 @@ use std::str::FromStr; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use sqlx::PgPool; +use sqlx::{FromRow, PgPool}; use uuid::Uuid; use validator::Validate; use crate::error::{Error, Result}; -#[derive(Debug, Serialize, Deserialize, sqlx::Type, Clone)] +#[derive(Debug, Serialize, Deserialize, sqlx::Type, Clone, Copy)] #[sqlx(type_name = "feed_type", rename_all = "lowercase")] #[serde(rename_all = "lowercase")] pub enum FeedType { Atom, - Rss, + JSON, + RSS0, + RSS1, + RSS2, + Unknown, } impl FromStr for FeedType { @@ -21,7 +25,11 @@ impl FromStr for FeedType { fn from_str(s: &str) -> Result { match s { "atom" => Ok(FeedType::Atom), - "rss" => Ok(FeedType::Rss), + "json" => Ok(FeedType::JSON), + "rss0" => Ok(FeedType::RSS0), + "rss1" => Ok(FeedType::RSS1), + "rss2" => Ok(FeedType::RSS2), + "unknown" => Ok(FeedType::Unknown), _ => Err(format!("invalid feed type: {}", s)), } } @@ -31,13 +39,15 @@ impl From for FeedType { fn from(value: feed_rs::model::FeedType) -> Self { match value { feed_rs::model::FeedType::Atom => FeedType::Atom, - // TODO: this isn't really accurate - _ => FeedType::Rss, + feed_rs::model::FeedType::JSON => FeedType::JSON, + feed_rs::model::FeedType::RSS0 => FeedType::RSS0, + feed_rs::model::FeedType::RSS1 => FeedType::RSS1, + feed_rs::model::FeedType::RSS2 => FeedType::RSS2, } } } -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, FromRow)] pub struct Feed { pub feed_id: Uuid, pub title: Option, @@ -45,6 +55,10 @@ pub struct Feed { #[serde(rename = "type")] pub feed_type: FeedType, pub description: Option, + pub crawl_interval_minutes: i32, + pub last_crawl_error: Option, + pub last_crawled_at: Option>, + pub last_entry_published_at: Option>, pub created_at: DateTime, pub updated_at: Option>, pub deleted_at: Option>, @@ -56,123 +70,250 @@ pub struct CreateFeed { pub title: Option, #[validate(url)] pub url: String, - #[serde(rename = "type")] - pub feed_type: FeedType, #[validate(length(max = 524288))] pub description: Option, } -pub async fn get_feed(pool: &PgPool, feed_id: Uuid) -> Result { - sqlx::query_as!( - Feed, - // Unable to SELECT * here due to https://github.com/launchbadge/sqlx/issues/1004 - // language=PostGreSQL - r#"select - feed_id, - title, - url, - type as "feed_type: FeedType", - description, - created_at, - updated_at, - deleted_at - from feed where feed_id = $1"#, - feed_id - ) - .fetch_one(pool) - .await - .map_err(|error| { - if let sqlx::error::Error::RowNotFound = error { - return Error::NotFound("feed", feed_id); +#[derive(Debug, Deserialize, Validate)] +pub struct UpsertFeed { + #[validate(length(max = 255))] + pub title: Option, + #[validate(url)] + pub url: String, + pub feed_type: Option, + #[validate(length(max = 524288))] + pub description: Option, + pub last_crawl_error: Option, + pub last_crawled_at: Option>, +} + +#[derive(Debug, Deserialize, Default, Validate)] +pub struct UpdateFeed { + #[validate(length(max = 255))] + pub title: Option>, + #[validate(url)] + pub url: Option, + pub feed_type: Option, + #[validate(length(max = 524288))] + pub description: Option>, + pub crawl_interval_minutes: Option, + pub last_crawl_error: Option>, + pub last_crawled_at: Option>>, + pub last_entry_published_at: Option>>, +} + +impl Feed { + pub async fn get(pool: &PgPool, feed_id: Uuid) -> Result { + sqlx::query_as!( + Feed, + // Unable to SELECT * here due to 
https://github.com/launchbadge/sqlx/issues/1004 + // language=PostGreSQL + r#"select + feed_id, + title, + url, + type as "feed_type: FeedType", + description, + crawl_interval_minutes, + last_crawl_error, + last_crawled_at, + last_entry_published_at, + created_at, + updated_at, + deleted_at + from feed where feed_id = $1"#, + feed_id + ) + .fetch_one(pool) + .await + .map_err(|error| { + if let sqlx::error::Error::RowNotFound = error { + return Error::NotFound("feed", feed_id); + } + Error::Sqlx(error) + }) + } + + pub async fn get_all(pool: &PgPool) -> sqlx::Result> { + sqlx::query_as!( + Feed, + r#"select + feed_id, + title, + url, + type as "feed_type: FeedType", + description, + crawl_interval_minutes, + last_crawl_error, + last_crawled_at, + last_entry_published_at, + created_at, + updated_at, + deleted_at + from feed + where deleted_at is null"# + ) + .fetch_all(pool) + .await + } + + pub async fn create(pool: &PgPool, payload: CreateFeed) -> Result { + payload.validate()?; + Ok(sqlx::query_as!( + Feed, + r#"insert into feed ( + title, url, description + ) values ( + $1, $2, $3 + ) returning + feed_id, + title, + url, + type as "feed_type: FeedType", + description, + crawl_interval_minutes, + last_crawl_error, + last_crawled_at, + last_entry_published_at, + created_at, + updated_at, + deleted_at + "#, + payload.title, + payload.url, + payload.description + ) + .fetch_one(pool) + .await?) + } + + pub async fn upsert(pool: &PgPool, payload: UpsertFeed) -> Result { + payload.validate()?; + Ok(sqlx::query_as!( + Feed, + r#"insert into feed ( + title, url, type, description + ) values ( + $1, $2, $3, $4 + ) on conflict (url) do update set + title = excluded.title, + url = excluded.url, + type = COALESCE(excluded.type, feed.type), + description = excluded.description + returning + feed_id, + title, + url, + type as "feed_type: FeedType", + description, + crawl_interval_minutes, + last_crawl_error, + last_crawled_at, + last_entry_published_at, + created_at, + updated_at, + deleted_at + "#, + payload.title, + payload.url, + payload.feed_type as Option, + payload.description + ) + .fetch_one(pool) + .await?) 
+ } + + pub async fn update(pool: &PgPool, feed_id: Uuid, payload: UpdateFeed) -> Result { + payload.validate()?; + let mut query = sqlx::QueryBuilder::new("UPDATE feed SET "); + + let mut updates = query.separated(", "); + if let Some(title) = payload.title { + updates.push_unseparated("title = "); + updates.push_bind(title); + } + if let Some(url) = payload.url { + updates.push_unseparated("url = "); + updates.push_bind(url); + } + if let Some(description) = payload.description { + updates.push_unseparated("description = "); + updates.push_bind(description); + } + if let Some(crawl_interval_minutes) = payload.crawl_interval_minutes { + updates.push("crawl_interval_minutes = "); + updates.push_bind(crawl_interval_minutes); + } + if let Some(last_crawl_error) = payload.last_crawl_error { + updates.push_unseparated("last_crawl_error = "); + updates.push_bind(last_crawl_error); + } + if let Some(last_crawled_at) = payload.last_crawled_at { + updates.push_unseparated("last_crawled_at = "); + updates.push_bind(last_crawled_at); + } + if let Some(last_entry_published_at) = payload.last_entry_published_at { + updates.push_unseparated("last_entry_published_at = "); + updates.push_bind(last_entry_published_at); } - Error::Sqlx(error) - }) -} -pub async fn get_feeds(pool: &PgPool) -> sqlx::Result> { - sqlx::query_as!( - Feed, - r#"select - feed_id, - title, - url, - type as "feed_type: FeedType", - description, - created_at, - updated_at, - deleted_at - from feed - where deleted_at is null"# - ) - .fetch_all(pool) - .await -} + query.push(" WHERE id = "); + query.push_bind(feed_id); + query.push(" RETURNING *"); -pub async fn create_feed(pool: &PgPool, payload: CreateFeed) -> Result { - payload.validate()?; - Ok(sqlx::query_as!( - Feed, - r#"insert into feed ( - title, url, type, description - ) values ( - $1, $2, $3, $4 - ) returning - feed_id, - title, - url, - type as "feed_type: FeedType", - description, - created_at, - updated_at, - deleted_at - "#, - payload.title, - payload.url, - payload.feed_type as FeedType, - payload.description - ) - .fetch_one(pool) - .await?) -} + let query = query.build_query_as(); -pub async fn upsert_feed(pool: &PgPool, payload: CreateFeed) -> Result { - payload.validate()?; - Ok(sqlx::query_as!( - Feed, - r#"insert into feed ( - title, url, type, description - ) values ( - $1, $2, $3, $4 - ) on conflict (url) do update set - title = excluded.title, - url = excluded.url, - type = excluded.type, - description = excluded.description - returning - feed_id, - title, - url, - type as "feed_type: FeedType", - description, - created_at, - updated_at, - deleted_at - "#, - payload.title, - payload.url, - payload.feed_type as FeedType, - payload.description - ) - .fetch_one(pool) - .await?) -} + Ok(query.fetch_one(pool).await?) 
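The `UpdateFeed` payload above uses nested `Option`s so a caller can distinguish "leave the column alone" (outer `None`) from "set the column to NULL" (`Some(None)`), and `Feed::update` only adds the fields that are present to the query. A minimal sketch of a partial update under that reading (field types are reconstructed from the struct definition above; the helper name, values, and `anyhow` wrapper are illustrative, not part of this diff):

```rust
use lib::models::feed::{Feed, UpdateFeed};
use sqlx::PgPool;
use uuid::Uuid;

// Hypothetical call site: slow down crawling and clear a stale error,
// leaving title, url, and description untouched (outer None = no change).
async fn relax_crawl_schedule(pool: &PgPool, feed_id: Uuid) -> anyhow::Result<Feed> {
    let feed = Feed::update(
        pool,
        feed_id,
        UpdateFeed {
            crawl_interval_minutes: Some(360),
            last_crawl_error: Some(None), // Some(None) writes SQL NULL
            ..Default::default()          // UpdateFeed derives Default in this diff
        },
    )
    .await?;
    Ok(feed)
}
```

Callers that already hold a full `Feed` row and want to write every column back can use the row-level `save` method added just below instead, which is what the reworked `FeedCrawler` does after a crawl.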
+ } -pub async fn delete_feed(pool: &PgPool, feed_id: Uuid) -> Result<()> { - sqlx::query!( - "update feed set deleted_at = now() where feed_id = $1", - feed_id - ) - .execute(pool) - .await?; - Ok(()) + pub async fn delete(pool: &PgPool, feed_id: Uuid) -> Result<()> { + sqlx::query!( + "update feed set deleted_at = now() where feed_id = $1", + feed_id + ) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn save(&self, pool: &PgPool) -> Result { + Ok(sqlx::query_as!( + Feed, + r#"update feed set + title = $2, + url = $3, + type = $4, + description = $5, + crawl_interval_minutes = $6, + last_crawl_error = $7, + last_crawled_at = $8, + last_entry_published_at = $9 + where feed_id = $1 + returning + feed_id, + title, + url, + type as "feed_type: FeedType", + description, + crawl_interval_minutes, + last_crawl_error, + last_crawled_at, + last_entry_published_at, + created_at, + updated_at, + deleted_at + "#, + self.feed_id, + self.title, + self.url, + self.feed_type as FeedType, + self.description, + self.crawl_interval_minutes, + self.last_crawl_error, + self.last_crawled_at, + self.last_entry_published_at, + ) + .fetch_one(pool) + .await?) + } } diff --git a/src/uuid.rs b/src/uuid.rs index 8453458..191a7aa 100644 --- a/src/uuid.rs +++ b/src/uuid.rs @@ -9,7 +9,7 @@ const BASE62_CHARS: &[u8] = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmn /// /// Database rows have a UUID primary key, but they are encoded in Base62 to be shorter and more /// URL-friendly for the frontend. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy)] pub struct Base62Uuid( #[serde(deserialize_with = "uuid_from_base62_str")] #[serde(serialize_with = "uuid_to_base62_str")]
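Putting the pieces together, the add-then-crawl flow now looks roughly like the sketch below from a caller's perspective, mirroring what `POST /feed` in `src/handlers/feed.rs` does after this change: the row is created first (its type defaults to `unknown`), and the crawler is then pointed at it by primary key. The function and local names are illustrative and error handling is trimmed.

```rust
use lib::actors::feed_crawler::FeedCrawlerHandle;
use lib::models::feed::{CreateFeed, Feed};
use lib::uuid::Base62Uuid;
use sqlx::PgPool;

// Hypothetical condensed add-feed flow after this change.
async fn add_and_crawl(
    pool: &PgPool,
    feed_crawler: &FeedCrawlerHandle,
    url: String,
) -> anyhow::Result<()> {
    // 1. Insert the feed row; no feed_type is supplied anymore, the column
    //    defaults to 'unknown' until a crawl detects the real type.
    let feed = Feed::create(
        pool,
        CreateFeed {
            title: None,
            url,
            description: None,
        },
    )
    .await?;

    // 2. Ask the FeedCrawler actor to fetch and parse it by id. The crawler
    //    loads the row, fills in type/title/description, and sets
    //    last_crawled_at itself. The returned receiver can be stored (the web
    //    handler keeps it in a `Crawls` map) or awaited as in the earlier sketch.
    let _receiver = feed_crawler.crawl(feed.feed_id).await;
    println!("queued crawl for feed {}", Base62Uuid::from(feed.feed_id));
    Ok(())
}
```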