Complete log stream implementation

Sets up a watch channel to send tracing lines from tracing-subscriber to
receivers in an axum handler, which streams Server-Sent Events to any
number of connected /log/stream clients.
Tyler Hallada 2023-06-03 19:03:58 -04:00
parent 951d6d23e2
commit 6713a7a440
6 changed files with 66 additions and 8 deletions
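
The shape of the plumbing, as a self-contained sketch before the diff (hedged: only the tokio, tokio-stream, and bytes APIs shown are assumed, and the log line is illustrative):

use bytes::Bytes;
use tokio::sync::watch;
use tokio_stream::{wrappers::WatchStream, StreamExt};

#[tokio::main]
async fn main() {
    // One sender (owned by the tracing writer), any number of cheap
    // receiver clones (one per connected SSE client).
    let (tx, rx) = watch::channel(Bytes::new());

    // WatchStream turns a receiver into a Stream for SSE; it is gated
    // behind tokio-stream's "sync" feature, which is why Cargo.toml
    // below enables it (and why tokio-util shows up in the lockfile).
    let mut stream = WatchStream::new(rx.clone());

    tx.send(Bytes::from_static(b"a log line\n")).ok();

    // A watch channel keeps only the most recent value: consumers that
    // poll slower than lines arrive skip ahead rather than queue up.
    if let Some(line) = stream.next().await {
        print!("{}", String::from_utf8_lossy(&line));
    }
}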

Cargo.lock (generated)

@@ -255,6 +255,7 @@ dependencies = [
  "anyhow",
  "argh",
  "axum",
+ "bytes",
  "chrono",
  "dotenvy",
  "feed-rs",
@@ -267,6 +268,7 @@ dependencies = [
  "sqlx",
  "thiserror",
  "tokio",
+ "tokio-stream",
  "tower",
  "tower-http",
  "tower-livereload",
@@ -2051,6 +2053,7 @@ dependencies = [
  "futures-core",
  "pin-project-lite",
  "tokio",
+ "tokio-util",
 ]
 
 [[package]]


@@ -14,6 +14,7 @@ path = "src/lib.rs"
 anyhow = "1"
 argh = "0.1"
 axum = "0.6"
+bytes = "1.4"
 chrono = { version = "0.4", features = ["serde"] }
 dotenvy = "0.15"
 feed-rs = "1.3"
@@ -32,6 +33,7 @@ sqlx = { version = "0.6", features = [
 ] }
 thiserror = "1"
 tokio = { version = "1", features = ["full"] }
+tokio-stream = { version = "0.1", features = ["sync"] }
 tower = "0.4"
 tower-livereload = "0.7"
 tower-http = { version = "0.4", features = ["trace"] }


@@ -1,13 +1,46 @@
-use axum::response::Response;
+use std::convert::Infallible;
+use std::time::Duration;
+
+use axum::extract::State;
+use axum::response::{
+    sse::{Event, Sse},
+    Response,
+};
+use bytes::Bytes;
 use maud::html;
+use tokio::sync::watch::Receiver;
+use tokio_stream::wrappers::WatchStream;
+use tokio_stream::Stream;
+use tokio_stream::StreamExt;
 
 use crate::error::Result;
-use crate::partials::layout::Layout;
 use crate::log::MEM_LOG;
+use crate::partials::layout::Layout;
 
 pub async fn get(layout: Layout) -> Result<Response> {
     let mem_buf = MEM_LOG.lock().unwrap();
     Ok(layout.render(html! {
-        pre { (std::str::from_utf8(mem_buf.as_slices().0).unwrap()) }
+        turbo-stream-source src="/log/stream" {}
+        pre id="log" { (std::str::from_utf8(mem_buf.as_slices().0).unwrap()) }
     }))
 }
+
+pub async fn stream(
+    State(log_receiver): State<Receiver<Bytes>>,
+) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
+    let log_stream = WatchStream::new(log_receiver);
+    let log_stream = log_stream.map(|line| {
+        Ok(Event::default().data(html! {
+            turbo-stream action="append" target="log" {
+                template {
+                    (std::str::from_utf8(&line).unwrap())
+                }
+            }
+        }.into_string()))
+    });
+    Sse::new(log_stream).keep_alive(
+        axum::response::sse::KeepAlive::new()
+            .interval(Duration::from_secs(15))
+            .text("keep-alive-text"),
+    )
+}
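
Each value published on the watch channel becomes one SSE message whose payload is a Turbo Stream fragment. The turbo-stream-source element rendered by get opens the EventSource connection, and Turbo appends every received template to the pre#log target, so the page first renders the whole in-memory buffer and then tails new lines. On the wire, a single event looks roughly like this (log text illustrative):

data: <turbo-stream action="append" target="log"><template>2023-06-03T23:03:58Z  INFO starting crawl</template></turbo-stream>

Because watch retains only the latest value, a slow client may miss lines during a burst; that is an acceptable trade-off here since /log always serves the complete MEM_LOG buffer on load, and the 15-second keep-alive comment keeps idle proxies from closing the connection.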


@@ -2,7 +2,9 @@ use std::sync::Mutex;
 use std::{io::Write, collections::VecDeque};
 
 use anyhow::Result;
+use bytes::Bytes;
 use once_cell::sync::Lazy;
+use tokio::sync::watch::Sender;
 use tracing_appender::non_blocking::WorkerGuard;
 use tracing_subscriber::prelude::*;
 use tracing_subscriber::{fmt::format, EnvFilter};
@@ -25,13 +27,15 @@ pub static MEM_LOG: Lazy<Mutex<VecDeque<u8>>> = Lazy::new(|| Mutex::new(VecDeque
 /// to write will make the buffer exceed `max`.
 struct LimitedInMemoryBuffer {
     pub buf: &'static Mutex<VecDeque<u8>>,
+    sender: Sender<Bytes>,
     max: usize,
 }
 
 impl LimitedInMemoryBuffer {
-    fn new(buf: &'static Mutex<VecDeque<u8>>, max: usize) -> Self {
+    fn new(buf: &'static Mutex<VecDeque<u8>>, sender: Sender<Bytes>, max: usize) -> Self {
         Self {
             buf,
+            sender,
             max,
         }
     }
@@ -59,6 +63,8 @@ impl Write for LimitedInMemoryBuffer {
     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
         // if self.buf is too big, truncate it to the closest newline starting from the front
         self.truncate();
+        let bytes = Bytes::copy_from_slice(buf);
+        self.sender.send(bytes).ok();
         let mut mem_buf = self.buf.lock().unwrap();
         mem_buf.write(buf)
     }
@@ -69,12 +75,12 @@ impl Write for LimitedInMemoryBuffer {
     }
 }
 
-pub fn init_tracing(config: &Config) -> Result<(WorkerGuard, WorkerGuard)> {
+pub fn init_tracing(config: &Config, log_sender: Sender<Bytes>) -> Result<(WorkerGuard, WorkerGuard)> {
     let fmt_layer = tracing_subscriber::fmt::layer();
     let filter_layer = EnvFilter::from_default_env();
     let file_appender = tracing_appender::rolling::hourly("./logs", "log");
     let (file_writer, file_writer_guard) = tracing_appender::non_blocking(file_appender);
-    let mem_writer = LimitedInMemoryBuffer::new(&MEM_LOG, config.max_mem_log_size);
+    let mem_writer = LimitedInMemoryBuffer::new(&MEM_LOG, log_sender, config.max_mem_log_size);
     let (mem_writer, mem_writer_guard) = tracing_appender::non_blocking(mem_writer);
     let file_writer_layer = tracing_subscriber::fmt::layer()
         .with_writer(file_writer)
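
Two details in the writer are easy to miss. First, send is called synchronously inside write, but tracing_appender::non_blocking has already moved the whole writer onto a dedicated logging thread, and the returned WorkerGuard values must stay alive (main holds them as _guards) or buffered lines are lost on shutdown. Second, send(...).ok() discards the only error a watch sender can produce, which happens when every receiver is gone. A minimal sketch of that failure mode, using only tokio's watch API:

use bytes::Bytes;
use tokio::sync::watch;

fn main() {
    let (tx, rx) = watch::channel(Bytes::new());

    // While at least one receiver exists (AppState holds one for the
    // life of the app), send succeeds.
    assert!(tx.send(Bytes::from_static(b"ok\n")).is_ok());

    // Once every receiver is dropped, send returns Err; the writer's
    // `.ok()` swallows it so a log write can never fail just because
    // nobody is watching.
    drop(rx);
    assert!(tx.send(Bytes::from_static(b"unwatched\n")).is_err());
}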


@@ -5,9 +5,11 @@ use axum::{
     routing::{get, post},
     Router,
 };
+use bytes::Bytes;
 use dotenvy::dotenv;
 use notify::Watcher;
 use sqlx::postgres::PgPoolOptions;
+use tokio::sync::watch::channel;
 use tower::ServiceBuilder;
 use tower_http::trace::TraceLayer;
 use tower_livereload::LiveReloadLayer;
@@ -24,7 +26,8 @@ async fn main() -> Result<()> {
     let config = Config::new()?;
-    let _guards = init_tracing(&config)?;
+    let (log_sender, log_receiver) = channel::<Bytes>(Bytes::new());
+    let _guards = init_tracing(&config, log_sender)?;
 
     let pool = PgPoolOptions::new()
         .max_connections(config.database_max_connections)
@@ -45,7 +48,8 @@ async fn main() -> Result<()> {
         .route("/feeds", get(handlers::feeds::get))
         .route("/entry/:id", get(handlers::entry::get))
         .route("/log", get(handlers::log::get))
-        .with_state(AppState { pool, config })
+        .route("/log/stream", get(handlers::log::stream))
+        .with_state(AppState { pool, config, log_receiver })
         .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()));
 
     #[cfg(debug_assertions)]
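
One consequence of seeding the channel with Bytes::new(): a watch receiver always holds a current value, and WatchStream yields it on first poll, so a client that connects before the first log line is written receives a single empty event. A minimal sketch of that behavior (assuming the same crate versions as the manifest above):

use bytes::Bytes;
use tokio::sync::watch;
use tokio_stream::{wrappers::WatchStream, StreamExt};

#[tokio::main]
async fn main() {
    // Same shape as main.rs: the channel starts out holding an empty value.
    let (_tx, rx) = watch::channel(Bytes::new());

    // The first event a fresh subscriber sees is that initial payload.
    let mut stream = WatchStream::new(rx);
    assert_eq!(stream.next().await.unwrap(), Bytes::new());
}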


@@ -1,4 +1,7 @@
+use tokio::sync::watch::Receiver;
+
 use axum::extract::FromRef;
+use bytes::Bytes;
 use sqlx::PgPool;
 
 use crate::config::Config;
@@ -7,6 +10,7 @@ use crate::config::Config;
 pub struct AppState {
     pub pool: PgPool,
     pub config: Config,
+    pub log_receiver: Receiver<Bytes>,
 }
 
 impl FromRef<AppState> for PgPool {
@@ -20,3 +24,9 @@ impl FromRef<AppState> for Config {
         state.config.clone()
     }
 }
+
+impl FromRef<AppState> for Receiver<Bytes> {
+    fn from_ref(state: &AppState) -> Self {
+        state.log_receiver.clone()
+    }
+}
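
The FromRef impl is what lets the /log/stream handler take State<Receiver<Bytes>> directly instead of the whole AppState. A hedged sketch of the same axum 0.6 substate pattern, with a hypothetical Demo state (names and address are illustrative):

use axum::{
    extract::{FromRef, State},
    routing::get,
    Router,
};

#[derive(Clone)]
struct Demo {
    name: String,
}

// Because String implements FromRef<Demo>, axum can build
// State<String> for any handler registered on a Router<Demo>.
impl FromRef<Demo> for String {
    fn from_ref(state: &Demo) -> Self {
        state.name.clone()
    }
}

async fn handler(State(name): State<String>) -> String {
    name
}

#[tokio::main]
async fn main() {
    let app = Router::new()
        .route("/", get(handler))
        .with_state(Demo { name: "demo".into() });
    // Address is illustrative.
    let addr = "127.0.0.1:3000".parse().unwrap();
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}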