From 276f0e17a8ef55ff69b0b7ba9549873a15279a4d Mon Sep 17 00:00:00 2001
From: Tyler Hallada
Date: Tue, 29 Aug 2023 23:30:00 -0400
Subject: [PATCH] Remove self from Crawls and Imports in actors at end of task

In case the user never listens to the stream, so that I do not create
infinitely growing hashmaps in server memory.
---
 src/actors/crawl_scheduler.rs | 13 ++++++++-----
 src/actors/feed_crawler.rs    | 12 +++++++++++-
 src/actors/importer.rs        | 27 ++++++++++++++++-----------
 src/bin/cli.rs                |  2 ++
 src/handlers/import.rs        |  2 +-
 src/main.rs                   |  2 ++
 src/state.rs                  |  6 ------
 7 files changed, 40 insertions(+), 24 deletions(-)
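
Note (not part of the patch): a minimal, standalone sketch of the cleanup
pattern this commit applies. The `Crawls` alias, the `String` message type,
and the function names here are simplified assumptions; only the shared-map
shape and the remove-on-completion step mirror the actual changes below.

    use std::collections::HashMap;
    use std::sync::Arc;

    use tokio::sync::{broadcast, Mutex};
    use uuid::Uuid;

    // Simplified stand-in for the `Crawls` map in `src/state.rs`.
    type Crawls = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<String>>>>;

    // Placeholder for the real crawling work the actor performs.
    async fn crawl_feed(_feed_id: Uuid) {}

    // The actor task keeps a clone of the shared map and removes its own
    // entry once the work finishes, whether or not anyone ever subscribed,
    // so the map cannot grow without bound.
    async fn run_crawl(crawls: Crawls, feed_id: Uuid) {
        crawl_feed(feed_id).await;
        let mut map = crawls.lock().await;
        map.remove(&feed_id);
    }

    #[tokio::main]
    async fn main() {
        let crawls: Crawls = Arc::new(Mutex::new(HashMap::new()));
        let feed_id = Uuid::new_v4();
        // A handler would normally insert a broadcast receiver here first.
        tokio::spawn(run_crawl(crawls.clone(), feed_id)).await.unwrap();
        assert!(crawls.lock().await.get(&feed_id).is_none());
    }
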
diff --git a/src/actors/crawl_scheduler.rs b/src/actors/crawl_scheduler.rs
index 4213477..aed9c5d 100644
--- a/src/actors/crawl_scheduler.rs
+++ b/src/actors/crawl_scheduler.rs
@@ -12,6 +12,7 @@ use uuid::Uuid;
 use crate::actors::feed_crawler::{FeedCrawlerError, FeedCrawlerHandle, FeedCrawlerHandleMessage};
 use crate::domain_locks::DomainLocks;
 use crate::models::feed::{Feed, GetFeedsOptions};
+use crate::state::Crawls;

 struct CrawlScheduler {
     receiver: mpsc::Receiver,
     pool: PgPool,
     client: Client,
     domain_locks: DomainLocks,
     content_dir: String,
+    crawls: Crawls,
 }

 #[derive(Debug)]
@@ -61,6 +63,7 @@ impl CrawlScheduler {
         client: Client,
         domain_locks: DomainLocks,
         content_dir: String,
+        crawls: Crawls,
     ) -> Self {
         CrawlScheduler {
             receiver,
             pool,
             client,
             domain_locks,
             content_dir,
+            crawls,
         }
     }
@@ -123,11 +127,7 @@ impl CrawlScheduler {
         let crawl_interval = Duration::from_secs(feed.crawl_interval_minutes as u64 * 60);
         let mut interval = tokio::time::interval(crawl_interval);
         if let Some(last_crawled_at) = feed.last_crawled_at {
-            dbg!(last_crawled_at);
-            dbg!(Utc::now());
             if let Ok(duration_since_last_crawl) = (Utc::now() - last_crawled_at).to_std() {
-                dbg!(duration_since_last_crawl);
-                dbg!(crawl_interval);
                 if duration_since_last_crawl < crawl_interval {
                     info!(
                         "last crawled at {:?}, crawling again in {:?}",
@@ -146,6 +146,7 @@ impl CrawlScheduler {
             self.client.clone(),
             self.domain_locks.clone(),
             self.content_dir.clone(),
+            self.crawls.clone(),
         );
         tokio::spawn(async move {
             loop {
@@ -247,9 +248,11 @@ impl CrawlSchedulerHandle {
         client: Client,
         domain_locks: DomainLocks,
         content_dir: String,
+        crawls: Crawls,
     ) -> Self {
         let (sender, receiver) = mpsc::channel(8);
-        let mut scheduler = CrawlScheduler::new(receiver, pool, client, domain_locks, content_dir);
+        let mut scheduler =
+            CrawlScheduler::new(receiver, pool, client, domain_locks, content_dir, crawls);
         tokio::spawn(async move { scheduler.run().await });

         Self { sender }
diff --git a/src/actors/feed_crawler.rs b/src/actors/feed_crawler.rs
index 757ea75..52727cc 100644
--- a/src/actors/feed_crawler.rs
+++ b/src/actors/feed_crawler.rs
@@ -20,6 +20,7 @@ use crate::actors::entry_crawler::{
 use crate::domain_locks::DomainLocks;
 use crate::models::entry::{CreateEntry, Entry};
 use crate::models::feed::{Feed, MAX_CRAWL_INTERVAL_MINUTES, MIN_CRAWL_INTERVAL_MINUTES};
+use crate::state::Crawls;
 use crate::uuid::Base62Uuid;

 /// The `FeedCrawler` actor fetches a feed url, parses it, and saves it to the database.
@@ -34,6 +35,7 @@ struct FeedCrawler {
     client: Client,
     domain_locks: DomainLocks,
     content_dir: String,
+    crawls: Crawls,
 }

 #[derive(Debug)]
@@ -78,6 +80,7 @@ impl FeedCrawler {
         client: Client,
         domain_locks: DomainLocks,
         content_dir: String,
+        crawls: Crawls,
     ) -> Self {
         FeedCrawler {
             receiver,
             pool,
             client,
             domain_locks,
             content_dir,
+            crawls,
         }
     }
@@ -281,6 +285,10 @@ impl FeedCrawler {
                 respond_to,
             } => {
                let result = self.crawl_feed(feed_id, respond_to.clone()).await;
+                {
+                    let mut crawls = self.crawls.lock().await;
+                    crawls.remove(&feed_id);
+                }
                if let Err(error) = &result {
                    match Feed::update_crawl_error(&self.pool, feed_id, format!("{}", error)).await
                    {
@@ -332,9 +340,11 @@ impl FeedCrawlerHandle {
         client: Client,
         domain_locks: DomainLocks,
         content_dir: String,
+        crawls: Crawls,
     ) -> Self {
         let (sender, receiver) = mpsc::channel(8);
-        let mut crawler = FeedCrawler::new(receiver, pool, client, domain_locks, content_dir);
+        let mut crawler =
+            FeedCrawler::new(receiver, pool, client, domain_locks, content_dir, crawls);
         tokio::spawn(async move { crawler.run().await });

         Self { sender }
diff --git a/src/actors/importer.rs b/src/actors/importer.rs
index a5a940e..051fc7d 100644
--- a/src/actors/importer.rs
+++ b/src/actors/importer.rs
@@ -2,8 +2,6 @@ use std::fmt::{self, Display, Formatter};
 use std::io::Cursor;

 use bytes::Bytes;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
 use opml::OPML;
 use sqlx::PgPool;
 use tokio::sync::{broadcast, mpsc};
@@ -11,8 +9,8 @@ use tracing::{debug, error, instrument};
 use uuid::Uuid;

 use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleMessage};
-use crate::actors::feed_crawler::FeedCrawlerHandleMessage;
 use crate::models::feed::{Feed, UpsertFeed};
+use crate::state::Imports;
 use crate::uuid::Base62Uuid;

 /// The `Importer` actor parses OPML bytes, loops through the document to find all feed URLs, then
@@ -26,12 +24,13 @@ struct Importer {
     receiver: mpsc::Receiver,
     pool: PgPool,
     crawl_scheduler: CrawlSchedulerHandle,
+    imports: Imports,
 }

 #[derive(Debug)]
 enum ImporterMessage {
     Import {
-        import_id: Base62Uuid,
+        import_id: Uuid,
         file_name: Option,
         bytes: Bytes,
         respond_to: broadcast::Sender,
@@ -75,24 +74,27 @@ impl Importer {
         receiver: mpsc::Receiver,
         pool: PgPool,
         crawl_scheduler: CrawlSchedulerHandle,
+        imports: Imports,
     ) -> Self {
         Importer {
             receiver,
             pool,
             crawl_scheduler,
+            imports,
         }
     }

     #[instrument(skip_all, fields(import_id = %import_id, file_name = ?file_name))]
     async fn import_opml(
         &self,
-        import_id: Base62Uuid,
+        import_id: Uuid,
         file_name: Option,
         bytes: Bytes,
         respond_to: broadcast::Sender,
     ) -> ImporterResult<()> {
-        let document = OPML::from_reader(&mut Cursor::new(bytes))
-            .map_err(|_| ImporterError::InvalidOPML(file_name.unwrap_or(import_id.to_string())))?;
+        let document = OPML::from_reader(&mut Cursor::new(bytes)).map_err(|_| {
+            ImporterError::InvalidOPML(file_name.unwrap_or(Base62Uuid::from(import_id).to_string()))
+        })?;
         for url in Self::gather_feed_urls(document.body.outlines) {
             let feed = Feed::upsert(
                 &self.pool,
@@ -138,6 +140,10 @@ impl Importer {
                let result = self
                    .import_opml(import_id, file_name, bytes, respond_to.clone())
                    .await;
+                {
+                    let mut imports = self.imports.lock().await;
+                    imports.remove(&import_id);
+                }

                // ignore the result since the initiator may have cancelled waiting for the
                // response, and that is ok
@@ -171,16 +177,15 @@ pub struct ImporterHandle {
 #[allow(clippy::large_enum_variant)]
 #[derive(Debug, Clone)]
 pub enum ImporterHandleMessage {
-    // TODO: send stats of import or forward crawler messages?
     Import(ImporterResult<()>),
     CrawlScheduler(CrawlSchedulerHandleMessage),
 }

 impl ImporterHandle {
     /// Creates an async actor task that will listen for messages on the `sender` channel.
-    pub fn new(pool: PgPool, crawl_scheduler: CrawlSchedulerHandle) -> Self {
+    pub fn new(pool: PgPool, crawl_scheduler: CrawlSchedulerHandle, imports: Imports) -> Self {
         let (sender, receiver) = mpsc::channel(8);
-        let mut importer = Importer::new(receiver, pool, crawl_scheduler);
+        let mut importer = Importer::new(receiver, pool, crawl_scheduler, imports);
         tokio::spawn(async move { importer.run().await });

         Self { sender }
@@ -191,7 +196,7 @@ impl ImporterHandle {
     /// Listen to the result of the import via the returned `broadcast::Receiver`.
     pub async fn import(
         &self,
-        import_id: Base62Uuid,
+        import_id: Uuid,
         file_name: Option,
         bytes: Bytes,
     ) -> broadcast::Receiver {
diff --git a/src/bin/cli.rs b/src/bin/cli.rs
index fdcc6cf..c1b7146 100644
--- a/src/bin/cli.rs
+++ b/src/bin/cli.rs
@@ -98,6 +98,7 @@ pub async fn main() -> Result<()> {
         .max_connections(env::var("DATABASE_MAX_CONNECTIONS")?.parse()?)
         .connect(&env::var("DATABASE_URL")?)
         .await?;
+    let crawls = Arc::new(Mutex::new(HashMap::new()));

     let cli: Cli = Cli::parse();

@@ -147,6 +148,7 @@ pub async fn main() -> Result<()> {
                 client.clone(),
                 domain_locks.clone(),
                 env::var("CONTENT_DIR")?,
+                crawls.clone(),
             );
             let _ = feed_crawler.crawl(id).await;
         }
diff --git a/src/handlers/import.rs b/src/handlers/import.rs
index 0ff07ec..2abe8e0 100644
--- a/src/handlers/import.rs
+++ b/src/handlers/import.rs
@@ -27,7 +27,7 @@ pub async fn opml(
         let import_id = Base62Uuid::new();
         let file_name = field.file_name().map(|s| s.to_string());
         let bytes = field.bytes().await?;
-        let receiver = importer.import(import_id, file_name, bytes).await;
+        let receiver = importer.import(import_id.as_uuid(), file_name, bytes).await;
         {
             let mut imports = imports.lock().await;
             imports.insert(import_id.as_uuid(), receiver);
diff --git a/src/main.rs b/src/main.rs
index 189fb12..cb8f6a6 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -66,11 +66,13 @@ async fn main() -> Result<()> {
         client.clone(),
         domain_locks.clone(),
         config.content_dir.clone(),
+        crawls.clone(),
     );
     let _ = crawl_scheduler.bootstrap().await;
     let importer = ImporterHandle::new(
         pool.clone(),
         crawl_scheduler.clone(),
+        imports.clone(),
     );

     let addr = format!("{}:{}", &config.host, &config.port).parse()?;
diff --git a/src/state.rs b/src/state.rs
index 9bd34cd..0781d94 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -24,9 +24,6 @@ use crate::domain_locks::DomainLocks;
 /// This map should only contain crawls that have just been created but not yet subscribed to.
 /// Entries are only added when a user adds a feed in the UI and entries are removed by the same
 /// user once a server-sent event connection is established.
-///
-/// TODO: remove the entries in the CrawlScheduler once the crawl is complete if the user never
-/// requested the stream to remove it themselves.
 pub type Crawls = Arc>>>;

 /// A map of unique import IDs to a channel receiver for the active `Importer` running that import.
@@ -38,9 +35,6 @@ pub type Crawls = Arc>>>;
 #[derive(Clone)]
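
Note (not part of the patch): the other half of the lifecycle described by the
`src/state.rs` doc comment, as a hypothetical, simplified sketch. A handler
stashes the receiver when the user adds a feed, and the server-sent-event
endpoint claims it when the user connects; the helper names and the `String`
message type are illustrative assumptions, not the handlers in this repo. If
the client never connects, the actor-side removal added above is what reclaims
the entry.

    use std::collections::HashMap;
    use std::sync::Arc;

    use tokio::sync::{broadcast, Mutex};
    use uuid::Uuid;

    // Simplified stand-in for the `Crawls` map in `src/state.rs`.
    type Crawls = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<String>>>>;

    // Hypothetical add-feed handler step: stash the receiver so a later
    // SSE request can pick it up.
    async fn register_crawl(crawls: &Crawls, crawl_id: Uuid, rx: broadcast::Receiver<String>) {
        crawls.lock().await.insert(crawl_id, rx);
    }

    // Hypothetical SSE handler step: claim (and thereby remove) the receiver.
    // If this never runs, the actor removes the entry itself when the crawl
    // finishes, so the map cannot grow without bound.
    async fn claim_crawl(crawls: &Crawls, crawl_id: Uuid) -> Option<broadcast::Receiver<String>> {
        crawls.lock().await.remove(&crawl_id)
    }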