Basic todo example with lru cache and in-memory db

This commit is contained in:
Tyler Hallada 2020-09-14 22:52:33 -04:00
commit d4d604f779
15 changed files with 2317 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/target
tags

1473
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

20
Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "warp-caching"
version = "0.1.0"
authors = ["Tyler Hallada <tyler@hallada.net>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0"
http = "0.2"
http-api-problem = { version = "0.17", features = ["with-warp"] }
hyper = "0.13"
lru = "0.5"
tokio = { version = "0.2", features = ["macros", "rt-threaded", "sync"] }
tracing = "0.1"
tracing-subscriber = "0.2"
tracing-futures = "0.2"
serde = { version = "1.0", features = ["derive"] }
warp = "0.2"

135
src/caches/cache.rs Normal file
View File

@ -0,0 +1,135 @@
use anyhow::Result;
use lru::LruCache;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::debug;
use warp::http::StatusCode;
use warp::{reject, Rejection, Reply};
use crate::problem::{reject_anyhow, unpack_problem};
use super::CachedResponse;
#[derive(Debug, Clone)]
pub struct Cache<K, V>
where
K: Eq + Hash + Debug,
V: Clone,
{
pub name: String,
pub lru_mutex: Arc<Mutex<LruCache<K, V>>>,
pub log_keys: bool,
}
impl<K, V> Cache<K, V>
where
K: Eq + Hash + Debug,
V: Clone,
{
pub fn new(name: &str, capacity: usize) -> Self {
Cache {
name: name.to_string(),
lru_mutex: Arc::new(Mutex::new(LruCache::new(capacity))),
log_keys: true,
}
}
pub fn log_keys(mut self, value: bool) -> Self {
self.log_keys = value;
self
}
pub fn log_with_key(&self, key: &K, message: &str) {
if self.log_keys {
debug!(cache = %self.name, key = ?key, message);
} else {
debug!(cache = %self.name, message);
}
}
pub async fn get<G, F>(&self, key: K, getter: G) -> Result<V>
where
G: Fn() -> F,
F: Future<Output = Result<V>>,
{
let mut guard = self.lru_mutex.lock().await;
if let Some(value) = guard.get(&key) {
self.log_with_key(&key, "get: hit");
return Ok(value.clone());
}
drop(guard);
self.log_with_key(&key, "get: miss");
let value = getter().await?;
let mut guard = self.lru_mutex.lock().await;
guard.put(key, value.clone());
Ok(value)
}
pub async fn delete(&self, key: K) -> Result<Option<V>> {
let mut guard = self.lru_mutex.lock().await;
let value = guard.pop(&key);
self.log_with_key(&key, "delete");
Ok(value)
}
pub async fn clear(&self) {
let mut guard = self.lru_mutex.lock().await;
guard.clear();
debug!(cache = %self.name, "cache clear");
}
}
impl<K> Cache<K, CachedResponse>
where
K: Eq + Hash + Debug,
{
pub async fn get_response<G, F, R>(
&self,
key: K,
getter: G,
) -> Result<CachedResponse, Rejection>
where
G: Fn() -> F,
F: Future<Output = Result<R>>,
R: Reply,
{
let mut guard = self.lru_mutex.lock().await;
if let Some(value) = guard.get(&key) {
self.log_with_key(&key, "get_response: hit");
return Ok(value.clone());
}
drop(guard);
self.log_with_key(&key, "get_response: miss");
let reply = getter().await.map_err(reject_anyhow);
let cached_response = match reply {
Ok(reply) => CachedResponse::from_reply(reply)
.await
.map_err(reject_anyhow)?,
Err(rejection) => {
let reply = unpack_problem(rejection).await?;
CachedResponse::from_reply(reply)
.await
.map_err(reject_anyhow)?
}
};
let mut guard = self.lru_mutex.lock().await;
guard.put(key, cached_response.clone());
Ok(cached_response)
}
pub async fn delete_response(&self, key: K) -> Result<Option<CachedResponse>> {
let mut guard = self.lru_mutex.lock().await;
let cached_response = guard.pop(&key);
self.log_with_key(&key, "delete_response");
Ok(cached_response)
}
}

View File

@ -0,0 +1,46 @@
use anyhow::Result;
use http::{HeaderMap, HeaderValue, Response, StatusCode, Version};
use hyper::body::{to_bytes, Body, Bytes};
use warp::Reply;
#[derive(Debug, Clone)]
pub struct CachedResponse {
pub status: StatusCode,
pub version: Version,
pub headers: HeaderMap<HeaderValue>,
pub body: Bytes,
}
impl CachedResponse {
pub async fn from_reply<T>(reply: T) -> Result<Self>
where
T: Reply,
{
let mut response = reply.into_response();
Ok(CachedResponse {
status: response.status(),
version: response.version(),
headers: response.headers().clone(),
body: to_bytes(response.body_mut()).await?,
})
}
}
impl Reply for CachedResponse {
fn into_response(self) -> warp::reply::Response {
match Response::builder()
.status(self.status)
.version(self.version)
.body(Body::from(self.body))
{
Ok(mut response) => {
let headers = response.headers_mut();
for (header, value) in self.headers.iter() {
headers.insert(header, value.clone());
}
response
}
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
}
}
}

24
src/caches/mod.rs Normal file
View File

@ -0,0 +1,24 @@
use std::fmt::Debug;
use crate::models::ListOptions;
mod cache;
mod cached_response;
pub use cache::Cache;
pub use cached_response::CachedResponse;
#[derive(Debug, Clone)]
pub struct Caches {
pub todos: Cache<u64, CachedResponse>,
pub list_todos: Cache<ListOptions, CachedResponse>,
}
impl Caches {
pub fn initialize() -> Self {
Caches {
todos: Cache::new("todos", 100),
list_todos: Cache::new("list_todos", 100),
}
}
}

90
src/lru_cache/filters.rs Normal file
View File

@ -0,0 +1,90 @@
use warp::Filter;
use super::handlers;
use crate::models::{Environment, ListOptions, Todo};
/// The 4 TODOs filters combined.
pub fn todos(
env: Environment,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path("lru_cache").and(
todos_list(env.clone())
.or(todos_create(env.clone()))
.or(todos_update(env.clone()))
.or(todos_get(env.clone()))
.or(todos_delete(env)),
)
}
/// GET /todos?offset=3&limit=5
pub fn todos_list(
env: Environment,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("todos")
.and(warp::get())
.and(warp::query::<ListOptions>())
.and(with_env(env))
.and_then(handlers::list_todos)
}
/// GET /todos/:id
pub fn todos_get(
env: Environment,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("todos" / u64)
.and(warp::get())
.and(with_env(env))
.and_then(handlers::get_todo)
}
/// POST /todos with JSON body
pub fn todos_create(
env: Environment,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("todos")
.and(warp::post())
.and(json_body())
.and(with_env(env))
.and_then(handlers::create_todo)
}
/// PUT /todos/:id with JSON body
pub fn todos_update(
env: Environment,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("todos" / u64)
.and(warp::put())
.and(json_body())
.and(with_env(env))
.and_then(handlers::update_todo)
}
/// DELETE /todos/:id
pub fn todos_delete(
env: Environment,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
// We'll make one of our endpoints admin-only to show how authentication filters are used
let admin_only = warp::header::exact("authorization", "Bearer admin");
warp::path!("todos" / u64)
// It is important to put the auth check _after_ the path filters.
// If we put the auth check before, the request `PUT /todos/invalid-string`
// would try this filter and reject because the authorization header doesn't match,
// rather because the param is wrong for that other path.
.and(admin_only)
.and(warp::delete())
.and(with_env(env))
.and_then(handlers::delete_todo)
}
fn with_env(
env: Environment,
) -> impl Filter<Extract = (Environment,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || env.clone())
}
fn json_body() -> impl Filter<Extract = (Todo,), Error = warp::Rejection> + Clone {
// When accepting a body, we want a JSON body
// (and to reject huge payloads)...
warp::body::content_length_limit(1024 * 16).and(warp::body::json())
}

