Add crawl_entry job

This commit is contained in:
Tyler Hallada 2024-08-27 21:54:14 -04:00
parent 65eac1975c
commit 6912ef9017
5 changed files with 87 additions and 17 deletions

View File

@ -114,7 +114,7 @@ async fn main() -> Result<()> {
config.content_dir.clone(), config.content_dir.clone(),
crawls.clone(), crawls.clone(),
); );
let _ = crawl_scheduler.bootstrap().await; // let _ = crawl_scheduler.bootstrap().await;
let importer = ImporterHandle::new(db.clone(), crawl_scheduler.clone(), imports.clone()); let importer = ImporterHandle::new(db.clone(), crawl_scheduler.clone(), imports.clone());
let ip_source_extension = config.ip_source.0.clone().into_extension(); let ip_source_extension = config.ip_source.0.clone().into_extension();

View File

@ -49,6 +49,8 @@ async fn main() -> Result<()> {
.data(http_client) .data(http_client)
.data(db) .data(db)
.data(domain_request_limiter) .data(domain_request_limiter)
.data(config)
.data(apalis_storage.clone())
.backend(apalis_storage) .backend(apalis_storage)
.build_fn(handle_async_job) .build_fn(handle_async_job)
}) })

62
src/jobs/crawl_entry.rs Normal file
View File

@ -0,0 +1,62 @@
use std::fs;
use std::path::Path;
use ammonia::clean;
use anyhow::{anyhow, Result};
use apalis::prelude::*;
use bytes::Buf;
use readability::extractor;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tracing::{info, instrument};
use url::Url;
use uuid::Uuid;
use crate::config::Config;
use crate::domain_request_limiter::DomainRequestLimiter;
use crate::models::entry::Entry;
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct CrawlEntryJob {
pub entry_id: Uuid,
}
#[instrument(skip_all, fields(entry_id = %entry_id))]
pub async fn crawl_entry(
CrawlEntryJob { entry_id }: CrawlEntryJob,
http_client: Data<Client>,
db: Data<PgPool>,
domain_request_limiter: Data<DomainRequestLimiter>,
config: Data<Config>,
) -> Result<()> {
let entry = Entry::get(&*db, entry_id).await?;
info!("got entry from db");
let content_dir = Path::new(&*config.content_dir);
let url = Url::parse(&entry.url)?;
let domain = url
.domain()
.ok_or(anyhow!("invalid url: {:?}", entry.url.clone()))?;
info!(url=%url, "starting fetch");
domain_request_limiter.acquire(domain).await?;
let bytes = http_client.get(url.clone()).send().await?.bytes().await?;
info!(url=%url, "fetched entry");
let article = extractor::extract(&mut bytes.reader(), &url)?;
info!("extracted content");
let id = entry.entry_id;
// TODO: update entry with scraped data
// if let Some(date) = article.date {
// // prefer scraped date over rss feed date
// let mut updated_entry = entry.clone();
// updated_entry.published_at = date;
// entry = update_entry(&self.pool, updated_entry)
// .await
// .map_err(|_| EntryCrawlerError::CreateEntryError(entry.url.clone()))?;
// };
let content = clean(&article.content);
info!("sanitized content");
fs::write(content_dir.join(format!("{}.html", id)), content)?;
fs::write(content_dir.join(format!("{}.txt", id)), article.text)?;
info!("saved content to filesystem");
Ok(())
}

View File

