From afa2cf55fae1fba7344d46467cde17a79a79fdc7 Mon Sep 17 00:00:00 2001 From: Edward Shen Date: Sat, 17 Jul 2021 13:32:43 -0400 Subject: [PATCH] Fix some future not send lints --- src/cache/mem.rs | 2 +- src/cache/mod.rs | 2 +- src/client.rs | 195 +++++++++++++++++++++++++---------------------- src/routes.rs | 4 +- 4 files changed, 105 insertions(+), 98 deletions(-) diff --git a/src/cache/mem.rs b/src/cache/mem.rs index 1efc52a..4a1f282 100644 --- a/src/cache/mem.rs +++ b/src/cache/mem.rs @@ -66,7 +66,7 @@ impl ToRedisArgs for CacheValue { where W: ?Sized + redis::RedisWrite, { - out.write_arg(&bincode::serialize(self).expect("serialization to work")) + out.write_arg(&bincode::serialize(self).expect("serialization to work")); } } diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 7fc1699..19bd5cd 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -41,7 +41,7 @@ impl ToRedisArgs for CacheKey { where W: ?Sized + redis::RedisWrite, { - out.write_arg_fmt(self) + out.write_arg_fmt(self); } } diff --git a/src/client.rs b/src/client.rs index 22b9a98..95f2f1c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -84,8 +84,11 @@ impl CachingClient { key: CacheKey, cache: Data, ) -> FetchResult { - if let Some(recv) = self.locks.read().get(&url) { - let mut recv = recv.clone(); + let maybe_receiver = { + let lock = self.locks.read(); + lock.get(&url).map(Clone::clone) + }; + if let Some(mut recv) = maybe_receiver { loop { if !matches!(*recv.borrow(), FetchResult::Processing) { break; @@ -97,100 +100,17 @@ impl CachingClient { return recv.borrow().clone(); } - let url_0 = url.clone(); let notify = Arc::new(Notify::new()); - let notify2 = Arc::clone(¬ify); + tokio::spawn(self.fetch_and_cache_impl(cache, url.clone(), key, Arc::clone(¬ify))); + notify.notified().await; - tokio::spawn(async move { - let (tx, rx) = channel(FetchResult::Processing); - - self.locks.write().insert(url.clone(), rx); - notify.notify_one(); - let resp = self.inner.get(&url).send().await; - - let resp = match resp { - Ok(mut resp) => { - let content_type = resp.headers().get(CONTENT_TYPE); - - let is_image = content_type - .map(|v| String::from_utf8_lossy(v.as_ref()).contains("image/")) - .unwrap_or_default(); - - if resp.status() != StatusCode::OK || !is_image { - warn!("Got non-OK or non-image response code from upstream, proxying and not caching result."); - - let mut headers = DEFAULT_HEADERS.clone(); - - if let Some(content_type) = content_type { - headers.insert(CONTENT_TYPE, content_type.clone()); - } - - FetchResult::Data( - resp.status(), - headers, - resp.bytes().await.unwrap_or_default(), - ) - } else { - let (content_type, length, last_mod) = { - let headers = resp.headers_mut(); - ( - headers.remove(CONTENT_TYPE), - headers.remove(CONTENT_LENGTH), - headers.remove(LAST_MODIFIED), - ) - }; - - let body = resp.bytes().await.unwrap(); - - debug!("Inserting into cache"); - - let metadata = ImageMetadata::new( - content_type.clone(), - length.clone(), - last_mod.clone(), - ) - .unwrap(); - - match cache.put(key, body.clone(), metadata).await { - Ok(()) => { - debug!("Done putting into cache"); - - let mut headers = DEFAULT_HEADERS.clone(); - if let Some(content_type) = content_type { - headers.insert(CONTENT_TYPE, content_type); - } - - if let Some(content_length) = length { - headers.insert(CONTENT_LENGTH, content_length); - } - - if let Some(last_modified) = last_mod { - headers.insert(LAST_MODIFIED, last_modified); - } - - FetchResult::Data(StatusCode::OK, headers, body) - } - Err(e) => { - warn!("Failed to insert into cache: {}", e); - FetchResult::InternalServerError - } - } - } - } - Err(e) => { - error!("Failed to fetch image from server: {}", e); - FetchResult::ServiceUnavailable - } - }; - // This shouldn't happen - tx.send(resp).unwrap(); - self.locks.write().remove(&url); - }); - - notify2.notified().await; - - let mut recv = self.locks.read().get(&url_0).unwrap().clone(); + let mut recv = self + .locks + .read() + .get(&url) + .expect("receiver to exist since we just made one") + .clone(); loop { if !matches!(*recv.borrow(), FetchResult::Processing) { break; @@ -203,6 +123,95 @@ impl CachingClient { resp } + async fn fetch_and_cache_impl( + &self, + cache: Data, + url: String, + key: CacheKey, + notify: Arc, + ) { + let (tx, rx) = channel(FetchResult::Processing); + + self.locks.write().insert(url.clone(), rx); + notify.notify_one(); + let resp = self.inner.get(&url).send().await; + + let resp = match resp { + Ok(mut resp) => { + let content_type = resp.headers().get(CONTENT_TYPE); + + let is_image = content_type + .map(|v| String::from_utf8_lossy(v.as_ref()).contains("image/")) + .unwrap_or_default(); + + if resp.status() != StatusCode::OK || !is_image { + warn!("Got non-OK or non-image response code from upstream, proxying and not caching result."); + + let mut headers = DEFAULT_HEADERS.clone(); + + if let Some(content_type) = content_type { + headers.insert(CONTENT_TYPE, content_type.clone()); + } + + FetchResult::Data( + resp.status(), + headers, + resp.bytes().await.unwrap_or_default(), + ) + } else { + let (content_type, length, last_mod) = { + let headers = resp.headers_mut(); + ( + headers.remove(CONTENT_TYPE), + headers.remove(CONTENT_LENGTH), + headers.remove(LAST_MODIFIED), + ) + }; + + let body = resp.bytes().await.unwrap(); + + debug!("Inserting into cache"); + + let metadata = + ImageMetadata::new(content_type.clone(), length.clone(), last_mod.clone()) + .unwrap(); + + match cache.put(key, body.clone(), metadata).await { + Ok(()) => { + debug!("Done putting into cache"); + + let mut headers = DEFAULT_HEADERS.clone(); + if let Some(content_type) = content_type { + headers.insert(CONTENT_TYPE, content_type); + } + + if let Some(content_length) = length { + headers.insert(CONTENT_LENGTH, content_length); + } + + if let Some(last_modified) = last_mod { + headers.insert(LAST_MODIFIED, last_modified); + } + + FetchResult::Data(StatusCode::OK, headers, body) + } + Err(e) => { + warn!("Failed to insert into cache: {}", e); + FetchResult::InternalServerError + } + } + } + } + Err(e) => { + error!("Failed to fetch image from server: {}", e); + FetchResult::ServiceUnavailable + } + }; + // This shouldn't happen + tx.send(resp).unwrap(); + self.locks.write().remove(&url); + } + #[inline] pub const fn inner(&self) -> &Client { &self.inner diff --git a/src/routes.rs b/src/routes.rs index f624bce..0b76b5a 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -51,7 +51,6 @@ async fn index() -> impl Responder { HttpResponse::Ok().body(include_str!("index.html")) } -#[allow(clippy::future_not_send)] #[get("/{token}/data/{chapter_hash}/{file_name}")] async fn token_data( state: Data, @@ -68,7 +67,6 @@ async fn token_data( fetch_image(state, cache, chapter_hash, file_name, false).await } -#[allow(clippy::future_not_send)] #[get("/{token}/data-saver/{chapter_hash}/{file_name}")] async fn token_data_saver( state: Data, @@ -124,7 +122,7 @@ pub async fn default(state: Data, req: HttpRequest) -> impl R ServerResponse::HttpResponse(resp) } -#[allow(clippy::future_not_send, clippy::unused_async)] +#[allow(clippy::unused_async)] #[get("/prometheus")] pub async fn metrics() -> impl Responder { let metric_families = prometheus::gather();