Delay set cache after response, make cache static

Uses `tokio::spawn` to delay updating the cache while the server responds to the request.

Because `tokio::spawn` can run on another thread, references need to be static, so I initialized the cache in `lazy_static`.
This commit is contained in:
Tyler Hallada 2020-11-07 00:28:54 -05:00
parent 0980d01640
commit a53eeffb0f
11 changed files with 177 additions and 141 deletions

1
Cargo.lock generated
View File

@ -142,6 +142,7 @@ dependencies = [
"http-api-problem",
"hyper",
"ipnetwork",
"lazy_static",
"listenfd",
"lru",
"seahash",

View File

@ -12,6 +12,7 @@ chrono = { version = "0.4", features = ["serde"] }
dotenv = "0.15"
http-api-problem = { version = "0.17", features = ["with-warp"] }
hyper = "0.13"
lazy_static = "1.4"
listenfd = "0.3"
tokio = { version = "0.2", features = ["macros", "rt-threaded", "sync"] }
sqlx = { version = "0.3", default-features = false, features = [ "runtime-tokio", "macros", "postgres", "chrono", "uuid", "ipnetwork", "json" ] }

View File

@ -24,8 +24,8 @@ where
impl<K, V> Cache<K, V>
where
K: Eq + Hash + Debug,
V: Clone,
K: Eq + Hash + Debug + Send,
V: Clone + Send,
{
pub fn new(name: &str, capacity: usize) -> Self {
Cache {
@ -48,7 +48,7 @@ where
}
}
pub async fn get<G, F>(&self, key: K, getter: G) -> Result<V>
pub async fn get<G, F>(&'static self, key: K, getter: G) -> Result<V>
where
G: Fn() -> F,
F: Future<Output = Result<V>>,
@ -62,8 +62,13 @@ where
self.log_with_key(&key, "get: miss");
let value = getter().await?;
let to_cache = value.clone();
tokio::spawn(async move {
let mut guard = self.lru_mutex.lock().await;
guard.put(key, value.clone());
self.log_with_key(&key, "get: update cache");
guard.put(key, to_cache);
});
Ok(value)
}
@ -85,10 +90,10 @@ where
impl<K> Cache<K, CachedResponse>
where
K: Eq + Hash + Debug,
K: Eq + Hash + Debug + Send,
{
pub async fn get_response<G, F, R>(
&self,
&'static self,
key: K,
getter: G,
) -> Result<CachedResponse, Rejection>
@ -111,8 +116,12 @@ where
let cached_response = CachedResponse::from_reply(reply)
.await
.map_err(reject_anyhow)?;
let to_cache = cached_response.clone();
tokio::spawn(async move {
let mut guard = self.lru_mutex.lock().await;
guard.put(key, cached_response.clone());
self.log_with_key(&key, "get_response: update cache");
guard.put(key, to_cache);
});
cached_response
}
Err(rejection) => {

View File

@ -9,6 +9,10 @@ mod cached_response;
pub use cache::Cache;
pub use cached_response::CachedResponse;
lazy_static! {
pub static ref CACHES: Caches = Caches::initialize();
}
#[derive(Debug, Clone)]
pub struct Caches {
pub owner_ids_by_api_key: Cache<Uuid, i32>,

View File

@ -4,6 +4,7 @@ use uuid::Uuid;
use warp::reply::{with_header, with_status};
use warp::{Rejection, Reply};
use crate::caches::CACHES;
use crate::models::{InteriorRefList, ListParams};
use crate::problem::reject_anyhow;
use crate::Environment;
@ -11,8 +12,7 @@ use crate::Environment;
use super::{authenticate, check_etag, JsonWithETag};
pub async fn get(id: i32, etag: Option<String>, env: Environment) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.interior_ref_list
.get_response(id, || async {
let interior_ref_list = InteriorRefList::get(&env.db, id).await?;
@ -29,8 +29,7 @@ pub async fn get_by_shop_id(
etag: Option<String>,
env: Environment,
) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.interior_ref_list_by_shop_id
.get_response(shop_id, || async {
let interior_ref_list = InteriorRefList::get_by_shop_id(&env.db, shop_id).await?;
@ -47,8 +46,7 @@ pub async fn list(
etag: Option<String>,
env: Environment,
) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.list_interior_ref_lists
.get_response(list_params.clone(), || async {
let interior_ref_lists = InteriorRefList::list(&env.db, &list_params).await?;
@ -80,11 +78,13 @@ pub async fn create(
let reply = JsonWithETag::from_serializable(&saved_interior_ref_list).map_err(reject_anyhow)?;
let reply = with_header(reply, "Location", url.as_str());
let reply = with_status(reply, StatusCode::CREATED);
env.caches.list_interior_ref_lists.clear().await;
env.caches
tokio::spawn(async move {
CACHES.list_interior_ref_lists.clear().await;
CACHES
.interior_ref_list_by_shop_id
.delete_response(saved_interior_ref_list.shop_id)
.await;
});
Ok(reply)
}
@ -118,12 +118,14 @@ pub async fn update(
JsonWithETag::from_serializable(&updated_interior_ref_list).map_err(reject_anyhow)?;
let reply = with_header(reply, "Location", url.as_str());
let reply = with_status(reply, StatusCode::CREATED);
env.caches.interior_ref_list.delete_response(id).await;
env.caches
tokio::spawn(async move {
CACHES.interior_ref_list.delete_response(id).await;
CACHES
.interior_ref_list_by_shop_id
.delete_response(updated_interior_ref_list.shop_id)
.await;
env.caches.list_interior_ref_lists.clear().await;
CACHES.list_interior_ref_lists.clear().await;
});
Ok(reply)
}
@ -149,7 +151,8 @@ pub async fn update_by_shop_id(
JsonWithETag::from_serializable(&updated_interior_ref_list).map_err(reject_anyhow)?;
let reply = with_header(reply, "Location", url.as_str());
let reply = with_status(reply, StatusCode::CREATED);
env.caches
tokio::spawn(async move {
CACHES
.interior_ref_list
.delete_response(
updated_interior_ref_list
@ -157,11 +160,12 @@ pub async fn update_by_shop_id(
.expect("saved interior_ref_list has no id"),
)
.await;
env.caches
CACHES
.interior_ref_list_by_shop_id
.delete_response(updated_interior_ref_list.shop_id)
.await;
env.caches.list_interior_ref_lists.clear().await;
CACHES.list_interior_ref_lists.clear().await;
});
Ok(reply)
}
@ -177,11 +181,13 @@ pub async fn delete(
InteriorRefList::delete(&env.db, owner_id, id)
.await
.map_err(reject_anyhow)?;
env.caches.interior_ref_list.delete_response(id).await;
env.caches.list_interior_ref_lists.clear().await;
env.caches
tokio::spawn(async move {
CACHES.interior_ref_list.delete_response(id).await;
CACHES.list_interior_ref_lists.clear().await;
CACHES
.interior_ref_list_by_shop_id
.delete_response(interior_ref_list.shop_id)
.await;
});
Ok(StatusCode::NO_CONTENT)
}

View File

@ -4,6 +4,7 @@ use uuid::Uuid;
use warp::reply::{with_header, with_status};
use warp::{Rejection, Reply};
use crate::caches::CACHES;
use crate::models::{ListParams, MerchandiseList};
use crate::problem::reject_anyhow;
use crate::Environment;
@ -11,8 +12,7 @@ use crate::Environment;
use super::{authenticate, check_etag, JsonWithETag};
pub async fn get(id: i32, etag: Option<String>, env: Environment) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.merchandise_list
.get_response(id, || async {
let merchandise_list = MerchandiseList::get(&env.db, id).await?;
@ -29,8 +29,7 @@ pub async fn get_by_shop_id(
etag: Option<String>,
env: Environment,
) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.merchandise_list_by_shop_id
.get_response(shop_id, || async {
let merchandise_list = MerchandiseList::get_by_shop_id(&env.db, shop_id).await?;
@ -47,8 +46,7 @@ pub async fn list(
etag: Option<String>,
env: Environment,
) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.list_merchandise_lists
.get_response(list_params.clone(), || async {
let merchandise_lists = MerchandiseList::list(&env.db, &list_params).await?;
@ -80,11 +78,13 @@ pub async fn create(
let reply = JsonWithETag::from_serializable(&saved_merchandise_list).map_err(reject_anyhow)?;
let reply = with_header(reply, "Location", url.as_str());
let reply = with_status(reply, StatusCode::CREATED);
env.caches.list_merchandise_lists.clear().await;
env.caches
tokio::spawn(async move {
CACHES.list_merchandise_lists.clear().await;
CACHES
.merchandise_list_by_shop_id
.delete_response(saved_merchandise_list.shop_id)
.await;
});
Ok(reply)
}
@ -118,12 +118,14 @@ pub async fn update(
JsonWithETag::from_serializable(&updated_merchandise_list).map_err(reject_anyhow)?;
let reply = with_header(reply, "Location", url.as_str());
let reply = with_status(reply, StatusCode::CREATED);
env.caches.merchandise_list.delete_response(id).await;
env.caches
tokio::spawn(async move {
CACHES.merchandise_list.delete_response(id).await;
CACHES
.merchandise_list_by_shop_id
.delete_response(updated_merchandise_list.shop_id)
.await;
env.caches.list_merchandise_lists.clear().await;
CACHES.list_merchandise_lists.clear().await;
});
Ok(reply)
}
@ -149,7 +151,8 @@ pub async fn update_by_shop_id(
JsonWithETag::from_serializable(&updated_merchandise_list).map_err(reject_anyhow)?;
let reply = with_header(reply, "Location", url.as_str());
let reply = with_status(reply, StatusCode::CREATED);
env.caches
tokio::spawn(async move {
CACHES
.merchandise_list
.delete_response(
updated_merchandise_list
@ -157,11 +160,12 @@ pub async fn update_by_shop_id(
.expect("saved merchandise_list has no id"),
)
.await;
env.caches
CACHES
.merchandise_list_by_shop_id
.delete_response(updated_merchandise_list.shop_id)
.await;
env.caches.list_merchandise_lists.clear().await;
CACHES.list_merchandise_lists.clear().await;
});
Ok(reply)
}
@ -177,11 +181,13 @@ pub async fn delete(
MerchandiseList::delete(&env.db, owner_id, id)
.await
.map_err(reject_anyhow)?;
env.caches.merchandise_list.delete_response(id).await;
env.caches
tokio::spawn(async move {
CACHES.merchandise_list.delete_response(id).await;
CACHES
.merchandise_list_by_shop_id
.delete_response(merchandise_list.shop_id)
.await;
env.caches.list_merchandise_lists.clear().await;
CACHES.list_merchandise_lists.clear().await;
});
Ok(StatusCode::NO_CONTENT)
}

View File

@ -15,14 +15,14 @@ pub mod owner;
pub mod shop;
pub mod transaction;
use super::caches::CachedResponse;
use super::caches::{CachedResponse, CACHES};
use super::problem::{unauthorized_no_api_key, unauthorized_no_owner};
use super::Environment;
#[instrument(level = "debug", skip(env, api_key))]
pub async fn authenticate(env: &Environment, api_key: Option<Uuid>) -> Result<i32> {
if let Some(api_key) = api_key {
env.caches
CACHES
.owner_ids_by_api_key
.get(api_key, || async {
Ok(

View File

@ -6,6 +6,7 @@ use uuid::Uuid;
use warp::reply::{with_header, with_status};
use warp::{Rejection, Reply};
use crate::caches::CACHES;
use crate::models::{ListParams, Owner};
use crate::problem::{reject_anyhow, unauthorized_no_api_key};
use crate::Environment;
@ -13,8 +14,7 @@ use crate::Environment;
use super::{authenticate, check_etag, JsonWithETag};
pub async fn get(id: i32, etag: Option<String>, env: Environment) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.owner
.get_response(id, || async {
let owner = Owner::get(&env.db, id).await?;
@ -31,8 +31,7 @@ pub async fn list(
etag: Option<String>,
env: Environment,
) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.list_owners
.get_response(list_params.clone(), || async {
let owners = Owner::list(&env.db, &list_params).await?;
@ -72,7 +71,9 @@ pub async fn create(
let reply = JsonWithETag::from_serializable(&saved_owner).map_err(reject_anyhow)?;
let reply = with_header(reply, "Location", url.as_str());
let reply = with_status(reply, StatusCode::CREATED);
env.caches.list_owners.clear().await;
tokio::spawn(async move {
CACHES.list_owners.clear().await;
});
Ok(reply)
} else {
Err(reject_anyhow(unauthorized_no_api_key()))
@ -98,8 +99,10 @@ pub async fn update(
let reply = JsonWithETag::from_serializable(&updated_owner).map_err(reject_anyhow)?;
let reply = with_header(reply, "Location", url.as_str());
let reply = with_status(reply, StatusCode::CREATED);
env.caches.owner.delete_response(id).await;
env.caches.list_owners.clear().await;
tokio::spawn(async move {
CACHES.owner.delete_response(id).await;
CACHES.list_owners.clear().await;
});
Ok(reply)
}
@ -112,11 +115,13 @@ pub async fn delete(
Owner::delete(&env.db, owner_id, id)
.await
.map_err(reject_anyhow)?;
env.caches.owner.delete_response(id).await;
env.caches
tokio::spawn(async move {
CACHES.owner.delete_response(id).await;
CACHES
.owner_ids_by_api_key
.delete(api_key.expect("api-key has been validated during authenticate"))
.await;
env.caches.list_owners.clear().await;
CACHES.list_owners.clear().await;
});
Ok(StatusCode::NO_CONTENT)
}

View File

@ -5,6 +5,7 @@ use uuid::Uuid;
use warp::reply::{with_header, with_status};
use warp::{Rejection, Reply};
use crate::caches::CACHES;
use crate::models::{InteriorRefList, ListParams, MerchandiseList, Shop};
use crate::problem::reject_anyhow;
use crate::Environment;
@ -12,8 +13,7 @@ use crate::Environment;
use super::{authenticate, check_etag, JsonWithETag};
pub async fn get(id: i32, etag: Option<String>, env: Environment) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.shop
.get_response(id, || async {
let shop = Shop::get(&env.db, id).await?;
@ -30,8 +30,7 @@ pub async fn list(
etag: Option<String>,
env: Environment,
) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.list_shops
.get_response(list_params.clone(), || async {
let shops = Shop::list(&env.db, &list_params).await?;
@ -90,7 +89,9 @@ pub async fn create(
let reply = JsonWithETag::from_serializable(&saved_shop).map_err(reject_anyhow)?;
let reply = with_header(reply, "Location", url.as_str());
let reply = with_status(reply, StatusCode::CREATED);
env.caches.list_shops.clear().await;
tokio::spawn(async move {
CACHES.list_shops.clear().await;
});
Ok(reply)
}
@ -122,8 +123,10 @@ pub async fn update(
let reply = JsonWithETag::from_serializable(&updated_shop).map_err(reject_anyhow)?;
let reply = with_header(reply, "Location", url.as_str());
let reply = with_status(reply, StatusCode::CREATED);
env.caches.shop.delete_response(id).await;
env.caches.list_shops.clear().await;
tokio::spawn(async move {
CACHES.shop.delete_response(id).await;
CACHES.list_shops.clear().await;
});
Ok(reply)
}
@ -136,15 +139,14 @@ pub async fn delete(
Shop::delete(&env.db, owner_id, id)
.await
.map_err(reject_anyhow)?;
env.caches.shop.delete_response(id).await;
env.caches.list_shops.clear().await;
env.caches
tokio::spawn(async move {
CACHES.shop.delete_response(id).await;
CACHES.list_shops.clear().await;
CACHES
.interior_ref_list_by_shop_id
.delete_response(id)
.await;
env.caches
.merchandise_list_by_shop_id
.delete_response(id)
.await;
CACHES.merchandise_list_by_shop_id.delete_response(id).await;
});
Ok(StatusCode::NO_CONTENT)
}

View File

@ -4,6 +4,7 @@ use uuid::Uuid;
use warp::reply::{with_header, with_status};
use warp::{Rejection, Reply};
use crate::caches::CACHES;
use crate::models::{ListParams, MerchandiseList, Transaction};
use crate::problem::reject_anyhow;
use crate::Environment;
@ -11,8 +12,7 @@ use crate::Environment;
use super::{authenticate, check_etag, JsonWithETag};
pub async fn get(id: i32, etag: Option<String>, env: Environment) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.transaction
.get_response(id, || async {
let transaction = Transaction::get(&env.db, id).await?;
@ -29,8 +29,7 @@ pub async fn list(
etag: Option<String>,
env: Environment,
) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.list_transactions
.get_response(list_params.clone(), || async {
let transactions = Transaction::list(&env.db, &list_params).await?;
@ -48,8 +47,7 @@ pub async fn list_by_shop_id(
etag: Option<String>,
env: Environment,
) -> Result<impl Reply, Rejection> {
let response = env
.caches
let response = CACHES
.list_transactions_by_shop_id
.get_response((shop_id, list_params.clone()), || async {
let transactions = Transaction::list_by_shop_id(&env.db, shop_id, &list_params).await?;
@ -104,10 +102,11 @@ pub async fn create(
let reply = JsonWithETag::from_serializable(&saved_transaction).map_err(reject_anyhow)?;
let reply = with_header(reply, "Location", url.as_str());
let reply = with_status(reply, StatusCode::CREATED);
tokio::spawn(async move {
// TODO: will this make these caches effectively useless?
env.caches.list_transactions.clear().await;
env.caches.list_transactions_by_shop_id.clear().await;
env.caches
CACHES.list_transactions.clear().await;
CACHES.list_transactions_by_shop_id.clear().await;
CACHES
.merchandise_list
.delete_response(
updated_merchandise_list
@ -115,11 +114,12 @@ pub async fn create(
.expect("saved merchandise_list has no id"),
)
.await;
env.caches
CACHES
.merchandise_list_by_shop_id
.delete_response(updated_merchandise_list.shop_id)
.await;
env.caches.list_merchandise_lists.clear().await;
CACHES.list_merchandise_lists.clear().await;
});
Ok(reply)
}
@ -132,8 +132,10 @@ pub async fn delete(
Transaction::delete(&env.db, owner_id, id)
.await
.map_err(reject_anyhow)?;
env.caches.transaction.delete_response(id).await;
env.caches.list_transactions.clear().await;
env.caches.list_transactions_by_shop_id.clear().await;
tokio::spawn(async move {
CACHES.transaction.delete_response(id).await;
CACHES.list_transactions.clear().await;
CACHES.list_transactions_by_shop_id.clear().await;
});
Ok(StatusCode::NO_CONTENT)
}

View File

@ -1,3 +1,6 @@
#[macro_use]
extern crate lazy_static;
use anyhow::Result;
use dotenv::dotenv;
use http::StatusCode;
@ -18,13 +21,11 @@ mod macros;
mod models;
mod problem;
use caches::Caches;
use models::{InteriorRefList, ListParams, MerchandiseList, Owner, Shop, Transaction};
#[derive(Debug, Clone)]
pub struct Environment {
pub db: PgPool,
pub caches: Caches,
pub api_url: Url,
}
@ -35,7 +36,6 @@ impl Environment {
.max_size(5)
.build(&env::var("DATABASE_URL")?)
.await?,
caches: Caches::initialize(),
api_url,
})
}