From ea67690d1d7b0997453a72a3fe8e7c6fdc16825a Mon Sep 17 00:00:00 2001
From: Tyler Hallada
Date: Sat, 15 Jul 2023 02:30:25 -0400
Subject: [PATCH] Start feed crawlers at startup

---
 src/main.rs | 42 ++++++++++++++++++++++++++++++++++++------
 1 file changed, 36 insertions(+), 6 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 6ed3eff..eb7d4c5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -11,21 +11,23 @@ use axum::{
     Router,
 };
 use bytes::Bytes;
+use chrono::{Duration, Utc};
 use clap::Parser;
 use dotenvy::dotenv;
 use notify::Watcher;
+use reqwest::Client;
 use sqlx::postgres::PgPoolOptions;
 use tokio::sync::watch::channel;
 use tower::ServiceBuilder;
 use tower_http::{services::ServeDir, trace::TraceLayer};
 use tower_livereload::LiveReloadLayer;
-use tracing::debug;
+use tracing::{debug, info};
 
-use lib::config::Config;
-use lib::domain_locks::DomainLocks;
 use lib::handlers;
 use lib::log::init_tracing;
 use lib::state::AppState;
+use lib::{actors::feed_crawler::FeedCrawlerHandle, config::Config, models::feed::Feed};
+use lib::{domain_locks::DomainLocks, models::feed::GetFeedsOptions};
 
 async fn serve(app: Router, addr: SocketAddr) -> Result<()> {
     debug!("listening on {}", addr);
@@ -73,14 +75,42 @@ async fn main() -> Result<()> {
         .route("/log/stream", get(handlers::log::stream))
         .nest_service("/static", ServeDir::new("static"))
         .with_state(AppState {
-            pool,
-            config,
+            pool: pool.clone(),
+            config: config.clone(),
             log_receiver,
             crawls,
-            domain_locks,
+            domain_locks: domain_locks.clone(),
         })
         .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()));
 
+    info!("starting crawlers");
+    let mut options = GetFeedsOptions::default();
+    loop {
+        let feeds = Feed::get_all(&pool, options.clone()).await?;
+        if feeds.is_empty() {
+            break;
+        }
+        for feed in feeds.iter() {
+            let client = Client::new(); // TODO: store in state and reuse
+            if let Some(last_crawled_at) = feed.last_crawled_at {
+                if last_crawled_at
+                    >= Utc::now() - Duration::minutes(feed.crawl_interval_minutes.into())
+                {
+                    continue;
+                }
+            }
+            let feed_crawler = FeedCrawlerHandle::new(
+                pool.clone(),
+                client.clone(),
+                domain_locks.clone(),
+                config.content_dir.clone(),
+            );
+            let _ = feed_crawler.crawl(feed.feed_id).await;
+        }
+        options.before = feeds.last().map(|f| f.created_at);
+    }
+    info!("done starting crawlers");
+
     if cfg!(debug_assertions) {
         debug!("starting livereload");
         let livereload = LiveReloadLayer::new();
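
Note on the startup crawl loop in the second hunk: it pages through all feeds with
keyset pagination (`options.before` is advanced to the `created_at` of the last feed
in each batch) and skips any feed that was crawled within its own
`crawl_interval_minutes`. A minimal standalone sketch of that staleness check,
assuming a hypothetical trimmed-down `Feed` struct (the project's real model has
more fields such as `feed_id` and `created_at`):

use chrono::{DateTime, Duration, Utc};

// Hypothetical stand-in for the project's `Feed` model, kept to just the
// two fields the staleness check reads.
struct Feed {
    last_crawled_at: Option<DateTime<Utc>>,
    crawl_interval_minutes: i32,
}

// A feed is due for a crawl at startup if it has never been crawled, or if
// its last crawl is older than its per-feed interval. This is the negation
// of the patch's `continue` (skip) condition.
fn is_due(feed: &Feed, now: DateTime<Utc>) -> bool {
    match feed.last_crawled_at {
        None => true,
        Some(last) => last < now - Duration::minutes(feed.crawl_interval_minutes.into()),
    }
}

fn main() {
    let now = Utc::now();
    let fresh = Feed { last_crawled_at: Some(now - Duration::minutes(5)), crawl_interval_minutes: 60 };
    let stale = Feed { last_crawled_at: Some(now - Duration::minutes(90)), crawl_interval_minutes: 60 };
    let never = Feed { last_crawled_at: None, crawl_interval_minutes: 60 };
    assert!(!is_due(&fresh, now)); // crawled 5 min ago, interval 60 min: skip
    assert!(is_due(&stale, now));  // crawled 90 min ago, interval 60 min: crawl
    assert!(is_due(&never, now));  // never crawled: crawl
}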
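On the `// TODO: store in state and reuse` comment: the patch builds a fresh
`reqwest::Client` for every feed inside the loop. Since `reqwest::Client` keeps its
connection pool behind an internal `Arc` and is cheap to clone, one possible
follow-up (a sketch of the TODO's suggestion, not part of this patch) is to build
the client once before the loop and hand a clone to each crawler:

use reqwest::Client;

fn main() {
    // Build one Client up front; it owns the shared connection pool.
    let client = Client::new();
    for feed_id in 0..3 {
        // Cloning a Client only bumps a reference count; all clones reuse
        // the same pool. In the patch, each clone would be passed to a
        // FeedCrawlerHandle instead of calling Client::new() per feed.
        let per_crawler = client.clone();
        println!("crawler {feed_id} got client handle {:?}", per_crawler);
    }
}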