diff --git a/src/bin/crawler.rs b/src/bin/crawler.rs
index ca9dd51..60b9412 100644
--- a/src/bin/crawler.rs
+++ b/src/bin/crawler.rs
@@ -5,8 +5,7 @@ use apalis_cron::{CronStream, Schedule};
 use apalis_redis::RedisStorage;
 use chrono::{DateTime, Utc};
 use clap::Parser;
-use lib::jobs::AsyncJob;
-use lib::models::feed::{Feed, GetFeedsOptions};
+use dotenvy::dotenv;
 use sqlx::postgres::PgPoolOptions;
 use sqlx::PgPool;
 use std::str::FromStr;
@@ -14,9 +13,10 @@ use std::sync::Arc;
 use thiserror::Error;
 use tracing::{info, instrument};
 
-use dotenvy::dotenv;
 use lib::config::Config;
+use lib::jobs::{AsyncJob, CrawlFeedJob};
 use lib::log::init_worker_tracing;
+use lib::models::feed::{Feed, GetFeedsOptions};
 
 #[derive(Default, Debug, Clone)]
 struct Crawl(DateTime<Utc>);
@@ -64,8 +64,9 @@ pub async fn crawl_fn(job: Crawl, state: Data<Arc<State>>) -> Result<(), CrawlEr
 
     for feed in feeds.into_iter() {
         // self.spawn_crawler_loop(feed, respond_to.clone());
+        // TODO: implement uniqueness on jobs per feed for ~1 minute
         apalis
-            .push(AsyncJob::HelloWorld(feed.feed_id.to_string()))
+            .push(AsyncJob::CrawlFeed(CrawlFeedJob { feed }))
             .await
             .map_err(|err| CrawlError::QueueJobError(err.to_string()))?;
     }
diff --git a/src/bin/worker.rs b/src/bin/worker.rs
index d92939b..b8884d2 100644
--- a/src/bin/worker.rs
+++ b/src/bin/worker.rs
@@ -1,17 +1,15 @@
 use anyhow::Result;
+use apalis::layers::retry::RetryPolicy;
 use apalis::layers::tracing::TraceLayer;
 use apalis::prelude::*;
 use apalis_redis::RedisStorage;
 use clap::Parser;
-
 use dotenvy::dotenv;
-use lib::config::Config;
-use lib::jobs::AsyncJob;
-use lib::log::init_worker_tracing;
+use tower::retry::RetryLayer;
 
-pub async fn worker_fn(job: AsyncJob) {
-    tracing::info!(job = ?job, "Hello, world!");
-}
+use lib::config::Config;
+use lib::jobs::{handle_async_job, AsyncJob};
+use lib::log::init_worker_tracing;
 
 #[tokio::main]
 async fn main() -> Result<()> {
@@ -28,9 +26,10 @@ async fn main() -> Result<()> {
     Monitor::<TokioExecutor>::new()
         .register_with_count(2, {
             WorkerBuilder::new("worker")
+                .layer(RetryLayer::new(RetryPolicy::default()))
                 .layer(TraceLayer::new())
                 .backend(apalis_storage)
-                .build_fn(worker_fn)
+                .build_fn(handle_async_job)
         })
         .run()
         .await
diff --git a/src/jobs/crawl_feed.rs b/src/jobs/crawl_feed.rs
index b05a241..812a5ca 100644
--- a/src/jobs/crawl_feed.rs
+++ b/src/jobs/crawl_feed.rs
@@ -1,4 +1,6 @@
+use anyhow::Result;
 use serde::{Deserialize, Serialize};
+use tracing::{info, instrument};
 
 use crate::models::feed::Feed;
 
@@ -6,3 +8,9 @@ use crate::models::feed::Feed;
 pub struct CrawlFeedJob {
     pub feed: Feed,
 }
+
+#[instrument(skip_all, fields(feed_id = %feed.feed_id))]
+pub async fn crawl_feed(CrawlFeedJob { feed }: CrawlFeedJob) -> Result<()> {
+    info!("Crawling feed: {:?}", feed.feed_id);
+    Err(anyhow::anyhow!("Not implemented"))
+}
diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs
index 290a1af..43a9cd4 100644
--- a/src/jobs/mod.rs
+++ b/src/jobs/mod.rs
@@ -1,8 +1,47 @@
+use apalis::prelude::*;
+use apalis_redis::RedisStorage;
 use serde::{Deserialize, Serialize};
+use thiserror::Error;
+use tracing::{error, info, instrument};
 
 mod crawl_feed;
 
+pub use crawl_feed::CrawlFeedJob;
+
 #[derive(Debug, Deserialize, Serialize, Clone)]
 pub enum AsyncJob {
     HelloWorld(String),
+    CrawlFeed(CrawlFeedJob),
+}
+
+#[derive(Debug, Error)]
+pub enum AsyncJobError {
+    #[error("error executing job")]
+    JobError(#[from] anyhow::Error),
+}
+
+#[instrument(skip(job, worker_id), fields(worker_id = %worker_id))]
+pub async fn handle_async_job(
+    job: AsyncJob,
+    worker_id: WorkerId,
+    // TODO: add task_id to tracing instrumentation context
+    // it causes a panic in 0.6.0 currently, see: https://github.com/geofmureithi/apalis/issues/398
+    // task_id: Data<TaskId>,
+) -> Result<(), AsyncJobError> {
+    let result = match job {
+        AsyncJob::HelloWorld(name) => {
+            info!("Hello, {}!", name);
+            Ok(())
+        }
+        AsyncJob::CrawlFeed(job) => crawl_feed::crawl_feed(job).await,
+    };
+
+    match result {
+        Ok(_) => info!("Job completed successfully"),
+        Err(err) => {
+            error!("Job failed: {err:?}");
+            return Err(AsyncJobError::JobError(err));
+        }
+    };
+    Ok(())
 }
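
Note that `crawl_feed` is deliberately left returning `Err(anyhow!("Not implemented"))`, which lets the retry and error-logging paths be exercised end to end before the real crawler logic lands. As a rough illustration only, here is a minimal sketch of what the body might eventually look like, assuming feeds are fetched with `reqwest` and parsed with `feed-rs`, and assuming the `Feed` model exposes a `url` field (none of which this diff confirms):

```rust
use anyhow::Result;
use tracing::{info, instrument};

use crate::jobs::CrawlFeedJob;

#[instrument(skip_all, fields(feed_id = %feed.feed_id))]
pub async fn crawl_feed(CrawlFeedJob { feed }: CrawlFeedJob) -> Result<()> {
    info!("Crawling feed: {:?}", feed.feed_id);

    // Hypothetical: fetch the feed over HTTP (assumes `Feed` has a `url` field).
    let body = reqwest::get(&feed.url).await?.bytes().await?;

    // Hypothetical: parse the body as RSS/Atom with feed-rs.
    let parsed = feed_rs::parser::parse(&body[..])?;
    info!(entries = parsed.entries.len(), "fetched and parsed feed");

    // Parsed entries would then be upserted into Postgres here.
    Ok(())
}
```

Because the function returns `anyhow::Result`, any fetch or parse failure propagates up through `handle_async_job` as an `AsyncJobError::JobError` and becomes eligible for the worker's `RetryLayer`.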
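
The `TODO` about enqueueing at most one crawl job per feed per minute is left open in the diff. One possible approach, sketched here with the `redis` crate and a hypothetical key scheme (not the project's actual implementation), is to take a short-lived `SET NX EX` lock before pushing the job:

```rust
use anyhow::Result;

/// Hypothetical guard: returns true if no crawl job for this feed has been
/// enqueued within the last `ttl_secs` seconds.
async fn try_mark_enqueued(
    con: &mut redis::aio::MultiplexedConnection,
    feed_id: &str,
    ttl_secs: u64,
) -> Result<bool> {
    // Key naming is an assumption; adapt it to the project's conventions.
    let key = format!("crawl:enqueued:{feed_id}");
    // SET <key> 1 NX EX <ttl> replies OK when the key was newly set and nil
    // when it already exists, i.e. a job was enqueued inside the TTL window.
    let reply: Option<String> = redis::cmd("SET")
        .arg(&key)
        .arg(1)
        .arg("NX")
        .arg("EX")
        .arg(ttl_secs)
        .query_async(con)
        .await?;
    Ok(reply.is_some())
}
```

In `crawl_fn`, the push would then only run when `try_mark_enqueued(...)` returns `true`; whether apalis eventually offers first-class unique jobs instead is exactly what the TODO leaves open.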