From 6713a7a440f90cd1ae35a3fa9542e4fab508563f Mon Sep 17 00:00:00 2001 From: Tyler Hallada Date: Sat, 3 Jun 2023 19:03:58 -0400 Subject: [PATCH] Complete log stream implementation Sets up a watch channel to send tracing lines from tracing-subscriber to receivers in a axum handler which streams Server Sent Events to any number of connected /log/stream clients. --- Cargo.lock | 3 +++ Cargo.toml | 2 ++ src/handlers/log.rs | 39 ++++++++++++++++++++++++++++++++++++--- src/log.rs | 12 +++++++++--- src/main.rs | 8 ++++++-- src/state.rs | 10 ++++++++++ 6 files changed, 66 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ae9d2fa..b03719d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -255,6 +255,7 @@ dependencies = [ "anyhow", "argh", "axum", + "bytes", "chrono", "dotenvy", "feed-rs", @@ -267,6 +268,7 @@ dependencies = [ "sqlx", "thiserror", "tokio", + "tokio-stream", "tower", "tower-http", "tower-livereload", @@ -2051,6 +2053,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 3e4e6a7..84523e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ path = "src/lib.rs" anyhow = "1" argh = "0.1" axum = "0.6" +bytes = "1.4" chrono = { version = "0.4", features = ["serde"] } dotenvy = "0.15" feed-rs = "1.3" @@ -32,6 +33,7 @@ sqlx = { version = "0.6", features = [ ] } thiserror = "1" tokio = { version = "1", features = ["full"] } +tokio-stream = { version = "0.1", features = ["sync"] } tower = "0.4" tower-livereload = "0.7" tower-http = { version = "0.4", features = ["trace"] } diff --git a/src/handlers/log.rs b/src/handlers/log.rs index e064cb3..414cad6 100644 --- a/src/handlers/log.rs +++ b/src/handlers/log.rs @@ -1,13 +1,46 @@ -use axum::response::Response; +use std::convert::Infallible; +use std::time::Duration; + +use axum::extract::State; +use axum::response::{ + sse::{Event, Sse}, + Response, +}; +use bytes::Bytes; use maud::html; +use tokio::sync::watch::Receiver; +use tokio_stream::wrappers::WatchStream; +use tokio_stream::Stream; +use tokio_stream::StreamExt; use crate::error::Result; -use crate::partials::layout::Layout; use crate::log::MEM_LOG; +use crate::partials::layout::Layout; pub async fn get(layout: Layout) -> Result { let mem_buf = MEM_LOG.lock().unwrap(); Ok(layout.render(html! { - pre { (std::str::from_utf8(mem_buf.as_slices().0).unwrap()) } + turbo-stream-source src="/log/stream" {} + pre id="log" { (std::str::from_utf8(mem_buf.as_slices().0).unwrap()) } })) } + +pub async fn stream( + State(log_receiver): State>, +) -> Sse>> { + let log_stream = WatchStream::new(log_receiver); + let log_stream = log_stream.map(|line| { + Ok(Event::default().data(html! { + turbo-stream action="append" target="log" { + template { + (std::str::from_utf8(&line).unwrap()) + } + } + }.into_string())) + }); + Sse::new(log_stream).keep_alive( + axum::response::sse::KeepAlive::new() + .interval(Duration::from_secs(15)) + .text("keep-alive-text"), + ) +} diff --git a/src/log.rs b/src/log.rs index f4e4afa..ab77657 100644 --- a/src/log.rs +++ b/src/log.rs @@ -2,7 +2,9 @@ use std::sync::Mutex; use std::{io::Write, collections::VecDeque}; use anyhow::Result; +use bytes::Bytes; use once_cell::sync::Lazy; +use tokio::sync::watch::Sender; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::prelude::*; use tracing_subscriber::{fmt::format, EnvFilter}; @@ -25,13 +27,15 @@ pub static MEM_LOG: Lazy>> = Lazy::new(|| Mutex::new(VecDeque /// to write will make the buffer exceed `max`. struct LimitedInMemoryBuffer { pub buf: &'static Mutex>, + sender: Sender, max: usize, } impl LimitedInMemoryBuffer { - fn new(buf: &'static Mutex>, max: usize) -> Self { + fn new(buf: &'static Mutex>, sender: Sender, max: usize) -> Self { Self { buf, + sender, max, } } @@ -59,6 +63,8 @@ impl Write for LimitedInMemoryBuffer { fn write(&mut self, buf: &[u8]) -> std::io::Result { // if self.buf is too big, truncate it to the closest newline starting from the front self.truncate(); + let bytes = Bytes::copy_from_slice(buf); + self.sender.send(bytes).ok(); let mut mem_buf = self.buf.lock().unwrap(); mem_buf.write(buf) } @@ -69,12 +75,12 @@ impl Write for LimitedInMemoryBuffer { } } -pub fn init_tracing(config: &Config) -> Result<(WorkerGuard, WorkerGuard)> { +pub fn init_tracing(config: &Config, log_sender: Sender) -> Result<(WorkerGuard, WorkerGuard)> { let fmt_layer = tracing_subscriber::fmt::layer(); let filter_layer = EnvFilter::from_default_env(); let file_appender = tracing_appender::rolling::hourly("./logs", "log"); let (file_writer, file_writer_guard) = tracing_appender::non_blocking(file_appender); - let mem_writer = LimitedInMemoryBuffer::new(&MEM_LOG, config.max_mem_log_size); + let mem_writer = LimitedInMemoryBuffer::new(&MEM_LOG, log_sender, config.max_mem_log_size); let (mem_writer, mem_writer_guard) = tracing_appender::non_blocking(mem_writer); let file_writer_layer = tracing_subscriber::fmt::layer() .with_writer(file_writer) diff --git a/src/main.rs b/src/main.rs index 47b1810..36d9446 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,9 +5,11 @@ use axum::{ routing::{get, post}, Router, }; +use bytes::Bytes; use dotenvy::dotenv; use notify::Watcher; use sqlx::postgres::PgPoolOptions; +use tokio::sync::watch::channel; use tower::ServiceBuilder; use tower_http::trace::TraceLayer; use tower_livereload::LiveReloadLayer; @@ -24,7 +26,8 @@ async fn main() -> Result<()> { let config = Config::new()?; - let _guards = init_tracing(&config)?; + let (log_sender, log_receiver) = channel::(Bytes::new()); + let _guards = init_tracing(&config, log_sender)?; let pool = PgPoolOptions::new() .max_connections(config.database_max_connections) @@ -45,7 +48,8 @@ async fn main() -> Result<()> { .route("/feeds", get(handlers::feeds::get)) .route("/entry/:id", get(handlers::entry::get)) .route("/log", get(handlers::log::get)) - .with_state(AppState { pool, config }) + .route("/log/stream", get(handlers::log::stream)) + .with_state(AppState { pool, config, log_receiver }) .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http())); #[cfg(debug_assertions)] diff --git a/src/state.rs b/src/state.rs index a08c9e4..7cc5d94 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,4 +1,7 @@ +use tokio::sync::watch::Receiver; + use axum::extract::FromRef; +use bytes::Bytes; use sqlx::PgPool; use crate::config::Config; @@ -7,6 +10,7 @@ use crate::config::Config; pub struct AppState { pub pool: PgPool, pub config: Config, + pub log_receiver: Receiver, } impl FromRef for PgPool { @@ -20,3 +24,9 @@ impl FromRef for Config { state.config.clone() } } + +impl FromRef for Receiver { + fn from_ref(state: &AppState) -> Self { + state.log_receiver.clone() + } +}