Remove self from Crawls and Imports in actors at end of task

In case the user never listens to the stream, so that I do not create
infinitely growing hashmaps in the server memory.
Tyler Hallada 2023-08-29 23:30:00 -04:00
parent ceac234ce7
commit 276f0e17a8
7 changed files with 40 additions and 24 deletions
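In practice the fix is to hand each actor a clone of the shared map it was registered in and have the actor drop its own entry once its task finishes, whether or not anyone ever subscribed to the broadcast channel. Below is a minimal sketch of that pattern, using a simplified registry type and a placeholder task in place of the real `FeedCrawler`/`Importer` actors; the `Registry` alias, `spawn_task` function, and `String` payload are illustrative, not the crate's actual API.

use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{broadcast, Mutex};
use uuid::Uuid;

// Simplified stand-in for the Crawls/Imports maps: task ID -> broadcast receiver.
type Registry = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<String>>>>;

async fn spawn_task(registry: Registry, id: Uuid) {
    let (tx, rx) = broadcast::channel(8);
    registry.lock().await.insert(id, rx);

    tokio::spawn(async move {
        // ...the actual crawl/import work would happen here...
        let _ = tx.send(format!("task {id} finished"));

        // Remove our own entry so the map cannot grow without bound if the
        // user never listens to the stream.
        registry.lock().await.remove(&id);
    });
}

#[tokio::main]
async fn main() {
    let registry: Registry = Arc::new(Mutex::new(HashMap::new()));
    let id = Uuid::new_v4();
    spawn_task(registry.clone(), id).await;

    // Give the task a moment to run; the entry is gone even though nobody subscribed.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    assert!(!registry.lock().await.contains_key(&id));
}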

View File

@ -12,6 +12,7 @@ use uuid::Uuid;
use crate::actors::feed_crawler::{FeedCrawlerError, FeedCrawlerHandle, FeedCrawlerHandleMessage};
use crate::domain_locks::DomainLocks;
use crate::models::feed::{Feed, GetFeedsOptions};
+use crate::state::Crawls;
struct CrawlScheduler {
receiver: mpsc::Receiver<CrawlSchedulerMessage>,
@ -19,6 +20,7 @@ struct CrawlScheduler {
client: Client,
domain_locks: DomainLocks,
content_dir: String,
+crawls: Crawls,
}
#[derive(Debug)]
@ -61,6 +63,7 @@ impl CrawlScheduler {
client: Client,
domain_locks: DomainLocks,
content_dir: String,
+crawls: Crawls,
) -> Self {
CrawlScheduler {
receiver,
@ -68,6 +71,7 @@ impl CrawlScheduler {
client,
domain_locks,
content_dir,
+crawls,
}
}
@ -123,11 +127,7 @@ impl CrawlScheduler {
let crawl_interval = Duration::from_secs(feed.crawl_interval_minutes as u64 * 60);
let mut interval = tokio::time::interval(crawl_interval);
if let Some(last_crawled_at) = feed.last_crawled_at {
-dbg!(last_crawled_at);
-dbg!(Utc::now());
if let Ok(duration_since_last_crawl) = (Utc::now() - last_crawled_at).to_std() {
-dbg!(duration_since_last_crawl);
-dbg!(crawl_interval);
if duration_since_last_crawl < crawl_interval {
info!(
"last crawled at {:?}, crawling again in {:?}",
@ -146,6 +146,7 @@ impl CrawlScheduler {
self.client.clone(),
self.domain_locks.clone(),
self.content_dir.clone(),
+self.crawls.clone(),
);
tokio::spawn(async move {
loop {
@ -247,9 +248,11 @@ impl CrawlSchedulerHandle {
client: Client,
domain_locks: DomainLocks,
content_dir: String,
+crawls: Crawls,
) -> Self {
let (sender, receiver) = mpsc::channel(8);
-let mut scheduler = CrawlScheduler::new(receiver, pool, client, domain_locks, content_dir);
+let mut scheduler =
+CrawlScheduler::new(receiver, pool, client, domain_locks, content_dir, crawls);
tokio::spawn(async move { scheduler.run().await });
Self { sender }

View File

@ -20,6 +20,7 @@ use crate::actors::entry_crawler::{
use crate::domain_locks::DomainLocks;
use crate::models::entry::{CreateEntry, Entry};
use crate::models::feed::{Feed, MAX_CRAWL_INTERVAL_MINUTES, MIN_CRAWL_INTERVAL_MINUTES};
+use crate::state::Crawls;
use crate::uuid::Base62Uuid;
/// The `FeedCrawler` actor fetches a feed url, parses it, and saves it to the database.
@ -34,6 +35,7 @@ struct FeedCrawler {
client: Client,
domain_locks: DomainLocks,
content_dir: String,
+crawls: Crawls,
}
#[derive(Debug)]
@ -78,6 +80,7 @@ impl FeedCrawler {
client: Client,
domain_locks: DomainLocks,
content_dir: String,
+crawls: Crawls,
) -> Self {
FeedCrawler {
receiver,
@ -85,6 +88,7 @@ impl FeedCrawler {
client,
domain_locks,
content_dir,
+crawls,
}
}
@ -281,6 +285,10 @@
respond_to,
} => {
let result = self.crawl_feed(feed_id, respond_to.clone()).await;
+{
+let mut crawls = self.crawls.lock().await;
+crawls.remove(&feed_id);
+}
if let Err(error) = &result {
match Feed::update_crawl_error(&self.pool, feed_id, format!("{}", error)).await
{
@ -332,9 +340,11 @@ impl FeedCrawlerHandle {
client: Client,
domain_locks: DomainLocks,
content_dir: String,
+crawls: Crawls,
) -> Self {
let (sender, receiver) = mpsc::channel(8);
-let mut crawler = FeedCrawler::new(receiver, pool, client, domain_locks, content_dir);
+let mut crawler =
+FeedCrawler::new(receiver, pool, client, domain_locks, content_dir, crawls);
tokio::spawn(async move { crawler.run().await });
Self { sender }

View File

@ -2,8 +2,6 @@ use std::fmt::{self, Display, Formatter};
use std::io::Cursor;
use bytes::Bytes;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
use opml::OPML;
use sqlx::PgPool;
use tokio::sync::{broadcast, mpsc};
@ -11,8 +9,8 @@ use tracing::{debug, error, instrument};
use uuid::Uuid;
use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleMessage};
-use crate::actors::feed_crawler::FeedCrawlerHandleMessage;
use crate::models::feed::{Feed, UpsertFeed};
+use crate::state::Imports;
use crate::uuid::Base62Uuid;
/// The `Importer` actor parses OPML bytes, loops through the document to find all feed URLs, then
@ -26,12 +24,13 @@ struct Importer {
receiver: mpsc::Receiver<ImporterMessage>,
pool: PgPool,
crawl_scheduler: CrawlSchedulerHandle,
+imports: Imports,
}
#[derive(Debug)]
enum ImporterMessage {
Import {
-import_id: Base62Uuid,
+import_id: Uuid,
file_name: Option<String>,
bytes: Bytes,
respond_to: broadcast::Sender<ImporterHandleMessage>,
@ -75,24 +74,27 @@ impl Importer {
receiver: mpsc::Receiver<ImporterMessage>,
pool: PgPool,
crawl_scheduler: CrawlSchedulerHandle,
+imports: Imports,
) -> Self {
Importer {
receiver,
pool,
crawl_scheduler,
+imports,
}
}
#[instrument(skip_all, fields(import_id = %import_id, file_name = ?file_name))]
async fn import_opml(
&self,
-import_id: Base62Uuid,
+import_id: Uuid,
file_name: Option<String>,
bytes: Bytes,
respond_to: broadcast::Sender<ImporterHandleMessage>,
) -> ImporterResult<()> {
-let document = OPML::from_reader(&mut Cursor::new(bytes))
-.map_err(|_| ImporterError::InvalidOPML(file_name.unwrap_or(import_id.to_string())))?;
+let document = OPML::from_reader(&mut Cursor::new(bytes)).map_err(|_| {
+ImporterError::InvalidOPML(file_name.unwrap_or(Base62Uuid::from(import_id).to_string()))
+})?;
for url in Self::gather_feed_urls(document.body.outlines) {
let feed = Feed::upsert(
&self.pool,
@ -138,6 +140,10 @@ impl Importer {
let result = self
.import_opml(import_id, file_name, bytes, respond_to.clone())
.await;
+{
+let mut imports = self.imports.lock().await;
+imports.remove(&import_id);
+}
// ignore the result since the initiator may have cancelled waiting for the
// response, and that is ok
@ -171,16 +177,15 @@ pub struct ImporterHandle {
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum ImporterHandleMessage {
-// TODO: send stats of import or forward crawler messages?
Import(ImporterResult<()>),
CrawlScheduler(CrawlSchedulerHandleMessage),
}
impl ImporterHandle {
/// Creates an async actor task that will listen for messages on the `sender` channel.
-pub fn new(pool: PgPool, crawl_scheduler: CrawlSchedulerHandle) -> Self {
+pub fn new(pool: PgPool, crawl_scheduler: CrawlSchedulerHandle, imports: Imports) -> Self {
let (sender, receiver) = mpsc::channel(8);
-let mut importer = Importer::new(receiver, pool, crawl_scheduler);
+let mut importer = Importer::new(receiver, pool, crawl_scheduler, imports);
tokio::spawn(async move { importer.run().await });
Self { sender }
@ -191,7 +196,7 @@ impl ImporterHandle {
/// Listen to the result of the import via the returned `broadcast::Receiver`.
pub async fn import(
&self,
-import_id: Base62Uuid,
+import_id: Uuid,
file_name: Option<String>,
bytes: Bytes,
) -> broadcast::Receiver<ImporterHandleMessage> {
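The importer changes also switch `import_id` from `Base62Uuid` to the underlying `Uuid` throughout `ImporterMessage` and `import_opml`, converting back with `Base62Uuid::from(import_id)` only to build the human-readable name in the error path. The reason is that the `Imports` map is keyed by `Uuid` (the handler inserts with `import_id.as_uuid()`), so the actor needs that same key type to remove its entry when the import finishes. A rough sketch of the constraint follows, with a stand-in `Base62Uuid` and a string value in place of the real broadcast receiver; neither is the crate's actual implementation.

use std::collections::HashMap;
use uuid::Uuid;

// Minimal stand-in for the crate's Base62Uuid: a wrapper that can hand back
// the underlying Uuid (only the conversion the diff relies on).
struct Base62Uuid(Uuid);

impl Base62Uuid {
    fn new() -> Self {
        Self(Uuid::new_v4())
    }
    fn as_uuid(&self) -> Uuid {
        self.0
    }
}

fn main() {
    // The handler keys the shared map by the raw Uuid...
    let mut imports: HashMap<Uuid, &str> = HashMap::new();
    let import_id = Base62Uuid::new();
    imports.insert(import_id.as_uuid(), "broadcast receiver goes here");

    // ...so the Importer must be handed that same raw Uuid to remove the
    // entry later, which is why ImporterMessage now carries a Uuid.
    let id_in_message: Uuid = import_id.as_uuid();
    assert!(imports.remove(&id_in_message).is_some());
}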

View File

@ -98,6 +98,7 @@ pub async fn main() -> Result<()> {
.max_connections(env::var("DATABASE_MAX_CONNECTIONS")?.parse()?)
.connect(&env::var("DATABASE_URL")?)
.await?;
+let crawls = Arc::new(Mutex::new(HashMap::new()));
let cli: Cli = Cli::parse();
@ -147,6 +148,7 @@ pub async fn main() -> Result<()> {
client.clone(),
domain_locks.clone(),
env::var("CONTENT_DIR")?,
+crawls.clone(),
);
let _ = feed_crawler.crawl(id).await;
}

View File

@ -27,7 +27,7 @@ pub async fn opml(
let import_id = Base62Uuid::new();
let file_name = field.file_name().map(|s| s.to_string());
let bytes = field.bytes().await?;
-let receiver = importer.import(import_id, file_name, bytes).await;
+let receiver = importer.import(import_id.as_uuid(), file_name, bytes).await;
{
let mut imports = imports.lock().await;
imports.insert(import_id.as_uuid(), receiver);

View File

@ -66,11 +66,13 @@ async fn main() -> Result<()> {
client.clone(),
domain_locks.clone(),
config.content_dir.clone(),
+crawls.clone(),
);
let _ = crawl_scheduler.bootstrap().await;
let importer = ImporterHandle::new(
pool.clone(),
crawl_scheduler.clone(),
+imports.clone(),
);
let addr = format!("{}:{}", &config.host, &config.port).parse()?;

View File

@ -24,9 +24,6 @@ use crate::domain_locks::DomainLocks;
/// This map should only contain crawls that have just been created but not yet subscribed to.
/// Entries are only added when a user adds a feed in the UI and entries are removed by the same
/// user once a server-sent event connection is established.
-///
-/// TODO: remove the entries in the CrawlScheduler once the crawl is complete if the user never
-/// requested the stream to remove it themselves.
pub type Crawls = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<CrawlSchedulerHandleMessage>>>>;
/// A map of unique import IDs to a channel receiver for the active `Importer` running that import.
@ -38,9 +35,6 @@ pub type Crawls = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<CrawlSchedulerHand
/// This map should only contain imports that have just been created but not yet subscribed to.
/// Entries are only added when a user uploads an OPML to import and entries are removed by
/// the same user once a server-sent event connection is established.
-///
-/// TODO: remove the entries in the Importer once the crawl is complete if the user never requested
-/// the stream to remove it themselves.
pub type Imports = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<ImporterHandleMessage>>>>;
#[derive(Clone)]
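With the TODO comments above removed, an entry in `Crawls` or `Imports` can now leave the map by either path: the user's server-sent-event handler takes it out when it subscribes, or the actor takes it out itself when the crawl or import completes. Since `HashMap::remove` simply returns `None` when the key is already gone, the two paths need no extra coordination. Here is a short sketch of that lifecycle against a simplified `Imports`-shaped map; the `claim` and `cleanup` functions are stand-ins, not the project's real handlers or actors.

use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{broadcast, Mutex};
use uuid::Uuid;

type Imports = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<String>>>>;

// SSE-handler side: claim the receiver (removing the entry) if it is still there.
async fn claim(imports: &Imports, id: Uuid) -> Option<broadcast::Receiver<String>> {
    imports.lock().await.remove(&id)
}

// Actor side: drop the entry at the end of the task; a no-op if already claimed.
async fn cleanup(imports: &Imports, id: Uuid) {
    imports.lock().await.remove(&id);
}

#[tokio::main]
async fn main() {
    let imports: Imports = Arc::new(Mutex::new(HashMap::new()));
    let id = Uuid::new_v4();
    let (_tx, rx) = broadcast::channel(8);
    imports.lock().await.insert(id, rx);

    // Whichever of these runs first wins; the other is harmless.
    let _maybe_rx = claim(&imports, id).await;
    cleanup(&imports, id).await;
    assert!(imports.lock().await.is_empty());
}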