diff --git a/sqlx-data.json b/sqlx-data.json index 7ec2755..09fc0ee 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -1,5 +1,15 @@ { "db": "SQLite", + "24b536161a0ed44d0595052ad069c023631ffcdeadb15a01ee294717f87cdd42": { + "query": "update Images set accessed = ? where id = ?", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + } + }, "2a8aa6dd2c59241a451cd73f23547d0e94930e35654692839b5d11bb8b87703e": { "query": "insert into Images (id, size, accessed) values (?, ?, ?) on conflict do nothing", "describe": { @@ -52,16 +62,6 @@ ] } }, - "59e09d6fbcc7cd91807f6c9598e4028352b28e83e85c98d96cafd3e713b4d7b7": { - "query": "update Images set accessed = ? where id = ? or id = ?", - "describe": { - "columns": [], - "parameters": { - "Right": 3 - }, - "nullable": [] - } - }, "a60501a30fd75b2a2a59f089e850343af075436a5c543a267ecb4fa841593ce9": { "query": "create table if not exists Images(\n id varchar primary key not null,\n size integer not null,\n accessed timestamp not null default CURRENT_TIMESTAMP\n);\ncreate index if not exists Images_accessed on Images(accessed);", "describe": { diff --git a/src/cache/disk.rs b/src/cache/disk.rs index 071a374..1d18587 100644 --- a/src/cache/disk.rs +++ b/src/cache/disk.rs @@ -15,7 +15,8 @@ use md5::digest::generic_array::GenericArray; use md5::{Digest, Md5}; use sqlx::sqlite::SqliteConnectOptions; use sqlx::{ConnectOptions, Sqlite, SqlitePool, Transaction}; -use tokio::fs::{remove_file, rename}; +use tokio::fs::{remove_file, rename, File}; +use tokio::join; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, error, info, instrument, warn}; @@ -214,27 +215,11 @@ async fn db_listener( #[instrument(level = "debug", skip(transaction))] async fn handle_db_get(entry: &Path, transaction: &mut Transaction<'_, Sqlite>) { - let hash = if let Ok(hash) = Md5Hash::try_from(entry) { - hash - } else { - error!( - "Failed to derive hash from entry, dropping message: {}", - entry.to_string_lossy() - ); - return; - }; - - let hash_str = hash.to_hex_string(); let key = entry.as_os_str().to_str(); let now = chrono::Utc::now(); - let query = sqlx::query!( - "update Images set accessed = ? where id = ? or id = ?", - now, - key, - hash_str - ) - .execute(transaction) - .await; + let query = sqlx::query!("update Images set accessed = ? where id = ?", now, key) + .execute(transaction) + .await; if let Err(e) = query { warn!("Failed to update timestamp in db for {:?}: {}", key, e); } @@ -319,14 +304,34 @@ impl Cache for DiskCache { ) -> Option> { let channel = self.db_update_channel_sender.clone(); - // TODO: Check legacy path as well - let path = Arc::new(self.disk_path.clone().join(PathBuf::from(key))); let path_0 = Arc::clone(&path); - tokio::spawn(async move { channel.send(DbMessage::Get(path_0)).await }); + let legacy_path = Md5Hash::try_from(path_0.as_path()) + .map(PathBuf::from) + .map(Arc::new); - super::fs::read_file_from_path(&path).await.map(|res| { + // Get file and path of first existing location path + let (file, path) = if let Ok(legacy_path) = legacy_path { + let maybe_files = join!( + File::open(path.as_path()), + File::open(legacy_path.as_path()) + ); + match maybe_files { + (Ok(f), _) => Some((f, path)), + (_, Ok(f)) => Some((f, legacy_path)), + _ => return None, + } + } else { + File::open(path.as_path()) + .await + .ok() + .map(|file| (file, path)) + }?; + + tokio::spawn(async move { channel.send(DbMessage::Get(path)).await }); + + super::fs::read_file(file).await.map(|res| { res.map(|(stream, _, metadata)| (stream, metadata)) .map_err(|_| CacheError::DecryptionFailure) }) diff --git a/src/cache/fs.rs b/src/cache/fs.rs index 89735d2..e264fae 100644 --- a/src/cache/fs.rs +++ b/src/cache/fs.rs @@ -16,7 +16,7 @@ use std::error::Error; use std::fmt::Display; -use std::io::{Seek, SeekFrom}; +use std::io::SeekFrom; use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; @@ -44,28 +44,21 @@ use super::{CacheKey, CacheStream, ImageMetadata, ENCRYPTION_KEY}; /// Attempts to lookup the file on disk, returning a byte stream if it exists. /// Note that this could return two types of streams, depending on if the file /// is in progress of being written to. -#[inline] -pub(super) async fn read_file_from_path( - path: &Path, -) -> Option, ImageMetadata), std::io::Error>> { - read_file(std::fs::File::open(path).ok()?).await -} - #[instrument(level = "debug")] -async fn read_file( - file: std::fs::File, +pub(super) async fn read_file( + file: File, ) -> Option, ImageMetadata), std::io::Error>> { - let mut file_0 = file.try_clone().unwrap(); - let file_1 = file.try_clone().unwrap(); + let mut file_0 = file.try_clone().await.unwrap(); + let file_1 = file.try_clone().await.unwrap(); // Try reading decrypted header first... - let mut deserializer = serde_json::Deserializer::from_reader(file); + let mut deserializer = serde_json::Deserializer::from_reader(file.into_std().await); let mut maybe_metadata = ImageMetadata::deserialize(&mut deserializer); // Failed to parse normally, see if we have a legacy file format if maybe_metadata.is_err() { - file_0.seek(SeekFrom::Start(2)).ok()?; - let mut deserializer = serde_json::Deserializer::from_reader(file_0); + file_0.seek(SeekFrom::Start(2)).await.ok()?; + let mut deserializer = serde_json::Deserializer::from_reader(file_0.into_std().await); maybe_metadata = LegacyImageMetadata::deserialize(&mut deserializer).map(LegacyImageMetadata::into); } @@ -82,12 +75,12 @@ async fn read_file( return None; } - reader = Some(Box::pin(BufReader::new(File::from_std(file_1)))); + reader = Some(Box::pin(BufReader::new(file_1))); parsed_metadata = Some(metadata); debug!("Found not encrypted file"); } else { debug!("metadata read failed, trying to see if it's encrypted"); - let mut file = File::from_std(file_1); + let mut file = file_1; file.seek(SeekFrom::Start(0)).await.ok()?; // image is encrypted or corrupt @@ -391,6 +384,7 @@ mod read_file { use futures::StreamExt; use std::io::{Seek, SeekFrom, Write}; use tempfile::tempfile; + use tokio::fs::File; #[tokio::test] #[cfg_attr(miri, ignore)] @@ -402,6 +396,7 @@ mod read_file { ) .unwrap(); temp_file.seek(SeekFrom::Start(0)).unwrap(); + let temp_file = File::from_std(temp_file); let (inner_stream, maybe_header, metadata) = read_file(temp_file).await.unwrap().unwrap(); @@ -431,6 +426,7 @@ mod read_file_compat { use futures::StreamExt; use std::io::{Seek, SeekFrom, Write}; use tempfile::tempfile; + use tokio::fs::File; #[tokio::test] #[cfg_attr(miri, ignore)] @@ -442,6 +438,7 @@ mod read_file_compat { ) .unwrap(); temp_file.seek(SeekFrom::Start(0)).unwrap(); + let temp_file = File::from_std(temp_file); let (inner_stream, maybe_header, metadata) = read_file(temp_file).await.unwrap().unwrap();