137
src/lru_cache/handlers.rs Normal file
View File

@ -0,0 +1,137 @@
/// These are our API handlers, the ends of each filter chain.
/// Notice how thanks to using `Filter::and`, we can define a function
/// with the exact arguments we'd expect from each filter in the chain.
/// No tuples are needed, it's auto flattened for the functions.
use anyhow::anyhow;
use http_api_problem::HttpApiProblem;
use std::convert::Infallible;
use tracing::debug;
use warp::http::StatusCode;
use crate::models::{Environment, ListOptions, Todo};
use crate::problem::reject_anyhow;
pub async fn list_todos(
opts: ListOptions,
env: Environment,
) -> Result<impl warp::Reply, warp::Rejection> {
env.caches
.list_todos
.get_response(opts.clone(), || async {
// Just return a JSON array of todos, applying the limit and offset.
let todos = env.db.lock().await;
let todos: Vec<Todo> = todos
.clone()
.into_iter()
.skip(opts.offset.unwrap_or(0))
.take(opts.limit.unwrap_or(std::usize::MAX))
.collect();
Ok(warp::reply::json(&todos))
})
.await
}
pub async fn get_todo(id: u64, env: Environment) -> Result<impl warp::Reply, warp::Rejection> {
env.caches
.todos
.get_response(id, || async {
debug!("get_todo: id={}", id);
let mut vec = env.db.lock().await;
// Look for the specified Todo...
for todo in vec.iter_mut() {
if todo.id == id {
return Ok(warp::reply::json(&todo));
}
}
debug!(" -> todo id not found!");
// If the for loop didn't return OK, then the ID doesn't exist...
Err(anyhow!(HttpApiProblem::new(format!(
"Todo {} not found",
id
))
.set_status(StatusCode::NOT_FOUND)))
})
.await
}
pub async fn create_todo(create: Todo, env: Environment) -> Result<impl warp::Reply, Infallible> {
debug!("create_todo: {:?}", create);
let mut vec = env.db.lock().await;
for todo in vec.iter() {
if todo.id == create.id {
debug!(" -> id already exists: {}", create.id);
// Todo with id already exists, return `400 BadRequest`.
return Ok(StatusCode::BAD_REQUEST);
}
}
// No existing Todo with id, so insert and return `201 Created`.
vec.push(create);
env.caches.list_todos.clear().await;
Ok(StatusCode::CREATED)
}
pub async fn update_todo(
id: u64,
update: Todo,
env: Environment,
) -> Result<impl warp::Reply, warp::Rejection> {
debug!("update_todo: id={}, todo={:?}", id, update);
let mut vec = env.db.lock().await;
// Look for the specified Todo...
for todo in vec.iter_mut() {
if todo.id == id {
*todo = update;
env.caches
.todos
.delete_response(id)
.await
.map_err(reject_anyhow)?;
env.caches.list_todos.clear().await;
return Ok(StatusCode::OK);
}
}
debug!(" -> todo id not found!");
// If the for loop didn't return OK, then the ID doesn't exist...
Ok(StatusCode::NOT_FOUND)
}
pub async fn delete_todo(id: u64, env: Environment) -> Result<impl warp::Reply, warp::Rejection> {
debug!("delete_todo: id={}", id);
let mut vec = env.db.lock().await;
let len = vec.len();
vec.retain(|todo| {
// Retain all Todos that aren't this id...
// In other words, remove all that *are* this id...
todo.id != id
});
// If the vec is smaller, we found and deleted a Todo!
let deleted = vec.len() != len;
if deleted {
env.caches
.todos
.delete_response(id)
.await
.map_err(reject_anyhow)?;
env.caches.list_todos.clear().await;
// respond with a `204 No Content`, which means successful,
// yet no body expected...
Ok(StatusCode::NO_CONTENT)
} else {
debug!(" -> todo id not found!");
Ok(StatusCode::NOT_FOUND)
}
}