@ -2,6 +2,7 @@ use std::cmp::Ordering;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use apalis::prelude::*; use apalis::prelude::*;
use apalis_redis::RedisStorage;
use chrono::{Duration, Utc}; use chrono::{Duration, Utc};
use feed_rs::parser; use feed_rs::parser;
use http::{header, HeaderMap, StatusCode}; use http::{header, HeaderMap, StatusCode};
@ -13,6 +14,7 @@ use url::Url;
use uuid::Uuid; use uuid::Uuid;
use crate::domain_request_limiter::DomainRequestLimiter; use crate::domain_request_limiter::DomainRequestLimiter;
use crate::jobs::{AsyncJob, CrawlEntryJob};
use crate::models::entry::{CreateEntry, Entry}; use crate::models::entry::{CreateEntry, Entry};
use crate::models::feed::{Feed, MAX_CRAWL_INTERVAL_MINUTES, MIN_CRAWL_INTERVAL_MINUTES}; use crate::models::feed::{Feed, MAX_CRAWL_INTERVAL_MINUTES, MIN_CRAWL_INTERVAL_MINUTES};
@ -27,6 +29,7 @@ pub async fn crawl_feed(
http_client: Data<Client>, http_client: Data<Client>,
db: Data<PgPool>, db: Data<PgPool>,
domain_request_limiter: Data<DomainRequestLimiter>, domain_request_limiter: Data<DomainRequestLimiter>,
apalis: Data<RedisStorage<AsyncJob>>,
) -> Result<()> { ) -> Result<()> {
let mut feed = Feed::get(&*db, feed_id).await?; let mut feed = Feed::get(&*db, feed_id).await?;
info!("got feed from db"); info!("got feed from db");
@ -149,7 +152,7 @@ pub async fn crawl_feed(
for entry in parsed_feed.entries { for entry in parsed_feed.entries {
let entry_span = info_span!("entry", id = entry.id); let entry_span = info_span!("entry", id = entry.id);
let _entry_span_guard = entry_span.enter(); let _entry_span_guard = entry_span.enter();
if let Some(link) = entry.links.get(0) { if let Some(link) = entry.links.first() {
// if no scraped or feed date is available, fallback to the current time // if no scraped or feed date is available, fallback to the current time
let published_at = entry.published.unwrap_or_else(Utc::now); let published_at = entry.published.unwrap_or_else(Utc::now);
let entry = CreateEntry { let entry = CreateEntry {
@ -171,18 +174,12 @@ pub async fn crawl_feed(
info!(new = new.len(), updated = updated.len(), "saved entries"); info!(new = new.len(), updated = updated.len(), "saved entries");
for entry in new { for entry in new {
// TODO: queue the entry crawls (*apalis)
// .clone() // TODO: clone bad?
// let entry_crawler = EntryCrawlerHandle::new( .push(AsyncJob::CrawlEntry(CrawlEntryJob {
// self.pool.clone(), entry_id: entry.entry_id,
// self.client.clone(), }))
// self.domain_locks.clone(), .await?;
// self.content_dir.clone(),
// );
// let mut entry_receiver = entry_crawler.crawl(entry).await;
// while let Ok(EntryCrawlerHandleMessage::Entry(result)) = entry_receiver.recv().await {
// let _ = respond_to.send(FeedCrawlerHandleMessage::Entry(result));
// }
} }
Ok(()) Ok(())
} }

View File

@ -1,20 +1,24 @@
use apalis::prelude::*; use apalis::prelude::*;
use apalis_redis::RedisStorage;
use reqwest::Client; use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::PgPool; use sqlx::PgPool;
use thiserror::Error; use thiserror::Error;
use tracing::{error, info, instrument}; use tracing::{error, info, instrument};
mod crawl_entry;
mod crawl_feed; mod crawl_feed;
pub use crawl_entry::CrawlEntryJob;
pub use crawl_feed::CrawlFeedJob; pub use crawl_feed::CrawlFeedJob;
use crate::domain_request_limiter::DomainRequestLimiter; use crate::{config::Config, domain_request_limiter::DomainRequestLimiter};
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub enum AsyncJob { pub enum AsyncJob {
HelloWorld(String), HelloWorld(String),
CrawlFeed(CrawlFeedJob), CrawlFeed(CrawlFeedJob),
CrawlEntry(CrawlEntryJob),
} }
#[derive(Debug, Error)] #[derive(Debug, Error)]
@ -23,7 +27,7 @@ pub enum AsyncJobError {
JobError(#[from] anyhow::Error), JobError(#[from] anyhow::Error),
} }
#[instrument(skip(job, worker_id), fields(worker_id = %worker_id))] #[instrument(skip_all, fields(worker_id = %worker_id))]
pub async fn handle_async_job( pub async fn handle_async_job(
job: AsyncJob, job: AsyncJob,
worker_id: WorkerId, worker_id: WorkerId,
@ -33,6 +37,8 @@ pub async fn handle_async_job(
http_client: Data<Client>, http_client: Data<Client>,
db: Data<PgPool>, db: Data<PgPool>,
domain_request_limiter: Data<DomainRequestLimiter>, domain_request_limiter: Data<DomainRequestLimiter>,
config: Data<Config>,
apalis: Data<RedisStorage<AsyncJob>>,
) -> Result<(), AsyncJobError> { ) -> Result<(), AsyncJobError> {
let result = match job { let result = match job {
AsyncJob::HelloWorld(name) => { AsyncJob::HelloWorld(name) => {
@ -40,7 +46,10 @@ pub async fn handle_async_job(
Ok(()) Ok(())
} }
AsyncJob::CrawlFeed(job) => { AsyncJob::CrawlFeed(job) => {
crawl_feed::crawl_feed(job, http_client, db, domain_request_limiter).await crawl_feed::crawl_feed(job, http_client, db, domain_request_limiter, apalis).await
}
AsyncJob::CrawlEntry(job) => {
crawl_entry::crawl_entry(job, http_client, db, domain_request_limiter, config).await
} }
}; };