WIP add apalis & split up main process

This commit is contained in:
Tyler Hallada 2024-07-27 13:55:08 -04:00
parent 4a5d514cc7
commit 764d3f23b8
10 changed files with 514 additions and 5 deletions

307
Cargo.lock generated
View File

@ -138,6 +138,91 @@ version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3"
[[package]]
name = "apalis"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be13bf89e734a1ec4d44233429aafea5a9e693c98a4a126b00a29f321d4a2e03"
dependencies = [
"apalis-core",
"apalis-cron",
"apalis-redis",
"apalis-sql",
"futures",
"pin-project-lite",
"serde",
"thiserror",
"tokio",
"tower",
"tracing",
"tracing-futures",
]
[[package]]
name = "apalis-core"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb0704a3274e289bebbe042d7adf2b1455a2afd084c7a835cfc2e918cad2eff"
dependencies = [
"async-oneshot",
"futures",
"futures-timer",
"pin-project-lite",
"serde",
"serde_json",
"thiserror",
"tower",
"ulid",
]
[[package]]
name = "apalis-cron"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3918af413df3fb888bb662b7504ea16cbbabd20293a08f9e7548c57764612db"
dependencies = [
"apalis-core",
"async-stream",
"chrono",
"cron",
"futures",
"tower",
]
[[package]]
name = "apalis-redis"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8deabd06576b44f87e0fa709e44aa7edc47937b4325eac78384168df47ba30b"
dependencies = [
"apalis-core",
"async-stream",
"async-trait",
"chrono",
"futures",
"log",
"redis",
"serde",
"tokio",
]
[[package]]
name = "apalis-sql"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb4df1ac2762e170a12a920d1f74207816341e5eed5870887cc3bcd9e8c59028"
dependencies = [
"apalis-core",
"async-stream",
"futures",
"futures-lite",
"log",
"serde",
"serde_json",
"sqlx",
"tokio",
]
[[package]]
name = "arc-swap"
version = "1.7.1"
@ -156,6 +241,37 @@ dependencies = [
"password-hash",
]
[[package]]
name = "async-oneshot"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae47de2a02d543205f3f5457a90b6ecbc9494db70557bd29590ec8f1ddff5463"
dependencies = [
"futures-micro",
]
[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.61",
]
[[package]]
name = "async-trait"
version = "0.1.80"
@ -469,6 +585,20 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422"
[[package]]
name = "combine"
version = "4.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
dependencies = [
"bytes",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "const-oid"
version = "0.9.6"
@ -532,6 +662,7 @@ dependencies = [
"ammonia",
"ansi-to-html",
"anyhow",
"apalis",
"async-trait",
"axum",
"axum-client-ip",
@ -596,6 +727,17 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
[[package]]
name = "cron"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07"
dependencies = [
"chrono",
"nom",
"once_cell",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.12"
@ -976,6 +1118,19 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-lite"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"parking",
"pin-project-lite",
]
[[package]]
name = "futures-macro"
version = "0.3.30"
@ -987,6 +1142,15 @@ dependencies = [
"syn 2.0.61",
]
[[package]]
name = "futures-micro"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b460264b3593d68b16a7bc35f7bc226ddfebdf9a1c8db1ed95d5cc6b7168c826"
dependencies = [
"pin-project-lite",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
@ -999,6 +1163,12 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-timer"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]]
name = "futures-util"
version = "0.3.30"
@ -2024,6 +2194,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]]
name = "parking_lot"
version = "0.12.2"
@ -2355,6 +2531,29 @@ dependencies = [
"url",
]
[[package]]
name = "redis"
version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd"
dependencies = [
"arc-swap",
"async-trait",
"bytes",
"combine",
"futures",
"futures-util",
"itoa",
"percent-encoding",
"pin-project-lite",
"ryu",
"sha1_smol",
"tokio",
"tokio-retry",
"tokio-util",
"url",
]
[[package]]
name = "redis-protocol"
version = "4.1.0"
@ -2513,6 +2712,21 @@ dependencies = [
"winreg 0.52.0",
]
[[package]]
name = "ring"
version = "0.17.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
dependencies = [
"cc",
"cfg-if",
"getrandom",
"libc",
"spin 0.9.8",
"untrusted",
"windows-sys 0.52.0",
]
[[package]]
name = "rmp"
version = "0.8.14"
@ -2574,6 +2788,17 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rustls"
version = "0.21.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e"
dependencies = [
"ring",
"rustls-webpki",
"sct",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.4"
@ -2599,6 +2824,16 @@ version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d"
[[package]]
name = "rustls-webpki"
version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "rustversion"
version = "1.0.16"
@ -2635,6 +2870,16 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "sct"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "security-framework"
version = "2.11.0"
@ -2758,6 +3003,12 @@ dependencies = [
"digest",
]
[[package]]
name = "sha1_smol"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
[[package]]
name = "sha2"
version = "0.10.8"
@ -2912,6 +3163,8 @@ dependencies = [
"once_cell",
"paste",
"percent-encoding",
"rustls",
"rustls-pemfile 1.0.4",
"serde",
"serde_json",
"sha2",
@ -2923,6 +3176,7 @@ dependencies = [
"tracing",
"url",
"uuid",
"webpki-roots",
]
[[package]]
@ -3336,6 +3590,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-retry"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
dependencies = [
"pin-project",
"rand",
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.15"
@ -3555,6 +3820,15 @@ dependencies = [
"valuable",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
@ -3596,6 +3870,17 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "ulid"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259"
dependencies = [
"getrandom",
"rand",
"web-time",
]
[[package]]
name = "unicase"
version = "2.7.0"
@ -3638,6 +3923,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "untrusted"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "url"
version = "2.5.0"
@ -3832,6 +4123,22 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "web-time"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "webpki-roots"
version = "0.25.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1"
[[package]]
name = "whoami"
version = "1.5.1"

View File

@ -2,7 +2,7 @@
name = "crawlnicle"
version = "0.1.0"
edition = "2021"
default-run = "crawlnicle"
default-run = "web"
authors = ["Tyler Hallada <tyler@hallada.net>"]
[lib]
@ -15,6 +15,7 @@ path = "src/lib.rs"
ammonia = "4"
ansi-to-html = "0.2"
anyhow = "1"
apalis = { version = "0.5", features = ["redis", "cron", "retry"] }
async-trait = "0.1"
axum = { version = "0.7", features = ["form", "multipart", "query"] }
axum-client-ip = "0.6"

5
rust-analyzer.json Normal file
View File

@ -0,0 +1,5 @@
{
"files": {
"excludeDirs": ["frontend"]
}
}

110
src/bin/crawler.rs Normal file
View File

@ -0,0 +1,110 @@
use anyhow::{anyhow, Result};
use apalis::cron::{CronStream, Schedule};
use apalis::layers::retry::{RetryLayer, RetryPolicy};
use apalis::layers::tracing::TraceLayer;
use apalis::prelude::*;
use apalis::redis::RedisStorage;
use chrono::{DateTime, Utc};
use clap::Parser;
use lib::actors::crawl_scheduler::CrawlSchedulerError;
use lib::jobs::AsyncJob;
use lib::models::feed::{Feed, GetFeedsOptions};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use tower::ServiceBuilder;
use tracing::{info, instrument};
use dotenvy::dotenv;
use lib::config::Config;
use lib::log::init_worker_tracing;
/// Job handed to [`crawl_fn`] by the cron-driven worker.
///
/// It wraps the `DateTime<Utc>` it was converted from; the `From` impl below is what allows
/// the cron stream's timestamps to be turned into this job type.
#[derive(Default, Debug, Clone)]
struct Crawl(DateTime<Utc>);

impl From<DateTime<Utc>> for Crawl {
    fn from(timestamp: DateTime<Utc>) -> Self {
        Self(timestamp)
    }
}

impl Job for Crawl {
    /// Job name constant required by apalis's `Job` trait.
    const NAME: &'static str = "apalis::Crawl";
}
/// Shared data made available to [`crawl_fn`] via the worker's `.data(...)` call in `main`.
struct State {
/// Postgres connection pool used to page through the stored feeds.
pool: PgPool,
/// Redis-backed apalis storage that the per-feed jobs are pushed onto.
apalis: RedisStorage<AsyncJob>,
}
/// Enqueues one [`AsyncJob`] per stored feed.
///
/// Feeds are read page by page with [`Feed::get_all`]; after each page the `before` cursor is
/// moved to the `created_at` of the last feed returned, and the loop ends once a page comes
/// back empty.
///
/// # Errors
///
/// Returns an error if fetching a page of feeds fails or if pushing a job to the queue fails.
#[instrument(skip_all)]
pub async fn crawl_fn(job: Crawl, state: Data<Arc<State>>) -> Result<()> {
    info!(job = ?job, "crawl");
    // `push` needs `&mut self`, so work on a private clone of the shared storage handle.
    let mut queue = state.apalis.clone();
    let mut page = GetFeedsOptions::default();
    loop {
        info!("fetching feeds before: {:?}", page.before);
        let batch = Feed::get_all(&state.pool, &page)
            .await
            .map_err(|err| anyhow!(err))?;
        if batch.is_empty() {
            info!("no more feeds found");
            break;
        }
        info!("found {} feeds", batch.len());
        // Advance the cursor before `batch` is consumed by the loop below.
        page.before = batch.last().map(|feed| feed.created_at);
        for feed in batch {
            // NOTE(review): `HelloWorld` looks like a WIP placeholder for a real per-feed
            // crawl job — confirm before relying on it.
            queue
                .push(AsyncJob::HelloWorld(feed.feed_id.to_string()))
                .await?;
        }
    }
    Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
dotenv().ok();
let config = Config::parse();
let _guard = init_worker_tracing()?;
let pool = PgPoolOptions::new()
.max_connections(config.database_max_connections)
.acquire_timeout(std::time::Duration::from_secs(3))
.connect(&config.database_url)
.await?;
// TODO: use redis_pool from above instead of making a new connection
// See: https://github.com/geofmureithi/apalis/issues/290
let redis_conn = apalis::redis::connect(config.redis_url.clone()).await?;
let apalis_config = apalis::redis::Config::default();
let mut apalis: RedisStorage<AsyncJob> =
RedisStorage::new_with_config(redis_conn, apalis_config);
let schedule = Schedule::from_str("0 * * * * *").unwrap();
// let service = ServiceBuilder::new()
// .layer(RetryLayer::new(RetryPolicy::default()))
// .layer(TraceLayer::new())
// .service(service_fn(crawl_fn));
let worker = WorkerBuilder::new("crawler")
.stream(CronStream::new(schedule).into_stream())
.layer(RetryLayer::new(RetryPolicy::default()))
.layer(TraceLayer::new())
.data(Arc::new(State { pool, apalis }))
.build_fn(crawl_fn);
Monitor::<TokioExecutor>::new()
.register(worker)
.run()
.await
.unwrap();
Ok(())
}

View File

@ -1,6 +1,8 @@
use std::{collections::HashMap, net::SocketAddr, path::Path, sync::Arc};
use anyhow::Result;
use apalis::prelude::*;
use apalis::redis::RedisStorage;
use axum::{
routing::{get, post},
Router,
@ -32,6 +34,7 @@ use tracing::debug;
use lib::config::Config;
use lib::domain_locks::DomainLocks;
use lib::handlers;
use lib::jobs::AsyncJob;
use lib::log::init_tracing;
use lib::state::AppState;
use lib::USER_AGENT;
@ -93,6 +96,17 @@ async fn main() -> Result<()> {
sqlx::migrate!().run(&pool).await?;
// TODO: use redis_pool from above instead of making a new connection
// See: https://github.com/geofmureithi/apalis/issues/290
let redis_conn = apalis::redis::connect(config.redis_url.clone()).await?;
let apalis_config = apalis::redis::Config::default();
let mut apalis: RedisStorage<AsyncJob> =
RedisStorage::new_with_config(redis_conn, apalis_config);
apalis
.push(AsyncJob::HelloWorld("hello".to_string()))
.await?;
let crawl_scheduler = CrawlSchedulerHandle::new(
pool.clone(),
client.clone(),
@ -150,6 +164,7 @@ async fn main() -> Result<()> {
importer,
imports,
mailer,
apalis,
})
.layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()))
.layer(auth_layer)

36
src/bin/worker.rs Normal file
View File

@ -0,0 +1,36 @@
use anyhow::Result;
use apalis::layers::tracing::TraceLayer;
use apalis::prelude::*;
use apalis::redis::RedisStorage;
use clap::Parser;
use dotenvy::dotenv;
use lib::config::Config;
use lib::jobs::AsyncJob;
use lib::log::init_worker_tracing;
/// Handles a single [`AsyncJob`] taken from the queue.
///
/// Currently this only logs the job (Debug-formatted) at info level.
pub async fn worker_fn(job: AsyncJob) {
    tracing::info!(?job, "Hello, world!");
}
/// Entry point of the background worker process.
///
/// Loads `.env` (if present), parses [`Config`], initialises worker tracing, connects to
/// Redis, and runs two apalis workers named `"worker"` that process [`AsyncJob`]s from the
/// Redis-backed storage with [`worker_fn`].
///
/// # Errors
///
/// Returns an error if tracing cannot be initialised, if the Redis connection fails, or if
/// the apalis monitor stops with an error. These are propagated instead of panicking.
#[tokio::main]
async fn main() -> Result<()> {
    dotenv().ok();
    let config = Config::parse();
    // Keep the guard alive for the whole process so buffered log lines are flushed on exit.
    let _guard = init_worker_tracing()?;

    let redis_conn = apalis::redis::connect(config.redis_url.clone()).await?;
    let apalis_config = apalis::redis::Config::default();
    let apalis: RedisStorage<AsyncJob> = RedisStorage::new_with_config(redis_conn, apalis_config);

    Monitor::<TokioExecutor>::new()
        .register_with_count(2, {
            WorkerBuilder::new("worker")
                .layer(TraceLayer::new())
                // The storage is not needed afterwards, so it is moved rather than cloned.
                .with_storage(apalis)
                .build_fn(worker_fn)
        })
        .run()
        .await?;
    Ok(())
}

11
src/jobs/mod.rs Normal file
View File

@ -0,0 +1,11 @@
use apalis::prelude::*;
use serde::{Deserialize, Serialize};
/// Jobs that can be pushed onto the apalis queue; serializable via serde.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub enum AsyncJob {
/// Job carrying a single string payload.
HelloWorld(String),
}
impl Job for AsyncJob {
/// Job name constant required by apalis's `Job` trait.
const NAME: &'static str = "apalis::AsyncJob";
}

View File

@ -7,6 +7,7 @@ pub mod error;
pub mod handlers;
pub mod headers;
pub mod htmx;
pub mod jobs;
pub mod log;
pub mod mailers;
pub mod models;

View File

@ -91,3 +91,17 @@ pub fn init_tracing(
.init();
Ok((file_writer_guard, mem_writer_guard))
}
/// Installs the global tracing subscriber used by the worker binaries.
///
/// Events are filtered by the default `EnvFilter` environment configuration and written both
/// to stdout (pretty format) and to an hourly-rolling log file under `./logs` through a
/// non-blocking writer.
///
/// Returns the [`WorkerGuard`] of the non-blocking file writer; the caller must keep it alive
/// for as long as logging is needed.
pub fn init_worker_tracing() -> Result<WorkerGuard> {
    let env_filter = EnvFilter::from_default_env();
    let pretty_stdout = tracing_subscriber::fmt::layer().pretty();

    let hourly_appender = tracing_appender::rolling::hourly("./logs", "log");
    let (non_blocking_file, guard) = tracing_appender::non_blocking(hourly_appender);
    let file_layer = tracing_subscriber::fmt::layer().with_writer(non_blocking_file);

    tracing_subscriber::registry()
        .with(env_filter)
        .with(pretty_stdout)
        .with(file_layer)
        .init();

    Ok(guard)
}

View File

@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use apalis::redis::RedisStorage;
use axum::extract::FromRef;
use bytes::Bytes;
use lettre::SmtpTransport;
@ -9,10 +10,11 @@ use sqlx::PgPool;
use tokio::sync::{broadcast, watch, Mutex};
use uuid::Uuid;
use crate::actors::importer::{ImporterHandle, ImporterHandleMessage};
use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleMessage};
use crate::actors::importer::{ImporterHandle, ImporterHandleMessage};
use crate::config::Config;
use crate::domain_locks::DomainLocks;
use crate::jobs::AsyncJob;
/// A map of feed IDs to a channel receiver for the active `CrawlScheduler` running a feed crawl
/// for that feed.
@ -28,12 +30,12 @@ pub type Crawls = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<CrawlSchedulerHand
/// A map of unique import IDs to a channel receiver for the active `Importer` running that import.
///
/// Same as the `Crawls` map, the only purpose of this is to keep track of active imports so that
/// axum handlers can subscribe to the result of the import via the receiver channel which are then
/// Same as the `Crawls` map, the only purpose of this is to keep track of active imports so that
/// axum handlers can subscribe to the result of the import via the receiver channel which are then
/// sent to end-users as a stream of server-sent events.
///
/// This map should only contain imports that have just been created but not yet subscribed to.
/// Entries are only added when a user adds uploads an OPML to import and entries are removed by
/// Entries are only added when a user uploads an OPML to import and entries are removed by
/// the same user once a server-sent event connection is established.
pub type Imports = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<ImporterHandleMessage>>>>;
@ -49,6 +51,7 @@ pub struct AppState {
pub importer: ImporterHandle,
pub imports: Imports,
pub mailer: SmtpTransport,
pub apalis: RedisStorage<AsyncJob>,
}
impl FromRef<AppState> for PgPool {
@ -110,3 +113,9 @@ impl FromRef<AppState> for SmtpTransport {
state.mailer.clone()
}
}
/// Allows the apalis job storage to be obtained from [`AppState`] by cloning its `apalis` field.
impl FromRef<AppState> for RedisStorage<AsyncJob> {
fn from_ref(state: &AppState) -> Self {
state.apalis.clone()
}
}