Add published_at to entries, begin to support pagination

Articles will be sorted by their published_at dates for now.
This commit is contained in:
Tyler Hallada 2023-06-08 01:20:21 -04:00
parent 3f29138bd1
commit 758e644173
6 changed files with 106 additions and 36 deletions

View File

@ -20,9 +20,10 @@ CREATE TABLE IF NOT EXISTS "entries" (
"description" TEXT, "description" TEXT,
"html_content" TEXT, "html_content" TEXT,
"feed_id" INTEGER REFERENCES "feeds"(id) NOT NULL, "feed_id" INTEGER REFERENCES "feeds"(id) NOT NULL,
"published_at" timestamp(3) NOT NULL,
"created_at" timestamp(3) NOT NULL, "created_at" timestamp(3) NOT NULL,
"updated_at" timestamp(3) NOT NULL, "updated_at" timestamp(3) NOT NULL,
"deleted_at" timestamp(3) "deleted_at" timestamp(3)
); );
CREATE INDEX "entries_deleted_at" ON "entries" ("deleted_at"); CREATE INDEX "entries_published_at_where_deleted_at_is_null" ON "entries" ("published_at" DESC) WHERE "deleted_at" IS NULL;
CREATE UNIQUE INDEX "entries_url_and_feed_id" ON "entries" ("url", "feed_id"); CREATE UNIQUE INDEX "entries_url_and_feed_id" ON "entries" ("url", "feed_id");

View File

@ -1,5 +1,6 @@
use anyhow::Result; use anyhow::Result;
use argh::FromArgs; use argh::FromArgs;
use chrono::Utc;
use dotenvy::dotenv; use dotenvy::dotenv;
use sqlx::postgres::PgPoolOptions; use sqlx::postgres::PgPoolOptions;
use std::env; use std::env;
@ -125,6 +126,7 @@ pub async fn main() -> Result<()> {
description: args.description, description: args.description,
html_content: None, html_content: None,
feed_id: args.feed_id, feed_id: args.feed_id,
published_at: Utc::now().naive_utc(),
}, },
) )
.await?; .await?;

View File

@ -2,8 +2,8 @@ use axum::{extract::State, Json};
use sqlx::PgPool; use sqlx::PgPool;
use crate::error::Error; use crate::error::Error;
use crate::models::entry::{get_entries, Entry}; use crate::models::entry::{get_entries, Entry, GetEntriesOptions};
pub async fn get(State(pool): State<PgPool>) -> Result<Json<Vec<Entry>>, Error> { pub async fn get(State(pool): State<PgPool>) -> Result<Json<Vec<Entry>>, Error> {
Ok(Json(get_entries(&pool).await?)) Ok(Json(get_entries(&pool, GetEntriesOptions::default()).await?))
} }

View File

