From 6912ef9017d0197671b7e158a7268675cb77e7ee Mon Sep 17 00:00:00 2001
From: Tyler Hallada
Date: Tue, 27 Aug 2024 21:54:14 -0400
Subject: [PATCH] Add crawl_entry job

---
 src/bin/web.rs          |  2 +-
 src/bin/worker.rs       |  2 ++
 src/jobs/crawl_entry.rs | 62 +++++++++++++++++++++++++++++++++++++++++
 src/jobs/crawl_feed.rs  | 23 +++++++--------
 src/jobs/mod.rs         | 15 ++++++++--
 5 files changed, 87 insertions(+), 17 deletions(-)
 create mode 100644 src/jobs/crawl_entry.rs

diff --git a/src/bin/web.rs b/src/bin/web.rs
index e536b03..9c5ebf9 100644
--- a/src/bin/web.rs
+++ b/src/bin/web.rs
@@ -114,7 +114,7 @@ async fn main() -> Result<()> {
         config.content_dir.clone(),
         crawls.clone(),
     );
-    let _ = crawl_scheduler.bootstrap().await;
+    // let _ = crawl_scheduler.bootstrap().await;
     let importer = ImporterHandle::new(db.clone(), crawl_scheduler.clone(), imports.clone());
 
     let ip_source_extension = config.ip_source.0.clone().into_extension();
diff --git a/src/bin/worker.rs b/src/bin/worker.rs
index b8e88f1..363a713 100644
--- a/src/bin/worker.rs
+++ b/src/bin/worker.rs
@@ -49,6 +49,8 @@ async fn main() -> Result<()> {
             .data(http_client)
             .data(db)
             .data(domain_request_limiter)
+            .data(config)
+            .data(apalis_storage.clone())
             .backend(apalis_storage)
             .build_fn(handle_async_job)
     })
diff --git a/src/jobs/crawl_entry.rs b/src/jobs/crawl_entry.rs
new file mode 100644
index 0000000..dc5a9e6
--- /dev/null
+++ b/src/jobs/crawl_entry.rs
@@ -0,0 +1,62 @@
+use std::fs;
+use std::path::Path;
+
+use ammonia::clean;
+use anyhow::{anyhow, Result};
+use apalis::prelude::*;
+use bytes::Buf;
+use readability::extractor;
+use reqwest::Client;
+use serde::{Deserialize, Serialize};
+use sqlx::PgPool;
+use tracing::{info, instrument};
+use url::Url;
+use uuid::Uuid;
+
+use crate::config::Config;
+use crate::domain_request_limiter::DomainRequestLimiter;
+use crate::models::entry::Entry;
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct CrawlEntryJob {
+    pub entry_id: Uuid,
+}
+
+#[instrument(skip_all, fields(entry_id = %entry_id))]
+pub async fn crawl_entry(
+    CrawlEntryJob { entry_id }: CrawlEntryJob,
+    http_client: Data<Client>,
+    db: Data<PgPool>,
+    domain_request_limiter: Data<DomainRequestLimiter>,
+    config: Data<Config>,
+) -> Result<()> {
+    let entry = Entry::get(&*db, entry_id).await?;
+    info!("got entry from db");
+    let content_dir = Path::new(&*config.content_dir);
+    let url = Url::parse(&entry.url)?;
+    let domain = url
+        .domain()
+        .ok_or(anyhow!("invalid url: {:?}", entry.url.clone()))?;
+    info!(url=%url, "starting fetch");
+    domain_request_limiter.acquire(domain).await?;
+    let bytes = http_client.get(url.clone()).send().await?.bytes().await?;
+    info!(url=%url, "fetched entry");
+    let article = extractor::extract(&mut bytes.reader(), &url)?;
+    info!("extracted content");
+    let id = entry.entry_id;
+    // TODO: update entry with scraped data
+    // if let Some(date) = article.date {
+    //     // prefer scraped date over rss feed date
+    //     let mut updated_entry = entry.clone();
+    //     updated_entry.published_at = date;
+    //     entry = update_entry(&self.pool, updated_entry)
+    //         .await
+    //         .map_err(|_| EntryCrawlerError::CreateEntryError(entry.url.clone()))?;
+    // };
+    let content = clean(&article.content);
+    info!("sanitized content");
+    fs::write(content_dir.join(format!("{}.html", id)), content)?;
+    fs::write(content_dir.join(format!("{}.txt", id)), article.text)?;
+    info!("saved content to filesystem");
+    Ok(())
+}
diff --git a/src/jobs/crawl_feed.rs b/src/jobs/crawl_feed.rs
index 5ffba26..dc48e61 100644
--- a/src/jobs/crawl_feed.rs
+++ b/src/jobs/crawl_feed.rs
@@ -2,6 +2,7 @@ use std::cmp::Ordering;
 
 use anyhow::{anyhow, Result};
 use apalis::prelude::*;
+use apalis_redis::RedisStorage;
 use chrono::{Duration, Utc};
 use feed_rs::parser;
 use http::{header, HeaderMap, StatusCode};
@@ -13,6 +14,7 @@ use url::Url;
 use uuid::Uuid;
 
 use crate::domain_request_limiter::DomainRequestLimiter;
+use crate::jobs::{AsyncJob, CrawlEntryJob};
 use crate::models::entry::{CreateEntry, Entry};
 use crate::models::feed::{Feed, MAX_CRAWL_INTERVAL_MINUTES, MIN_CRAWL_INTERVAL_MINUTES};
 
@@ -27,6 +29,7 @@ pub async fn crawl_feed(
     http_client: Data<Client>,
     db: Data<PgPool>,
     domain_request_limiter: Data<DomainRequestLimiter>,
+    apalis: Data<RedisStorage<AsyncJob>>,
 ) -> Result<()> {
     let mut feed = Feed::get(&*db, feed_id).await?;
     info!("got feed from db");
@@ -149,7 +152,7 @@ pub async fn crawl_feed(
     for entry in parsed_feed.entries {
         let entry_span = info_span!("entry", id = entry.id);
         let _entry_span_guard = entry_span.enter();
-        if let Some(link) = entry.links.get(0) {
+        if let Some(link) = entry.links.first() {
             // if no scraped or feed date is available, fallback to the current time
             let published_at = entry.published.unwrap_or_else(Utc::now);
             let entry = CreateEntry {
@@ -171,18 +174,12 @@ pub async fn crawl_feed(
     info!(new = new.len(), updated = updated.len(), "saved entries");
 
     for entry in new {
-        // TODO: queue the entry crawls
-        //
-        // let entry_crawler = EntryCrawlerHandle::new(
-        //     self.pool.clone(),
-        //     self.client.clone(),
-        //     self.domain_locks.clone(),
-        //     self.content_dir.clone(),
-        // );
-        // let mut entry_receiver = entry_crawler.crawl(entry).await;
-        // while let Ok(EntryCrawlerHandleMessage::Entry(result)) = entry_receiver.recv().await {
-        //     let _ = respond_to.send(FeedCrawlerHandleMessage::Entry(result));
-        // }
+        (*apalis)
+            .clone() // TODO: clone bad?
+            .push(AsyncJob::CrawlEntry(CrawlEntryJob {
+                entry_id: entry.entry_id,
+            }))
+            .await?;
     }
     Ok(())
 }
diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs
index 34003d5..6d266fb 100644
--- a/src/jobs/mod.rs
+++ b/src/jobs/mod.rs
@@ -1,20 +1,24 @@
 use apalis::prelude::*;
+use apalis_redis::RedisStorage;
 use reqwest::Client;
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
 use thiserror::Error;
 use tracing::{error, info, instrument};
 
+mod crawl_entry;
 mod crawl_feed;
 
+pub use crawl_entry::CrawlEntryJob;
 pub use crawl_feed::CrawlFeedJob;
 
-use crate::domain_request_limiter::DomainRequestLimiter;
+use crate::{config::Config, domain_request_limiter::DomainRequestLimiter};
 
 #[derive(Debug, Deserialize, Serialize, Clone)]
 pub enum AsyncJob {
     HelloWorld(String),
     CrawlFeed(CrawlFeedJob),
+    CrawlEntry(CrawlEntryJob),
 }
 
 #[derive(Debug, Error)]
@@ -23,7 +27,7 @@ pub enum AsyncJobError {
     JobError(#[from] anyhow::Error),
 }
 
-#[instrument(skip(job, worker_id), fields(worker_id = %worker_id))]
+#[instrument(skip_all, fields(worker_id = %worker_id))]
 pub async fn handle_async_job(
     job: AsyncJob,
     worker_id: WorkerId,
@@ -33,6 +37,8 @@ pub async fn handle_async_job(
     http_client: Data<Client>,
     db: Data<PgPool>,
     domain_request_limiter: Data<DomainRequestLimiter>,
+    config: Data<Config>,
+    apalis: Data<RedisStorage<AsyncJob>>,
 ) -> Result<(), AsyncJobError> {
     let result = match job {
         AsyncJob::HelloWorld(name) => {
             Ok(())
         }
         AsyncJob::CrawlFeed(job) => {
-            crawl_feed::crawl_feed(job, http_client, db, domain_request_limiter).await
+            crawl_feed::crawl_feed(job, http_client, db, domain_request_limiter, apalis).await
+        }
+        AsyncJob::CrawlEntry(job) => {
+            crawl_entry::crawl_entry(job, http_client, db, domain_request_limiter, config).await
         }
     };
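
Note on the enqueueing side: Storage::push takes &mut self, which is why crawl_feed clones the shared Data<RedisStorage<AsyncJob>> before pushing; the clone should be cheap since RedisStorage shares its underlying Redis connection. Below is a rough, hypothetical sketch of producing the same job from outside the worker. It is not part of this commit: the helper name, redis_url, and the apalis_redis::connect + RedisStorage::new setup are assumptions about how apalis_storage is built elsewhere in the project.

use apalis::prelude::*;
use apalis_redis::RedisStorage;
use uuid::Uuid;

use crate::jobs::{AsyncJob, CrawlEntryJob};

// Hypothetical helper: enqueue a single CrawlEntry job onto the same
// Redis-backed queue the worker consumes. Names and setup are assumed,
// not taken from this commit.
async fn enqueue_crawl_entry(redis_url: &str, entry_id: Uuid) -> anyhow::Result<()> {
    // Assumes the worker's `apalis_storage` is a RedisStorage<AsyncJob>
    // built from the same Redis URL.
    let conn = apalis_redis::connect(redis_url).await?;
    let mut storage: RedisStorage<AsyncJob> = RedisStorage::new(conn);

    // Storage::push requires &mut self, hence the per-entry clone in crawl_feed.
    storage
        .push(AsyncJob::CrawlEntry(CrawlEntryJob { entry_id }))
        .await?;
    Ok(())
}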