From ae8f15f19b777dd5447272f158b7fd76d2197205 Mon Sep 17 00:00:00 2001 From: Tyler Hallada Date: Tue, 9 May 2023 23:46:42 -0400 Subject: [PATCH] Add very basic crawl job Loops through feeds and adds items from each feed. --- .env | 5 -- .gitignore | 1 + Cargo.lock | 74 +++++++++++++++++++- Cargo.toml | 3 +- migrations/20230507201612_initial.sql | 4 ++ src/bin/cli.rs | 99 ++++++++++++++++++++++++--- src/commands/add_feed.rs | 8 --- src/commands/mod.rs | 1 - src/error.rs | 6 +- src/handlers/feed.rs | 14 ++-- src/handlers/feeds.rs | 2 +- src/handlers/item.rs | 4 +- src/handlers/items.rs | 2 +- src/jobs/crawl.rs | 36 ++++++++++ src/jobs/mod.rs | 1 + src/lib.rs | 2 +- src/models/feed.rs | 25 ++++--- src/models/item.rs | 96 +++++++++++++++++++++++--- 18 files changed, 327 insertions(+), 56 deletions(-) delete mode 100644 .env delete mode 100644 src/commands/add_feed.rs delete mode 100644 src/commands/mod.rs create mode 100644 src/jobs/crawl.rs create mode 100644 src/jobs/mod.rs diff --git a/.env b/.env deleted file mode 100644 index 49a8834..0000000 --- a/.env +++ /dev/null @@ -1,5 +0,0 @@ -HOST=127.0.0.1 -PORT=3000 -DATABASE_URL=postgres://crawlect:nKd6kuHVGZEwCE6xqSdTQAuem3tc2a@winhost:5432/crawlect -DATABASE_MAX_CONNECTIONS=5 -RUST_LOG=crawlect=debug,tower_http=debug diff --git a/.gitignore b/.gitignore index ea8c4bf..fedaa2b 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +.env diff --git a/Cargo.lock b/Cargo.lock index bf36694..3e60b84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -257,6 +257,7 @@ dependencies = [ "axum", "chrono", "dotenvy", + "feed-rs", "reqwest", "serde", "serde_with", @@ -481,6 +482,24 @@ dependencies = [ "instant", ] +[[package]] +name = "feed-rs" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dbec361cb401c1b86aea784fb809073733da06b1a1fd794222e7bf9845db327" +dependencies = [ + "chrono", + "lazy_static", + "mime", + "quick-xml", + "regex", + "serde", + "serde_json", + "siphasher", + "url", + "uuid", +] + [[package]] name = "fnv" version = "1.0.7" @@ -910,6 +929,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.10" @@ -1221,6 +1249,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quick-xml" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc053f057dd768a56f62cd7e434c42c831d296968997e9ac1f76ea7c2d14c41" +dependencies = [ + "encoding_rs", + "memchr", +] + [[package]] name = "quote" version = "1.0.26" @@ -1297,9 +1335,24 @@ checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.7.1", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.7.1" @@ -1533,6 +1586,12 @@ dependencies = [ "libc", ] +[[package]] +name = "siphasher" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" + [[package]] name = "slab" version = "0.4.8" @@ -1979,10 +2038,14 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] @@ -2049,6 +2112,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "uuid" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dad5567ad0cf5b760e5665964bec1b47dfd077ba8a2544b513f3556d3d239a2" +dependencies = [ + "getrandom", +] + [[package]] name = "validator" version = "0.16.0" diff --git a/Cargo.toml b/Cargo.toml index 2d8f719..e4343df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ argh = "0.1" axum = "0.6" chrono = { version = "0.4", features = ["serde"] } dotenvy = "0.15" +feed-rs = "1.3" reqwest = { version = "0.11", features = ["json"] } serde = { version = "1", features = ["derive"] } serde_with = "3" @@ -31,5 +32,5 @@ tokio = { version = "1", features = ["full"] } tower = "0.4" tower-http = { version = "0.4", features = ["trace"] } tracing = "0.1" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } validator = { version = "0.16", features = ["derive"] } diff --git a/migrations/20230507201612_initial.sql b/migrations/20230507201612_initial.sql index ec86200..12a9771 100644 --- a/migrations/20230507201612_initial.sql +++ b/migrations/20230507201612_initial.sql @@ -10,6 +10,8 @@ CREATE TABLE IF NOT EXISTS "feeds" ( "updated_at" timestamp(3) NOT NULL, "deleted_at" timestamp(3) ); +CREATE INDEX "feeds_deleted_at" ON "feeds" ("deleted_at"); +CREATE UNIQUE INDEX "feeds_url" ON "feeds" ("url"); CREATE TABLE IF NOT EXISTS "items" ( "id" SERIAL PRIMARY KEY NOT NULL, @@ -21,3 +23,5 @@ CREATE TABLE IF NOT EXISTS "items" ( "updated_at" timestamp(3) NOT NULL, "deleted_at" timestamp(3) ); +CREATE INDEX "items_deleted_at" ON "items" ("deleted_at"); +CREATE UNIQUE INDEX "items_url_and_feed_id" ON "items" ("url", "feed_id"); diff --git a/src/bin/cli.rs b/src/bin/cli.rs index 429ea2d..c712958 100644 --- a/src/bin/cli.rs +++ b/src/bin/cli.rs @@ -1,12 +1,13 @@ use anyhow::Result; use argh::FromArgs; use dotenvy::dotenv; -use tracing::info; use sqlx::postgres::PgPoolOptions; use std::env; +use tracing::info; -use lib::models::feed::{CreateFeed, FeedType}; -use lib::commands::add_feed::add_feed; +use lib::jobs::crawl::crawl; +use lib::models::feed::{create_feed, delete_feed, CreateFeed, FeedType}; +use lib::models::item::{create_item, delete_item, CreateItem}; #[derive(FromArgs)] /// CLI for crawlect @@ -19,6 +20,10 @@ struct Args { #[argh(subcommand)] enum Commands { AddFeed(AddFeed), + DeleteFeed(DeleteFeed), + AddItem(AddItem), + DeleteItem(DeleteItem), + Crawl(Crawl), } #[derive(FromArgs)] @@ -39,6 +44,46 @@ struct AddFeed { description: Option, } +#[derive(FromArgs)] +/// Delete a feed from the database +#[argh(subcommand, name = "delete-feed")] +struct DeleteFeed { + #[argh(positional)] + /// id of the feed to delete + id: i32, +} + +#[derive(FromArgs)] +/// Add an item to the database +#[argh(subcommand, name = "add-item")] +struct AddItem { + #[argh(option)] + /// title of the item (max 255 characters) + title: String, + #[argh(option)] + /// URL of the item (max 2048 characters) + url: String, + #[argh(option)] + /// description of the item + description: Option, + #[argh(option)] + /// source feed for the item + feed_id: i32, +} + +#[derive(FromArgs)] +/// Delete an item from the database +#[argh(subcommand, name = "delete-item")] +struct DeleteItem { + #[argh(positional)] + /// id of the item to delete + id: i32, +} + +#[derive(FromArgs)] +/// Delete an item from the database +#[argh(subcommand, name = "crawl")] +struct Crawl {} #[tokio::main] pub async fn main() -> Result<()> { @@ -53,13 +98,47 @@ pub async fn main() -> Result<()> { let args: Args = argh::from_env(); - if let Commands::AddFeed(add_feed_args) = args.commands { - add_feed(pool, CreateFeed { - title: add_feed_args.title, - url: add_feed_args.url, - feed_type: add_feed_args.feed_type, - description: add_feed_args.description, - }).await?; + info!("hello?"); + + match args.commands { + Commands::AddFeed(args) => { + let feed = create_feed( + &pool, + CreateFeed { + title: args.title, + url: args.url, + feed_type: args.feed_type, + description: args.description, + }, + ) + .await?; + info!("Created feed with id {}", feed.id); + } + Commands::DeleteFeed(args) => { + delete_feed(&pool, args.id).await?; + info!("Deleted feed with id {}", args.id); + } + Commands::AddItem(args) => { + let item = create_item( + &pool, + CreateItem { + title: args.title, + url: args.url, + description: args.description, + feed_id: args.feed_id, + }, + ) + .await?; + info!("Created item with id {}", item.id); + } + Commands::DeleteItem(args) => { + delete_item(&pool, args.id).await?; + info!("Deleted item with id {}", args.id); + } + Commands::Crawl(_) => { + info!("Crawling..."); + crawl(&pool).await?; + } } Ok(()) diff --git a/src/commands/add_feed.rs b/src/commands/add_feed.rs deleted file mode 100644 index 700f750..0000000 --- a/src/commands/add_feed.rs +++ /dev/null @@ -1,8 +0,0 @@ -use sqlx::PgPool; - -use crate::models::feed::{create_feed, CreateFeed, Feed}; -use crate::error::Result; - -pub async fn add_feed(pool: PgPool, payload: CreateFeed) -> Result { - create_feed(pool, payload).await -} diff --git a/src/commands/mod.rs b/src/commands/mod.rs deleted file mode 100644 index ace59ec..0000000 --- a/src/commands/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod add_feed; diff --git a/src/error.rs b/src/error.rs index 4cfcc27..7b532bc 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,8 +25,8 @@ pub enum Error { #[error("{0}: {1} not found")] NotFound(&'static str, i32), - #[error("referenced {0}: {1} not found")] - RelationNotFound(&'static str, i32), + #[error("referenced {0} not found")] + RelationNotFound(&'static str), } pub type Result = ::std::result::Result; @@ -69,7 +69,7 @@ impl Error { match self { NotFound(_, _) => StatusCode::NOT_FOUND, Sqlx(_) | Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, - InvalidEntity(_) | RelationNotFound(_, _) => StatusCode::UNPROCESSABLE_ENTITY, + InvalidEntity(_) | RelationNotFound(_) => StatusCode::UNPROCESSABLE_ENTITY, } } } diff --git a/src/handlers/feed.rs b/src/handlers/feed.rs index 33b1659..daecb16 100644 --- a/src/handlers/feed.rs +++ b/src/handlers/feed.rs @@ -4,16 +4,20 @@ use axum::{ }; use sqlx::PgPool; -use crate::error::Error; -use crate::models::feed::{create_feed, get_feed, CreateFeed, Feed}; +use crate::error::{Error, Result}; +use crate::models::feed::{create_feed, get_feed, delete_feed, CreateFeed, Feed}; -pub async fn get(State(pool): State, Path(id): Path) -> Result, Error> { - Ok(Json(get_feed(pool, id).await?)) +pub async fn get(State(pool): State, Path(id): Path) -> Result> { + Ok(Json(get_feed(&pool, id).await?)) } pub async fn post( State(pool): State, Json(payload): Json, ) -> Result, Error> { - Ok(Json(create_feed(pool, payload).await?)) + Ok(Json(create_feed(&pool, payload).await?)) +} + +pub async fn delete(State(pool): State, Path(id): Path) -> Result<()> { + delete_feed(&pool, id).await } diff --git a/src/handlers/feeds.rs b/src/handlers/feeds.rs index 7473152..1507648 100644 --- a/src/handlers/feeds.rs +++ b/src/handlers/feeds.rs @@ -5,5 +5,5 @@ use crate::error::Error; use crate::models::feed::{get_feeds, Feed}; pub async fn get(State(pool): State) -> Result>, Error> { - Ok(Json(get_feeds(pool).await?)) + Ok(Json(get_feeds(&pool).await?)) } diff --git a/src/handlers/item.rs b/src/handlers/item.rs index 2fb0319..3c08093 100644 --- a/src/handlers/item.rs +++ b/src/handlers/item.rs @@ -8,12 +8,12 @@ use crate::error::Error; use crate::models::item::{create_item, get_item, CreateItem, Item}; pub async fn get(State(pool): State, Path(id): Path) -> Result, Error> { - Ok(Json(get_item(pool, id).await?)) + Ok(Json(get_item(&pool, id).await?)) } pub async fn post( State(pool): State, Json(payload): Json, ) -> Result, Error> { - Ok(Json(create_item(pool, payload).await?)) + Ok(Json(create_item(&pool, payload).await?)) } diff --git a/src/handlers/items.rs b/src/handlers/items.rs index b6e2948..63485b9 100644 --- a/src/handlers/items.rs +++ b/src/handlers/items.rs @@ -5,5 +5,5 @@ use crate::error::Error; use crate::models::item::{get_items, Item}; pub async fn get(State(pool): State) -> Result>, Error> { - Ok(Json(get_items(pool).await?)) + Ok(Json(get_items(&pool).await?)) } diff --git a/src/jobs/crawl.rs b/src/jobs/crawl.rs new file mode 100644 index 0000000..bbc6305 --- /dev/null +++ b/src/jobs/crawl.rs @@ -0,0 +1,36 @@ +use feed_rs::parser; +use reqwest::Client; +use sqlx::PgPool; +use tracing::info; + +use crate::models::feed::get_feeds; +use crate::models::item::{upsert_items, CreateItem}; + +/// For every feed in the database, fetches the feed, parses it, and saves new items to the +/// database. +pub async fn crawl(pool: &PgPool) -> anyhow::Result<()> { + let client = Client::new(); + let feeds = get_feeds(pool).await?; + for feed in feeds { + let bytes = client.get(feed.url).send().await?.bytes().await?; + let parsed_feed = parser::parse(&bytes[..])?; + let mut payload = Vec::with_capacity(parsed_feed.entries.len()); + for entry in parsed_feed.entries { + let item = CreateItem { + title: entry + .title + .map_or_else(|| "No title".to_string(), |t| t.content), + url: entry + .links + .get(0) + .map_or_else(|| "https://example.com".to_string(), |l| l.href.clone()), + description: entry.summary.map(|s| s.content), + feed_id: feed.id, + }; + payload.push(item); + } + let items = upsert_items(pool, payload).await?; + info!("Created {} items for feed {}", items.len(), feed.id); + } + Ok(()) +} diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs new file mode 100644 index 0000000..f274818 --- /dev/null +++ b/src/jobs/mod.rs @@ -0,0 +1 @@ +pub mod crawl; diff --git a/src/lib.rs b/src/lib.rs index 91c0470..e5592a0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -pub mod commands; pub mod error; pub mod handlers; +pub mod jobs; pub mod models; diff --git a/src/models/feed.rs b/src/models/feed.rs index 81c5ba4..fd7861c 100644 --- a/src/models/feed.rs +++ b/src/models/feed.rs @@ -51,7 +51,7 @@ pub struct CreateFeed { pub description: Option, } -pub async fn get_feed(pool: PgPool, id: i32) -> Result { +pub async fn get_feed(pool: &PgPool, id: i32) -> Result { sqlx::query_as!( Feed, // Unable to SELECT * here due to https://github.com/launchbadge/sqlx/issues/1004 @@ -67,7 +67,7 @@ pub async fn get_feed(pool: PgPool, id: i32) -> Result { FROM feeds WHERE id = $1"#, id ) - .fetch_one(&pool) + .fetch_one(pool) .await .map_err(|error| { if let sqlx::error::Error::RowNotFound = error { @@ -77,7 +77,7 @@ pub async fn get_feed(pool: PgPool, id: i32) -> Result { }) } -pub async fn get_feeds(pool: PgPool) -> sqlx::Result> { +pub async fn get_feeds(pool: &PgPool) -> sqlx::Result> { sqlx::query_as!( Feed, r#"SELECT @@ -89,12 +89,14 @@ pub async fn get_feeds(pool: PgPool) -> sqlx::Result> { created_at, updated_at, deleted_at - FROM feeds"#) - .fetch_all(&pool) - .await + FROM feeds + WHERE deleted_at IS NULL"# + ) + .fetch_all(pool) + .await } -pub async fn create_feed(pool: PgPool, payload: CreateFeed) -> Result { +pub async fn create_feed(pool: &PgPool, payload: CreateFeed) -> Result { payload.validate()?; Ok(sqlx::query_as!( Feed, @@ -117,6 +119,13 @@ pub async fn create_feed(pool: PgPool, payload: CreateFeed) -> Result { payload.feed_type as FeedType, payload.description ) - .fetch_one(&pool) + .fetch_one(pool) .await?) } + +pub async fn delete_feed(pool: &PgPool, id: i32) -> Result<()> { + sqlx::query!("UPDATE feeds SET deleted_at = now() WHERE id = $1", id) + .execute(pool) + .await?; + Ok(()) +} diff --git a/src/models/item.rs b/src/models/item.rs index 92ef918..1b8d291 100644 --- a/src/models/item.rs +++ b/src/models/item.rs @@ -1,7 +1,7 @@ use chrono::NaiveDateTime; use serde::{Deserialize, Serialize}; use sqlx::PgPool; -use validator::Validate; +use validator::{Validate, ValidationErrors}; use crate::error::{Error, Result}; @@ -29,9 +29,9 @@ pub struct CreateItem { pub feed_id: i32, } -pub async fn get_item(pool: PgPool, id: i32) -> Result { +pub async fn get_item(pool: &PgPool, id: i32) -> Result { sqlx::query_as!(Item, "SELECT * FROM items WHERE id = $1", id) - .fetch_one(&pool) + .fetch_one(pool) .await .map_err(|error| { if let sqlx::error::Error::RowNotFound = error { @@ -41,13 +41,13 @@ pub async fn get_item(pool: PgPool, id: i32) -> Result { }) } -pub async fn get_items(pool: PgPool) -> sqlx::Result> { - sqlx::query_as!(Item, "SELECT * FROM items") - .fetch_all(&pool) +pub async fn get_items(pool: &PgPool) -> sqlx::Result> { + sqlx::query_as!(Item, "SELECT * FROM items WHERE deleted_at IS NULL") + .fetch_all(pool) .await } -pub async fn create_item(pool: PgPool, payload: CreateItem) -> Result { +pub async fn create_item(pool: &PgPool, payload: CreateItem) -> Result { payload.validate()?; sqlx::query_as!( Item, @@ -61,14 +61,92 @@ pub async fn create_item(pool: PgPool, payload: CreateItem) -> Result { payload.description, payload.feed_id, ) - .fetch_one(&pool) + .fetch_one(pool) .await .map_err(|error| { if let sqlx::error::Error::Database(ref psql_error) = error { if psql_error.code().as_deref() == Some("23503") { - return Error::RelationNotFound("feed", payload.feed_id); + return Error::RelationNotFound("feed"); } } Error::Sqlx(error) }) } + +pub async fn create_items(pool: &PgPool, payload: Vec) -> Result> { + let mut titles = Vec::with_capacity(payload.len()); + let mut urls = Vec::with_capacity(payload.len()); + let mut descriptions: Vec> = Vec::with_capacity(payload.len()); + let mut feed_ids = Vec::with_capacity(payload.len()); + payload.iter().map(|item| { + titles.push(item.title.clone()); + urls.push(item.url.clone()); + descriptions.push(item.description.clone()); + feed_ids.push(item.feed_id); + item.validate() + }).collect::, ValidationErrors>>()?; + sqlx::query_as!( + Item, + "INSERT INTO items ( + title, url, description, feed_id, created_at, updated_at + ) SELECT *, now(), now() FROM UNNEST($1::text[], $2::text[], $3::text[], $4::int[]) + RETURNING *", + titles.as_slice(), + urls.as_slice(), + descriptions.as_slice() as &[Option], + feed_ids.as_slice(), + ) + .fetch_all(pool) + .await + .map_err(|error| { + if let sqlx::error::Error::Database(ref psql_error) = error { + if psql_error.code().as_deref() == Some("23503") { + return Error::RelationNotFound("feed"); + } + } + Error::Sqlx(error) + }) +} + +pub async fn upsert_items(pool: &PgPool, payload: Vec) -> Result> { + let mut titles = Vec::with_capacity(payload.len()); + let mut urls = Vec::with_capacity(payload.len()); + let mut descriptions: Vec> = Vec::with_capacity(payload.len()); + let mut feed_ids = Vec::with_capacity(payload.len()); + payload.iter().map(|item| { + titles.push(item.title.clone()); + urls.push(item.url.clone()); + descriptions.push(item.description.clone()); + feed_ids.push(item.feed_id); + item.validate() + }).collect::, ValidationErrors>>()?; + sqlx::query_as!( + Item, + "INSERT INTO items ( + title, url, description, feed_id, created_at, updated_at + ) SELECT *, now(), now() FROM UNNEST($1::text[], $2::text[], $3::text[], $4::int[]) + ON CONFLICT DO NOTHING + RETURNING *", + titles.as_slice(), + urls.as_slice(), + descriptions.as_slice() as &[Option], + feed_ids.as_slice(), + ) + .fetch_all(pool) + .await + .map_err(|error| { + if let sqlx::error::Error::Database(ref psql_error) = error { + if psql_error.code().as_deref() == Some("23503") { + return Error::RelationNotFound("feed"); + } + } + Error::Sqlx(error) + }) +} + +pub async fn delete_item(pool: &PgPool, id: i32) -> Result<()> { + sqlx::query!("UPDATE items SET deleted_at = now() WHERE id = $1", id) + .execute(pool) + .await?; + Ok(()) +}