@ -4,11 +4,11 @@ use maud::html;
use sqlx::PgPool; use sqlx::PgPool;
use crate::error::Result; use crate::error::Result;
use crate::models::entry::get_entries; use crate::models::entry::{get_entries, GetEntriesOptions};
use crate::partials::layout::Layout; use crate::partials::layout::Layout;
pub async fn get(State(pool): State<PgPool>, layout: Layout) -> Result<Response> { pub async fn get(State(pool): State<PgPool>, layout: Layout) -> Result<Response> {
let entries = get_entries(&pool).await?; let entries = get_entries(&pool, GetEntriesOptions::default()).await?;
Ok(layout.render(html! { Ok(layout.render(html! {
ul { ul {
@for entry in entries { @for entry in entries {

View File

@ -1,8 +1,9 @@
use article_scraper::ArticleScraper; use article_scraper::ArticleScraper;
use chrono::Utc;
use feed_rs::parser; use feed_rs::parser;
use reqwest::{Client, Url}; use reqwest::{Client, Url};
use sqlx::PgPool; use sqlx::PgPool;
use tracing::{info, warn}; use tracing::{info, info_span, warn};
use crate::models::feed::get_feeds; use crate::models::feed::get_feeds;
use crate::models::entry::{upsert_entries, CreateEntry}; use crate::models::entry::{upsert_entries, CreateEntry};
@ -14,24 +15,39 @@ pub async fn crawl(pool: &PgPool) -> anyhow::Result<()> {
let client = Client::new(); let client = Client::new();
let feeds = get_feeds(pool).await?; let feeds = get_feeds(pool).await?;
for feed in feeds { for feed in feeds {
info!("Fetching feed {}: {}", feed.id, feed.url); let _feed_span = info_span!("feed", id = feed.id, url = feed.url.as_str());
info!("Fetching feed");
// TODO: handle these results
let bytes = client.get(feed.url).send().await?.bytes().await?; let bytes = client.get(feed.url).send().await?.bytes().await?;
info!("Parsing feed");
let parsed_feed = parser::parse(&bytes[..])?; let parsed_feed = parser::parse(&bytes[..])?;
let mut payload = Vec::with_capacity(parsed_feed.entries.len()); let mut payload = Vec::with_capacity(parsed_feed.entries.len());
for entry in parsed_feed.entries { for entry in parsed_feed.entries {
let _entry_span = info_span!("entry", id = entry.id, title = entry.title.clone().map(|t| t.content));
if let Some(link) = entry.links.get(0) { if let Some(link) = entry.links.get(0) {
info!("Fetching entry article: {}", link.href); // if no scraped or feed date is available, fall back to the current time
let article = scraper.parse(&Url::parse(&link.href)?, true, &client, None).await?; let published_at = entry.published.unwrap_or_else(Utc::now).naive_utc();
let entry = CreateEntry { let mut entry = CreateEntry {
title: entry.title.map(|t| t.content), title: entry.title.map(|t| t.content),
url: link.href.clone(), url: link.href.clone(),
description: entry.summary.map(|s| s.content), description: entry.summary.map(|s| s.content),
html_content: article.get_content(), html_content: None,
feed_id: feed.id, feed_id: feed.id,
published_at,
}; };
info!("Fetching and parsing entry link: {}", link.href);
if let Ok(article) = scraper.parse(&Url::parse(&link.href)?, true, &client, None).await {
if let Some(date) = article.date {
// prefer scraped date over RSS feed date
entry.published_at = date.naive_utc()
};
entry.html_content = article.get_content();
} else {
warn!("Failed to fetch article for entry: {:?}", link);
}
payload.push(entry); payload.push(entry);
} else { } else {
warn!("Feed entry has no links: {:?}", entry); warn!("Skipping feed entry with no links");
} }
} }
let entries = upsert_entries(pool, payload).await?; let entries = upsert_entries(pool, payload).await?;

View File

@ -5,6 +5,8 @@ use validator::{Validate, ValidationErrors};
use crate::error::{Error, Result}; use crate::error::{Error, Result};
const DEFAULT_ENTRIES_PAGE_SIZE: i64 = 50;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct Entry { pub struct Entry {
pub id: i32, pub id: i32,
@ -13,6 +15,7 @@ pub struct Entry {
pub description: Option<String>, pub description: Option<String>,
pub html_content: Option<String>, pub html_content: Option<String>,
pub feed_id: i32, pub feed_id: i32,
pub published_at: NaiveDateTime,
pub created_at: NaiveDateTime, pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime, pub updated_at: NaiveDateTime,
pub deleted_at: Option<NaiveDateTime>, pub deleted_at: Option<NaiveDateTime>,
@ -29,6 +32,7 @@ pub struct CreateEntry {
pub html_content: Option<String>, pub html_content: Option<String>,
#[validate(range(min = 1))] #[validate(range(min = 1))]
pub feed_id: i32, pub feed_id: i32,
pub published_at: NaiveDateTime,
} }
pub async fn get_entry(pool: &PgPool, id: i32) -> Result<Entry> { pub async fn get_entry(pool: &PgPool, id: i32) -> Result<Entry> {
@ -43,10 +47,44 @@ pub async fn get_entry(pool: &PgPool, id: i32) -> Result<Entry> {
}) })
} }
pub async fn get_entries(pool: &PgPool) -> sqlx::Result<Vec<Entry>> { #[derive(Default)]
sqlx::query_as!(Entry, "SELECT * FROM entries WHERE deleted_at IS NULL") pub struct GetEntriesOptions {
pub published_before: Option<NaiveDateTime>,
pub limit: Option<i64>,
}
pub async fn get_entries(
pool: &PgPool,
options: GetEntriesOptions,
) -> sqlx::Result<Vec<Entry>> {
if let Some(published_before) = options.published_before {
sqlx::query_as!(
Entry,
"SELECT * FROM entries
WHERE deleted_at IS NULL
AND published_at < $1
ORDER BY published_at DESC
LIMIT $2
",
published_before,
options.limit.unwrap_or(DEFAULT_ENTRIES_PAGE_SIZE)
)
.fetch_all(pool) .fetch_all(pool)
.await .await
} else {
sqlx::query_as!(
Entry,
"SELECT * FROM entries
WHERE deleted_at IS NULL
ORDER BY published_at DESC
LIMIT $1
",
options.limit.unwrap_or(DEFAULT_ENTRIES_PAGE_SIZE)
)
.fetch_all(pool)
.await
}
} }
pub async fn create_entry(pool: &PgPool, payload: CreateEntry) -> Result<Entry> { pub async fn create_entry(pool: &PgPool, payload: CreateEntry) -> Result<Entry> {
@ -54,15 +92,16 @@ pub async fn create_entry(pool: &PgPool, payload: CreateEntry) -> Result<Entry>
sqlx::query_as!( sqlx::query_as!(
Entry, Entry,
"INSERT INTO entries ( "INSERT INTO entries (
title, url, description, html_content, feed_id, created_at, updated_at title, url, description, html_content, feed_id, published_at, created_at, updated_at
) VALUES ( ) VALUES (
$1, $2, $3, $4, $5, now(), now() $1, $2, $3, $4, $5, $6, now(), now()
) RETURNING *", ) RETURNING *",
payload.title, payload.title,
payload.url, payload.url,
payload.description, payload.description,
payload.html_content, payload.html_content,
payload.feed_id, payload.feed_id,
payload.published_at,
) )
.fetch_one(pool) .fetch_one(pool)
.await .await
@ -82,25 +121,31 @@ pub async fn create_entries(pool: &PgPool, payload: Vec<CreateEntry>) -> Result<
let mut descriptions: Vec<Option<String>> = Vec::with_capacity(payload.len()); let mut descriptions: Vec<Option<String>> = Vec::with_capacity(payload.len());
let mut html_contents: Vec<Option<String>> = Vec::with_capacity(payload.len()); let mut html_contents: Vec<Option<String>> = Vec::with_capacity(payload.len());
let mut feed_ids = Vec::with_capacity(payload.len()); let mut feed_ids = Vec::with_capacity(payload.len());
payload.iter().map(|entry| { let mut published_ats = Vec::with_capacity(payload.len());
titles.push(entry.title.clone()); payload
urls.push(entry.url.clone()); .iter()
descriptions.push(entry.description.clone()); .map(|entry| {
html_contents.push(entry.html_content.clone()); titles.push(entry.title.clone());
feed_ids.push(entry.feed_id); urls.push(entry.url.clone());
entry.validate() descriptions.push(entry.description.clone());
}).collect::<Result<Vec<()>, ValidationErrors>>()?; html_contents.push(entry.html_content.clone());
feed_ids.push(entry.feed_id);
published_ats.push(entry.published_at);
entry.validate()
})
.collect::<Result<Vec<()>, ValidationErrors>>()?;
sqlx::query_as!( sqlx::query_as!(
Entry, Entry,
"INSERT INTO entries ( "INSERT INTO entries (
title, url, description, html_content, feed_id, created_at, updated_at title, url, description, html_content, feed_id, published_at, created_at, updated_at
) SELECT *, now(), now() FROM UNNEST($1::text[], $2::text[], $3::text[], $4::text[], $5::int[]) ) SELECT *, now(), now() FROM UNNEST($1::text[], $2::text[], $3::text[], $4::text[], $5::int[], $6::timestamp(3)[])
RETURNING *", RETURNING *",
titles.as_slice() as &[Option<String>], titles.as_slice() as &[Option<String>],
urls.as_slice(), urls.as_slice(),
descriptions.as_slice() as &[Option<String>], descriptions.as_slice() as &[Option<String>],
html_contents.as_slice() as &[Option<String>], html_contents.as_slice() as &[Option<String>],
feed_ids.as_slice(), feed_ids.as_slice(),
published_ats.as_slice(),
) )
.fetch_all(pool) .fetch_all(pool)
.await .await
@ -120,19 +165,24 @@ pub async fn upsert_entries(pool: &PgPool, payload: Vec<CreateEntry>) -> Result<
let mut descriptions: Vec<Option<String>> = Vec::with_capacity(payload.len()); let mut descriptions: Vec<Option<String>> = Vec::with_capacity(payload.len());
let mut html_contents: Vec<Option<String>> = Vec::with_capacity(payload.len()); let mut html_contents: Vec<Option<String>> = Vec::with_capacity(payload.len());
let mut feed_ids = Vec::with_capacity(payload.len()); let mut feed_ids = Vec::with_capacity(payload.len());
payload.iter().map(|entry| { let mut published_ats = Vec::with_capacity(payload.len());
titles.push(entry.title.clone()); payload
urls.push(entry.url.clone()); .iter()
descriptions.push(entry.description.clone()); .map(|entry| {
html_contents.push(entry.html_content.clone()); titles.push(entry.title.clone());
feed_ids.push(entry.feed_id); urls.push(entry.url.clone());
entry.validate() descriptions.push(entry.description.clone());
}).collect::<Result<Vec<()>, ValidationErrors>>()?; html_contents.push(entry.html_content.clone());
feed_ids.push(entry.feed_id);
published_ats.push(entry.published_at);
entry.validate()
})
.collect::<Result<Vec<()>, ValidationErrors>>()?;
sqlx::query_as!( sqlx::query_as!(
Entry, Entry,
"INSERT INTO entries ( "INSERT INTO entries (
title, url, description, html_content, feed_id, created_at, updated_at title, url, description, html_content, feed_id, published_at, created_at, updated_at
) SELECT *, now(), now() FROM UNNEST($1::text[], $2::text[], $3::text[], $4::text[], $5::int[]) ) SELECT *, now(), now() FROM UNNEST($1::text[], $2::text[], $3::text[], $4::text[], $5::int[], $6::timestamp(3)[])
ON CONFLICT DO NOTHING ON CONFLICT DO NOTHING
RETURNING *", RETURNING *",
titles.as_slice() as &[Option<String>], titles.as_slice() as &[Option<String>],
@ -140,6 +190,7 @@ pub async fn upsert_entries(pool: &PgPool, payload: Vec<CreateEntry>) -> Result<
descriptions.as_slice() as &[Option<String>], descriptions.as_slice() as &[Option<String>],
html_contents.as_slice() as &[Option<String>], html_contents.as_slice() as &[Option<String>],
feed_ids.as_slice(), feed_ids.as_slice(),
published_ats.as_slice(),
) )
.fetch_all(pool) .fetch_all(pool)
.await .await