Upgrade apalis, add fred pool to state, start publishing in jobs
parent 6912ef9017
commit 3d5c0b78c3

Cargo.lock (generated, 20 changed lines)
@@ -140,9 +140,9 @@ checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3"
 [[package]]
 name = "apalis"
-version = "0.6.0-rc.5"
+version = "0.6.0-rc.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "940921ba55b294cdb5535bb67a0fc29a8b637e0225f99337e4c86432d80da3f0"
+checksum = "7fe9f6044555ce7984b4dff510f869c25ee8258d13277eadcfee11efa0a827a0"
 dependencies = [
  "apalis-core",
  "futures",

@@ -157,9 +157,9 @@ dependencies = [
 [[package]]
 name = "apalis-core"
-version = "0.6.0-rc.5"
+version = "0.6.0-rc.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9ca0df44161ac62045e29a56fd7d4356b6ac138413b761a23e5a0ce9973c1239"
+checksum = "1f7959df1edc75df26a1ee13a46d1b59fd63ba6642a6c29039583f5aedfd13aa"
 dependencies = [
  "async-oneshot",
  "futures",

@@ -174,9 +174,9 @@ dependencies = [
 [[package]]
 name = "apalis-cron"
-version = "0.6.0-rc.5"
+version = "0.6.0-rc.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ebc25afa9a8d8c1408d1969e15639b120afc53ee1cfa6464ca4fb4fa3c1f1d3f"
+checksum = "821d4b219ed2fa48e6ff722f23a97db3a70dcb0beeb96d3cf9391d88d23050c2"
 dependencies = [
  "apalis-core",
  "async-stream",

@@ -188,9 +188,9 @@ dependencies = [
 [[package]]
 name = "apalis-redis"
-version = "0.6.0-rc.5"
+version = "0.6.0-rc.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "58b90be4cc2db77f062b3ee08ed913dffba0c509b71fd944e4e8241c2618705f"
+checksum = "405b6457c973eb82ed4a20048131b767ebca684cdc4897d42f622ed115641f21"
 dependencies = [
  "apalis-core",
  "async-stream",

@@ -2523,9 +2523,9 @@ dependencies = [
 [[package]]
 name = "redis"
-version = "0.25.3"
+version = "0.25.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd"
+checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec"
 dependencies = [
  "arc-swap",
  "async-trait",
Cargo.toml

@@ -16,9 +16,9 @@ ammonia = "4"
 ansi-to-html = "0.2"
 anyhow = "1"
 # apalis v0.6 fixes this issue: https://github.com/geofmureithi/apalis/issues/351
-apalis = { version = "0.6.0-rc.5", features = ["retry"] }
-apalis-cron = "0.6.0-rc.5"
-apalis-redis = "0.6.0-rc.5"
+apalis = { version = "0.6.0-rc.7", features = ["retry"] }
+apalis-cron = "0.6.0-rc.7"
+apalis-redis = "0.6.0-rc.7"
 async-trait = "0.1"
 axum = { version = "0.7", features = ["form", "multipart", "query"] }
 axum-client-ip = "0.6"
src/main.rs

@@ -16,6 +16,7 @@ use base64::prelude::*;
 use bytes::Bytes;
 use clap::Parser;
 use dotenvy::dotenv;
+use fred::prelude::*;
 use lettre::transport::smtp::authentication::Credentials;
 use lettre::SmtpTransport;
 use notify::Watcher;

@@ -28,11 +29,18 @@ use tower::ServiceBuilder;
 use tower_http::{services::ServeDir, trace::TraceLayer};
 use tower_livereload::LiveReloadLayer;
 use tower_sessions::cookie::Key;
-use tower_sessions_redis_store::{fred::prelude::*, RedisStore};
+use tower_sessions_redis_store::{
+    fred::{
+        interfaces::ClientLike as TowerSessionsRedisClientLike,
+        prelude::{RedisConfig as TowerSessionsRedisConfig, RedisPool as TowerSessionsRedisPool},
+    },
+    RedisStore,
+};
 use tracing::debug;
 
 use lib::config::Config;
 use lib::domain_locks::DomainLocks;
+use lib::domain_request_limiter::DomainRequestLimiter;
 use lib::handlers;
 use lib::jobs::AsyncJob;
 use lib::log::init_tracing;
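
The renames above are the crux of the import change: main.rs now glob-imports the app's own fred prelude, while tower-sessions-redis-store re-exports its own copy of fred, and both expose names like RedisConfig and RedisPool. A minimal sketch of the clash the aliases avoid (the demo function is hypothetical; nothing here is in the commit itself):

    // Two glob imports that both export RedisConfig/RedisPool become
    // ambiguous the moment either name is used:
    use fred::prelude::*; // the app's fred
    use tower_sessions_redis_store::fred::prelude::*; // the store's re-exported fred

    fn demo() {
        let _config = RedisConfig::default();
        // error[E0659]: `RedisConfig` is ambiguous
    }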
@@ -75,8 +83,20 @@ async fn main() -> Result<()> {
     let redis_config = RedisConfig::from_url(&config.redis_url)?;
     let redis_pool = RedisPool::new(redis_config, None, None, None, config.redis_pool_size)?;
     redis_pool.init().await?;
+    let domain_request_limiter = DomainRequestLimiter::new(redis_pool.clone(), 10, 5, 100, 0.5);
 
-    let session_store = RedisStore::new(redis_pool);
+    // TODO: is it possible to use the same fred RedisPool that the web app uses?
+    let sessions_redis_config = TowerSessionsRedisConfig::from_url(&config.redis_url)?;
+    let sessions_redis_pool = TowerSessionsRedisPool::new(
+        sessions_redis_config,
+        None,
+        None,
+        None,
+        config.redis_pool_size,
+    )?;
+    sessions_redis_pool.init().await?;
+
+    let session_store = RedisStore::new(sessions_redis_pool);
     let session_layer = SessionManagerLayer::new(session_store)
         .with_secure(!cfg!(debug_assertions))
         .with_expiry(Expiry::OnInactivity(Duration::days(
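
One fred pool now serves double duty in the web binary: redis_pool is cloned into the DomainRequestLimiter here and moved into AppState further down. The clone is cheap because fred pools are handles around shared connection state; a sketch of that property (the share function is hypothetical):

    // Cloning a fred RedisPool clones a handle (Arc internally), not the
    // underlying connections, so every clone talks to the same pooled clients.
    use fred::prelude::*;

    fn share(pool: &RedisPool) -> (RedisPool, RedisPool) {
        (pool.clone(), pool.clone()) // no new connections are opened here
    }

The session store still gets its own pool because RedisStore wants the RedisPool type from its own re-exported fred, which is a distinct type unless both dependencies resolve to the same fred version; the TODO above records that open question.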
@@ -159,12 +179,14 @@ async fn main() -> Result<()> {
         log_receiver,
         crawls,
         domain_locks,
+        domain_request_limiter,
         client,
         crawl_scheduler,
         importer,
         imports,
         mailer,
         apalis,
+        redis: redis_pool,
     })
     .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()))
     .layer(auth_layer)
@@ -30,9 +30,8 @@ async fn main() -> Result<()> {
 
     let redis_config = RedisConfig::from_url(&config.redis_url)?;
     let redis_pool = RedisPool::new(redis_config, None, None, None, 5)?;
-    redis_pool.connect();
-    redis_pool.wait_for_connect().await?;
-    let domain_request_limiter = DomainRequestLimiter::new(redis_pool, 10, 5, 100, 0.5);
+    redis_pool.init().await?;
+    let domain_request_limiter = DomainRequestLimiter::new(redis_pool.clone(), 10, 5, 100, 0.5);
 
     let http_client = Client::builder().user_agent(USER_AGENT).build()?;
     let db = PgPoolOptions::new()

@@ -51,6 +50,7 @@ async fn main() -> Result<()> {
             .data(domain_request_limiter)
             .data(config)
             .data(apalis_storage.clone())
+            .data(redis_pool)
             .backend(apalis_storage)
             .build_fn(handle_async_job)
     })
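
In the worker binary's main, init() replaces the connect() plus wait_for_connect() pair: in recent fred releases it spawns the connection task and resolves once the pool is actually connected. A sketch of the setup pattern used above (the make_pool helper is hypothetical):

    // init() combines connect() and wait_for_connect() in a single call.
    use fred::prelude::*;

    async fn make_pool(url: &str, size: usize) -> Result<RedisPool, RedisError> {
        let config = RedisConfig::from_url(url)?;
        let pool = RedisPool::new(config, None, None, None, size)?;
        pool.init().await?; // connected and ready once this returns
        Ok(pool)
    }

Registering the pool with .data(redis_pool) is what makes the redis: Data<RedisPool> arguments in the job functions below resolve.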
@@ -5,6 +5,7 @@ use ammonia::clean;
 use anyhow::{anyhow, Result};
 use apalis::prelude::*;
 use bytes::Buf;
+use fred::prelude::*;
 use readability::extractor;
 use reqwest::Client;
 use serde::{Deserialize, Serialize};

@@ -29,6 +30,7 @@ pub async fn crawl_entry(
     db: Data<PgPool>,
     domain_request_limiter: Data<DomainRequestLimiter>,
     config: Data<Config>,
+    redis: Data<RedisPool>,
 ) -> Result<()> {
     let entry = Entry::get(&*db, entry_id).await?;
     info!("got entry from db");

@@ -58,5 +60,9 @@ pub async fn crawl_entry(
     fs::write(content_dir.join(format!("{}.html", id)), content)?;
     fs::write(content_dir.join(format!("{}.txt", id)), article.text)?;
     info!("saved content to filesystem");
+    redis
+        .next()
+        .publish("entries", entry_id.to_string())
+        .await?;
     Ok(())
 }
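
Nothing in this commit consumes the entries channel yet; a subscriber would look roughly like this sketch built on fred's SubscriberClient (assumes fred's subscriber-client feature is enabled; the listen function is hypothetical):

    // A sketch of a consumer for the "entries" channel published above.
    use fred::clients::SubscriberClient;
    use fred::prelude::*;

    async fn listen(config: RedisConfig) -> Result<(), RedisError> {
        let subscriber: SubscriberClient =
            Builder::from_config(config).build_subscriber_client()?;
        subscriber.init().await?;
        subscriber.subscribe("entries").await?;
        let mut rx = subscriber.message_rx();
        while let Ok(message) = rx.recv().await {
            // message.value carries the entry id string sent by crawl_entry
            println!("entry crawled: {:?}", message.value);
        }
        Ok(())
    }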
@@ -5,6 +5,7 @@ use apalis::prelude::*;
 use apalis_redis::RedisStorage;
 use chrono::{Duration, Utc};
 use feed_rs::parser;
+use fred::prelude::*;
 use http::{header, HeaderMap, StatusCode};
 use reqwest::Client;
 use serde::{Deserialize, Serialize};

@@ -30,6 +31,7 @@ pub async fn crawl_feed(
     db: Data<PgPool>,
     domain_request_limiter: Data<DomainRequestLimiter>,
     apalis: Data<RedisStorage<AsyncJob>>,
+    redis: Data<RedisPool>,
 ) -> Result<()> {
     let mut feed = Feed::get(&*db, feed_id).await?;
     info!("got feed from db");

@@ -181,5 +183,7 @@ pub async fn crawl_feed(
         }))
         .await?;
     }
+
+    redis.next().publish("feeds", feed_id.to_string()).await?;
     Ok(())
 }
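
crawl_feed publishes the same way, just inline: next() hands back one client from the pool in round-robin order, so each publish rides an existing pooled connection. The same one-liner works anywhere a clone of the pool is in scope (notify is a hypothetical helper; the uuid type is an assumption matching the to_string() calls above):

    // A sketch: publishing from any code that holds a pool clone.
    use fred::prelude::*;

    async fn notify(redis: &RedisPool, feed_id: uuid::Uuid) -> Result<(), RedisError> {
        redis.next().publish("feeds", feed_id.to_string()).await?;
        Ok(())
    }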
@@ -27,13 +27,11 @@ pub enum AsyncJobError {
     JobError(#[from] anyhow::Error),
 }
 
-#[instrument(skip_all, fields(worker_id = %worker_id))]
+#[instrument(skip_all, fields(worker_id = ?worker_id, task_id = ?task_id))]
 pub async fn handle_async_job(
     job: AsyncJob,
-    worker_id: WorkerId,
-    // TODO: add task_id to tracing instrumentation context
-    // it casuses a panic in 0.6.0 currently, see: https://github.com/geofmureithi/apalis/issues/398
-    // task_id: Data<TaskId>,
+    worker_id: Data<WorkerId>,
+    task_id: Data<TaskId>,
     http_client: Data<Client>,
     db: Data<PgPool>,
     domain_request_limiter: Data<DomainRequestLimiter>,
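
With the rc.7 upgrade the TaskId extractor no longer panics (the workaround comments above are deleted along with it), and both ids now arrive as Data<T> values, the same mechanism that delivers everything registered via .data(...) on the worker builder. A minimal sketch of the pairing (job and handler names are hypothetical):

    // Data<T> parameters resolve to values registered with .data(...) or,
    // for WorkerId/TaskId, to values the apalis runtime supplies per task.
    use apalis::prelude::*;
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct ExampleJob;

    async fn handle_example(_job: ExampleJob, worker_id: Data<WorkerId>, task_id: Data<TaskId>) {
        // Data<T> derefs to the inner value
        tracing::info!("worker {:?} running task {:?}", *worker_id, *task_id);
    }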
src/state.rs (16 changed lines)
@@ -4,6 +4,7 @@ use std::sync::Arc;
 use apalis_redis::RedisStorage;
 use axum::extract::FromRef;
 use bytes::Bytes;
+use fred::clients::RedisPool;
 use lettre::SmtpTransport;
 use reqwest::Client;
 use sqlx::PgPool;

@@ -14,6 +15,7 @@ use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleM
 use crate::actors::importer::{ImporterHandle, ImporterHandleMessage};
 use crate::config::Config;
 use crate::domain_locks::DomainLocks;
+use crate::domain_request_limiter::DomainRequestLimiter;
 use crate::jobs::AsyncJob;
 
 /// A map of feed IDs to a channel receiver for the active `CrawlScheduler` running a feed crawl

@@ -46,12 +48,14 @@ pub struct AppState {
     pub log_receiver: watch::Receiver<Bytes>,
     pub crawls: Crawls,
     pub domain_locks: DomainLocks,
+    pub domain_request_limiter: DomainRequestLimiter,
     pub client: Client,
     pub crawl_scheduler: CrawlSchedulerHandle,
     pub importer: ImporterHandle,
     pub imports: Imports,
     pub mailer: SmtpTransport,
     pub apalis: RedisStorage<AsyncJob>,
+    pub redis: RedisPool,
 }
 
 impl FromRef<AppState> for PgPool {

@@ -84,6 +88,12 @@ impl FromRef<AppState> for DomainLocks {
     }
 }
 
+impl FromRef<AppState> for DomainRequestLimiter {
+    fn from_ref(state: &AppState) -> Self {
+        state.domain_request_limiter.clone()
+    }
+}
+
 impl FromRef<AppState> for Client {
     fn from_ref(state: &AppState) -> Self {
         state.client.clone()

@@ -119,3 +129,9 @@ impl FromRef<AppState> for RedisStorage<AsyncJob> {
         state.apalis.clone()
     }
 }
+
+impl FromRef<AppState> for RedisPool {
+    fn from_ref(state: &AppState) -> Self {
+        state.redis.clone()
+    }
+}
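
The FromRef impls are what make the new fields extractable in handlers: axum's State<T> works for any T that is FromRef of the app state. A sketch of pulling the pool straight into a handler (the handler name is hypothetical):

    // With FromRef<AppState> for RedisPool, axum can hand the pool directly
    // to a handler through the State extractor.
    use axum::extract::State;
    use fred::clients::RedisPool;

    async fn entry_events(State(redis): State<RedisPool>) {
        // publish or subscribe with the shared pool here
        let _client = redis.next();
    }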