Don't use arc for cache keys
This commit is contained in:
parent
8d95fe3f07
commit
a86cd3edf5
3 changed files with 18 additions and 24 deletions
24
src/cache/low_mem.rs
vendored
24
src/cache/low_mem.rs
vendored
|
@ -8,11 +8,10 @@ use std::sync::Arc;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use log::{warn, LevelFilter};
|
use log::{warn, LevelFilter};
|
||||||
use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool};
|
use sqlx::sqlite::SqliteConnectOptions;
|
||||||
use tokio::{
|
use sqlx::{ConnectOptions, SqlitePool};
|
||||||
fs::remove_file,
|
use tokio::fs::remove_file;
|
||||||
sync::mpsc::{channel, Sender},
|
use tokio::sync::mpsc::{channel, Sender};
|
||||||
};
|
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
|
|
||||||
use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata};
|
use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata};
|
||||||
|
@ -63,7 +62,8 @@ impl LowMemCache {
|
||||||
// item that was put into the cache.
|
// item that was put into the cache.
|
||||||
let new_self_0 = Arc::clone(&new_self);
|
let new_self_0 = Arc::clone(&new_self);
|
||||||
|
|
||||||
// Spawn a new task that will listen for updates to the db.
|
// Spawn a new task that will listen for updates to the db, pruning if
|
||||||
|
// the size becomes too large
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let db_pool = db_pool;
|
let db_pool = db_pool;
|
||||||
let max_on_disk_size = disk_max_size / 20 * 19;
|
let max_on_disk_size = disk_max_size / 20 * 19;
|
||||||
|
@ -134,15 +134,11 @@ impl LowMemCache {
|
||||||
impl Cache for LowMemCache {
|
impl Cache for LowMemCache {
|
||||||
async fn get(
|
async fn get(
|
||||||
&self,
|
&self,
|
||||||
key: Arc<CacheKey>,
|
key: &CacheKey,
|
||||||
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
|
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
|
||||||
let channel = self.db_update_channel_sender.clone();
|
let channel = self.db_update_channel_sender.clone();
|
||||||
|
|
||||||
let path = Arc::new(
|
let path = Arc::new(self.disk_path.clone().join(PathBuf::from(key)));
|
||||||
self.disk_path
|
|
||||||
.clone()
|
|
||||||
.join(PathBuf::from(Arc::clone(&key).as_ref())),
|
|
||||||
);
|
|
||||||
let path_0 = Arc::clone(&path);
|
let path_0 = Arc::clone(&path);
|
||||||
|
|
||||||
tokio::spawn(async move { channel.send(DbMessage::Get(path_0)).await });
|
tokio::spawn(async move { channel.send(DbMessage::Get(path_0)).await });
|
||||||
|
@ -154,13 +150,13 @@ impl Cache for LowMemCache {
|
||||||
|
|
||||||
async fn put(
|
async fn put(
|
||||||
&self,
|
&self,
|
||||||
key: Arc<CacheKey>,
|
key: &CacheKey,
|
||||||
image: BoxedImageStream,
|
image: BoxedImageStream,
|
||||||
metadata: ImageMetadata,
|
metadata: ImageMetadata,
|
||||||
) -> Result<CacheStream, CacheError> {
|
) -> Result<CacheStream, CacheError> {
|
||||||
let channel = self.db_update_channel_sender.clone();
|
let channel = self.db_update_channel_sender.clone();
|
||||||
|
|
||||||
let path = Arc::new(self.disk_path.clone().join(PathBuf::from(key.as_ref())));
|
let path = Arc::new(self.disk_path.clone().join(PathBuf::from(key)));
|
||||||
let path_0 = Arc::clone(&path);
|
let path_0 = Arc::clone(&path);
|
||||||
|
|
||||||
let db_callback = |size: u32| async move {
|
let db_callback = |size: u32| async move {
|
||||||
|
|
10
src/cache/mod.rs
vendored
10
src/cache/mod.rs
vendored
|
@ -1,8 +1,8 @@
|
||||||
|
use std::fmt::Display;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::{fmt::Display, sync::Arc};
|
|
||||||
|
|
||||||
use actix_web::http::HeaderValue;
|
use actix_web::http::HeaderValue;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
@ -152,14 +152,12 @@ pub enum CacheError {
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait Cache: Send + Sync {
|
pub trait Cache: Send + Sync {
|
||||||
async fn get(
|
async fn get(&self, key: &CacheKey)
|
||||||
&self,
|
-> Option<Result<(CacheStream, ImageMetadata), CacheError>>;
|
||||||
key: Arc<CacheKey>,
|
|
||||||
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>>;
|
|
||||||
|
|
||||||
async fn put(
|
async fn put(
|
||||||
&self,
|
&self,
|
||||||
key: Arc<CacheKey>,
|
key: &CacheKey,
|
||||||
image: BoxedImageStream,
|
image: BoxedImageStream,
|
||||||
metadata: ImageMetadata,
|
metadata: ImageMetadata,
|
||||||
) -> Result<CacheStream, CacheError>;
|
) -> Result<CacheStream, CacheError>;
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::sync::{atomic::Ordering, Arc};
|
use std::sync::atomic::Ordering;
|
||||||
|
|
||||||
use actix_web::http::header::{
|
use actix_web::http::header::{
|
||||||
ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, CACHE_CONTROL, CONTENT_LENGTH,
|
ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, CACHE_CONTROL, CONTENT_LENGTH,
|
||||||
|
@ -191,9 +191,9 @@ async fn fetch_image(
|
||||||
file_name: String,
|
file_name: String,
|
||||||
is_data_saver: bool,
|
is_data_saver: bool,
|
||||||
) -> ServerResponse {
|
) -> ServerResponse {
|
||||||
let key = Arc::new(CacheKey(chapter_hash, file_name, is_data_saver));
|
let key = CacheKey(chapter_hash, file_name, is_data_saver);
|
||||||
|
|
||||||
match cache.get(Arc::clone(&key)).await {
|
match cache.get(&key).await {
|
||||||
Some(Ok((image, metadata))) => {
|
Some(Ok((image, metadata))) => {
|
||||||
return construct_response(image, &metadata);
|
return construct_response(image, &metadata);
|
||||||
}
|
}
|
||||||
|
@ -263,7 +263,7 @@ async fn fetch_image(
|
||||||
|
|
||||||
let metadata = ImageMetadata::new(content_type, length, last_mod).unwrap();
|
let metadata = ImageMetadata::new(content_type, length, last_mod).unwrap();
|
||||||
let stream = {
|
let stream = {
|
||||||
match cache.put(key, Box::new(body), metadata).await {
|
match cache.put(&key, Box::new(body), metadata).await {
|
||||||
Ok(stream) => stream,
|
Ok(stream) => stream,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to insert into cache: {}", e);
|
warn!("Failed to insert into cache: {}", e);
|
||||||
|
|
Loading…
Reference in a new issue