Add very basic crawl job

Loops through the feeds in the database and upserts the items found in each feed.
2023-05-09 23:46:42 -04:00
parent 89fdf8f95a
commit ae8f15f19b
18 changed files with 327 additions and 56 deletions

src/main.rs

@@ -1,12 +1,13 @@
 use anyhow::Result;
 use argh::FromArgs;
 use dotenvy::dotenv;
-use tracing::info;
 use sqlx::postgres::PgPoolOptions;
 use std::env;
+use tracing::info;

-use lib::models::feed::{CreateFeed, FeedType};
-use lib::commands::add_feed::add_feed;
+use lib::jobs::crawl::crawl;
+use lib::models::feed::{create_feed, delete_feed, CreateFeed, FeedType};
+use lib::models::item::{create_item, delete_item, CreateItem};

 #[derive(FromArgs)]
 /// CLI for crawlect
@@ -19,6 +20,10 @@ struct Args {
 #[argh(subcommand)]
 enum Commands {
     AddFeed(AddFeed),
+    DeleteFeed(DeleteFeed),
+    AddItem(AddItem),
+    DeleteItem(DeleteItem),
+    Crawl(Crawl),
 }

 #[derive(FromArgs)]
@@ -39,6 +44,46 @@ struct AddFeed {
     description: Option<String>,
 }

+#[derive(FromArgs)]
+/// Delete a feed from the database
+#[argh(subcommand, name = "delete-feed")]
+struct DeleteFeed {
+    #[argh(positional)]
+    /// id of the feed to delete
+    id: i32,
+}
+
+#[derive(FromArgs)]
+/// Add an item to the database
+#[argh(subcommand, name = "add-item")]
+struct AddItem {
+    #[argh(option)]
+    /// title of the item (max 255 characters)
+    title: String,
+    #[argh(option)]
+    /// URL of the item (max 2048 characters)
+    url: String,
+    #[argh(option)]
+    /// description of the item
+    description: Option<String>,
+    #[argh(option)]
+    /// source feed for the item
+    feed_id: i32,
+}
+
+#[derive(FromArgs)]
+/// Delete an item from the database
+#[argh(subcommand, name = "delete-item")]
+struct DeleteItem {
+    #[argh(positional)]
+    /// id of the item to delete
+    id: i32,
+}
+
+#[derive(FromArgs)]
+/// Crawl all feeds, adding new items to the database
+#[argh(subcommand, name = "crawl")]
+struct Crawl {}
+
 #[tokio::main]
 pub async fn main() -> Result<()> {
@@ -53,13 +98,47 @@ pub async fn main() -> Result<()> {
     let args: Args = argh::from_env();

-    if let Commands::AddFeed(add_feed_args) = args.commands {
-        add_feed(pool, CreateFeed {
-            title: add_feed_args.title,
-            url: add_feed_args.url,
-            feed_type: add_feed_args.feed_type,
-            description: add_feed_args.description,
-        }).await?;
-    }
-    info!("hello?");
+    match args.commands {
+        Commands::AddFeed(args) => {
+            let feed = create_feed(
+                &pool,
+                CreateFeed {
+                    title: args.title,
+                    url: args.url,
+                    feed_type: args.feed_type,
+                    description: args.description,
+                },
+            )
+            .await?;
+            info!("Created feed with id {}", feed.id);
+        }
+        Commands::DeleteFeed(args) => {
+            delete_feed(&pool, args.id).await?;
+            info!("Deleted feed with id {}", args.id);
+        }
+        Commands::AddItem(args) => {
+            let item = create_item(
+                &pool,
+                CreateItem {
+                    title: args.title,
+                    url: args.url,
+                    description: args.description,
+                    feed_id: args.feed_id,
+                },
+            )
+            .await?;
+            info!("Created item with id {}", item.id);
+        }
+        Commands::DeleteItem(args) => {
+            delete_item(&pool, args.id).await?;
+            info!("Deleted item with id {}", args.id);
+        }
+        Commands::Crawl(_) => {
+            info!("Crawling...");
+            crawl(&pool).await?;
+        }
+    }

     Ok(())
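The parsing half of this dispatch can be exercised without a database. A minimal sketch, not part of this commit, that feeds argh a fixed argv; the test module and test name are hypothetical:

#[cfg(test)]
mod tests {
    use super::*;
    use argh::FromArgs;

    // Hypothetical test: argh parses a fixed argv, so the new subcommands
    // are checkable without spawning a process or touching Postgres.
    #[test]
    fn parses_delete_feed_subcommand() {
        let args = Args::from_args(&["crawlect"], &["delete-feed", "3"]).unwrap();
        assert!(matches!(
            args.commands,
            Commands::DeleteFeed(DeleteFeed { id: 3 })
        ));
    }
}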

src/commands/add_feed.rs (deleted)

@@ -1,8 +0,0 @@
-use sqlx::PgPool;
-
-use crate::models::feed::{create_feed, CreateFeed, Feed};
-use crate::error::Result;
-
-pub async fn add_feed(pool: PgPool, payload: CreateFeed) -> Result<Feed> {
-    create_feed(pool, payload).await
-}

src/commands/mod.rs (deleted)

@@ -1 +0,0 @@
-pub mod add_feed;

src/error.rs

@@ -25,8 +25,8 @@ pub enum Error {
     #[error("{0}: {1} not found")]
     NotFound(&'static str, i32),

-    #[error("referenced {0}: {1} not found")]
-    RelationNotFound(&'static str, i32),
+    #[error("referenced {0} not found")]
+    RelationNotFound(&'static str),
 }

 pub type Result<T, E = Error> = ::std::result::Result<T, E>;
@@ -69,7 +69,7 @@ impl Error {
         match self {
             NotFound(_, _) => StatusCode::NOT_FOUND,
             Sqlx(_) | Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
-            InvalidEntity(_) | RelationNotFound(_, _) => StatusCode::UNPROCESSABLE_ENTITY,
+            InvalidEntity(_) | RelationNotFound(_) => StatusCode::UNPROCESSABLE_ENTITY,
         }
     }
 }

src/handlers/feed.rs

@@ -4,16 +4,20 @@ use axum::{
 };
 use sqlx::PgPool;

-use crate::error::Error;
-use crate::models::feed::{create_feed, get_feed, CreateFeed, Feed};
+use crate::error::{Error, Result};
+use crate::models::feed::{create_feed, get_feed, delete_feed, CreateFeed, Feed};

-pub async fn get(State(pool): State<PgPool>, Path(id): Path<i32>) -> Result<Json<Feed>, Error> {
-    Ok(Json(get_feed(pool, id).await?))
+pub async fn get(State(pool): State<PgPool>, Path(id): Path<i32>) -> Result<Json<Feed>> {
+    Ok(Json(get_feed(&pool, id).await?))
 }

 pub async fn post(
     State(pool): State<PgPool>,
     Json(payload): Json<CreateFeed>,
 ) -> Result<Json<Feed>, Error> {
-    Ok(Json(create_feed(pool, payload).await?))
+    Ok(Json(create_feed(&pool, payload).await?))
 }
+
+pub async fn delete(State(pool): State<PgPool>, Path(id): Path<i32>) -> Result<()> {
+    delete_feed(&pool, id).await
+}
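For context, one way the new delete handler might be wired into an axum router; the route paths and router layout are assumptions for illustration, not taken from this commit:

use axum::{routing::get, Router};
use sqlx::PgPool;

use crate::handlers;

// Sketch only: "/feed/:id" is a hypothetical path.
pub fn app(pool: PgPool) -> Router {
    Router::new()
        .route(
            "/feed/:id",
            get(handlers::feed::get).delete(handlers::feed::delete),
        )
        .with_state(pool)
}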

src/handlers/feeds.rs

@@ -5,5 +5,5 @@ use crate::error::Error;
 use crate::models::feed::{get_feeds, Feed};

 pub async fn get(State(pool): State<PgPool>) -> Result<Json<Vec<Feed>>, Error> {
-    Ok(Json(get_feeds(pool).await?))
+    Ok(Json(get_feeds(&pool).await?))
 }

src/handlers/item.rs

@@ -8,12 +8,12 @@ use crate::error::Error;
 use crate::models::item::{create_item, get_item, CreateItem, Item};

 pub async fn get(State(pool): State<PgPool>, Path(id): Path<i32>) -> Result<Json<Item>, Error> {
-    Ok(Json(get_item(pool, id).await?))
+    Ok(Json(get_item(&pool, id).await?))
 }

 pub async fn post(
     State(pool): State<PgPool>,
     Json(payload): Json<CreateItem>,
 ) -> Result<Json<Item>, Error> {
-    Ok(Json(create_item(pool, payload).await?))
+    Ok(Json(create_item(&pool, payload).await?))
 }

src/handlers/items.rs

@@ -5,5 +5,5 @@ use crate::error::Error;
 use crate::models::item::{get_items, Item};

 pub async fn get(State(pool): State<PgPool>) -> Result<Json<Vec<Item>>, Error> {
-    Ok(Json(get_items(pool).await?))
+    Ok(Json(get_items(&pool).await?))
 }

src/jobs/crawl.rs (new file, 36 lines)

@@ -0,0 +1,36 @@
+use feed_rs::parser;
+use reqwest::Client;
+use sqlx::PgPool;
+use tracing::info;
+
+use crate::models::feed::get_feeds;
+use crate::models::item::{upsert_items, CreateItem};
+
+/// For every feed in the database, fetches the feed, parses it, and saves new items to the
+/// database.
+pub async fn crawl(pool: &PgPool) -> anyhow::Result<()> {
+    let client = Client::new();
+    let feeds = get_feeds(pool).await?;
+    for feed in feeds {
+        let bytes = client.get(feed.url).send().await?.bytes().await?;
+        let parsed_feed = parser::parse(&bytes[..])?;
+        let mut payload = Vec::with_capacity(parsed_feed.entries.len());
+        for entry in parsed_feed.entries {
+            let item = CreateItem {
+                title: entry
+                    .title
+                    .map_or_else(|| "No title".to_string(), |t| t.content),
+                url: entry
+                    .links
+                    .get(0)
+                    .map_or_else(|| "https://example.com".to_string(), |l| l.href.clone()),
+                description: entry.summary.map(|s| s.content),
+                feed_id: feed.id,
+            };
+            payload.push(item);
+        }
+        let items = upsert_items(pool, payload).await?;
+        info!("Created {} items for feed {}", items.len(), feed.id);
+    }
+    Ok(())
+}
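The title and URL fallbacks above lean on how feed_rs models optional fields. A small sketch of that assumption, as a hypothetical test (not part of the commit) with an inline Atom document:

#[cfg(test)]
mod tests {
    use feed_rs::parser;

    // Hypothetical test: an Atom entry with no <title> and no <link> parses
    // with `title: None` and an empty `links` Vec, which crawl maps to the
    // "No title" and "https://example.com" placeholders.
    #[test]
    fn entry_without_title_or_link_parses_to_defaults() {
        let atom = r#"<?xml version="1.0" encoding="utf-8"?>
            <feed xmlns="http://www.w3.org/2005/Atom">
              <id>urn:example:feed</id>
              <title>Example</title>
              <updated>2023-05-09T00:00:00Z</updated>
              <entry>
                <id>urn:example:entry:1</id>
                <updated>2023-05-09T00:00:00Z</updated>
              </entry>
            </feed>"#;
        let parsed = parser::parse(atom.as_bytes()).unwrap();
        assert!(parsed.entries[0].title.is_none());
        assert!(parsed.entries[0].links.is_empty());
    }
}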

src/jobs/mod.rs (new file, 1 line)

@@ -0,0 +1 @@
+pub mod crawl;

src/lib.rs

@@ -1,4 +1,4 @@
-pub mod commands;
 pub mod error;
 pub mod handlers;
+pub mod jobs;
 pub mod models;

src/models/feed.rs

@@ -51,7 +51,7 @@ pub struct CreateFeed {
     pub description: Option<String>,
 }

-pub async fn get_feed(pool: PgPool, id: i32) -> Result<Feed> {
+pub async fn get_feed(pool: &PgPool, id: i32) -> Result<Feed> {
     sqlx::query_as!(
         Feed,
         // Unable to SELECT * here due to https://github.com/launchbadge/sqlx/issues/1004
@@ -67,7 +67,7 @@ pub async fn get_feed(pool: PgPool, id: i32) -> Result<Feed> {
         FROM feeds WHERE id = $1"#,
         id
     )
-    .fetch_one(&pool)
+    .fetch_one(pool)
     .await
     .map_err(|error| {
         if let sqlx::error::Error::RowNotFound = error {
@@ -77,7 +77,7 @@ pub async fn get_feed(pool: PgPool, id: i32) -> Result<Feed> {
     })
 }

-pub async fn get_feeds(pool: PgPool) -> sqlx::Result<Vec<Feed>> {
+pub async fn get_feeds(pool: &PgPool) -> sqlx::Result<Vec<Feed>> {
     sqlx::query_as!(
         Feed,
         r#"SELECT
@@ -89,12 +89,14 @@ pub async fn get_feeds(pool: PgPool) -> sqlx::Result<Vec<Feed>> {
             created_at,
             updated_at,
             deleted_at
-        FROM feeds"#)
-    .fetch_all(&pool)
-    .await
+        FROM feeds
+        WHERE deleted_at IS NULL"#
+    )
+    .fetch_all(pool)
+    .await
 }

-pub async fn create_feed(pool: PgPool, payload: CreateFeed) -> Result<Feed> {
+pub async fn create_feed(pool: &PgPool, payload: CreateFeed) -> Result<Feed> {
     payload.validate()?;

     Ok(sqlx::query_as!(
         Feed,
@@ -117,6 +119,13 @@ pub async fn create_feed(pool: PgPool, payload: CreateFeed) -> Result<Feed> {
         payload.feed_type as FeedType,
         payload.description
     )
-    .fetch_one(&pool)
+    .fetch_one(pool)
     .await?)
 }
+
+pub async fn delete_feed(pool: &PgPool, id: i32) -> Result<()> {
+    sqlx::query!("UPDATE feeds SET deleted_at = now() WHERE id = $1", id)
+        .execute(pool)
+        .await?;
+    Ok(())
+}
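delete_feed is a soft delete: the row keeps its data and only gains a deleted_at timestamp, and get_feeds now filters those rows out. A sketch of that contract as a test; this is hypothetical, assumes sqlx's #[sqlx::test] harness with a configured test database, and assumes a FeedType::Rss variant, which this commit does not show:

use sqlx::PgPool;

use crate::models::feed::{create_feed, delete_feed, get_feeds, CreateFeed, FeedType};

// Hypothetical test: FeedType::Rss is an assumed variant.
#[sqlx::test]
async fn deleted_feeds_disappear_from_listing(pool: PgPool) -> anyhow::Result<()> {
    let feed = create_feed(
        &pool,
        CreateFeed {
            title: "Example".to_string(),
            url: "https://example.com/feed.xml".to_string(),
            feed_type: FeedType::Rss,
            description: None,
        },
    )
    .await?;

    delete_feed(&pool, feed.id).await?;

    // The row still exists; it is only hidden by WHERE deleted_at IS NULL.
    assert!(get_feeds(&pool).await?.iter().all(|f| f.id != feed.id));
    Ok(())
}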

src/models/item.rs

@@ -1,7 +1,7 @@
 use chrono::NaiveDateTime;
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
-use validator::Validate;
+use validator::{Validate, ValidationErrors};

 use crate::error::{Error, Result};
@@ -29,9 +29,9 @@ pub struct CreateItem {
     pub feed_id: i32,
 }

-pub async fn get_item(pool: PgPool, id: i32) -> Result<Item> {
+pub async fn get_item(pool: &PgPool, id: i32) -> Result<Item> {
     sqlx::query_as!(Item, "SELECT * FROM items WHERE id = $1", id)
-        .fetch_one(&pool)
+        .fetch_one(pool)
         .await
         .map_err(|error| {
             if let sqlx::error::Error::RowNotFound = error {
@@ -41,13 +41,13 @@ pub async fn get_item(pool: PgPool, id: i32) -> Result<Item> {
     })
 }

-pub async fn get_items(pool: PgPool) -> sqlx::Result<Vec<Item>> {
-    sqlx::query_as!(Item, "SELECT * FROM items")
-        .fetch_all(&pool)
+pub async fn get_items(pool: &PgPool) -> sqlx::Result<Vec<Item>> {
+    sqlx::query_as!(Item, "SELECT * FROM items WHERE deleted_at IS NULL")
+        .fetch_all(pool)
         .await
 }

-pub async fn create_item(pool: PgPool, payload: CreateItem) -> Result<Item> {
+pub async fn create_item(pool: &PgPool, payload: CreateItem) -> Result<Item> {
     payload.validate()?;

     sqlx::query_as!(
         Item,
@@ -61,14 +61,92 @@ pub async fn create_item(pool: PgPool, payload: CreateItem) -> Result<Item> {
         payload.description,
         payload.feed_id,
     )
-    .fetch_one(&pool)
+    .fetch_one(pool)
     .await
     .map_err(|error| {
         if let sqlx::error::Error::Database(ref psql_error) = error {
             if psql_error.code().as_deref() == Some("23503") {
-                return Error::RelationNotFound("feed", payload.feed_id);
+                return Error::RelationNotFound("feed");
             }
         }
         Error::Sqlx(error)
     })
 }
+
+pub async fn create_items(pool: &PgPool, payload: Vec<CreateItem>) -> Result<Vec<Item>> {
+    let mut titles = Vec::with_capacity(payload.len());
+    let mut urls = Vec::with_capacity(payload.len());
+    let mut descriptions: Vec<Option<String>> = Vec::with_capacity(payload.len());
+    let mut feed_ids = Vec::with_capacity(payload.len());
+    payload.iter().map(|item| {
+        titles.push(item.title.clone());
+        urls.push(item.url.clone());
+        descriptions.push(item.description.clone());
+        feed_ids.push(item.feed_id);
+        item.validate()
+    }).collect::<Result<Vec<()>, ValidationErrors>>()?;
+
+    sqlx::query_as!(
+        Item,
+        "INSERT INTO items (
+            title, url, description, feed_id, created_at, updated_at
+        ) SELECT *, now(), now() FROM UNNEST($1::text[], $2::text[], $3::text[], $4::int[])
+        RETURNING *",
+        titles.as_slice(),
+        urls.as_slice(),
+        descriptions.as_slice() as &[Option<String>],
+        feed_ids.as_slice(),
+    )
+    .fetch_all(pool)
+    .await
+    .map_err(|error| {
+        if let sqlx::error::Error::Database(ref psql_error) = error {
+            if psql_error.code().as_deref() == Some("23503") {
+                return Error::RelationNotFound("feed");
+            }
+        }
+        Error::Sqlx(error)
+    })
+}
+
+pub async fn upsert_items(pool: &PgPool, payload: Vec<CreateItem>) -> Result<Vec<Item>> {
+    let mut titles = Vec::with_capacity(payload.len());
+    let mut urls = Vec::with_capacity(payload.len());
+    let mut descriptions: Vec<Option<String>> = Vec::with_capacity(payload.len());
+    let mut feed_ids = Vec::with_capacity(payload.len());
+    payload.iter().map(|item| {
+        titles.push(item.title.clone());
+        urls.push(item.url.clone());
+        descriptions.push(item.description.clone());
+        feed_ids.push(item.feed_id);
+        item.validate()
+    }).collect::<Result<Vec<()>, ValidationErrors>>()?;
+
+    sqlx::query_as!(
+        Item,
+        "INSERT INTO items (
+            title, url, description, feed_id, created_at, updated_at
+        ) SELECT *, now(), now() FROM UNNEST($1::text[], $2::text[], $3::text[], $4::int[])
+        ON CONFLICT DO NOTHING
+        RETURNING *",
+        titles.as_slice(),
+        urls.as_slice(),
+        descriptions.as_slice() as &[Option<String>],
+        feed_ids.as_slice(),
+    )
+    .fetch_all(pool)
+    .await
+    .map_err(|error| {
+        if let sqlx::error::Error::Database(ref psql_error) = error {
+            if psql_error.code().as_deref() == Some("23503") {
+                return Error::RelationNotFound("feed");
+            }
+        }
+        Error::Sqlx(error)
+    })
+}
+
+pub async fn delete_item(pool: &PgPool, id: i32) -> Result<()> {
+    sqlx::query!("UPDATE items SET deleted_at = now() WHERE id = $1", id)
+        .execute(pool)
+        .await?;
+    Ok(())
+}
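A detail worth knowing about upsert_items: in Postgres, RETURNING after INSERT ... ON CONFLICT DO NOTHING yields only the rows that were actually inserted, so the crawl log line counts new items rather than everything in the feed. Note also that DO NOTHING only suppresses duplicates if the items table has a unique constraint to conflict on, e.g. on (feed_id, url); that constraint is assumed here and not shown in this commit. A hypothetical sketch of the resulting idempotency, again using sqlx's test harness:

use sqlx::PgPool;

use crate::models::item::{upsert_items, CreateItem};

// Hypothetical test: assumes a UNIQUE constraint such as (feed_id, url)
// on items, and an existing feed row; neither is part of this commit.
#[sqlx::test]
async fn upsert_items_is_idempotent(pool: PgPool) -> anyhow::Result<()> {
    let item = || CreateItem {
        title: "Example post".to_string(),
        url: "https://example.com/post/1".to_string(),
        description: None,
        feed_id: 1, // assumed existing feed
    };

    let first = upsert_items(&pool, vec![item()]).await?;
    let second = upsert_items(&pool, vec![item()]).await?;

    // ON CONFLICT DO NOTHING + RETURNING: only newly inserted rows come back.
    assert_eq!(first.len(), 1);
    assert!(second.is_empty());
    Ok(())
}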