2
src/lru_cache/mod.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod filters;
pub mod handlers;

116
src/main.rs Normal file
View File

@ -0,0 +1,116 @@
use std::env;
use tracing_subscriber::fmt::format::FmtSpan;
use warp::Filter;
mod caches;
mod lru_cache;
mod models;
mod no_cache;
mod problem;
/// Provides a RESTful web server managing some Todos.
///
/// API will be:
///
/// - `GET /todos`: return a JSON list of Todos.
/// - `POST /todos`: create a new Todo.
/// - `PUT /todos/:id`: update a specific Todo.
/// - `DELETE /todos/:id`: delete a specific Todo.
#[tokio::main]
async fn main() {
let env_log_filter =
env::var("RUST_LOG").unwrap_or_else(|_| "warp=info,warp_caching=debug".to_owned());
tracing_subscriber::fmt()
.with_env_filter(env_log_filter)
.with_span_events(FmtSpan::CLOSE)
.init();
let env = models::Environment {
db: models::blank_db(),
caches: models::blank_caches(),
};
let api = no_cache::filters::todos(env.clone()).or(lru_cache::filters::todos(env));
// View access logs by setting `RUST_LOG=warp-caching`.
let routes = api.with(warp::log("warp_caching"));
// Start up the server...
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
#[cfg(test)]
mod tests {
use warp::http::StatusCode;
use warp::test::request;
use crate::models::{self, Todo};
use crate::no_cache::filters;
#[tokio::test]
async fn test_post() {
let env = models::Environment {
db: models::blank_db(),
caches: models::blank_caches(),
};
let api = filters::todos(env);
let resp = request()
.method("POST")
.path("/todos")
.json(&Todo {
id: 1,
text: "test 1".into(),
completed: false,
})
.reply(&api)
.await;
assert_eq!(resp.status(), StatusCode::CREATED);
}
#[tokio::test]
async fn test_post_conflict() {
let env = models::Environment {
db: models::blank_db(),
caches: models::blank_caches(),
};
env.db.lock().await.push(todo1());
let api = filters::todos(env);
let resp = request()
.method("POST")
.path("/todos")
.json(&todo1())
.reply(&api)
.await;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_put_unknown() {
let env = models::Environment {
db: models::blank_db(),
caches: models::blank_caches(),
};
let api = filters::todos(env);
let resp = request()
.method("PUT")
.path("/todos/1")
.header("authorization", "Bearer admin")
.json(&todo1())
.reply(&api)
.await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
fn todo1() -> Todo {
Todo {
id: 1,
text: "test 1".into(),
completed: false,
}
}
}

37
src/models.rs Normal file
View File

@ -0,0 +1,37 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::caches::Caches;
/// So we don't have to tackle how different database work, we'll just use
/// a simple in-memory DB, a vector synchronized by a mutex.
pub type Db = Arc<Mutex<Vec<Todo>>>;
pub fn blank_db() -> Db {
Arc::new(Mutex::new(Vec::new()))
}
pub fn blank_caches() -> Caches {
Caches::initialize()
}
#[derive(Debug, Clone)]
pub struct Environment {
pub db: Db,
pub caches: Caches,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Todo {
pub id: u64,
pub text: String,
pub completed: bool,
}
// The query parameters for list_todos.
#[derive(Debug, Deserialize, Clone, Hash, Eq, PartialEq)]
pub struct ListOptions {
pub offset: Option<usize>,
pub limit: Option<usize>,
}

90
src/no_cache/filters.rs Normal file
View File

@ -0,0 +1,90 @@
use warp::Filter;
use super::handlers;
use crate::models::{Environment, ListOptions, Todo};
/// The 4 TODOs filters combined.
pub fn todos(
env: Environment,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path("no_cache").and(
todos_list(env.clone())
.or(todos_create(env.clone()))
.or(todos_update(env.clone()))
.or(todos_get(env.clone()))
.or(todos_delete(env)),
)
}
/// GET /todos?offset=3&limit=5
pub fn todos_list(
env: Environment,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("todos")
.and(warp::get())
.and(warp::query::<ListOptions>())
.and(with_env(env))
.and_then(handlers::list_todos)
}
/// GET /todos/:id
pub fn todos_get(
env: Environment,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("todos" / u64)
.and(warp::get())
.and(with_env(env))
.and_then(handlers::get_todo)
}
/// POST /todos with JSON body
pub fn todos_create(
env: Environment,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("todos")
.and(warp::post())
.and(json_body())
.and(with_env(env))
.and_then(handlers::create_todo)
}
/// PUT /todos/:id with JSON body
pub fn todos_update(
env: Environment,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("todos" / u64)
.and(warp::put())
.and(json_body())
.and(with_env(env))
.and_then(handlers::update_todo)
}
/// DELETE /todos/:id
pub fn todos_delete(
env: Environment,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
// We'll make one of our endpoints admin-only to show how authentication filters are used
let admin_only = warp::header::exact("authorization", "Bearer admin");
warp::path!("todos" / u64)
// It is important to put the auth check _after_ the path filters.
// If we put the auth check before, the request `PUT /todos/invalid-string`
// would try this filter and reject because the authorization header doesn't match,
// rather because the param is wrong for that other path.
.and(admin_only)
.and(warp::delete())
.and(with_env(env))
.and_then(handlers::delete_todo)
}
fn with_env(
env: Environment,
) -> impl Filter<Extract = (Environment,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || env.clone())
}
fn json_body() -> impl Filter<Extract = (Todo,), Error = warp::Rejection> + Clone {
// When accepting a body, we want a JSON body
// (and to reject huge payloads)...
warp::body::content_length_limit(1024 * 16).and(warp::body::json())
}

107
src/no_cache/handlers.rs Normal file
View File

@ -0,0 +1,107 @@
/// These are our API handlers, the ends of each filter chain.
/// Notice how thanks to using `Filter::and`, we can define a function
/// with the exact arguments we'd expect from each filter in the chain.
/// No tuples are needed, it's auto flattened for the functions.
use std::convert::Infallible;
use tracing::debug;
use warp::http::StatusCode;
use crate::models::{Environment, ListOptions, Todo};
pub async fn list_todos(
opts: ListOptions,
env: Environment,
) -> Result<impl warp::Reply, Infallible> {
// Just return a JSON array of todos, applying the limit and offset.
let todos = env.db.lock().await;
let todos: Vec<Todo> = todos
.clone()
.into_iter()
.skip(opts.offset.unwrap_or(0))
.take(opts.limit.unwrap_or(std::usize::MAX))
.collect();
Ok(warp::reply::json(&todos))
}
pub async fn get_todo(id: u64, env: Environment) -> Result<impl warp::Reply, warp::Rejection> {
debug!("get_todo: id={}", id);
let mut vec = env.db.lock().await;
// Look for the specified Todo...
for todo in vec.iter_mut() {
if todo.id == id {
return Ok(warp::reply::json(&todo));
}
}
debug!(" -> todo id not found!");
// If the for loop didn't return OK, then the ID doesn't exist...
Err(warp::reject::not_found())
}
pub async fn create_todo(create: Todo, env: Environment) -> Result<impl warp::Reply, Infallible> {
debug!("create_todo: {:?}", create);
let mut vec = env.db.lock().await;
for todo in vec.iter() {
if todo.id == create.id {
debug!(" -> id already exists: {}", create.id);
// Todo with id already exists, return `400 BadRequest`.
return Ok(StatusCode::BAD_REQUEST);
}
}
// No existing Todo with id, so insert and return `201 Created`.
vec.push(create);
Ok(StatusCode::CREATED)
}
pub async fn update_todo(
id: u64,
update: Todo,
env: Environment,
) -> Result<impl warp::Reply, warp::Rejection> {
debug!("update_todo: id={}, todo={:?}", id, update);
let mut vec = env.db.lock().await;
// Look for the specified Todo...
for todo in vec.iter_mut() {
if todo.id == id {
*todo = update;
return Ok(StatusCode::OK);
}
}
debug!(" -> todo id not found!");
// If the for loop didn't return OK, then the ID doesn't exist...
Ok(StatusCode::NOT_FOUND)
}
pub async fn delete_todo(id: u64, env: Environment) -> Result<impl warp::Reply, warp::Rejection> {
debug!("delete_todo: id={}", id);
let mut vec = env.db.lock().await;
let len = vec.len();
vec.retain(|todo| {
// Retain all Todos that aren't this id...
// In other words, remove all that *are* this id...
todo.id != id
});
// If the vec is smaller, we found and deleted a Todo!
let deleted = vec.len() != len;
if deleted {
// respond with a `204 No Content`, which means successful,
// yet no body expected...
Ok(StatusCode::NO_CONTENT)
} else {
debug!(" -> todo id not found!");
Ok(StatusCode::NOT_FOUND)
}
}

2
src/no_cache/mod.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod filters;
pub mod handlers;

36
src/problem/mod.rs Normal file
View File

@ -0,0 +1,36 @@
use http::StatusCode;
use http_api_problem::HttpApiProblem;
use tracing::error;
use warp::{reject, Rejection, Reply};
pub fn from_anyhow(error: anyhow::Error) -> HttpApiProblem {
let error = match error.downcast::<HttpApiProblem>() {
Ok(problem) => return problem,
Err(error) => error,
};
error!("Recovering unhandled error: {:?}", error);
HttpApiProblem::new("Internal Server Error: 500").set_status(StatusCode::INTERNAL_SERVER_ERROR)
}
pub async fn unpack_problem(rejection: Rejection) -> Result<impl Reply, Rejection> {
if let Some(problem) = rejection.find::<HttpApiProblem>() {
let code = problem.status.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let reply = warp::reply::json(problem);
let reply = warp::reply::with_status(reply, code);
let reply = warp::reply::with_header(
reply,
warp::http::header::CONTENT_TYPE,
http_api_problem::PROBLEM_JSON_MEDIA_TYPE,
);
return Ok(reply);
}
Err(rejection)
}
pub fn reject_anyhow(error: anyhow::Error) -> Rejection {
reject::custom(from_anyhow(error))
}