Example axum server with log stream page

This commit is contained in:
Tyler Hallada 2023-06-04 00:35:34 -04:00
commit 0ad871772e
16 changed files with 1494 additions and 0 deletions

5
.env Normal file
View File

@ -0,0 +1,5 @@
RUST_LOG=axum_log_stream=debug,tower_http=debug
HOST=127.0.0.1
PORT=3000
TITLE=axum-log-stream
MAX_MEM_LOG_SIZE=1000000

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/target
/logs

1100
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

22
Cargo.toml Normal file
View File

@ -0,0 +1,22 @@
[package]
name = "axum-log-stream"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
ansi-to-html = "0.1"
anyhow = "1"
axum = "0.6"
bytes = "1.4"
dotenvy = "0.15"
maud = { version = "0.25", features = ["axum"] }
once_cell = "1.17"
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tower = "0.4"
tower-http = { version = "0.4", features = ["trace"] }
tracing = "0.1"
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

23
README.md Normal file
View File

@ -0,0 +1,23 @@
# axum-log-stream
An example axum server that serves a page with a live stream of the server's
tracing log.
## Setup
A `.env` file should be at the root of the directory. Leave the default or
change them to your desired values:
```bash
RUST_LOG=axum_log_stream=debug,tower_http=debug
HOST=127.0.0.1
PORT=3000
TITLE=axum-log-stream
MAX_MEM_LOG_SIZE=1000000
```
## Running
```bash
cargo run
```

29
src/config.rs Normal file
View File

@ -0,0 +1,29 @@
use anyhow::Result;
#[derive(Clone, Debug)]
pub struct Config {
pub host: String,
pub port: u16,
pub title: String,
pub max_mem_log_size: usize,
}
impl Config {
pub fn new() -> Result<Config> {
let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
let port = std::env::var("PORT")
.unwrap_or_else(|_| "3000".to_string())
.parse()?;
let title = std::env::var("TITLE").unwrap_or_else(|_| "axum-log-stream".to_string());
let max_mem_log_size = std::env::var("MAX_MEM_LOG_SIZE")
.unwrap_or_else(|_| "1000000".to_string())
.parse()?;
Ok(Config {
host,
port,
title,
max_mem_log_size,
})
}
}

10
src/handlers/home.rs Normal file
View File

@ -0,0 +1,10 @@
use axum::response::Response;
use maud::html;
use crate::partials::layout::Layout;
pub async fn get(layout: Layout) -> Response {
layout.render(html! {
p { "Home page" }
})
}

51
src/handlers/log.rs Normal file
View File

@ -0,0 +1,51 @@
use std::convert::Infallible;
use std::str::from_utf8;
use std::time::Duration;
use ansi_to_html::convert_escaped;
use anyhow::Result;
use axum::extract::State;
use axum::response::{
sse::{Event, Sse},
Response,
};
use bytes::Bytes;
use maud::{html, PreEscaped};
use tokio::sync::watch::Receiver;
use tokio_stream::wrappers::WatchStream;
use tokio_stream::Stream;
use tokio_stream::StreamExt;
use crate::log::MEM_LOG;
use crate::partials::layout::Layout;
pub async fn get(layout: Layout) -> Response {
let mem_buf = MEM_LOG.lock().unwrap();
layout.render(html! {
turbo-stream-source src="/log/stream" {}
pre id="log" { (PreEscaped(convert_escaped(from_utf8(mem_buf.as_slices().0).unwrap()).unwrap())) }
})
}
pub async fn stream(
State(log_receiver): State<Receiver<Bytes>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let log_stream = WatchStream::new(log_receiver);
let log_stream = log_stream.map(|line| {
Ok(Event::default().data(
html! {
turbo-stream action="append" target="log" {
template {
(PreEscaped(convert_escaped(from_utf8(&line).unwrap()).unwrap()))
}
}
}
.into_string(),
))
});
Sse::new(log_stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keep-alive-text"),
)
}

3
src/handlers/mod.rs Normal file
View File

@ -0,0 +1,3 @@
pub mod home;
pub mod log;
pub mod other;

10
src/handlers/other.rs Normal file
View File

@ -0,0 +1,10 @@
use axum::response::Response;
use maud::html;
use crate::partials::layout::Layout;
pub async fn get(layout: Layout) -> Response {
layout.render(html! {
p { "Other page" }
})
}

93
src/log.rs Normal file
View File

