Start feed crawlers at startup

Tyler Hallada 2023-07-15 02:30:25 -04:00
parent 0dfde7cd31
commit ea67690d1d


@@ -11,21 +11,23 @@ use axum::{
     Router,
 };
 use bytes::Bytes;
+use chrono::{Duration, Utc};
 use clap::Parser;
 use dotenvy::dotenv;
 use notify::Watcher;
+use reqwest::Client;
 use sqlx::postgres::PgPoolOptions;
 use tokio::sync::watch::channel;
 use tower::ServiceBuilder;
 use tower_http::{services::ServeDir, trace::TraceLayer};
 use tower_livereload::LiveReloadLayer;
-use tracing::debug;
+use tracing::{debug, info};
 
-use lib::config::Config;
-use lib::domain_locks::DomainLocks;
 use lib::handlers;
 use lib::log::init_tracing;
 use lib::state::AppState;
+use lib::{actors::feed_crawler::FeedCrawlerHandle, config::Config, models::feed::Feed};
+use lib::{domain_locks::DomainLocks, models::feed::GetFeedsOptions};
 
 async fn serve(app: Router, addr: SocketAddr) -> Result<()> {
     debug!("listening on {}", addr);
@@ -73,14 +75,42 @@ async fn main() -> Result<()> {
         .route("/log/stream", get(handlers::log::stream))
         .nest_service("/static", ServeDir::new("static"))
         .with_state(AppState {
-            pool,
-            config,
+            pool: pool.clone(),
+            config: config.clone(),
             log_receiver,
             crawls,
-            domain_locks,
+            domain_locks: domain_locks.clone(),
         })
         .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()));
 
+    info!("starting crawlers");
+    let mut options = GetFeedsOptions::default();
+    loop {
+        let feeds = Feed::get_all(&pool, options.clone()).await?;
+        if feeds.is_empty() {
+            break;
+        }
+        for feed in feeds.iter() {
+            let client = Client::new(); // TODO: store in state and reuse
+            if let Some(last_crawled_at) = feed.last_crawled_at {
+                if last_crawled_at
+                    >= Utc::now() - Duration::minutes(feed.crawl_interval_minutes.into())
+                {
+                    continue;
+                }
+            }
+            let feed_crawler = FeedCrawlerHandle::new(
+                pool.clone(),
+                client.clone(),
+                domain_locks.clone(),
+                config.content_dir.clone(),
+            );
+            let _ = feed_crawler.crawl(feed.feed_id).await;
+        }
+        options.before = feeds.last().map(|f| f.created_at);
+    }
+    info!("done starting crawlers");
+
     if cfg!(debug_assertions) {
         debug!("starting livereload");
         let livereload = LiveReloadLayer::new();
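
Note: the `// TODO: store in state and reuse` comment above flags that a fresh reqwest::Client is constructed for every feed in the loop. A minimal sketch of that follow-up, assuming AppState keeps its other fields as shown in the diff (the `client` field and `AppState::new` constructor here are hypothetical, not part of this commit); cloning a Client is cheap because clones share one internal connection pool:

use reqwest::Client;

#[derive(Clone)]
struct AppState {
    // ...pool, config, log_receiver, crawls, domain_locks as in the diff...
    client: Client,
}

impl AppState {
    fn new() -> Self {
        Self {
            // Construct the Client once at startup; hand out cheap clones
            // wherever a request needs to be made.
            client: Client::new(),
        }
    }
}

// Inside the crawl loop, `let client = Client::new();` would then become:
// let client = state.client.clone();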