Added importer actor, file upload still not working
src/actors/importer.rs (new file, 201 lines)
@@ -0,0 +1,201 @@
use std::fmt::{self, Display, Formatter};
use std::io::Cursor;

use bytes::Bytes;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use opml::OPML;
use sqlx::PgPool;
use tokio::sync::{broadcast, mpsc};
use tracing::{debug, error, instrument};

use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleMessage};
use crate::models::feed::{Feed, UpsertFeed};
use crate::uuid::Base62Uuid;

/// The `Importer` actor parses OPML bytes, loops through the document to find all feed URLs, then
/// creates a DB entry for each and initiates a new crawl if the feed is new.
///
/// It receives `ImporterMessage` messages via the `receiver` channel. It communicates back to
/// the sender of those messages via the `respond_to` channel on the `ImporterMessage`.
///
/// `Importer` should not be instantiated directly. Instead, use the `ImporterHandle`.
struct Importer {
    receiver: mpsc::Receiver<ImporterMessage>,
    pool: PgPool,
    crawl_scheduler: CrawlSchedulerHandle,
}

#[derive(Debug)]
enum ImporterMessage {
    Import {
        import_id: Base62Uuid,
        file_name: Option<String>,
        bytes: Bytes,
        respond_to: broadcast::Sender<ImporterHandleMessage>,
    },
}

impl Display for ImporterMessage {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            ImporterMessage::Import {
                import_id, bytes, ..
            } => write!(f, "Import({}: {} bytes)", import_id, bytes.len()),
        }
    }
}
/// An error type that enumerates possible failures during an import. It is cloneable and can be
/// sent across threads (it does not reference the originating errors, which are usually not
/// cloneable).
#[derive(thiserror::Error, Debug, Clone)]
pub enum ImporterError {
    #[error("invalid OPML file: {0}")]
    InvalidOPML(String),
    #[error("failed to create feed: {0}")]
    CreateFeedError(String),
}

pub type ImporterResult<T, E = ImporterError> = ::std::result::Result<T, E>;

impl Importer {
    fn new(
        receiver: mpsc::Receiver<ImporterMessage>,
        pool: PgPool,
        crawl_scheduler: CrawlSchedulerHandle,
    ) -> Self {
        Importer {
            receiver,
            pool,
            crawl_scheduler,
        }
    }

    #[instrument(skip_all, fields(import_id = %import_id, file_name = ?file_name))]
    async fn import_opml(
        &self,
        import_id: Base62Uuid,
        file_name: Option<String>,
        bytes: Bytes,
        respond_to: broadcast::Sender<ImporterHandleMessage>,
    ) -> ImporterResult<()> {
        let document = OPML::from_reader(&mut Cursor::new(bytes))
            .map_err(|_| ImporterError::InvalidOPML(file_name.unwrap_or(import_id.to_string())))?;
        let mut receivers = Vec::new();
        for url in Self::gather_feed_urls(document.body.outlines) {
            let feed = Feed::upsert(
                &self.pool,
                UpsertFeed {
                    url: url.clone(),
                    ..Default::default()
                },
            )
            .await
            .map_err(|_| ImporterError::CreateFeedError(url))?;
            if feed.updated_at.is_some() {
                receivers.push(self.crawl_scheduler.schedule(feed.feed_id).await);
            }
        }

        let mut future_recvs: FuturesUnordered<_> =
            receivers.iter_mut().map(|rx| rx.recv()).collect();

        while let Some(result) = future_recvs.next().await {
            if let Ok(crawl_scheduler_msg) = result {
                let _ = respond_to.send(ImporterHandleMessage::CrawlScheduler(crawl_scheduler_msg));
            }
        }
        Ok(())
    }

    fn gather_feed_urls(outlines: Vec<opml::Outline>) -> Vec<String> {
        let mut urls = Vec::new();
        for outline in outlines.into_iter() {
            if let Some(url) = outline.xml_url {
                urls.push(url);
            }
            urls.append(&mut Self::gather_feed_urls(outline.outlines));
        }
        urls
    }

    #[instrument(skip_all, fields(msg = %msg))]
    async fn handle_message(&mut self, msg: ImporterMessage) {
        match msg {
            ImporterMessage::Import {
                import_id,
                file_name,
                bytes,
                respond_to,
            } => {
                let result = self
                    .import_opml(import_id, file_name, bytes, respond_to.clone())
                    .await;

                // ignore the result since the initiator may have cancelled waiting for the
                // response, and that is ok
                let _ = respond_to.send(ImporterHandleMessage::Import(result));
            }
        }
    }

    #[instrument(skip_all)]
    async fn run(&mut self) {
        debug!("starting importer");
        while let Some(msg) = self.receiver.recv().await {
            self.handle_message(msg).await;
        }
    }
}

/// The `ImporterHandle` is used to initialize and communicate with an `Importer` actor.
///
/// The `Importer` actor parses OPML bytes, loops through the document to find all feed URLs, then
/// creates a DB entry for each and initiates a new crawl if the feed is new.
#[derive(Clone)]
pub struct ImporterHandle {
    sender: mpsc::Sender<ImporterMessage>,
}

/// The `ImporterHandleMessage` is the response to an `ImporterMessage` sent to the
/// `ImporterHandle`.
///
/// `ImporterHandleMessage::Import` contains the result of importing the OPML file.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum ImporterHandleMessage {
    // TODO: send stats of import or forward crawler messages?
    Import(ImporterResult<()>),
    CrawlScheduler(CrawlSchedulerHandleMessage),
}

impl ImporterHandle {
    /// Creates an async actor task that will listen for messages on the `sender` channel.
    pub fn new(pool: PgPool, crawl_scheduler: CrawlSchedulerHandle) -> Self {
        let (sender, receiver) = mpsc::channel(8);
        let mut importer = Importer::new(receiver, pool, crawl_scheduler);
        tokio::spawn(async move { importer.run().await });

        Self { sender }
    }

    /// Sends an `ImporterMessage::Import` message to the running `Importer` actor.
    ///
    /// Listen to the result of the import via the returned `broadcast::Receiver`.
    pub async fn import(
        &self,
        import_id: Base62Uuid,
        file_name: Option<String>,
        bytes: Bytes,
    ) -> broadcast::Receiver<ImporterHandleMessage> {
        let (sender, receiver) = broadcast::channel(8);
        let msg = ImporterMessage::Import {
            import_id,
            file_name,
            bytes,
            respond_to: sender,
        };

        self.sender.send(msg).await.expect("importer task has died");
        receiver
    }
}
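As a usage sketch (hypothetical, not part of this commit): construct the `ImporterHandle`, send one import, and drain the returned broadcast channel until the terminal `Import` result arrives. The `import_and_wait` name and the literal file name are made up for illustration.

use bytes::Bytes;
use sqlx::PgPool;

use crate::actors::crawl_scheduler::CrawlSchedulerHandle;
use crate::actors::importer::{ImporterHandle, ImporterHandleMessage};
use crate::uuid::Base62Uuid;

// Hypothetical driver: spawn the actor, send one Import message, then wait
// for the final Import(result) among the forwarded broadcast messages.
async fn import_and_wait(pool: PgPool, crawl_scheduler: CrawlSchedulerHandle, opml: Bytes) {
    let importer = ImporterHandle::new(pool, crawl_scheduler);
    let mut receiver = importer
        .import(Base62Uuid::new(), Some("feeds.opml".to_string()), opml)
        .await;
    // Intermediate CrawlScheduler messages arrive on the same channel; a recv()
    // error (sender dropped or receiver lagged) simply ends the loop.
    while let Ok(msg) = receiver.recv().await {
        if let ImporterHandleMessage::Import(result) = msg {
            println!("import finished, ok = {}", result.is_ok());
            break;
        }
    }
}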
src/actors/mod.rs
@@ -1,3 +1,4 @@
pub mod crawl_scheduler;
pub mod entry_crawler;
pub mod feed_crawler;
pub mod importer;
src/error.rs (10 lines changed)
@@ -1,3 +1,4 @@
use axum::extract::multipart::MultipartError;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::Json;
@@ -26,6 +27,12 @@ pub enum Error {
    #[error("validation error in request body")]
    InvalidEntity(#[from] ValidationErrors),

    #[error("error with file upload: {0}")]
    Upload(#[from] MultipartError),

    #[error("no file uploaded")]
    NoFile,

    #[error("{0}: {1} not found")]
    NotFound(&'static str, Uuid),

@@ -78,7 +85,8 @@ impl Error {
            InternalServerError | Sqlx(_) | Anyhow(_) | Reqwest(_) => {
                StatusCode::INTERNAL_SERVER_ERROR
            }
            InvalidEntity(_) | RelationNotFound(_) => StatusCode::UNPROCESSABLE_ENTITY,
            InvalidEntity(_) | RelationNotFound(_) | NoFile => StatusCode::UNPROCESSABLE_ENTITY,
            Upload(err) => err.status(),
        }
    }
}

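A standalone sketch (hypothetical, outside this commit) of the thiserror interpolation the `Upload` variant relies on: `{0}` splices the wrapped error's `Display` output into the message, which is why braces rather than parentheses matter here.

// Demonstrates thiserror's positional interpolation on a wrapped error.
#[derive(thiserror::Error, Debug)]
enum UploadError {
    #[error("error with file upload: {0}")]
    Upload(#[from] std::io::Error),
}

fn main() {
    let err = UploadError::from(std::io::Error::new(std::io::ErrorKind::Other, "boom"));
    assert_eq!(err.to_string(), "error with file upload: boom");
}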
src/handlers/feed.rs
@@ -202,7 +202,6 @@ pub async fn stream(
        Ok(CrawlSchedulerHandleMessage::FeedCrawler(FeedCrawlerHandleMessage::Entry(Ok(_)))) => {
            Ok(Event::default().data(
                html! {
                    turbo-stream action="remove" target="feed-stream" {}
                    turbo-stream action="replace" target=(feed_id) {
                        template {
                            li id=(feed_id) { "fetched entry" }
@@ -216,7 +215,6 @@ pub async fn stream(
            error,
        )))) => Ok(Event::default().data(
            html! {
                turbo-stream action="remove" target="feed-stream" {}
                turbo-stream action="replace" target=(feed_id) {
                    template {
                        li id=(feed_id) { span class="error" { (error) } }

src/handlers/home.rs
@@ -31,16 +31,19 @@ pub async fn get(State(pool): State<PgPool>, layout: Layout) -> Result<Response>
        }
        div class="add-feed" {
            h3 { "Add Feed" }
            form action="/feed" method="post" class="add-feed-form" {
            form action="/feed" method="post" class="feed-form" {
                div class="form-grid" {
                    label for="url" { "URL (required): " }
                    label for="url" { "URL: " }
                    input type="text" id="url" name="url" placeholder="https://example.com/feed.xml" required="true";
                    label for="title" { "Title: " }
                    input type="text" id="title" name="title" placeholder="Feed title";
                    label { "Description: " }
                    textarea id="description" name="description" placeholder="Feed description" {}
                    button type="submit" { "Add Feed" }
                }
            }
            form action="/import/opml" method="post" enctype="multipart/form-data" class="feed-form" {
                div class="form-grid" {
                    label for="opml" { "OPML: " }
                    input type="file" id="opml" name="opml" required="true" accept="text/x-opml,application/xml,text/xml";
                    button type="submit" { "Import Feeds" }
                }
                button type="submit" { "Add Feed" }
            }
        }
    }

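A hypothetical client-side smoke test for the new endpoint (not in this commit; assumes reqwest with its multipart feature enabled): POST an OPML document as the `opml` field of a multipart/form-data body, mirroring the form above.

use reqwest::multipart::{Form, Part};

// Hypothetical helper: upload OPML bytes to /import/opml as the "opml"
// multipart field and return the response status.
async fn post_opml(base_url: &str, opml: Vec<u8>) -> reqwest::Result<reqwest::StatusCode> {
    let part = Part::bytes(opml)
        .file_name("feeds.opml")
        .mime_str("text/x-opml")?;
    let form = Form::new().part("opml", part);
    let response = reqwest::Client::new()
        .post(format!("{}/import/opml", base_url))
        .multipart(form)
        .send()
        .await?;
    Ok(response.status())
}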
src/handlers/import.rs (new file, 134 lines)
@@ -0,0 +1,134 @@
use std::time::Duration;

use axum::extract::{Multipart, Path, State};
use axum::http::StatusCode;
use axum::response::sse::{Event, KeepAlive};
use axum::response::{IntoResponse, Response, Sse};
use maud::html;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;

use crate::actors::crawl_scheduler::CrawlSchedulerHandleMessage;
use crate::actors::feed_crawler::FeedCrawlerHandleMessage;
use crate::actors::importer::{ImporterHandle, ImporterHandleMessage};
use crate::error::{Error, Result};
use crate::partials::feed_link::feed_link;
use crate::state::Imports;
use crate::turbo_stream::TurboStream;
use crate::uuid::Base62Uuid;

pub async fn opml(
    State(imports): State<Imports>,
    State(importer): State<ImporterHandle>,
    mut multipart: Multipart,
) -> Result<Response> {
    dbg!("opml handler");
    if let Some(field) = multipart.next_field().await.map_err(|err| {
        dbg!(&err);
        err
    })? {
        let import_id = Base62Uuid::new();
        dbg!(&import_id);
        let file_name = field.file_name().map(|s| s.to_string());
        dbg!(&file_name);
        let bytes = field.bytes().await?;
        dbg!(&bytes.len());
        let receiver = importer.import(import_id, file_name, bytes).await;
        {
            let mut imports = imports.lock().await;
            imports.insert(import_id.as_uuid(), receiver);
        }

        let import_html_id = format!("import-{}", import_id);
        let import_stream = format!("/import/{}/stream", import_id);
        return Ok((
            StatusCode::CREATED,
            TurboStream(
                html! {
                    turbo-stream-source src=(import_stream) id="import-stream" {}
                    turbo-stream action="append" target="feeds" {
                        template {
                            li id=(import_html_id) { "Importing..." }
                        }
                    }
                    turbo-stream action="remove" target="no-feeds";
                }
                .into_string(),
            ),
        )
            .into_response());
    }
    dbg!("no file");
    Err(Error::NoFile)
}

pub async fn stream(
    Path(id): Path<Base62Uuid>,
    State(imports): State<Imports>,
) -> Result<impl IntoResponse> {
    let receiver = {
        let mut imports = imports.lock().await;
        imports.remove(&id.as_uuid())
    }
    .ok_or_else(|| Error::NotFound("import stream", id.as_uuid()))?;

    let stream = BroadcastStream::new(receiver);
    let import_html_id = format!("import-{}", id);
    let stream = stream.map(move |msg| match msg {
        Ok(ImporterHandleMessage::Import(Ok(_))) => Ok::<Event, String>(
            Event::default().data(
                html! {
                    turbo-stream action="remove" target="import-stream" {}
                    turbo-stream action="replace" target=(import_html_id) {
                        template {
                            li id=(import_html_id) { "Done importing" }
                        }
                    }
                }
                .into_string(),
            ),
        ),
        Ok(ImporterHandleMessage::CrawlScheduler(CrawlSchedulerHandleMessage::FeedCrawler(
            FeedCrawlerHandleMessage::Feed(Ok(feed)),
        ))) => Ok::<Event, String>(
            Event::default().data(
                html! {
                    turbo-stream action="prepend" target="feeds" {
                        template {
                            li id=(format!("feed-{}", feed.feed_id)) { (feed_link(&feed, false)) }
                        }
                    }
                }
                .into_string(),
            ),
        ),
        Ok(ImporterHandleMessage::CrawlScheduler(CrawlSchedulerHandleMessage::FeedCrawler(
            FeedCrawlerHandleMessage::Feed(Err(error)),
        ))) => Ok::<Event, String>(
            Event::default().data(
                html! {
                    turbo-stream action="prepend" target="feeds" {
                        template {
                            li { span class="error" { (error) } }
                        }
                    }
                }
                .into_string(),
            ),
        ),
        Ok(ImporterHandleMessage::Import(Err(error))) => Ok(Event::default().data(
            html! {
                turbo-stream action="remove" target="import-stream" {}
                turbo-stream action="replace" target=(import_html_id) {
                    template {
                        li id=(import_html_id) { span class="error" { (error) } }
                    }
                }
            }
            .into_string(),
        )),
        _ => Ok(Event::default()),
    });
    Ok(Sse::new(stream).keep_alive(
        KeepAlive::new()
            .interval(Duration::from_secs(15))
            .text("keep-alive-text"),
    ))
}
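For context, a standalone sketch (hypothetical, assuming tokio with its macros feature and tokio-stream with its sync feature) of the `BroadcastStream` adapter used by `stream` above: it turns a `broadcast::Receiver` into a `Stream` of `Result`s, where a `Lagged` error marks messages dropped past a slow consumer.

use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;

// Demonstrates wrapping a broadcast receiver as a Stream; items arrive as
// Ok(value) and the stream ends once every sender has been dropped.
#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::broadcast::channel::<u32>(8);
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx); // no senders left, so the stream terminates after buffered items
    let mut stream = BroadcastStream::new(rx);
    while let Some(item) = stream.next().await {
        println!("{:?}", item); // prints Ok(1) then Ok(2)
    }
}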
src/handlers/mod.rs
@@ -1,6 +1,7 @@
pub mod api;
pub mod entry;
pub mod home;
pub mod import;
pub mod feed;
pub mod feeds;
pub mod log;
src/main.rs (10 lines changed)
@@ -24,6 +24,7 @@ use tower_livereload::LiveReloadLayer;
use tracing::debug;

use lib::actors::crawl_scheduler::CrawlSchedulerHandle;
use lib::actors::importer::ImporterHandle;
use lib::config::Config;
use lib::domain_locks::DomainLocks;
use lib::handlers;
@@ -49,6 +50,7 @@ async fn main() -> Result<()> {
    let _guards = init_tracing(&config, log_sender)?;

    let crawls = Arc::new(Mutex::new(HashMap::new()));
    let imports = Arc::new(Mutex::new(HashMap::new()));
    let domain_locks = DomainLocks::new();
    let client = Client::builder().user_agent(USER_AGENT).build()?;

@@ -66,6 +68,10 @@ async fn main() -> Result<()> {
        config.content_dir.clone(),
    );
    let _ = crawl_scheduler.bootstrap().await;
    let importer = ImporterHandle::new(
        pool.clone(),
        crawl_scheduler.clone(),
    );

    let addr = format!("{}:{}", &config.host, &config.port).parse()?;
    let mut app = Router::new()
@@ -84,6 +90,8 @@ async fn main() -> Result<()> {
        .route("/entry/:id", get(handlers::entry::get))
        .route("/log", get(handlers::log::get))
        .route("/log/stream", get(handlers::log::stream))
        .route("/import/opml", post(handlers::import::opml))
        .route("/import/:id/stream", get(handlers::import::stream))
        .nest_service("/static", ServeDir::new("static"))
        .with_state(AppState {
            pool,
@@ -93,6 +101,8 @@ async fn main() -> Result<()> {
            domain_locks,
            client,
            crawl_scheduler,
            importer,
            imports,
        })
        .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()));

src/models/feed.rs
@@ -81,7 +81,7 @@ pub struct CreateFeed {
    pub description: Option<String>,
}

#[derive(Debug, Deserialize, Validate)]
#[derive(Debug, Deserialize, Default, Validate)]
pub struct UpsertFeed {
    #[validate(length(max = 255))]
    pub title: Option<String>,

src/state.rs (32 lines changed)
@@ -9,6 +9,7 @@ use reqwest::Client;
use sqlx::PgPool;
use uuid::Uuid;

use crate::actors::importer::{ImporterHandle, ImporterHandleMessage};
use crate::actors::crawl_scheduler::{CrawlSchedulerHandle, CrawlSchedulerHandleMessage};
use crate::config::Config;
use crate::domain_locks::DomainLocks;
@@ -23,8 +24,25 @@ use crate::domain_locks::DomainLocks;
/// This map should only contain crawls that have just been created but not yet subscribed to.
/// Entries are only added when a user adds a feed in the UI and entries are removed by the same
/// user once a server-sent event connection is established.
///
/// TODO: remove the entries in the CrawlScheduler once the crawl is complete if the user never
/// requested the stream to remove it themselves.
pub type Crawls = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<CrawlSchedulerHandleMessage>>>>;

/// A map of unique import IDs to a channel receiver for the active `Importer` running that import.
///
/// Same as the `Crawls` map, the only purpose of this is to keep track of active imports so that
/// axum handlers can subscribe to the result of the import via the receiver channel, whose
/// messages are then sent to end-users as a stream of server-sent events.
///
/// This map should only contain imports that have just been created but not yet subscribed to.
/// Entries are only added when a user uploads an OPML file to import and entries are removed by
/// the same user once a server-sent event connection is established.
///
/// TODO: remove the entries in the Importer once the import is complete if the user never
/// requested the stream to remove it themselves.
pub type Imports = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<ImporterHandleMessage>>>>;

#[derive(Clone)]
pub struct AppState {
    pub pool: PgPool,
@@ -34,6 +52,8 @@ pub struct AppState {
    pub domain_locks: DomainLocks,
    pub client: Client,
    pub crawl_scheduler: CrawlSchedulerHandle,
    pub importer: ImporterHandle,
    pub imports: Imports,
}

impl FromRef<AppState> for PgPool {
@@ -77,3 +97,15 @@ impl FromRef<AppState> for CrawlSchedulerHandle {
        state.crawl_scheduler.clone()
    }
}

impl FromRef<AppState> for ImporterHandle {
    fn from_ref(state: &AppState) -> Self {
        state.importer.clone()
    }
}

impl FromRef<AppState> for Imports {
    fn from_ref(state: &AppState) -> Self {
        state.imports.clone()
    }
}
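Finally, a self-contained sketch (hypothetical, with `u32` standing in for `ImporterHandleMessage`) of the one-shot handoff the `Imports` map implements: the upload handler stashes the broadcast receiver under the import ID, and the SSE handler claims it by removing it, so exactly one stream consumes each import's messages.

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};
use uuid::Uuid;

// The same shape as `Imports`, but generic over a stand-in message type.
type Handoff = Arc<Mutex<HashMap<Uuid, broadcast::Receiver<u32>>>>;

// Called by the upload handler after kicking off an import.
async fn stash(map: &Handoff, id: Uuid, rx: broadcast::Receiver<u32>) {
    map.lock().await.insert(id, rx);
}

// Called by the SSE handler; removing the entry guarantees a single consumer.
async fn claim(map: &Handoff, id: Uuid) -> Option<broadcast::Receiver<u32>> {
    map.lock().await.remove(&id)
}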