Upgrade apalis, add fred pool to state, start publishing in jobs

Tyler Hallada 2024-09-22 13:44:55 -04:00
parent 6912ef9017
commit e41085425a
8 changed files with 75 additions and 25 deletions

Cargo.lock (generated)
View File

@@ -140,9 +140,9 @@ checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3"
[[package]]
name = "apalis"
-version = "0.6.0-rc.5"
+version = "0.6.0-rc.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "940921ba55b294cdb5535bb67a0fc29a8b637e0225f99337e4c86432d80da3f0"
+checksum = "7fe9f6044555ce7984b4dff510f869c25ee8258d13277eadcfee11efa0a827a0"
dependencies = [
"apalis-core",
"futures",
@@ -157,9 +157,9 @@ dependencies = [
[[package]]
name = "apalis-core"
-version = "0.6.0-rc.5"
+version = "0.6.0-rc.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9ca0df44161ac62045e29a56fd7d4356b6ac138413b761a23e5a0ce9973c1239"
+checksum = "1f7959df1edc75df26a1ee13a46d1b59fd63ba6642a6c29039583f5aedfd13aa"
dependencies = [
"async-oneshot",
"futures",
@@ -174,9 +174,9 @@ dependencies = [
[[package]]
name = "apalis-cron"
-version = "0.6.0-rc.5"
+version = "0.6.0-rc.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ebc25afa9a8d8c1408d1969e15639b120afc53ee1cfa6464ca4fb4fa3c1f1d3f"
+checksum = "821d4b219ed2fa48e6ff722f23a97db3a70dcb0beeb96d3cf9391d88d23050c2"
dependencies = [
"apalis-core",
"async-stream",
@@ -188,9 +188,9 @@ dependencies = [
[[package]]
name = "apalis-redis"
-version = "0.6.0-rc.5"
+version = "0.6.0-rc.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "58b90be4cc2db77f062b3ee08ed913dffba0c509b71fd944e4e8241c2618705f"
+checksum = "405b6457c973eb82ed4a20048131b767ebca684cdc4897d42f622ed115641f21"
dependencies = [
"apalis-core",
"async-stream",
@@ -2523,9 +2523,9 @@ dependencies = [
[[package]]
name = "redis"
-version = "0.25.3"
+version = "0.25.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd"
+checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec"
dependencies = [
"arc-swap",
"async-trait",

View File

@@ -16,9 +16,9 @@ ammonia = "4"
ansi-to-html = "0.2"
anyhow = "1"
# apalis v0.6 fixes this issue: https://github.com/geofmureithi/apalis/issues/351
-apalis = { version = "0.6.0-rc.5", features = ["retry"] }
-apalis-cron = "0.6.0-rc.5"
-apalis-redis = "0.6.0-rc.5"
+apalis = { version = "0.6.0-rc.7", features = ["retry"] }
+apalis-cron = "0.6.0-rc.7"
+apalis-redis = "0.6.0-rc.7"
async-trait = "0.1"
axum = { version = "0.7", features = ["form", "multipart", "query"] }
axum-client-ip = "0.6"

View File

@@ -16,6 +16,7 @@ use base64::prelude::*;
use bytes::Bytes;
use clap::Parser;
use dotenvy::dotenv;
+use fred::prelude::*;
use lettre::transport::smtp::authentication::Credentials;
use lettre::SmtpTransport;
use notify::Watcher;
@@ -28,11 +29,18 @@ use tower::ServiceBuilder;
use tower_http::{services::ServeDir, trace::TraceLayer};
use tower_livereload::LiveReloadLayer;
use tower_sessions::cookie::Key;
-use tower_sessions_redis_store::{fred::prelude::*, RedisStore};
+use tower_sessions_redis_store::{
+fred::{
+interfaces::ClientLike as TowerSessionsRedisClientLike,
+prelude::{RedisConfig as TowerSessionsRedisConfig, RedisPool as TowerSessionsRedisPool},
+},
+RedisStore,
+};
use tracing::debug;
use lib::config::Config;
use lib::domain_locks::DomainLocks;
+use lib::domain_request_limiter::DomainRequestLimiter;
use lib::handlers;
use lib::jobs::AsyncJob;
use lib::log::init_tracing;
@@ -75,8 +83,20 @@ async fn main() -> Result<()> {
let redis_config = RedisConfig::from_url(&config.redis_url)?;
let redis_pool = RedisPool::new(redis_config, None, None, None, config.redis_pool_size)?;
redis_pool.init().await?;
+let domain_request_limiter = DomainRequestLimiter::new(redis_pool.clone(), 10, 5, 100, 0.5);
-let session_store = RedisStore::new(redis_pool);
+// TODO: is it possible to use the same fred RedisPool that the web app uses?
+let sessions_redis_config = TowerSessionsRedisConfig::from_url(&config.redis_url)?;
+let sessions_redis_pool = TowerSessionsRedisPool::new(
+sessions_redis_config,
+None,
+None,
+None,
+config.redis_pool_size,
+)?;
+sessions_redis_pool.init().await?;
+let session_store = RedisStore::new(sessions_redis_pool);
let session_layer = SessionManagerLayer::new(session_store)
.with_secure(!cfg!(debug_assertions))
.with_expiry(Expiry::OnInactivity(Duration::days(
@@ -159,12 +179,14 @@ async fn main() -> Result<()> {
log_receiver,
crawls,
domain_locks,
+domain_request_limiter,
client,
crawl_scheduler,
importer,
imports,
mailer,
apalis,
+redis: redis_pool,
})
.layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()))
.layer(auth_layer)

View File

@@ -30,9 +30,8 @@ async fn main() -> Result<()> {
let redis_config = RedisConfig::from_url(&config.redis_url)?;
let redis_pool = RedisPool::new(redis_config, None, None, None, 5)?;
-redis_pool.connect();
-redis_pool.wait_for_connect().await?;
-let domain_request_limiter = DomainRequestLimiter::new(redis_pool, 10, 5, 100, 0.5);
+redis_pool.init().await?;
+let domain_request_limiter = DomainRequestLimiter::new(redis_pool.clone(), 10, 5, 100, 0.5);
let http_client = Client::builder().user_agent(USER_AGENT).build()?;
let db = PgPoolOptions::new()
@@ -51,6 +50,7 @@
.data(domain_request_limiter)
.data(config)
.data(apalis_storage.clone())
+.data(redis_pool)
.backend(apalis_storage)
.build_fn(handle_async_job)
})

View File

@@ -5,6 +5,7 @@ use ammonia::clean;
use anyhow::{anyhow, Result};
use apalis::prelude::*;
use bytes::Buf;
+use fred::prelude::*;
use readability::extractor;
use reqwest::Client;
use serde::{Deserialize, Serialize};
@@ -29,6 +30,7 @@ pub async fn crawl_entry(
db: Data<PgPool>,
domain_request_limiter: Data<DomainRequestLimiter>,
config: Data<Config>,
+redis: Data<RedisPool>,
) -> Result<()> {
let entry = Entry::get(&*db, entry_id).await?;
info!("got entry from db");
@@ -58,5 +60,9 @@ pub async fn crawl_entry(
fs::write(content_dir.join(format!("{}.html", id)), content)?;
fs::write(content_dir.join(format!("{}.txt", id)), article.text)?;
info!("saved content to filesystem");
+redis
+.next()
+.publish("entries", entry_id.to_string())
+.await?;
Ok(())
}
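
Note on the publish call above: `RedisPool::next()` hands back one client from the fred pool (round-robin) and `publish` emits the crawled entry ID on the `entries` channel (the feed job below does the same on `feeds`). Nothing in this commit subscribes to those channels yet; the following is only a rough sketch of what a listener could look like with the same fred 9.x-style API (the function, its name, and the dedicated subscriber client are illustrative, not part of the commit):

    use fred::interfaces::{EventInterface, PubsubInterface};
    use fred::prelude::*;

    // Hypothetical listener for the IDs published by crawl_entry / crawl_feed.
    async fn listen_for_crawled_entries(config: RedisConfig) -> Result<(), RedisError> {
        // A dedicated client for pub/sub so the pooled clients stay free for commands.
        let subscriber = RedisClient::new(config, None, None, None);
        subscriber.init().await?;

        // Broadcast stream of every pub/sub message this client receives.
        let mut messages = subscriber.message_rx();
        subscriber.subscribe("entries").await?;

        while let Ok(message) = messages.recv().await {
            // message.value holds the entry ID string published above.
            println!("entry crawled on {:?}: {:?}", message.channel, message.value);
        }
        Ok(())
    }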

View File

@@ -5,6 +5,7 @@ use apalis::prelude::*;
use apalis_redis::RedisStorage;
use chrono::{Duration, Utc};
use feed_rs::parser;
+use fred::prelude::*;
use http::{header, HeaderMap, StatusCode};
use reqwest::Client;
use serde::{Deserialize, Serialize};
@@ -30,6 +31,7 @@ pub async fn crawl_feed(
db: Data<PgPool>,
domain_request_limiter: Data<DomainRequestLimiter>,
apalis: Data<RedisStorage<AsyncJob>>,
+redis: Data<RedisPool>,
) -> Result<()> {
let mut feed = Feed::get(&*db, feed_id).await?;
info!("got feed from db");
@@ -181,5 +183,7 @@ pub async fn crawl_feed(
}))
.await?;
}
+redis.next().publish("feeds", feed_id.to_string()).await?;
Ok(())
}

View File

@@ -1,5 +1,6 @@
use apalis::prelude::*;
use apalis_redis::RedisStorage;
+use fred::prelude::*;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
@@ -27,18 +28,17 @@ pub enum AsyncJobError {
JobError(#[from] anyhow::Error),
}
-#[instrument(skip_all, fields(worker_id = %worker_id))]
+#[instrument(skip_all, fields(worker_id = ?worker_id, task_id = ?task_id))]
pub async fn handle_async_job(
job: AsyncJob,
-worker_id: WorkerId,
-// TODO: add task_id to tracing instrumentation context
-// it casuses a panic in 0.6.0 currently, see: https://github.com/geofmureithi/apalis/issues/398
-// task_id: Data<TaskId>,
+worker_id: Data<WorkerId>,
+task_id: Data<TaskId>,
http_client: Data<Client>,
db: Data<PgPool>,
domain_request_limiter: Data<DomainRequestLimiter>,
config: Data<Config>,
apalis: Data<RedisStorage<AsyncJob>>,
+redis: Data<RedisPool>,
) -> Result<(), AsyncJobError> {
let result = match job {
AsyncJob::HelloWorld(name) => {
@@ -46,10 +46,12 @@ pub async fn handle_async_job(
Ok(())
}
AsyncJob::CrawlFeed(job) => {
-crawl_feed::crawl_feed(job, http_client, db, domain_request_limiter, apalis).await
+crawl_feed::crawl_feed(job, http_client, db, domain_request_limiter, apalis, redis)
+.await
}
AsyncJob::CrawlEntry(job) => {
-crawl_entry::crawl_entry(job, http_client, db, domain_request_limiter, config).await
+crawl_entry::crawl_entry(job, http_client, db, domain_request_limiter, config, redis)
+.await
}
};
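
For context on how `handle_async_job` gets invoked: it only runs when something pushes an `AsyncJob` onto the `RedisStorage<AsyncJob>` backend the worker registers above, and every `.data(...)` value (including the new fred `RedisPool`) is injected into the matching `Data<T>` argument. A minimal producing-side sketch, assuming apalis' `Storage::push` API; the helper function itself is illustrative and not part of this commit:

    use apalis::prelude::*;
    use apalis_redis::RedisStorage;
    use lib::jobs::AsyncJob;

    // Hypothetical helper: enqueue a job onto the same Redis-backed queue the worker polls.
    async fn enqueue_hello(mut apalis: RedisStorage<AsyncJob>) -> anyhow::Result<()> {
        // Storage::push serializes the job into Redis; handle_async_job later receives it
        // with http_client, db, config, apalis, and redis injected via Data<T>.
        apalis.push(AsyncJob::HelloWorld("world".to_string())).await?;
        Ok(())
    }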

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use apalis_redis::RedisStorage;
use axum::extract::FromRef;
use bytes::Bytes;
+use fred::clients::RedisPool;
use lettre::SmtpTransport;
use reqwest::Client;
use sqlx::PgPool;
@@ -14,6 +15,7 @@ use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleM
use crate::actors::importer::{ImporterHandle, ImporterHandleMessage};
use crate::config::Config;
use crate::domain_locks::DomainLocks;
+use crate::domain_request_limiter::DomainRequestLimiter;
use crate::jobs::AsyncJob;
/// A map of feed IDs to a channel receiver for the active `CrawlScheduler` running a feed crawl
@@ -46,12 +48,14 @@ pub struct AppState {
pub log_receiver: watch::Receiver<Bytes>,
pub crawls: Crawls,
pub domain_locks: DomainLocks,
+pub domain_request_limiter: DomainRequestLimiter,
pub client: Client,
pub crawl_scheduler: CrawlSchedulerHandle,
pub importer: ImporterHandle,
pub imports: Imports,
pub mailer: SmtpTransport,
pub apalis: RedisStorage<AsyncJob>,
+pub redis: RedisPool,
}
impl FromRef<AppState> for PgPool {
@@ -84,6 +88,12 @@ impl FromRef<AppState> for DomainLocks {
}
}
+impl FromRef<AppState> for DomainRequestLimiter {
+fn from_ref(state: &AppState) -> Self {
+state.domain_request_limiter.clone()
+}
+}
impl FromRef<AppState> for Client {
fn from_ref(state: &AppState) -> Self {
state.client.clone()
@@ -119,3 +129,9 @@ impl FromRef<AppState> for RedisStorage<AsyncJob> {
state.apalis.clone()
}
}
+impl FromRef<AppState> for RedisPool {
+fn from_ref(state: &AppState) -> Self {
+state.redis.clone()
+}
+}
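
These `FromRef` impls are what let individual axum handlers pull out just the piece of `AppState` they need: for a handler registered on a `Router<AppState>`, axum builds `State<RedisPool>` (or `State<DomainRequestLimiter>`) by calling `from_ref` on the full state. A small illustrative handler, not part of this commit (the name, the ping payload, and the reuse of the `entries` channel are assumptions):

    use axum::{extract::State, http::StatusCode};
    use fred::{clients::RedisPool, interfaces::PubsubInterface};

    // Works because of `impl FromRef<AppState> for RedisPool` above.
    async fn publish_ping(State(redis): State<RedisPool>) -> Result<&'static str, StatusCode> {
        // Same publish pattern the crawl jobs use; the i64 reply is the receiver count.
        let _receivers: i64 = redis
            .next()
            .publish("entries", "ping")
            .await
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        Ok("ok")
    }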