Compare commits
No commits in common. "eab0449a02b220ea276daa55c557dbe1e20a53c6" and "53c0cb664a2b6b37af68027c5992295be9cddd7d" have entirely different histories.
eab0449a02
...
53c0cb664a
10 changed files with 159 additions and 557 deletions
312
Cargo.lock
generated
312
Cargo.lock
generated
|
@ -234,12 +234,6 @@ dependencies = [
|
|||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "arrayvec"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.50"
|
||||
|
@ -251,15 +245,6 @@ dependencies = [
|
|||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atoi"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5"
|
||||
dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atty"
|
||||
version = "0.2.14"
|
||||
|
@ -304,18 +289,6 @@ version = "1.2.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
|
||||
|
||||
[[package]]
|
||||
name = "bitvec"
|
||||
version = "0.19.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8942c8d352ae1838c9dda0b0ca2ab657696ef2232a20147cf1b30ae1a9cb4321"
|
||||
dependencies = [
|
||||
"funty",
|
||||
"radium",
|
||||
"tap",
|
||||
"wyz",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "block-buffer"
|
||||
version = "0.9.0"
|
||||
|
@ -345,24 +318,12 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "build_const"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b4ae4235e6dac0694637c763029ecea1a2ec9e4e06ec2729bd21ba4d9c863eb7"
|
||||
|
||||
[[package]]
|
||||
name = "bumpalo"
|
||||
version = "3.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe"
|
||||
|
||||
[[package]]
|
||||
name = "byteorder"
|
||||
version = "1.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.0.1"
|
||||
|
@ -477,15 +438,6 @@ version = "0.1.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
|
||||
|
||||
[[package]]
|
||||
name = "crc"
|
||||
version = "1.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb"
|
||||
dependencies = [
|
||||
"build_const",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crc32fast"
|
||||
version = "1.2.1"
|
||||
|
@ -495,37 +447,6 @@ dependencies = [
|
|||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-queue"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0f6cb3c7f5b8e51bc3ebb73a2327ad4abdbd119dc13223f14f961d2f38486756"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"cfg-if",
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ctrlc"
|
||||
version = "3.1.9"
|
||||
|
@ -612,12 +533,6 @@ dependencies = [
|
|||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "funty"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.3.14"
|
||||
|
@ -759,15 +674,6 @@ dependencies = [
|
|||
"ahash 0.4.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashlink"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d99cf782f0dc4372d26846bec3de7804ceb5df083c2d4462c0b8d2330e894fa8"
|
||||
dependencies = [
|
||||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.3.2"
|
||||
|
@ -786,12 +692,6 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hex"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
|
||||
|
||||
[[package]]
|
||||
name = "http"
|
||||
version = "0.2.4"
|
||||
|
@ -928,19 +828,6 @@ version = "1.4.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
||||
|
||||
[[package]]
|
||||
name = "lexical-core"
|
||||
version = "0.7.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "21f866863575d0e1d654fbeeabdc927292fdf862873dc3c96c6f753357e13374"
|
||||
dependencies = [
|
||||
"arrayvec",
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
"ryu",
|
||||
"static_assertions",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.93"
|
||||
|
@ -958,17 +845,6 @@ dependencies = [
|
|||
"pkg-config",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libsqlite3-sys"
|
||||
version = "0.22.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "19cb1effde5f834799ac5e5ef0e40d45027cd74f271b1de786ba8abb30e2164d"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "local-channel"
|
||||
version = "0.1.2"
|
||||
|
@ -1038,7 +914,6 @@ dependencies = [
|
|||
"serde_json",
|
||||
"simple_logger",
|
||||
"sodiumoxide",
|
||||
"sqlx",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
|
@ -1046,12 +921,6 @@ dependencies = [
|
|||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "maplit"
|
||||
version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
|
||||
|
||||
[[package]]
|
||||
name = "matches"
|
||||
version = "0.1.8"
|
||||
|
@ -1114,19 +983,6 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "6.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2"
|
||||
dependencies = [
|
||||
"bitvec",
|
||||
"funty",
|
||||
"lexical-core",
|
||||
"memchr",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ntapi"
|
||||
version = "0.3.6"
|
||||
|
@ -1318,12 +1174,6 @@ dependencies = [
|
|||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "radium"
|
||||
version = "0.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.8.3"
|
||||
|
@ -1562,19 +1412,6 @@ version = "0.6.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d"
|
||||
|
||||
[[package]]
|
||||
name = "sha2"
|
||||
version = "0.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fa827a14b29ab7f44778d14a88d3cb76e949c45083f7dbfa507d0cb699dc12de"
|
||||
dependencies = [
|
||||
"block-buffer",
|
||||
"cfg-if",
|
||||
"cpuid-bool",
|
||||
"digest",
|
||||
"opaque-debug",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.3.0"
|
||||
|
@ -1636,105 +1473,6 @@ version = "0.5.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
||||
|
||||
[[package]]
|
||||
name = "sqlformat"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6d86e3c77ff882a828346ba401a7ef4b8e440df804491c6064fe8295765de71c"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"maplit",
|
||||
"nom",
|
||||
"regex",
|
||||
"unicode_categories",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sqlx"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d582b9bc04ec6c03084196efc42c2226b018e9941f03ee62bd88921d500917c0"
|
||||
dependencies = [
|
||||
"sqlx-core",
|
||||
"sqlx-macros",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sqlx-core"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "de52d1d473cebb2abb79c886ef6a8023e965e34c0676a99cfeac2cc7f0fde4c1"
|
||||
dependencies = [
|
||||
"ahash 0.7.2",
|
||||
"atoi",
|
||||
"bitflags",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"crc",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-queue",
|
||||
"crossbeam-utils",
|
||||
"either",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"hashlink",
|
||||
"hex",
|
||||
"itoa",
|
||||
"libc",
|
||||
"libsqlite3-sys",
|
||||
"log",
|
||||
"memchr",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"percent-encoding",
|
||||
"rustls",
|
||||
"sha2",
|
||||
"smallvec",
|
||||
"sqlformat",
|
||||
"sqlx-rt",
|
||||
"stringprep",
|
||||
"thiserror",
|
||||
"time 0.2.26",
|
||||
"tokio-stream",
|
||||
"url",
|
||||
"webpki",
|
||||
"webpki-roots",
|
||||
"whoami",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sqlx-macros"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a40f0be97e704d3fbf059e7e3333c3735639146a72d586c5534c70e79da88a4"
|
||||
dependencies = [
|
||||
"dotenv",
|
||||
"either",
|
||||
"futures",
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"sha2",
|
||||
"sqlx-core",
|
||||
"sqlx-rt",
|
||||
"syn",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sqlx-rt"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6ae97ab05063ed515cdc23d90253213aa24dda0a288c5ec079af3d10f9771bc"
|
||||
dependencies = [
|
||||
"actix-rt",
|
||||
"once_cell",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "standback"
|
||||
version = "0.2.17"
|
||||
|
@ -1744,12 +1482,6 @@ dependencies = [
|
|||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "static_assertions"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
|
||||
|
||||
[[package]]
|
||||
name = "stdweb"
|
||||
version = "0.4.20"
|
||||
|
@ -1799,16 +1531,6 @@ version = "0.1.5"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
|
||||
|
||||
[[package]]
|
||||
name = "stringprep"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
|
||||
dependencies = [
|
||||
"unicode-bidi",
|
||||
"unicode-normalization",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "strsim"
|
||||
version = "0.10.0"
|
||||
|
@ -1826,12 +1548,6 @@ dependencies = [
|
|||
"unicode-xid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tap"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
|
||||
|
||||
[[package]]
|
||||
name = "termcolor"
|
||||
version = "1.1.2"
|
||||
|
@ -2086,12 +1802,6 @@ version = "0.2.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
|
||||
|
||||
[[package]]
|
||||
name = "unicode_categories"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
|
||||
|
||||
[[package]]
|
||||
name = "untrusted"
|
||||
version = "0.7.1"
|
||||
|
@ -2111,12 +1821,6 @@ dependencies = [
|
|||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "vcpkg"
|
||||
version = "0.2.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cbdbff6266a24120518560b5dc983096efb98462e51d0d68169895b237be3e5d"
|
||||
|
||||
[[package]]
|
||||
name = "vec_map"
|
||||
version = "0.8.2"
|
||||
|
@ -2242,16 +1946,6 @@ dependencies = [
|
|||
"webpki",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "whoami"
|
||||
version = "1.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4abacf325c958dfeaf1046931d37f2a901b6dfe0968ee965a29e94c6766b2af6"
|
||||
dependencies = [
|
||||
"wasm-bindgen",
|
||||
"web-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi"
|
||||
version = "0.3.9"
|
||||
|
@ -2291,9 +1985,3 @@ checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69"
|
|||
dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wyz"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"
|
||||
|
|
|
@ -29,7 +29,6 @@ serde = "1"
|
|||
serde_json = "1"
|
||||
simple_logger = "1"
|
||||
sodiumoxide = "0.2"
|
||||
sqlx = { version = "0.5", features = [ "runtime-actix-rustls", "sqlite", "time", "chrono", "macros" ] }
|
||||
thiserror = "1"
|
||||
tokio = { version = "1", features = [ "full", "parking_lot" ] }
|
||||
tokio-stream = { version = "0.1", features = [ "sync" ] }
|
||||
|
@ -39,4 +38,3 @@ url = { version = "2", features = [ "serde" ] }
|
|||
[profile.release]
|
||||
lto = true
|
||||
codegen-units = 1
|
||||
debug = true
|
|
@ -1,7 +0,0 @@
|
|||
create table if not exists Images(
|
||||
id varchar primary key not null,
|
||||
size integer not null,
|
||||
accessed timestamp not null default CURRENT_TIMESTAMP,
|
||||
disk_size integer as ((size + 4095) / 4096 * 4096)
|
||||
);
|
||||
create index if not exists Images_accessed on Images(accessed);
|
|
@ -1,9 +0,0 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
# This script needs to be run once in order for compile time macros to not
|
||||
# complain about a missing DB
|
||||
|
||||
# We can trust that our program will initialize the db at runtime the same way
|
||||
# as it pulls from the same file for initialization
|
||||
|
||||
sqlite3 cache/metadata.sqlite < db_queries/init.sql
|
84
src/cache/fs.rs
vendored
84
src/cache/fs.rs
vendored
|
@ -3,13 +3,11 @@ use bytes::{Buf, Bytes};
|
|||
use futures::{Stream, StreamExt};
|
||||
use log::{debug, error};
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::Deserialize;
|
||||
use std::fmt::Display;
|
||||
use std::num::NonZeroU64;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use std::{collections::HashMap, num::NonZeroU64};
|
||||
use tokio::fs::{create_dir_all, remove_file, File};
|
||||
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
@ -18,7 +16,7 @@ use tokio::sync::RwLock;
|
|||
use tokio_stream::wrappers::WatchStream;
|
||||
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||
|
||||
use super::{BoxedImageStream, CacheStream, CacheStreamItem, ImageMetadata};
|
||||
use super::{BoxedImageStream, CacheStream, CacheStreamItem};
|
||||
|
||||
/// Keeps track of files that are currently being written to.
|
||||
///
|
||||
|
@ -40,27 +38,26 @@ static WRITING_STATUS: Lazy<RwLock<HashMap<PathBuf, Receiver<WritingStatus>>>> =
|
|||
Lazy::new(|| RwLock::new(HashMap::new()));
|
||||
|
||||
/// Tries to read from the file, returning a byte stream if it exists
|
||||
pub async fn read_file(
|
||||
path: &Path,
|
||||
) -> Option<Result<(CacheStream, ImageMetadata), std::io::Error>> {
|
||||
let file = File::open(path).await.ok()?;
|
||||
let std_file = file.try_clone().await.unwrap().into_std().await;
|
||||
pub async fn read_file(path: &Path) -> Option<Result<CacheStream, std::io::Error>> {
|
||||
if path.exists() {
|
||||
let status = WRITING_STATUS.read().await.get(path).map(Clone::clone);
|
||||
|
||||
let metadata = {
|
||||
let mut de = serde_json::Deserializer::from_reader(std_file);
|
||||
ImageMetadata::deserialize(&mut de).ok()?
|
||||
};
|
||||
|
||||
let stream = if let Some(status) = WRITING_STATUS.read().await.get(path).map(Clone::clone) {
|
||||
CacheStream::Concurrent(ConcurrentFsStream::from_file(
|
||||
file,
|
||||
WatchStream::new(status),
|
||||
))
|
||||
if let Some(status) = status {
|
||||
Some(
|
||||
ConcurrentFsStream::new(path, WatchStream::new(status))
|
||||
.await
|
||||
.map(CacheStream::Concurrent),
|
||||
)
|
||||
} else {
|
||||
Some(
|
||||
File::open(path)
|
||||
.await
|
||||
.map(|f| CacheStream::Completed(FramedRead::new(f, BytesCodec::new()))),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
CacheStream::Completed(FramedRead::new(file, BytesCodec::new()))
|
||||
};
|
||||
|
||||
Some(Ok((stream, metadata)))
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Maps the input byte stream into one that writes to disk instead, returning
|
||||
|
@ -68,7 +65,6 @@ pub async fn read_file(
|
|||
pub async fn write_file(
|
||||
path: &Path,
|
||||
mut byte_stream: BoxedImageStream,
|
||||
metadata: ImageMetadata,
|
||||
notifier: UnboundedSender<u64>,
|
||||
) -> Result<CacheStream, std::io::Error> {
|
||||
let (tx, rx) = channel(WritingStatus::NotDone);
|
||||
|
@ -77,7 +73,7 @@ pub async fn write_file(
|
|||
let mut write_lock = WRITING_STATUS.write().await;
|
||||
let parent = path.parent().unwrap();
|
||||
create_dir_all(parent).await?;
|
||||
let file = File::create(path).await?; // we need to make sure the file exists and is truncated.
|
||||
let file = File::create(path).await?; // we need to make sure the file exists and is truncated.
|
||||
write_lock.insert(path.to_path_buf(), rx.clone());
|
||||
file
|
||||
};
|
||||
|
@ -88,8 +84,6 @@ pub async fn write_file(
|
|||
let path_buf = path_buf; // moves path buf into async
|
||||
let mut errored = false;
|
||||
let mut bytes_written: u64 = 0;
|
||||
file.write_all(serde_json::to_string(&metadata).unwrap().as_bytes())
|
||||
.await?;
|
||||
while let Some(bytes) = byte_stream.next().await {
|
||||
if let Ok(mut bytes) = bytes {
|
||||
loop {
|
||||
|
@ -119,19 +113,17 @@ pub async fn write_file(
|
|||
debug!("writing to file done");
|
||||
}
|
||||
|
||||
{
|
||||
let mut write_lock = WRITING_STATUS.write().await;
|
||||
// This needs to be written atomically with the write lock, else
|
||||
// it's possible we have an inconsistent state
|
||||
//
|
||||
// We don't really care if we have no receivers
|
||||
if errored {
|
||||
let _ = tx.send(WritingStatus::Error);
|
||||
} else {
|
||||
let _ = tx.send(WritingStatus::Done(bytes_written));
|
||||
}
|
||||
write_lock.remove(&path_buf);
|
||||
let mut write_lock = WRITING_STATUS.write().await;
|
||||
// This needs to be written atomically with the write lock, else
|
||||
// it's possible we have an inconsistent state
|
||||
//
|
||||
// We don't really care if we have no receivers
|
||||
if errored {
|
||||
let _ = tx.send(WritingStatus::Error);
|
||||
} else {
|
||||
let _ = tx.send(WritingStatus::Done(bytes_written));
|
||||
}
|
||||
write_lock.remove(&path_buf);
|
||||
|
||||
// notify
|
||||
if let Err(e) = notifier.send(bytes_written) {
|
||||
|
@ -162,18 +154,12 @@ impl ConcurrentFsStream {
|
|||
path: &Path,
|
||||
receiver: WatchStream<WritingStatus>,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
File::open(path)
|
||||
.await
|
||||
.map(|file| Self::from_file(file, receiver))
|
||||
}
|
||||
|
||||
fn from_file(file: File, receiver: WatchStream<WritingStatus>) -> Self {
|
||||
Self {
|
||||
file: Box::pin(file),
|
||||
Ok(Self {
|
||||
file: Box::pin(File::open(path).await?),
|
||||
receiver: Box::pin(receiver),
|
||||
bytes_read: 0,
|
||||
bytes_total: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -196,7 +182,7 @@ impl Stream for ConcurrentFsStream {
|
|||
// First, try to read from the file...
|
||||
|
||||
// TODO: Might be more efficient to have a larger buffer
|
||||
let mut bytes = [0; 4 * 1024].to_vec();
|
||||
let mut bytes = [0; 8 * 1024].to_vec();
|
||||
let mut buffer = ReadBuf::new(&mut bytes);
|
||||
match self.file.as_mut().poll_read(cx, &mut buffer) {
|
||||
Poll::Ready(Ok(_)) => (),
|
||||
|
|
110
src/cache/generational.rs
vendored
110
src/cache/generational.rs
vendored
|
@ -1,4 +1,4 @@
|
|||
use std::{path::PathBuf, sync::Arc};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
|
@ -136,70 +136,67 @@ impl GenerationalCache {
|
|||
#[async_trait]
|
||||
impl Cache for GenerationalCache {
|
||||
async fn get(
|
||||
&self,
|
||||
key: Arc<CacheKey>,
|
||||
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
|
||||
todo!();
|
||||
// if self.in_memory.contains(key) {
|
||||
// return self
|
||||
// .in_memory
|
||||
// .get(key)
|
||||
// // TODO: get rid of clone?
|
||||
// .map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata.clone())));
|
||||
// }
|
||||
&mut self,
|
||||
key: &CacheKey,
|
||||
) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>> {
|
||||
if self.in_memory.contains(key) {
|
||||
return self
|
||||
.in_memory
|
||||
.get(key)
|
||||
// TODO: get rid of clone?
|
||||
.map(|(image, metadata)| Ok((CacheStream::from(image.clone()), metadata)));
|
||||
}
|
||||
|
||||
// if let Some(metadata) = self.on_disk.pop(key) {
|
||||
// let new_key_path = {
|
||||
// let mut root = self.disk_path.clone();
|
||||
// root.push(PathBuf::from(key.clone()));
|
||||
// root
|
||||
// };
|
||||
if let Some(metadata) = self.on_disk.pop(key) {
|
||||
let new_key_path = {
|
||||
let mut root = self.disk_path.clone();
|
||||
root.push(PathBuf::from(key.clone()));
|
||||
root
|
||||
};
|
||||
|
||||
// // extract from disk, if it exists
|
||||
// let file = File::open(&new_key_path).await;
|
||||
// extract from disk, if it exists
|
||||
let file = File::open(&new_key_path).await;
|
||||
|
||||
// let mut buffer = metadata
|
||||
// .content_length
|
||||
// .map_or_else(Vec::new, |v| Vec::with_capacity(v as usize));
|
||||
let mut buffer = metadata
|
||||
.content_length
|
||||
.map_or_else(Vec::new, |v| Vec::with_capacity(v as usize));
|
||||
|
||||
// match file {
|
||||
// Ok(mut file) => {
|
||||
// match file.read_to_end(&mut buffer).await {
|
||||
// Ok(_) => {
|
||||
// // We don't particularly care if we fail to delete from disk since
|
||||
// // if it's an error it means it's already been dropped.
|
||||
// tokio::spawn(remove_file(new_key_path));
|
||||
// }
|
||||
// Err(e) => {
|
||||
// warn!("Failed to read from {:?}: {}", new_key_path, e);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// Err(e) => {
|
||||
// warn!("Failed to open {:?}: {}", new_key_path, e);
|
||||
// return None;
|
||||
// }
|
||||
// }
|
||||
match file {
|
||||
Ok(mut file) => {
|
||||
match file.read_to_end(&mut buffer).await {
|
||||
Ok(_) => {
|
||||
// We don't particularly care if we fail to delete from disk since
|
||||
// if it's an error it means it's already been dropped.
|
||||
tokio::spawn(remove_file(new_key_path));
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to read from {:?}: {}", new_key_path, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to open {:?}: {}", new_key_path, e);
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
// buffer.shrink_to_fit();
|
||||
buffer.shrink_to_fit();
|
||||
|
||||
// todo!();
|
||||
// self.disk_cur_size -= buffer.len() as u64;
|
||||
// // let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into());
|
||||
self.disk_cur_size -= buffer.len() as u64;
|
||||
let image = CacheStream::from(CachedImage(Bytes::from(buffer))).map_err(|e| e.into());
|
||||
|
||||
// // return Some(self.put(key.clone(), Box::new(image), metadata).await);
|
||||
// }
|
||||
return Some(self.put(key.clone(), Box::new(image), metadata).await);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
async fn put(
|
||||
&self,
|
||||
key: Arc<CacheKey>,
|
||||
&mut self,
|
||||
key: CacheKey,
|
||||
mut image: BoxedImageStream,
|
||||
metadata: ImageMetadata,
|
||||
) -> Result<CacheStream, CacheError> {
|
||||
todo!();
|
||||
) -> Result<(CacheStream, &ImageMetadata), CacheError> {
|
||||
let mut hot_evicted = vec![];
|
||||
|
||||
let image = {
|
||||
|
@ -227,14 +224,12 @@ impl Cache for GenerationalCache {
|
|||
}
|
||||
}
|
||||
|
||||
todo!();
|
||||
// self.in_memory.put(key.clone(), (image, metadata));
|
||||
self.in_memory.put(key.clone(), (image, metadata));
|
||||
self.memory_cur_size += new_img_size;
|
||||
} else {
|
||||
// Image was larger than memory capacity, push directly into cold
|
||||
// storage.
|
||||
todo!();
|
||||
// self.push_into_cold(key.clone(), image, metadata).await;
|
||||
self.push_into_cold(key.clone(), image, metadata).await;
|
||||
};
|
||||
|
||||
// Push evicted hot entires into cold storage.
|
||||
|
@ -242,10 +237,9 @@ impl Cache for GenerationalCache {
|
|||
self.push_into_cold(key, image, metadata).await;
|
||||
}
|
||||
|
||||
todo!();
|
||||
// self.get(&key).await.unwrap()
|
||||
self.get(&key).await.unwrap()
|
||||
}
|
||||
|
||||
// noop
|
||||
async fn increase_usage(&self, _amt: u64) {}
|
||||
async fn increase_usage(&mut self, _amt: u64) {}
|
||||
}
|
||||
|
|
129
src/cache/low_mem.rs
vendored
129
src/cache/low_mem.rs
vendored
|
@ -1,131 +1,88 @@
|
|||
//! Low memory caching stuff
|
||||
|
||||
use std::sync::{atomic::Ordering, Arc};
|
||||
use std::{path::PathBuf, sync::atomic::AtomicU64};
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::StreamExt;
|
||||
use sqlx::SqlitePool;
|
||||
use tokio::sync::mpsc::{channel, unbounded_channel, Sender, UnboundedSender};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use lru::LruCache;
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::{BoxedImageStream, Cache, CacheError, CacheKey, CacheStream, ImageMetadata};
|
||||
|
||||
pub struct LowMemCache {
|
||||
on_disk: LruCache<CacheKey, ImageMetadata>,
|
||||
disk_path: PathBuf,
|
||||
disk_max_size: u64,
|
||||
disk_cur_size: AtomicU64,
|
||||
file_size_channel_sender: UnboundedSender<u64>,
|
||||
db_update_channel_sender: Sender<DbMessage>,
|
||||
}
|
||||
|
||||
enum DbMessage {
|
||||
Get(Arc<CacheKey>),
|
||||
Put(Arc<CacheKey>, ImageMetadata),
|
||||
disk_cur_size: u64,
|
||||
master_sender: UnboundedSender<u64>,
|
||||
}
|
||||
|
||||
impl LowMemCache {
|
||||
/// Constructs a new low memory cache at the provided path and capaci ty.
|
||||
/// Constructs a new low memory cache at the provided path and capacity.
|
||||
/// This internally spawns a task that will wait for filesystem
|
||||
/// notifications when a file has been written.
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub async fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<Box<dyn Cache>> {
|
||||
let (file_size_tx, mut file_size_rx) = unbounded_channel();
|
||||
let (db_tx, db_rx) = channel(128);
|
||||
let db_pool = {
|
||||
let db_url = format!(
|
||||
"sqlite:{}/metadata.sqlite?mode=rwc",
|
||||
disk_path.to_str().unwrap()
|
||||
);
|
||||
let db = SqlitePool::connect(&db_url).await.unwrap();
|
||||
|
||||
// Run db init
|
||||
sqlx::query_file!("./db_queries/init.sql")
|
||||
.execute(&mut db.acquire().await.unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
db
|
||||
};
|
||||
|
||||
let new_self: Arc<Box<dyn Cache>> = Arc::new(Box::new(Self {
|
||||
pub fn new(disk_max_size: u64, disk_path: PathBuf) -> Arc<RwLock<Box<dyn Cache>>> {
|
||||
let (tx, mut rx) = unbounded_channel();
|
||||
let new_self: Arc<RwLock<Box<dyn Cache>>> = Arc::new(RwLock::new(Box::new(Self {
|
||||
on_disk: LruCache::unbounded(),
|
||||
disk_path,
|
||||
disk_max_size,
|
||||
disk_cur_size: AtomicU64::new(0),
|
||||
file_size_channel_sender: file_size_tx,
|
||||
db_update_channel_sender: db_tx,
|
||||
}));
|
||||
disk_cur_size: 0,
|
||||
master_sender: tx,
|
||||
})));
|
||||
|
||||
// Spawns a new task that continuously listens for events received by
|
||||
// the channel, which informs the low memory cache the total size of the
|
||||
// item that was put into the cache.
|
||||
let new_self_0 = Arc::clone(&new_self);
|
||||
tokio::spawn(async move {
|
||||
// This will never return None, effectively a loop
|
||||
while let Some(new_size) = file_size_rx.recv().await {
|
||||
new_self_0.increase_usage(new_size).await;
|
||||
}
|
||||
});
|
||||
|
||||
// Spawn a new task that will listen for updates to the db.
|
||||
tokio::spawn(async move {
|
||||
let db_pool = db_pool;
|
||||
let mut recv_stream = ReceiverStream::new(db_rx).ready_chunks(128);
|
||||
while let Some(messages) = recv_stream.next().await {
|
||||
let mut transaction = db_pool.begin().await.unwrap();
|
||||
for message in messages {}
|
||||
|
||||
transaction.commit().await.unwrap();
|
||||
while let Some(new_size) = rx.recv().await {
|
||||
new_self_0.write().await.increase_usage(new_size).await;
|
||||
}
|
||||
});
|
||||
|
||||
new_self
|
||||
}
|
||||
|
||||
async fn prune(&mut self) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Cache for LowMemCache {
|
||||
async fn get(
|
||||
&self,
|
||||
key: Arc<CacheKey>,
|
||||
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>> {
|
||||
let channel = self.db_update_channel_sender.clone();
|
||||
let key_0 = Arc::clone(&key);
|
||||
|
||||
tokio::spawn(async move { channel.send(DbMessage::Get(key_0)).await });
|
||||
|
||||
let path = self
|
||||
.disk_path
|
||||
.clone()
|
||||
.join(PathBuf::from(Arc::clone(&key).as_ref()));
|
||||
&mut self,
|
||||
key: &CacheKey,
|
||||
) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>> {
|
||||
let metadata = self.on_disk.get(key)?;
|
||||
let path = self.disk_path.clone().join(PathBuf::from(key.clone()));
|
||||
super::fs::read_file(&path)
|
||||
.await
|
||||
.map(|res| res.map_err(Into::into))
|
||||
.map(|res| res.map(|stream| (stream, metadata)).map_err(Into::into))
|
||||
}
|
||||
|
||||
async fn put(
|
||||
&self,
|
||||
key: Arc<CacheKey>,
|
||||
&mut self,
|
||||
key: CacheKey,
|
||||
image: BoxedImageStream,
|
||||
metadata: ImageMetadata,
|
||||
) -> Result<CacheStream, CacheError> {
|
||||
let channel = self.db_update_channel_sender.clone();
|
||||
let key_0 = Arc::clone(&key);
|
||||
tokio::spawn(async move { channel.send(DbMessage::Put(key_0, metadata)).await });
|
||||
|
||||
let path = self.disk_path.clone().join(PathBuf::from(key.as_ref()));
|
||||
|
||||
super::fs::write_file(
|
||||
&path,
|
||||
image,
|
||||
metadata,
|
||||
self.file_size_channel_sender.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
) -> Result<(CacheStream, &ImageMetadata), CacheError> {
|
||||
let path = self.disk_path.clone().join(PathBuf::from(key.clone()));
|
||||
self.on_disk.put(key.clone(), metadata);
|
||||
super::fs::write_file(&path, image, self.master_sender.clone())
|
||||
.await
|
||||
.map(move |stream| (stream, self.on_disk.get(&key).unwrap()))
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
async fn increase_usage(&self, amt: u64) {
|
||||
self.disk_cur_size.fetch_add(amt, Ordering::Release);
|
||||
/// Increments the internal size counter, pruning if the value exceeds the
|
||||
/// user-defined capacity.
|
||||
async fn increase_usage(&mut self, amt: u64) {
|
||||
self.disk_cur_size += amt;
|
||||
if self.disk_cur_size > self.disk_max_size {
|
||||
self.prune().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
33
src/cache/mod.rs
vendored
33
src/cache/mod.rs
vendored
|
@ -1,8 +1,8 @@
|
|||
use std::fmt::Display;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{fmt::Display, sync::Arc};
|
||||
|
||||
use actix_web::http::HeaderValue;
|
||||
use async_trait::async_trait;
|
||||
|
@ -10,14 +10,13 @@ use bytes::{Bytes, BytesMut};
|
|||
use chrono::{DateTime, FixedOffset};
|
||||
use fs::ConcurrentFsStream;
|
||||
use futures::{Stream, StreamExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
use tokio::fs::File;
|
||||
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||
|
||||
pub use fs::UpstreamError;
|
||||
pub use generational::GenerationalCache;
|
||||
pub use low_mem::LowMemCache;
|
||||
use tokio::fs::File;
|
||||
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||
|
||||
mod fs;
|
||||
mod generational;
|
||||
|
@ -43,25 +42,19 @@ impl From<CacheKey> for PathBuf {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<&CacheKey> for PathBuf {
|
||||
#[inline]
|
||||
fn from(key: &CacheKey) -> Self {
|
||||
key.to_string().into()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CachedImage(pub Bytes);
|
||||
|
||||
#[derive(Copy, Clone, Serialize, Deserialize)]
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct ImageMetadata {
|
||||
pub content_type: Option<ImageContentType>,
|
||||
// If we can guarantee a non-zero u32 here we can save 4 bytes
|
||||
pub content_length: Option<u32>,
|
||||
pub last_modified: Option<DateTime<FixedOffset>>,
|
||||
}
|
||||
|
||||
// Confirmed by Ply to be these types: https://link.eddie.sh/ZXfk0
|
||||
#[derive(Copy, Clone, Serialize, Deserialize)]
|
||||
#[derive(Copy, Clone)]
|
||||
pub enum ImageContentType {
|
||||
Png,
|
||||
Jpeg,
|
||||
|
@ -155,18 +148,18 @@ pub enum CacheError {
|
|||
#[async_trait]
|
||||
pub trait Cache: Send + Sync {
|
||||
async fn get(
|
||||
&self,
|
||||
key: Arc<CacheKey>,
|
||||
) -> Option<Result<(CacheStream, ImageMetadata), CacheError>>;
|
||||
&mut self,
|
||||
key: &CacheKey,
|
||||
) -> Option<Result<(CacheStream, &ImageMetadata), CacheError>>;
|
||||
|
||||
async fn put(
|
||||
&self,
|
||||
key: Arc<CacheKey>,
|
||||
&mut self,
|
||||
key: CacheKey,
|
||||
image: BoxedImageStream,
|
||||
metadata: ImageMetadata,
|
||||
) -> Result<CacheStream, CacheError>;
|
||||
) -> Result<(CacheStream, &ImageMetadata), CacheError>;
|
||||
|
||||
async fn increase_usage(&self, amt: u64);
|
||||
async fn increase_usage(&mut self, amt: u64);
|
||||
}
|
||||
|
||||
pub enum CacheStream {
|
||||
|
|
|
@ -22,6 +22,7 @@ use simple_logger::SimpleLogger;
|
|||
use state::{RwLockServerState, ServerState};
|
||||
use stop::send_stop;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::RwLock as TokioRwLock;
|
||||
|
||||
mod cache;
|
||||
mod config;
|
||||
|
@ -123,14 +124,14 @@ async fn main() -> Result<(), std::io::Error> {
|
|||
}
|
||||
});
|
||||
|
||||
let cache: Arc<Box<dyn Cache>> = if low_mem_mode {
|
||||
LowMemCache::new(disk_quota, cache_path.clone()).await
|
||||
let cache: Arc<TokioRwLock<Box<dyn Cache>>> = if low_mem_mode {
|
||||
LowMemCache::new(disk_quota, cache_path.clone())
|
||||
} else {
|
||||
Arc::new(Box::new(GenerationalCache::new(
|
||||
Arc::new(TokioRwLock::new(Box::new(GenerationalCache::new(
|
||||
memory_max_size,
|
||||
disk_quota,
|
||||
cache_path.clone(),
|
||||
)))
|
||||
))))
|
||||
};
|
||||
let cache = Arc::clone(&cache);
|
||||
let cache1 = Arc::clone(&cache);
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::sync::{atomic::Ordering, Arc};
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use actix_web::http::header::{
|
||||
ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, CACHE_CONTROL, CONTENT_LENGTH,
|
||||
|
@ -15,6 +15,7 @@ use log::{debug, error, info, warn};
|
|||
use serde::Deserialize;
|
||||
use sodiumoxide::crypto::box_::{open_precomputed, Nonce, PrecomputedKey, NONCEBYTES};
|
||||
use thiserror::Error;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::cache::{Cache, CacheKey, ImageMetadata, UpstreamError};
|
||||
use crate::client_api_version;
|
||||
|
@ -51,7 +52,7 @@ impl Responder for ServerResponse {
|
|||
#[get("/{token}/data/{chapter_hash}/{file_name}")]
|
||||
async fn token_data(
|
||||
state: Data<RwLockServerState>,
|
||||
cache: Data<Box<dyn Cache>>,
|
||||
cache: Data<RwLock<Box<dyn Cache>>>,
|
||||
path: Path<(String, String, String)>,
|
||||
) -> impl Responder {
|
||||
let (token, chapter_hash, file_name) = path.into_inner();
|
||||
|
@ -68,7 +69,7 @@ async fn token_data(
|
|||
#[get("/{token}/data-saver/{chapter_hash}/{file_name}")]
|
||||
async fn token_data_saver(
|
||||
state: Data<RwLockServerState>,
|
||||
cache: Data<Box<dyn Cache>>,
|
||||
cache: Data<RwLock<Box<dyn Cache>>>,
|
||||
path: Path<(String, String, String)>,
|
||||
) -> impl Responder {
|
||||
let (token, chapter_hash, file_name) = path.into_inner();
|
||||
|
@ -186,16 +187,16 @@ fn push_headers(builder: &mut HttpResponseBuilder) -> &mut HttpResponseBuilder {
|
|||
#[allow(clippy::future_not_send)]
|
||||
async fn fetch_image(
|
||||
state: Data<RwLockServerState>,
|
||||
cache: Data<Box<dyn Cache>>,
|
||||
cache: Data<RwLock<Box<dyn Cache>>>,
|
||||
chapter_hash: String,
|
||||
file_name: String,
|
||||
is_data_saver: bool,
|
||||
) -> ServerResponse {
|
||||
let key = Arc::new(CacheKey(chapter_hash, file_name, is_data_saver));
|
||||
let key = CacheKey(chapter_hash, file_name, is_data_saver);
|
||||
|
||||
match cache.get(Arc::clone(&key)).await {
|
||||
match cache.write().await.get(&key).await {
|
||||
Some(Ok((image, metadata))) => {
|
||||
return construct_response(image, &metadata);
|
||||
return construct_response(image, metadata);
|
||||
}
|
||||
Some(Err(_)) => {
|
||||
return ServerResponse::HttpResponse(HttpResponse::BadGateway().finish());
|
||||
|
@ -262,9 +263,9 @@ async fn fetch_image(
|
|||
debug!("Inserting into cache");
|
||||
|
||||
let metadata = ImageMetadata::new(content_type, length, last_mod).unwrap();
|
||||
let stream = {
|
||||
match cache.put(key, Box::new(body), metadata).await {
|
||||
Ok(stream) => stream,
|
||||
let (stream, metadata) = {
|
||||
match cache.write().await.put(key, Box::new(body), metadata).await {
|
||||
Ok((stream, metadata)) => (stream, *metadata),
|
||||
Err(e) => {
|
||||
warn!("Failed to insert into cache: {}", e);
|
||||
return ServerResponse::HttpResponse(
|
||||
|
|
Loading…
Reference in a new issue