@ -0,0 +1,93 @@
use std::sync::Mutex;
use std::{collections::VecDeque, io::Write};
use anyhow::Result;
use bytes::Bytes;
use once_cell::sync::Lazy;
use tokio::sync::watch::Sender;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;
use crate::config::Config;
/// A shared in-memory buffer to store log bytes
pub static MEM_LOG: Lazy<Mutex<VecDeque<u8>>> = Lazy::new(|| Mutex::new(VecDeque::new()));
/// A `Writer` to a shared static in-memory buffer that stores bytes up until `max` bytes, at which
/// point it will truncate the buffer from the front up to the first newline byte `\n` within the
/// size limit.
///
/// This is useful for storing the last emitted log lines of an application in-memory without
/// needing to worry about the memory growing infinitely large.
///
/// `LimitedInMemoryBuffer` does not guarantee that the memory usage is less than `max`.
/// VecDeque`'s capacity may exceed `max` and it will only check and truncate the size of the
/// internal buffer *before* writing to it. It will continue to write, even if the size of the line
/// to write will make the buffer exceed `max`.
struct LimitedInMemoryBuffer {
pub buf: &'static Mutex<VecDeque<u8>>,
sender: Sender<Bytes>,
max: usize,
}
impl LimitedInMemoryBuffer {
fn new(buf: &'static Mutex<VecDeque<u8>>, sender: Sender<Bytes>, max: usize) -> Self {
Self { buf, sender, max }
}
/// Truncate the buffer to max bytes plus bytes before the closest newline starting from the
/// front
fn truncate(&mut self) {
let mut buf = self.buf.lock().unwrap();
let len = buf.len();
if len > self.max {
drop(buf.drain(..len - self.max));
let mut i = 0;
while i < buf.len() {
if buf[i] == b'\n' {
break;
}
i += 1;
}
drop(buf.drain(..i));
}
}
}
impl Write for LimitedInMemoryBuffer {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
// if self.buf is too big, truncate it to the closest newline starting from the front
self.truncate();
let bytes = Bytes::copy_from_slice(buf);
self.sender.send(bytes).ok();
let mut mem_buf = self.buf.lock().unwrap();
mem_buf.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
let mut buf = self.buf.lock().unwrap();
buf.flush()
}
}
pub fn init_tracing(
config: &Config,
log_sender: Sender<Bytes>,
) -> Result<(WorkerGuard, WorkerGuard)> {
let fmt_layer = tracing_subscriber::fmt::layer();
let filter_layer = EnvFilter::from_default_env();
let file_appender = tracing_appender::rolling::hourly("./logs", "log");
let (file_writer, file_writer_guard) = tracing_appender::non_blocking(file_appender);
let mem_writer = LimitedInMemoryBuffer::new(&MEM_LOG, log_sender, config.max_mem_log_size);
let (mem_writer, mem_writer_guard) = tracing_appender::non_blocking(mem_writer);
let file_writer_layer = tracing_subscriber::fmt::layer().with_writer(file_writer);
let mem_writer_layer = tracing_subscriber::fmt::layer().with_writer(mem_writer);
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.with(file_writer_layer)
.with(mem_writer_layer)
.init();
Ok((file_writer_guard, mem_writer_guard))
}

47
src/main.rs Normal file
View File

@ -0,0 +1,47 @@
use anyhow::Result;
use axum::{routing::get, Router};
use bytes::Bytes;
use dotenvy::dotenv;
use tokio::sync::watch::channel;
use tower::ServiceBuilder;
use tower_http::trace::TraceLayer;
use tracing::debug;
pub mod config;
pub mod handlers;
pub mod log;
pub mod partials;
pub mod state;
use crate::config::Config;
use crate::log::init_tracing;
use crate::state::AppState;
#[tokio::main]
async fn main() -> Result<()> {
dotenv().ok();
let config = Config::new()?;
let (log_sender, log_receiver) = channel::<Bytes>(Bytes::new());
let _guards = init_tracing(&config, log_sender)?;
let addr = format!("{}:{}", &config.host, &config.port).parse()?;
let app = Router::new()
.route("/", get(handlers::home::get))
.route("/other", get(handlers::other::get))
.route("/log", get(handlers::log::get))
.route("/log/stream", get(handlers::log::stream))
.with_state(AppState {
config,
log_receiver,
})
.layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()));
debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await?;
Ok(())
}

15
src/partials/header.rs Normal file
View File

@ -0,0 +1,15 @@
use maud::{html, Markup};
pub fn header(title: &str) -> Markup {
html! {
header {
nav {
h1 { a href="/" data-turbo-frame="main" { (title) } }
ul {
li { a href="/other" data-turbo-frame="main" { "other" } }
li { a href="/log" data-turbo-frame="main" { "log" } }
}
}
}
}
}

58
src/partials/layout.rs Normal file
View File

@ -0,0 +1,58 @@
use axum::{
async_trait,
extract::{FromRef, FromRequestParts, State},
http::request::Parts,
response::{Html, IntoResponse, Response},
};
use maud::{html, Markup, DOCTYPE};
use crate::config::Config;
use crate::partials::header::header;
pub struct Layout {
pub title: String,
}
#[async_trait]
impl<S> FromRequestParts<S> for Layout
where
S: Send + Sync,
Config: FromRef<S>,
{
type Rejection = Response;
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
let State(config) = State::<Config>::from_request_parts(parts, state)
.await
.map_err(|err| err.into_response())?;
Ok(Self {
title: config.title,
})
}
}
impl Layout {
pub fn render(self, template: Markup) -> Response {
let with_layout = html! {
(DOCTYPE)
html lang="en" {
head {
meta charset="utf-8";
title { (self.title) }
script type="module" {
r#"import * as Turbo from 'https://cdn.skypack.dev/@hotwired/turbo';"#
}
}
body {
(header(&self.title))
turbo-frame id="main" data-turbo-action="advance" {
(template)
}
}
}
}
.into_string();
Html(with_layout).into_response()
}
}

2
src/partials/mod.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod layout;
pub mod header;

24
src/state.rs Normal file
View File

@ -0,0 +1,24 @@
use tokio::sync::watch::Receiver;
use axum::extract::FromRef;
use bytes::Bytes;
use crate::config::Config;
#[derive(Clone)]
pub struct AppState {
pub config: Config,
pub log_receiver: Receiver<Bytes>,
}
impl FromRef<AppState> for Config {
fn from_ref(state: &AppState) -> Self {
state.config.clone()
}
}
impl FromRef<AppState> for Receiver<Bytes> {
fn from_ref(state: &AppState) -> Self {
state.log_receiver.clone()
}
}