From 3d5c0b78c378d87c39319f6fed3ce19b862c0756 Mon Sep 17 00:00:00 2001
From: Tyler Hallada
Date: Sun, 22 Sep 2024 13:44:55 -0400
Subject: [PATCH] Upgrade apalis, add fred pool to state, start publishing in
 jobs

---
 Cargo.lock              | 20 ++++++++++----------
 Cargo.toml              |  6 +++---
 src/bin/web.rs          | 26 ++++++++++++++++++++++++--
 src/bin/worker.rs       |  6 +++---
 src/jobs/crawl_entry.rs |  6 ++++++
 src/jobs/crawl_feed.rs  |  4 ++++
 src/jobs/mod.rs         |  8 +++-----
 src/state.rs            | 16 ++++++++++++++++
 8 files changed, 69 insertions(+), 23 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1c4a207..b84cb1a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -140,9 +140,9 @@ checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3"
 
 [[package]]
 name = "apalis"
-version = "0.6.0-rc.5"
+version = "0.6.0-rc.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "940921ba55b294cdb5535bb67a0fc29a8b637e0225f99337e4c86432d80da3f0"
+checksum = "7fe9f6044555ce7984b4dff510f869c25ee8258d13277eadcfee11efa0a827a0"
 dependencies = [
  "apalis-core",
  "futures",
@@ -157,9 +157,9 @@ dependencies = [
 
 [[package]]
 name = "apalis-core"
-version = "0.6.0-rc.5"
+version = "0.6.0-rc.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9ca0df44161ac62045e29a56fd7d4356b6ac138413b761a23e5a0ce9973c1239"
+checksum = "1f7959df1edc75df26a1ee13a46d1b59fd63ba6642a6c29039583f5aedfd13aa"
 dependencies = [
  "async-oneshot",
  "futures",
@@ -174,9 +174,9 @@ dependencies = [
 
 [[package]]
 name = "apalis-cron"
-version = "0.6.0-rc.5"
+version = "0.6.0-rc.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ebc25afa9a8d8c1408d1969e15639b120afc53ee1cfa6464ca4fb4fa3c1f1d3f"
+checksum = "821d4b219ed2fa48e6ff722f23a97db3a70dcb0beeb96d3cf9391d88d23050c2"
 dependencies = [
  "apalis-core",
  "async-stream",
@@ -188,9 +188,9 @@ dependencies = [
 
 [[package]]
 name = "apalis-redis"
-version = "0.6.0-rc.5"
+version = "0.6.0-rc.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "58b90be4cc2db77f062b3ee08ed913dffba0c509b71fd944e4e8241c2618705f"
+checksum = "405b6457c973eb82ed4a20048131b767ebca684cdc4897d42f622ed115641f21"
 dependencies = [
  "apalis-core",
  "async-stream",
@@ -2523,9 +2523,9 @@ dependencies = [
 
 [[package]]
 name = "redis"
-version = "0.25.3"
+version = "0.25.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd"
+checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec"
 dependencies = [
  "arc-swap",
  "async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index 218bd89..e44ff5e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,9 +16,9 @@ ammonia = "4"
 ansi-to-html = "0.2"
 anyhow = "1"
 # apalis v0.6 fixes this issue: https://github.com/geofmureithi/apalis/issues/351
-apalis = { version = "0.6.0-rc.5", features = ["retry"] }
-apalis-cron = "0.6.0-rc.5"
-apalis-redis = "0.6.0-rc.5"
+apalis = { version = "0.6.0-rc.7", features = ["retry"] }
+apalis-cron = "0.6.0-rc.7"
+apalis-redis = "0.6.0-rc.7"
 async-trait = "0.1"
 axum = { version = "0.7", features = ["form", "multipart", "query"] }
 axum-client-ip = "0.6"
diff --git a/src/bin/web.rs b/src/bin/web.rs
index 9c5ebf9..489cafa 100644
--- a/src/bin/web.rs
+++ b/src/bin/web.rs
@@ -16,6 +16,7 @@ use base64::prelude::*;
 use bytes::Bytes;
 use clap::Parser;
 use dotenvy::dotenv;
+use fred::prelude::*;
 use lettre::transport::smtp::authentication::Credentials;
 use lettre::SmtpTransport;
 use notify::Watcher;
@@ -28,11 +29,18 @@ use tower::ServiceBuilder;
 use tower_http::{services::ServeDir, trace::TraceLayer};
 use tower_livereload::LiveReloadLayer;
 use tower_sessions::cookie::Key;
-use tower_sessions_redis_store::{fred::prelude::*, RedisStore};
+use tower_sessions_redis_store::{
+    fred::{
+        interfaces::ClientLike as TowerSessionsRedisClientLike,
+        prelude::{RedisConfig as TowerSessionsRedisConfig, RedisPool as TowerSessionsRedisPool},
+    },
+    RedisStore,
+};
 use tracing::debug;
 
 use lib::config::Config;
 use lib::domain_locks::DomainLocks;
+use lib::domain_request_limiter::DomainRequestLimiter;
 use lib::handlers;
 use lib::jobs::AsyncJob;
 use lib::log::init_tracing;
@@ -75,8 +83,20 @@ async fn main() -> Result<()> {
     let redis_config = RedisConfig::from_url(&config.redis_url)?;
     let redis_pool = RedisPool::new(redis_config, None, None, None, config.redis_pool_size)?;
     redis_pool.init().await?;
+    let domain_request_limiter = DomainRequestLimiter::new(redis_pool.clone(), 10, 5, 100, 0.5);
 
-    let session_store = RedisStore::new(redis_pool);
+    // TODO: is it possible to use the same fred RedisPool that the web app uses?
+    let sessions_redis_config = TowerSessionsRedisConfig::from_url(&config.redis_url)?;
+    let sessions_redis_pool = TowerSessionsRedisPool::new(
+        sessions_redis_config,
+        None,
+        None,
+        None,
+        config.redis_pool_size,
+    )?;
+    sessions_redis_pool.init().await?;
+
+    let session_store = RedisStore::new(sessions_redis_pool);
     let session_layer = SessionManagerLayer::new(session_store)
         .with_secure(!cfg!(debug_assertions))
         .with_expiry(Expiry::OnInactivity(Duration::days(
@@ -159,12 +179,14 @@ async fn main() -> Result<()> {
             log_receiver,
             crawls,
             domain_locks,
+            domain_request_limiter,
             client,
             crawl_scheduler,
             importer,
             imports,
             mailer,
             apalis,
+            redis: redis_pool,
         })
         .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()))
         .layer(auth_layer)
diff --git a/src/bin/worker.rs b/src/bin/worker.rs
index 363a713..e5dccc4 100644
--- a/src/bin/worker.rs
+++ b/src/bin/worker.rs
@@ -30,9 +30,8 @@ async fn main() -> Result<()> {
 
     let redis_config = RedisConfig::from_url(&config.redis_url)?;
     let redis_pool = RedisPool::new(redis_config, None, None, None, 5)?;
-    redis_pool.connect();
-    redis_pool.wait_for_connect().await?;
-    let domain_request_limiter = DomainRequestLimiter::new(redis_pool, 10, 5, 100, 0.5);
+    redis_pool.init().await?;
+    let domain_request_limiter = DomainRequestLimiter::new(redis_pool.clone(), 10, 5, 100, 0.5);
 
     let http_client = Client::builder().user_agent(USER_AGENT).build()?;
     let db = PgPoolOptions::new()
@@ -51,6 +50,7 @@ async fn main() -> Result<()> {
                 .data(domain_request_limiter)
                 .data(config)
                 .data(apalis_storage.clone())
+                .data(redis_pool)
                 .backend(apalis_storage)
                 .build_fn(handle_async_job)
         })
diff --git a/src/jobs/crawl_entry.rs b/src/jobs/crawl_entry.rs
index dc5a9e6..a98ada2 100644
--- a/src/jobs/crawl_entry.rs
+++ b/src/jobs/crawl_entry.rs
@@ -5,6 +5,7 @@ use ammonia::clean;
 use anyhow::{anyhow, Result};
 use apalis::prelude::*;
 use bytes::Buf;
+use fred::prelude::*;
 use readability::extractor;
 use reqwest::Client;
 use serde::{Deserialize, Serialize};
@@ -29,6 +30,7 @@ pub async fn crawl_entry(
     db: Data<PgPool>,
     domain_request_limiter: Data<DomainRequestLimiter>,
     config: Data<Config>,
+    redis: Data<RedisPool>,
 ) -> Result<()> {
     let entry = Entry::get(&*db, entry_id).await?;
     info!("got entry from db");
@@ -58,5 +60,9 @@ pub async fn crawl_entry(
     fs::write(content_dir.join(format!("{}.html", id)), content)?;
     fs::write(content_dir.join(format!("{}.txt", id)), article.text)?;
     info!("saved content to filesystem");
+    redis
+        .next()
+        .publish("entries", entry_id.to_string())
+        .await?;
     Ok(())
 }
diff --git a/src/jobs/crawl_feed.rs b/src/jobs/crawl_feed.rs
index dc48e61..4dbd55c 100644
--- a/src/jobs/crawl_feed.rs
+++ b/src/jobs/crawl_feed.rs
@@ -5,6 +5,7 @@ use apalis::prelude::*;
 use apalis_redis::RedisStorage;
 use chrono::{Duration, Utc};
 use feed_rs::parser;
+use fred::prelude::*;
 use http::{header, HeaderMap, StatusCode};
 use reqwest::Client;
 use serde::{Deserialize, Serialize};
@@ -30,6 +31,7 @@ pub async fn crawl_feed(
     db: Data<PgPool>,
     domain_request_limiter: Data<DomainRequestLimiter>,
     apalis: Data<RedisStorage<AsyncJob>>,
+    redis: Data<RedisPool>,
 ) -> Result<()> {
     let mut feed = Feed::get(&*db, feed_id).await?;
     info!("got feed from db");
@@ -181,5 +183,7 @@ pub async fn crawl_feed(
         }))
         .await?;
     }
+
+    redis.next().publish("feeds", feed_id.to_string()).await?;
     Ok(())
 }
diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs
index 6d266fb..e3592e9 100644
--- a/src/jobs/mod.rs
+++ b/src/jobs/mod.rs
@@ -27,13 +27,11 @@ pub enum AsyncJobError {
     JobError(#[from] anyhow::Error),
 }
 
-#[instrument(skip_all, fields(worker_id = %worker_id))]
+#[instrument(skip_all, fields(worker_id = ?worker_id, task_id = ?task_id))]
 pub async fn handle_async_job(
     job: AsyncJob,
-    worker_id: WorkerId,
-    // TODO: add task_id to tracing instrumentation context
-    // it casuses a panic in 0.6.0 currently, see: https://github.com/geofmureithi/apalis/issues/398
-    // task_id: Data<TaskId>,
+    worker_id: Data<WorkerId>,
+    task_id: Data<TaskId>,
     http_client: Data<Client>,
     db: Data<PgPool>,
     domain_request_limiter: Data<DomainRequestLimiter>,
diff --git a/src/state.rs b/src/state.rs
index dc13187..b061302 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
 use apalis_redis::RedisStorage;
 use axum::extract::FromRef;
 use bytes::Bytes;
+use fred::clients::RedisPool;
 use lettre::SmtpTransport;
 use reqwest::Client;
 use sqlx::PgPool;
@@ -14,6 +15,7 @@ use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleM
 use crate::actors::importer::{ImporterHandle, ImporterHandleMessage};
 use crate::config::Config;
 use crate::domain_locks::DomainLocks;
+use crate::domain_request_limiter::DomainRequestLimiter;
 use crate::jobs::AsyncJob;
 
 /// A map of feed IDs to a channel receiver for the active `CrawlScheduler` running a feed crawl
@@ -46,12 +48,14 @@ pub struct AppState {
     pub log_receiver: watch::Receiver<Bytes>,
     pub crawls: Crawls,
     pub domain_locks: DomainLocks,
+    pub domain_request_limiter: DomainRequestLimiter,
     pub client: Client,
     pub crawl_scheduler: CrawlSchedulerHandle,
     pub importer: ImporterHandle,
     pub imports: Imports,
     pub mailer: SmtpTransport,
     pub apalis: RedisStorage<AsyncJob>,
+    pub redis: RedisPool,
 }
 impl FromRef<AppState> for PgPool {
     fn from_ref(state: &AppState) -> Self {
@@ -84,6 +88,12 @@ impl FromRef<AppState> for DomainLocks {
     }
 }
 
+impl FromRef<AppState> for DomainRequestLimiter {
+    fn from_ref(state: &AppState) -> Self {
+        state.domain_request_limiter.clone()
+    }
+}
+
 impl FromRef<AppState> for Client {
     fn from_ref(state: &AppState) -> Self {
         state.client.clone()
@@ -119,3 +129,9 @@ impl FromRef<AppState> for RedisStorage<AsyncJob>
         state.apalis.clone()
     }
 }
+
+impl FromRef<AppState> for RedisPool {
+    fn from_ref(state: &AppState) -> Self {
+        state.redis.clone()
+    }
+}
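
Not part of the patch: this commit only starts publishing; nothing subscribes to the
"entries" and "feeds" channels yet. For orientation, a minimal fred subscriber for these
messages might look like the sketch below. The Redis URL and the standalone binary are
illustrative assumptions; only the channel names and the string-encoded entry/feed IDs
come from the patch.

use fred::prelude::*;

#[tokio::main]
async fn main() -> Result<(), RedisError> {
    // Assumed connection URL; the app reads this from config.redis_url.
    let config = RedisConfig::from_url("redis://localhost:6379")?;
    let subscriber = Builder::from_config(config).build()?;
    subscriber.init().await?;

    // Take the broadcast receiver before subscribing so no messages are missed.
    let mut messages = subscriber.message_rx();
    subscriber.subscribe("entries").await?;
    subscriber.subscribe("feeds").await?;

    while let Ok(message) = messages.recv().await {
        // crawl_entry/crawl_feed publish the entry/feed UUID as a string.
        let id: String = message.value.convert()?;
        println!("channel={} id={}", message.channel, id);
    }
    Ok(())
}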