Fix some future not send lints

This commit is contained in:
Edward Shen 2021-07-17 13:32:43 -04:00
parent e95afd3e32
commit afa2cf55fa
Signed by: edward
GPG key ID: 19182661E818369F
4 changed files with 105 additions and 98 deletions

2
src/cache/mem.rs vendored
View file

@ -66,7 +66,7 @@ impl ToRedisArgs for CacheValue {
where where
W: ?Sized + redis::RedisWrite, W: ?Sized + redis::RedisWrite,
{ {
out.write_arg(&bincode::serialize(self).expect("serialization to work")) out.write_arg(&bincode::serialize(self).expect("serialization to work"));
} }
} }

2
src/cache/mod.rs vendored
View file

@ -41,7 +41,7 @@ impl ToRedisArgs for CacheKey {
where where
W: ?Sized + redis::RedisWrite, W: ?Sized + redis::RedisWrite,
{ {
out.write_arg_fmt(self) out.write_arg_fmt(self);
} }
} }

View file

@ -84,8 +84,11 @@ impl CachingClient {
key: CacheKey, key: CacheKey,
cache: Data<dyn Cache>, cache: Data<dyn Cache>,
) -> FetchResult { ) -> FetchResult {
if let Some(recv) = self.locks.read().get(&url) { let maybe_receiver = {
let mut recv = recv.clone(); let lock = self.locks.read();
lock.get(&url).map(Clone::clone)
};
if let Some(mut recv) = maybe_receiver {
loop { loop {
if !matches!(*recv.borrow(), FetchResult::Processing) { if !matches!(*recv.borrow(), FetchResult::Processing) {
break; break;
@ -97,100 +100,17 @@ impl CachingClient {
return recv.borrow().clone(); return recv.borrow().clone();
} }
let url_0 = url.clone();
let notify = Arc::new(Notify::new()); let notify = Arc::new(Notify::new());
let notify2 = Arc::clone(&notify); tokio::spawn(self.fetch_and_cache_impl(cache, url.clone(), key, Arc::clone(&notify)));
notify.notified().await;
tokio::spawn(async move { let mut recv = self
let (tx, rx) = channel(FetchResult::Processing); .locks
.read()
self.locks.write().insert(url.clone(), rx); .get(&url)
notify.notify_one(); .expect("receiver to exist since we just made one")
let resp = self.inner.get(&url).send().await; .clone();
let resp = match resp {
Ok(mut resp) => {
let content_type = resp.headers().get(CONTENT_TYPE);
let is_image = content_type
.map(|v| String::from_utf8_lossy(v.as_ref()).contains("image/"))
.unwrap_or_default();
if resp.status() != StatusCode::OK || !is_image {
warn!("Got non-OK or non-image response code from upstream, proxying and not caching result.");
let mut headers = DEFAULT_HEADERS.clone();
if let Some(content_type) = content_type {
headers.insert(CONTENT_TYPE, content_type.clone());
}
FetchResult::Data(
resp.status(),
headers,
resp.bytes().await.unwrap_or_default(),
)
} else {
let (content_type, length, last_mod) = {
let headers = resp.headers_mut();
(
headers.remove(CONTENT_TYPE),
headers.remove(CONTENT_LENGTH),
headers.remove(LAST_MODIFIED),
)
};
let body = resp.bytes().await.unwrap();
debug!("Inserting into cache");
let metadata = ImageMetadata::new(
content_type.clone(),
length.clone(),
last_mod.clone(),
)
.unwrap();
match cache.put(key, body.clone(), metadata).await {
Ok(()) => {
debug!("Done putting into cache");
let mut headers = DEFAULT_HEADERS.clone();
if let Some(content_type) = content_type {
headers.insert(CONTENT_TYPE, content_type);
}
if let Some(content_length) = length {
headers.insert(CONTENT_LENGTH, content_length);
}
if let Some(last_modified) = last_mod {
headers.insert(LAST_MODIFIED, last_modified);
}
FetchResult::Data(StatusCode::OK, headers, body)
}
Err(e) => {
warn!("Failed to insert into cache: {}", e);
FetchResult::InternalServerError
}
}
}
}
Err(e) => {
error!("Failed to fetch image from server: {}", e);
FetchResult::ServiceUnavailable
}
};
// This shouldn't happen
tx.send(resp).unwrap();
self.locks.write().remove(&url);
});
notify2.notified().await;
let mut recv = self.locks.read().get(&url_0).unwrap().clone();
loop { loop {
if !matches!(*recv.borrow(), FetchResult::Processing) { if !matches!(*recv.borrow(), FetchResult::Processing) {
break; break;
@ -203,6 +123,95 @@ impl CachingClient {
resp resp
} }
async fn fetch_and_cache_impl(
&self,
cache: Data<dyn Cache>,
url: String,
key: CacheKey,
notify: Arc<Notify>,
) {
let (tx, rx) = channel(FetchResult::Processing);
self.locks.write().insert(url.clone(), rx);
notify.notify_one();
let resp = self.inner.get(&url).send().await;
let resp = match resp {
Ok(mut resp) => {
let content_type = resp.headers().get(CONTENT_TYPE);
let is_image = content_type
.map(|v| String::from_utf8_lossy(v.as_ref()).contains("image/"))
.unwrap_or_default();
if resp.status() != StatusCode::OK || !is_image {
warn!("Got non-OK or non-image response code from upstream, proxying and not caching result.");
let mut headers = DEFAULT_HEADERS.clone();
if let Some(content_type) = content_type {
headers.insert(CONTENT_TYPE, content_type.clone());
}
FetchResult::Data(
resp.status(),
headers,
resp.bytes().await.unwrap_or_default(),
)
} else {
let (content_type, length, last_mod) = {
let headers = resp.headers_mut();
(
headers.remove(CONTENT_TYPE),
headers.remove(CONTENT_LENGTH),
headers.remove(LAST_MODIFIED),
)
};
let body = resp.bytes().await.unwrap();
debug!("Inserting into cache");
let metadata =
ImageMetadata::new(content_type.clone(), length.clone(), last_mod.clone())
.unwrap();
match cache.put(key, body.clone(), metadata).await {
Ok(()) => {
debug!("Done putting into cache");
let mut headers = DEFAULT_HEADERS.clone();
if let Some(content_type) = content_type {
headers.insert(CONTENT_TYPE, content_type);
}
if let Some(content_length) = length {
headers.insert(CONTENT_LENGTH, content_length);
}
if let Some(last_modified) = last_mod {
headers.insert(LAST_MODIFIED, last_modified);
}
FetchResult::Data(StatusCode::OK, headers, body)
}
Err(e) => {
warn!("Failed to insert into cache: {}", e);
FetchResult::InternalServerError
}
}
}
}
Err(e) => {
error!("Failed to fetch image from server: {}", e);
FetchResult::ServiceUnavailable
}
};
// This shouldn't happen
tx.send(resp).unwrap();
self.locks.write().remove(&url);
}
#[inline] #[inline]
pub const fn inner(&self) -> &Client { pub const fn inner(&self) -> &Client {
&self.inner &self.inner

View file

@ -51,7 +51,6 @@ async fn index() -> impl Responder {
HttpResponse::Ok().body(include_str!("index.html")) HttpResponse::Ok().body(include_str!("index.html"))
} }
#[allow(clippy::future_not_send)]
#[get("/{token}/data/{chapter_hash}/{file_name}")] #[get("/{token}/data/{chapter_hash}/{file_name}")]
async fn token_data( async fn token_data(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
@ -68,7 +67,6 @@ async fn token_data(
fetch_image(state, cache, chapter_hash, file_name, false).await fetch_image(state, cache, chapter_hash, file_name, false).await
} }
#[allow(clippy::future_not_send)]
#[get("/{token}/data-saver/{chapter_hash}/{file_name}")] #[get("/{token}/data-saver/{chapter_hash}/{file_name}")]
async fn token_data_saver( async fn token_data_saver(
state: Data<RwLockServerState>, state: Data<RwLockServerState>,
@ -124,7 +122,7 @@ pub async fn default(state: Data<RwLockServerState>, req: HttpRequest) -> impl R
ServerResponse::HttpResponse(resp) ServerResponse::HttpResponse(resp)
} }
#[allow(clippy::future_not_send, clippy::unused_async)] #[allow(clippy::unused_async)]
#[get("/prometheus")] #[get("/prometheus")]
pub async fn metrics() -> impl Responder { pub async fn metrics() -> impl Responder {
let metric_families = prometheus::gather(); let metric_families = prometheus::gather();