From 9ab60993982a4caaa599b04d7fbae741564200f5 Mon Sep 17 00:00:00 2001 From: colagy Date: Wed, 22 Mar 2023 20:17:01 +0800 Subject: [PATCH 1/3] cache init --- src/db/version_set.rs | 1 - src/lib.rs | 1 + src/util/cache.rs | 242 +++++++++++++++++++++++++++++++---------- src/util/cache_test.rs | 126 ++++++++++++++++++++- src/util/slice.rs | 27 ++++- 5 files changed, 332 insertions(+), 65 deletions(-) diff --git a/src/db/version_set.rs b/src/db/version_set.rs index 15dabb0..1ab9154 100644 --- a/src/db/version_set.rs +++ b/src/db/version_set.rs @@ -5,7 +5,6 @@ use crate::db::file_meta_data::FileMetaData; use crate::db::table_cache::TableCache; use crate::db::version_edit::VersionEdit; use crate::traits::comparator_trait::Comparator; -use crate::util::cache::Cache; use crate::util::options::{Env, Options, ReadOptions}; use crate::util::slice::Slice; use crate::util::Result; diff --git a/src/lib.rs b/src/lib.rs index 064907a..032a46e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ #![feature(box_syntax)] +#![feature(label_break_value)] mod db; mod table; diff --git a/src/util/cache.rs b/src/util/cache.rs index 8d69c2d..3d815b6 100644 --- a/src/util/cache.rs +++ b/src/util/cache.rs @@ -1,81 +1,152 @@ -use std::cell::RefCell; +use std::borrow::BorrowMut; +use std::cell::{RefCell, RefMut}; +use std::collections::HashMap; +use std::ops::{Deref, Shr}; use std::rc::Rc; +use crate::util::hash::ToHash; +use crate::util::linked_list::LinkedList; use crate::util::slice::Slice; use crate::util::Result; -pub struct Handle {} - -pub struct LRUHandle { +#[derive(Clone, Debug, PartialEq)] +pub struct LRUHandle { key: Slice, value: T, hash: u32, in_cache: bool, key_length: usize, charge: usize, + refs: u32, prev: Option>>>, next: Option>>>, - next_hash: Option>>, + next_hash: Option>>>, } -impl LRUHandle { - pub fn key(&self) -> Slice { - todo!() +impl LRUHandle { + fn new(key: Slice, + value: T, + hash: u32, + charge: usize, + prev: Option>>>, + next: Option>>>, + next_hash: Option>>>) -> Self { + let key_length = key.size(); + Self { + key, + value, + hash, + in_cache: false, + key_length, + charge, + refs: 1, + prev, + next, + next_hash, + } + } + pub fn key(&self) -> &Slice { + &self.key + } + pub fn value(&self) -> &T { + &self.value } } -pub struct HandleTable { +#[derive(Clone)] +pub struct HandleTable { length: usize, + list: [Option>; 16], } -impl HandleTable { - pub fn look_up(&self, _key: &Slice, _hash: u32) -> &LRUHandle { - todo!() +impl Default for HandleTable { + fn default() -> Self { + HandleTable { + length: 16, + list: [None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None], + } } +} - pub fn insert(&mut self, _handle: LRUHandle) -> &LRUHandle { - todo!() +impl HandleTable { + pub fn look_up(&self, key: &Slice, hash: u32) -> Result>> { + match &self.list[hash as usize & self.length.wrapping_sub(1)] { + Some(v) => { + Ok(Some(v.clone())) + } + _ => { + return Ok(None); + } + } } - pub fn remove(&mut self, _key: &Slice, _hash: u32) -> LRUHandle { - todo!() + pub fn insert(&mut self, handle: LRUHandle) { + let index = handle.hash as usize & self.length.wrapping_sub(1); + self.list[index] = Some(handle); + } + + pub fn remove(&mut self, _key: &Slice, _hash: u32) { + let index = _hash as usize & self.length.wrapping_sub(1); + self.list[index] = None; } pub fn length(&self) -> usize { self.length } + /// 扩容 + /// + /// # Examples + /// + /// ``` + /// + /// ``` fn resize(&mut self) { todo!() } } -pub struct LRUCache { +pub struct LRUCache { capacity: usize, usage: usize, - in_use: LRUHandle, - table: HandleTable, + in_use: Option>, + table: HandleTable, } -impl LRUCache { - pub fn set_capacity(&mut self, capacity: usize) { - self.capacity = capacity; +impl LRUCache { + pub fn new(capacity: usize, usage: usize, in_use: Option>, table: HandleTable) -> Self { + Self { capacity, usage, in_use, table } } - pub fn insert(&mut self, _key: &Slice, _hash: u32, _value: T, _charge: usize) -> &LRUHandle { - todo!() + // pub fn set_capacity(&mut self, capacity: usize) { + // self.capacity = capacity; + // } + + pub fn insert(&mut self, key: Slice, hash: u32, value: T, charge: usize, deleter: F) + where F: FnOnce(Slice, T) { + let e = LRUHandle::new(key, + value, + hash, + charge, + None, + None, + None, + ); + self.table.insert(e); + self.usage += 1; } - pub fn look_up(&self, _key: &Slice, _hash: u32) -> &LRUHandle { - todo!() + pub fn look_up(&self, key: &Slice, hash: u32) -> Result>> { + self.table.look_up(key, hash) } pub fn release(&mut self, _handle: &LRUHandle) { todo!() } - pub fn erase(&mut self, _key: &Slice, _hash: u32) { - todo!() + pub fn erase(&mut self, _key: &Slice, _hash: u32) -> Result<()> { + self.table.remove(_key, _hash); + Ok(()) } pub fn prune(&mut self) { todo!() @@ -84,87 +155,142 @@ impl LRUCache { todo!() } - pub fn lru_remove(&mut self, _handle: &LRUHandle) { + fn lru_remove(&mut self, _handle: &LRUHandle) { todo!() } - pub fn lru_append(&mut self, _head_of_list: &LRUHandle, _e: LRUHandle) { + fn lru_append(&mut self, _head_of_list: &LRUHandle, _e: LRUHandle) { todo!() } - pub fn refer(&self, _e: &LRUHandle) { + fn refer(&self, _e: &LRUHandle) { todo!() } - pub fn unref(&self, _e: &LRUHandle) { + fn unref(&self, _e: &LRUHandle) { todo!() } } -pub trait Cache { - /// 向缓存中插入数据 +const K_NUM_SHARD_BITS: usize = 4; +const K_NUM_SHARDS: usize = 1 << K_NUM_SHARD_BITS; + +pub struct ShardLRUCache { + shard: Vec>, +} + +impl ShardLRUCache { + /// 构造一个指定容量的ShardLRUCache /// /// # Arguments /// - /// * `key`: 键 - /// * `value`: 值 - /// * `charge`: 长度 - /// * `deleter`: 删除的回调函数 + /// * `capacity`: 容量 /// - /// returns: Handle + /// returns: ShardLRUCache /// /// # Examples /// /// ``` - /// let element = cache.insert(Slice::from("123"), block, 10, move || {}); + /// ShardLRUCache::new_with_capacity(32); /// ``` - fn insert(&mut self, key: &Slice, value: T, charge: usize, deleter: F) -> Handle - where F: FnOnce(&Slice, T); + pub fn new_with_capacity(capacity: usize) -> Self { + let per_shard: usize = (capacity + (K_NUM_SHARDS - 1)) / K_NUM_SHARD_BITS; - /// 从缓存中读取数据 + let mut shard_vec: Vec> = Vec::with_capacity(K_NUM_SHARDS); + for _ in 1..K_NUM_SHARDS { + let table = HandleTable::default(); + let cache: LRUCache = LRUCache::new(per_shard, 0, None, table); + shard_vec.push(cache); + } + Self { + shard: shard_vec + } + } + + fn hash_slice(s: &Slice) -> u32 { + s.to_hash_with_seed(0) + } + + fn shard(hash: u32) -> u32 { + hash.shr(32 - K_NUM_SHARD_BITS) + } + + /// 从缓存中获取数据 /// /// # Arguments /// /// * `key`: 键 /// - /// returns: Handle + /// returns: Result, Status> /// /// # Examples /// /// ``` - /// let element = cache.lookup(Slice::from("123")); + /// let value= cache.lookup(Slice::from("123")); /// ``` - fn lookup(&self, key: &Slice) -> Handle; + pub fn lookup(&self, key: &Slice) -> Result>> { + let hash = Self::hash_slice(&key); + let i = Self::shard(hash); + self.shard[i as usize].look_up(key, hash) + } - /// 从缓存中释放元素 + /// 插入数据到缓存 /// /// # Arguments /// - /// * `handle`: 元素 + /// * `key`: 键 + /// * `value`: 值 + /// * `charge`: 空间占用量 + /// * `deleter`: 删除的回调函数 /// /// returns: () /// /// # Examples /// /// ``` - /// cache.release(element); + /// cache.insert(Slice::from("123", 123,1,move || {})) /// ``` - fn release(&mut self, handle: Handle); + pub fn insert(&mut self, key: Slice, value: T, charge: usize, deleter: F) -> Result<()> + where F: FnOnce(Slice, T) { + let hash = Self::hash_slice(&key); + let i = Self::shard(hash); + let mut shard = &mut self.shard[i as usize]; + shard.insert(key, hash, value, charge, deleter); + Ok(()) + } - /// 从缓存中删除元素 + /// 释放引用 + /// 当数据不再需要使用时, 使用方必须释放引用 /// /// # Arguments /// - /// * `key`: 键 + /// * `handle`: 需要释放的值 /// /// returns: Result<(), Status> /// /// # Examples /// /// ``` - /// cache.erase(Slice::from("123"))?; + /// cache.release(handle); /// ``` - fn erase(&mut self, key: &Slice) -> Result<()>; + pub fn release(&mut self, handle: LRUHandle) -> Result<()> { + todo!() + } - fn new_id(&self) -> Result; - fn prune(&mut self) -> Result<()>; - fn total_charge(&self) -> usize; - // fn value(&self, key: Handle) -> Handle; + /// 从缓存中删除值 + /// + /// # Arguments + /// + /// * `key`: 值 + /// + /// returns: Result<(), Status> + /// + /// # Examples + /// + /// ``` + /// cache.erase(Slice::from("123")); + /// ``` + pub fn erase(&mut self, key: &Slice) -> Result<()> { + let hash = Self::hash_slice(&key); + let i = Self::shard(hash); + let mut shard = &mut self.shard[i as usize]; + shard.erase(key, hash) + } } \ No newline at end of file diff --git a/src/util/cache_test.rs b/src/util/cache_test.rs index dd63ab2..c14bf2f 100644 --- a/src/util/cache_test.rs +++ b/src/util/cache_test.rs @@ -1,6 +1,130 @@ mod test { + use std::borrow::Borrow; + use std::collections::HashMap; + use std::ops::Deref; + use crate::util::cache::{LRUHandle, ShardLRUCache}; + use crate::util::slice::Slice; + + use crate::util::Result; + + #[test] + fn test_insert() -> Result<()> { + let mut cache: ShardLRUCache = ShardLRUCache::new_with_capacity(16); + let key = Slice::from("123"); + let value = 1234; + cache.insert(key.clone(), value, 1, move |k, v| { + println!("delete key: {}", String::from(k)); + println!("delete value: {}", v); + })?; + println!("key: {}", String::from(key.clone())); + println!("value: {}", value); + Ok(()) + } + #[test] - fn test_insert() { + fn test_update() -> Result<()> { + let mut cache: ShardLRUCache = ShardLRUCache::new_with_capacity(16); + let key = Slice::from("123"); + let value = 1234; + cache.insert(key.clone(), value, 1, move |k, v| { + println!("delete key: {}", String::from(k)); + println!("delete value: {}", v); + })?; + println!("key: {}", String::from(key.clone())); + println!("value: {}", value); + let mut inserted = cache.lookup(&key.clone())?; + assert_eq!(value, *inserted.unwrap().value()); + + let value = 1235; + cache.insert(key.clone(), value, 1, move |k, v| { + println!("delete key: {}", String::from(k)); + println!("delete value: {}", v); + })?; + let mut inserted = cache.lookup(&key.clone())?; + println!("key: {}", String::from(key.clone())); + println!("value: {}", value); + assert_eq!(value, *inserted.unwrap().value()); + + Ok(()) + } + + #[test] + fn test_lookup() -> Result<()> { + let mut cache: ShardLRUCache = ShardLRUCache::new_with_capacity(16); + let key = Slice::from("123"); + let value = 1234; + cache.insert(key.clone(), value, 1, move |k, v| { + println!("delete key: {}", String::from(k)); + println!("delete value: {}", v); + })?; + println!("key: {}", String::from(key.clone())); + println!("value: {}", value); + + let value = cache.lookup(&key.clone())?; + match value { + None => { + println!("value is none"); + } + Some(v) => { + println!("key: {}", String::from(v.key())); + println!("value: {}", v.value()); + } + } + Ok(()) + } + + #[test] + fn test_remove() -> Result<()> { + let mut cache: ShardLRUCache = ShardLRUCache::new_with_capacity(16); + let key = Slice::from("123"); + let value = 1234; + cache.insert("123", value, 1, move |k, v| { + println!("delete key: {}", String::from(k)); + println!("delete value: {}", v); + })?; + println!("key: {:?}", &key); + println!("value: {}", value); + + let lookup = cache.lookup(&key.clone())?; + match &lookup { + None => { + println!("value is none"); + } + Some(v) => { + println!("key: {}", String::from(v.key())); + println!("value: {}", v.value()); + } + } + assert_eq!(value, *lookup.unwrap().value()); + + cache.erase(&key)?; + + let lookup = cache.lookup(&key.clone())?; + match &lookup { + None => { + println!("value is none"); + } + Some(v) => { + println!("key: {}", String::from(v.key())); + println!("value: {}", v.value()); + } + } + assert_eq!(None, lookup); + + Ok(()) + } + + #[test] + fn test_hash_map() { + let mut map: HashMap<&str, &str> = HashMap::new(); + map.insert("123", "a"); + let value = map.get("123"); + match value { + None => {} + Some(v) => { + println!("{}", v); + } + } } } \ No newline at end of file diff --git a/src/util/slice.rs b/src/util/slice.rs index 26ea8b1..e787628 100644 --- a/src/util/slice.rs +++ b/src/util/slice.rs @@ -23,8 +23,16 @@ impl Default for Slice { } } -impl Slice { +impl Clone for Slice { + fn clone(&self) -> Self { + let data = self.data.clone(); + Self { + data + } + } +} +impl Slice { /// 从 &mut [u8] 转到 Slice, 这里存在内存拷贝开销 #[inline] pub fn from_buf(buf: &[u8]) -> Self { @@ -54,7 +62,7 @@ impl Slice { #[inline] pub fn as_sub_ref(&self, start: usize, length: usize) -> &[u8] { - &(**self)[start..(start+length)] + &(**self)[start..(start + length)] } /// 移除头部 n 个元素 @@ -95,7 +103,6 @@ impl Slice { } } } - } impl<'a> Slice { @@ -119,6 +126,16 @@ impl From for String { } } +impl From<&Slice> for String { + #[inline] + fn from(s: &Slice) -> Self { + let data = &s.data; + unsafe { + String::from_utf8_unchecked(data.clone()) + } + } +} + impl From for Vec { #[inline] fn from(s: Slice) -> Self { @@ -126,7 +143,7 @@ impl From for Vec { } } -impl > From for Slice { +impl> From for Slice { #[inline] fn from(r: R) -> Self { Self { @@ -191,7 +208,7 @@ impl Deref for Slice { /// Slice 解引用到 &[u8] #[inline] fn deref(&self) -> &Self::Target { - &*self.data + &*self.data } } -- Gitee From 99cd4497dc165b653a779daf6d666a89761bdef1 Mon Sep 17 00:00:00 2001 From: colagy Date: Wed, 19 Apr 2023 16:54:04 +0800 Subject: [PATCH 2/3] Implement new cache can use in multi threads; Created a crate for proc_macro, and create a arr!() function-like proc macro to generate an arr which element have not implemented the Copy Trait; --- Cargo.toml | 3 +- custom_proc_macro/Cargo.toml | 14 ++ custom_proc_macro/src/lib.rs | 55 +++++ src/db/version_set.rs | 1 - src/util/cache.rs | 419 +++++++++++++++++++------------- src/util/cache_test.rs | 130 ---------- src/util/debug.rs | 2 +- src/util/mod.rs | 1 - tests/custom_proc_macro_test.rs | 39 +++ 9 files changed, 356 insertions(+), 308 deletions(-) create mode 100644 custom_proc_macro/Cargo.toml create mode 100644 custom_proc_macro/src/lib.rs delete mode 100644 src/util/cache_test.rs create mode 100644 tests/custom_proc_macro_test.rs diff --git a/Cargo.toml b/Cargo.toml index be435a3..e5d8965 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,8 +6,6 @@ edition = "2021" [lib] name = "level_db_rust" path = "src/lib.rs" -# 过程宏 -proc-macro = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -16,6 +14,7 @@ rand = "0.8.5" tokio = "1.24.1" jemallocator = "0.5" jemalloc-sys = { version = "0.5", features = ["stats"] } +custom_proc_macro = { path = "custom_proc_macro" } [dev-dependencies] criterion = { version = "0.4.0", features = ["html_reports"] } diff --git a/custom_proc_macro/Cargo.toml b/custom_proc_macro/Cargo.toml new file mode 100644 index 0000000..48488f1 --- /dev/null +++ b/custom_proc_macro/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "custom_proc_macro" +version = "0.1.0" +edition = "2021" + +[lib] +# 过程宏 +proc-macro = true +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +syn = { version = "1.0", features = ["full"] } + +[features] \ No newline at end of file diff --git a/custom_proc_macro/src/lib.rs b/custom_proc_macro/src/lib.rs new file mode 100644 index 0000000..5142909 --- /dev/null +++ b/custom_proc_macro/src/lib.rs @@ -0,0 +1,55 @@ +use proc_macro::{TokenStream}; +use std::ops::Deref; +use syn::{ExprRepeat, parse_macro_input, Lit, Expr}; +use syn::__private::quote::quote; +use syn::parse_macro_input::parse; + +/// 生成数组的宏 主要用于没有实现copy语义的结构体 在无法使用[T; 32] 这种方式生成数组的情况下 +/// +/// # Arguments +/// +/// * `input`: TokenStream(ExprRepeat) 以分号(;)为分割符, 第一个参数为表达式, 第二个参数为数量. 例: T::default(); 16 +/// +/// returns: TokenStream +/// +/// # Examples +/// +/// ``` +/// struct Test; +/// let arr: [Test; 16] = arr!([Test; 16]); +/// ``` +/// # Expansion +/// ``` +/// [Test; 16]; +/// [0; 16] +/// ``` +#[proc_macro] +pub fn arr(input: TokenStream) -> TokenStream { + let repeat_expr: ExprRepeat = parse(input) + .expect("like arr!([Test; 16])"); + + let mut len = 0; + // 获取表达式中的长度信息并转为usize + if let Expr::Lit(expr_lit) = repeat_expr.len.deref() { + if let Lit::Int(int_lit) = &expr_lit.lit { + len = int_lit.base10_parse::().expect("Failed to parse integer literal"); + } + } + // 解析并拼接成数组 + let _expr = repeat_expr.expr; + // 1.生成数组中的一个元素 + let _one = quote! { #_expr, }; + let mut _all = quote!(); + for _ in 0..len { + // 2.将数组中的每个元素向数组中追加 + _all = quote! { #_all #_one }; + } + // 3.加上中括号 + let arr = quote! { [ #_all ] }; + return arr.into(); +} + +#[test] +fn test_arr() { + let int_arr = arr!([u32; 12]); +} \ No newline at end of file diff --git a/src/db/version_set.rs b/src/db/version_set.rs index 6175203..2435778 100644 --- a/src/db/version_set.rs +++ b/src/db/version_set.rs @@ -5,7 +5,6 @@ use crate::db::file_meta_data::FileMetaData; use crate::db::table_cache::TableCache; use crate::db::version_edit::VersionEdit; use crate::traits::comparator_trait::Comparator; -use crate::util::cache::Cache; use crate::util::env::Env; use crate::util::options::{Options, ReadOptions}; use crate::util::slice::Slice; diff --git a/src/util/cache.rs b/src/util/cache.rs index 3d815b6..58099fb 100644 --- a/src/util/cache.rs +++ b/src/util/cache.rs @@ -1,36 +1,49 @@ use std::borrow::BorrowMut; use std::cell::{RefCell, RefMut}; use std::collections::HashMap; -use std::ops::{Deref, Shr}; +use std::ops::{Deref, DerefMut, Shr}; use std::rc::Rc; -use crate::util::hash::ToHash; +use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; +use std::{io, result, thread}; +use std::any::Any; +use std::str::FromStr; +use std::sync::atomic::AtomicUsize; +use custom_proc_macro::arr; +use crate::util::hash::{Hash, ToHash}; use crate::util::linked_list::LinkedList; use crate::util::slice::Slice; use crate::util::Result; -#[derive(Clone, Debug, PartialEq)] -pub struct LRUHandle { +// 缓存的对象, 以Handle为单位进行数据传递和共享, 其中的value是只读的, 带有读写锁 +#[derive(Debug)] +pub struct LRUHandle { + // 缓存的键, 当hash出现冲突时判断key是否相等 key: Slice, + // 缓存的数据, 只读 value: T, + // key的hash值, 用于在HandleTable中寻址 hash: u32, + // 是否在缓存中 in_cache: bool, + // key的长度 key_length: usize, + // value的长度或者大小 charge: usize, - refs: u32, - prev: Option>>>, - next: Option>>>, - next_hash: Option>>>, + // 上一节点 + prev: Option>>>, + // 下一节点 + next: Option>>>, + // 下一lru节点 + next_lru: Option>>>, } -impl LRUHandle { +impl LRUHandle { fn new(key: Slice, value: T, hash: u32, charge: usize, - prev: Option>>>, - next: Option>>>, - next_hash: Option>>>) -> Self { + ) -> Self { let key_length = key.size(); Self { key, @@ -39,117 +52,159 @@ impl LRUHandle { in_cache: false, key_length, charge, - refs: 1, - prev, - next, - next_hash, + prev: None, + next: None, + next_lru: None, } } pub fn key(&self) -> &Slice { &self.key } pub fn value(&self) -> &T { + &*self + } +} + +impl Deref for LRUHandle { + type Target = T; + + fn deref(&self) -> &Self::Target { &self.value } } -#[derive(Clone)] -pub struct HandleTable { +impl DerefMut for LRUHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.value + } +} + +#[derive(Debug)] +struct HandleTable { length: usize, - list: [Option>; 16], + list: Vec>>>>, } -impl Default for HandleTable { +impl Default for HandleTable { fn default() -> Self { HandleTable { length: 16, - list: [None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None], + list: vec![None; 16], } } } -impl HandleTable { - pub fn look_up(&self, key: &Slice, hash: u32) -> Result>> { - match &self.list[hash as usize & self.length.wrapping_sub(1)] { - Some(v) => { - Ok(Some(v.clone())) - } - _ => { - return Ok(None); +impl HandleTable { + pub fn look_up(&self, key: &Slice, hash: u32) -> Result>>>> { + // 获取hash槽位上的数据, 存在则遍历链表 + let index = hash as usize & self.length.wrapping_sub(1); + let mut head = self.list[index].clone(); + while let Some(handle) = head { + let read = handle.read()?; + if &read.key == key { + return Ok(Some(handle.clone())); } + head = read.next.clone(); } + Ok(None) } - pub fn insert(&mut self, handle: LRUHandle) { - let index = handle.hash as usize & self.length.wrapping_sub(1); - self.list[index] = Some(handle); + pub fn insert(&mut self, handle: LRUHandle) -> Result<()> { + let index = handle.hash as usize & (self.length - 1); + // 获取hash槽位上的数据, 不存在直接插入, 存在插入尾部 + match self.list[index].clone() { + Some(mut head) => { + while let Some(value) = head.clone().write()?.next.clone() { + head = value; + } + head.clone().write()?.next = Some(Arc::new(RwLock::new(handle))); + } + None => { + self.list[index] = Some(Arc::new(RwLock::new(handle))); + } + } + Ok(()) } - pub fn remove(&mut self, _key: &Slice, _hash: u32) { - let index = _hash as usize & self.length.wrapping_sub(1); - self.list[index] = None; + pub fn remove(&mut self, key: &Slice, hash: u32) -> Result<()> { + let index = hash as usize & self.length.wrapping_sub(1); + let mut head = self.list[index].clone(); + // 获取hash槽位上的数据, 遍历到key相等时删除handle + while let Some(handle) = head { + let write = handle.write()?; + if &write.key == key { + if write.prev.is_none() && write.next.is_none() { + // 只有一个节点直接置空 + self.list[index] = None; + } else if write.prev.is_none() { + // 头节点移交至下一节点 + self.list[index] = write.next.clone(); + } else { + // 其余中间节点或尾节点, 删除当前节点并将下一节点移交给上一节点 + write.prev.clone().unwrap().write()?.next = write.next.clone() + } + } + head = write.next.clone(); + } + Ok(()) } pub fn length(&self) -> usize { self.length } - /// 扩容 - /// - /// # Examples - /// - /// ``` - /// - /// ``` fn resize(&mut self) { todo!() } } -pub struct LRUCache { +#[derive(Debug)] +struct LRUCache { capacity: usize, usage: usize, in_use: Option>, table: HandleTable, } -impl LRUCache { +impl Default for LRUCache { + fn default() -> Self { + Self { + capacity: 0, + usage: 0, + in_use: None, + table: HandleTable::default(), + } + } +} + +impl LRUCache { pub fn new(capacity: usize, usage: usize, in_use: Option>, table: HandleTable) -> Self { Self { capacity, usage, in_use, table } } - // pub fn set_capacity(&mut self, capacity: usize) { - // self.capacity = capacity; - // } - - pub fn insert(&mut self, key: Slice, hash: u32, value: T, charge: usize, deleter: F) - where F: FnOnce(Slice, T) { + pub fn insert(&mut self, key: Slice, hash: u32, value: T, charge: usize) -> Result<()> { let e = LRUHandle::new(key, value, hash, charge, - None, - None, - None, ); - self.table.insert(e); + self.table.insert(e)?; self.usage += 1; + self.capacity += 1; + Ok(()) } - pub fn look_up(&self, key: &Slice, hash: u32) -> Result>> { + pub fn look_up(&self, key: &Slice, hash: u32) -> Result>>>> { self.table.look_up(key, hash) } - pub fn release(&mut self, _handle: &LRUHandle) { - todo!() - } - - pub fn erase(&mut self, _key: &Slice, _hash: u32) -> Result<()> { - self.table.remove(_key, _hash); + pub fn erase(&mut self, key: &Slice, hash: u32) -> Result<()> { + self.table.remove(key, hash)?; + self.capacity += 1; Ok(()) } - pub fn prune(&mut self) { - todo!() + pub fn prune(&mut self) -> Result<()> { + Ok(()) } pub fn total_charge(&self) -> usize { todo!() @@ -161,136 +216,154 @@ impl LRUCache { fn lru_append(&mut self, _head_of_list: &LRUHandle, _e: LRUHandle) { todo!() } - fn refer(&self, _e: &LRUHandle) { - todo!() - } - fn unref(&self, _e: &LRUHandle) { - todo!() - } } const K_NUM_SHARD_BITS: usize = 4; const K_NUM_SHARDS: usize = 1 << K_NUM_SHARD_BITS; -pub struct ShardLRUCache { - shard: Vec>, +#[derive(Debug)] +pub struct ShardLRUCache { + shard: [LRUCache; 16], + // 封闭构造器, 请使用ShardLRUCache::new()进行构造, 请勿自行构造结构体 + __private: (), } -impl ShardLRUCache { - /// 构造一个指定容量的ShardLRUCache - /// - /// # Arguments - /// - /// * `capacity`: 容量 - /// - /// returns: ShardLRUCache - /// - /// # Examples - /// - /// ``` - /// ShardLRUCache::new_with_capacity(32); - /// ``` - pub fn new_with_capacity(capacity: usize) -> Self { - let per_shard: usize = (capacity + (K_NUM_SHARDS - 1)) / K_NUM_SHARD_BITS; - - let mut shard_vec: Vec> = Vec::with_capacity(K_NUM_SHARDS); - for _ in 1..K_NUM_SHARDS { - let table = HandleTable::default(); - let cache: LRUCache = LRUCache::new(per_shard, 0, None, table); - shard_vec.push(cache); - } +#[inline] +fn hash_slice(slice: &Slice) -> u32 { + Hash::hash_code(slice, 0) +} + +#[inline] +fn shard(hash: u32) -> usize { + (hash >> (32 - K_NUM_SHARD_BITS)) as usize +} + +#[inline] +fn pre_shard(capacity: usize) -> usize { + (capacity + (K_NUM_SHARDS - 1)) / K_NUM_SHARDS +} + +unsafe impl Send for ShardLRUCache {} + +unsafe impl Sync for ShardLRUCache {} + +/// shard的实现可以降低锁粒度, 提高并发度 +impl ShardLRUCache { + pub fn new() -> ShardLRUCache { Self { - shard: shard_vec + shard: arr!([LRUCache::default(); 16]), + __private: (), } } - fn hash_slice(s: &Slice) -> u32 { - s.to_hash_with_seed(0) + pub fn new_with_arc() -> Arc>> { + Arc::new(RwLock::new(ShardLRUCache { + shard: arr!([LRUCache::default(); 16]), + __private: (), + })) } - fn shard(hash: u32) -> u32 { - hash.shr(32 - K_NUM_SHARD_BITS) + pub fn insert(&mut self, key: &Slice, value: T, charge: usize) -> Result<()> { + let hash = hash_slice(key); + self.shard[shard(hash)].insert(key.clone(), hash, value, charge) + } + pub fn lookup(&self, key: &Slice) -> Result>>>> { + let hash = hash_slice(key); + self.shard[shard(hash)].look_up(key, hash) + } + pub fn erase(&mut self, key: &Slice) -> Result<()> { + // 删除缓存 + let hash = hash_slice(key); + self.shard[shard(hash)].erase(key, hash) } + pub fn prune(&mut self) -> Result<()> { + // 清空全部shard的缓存 + for mut shard in &mut self.shard { + shard.prune()? + } + Ok(()) + } +} - /// 从缓存中获取数据 - /// - /// # Arguments - /// - /// * `key`: 键 - /// - /// returns: Result, Status> - /// - /// # Examples - /// - /// ``` - /// let value= cache.lookup(Slice::from("123")); - /// ``` - pub fn lookup(&self, key: &Slice) -> Result>> { - let hash = Self::hash_slice(&key); - let i = Self::shard(hash); - self.shard[i as usize].look_up(key, hash) +#[test] +fn test_insert_cache() -> Result<()> { + let mut cache = ShardLRUCache::new(); + let key = Slice::from("test_key"); + cache.insert(&key, 10, 4)?; + println!("{:?}", cache); + let handle = cache.lookup(&key)?; + println!("{:?}", handle); + assert_eq!(true, handle.is_some()); + assert_eq!(&10, handle.unwrap().read()?.value()); + + Ok(()) +} + +#[test] +fn test_insert_cache_multi_thread() -> Result<()> { + let mut cache = ShardLRUCache::new_with_arc(); + + let mut thread_vec = vec![]; + let thread_count = 128; + // 创建5线程写入缓存 + for i in 0..thread_count { + let share_cache = cache.clone(); + let thread = thread::spawn(move || -> Result<()>{ + let key = Slice::from("test_key".to_string() + &i.to_string()); + share_cache.write()?.insert(&key, i, 4)?; + + println!("write thread {}, write value: {}", i, i); + Ok(()) + }); + thread_vec.push(thread); } - /// 插入数据到缓存 - /// - /// # Arguments - /// - /// * `key`: 键 - /// * `value`: 值 - /// * `charge`: 空间占用量 - /// * `deleter`: 删除的回调函数 - /// - /// returns: () - /// - /// # Examples - /// - /// ``` - /// cache.insert(Slice::from("123", 123,1,move || {})) - /// ``` - pub fn insert(&mut self, key: Slice, value: T, charge: usize, deleter: F) -> Result<()> - where F: FnOnce(Slice, T) { - let hash = Self::hash_slice(&key); - let i = Self::shard(hash); - let mut shard = &mut self.shard[i as usize]; - shard.insert(key, hash, value, charge, deleter); - Ok(()) + for thread in thread_vec { + thread.join().unwrap()?; } - /// 释放引用 - /// 当数据不再需要使用时, 使用方必须释放引用 - /// - /// # Arguments - /// - /// * `handle`: 需要释放的值 - /// - /// returns: Result<(), Status> - /// - /// # Examples - /// - /// ``` - /// cache.release(handle); - /// ``` - pub fn release(&mut self, handle: LRUHandle) -> Result<()> { - todo!() + let mut thread_vec = vec![]; + + // 创建5线程读取缓存 + for i in 0..thread_count { + let share_cache = cache.clone(); + let thread = thread::spawn(move || -> Result<()>{ + let key = Slice::from("test_key".to_string() + &i.to_string()); + let read = share_cache.read()?.lookup(&key)?; + println!("read thread {}, read value: {}", i, read.clone().unwrap().read()?.value); + assert_eq!(true, read.is_some()); + assert_eq!(i, read.clone().unwrap().read()?.value); + Ok(()) + }); + thread_vec.push(thread); } - /// 从缓存中删除值 - /// - /// # Arguments - /// - /// * `key`: 值 - /// - /// returns: Result<(), Status> - /// - /// # Examples - /// - /// ``` - /// cache.erase(Slice::from("123")); - /// ``` - pub fn erase(&mut self, key: &Slice) -> Result<()> { - let hash = Self::hash_slice(&key); - let i = Self::shard(hash); - let mut shard = &mut self.shard[i as usize]; - shard.erase(key, hash) + for thread in thread_vec { + thread.join().unwrap()?; } + + // 线程全部执行完打印缓存信息 + println!("{:?}", cache); + + Ok(()) +} + +#[test] +fn test_erase_cache() -> Result<()> { + let mut cache = ShardLRUCache::new(); + let key = Slice::from("test_key"); + cache.insert(&key, 10, 4)?; + println!("{:?}", cache); + cache.erase(&key)?; + println!("{:?}", cache); + let handle = cache.lookup(&key)?; + println!("{:?}", handle); + assert_eq!(true, handle.is_none()); + + Ok(()) +} + +#[test] +fn test_clear_cache() -> Result<()> { + todo!() } \ No newline at end of file diff --git a/src/util/cache_test.rs b/src/util/cache_test.rs deleted file mode 100644 index c14bf2f..0000000 --- a/src/util/cache_test.rs +++ /dev/null @@ -1,130 +0,0 @@ -mod test { - use std::borrow::Borrow; - use std::collections::HashMap; - use std::ops::Deref; - use crate::util::cache::{LRUHandle, ShardLRUCache}; - use crate::util::slice::Slice; - - use crate::util::Result; - - #[test] - fn test_insert() -> Result<()> { - let mut cache: ShardLRUCache = ShardLRUCache::new_with_capacity(16); - let key = Slice::from("123"); - let value = 1234; - cache.insert(key.clone(), value, 1, move |k, v| { - println!("delete key: {}", String::from(k)); - println!("delete value: {}", v); - })?; - println!("key: {}", String::from(key.clone())); - println!("value: {}", value); - Ok(()) - } - - #[test] - fn test_update() -> Result<()> { - let mut cache: ShardLRUCache = ShardLRUCache::new_with_capacity(16); - let key = Slice::from("123"); - let value = 1234; - cache.insert(key.clone(), value, 1, move |k, v| { - println!("delete key: {}", String::from(k)); - println!("delete value: {}", v); - })?; - println!("key: {}", String::from(key.clone())); - println!("value: {}", value); - let mut inserted = cache.lookup(&key.clone())?; - assert_eq!(value, *inserted.unwrap().value()); - - let value = 1235; - cache.insert(key.clone(), value, 1, move |k, v| { - println!("delete key: {}", String::from(k)); - println!("delete value: {}", v); - })?; - let mut inserted = cache.lookup(&key.clone())?; - println!("key: {}", String::from(key.clone())); - println!("value: {}", value); - assert_eq!(value, *inserted.unwrap().value()); - - Ok(()) - } - - #[test] - fn test_lookup() -> Result<()> { - let mut cache: ShardLRUCache = ShardLRUCache::new_with_capacity(16); - let key = Slice::from("123"); - let value = 1234; - cache.insert(key.clone(), value, 1, move |k, v| { - println!("delete key: {}", String::from(k)); - println!("delete value: {}", v); - })?; - println!("key: {}", String::from(key.clone())); - println!("value: {}", value); - - let value = cache.lookup(&key.clone())?; - match value { - None => { - println!("value is none"); - } - Some(v) => { - println!("key: {}", String::from(v.key())); - println!("value: {}", v.value()); - } - } - - Ok(()) - } - - #[test] - fn test_remove() -> Result<()> { - let mut cache: ShardLRUCache = ShardLRUCache::new_with_capacity(16); - let key = Slice::from("123"); - let value = 1234; - cache.insert("123", value, 1, move |k, v| { - println!("delete key: {}", String::from(k)); - println!("delete value: {}", v); - })?; - println!("key: {:?}", &key); - println!("value: {}", value); - - let lookup = cache.lookup(&key.clone())?; - match &lookup { - None => { - println!("value is none"); - } - Some(v) => { - println!("key: {}", String::from(v.key())); - println!("value: {}", v.value()); - } - } - assert_eq!(value, *lookup.unwrap().value()); - - cache.erase(&key)?; - - let lookup = cache.lookup(&key.clone())?; - match &lookup { - None => { - println!("value is none"); - } - Some(v) => { - println!("key: {}", String::from(v.key())); - println!("value: {}", v.value()); - } - } - assert_eq!(None, lookup); - - Ok(()) - } - - #[test] - fn test_hash_map() { - let mut map: HashMap<&str, &str> = HashMap::new(); - map.insert("123", "a"); - let value = map.get("123"); - match value { - None => {} - Some(v) => { - println!("{}", v); - } - } - } -} \ No newline at end of file diff --git a/src/util/debug.rs b/src/util/debug.rs index 464919b..5e6926e 100644 --- a/src/util/debug.rs +++ b/src/util/debug.rs @@ -10,7 +10,7 @@ macro_rules! debug { }; ($($arg:tt)*) => {{ use std::io::Write; - std::io::stdout().write(format!($($arg)*).as_bytes()); + std::io::stdout().write(format!($($arg)*).as_bytes()).unwrap(); debug!(); }}; } diff --git a/src/util/mod.rs b/src/util/mod.rs index 53143b2..07789c7 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -11,7 +11,6 @@ pub mod r#const; pub mod slice; mod slice_test; pub mod cache; -mod cache_test; pub mod coding; pub mod arena; mod arena_test; diff --git a/tests/custom_proc_macro_test.rs b/tests/custom_proc_macro_test.rs new file mode 100644 index 0000000..e3bd642 --- /dev/null +++ b/tests/custom_proc_macro_test.rs @@ -0,0 +1,39 @@ +use custom_proc_macro::arr; + +#[derive(Debug, PartialEq)] +struct Test; + +#[test] +fn test_arr() { + let origin = [0; 16]; + + let u32_arr = arr!([0_u32; 16]); + println!("{:?}", u32_arr); + assert_eq!(origin, u32_arr); + + let num_arr = arr!([0; 16]); + println!("{:?}", num_arr); + assert_eq!(origin, num_arr); + + let u32_arr: [u32; 16] = arr!([0_u32; 16]); + println!("{:?}", u32_arr); + assert_eq!(origin, u32_arr); + + let num_arr: [u32; 16] = arr!([0; 16]); + println!("{:?}", num_arr); + assert_eq!(origin, num_arr); + + let num_arr: [u64; 16] = arr!([0; 16]); + println!("{:?}", num_arr); + assert_eq!(origin, u32_arr); + + let test_origin = [ + Test, Test, Test, Test, Test, Test, Test, Test, + Test, Test, Test, Test, Test, Test, Test, Test + ]; + let test_arr = arr!([Test; 16]); + println!("{:?}", test_arr); + assert_eq!(test_origin, test_arr); + + let err = arr!(Test;16); +} \ No newline at end of file -- Gitee From e875b273469a8646963727110bc8c66e01dcb06e Mon Sep 17 00:00:00 2001 From: colagy Date: Tue, 25 Apr 2023 22:40:40 +0800 Subject: [PATCH 3/3] HashTable in shardLruCache can auto resize; LruCache can auto remove the least recently used element when usage great than capacity; --- Cargo.toml | 1 + custom_proc_macro/Cargo.toml | 2 +- custom_proc_macro/src/lib.rs | 31 +- src/util/cache.rs | 985 +++++++++++++++++++++++++++++------ src/util/coding.rs | 3 +- 5 files changed, 853 insertions(+), 169 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e5d8965..7bdd91a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ rand = "0.8.5" tokio = "1.24.1" jemallocator = "0.5" jemalloc-sys = { version = "0.5", features = ["stats"] } +# 自定义过程宏的crate custom_proc_macro = { path = "custom_proc_macro" } [dev-dependencies] diff --git a/custom_proc_macro/Cargo.toml b/custom_proc_macro/Cargo.toml index 48488f1..3a37db5 100644 --- a/custom_proc_macro/Cargo.toml +++ b/custom_proc_macro/Cargo.toml @@ -9,6 +9,6 @@ proc-macro = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -syn = { version = "1.0", features = ["full"] } +syn = { version = "2.0.15", features = ["full"] } [features] \ No newline at end of file diff --git a/custom_proc_macro/src/lib.rs b/custom_proc_macro/src/lib.rs index 5142909..868ee8c 100644 --- a/custom_proc_macro/src/lib.rs +++ b/custom_proc_macro/src/lib.rs @@ -1,8 +1,8 @@ use proc_macro::{TokenStream}; use std::ops::Deref; -use syn::{ExprRepeat, parse_macro_input, Lit, Expr}; +use syn::{ExprRepeat, Lit, Expr}; use syn::__private::quote::quote; -use syn::parse_macro_input::parse; +use syn::parse; /// 生成数组的宏 主要用于没有实现copy语义的结构体 在无法使用[T; 32] 这种方式生成数组的情况下 /// @@ -26,7 +26,7 @@ use syn::parse_macro_input::parse; #[proc_macro] pub fn arr(input: TokenStream) -> TokenStream { let repeat_expr: ExprRepeat = parse(input) - .expect("like arr!([Test; 16])"); + .expect("Like arr!([Test::new(); 16])"); let mut len = 0; // 获取表达式中的长度信息并转为usize @@ -37,19 +37,28 @@ pub fn arr(input: TokenStream) -> TokenStream { } // 解析并拼接成数组 let _expr = repeat_expr.expr; - // 1.生成数组中的一个元素 - let _one = quote! { #_expr, }; + // 1.生成数组的集合 let mut _all = quote!(); - for _ in 0..len { - // 2.将数组中的每个元素向数组中追加 - _all = quote! { #_all #_one }; + for _i in 0..len { + // 2.将每个元素向数组中追加 + if let Expr::Path(path) = _expr.as_ref() { + // 如果是element宏的情况会调用element宏并传入index + let _mac_name = &path; + _all = quote! { #_all #_mac_name!(#_i, capacity, default_length), }; + } else { + _all = quote! { #_all #_expr, }; + } } // 3.加上中括号 let arr = quote! { [ #_all ] }; return arr.into(); } -#[test] -fn test_arr() { - let int_arr = arr!([u32; 12]); +/// 生成调用NonNull::new_unchecked()的方法, 会自动包裹unsafe{}代码块 +#[proc_macro] +pub fn non_null_new_uncheck(input: TokenStream) -> TokenStream { + let ptr_expr: Expr = parse(input.into()) + .expect("Like non_null_new_uncheck!(ptr), ptr must a variable with a raw point"); + let output = quote! { unsafe { std::ptr::NonNull::new_unchecked(#ptr_expr) } }; + output.into() } \ No newline at end of file diff --git a/src/util/cache.rs b/src/util/cache.rs index 58099fb..e15d7de 100644 --- a/src/util/cache.rs +++ b/src/util/cache.rs @@ -1,19 +1,19 @@ -use std::borrow::BorrowMut; -use std::cell::{RefCell, RefMut}; -use std::collections::HashMap; -use std::ops::{Deref, DerefMut, Shr}; -use std::rc::Rc; -use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; -use std::{io, result, thread}; -use std::any::Any; -use std::str::FromStr; -use std::sync::atomic::AtomicUsize; -use custom_proc_macro::arr; -use crate::util::hash::{Hash, ToHash}; -use crate::util::linked_list::LinkedList; -use crate::util::slice::Slice; +use std::{ptr, thread, usize}; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::ops::Deref; +use std::ptr::NonNull; +use std::sync::{Arc, RwLock}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use custom_proc_macro::{arr, non_null_new_uncheck}; +use crate::util::hash::Hash; use crate::util::Result; +use crate::util::slice::Slice; + +/// handle类型定义 +type HandleRef = NonNull>; // 缓存的对象, 以Handle为单位进行数据传递和共享, 其中的value是只读的, 带有读写锁 #[derive(Debug)] @@ -21,24 +21,27 @@ pub struct LRUHandle { // 缓存的键, 当hash出现冲突时判断key是否相等 key: Slice, // 缓存的数据, 只读 - value: T, + value: Arc, // key的hash值, 用于在HandleTable中寻址 hash: u32, // 是否在缓存中 in_cache: bool, // key的长度 key_length: usize, - // value的长度或者大小 + // value的长度或者数据量的大小, 用于统计当前缓存了多少数据量 charge: usize, - // 上一节点 - prev: Option>>>, - // 下一节点 - next: Option>>>, - // 下一lru节点 - next_lru: Option>>>, + // 上一节点(lruCache中的双向链表的上一节点) + prev: Option>, + // 下一节点(lruCache中的双向链表的下一节点) + next: Option>, + // 上一节点(handleTable中的双向链表的上一节点) + prev_hash: Option>, + // 下一节点(handleTable中的双向链表的下一节点) + next_hash: Option>, } impl LRUHandle { + /// 从栈上分配内存 fn new(key: Slice, value: T, hash: u32, @@ -47,21 +50,59 @@ impl LRUHandle { let key_length = key.size(); Self { key, - value, + value: Arc::new(value), hash, - in_cache: false, + in_cache: true, key_length, charge, prev: None, next: None, - next_lru: None, + prev_hash: None, + next_hash: None, } } + /// 从堆上分配内存 + /// + /// # Arguments + /// + /// * `key`: 键 + /// * `value`: 值 + /// * `hash`: 键的hash + /// * `charge`: 值的长度或者数据大小 + /// + /// returns: HandleRef + /// + /// # Examples + /// + /// ``` + /// + /// ``` + fn new_on_heap(key: Slice, value: T, hash: u32, charge: usize) -> HandleRef { + let key_length = key.size(); + // 在堆上分配 LRUHandle 使用的内存 + let data = Box::new(Self { + key, + value: Arc::new(value), + hash, + in_cache: true, + key_length, + charge, + prev: None, + next: None, + prev_hash: None, + next_hash: None, + }); + // 不检查是否为空指针 异常情况可能会导致程序崩溃 + // 转为裸指针后这块内存不会被自动回收 + non_null_new_uncheck!(Box::into_raw(data)) + } + /// 返回handle的键 pub fn key(&self) -> &Slice { &self.key } - pub fn value(&self) -> &T { - &*self + /// 返回handle的值 + pub fn value(&self) -> Arc { + self.value.clone() } } @@ -69,163 +110,496 @@ impl Deref for LRUHandle { type Target = T; fn deref(&self) -> &Self::Target { + // 解引用为value &self.value } } -impl DerefMut for LRUHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.value - } -} - -#[derive(Debug)] +/// hash表 +/// 当写入达到阈值后会进行扩容, 可以传入default_length避免扩容 struct HandleTable { + // hash表中已写入的数据量 + elements: usize, + // hash表默认大小, prune时会恢复到这个长度 + default_length: usize, + // hash表的大小 length: usize, - list: Vec>>>>, + // hash表的table, 堆上分配数组 + list: Vec>>, + // shard号, 用于debug + _shard: usize, + // 标识LRUHandle属于HandleTable, 编译器会检查LRUHandle的生命周期小于HandleTable的生命周期 + _marker: PhantomData<*mut LRUHandle>, } -impl Default for HandleTable { - fn default() -> Self { - HandleTable { - length: 16, - list: vec![None; 16], +/// 格式化长度, 返回2的次幂 +fn format_length(length: usize) -> usize { + // 最小长度是DEFAULT_HASH_TABLE_LENGTH + if length <= DEFAULT_HASH_TABLE_LENGTH { + return DEFAULT_HASH_TABLE_LENGTH; + } + let mut shift = 0; + while length > 1 << shift { + shift += 1; + if 1_usize.checked_shl(shift).is_none() { + // 如果发生了溢出, 返回不溢出的最大值 + return 1 << (shift - 1); } } + 1 << shift } impl HandleTable { - pub fn look_up(&self, key: &Slice, hash: u32) -> Result>>>> { + fn new(shard: usize) -> Self { + Self::new_with_length(shard, DEFAULT_HASH_TABLE_LENGTH) + } + + fn new_with_length(shard: usize, default_length: usize) -> Self <> { + // 格式化用户输出的长度为2的次幂 + let length = format_length(default_length); + Self { + elements: 0, + default_length: length, + length, + list: vec![None; length], + _shard: shard, + _marker: PhantomData::default(), + } + } + + /// 从hash表中查询数据 + pub fn look_up(&self, key: &Slice, hash: u32) -> Result>> { + let index = self.find_index(hash); // 获取hash槽位上的数据, 存在则遍历链表 - let index = hash as usize & self.length.wrapping_sub(1); - let mut head = self.list[index].clone(); + let mut head = self.list[index]; while let Some(handle) = head { - let read = handle.read()?; - if &read.key == key { - return Ok(Some(handle.clone())); + let handle_ref = unsafe { handle.as_ref() }; + if &handle_ref.key == key { + return Ok(Some(handle)); } - head = read.next.clone(); + head = handle_ref.next_hash; } Ok(None) } - pub fn insert(&mut self, handle: LRUHandle) -> Result<()> { - let index = handle.hash as usize & (self.length - 1); - // 获取hash槽位上的数据, 不存在直接插入, 存在插入尾部 - match self.list[index].clone() { + /// 向hash表中插入数据 + pub fn insert(&mut self, mut handle: HandleRef) -> Result<()> { + let handle_mut = unsafe { handle.as_mut() }; + let index = self.find_index(handle_mut.hash); + // 获取hash槽位上的头节点 + match self.list[index] { Some(mut head) => { - while let Some(value) = head.clone().write()?.next.clone() { - head = value; - } - head.clone().write()?.next = Some(Arc::new(RwLock::new(handle))); + let head_mut = unsafe { head.as_mut() }; + // 头插法插入数据 + self.list[index] = Some(handle); + handle_mut.next_hash = Some(head); + head_mut.prev_hash = Some(handle); } None => { - self.list[index] = Some(Arc::new(RwLock::new(handle))); + self.list[index] = Some(handle); } } + self.elements += 1; + self.should_resize()?; Ok(()) } - pub fn remove(&mut self, key: &Slice, hash: u32) -> Result<()> { - let index = hash as usize & self.length.wrapping_sub(1); - let mut head = self.list[index].clone(); + /// 从hash表中删除数据, 并回收内存 + pub fn remove(&mut self, key: &Slice, hash: u32) -> Result>> { + let index = self.find_index(hash); + let mut head = self.list[index]; // 获取hash槽位上的数据, 遍历到key相等时删除handle - while let Some(handle) = head { - let write = handle.write()?; - if &write.key == key { - if write.prev.is_none() && write.next.is_none() { - // 只有一个节点直接置空 + while let Some(mut handle) = head { + let handle_mut = unsafe { handle.as_mut() }; + // key相等进行删除, 这里只断开链表的连接, 内存在lru链表上回收 + if &handle_mut.key == key { + if handle_mut.prev_hash.is_none() && handle_mut.next_hash.is_none() { + // 只有一个节点, 直接置空 self.list[index] = None; - } else if write.prev.is_none() { - // 头节点移交至下一节点 - self.list[index] = write.next.clone(); + } else if handle_mut.prev_hash.is_none() { + // 是头节点, 将头节点移交至下一节点 + self.list[index] = handle_mut.next_hash; + // 下一节点的prev_hash要置空 + handle_mut.prev_hash = None; } else { - // 其余中间节点或尾节点, 删除当前节点并将下一节点移交给上一节点 - write.prev.clone().unwrap().write()?.next = write.next.clone() + // 是其余中间节点或尾节点, 删除当前节点并将下一节点移交给上一节点 + let prev_hash_ptr = unsafe { handle_mut.prev_hash.unwrap().as_mut() }; + prev_hash_ptr.next_hash = handle_mut.next_hash; + // 下一结点不为空时, 将当前节点的prev移交给下一节点的prev + if let Some(mut next_hash) = handle_mut.next_hash { + let next_hash_ptr = unsafe { next_hash.as_mut() }; + next_hash_ptr.prev_hash = handle_mut.prev_hash; + } } + // 回收内存 + Self::drop_handle(handle.as_ptr()); + self.elements -= 1; + return Ok(Some(handle)); } - head = write.next.clone(); + head = handle_mut.next_hash; } - Ok(()) + Ok(None) + } + + + /// 清空hash表 并回收内存 + pub fn prune(&mut self) { + for handle in self.list.iter().filter(|v| v.is_some()) { + // 回收内存 + Self::drop_handle(handle.unwrap().as_ptr()); + } + // 清空list恢复内存 + self.list.clear(); + self.elements = 0; + // 恢复到初始的默认容量 + self.list.resize(self.default_length, None); + self.length = self.default_length; } + /// 获取hash表的长度 + #[inline] + #[allow(dead_code)] pub fn length(&self) -> usize { self.length } - fn resize(&mut self) { - todo!() + /// 是否需要扩容 + /// 需要扩容时调用扩容方法 + #[inline] + fn should_resize(&mut self) -> Result<()> { + // 负载因子需要平衡寻址速度与内存占用, 如果扩容后将溢出, 则不扩容 + if (self.elements as f32 > self.list.len() as f32 * LOAD_FACTOR) && self.list.len().checked_shl(1).is_some() { + self.resize()? + } + Ok(()) + } + + /// 获取hash槽位 + #[inline] + fn find_index(&self, hash: u32) -> usize { + hash as usize & self.length.wrapping_sub(1) + } + + /// hash表扩容 + /// 扩容操作较少使用, 标记为cold + #[cold] + fn resize(&mut self) -> Result<()> { + let old_len = self.list.len(); + let new_len = self.list.len() << 1; + self.list.resize(new_len, None); + self.length = new_len; + let list = &mut self.list; + let list_ptr = list.as_mut_ptr(); + // 遍历原hash表 + for (index, handle_option) in list[0..old_len].iter_mut().enumerate() { + if handle_option.is_none() { + // 为空的直接跳过 + continue; + } + let mut current_option = *handle_option; + let (mut low_head, mut low_tail) = (None, None); + let (mut high_head, mut high_tail) = (None, None); + while let Some(mut current) = current_option { + let current_mut = unsafe { current.as_mut() }; + let next = current_mut.next_hash; + // 与原来的容量进行与运算, 可能落在原位置 或者 原位置 + old_len + if current_mut.hash as usize & old_len == 0 { + // 低位 + if low_head.is_none() { + low_head = current_option; + low_tail = current_option; + } else { + // 头插法 + current_mut.next_hash = low_head; + unsafe { low_head.unwrap().as_mut().prev_hash = current_option }; + low_head = current_option; + } + } else { + // 高位 + if high_head.is_none() { + high_head = current_option; + high_tail = current_option; + } else { + // 头插法 + current_mut.next_hash = high_head; + unsafe { high_head.unwrap().as_mut().prev_hash = current_option }; + high_head = current_option; + } + } + current_option = next; + } + if low_head.is_some() { + unsafe { + // 头节点的prev_hash需要置空 + low_head.unwrap().as_mut().prev_hash = None; + // 尾节点的next_hash需要置空 + low_tail.unwrap().as_mut().next_hash = None; + } + } + unsafe { ptr::write(list_ptr.add(index), low_head); } + if high_head.is_some() { + unsafe { + // 头节点的prev_hash需要置空 + high_head.unwrap().as_mut().prev_hash = None; + // 尾节点的next_hash需要置空 + high_tail.unwrap().as_mut().next_hash = None; + } + } + unsafe { ptr::write(list_ptr.add(old_len + index), high_head); } + } + Ok(()) + } + + /// 将裸指针包装回Box并回收 + /// 只能在hash表删除后回收内存, 在其他位置回收内存可能会double free, 或其他未定义行为 + #[inline] + fn drop_handle(handle_ptr: *mut LRUHandle) { + // 将指针包装回box, box会在作用域结束之后自动drop掉 + unsafe { Box::from_raw(handle_ptr) }; } } -#[derive(Debug)] struct LRUCache { + // hash表, 用于存放缓存数据 + table: HandleTable, + // cache的容量 capacity: usize, + // cache的当前使用量, 使用量超过容量会进行扩容 usage: usize, - in_use: Option>, - table: HandleTable, + // lru链表的头指针, 最近使用的 + head_of_lru: Option>, + // lru链表的尾指针, 最先被删除 + tail_of_lru: Option>, + // shard号, 用于debug + _shard: usize, } -impl Default for LRUCache { - fn default() -> Self { +/// 默认容量 值的总长度或者是数据总大小 +const DEFAULT_CACHE_PRE_SHARD_CAPACITY: usize = (DEFAULT_CACHE_CAPACITY + (K_NUM_SHARDS - 1)) / K_NUM_SHARDS; + +impl LRUCache { + fn new(shard: usize) -> Self { + Self::new_with_capacity(shard, DEFAULT_CACHE_PRE_SHARD_CAPACITY, DEFAULT_SHARD_LENGTH) + } + /// 创建LruCache, 使用默认table, 指定容量 + fn new_with_capacity(shard: usize, capacity: usize, default_length: usize) -> Self { + Self::new_with_table_capacity(shard, capacity, default_length) + } + + /// 创建LruCache, 指定table, 指定容量 + fn new_with_table_capacity(shard: usize, capacity: usize, default_length: usize) -> Self { Self { - capacity: 0, + table: HandleTable::new_with_length(shard, default_length), + capacity, usage: 0, - in_use: None, - table: HandleTable::default(), + head_of_lru: None, + tail_of_lru: None, + _shard: shard, } } -} - -impl LRUCache { - pub fn new(capacity: usize, usage: usize, in_use: Option>, table: HandleTable) -> Self { - Self { capacity, usage, in_use, table } - } + /// 向lru缓存中插入数据 + /// # Arguments + /// * `key`: 键 + /// * `hash`: 键的hash + /// * `value`: 值 + /// * `charge`: 值的长度或数据大小 + /// returns: Result<(), Status> + /// # Examples + /// ``` + /// + /// ``` pub fn insert(&mut self, key: Slice, hash: u32, value: T, charge: usize) -> Result<()> { - let e = LRUHandle::new(key, - value, - hash, - charge, - ); - self.table.insert(e)?; - self.usage += 1; - self.capacity += 1; + let handle = LRUHandle::new_on_heap( + key.clone(), + value, + hash, + charge); + // hash表中插入数据 + self.table.insert(handle)?; + // 插入lru + self.lru_append(handle)?; + // 使用量加上写入的value的长度或者数据大小 + self.usage += charge; + + // 使用量已经达到容量, 那么删除最少使用的 + if self.usage >= self.capacity { + if let Some(tail) = self.tail_of_lru { + let tail_ref = unsafe { tail.as_ref() }; + // 先删除lru链表尾 + self.lru_remove(tail)?; + // 于从hash表中删除链表尾, 同时回收内存 + self.table.remove(&tail_ref.key, tail_ref.hash)?; + } + } + Ok(()) } - pub fn look_up(&self, key: &Slice, hash: u32) -> Result>>>> { - self.table.look_up(key, hash) + /// 从lru缓存查询数据 + pub fn look_up(&self, key: &Slice, hash: u32) -> Result>> { + match self.table.look_up(key, hash) { + Ok(handle) => { + match handle { + Some(handle) => { + // 返回为Arc, 这样用户才可以和缓存在多个线程中共享数据 + Ok(Some(unsafe { handle.as_ref() }.value.clone())) + } + None => { Ok(None) } + } + } + Err(err) => { + Err(err) + } + } } - pub fn erase(&mut self, key: &Slice, hash: u32) -> Result<()> { - self.table.remove(key, hash)?; - self.capacity += 1; - Ok(()) + /// 从lru缓存中删除数据, 同时回收内存 + pub fn erase(&mut self, key: &Slice, hash: u32) -> Result { + let mut charge = 0; + // 先从hash表中删除, 同时回收内存 + let removed_handle = self.table.remove(key, hash)?; + if let Some(removed) = removed_handle { + // 再删除lru链表中的数据 + self.lru_remove(removed)?; + charge = unsafe { removed.as_ref().charge }; + } + + // 返回删除了多少数据量 + Ok(charge) } + + /// 清空lru缓存, 同时回收内存 pub fn prune(&mut self) -> Result<()> { + // hash表清空, 回收内存 + self.table.prune(); + // lru头尾指针置空 + self.head_of_lru = None; + self.tail_of_lru = None; + // 使用量归零 + self.usage = 0; Ok(()) } + + /// 获取当前缓存的数据量 + #[inline] pub fn total_charge(&self) -> usize { - todo!() + self.usage } - fn lru_remove(&mut self, _handle: &LRUHandle) { - todo!() + /// 获取当前hash表的槽位数 + pub fn slots(&self) -> usize { + self.table.length } - fn lru_append(&mut self, _head_of_list: &LRUHandle, _e: LRUHandle) { - todo!() + + /// 向lru链表中插入新缓存, 头插法 + /// + /// # Arguments + /// + /// * `head_of_list`: + /// * `handle`: + /// + /// returns: () + /// + /// # Examples + /// + /// ``` + /// + /// ``` + fn lru_append(&mut self, mut handle: HandleRef) -> Result<()> { + if let None = self.head_of_lru { + // 头节点为空时, 尾节点也为空 + self.head_of_lru = Some(handle); + self.tail_of_lru = Some(handle); + return Ok(()); + } + // 头插法, 插入lru链表头 + let handle_mut = unsafe { handle.as_mut() }; + let mut head = self.head_of_lru.unwrap(); + let head_mut = unsafe { head.as_mut() }; + head_mut.prev = Some(handle); + handle_mut.next = Some(head); + + // 更新头指针 + self.head_of_lru = Some(handle); + + Ok(()) + } + + /// 删除lru链表中的数据, 同时回收内存 + fn lru_remove(&mut self, mut handle: HandleRef) -> Result<()> { + let handle_mut = unsafe { handle.as_mut() }; + + // 有上一节点, 上一节点直接连接到下一节点 + if let Some(mut prev) = handle_mut.prev { + unsafe { prev.as_mut() }.next = handle_mut.next; + } else { + // 没有上一节点代表是链表头, 需要更新头指针 + self.head_of_lru = handle_mut.next; + } + + // 有下一节点, 下一节点直接连接到上一节点 + if let Some(mut next) = handle_mut.next { + unsafe { next.as_mut() }.prev = handle_mut.prev; + } else { + // 没有下一节点代表是链表尾, 需要更新尾指针 + self.tail_of_lru = handle_mut.prev; + } + + // 使用量 + self.usage -= handle_mut.charge; + + // 删除后, 标记数据已经不在缓存中 + handle_mut.in_cache = false; + + Ok(()) } } -const K_NUM_SHARD_BITS: usize = 4; +macro_rules! cache_element { + ($shard:expr, $capacity:expr, $default_length:expr) => (RwLock::new(LRUCache::new_with_capacity($shard, pre_shard($capacity), $default_length))); +} + +macro_rules! cache_element_default { + ($shard:expr, $capacity:expr, $default_length:expr) => (RwLock::new(LRUCache::new($shard))); +} + +const K_NUM_SHARD_BITS: usize = 5; +/// 默认shard数 32 const K_NUM_SHARDS: usize = 1 << K_NUM_SHARD_BITS; +/// 默认1000万条或者10M数据 +const DEFAULT_CACHE_CAPACITY: usize = 10_000_000; +/// 负载因子不要太小, 否则会浪费内存 +const LOAD_FACTOR: f32 = 0.75; +const DEFAULT_HASH_TABLE_LENGTH: usize = 16; +// 默认hash表长度为默认shard数*默认的hash表长度 +const DEFAULT_SHARD_LENGTH: usize = K_NUM_SHARDS * DEFAULT_HASH_TABLE_LENGTH; -#[derive(Debug)] +/// 具有多个shard的lru缓存 +/// shard的实现可以降低锁粒度, 提高并发度 +/// shard之间的lru容量是相等的, 会进行独立的lru淘汰, hash表扩容等操作 +/// 每个shard拥有独立的读写锁, 一个shard的读写操作不会影响另一个shard的读写 +/// 插入和删除数据时会更新容量, 当容量达到上限时会进行扩容操作 +/// 目前没有实现自动的缩容操作, 可以调用total_charge判断当前容量并进行手动清空 +/// +/// ### Note +/// 1.当使用RC构成双向链表时, 请不要尝试打印cache, 否则会无限递归 +/// ShardLRUCache, LRUCache, HandleTable 不实现Debug +/// 2. 加读锁后请勿再加次读锁, 否则可能死锁 +/// | 线程1 | 线程2 | +/// | ------ | ------- | +/// | read | | +/// | | write(block) | +/// | read(dead) | | +/// pub struct ShardLRUCache { - shard: [LRUCache; 16], - // 封闭构造器, 请使用ShardLRUCache::new()进行构造, 请勿自行构造结构体 - __private: (), + // shard用于降低锁粒度 + shard: [RwLock>; 32], + // 默认的初始化hash表长度, 用于初始化hash表 + // 使用较大的值可以避免扩容, 但是不要使用过大的值避免浪费空间 + default_length: usize, + // 当前所有shard中lru cache的最大容量, 超过这个容量将会淘汰数据 + capacity: usize, } #[inline] @@ -243,96 +617,341 @@ fn pre_shard(capacity: usize) -> usize { (capacity + (K_NUM_SHARDS - 1)) / K_NUM_SHARDS } +/// 所有权可以多线程传递 unsafe impl Send for ShardLRUCache {} +/// 不可变借用可以多线程共享, 内部shard具有可变性并且加锁, 可以安全的在多线程环境下使用 unsafe impl Sync for ShardLRUCache {} -/// shard的实现可以降低锁粒度, 提高并发度 impl ShardLRUCache { - pub fn new() -> ShardLRUCache { + /// 私有化构造器 + /// 请使用ShardLRUCache::new()进行构造, 请勿尝试自行构造结构体 + fn default() -> Self { Self { - shard: arr!([LRUCache::default(); 16]), - __private: (), + shard: arr!([cache_element_default; 32]), + default_length: DEFAULT_SHARD_LENGTH, + capacity: DEFAULT_CACHE_CAPACITY, } } - pub fn new_with_arc() -> Arc>> { - Arc::new(RwLock::new(ShardLRUCache { - shard: arr!([LRUCache::default(); 16]), - __private: (), - })) + /// 创建ShardLruCache单线程使用 + /// 单线程使用时内部的读写锁会被编译器消除 + /// + /// # Arguments + /// + /// * `capacity`: 最大容量, 超出这个容量时, 将会开始淘汰数据 + /// * `default_length`: 默认的hash表容量, 使用较大的值可以避免扩容, 但不要使用太大的值, 避免空间浪费 + /// + /// returns: ShardLRUCache + /// + /// # Examples + /// + /// ``` + /// use level_db_rust::util::cache::ShardLRUCache; + /// let charge = 4; + /// let total_length = 10000; + /// ShardLRUCache::new_with_capacity(charge * total_length, 1000); + /// ``` + pub fn new_with_capacity(capacity: usize, default_length: usize) -> ShardLRUCache { + let mut default_length = if default_length <= DEFAULT_SHARD_LENGTH { + DEFAULT_SHARD_LENGTH + } else { + default_length + }; + default_length = default_length / K_NUM_SHARDS; + Self { + shard: arr!([cache_element; 32]), + default_length, + capacity, + } } - pub fn insert(&mut self, key: &Slice, value: T, charge: usize) -> Result<()> { + + /// 创建ShardLruCache多线程使用 + /// lookUp会加读锁, insert/erase/prune等写操作会加写锁 + /// 持有写锁的线程panic后, 会导致锁中毒, 数据无法访问, 持有读锁线程panic不会中毒 + /// + /// # Arguments + /// + /// * `capacity`: 最大容量, 超出这个容量时, 将会开始淘汰数据 + /// * `default_length`: 默认的hash表容量, 使用较大的值可以避免扩容, 但不要使用太大的值, 避免空间浪费 + /// + /// returns: Arc> + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use level_db_rust::util::cache::ShardLRUCache; + /// let charge = 4; + /// let total_length = 10000; + /// let cache = ShardLRUCache::new_arc_with_capacity(charge * total_length, 1000); + /// thread::spawn(move || -> Result<(),E>{ + /// cache_clone.insert("key".into(), 1, charge)?; + /// Ok(()) + /// }); + /// ``` + pub fn new_arc_with_capacity(capacity: usize, default_length: usize) -> Arc> { + let default_length = if default_length <= DEFAULT_SHARD_LENGTH { + DEFAULT_SHARD_LENGTH + } else { + default_length + }; + let default_length_per_shard = default_length / K_NUM_SHARDS; + Arc::new(Self { + shard: arr!([cache_element; 32]), + default_length: default_length_per_shard, + capacity, + }) + } + + + /// 向shard中插入数据 + /// 插入时会将值写入指定的shard, 每个 + /// # Arguments + /// * `key`: 键 + /// * `value`: 值 + /// * `charge`: 值长度或者数据大小 + /// returns: Result<(), Status> + /// # Examples + /// ``` + /// use level_db_rust::util::cache::ShardLRUCache; + /// let cache = ShardLRUCache::new_with_capacity(40_0000, 1000); + /// let value = 1; + /// cache.insert("key".into(), value, charge)?; + /// ``` + pub fn insert(&self, key: &Slice, value: T, charge: usize) -> Result<()> { let hash = hash_slice(key); - self.shard[shard(hash)].insert(key.clone(), hash, value, charge) + let shard = shard(hash); + let result = self.shard[shard].write()?.insert(key.clone(), hash, value, charge); + result } - pub fn lookup(&self, key: &Slice) -> Result>>>> { + + /// 从shard中查询缓存数据 + /// 返回Arc包装的数据, 便于多线程共享value的引用, 请不要在cache外回收value的内存 + /// + /// # Arguments + /// * `key`: 键 + /// returns: Result>>>, Status> + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use level_db_rust::util::cache::ShardLRUCache; + /// use level_db_rust::util::slice::Slice; + /// + /// let cache = ShardLRUCache::new_with_capacity(40_0000, 1000); + /// let key: Slice = "key".into(); + /// let value: Option> = cache.lookup(&key)?; + /// ``` + pub fn lookup(&self, key: &Slice) -> Result>> { let hash = hash_slice(key); - self.shard[shard(hash)].look_up(key, hash) + let shard = shard(hash); + self.shard[shard].read()?.look_up(key, hash) } + + /// 从shard中删除缓存数据 + /// + /// # Arguments + /// * `key`: 键 + /// returns: Result<(), Status> + /// # Examples + /// + /// ``` + /// use level_db_rust::util::cache::ShardLRUCache; + /// use level_db_rust::util::slice::Slice; + /// + /// let mut cache = ShardLRUCache::new_with_capacity(40_0000, 1000); + /// let key: Slice = "key".into(); + /// cache.erase(&key)?; + /// ``` pub fn erase(&mut self, key: &Slice) -> Result<()> { - // 删除缓存 let hash = hash_slice(key); - self.shard[shard(hash)].erase(key, hash) + // 删除缓存 + self.shard[shard(hash)].write()?.erase(key, hash)?; + Ok(()) } + + /// 清空全部shard的缓存 + /// + /// returns: Result<(), Status> + /// # Examples + /// ``` + /// use level_db_rust::util::cache::ShardLRUCache; + /// + /// let mut cache = ShardLRUCache::new_with_capacity(40_0000, 1000); + /// cache.prune()?; + /// ``` pub fn prune(&mut self) -> Result<()> { // 清空全部shard的缓存 - for mut shard in &mut self.shard { - shard.prune()? + for shard in &mut self.shard { + shard.write()?.prune()? } Ok(()) } + + /// 获取当前缓存的总数据量 + pub fn total_charge(&self) -> Result { + let mut total_charge = 0; + for shard in &self.shard { + total_charge += shard.read()?.total_charge(); + } + Ok(total_charge) + } + + /// 获取当前缓存的最大容量 + #[inline] + #[allow(dead_code)] + pub fn capacity(&self) -> usize { + self.capacity + } + + /// 获取当前全部shard中的槽位数, 可用于判断内存占用情况及扩容效果 + #[allow(dead_code)] + pub fn slots(&self) -> Result { + let mut slots = 0; + for shard in &self.shard { + slots += shard.read()?.slots(); + } + Ok(slots) + } +} + +#[test] +fn test_insert_lookup_single() -> Result<()> { + let capacity = 10_0000; + let cache = ShardLRUCache::new_with_capacity(capacity, 100); + let key = Slice::from("test_key".to_owned() + &1.to_string()); + cache.insert(&key, 1, 4)?; + + let result = cache.lookup(&key)?; + assert_eq!(true, result.is_some()); + assert_eq!(1, *result.unwrap()); + + Ok(()) } #[test] fn test_insert_cache() -> Result<()> { - let mut cache = ShardLRUCache::new(); - let key = Slice::from("test_key"); - cache.insert(&key, 10, 4)?; - println!("{:?}", cache); - let handle = cache.lookup(&key)?; - println!("{:?}", handle); - assert_eq!(true, handle.is_some()); - assert_eq!(&10, handle.unwrap().read()?.value()); + let size = 100_0000; + let capacity = 1_0000_0000; + let cache = ShardLRUCache::new_with_capacity(capacity, size); + + let slots = cache.slots()?; + eprintln!("init slots: {}", slots); + + let charge = 4; + for i in 0..size { + let key = Slice::from("test_key".to_owned() + &i.to_string()); + // dbg!(key.clone().to_string()); + cache.insert(&key, i, charge)?; + } + + let total_charge = cache.total_charge()?; + dbg!(total_charge); + assert_eq!(size * charge, total_charge); + + println!("insert count: {}", size); + + let slots = cache.slots()?; + println!("slots after insert: {}", slots); + + for i in 0..size { + let key = Slice::from("test_key".to_owned() + &i.to_string()); + let value = cache.lookup(&key)?; + // dbg!(value.clone()); + assert_eq!(true, value.is_some(), "i: {}", i); + assert_eq!(i, *value.unwrap()); + } + + + Ok(()) +} + +#[test] +fn test_insert_lru() -> Result<()> { + // 测试lru淘汰 + let size = 100_0000; + let capacity = 4_0000; + let cache = ShardLRUCache::new_with_capacity(capacity, size); + let charge = 4; + for i in 0..size { + let key = Slice::from("test_key".to_owned() + &i.to_string()); + // dbg!(key.clone().to_string()); + cache.insert(&key, i, charge)?; + } + + let total_charge = cache.total_charge()?; + dbg!(total_charge); + // 由于shard分布可能有倾斜, 写入的容量小于容量限制即可 + assert_eq!(true, total_charge < capacity); + + let mut count = 0; + for i in 0..size { + let key = Slice::from("test_key".to_owned() + &i.to_string()); + let value = cache.lookup(&key)?; + // dbg!(value.clone()); + if let Some(v) = value { + assert_eq!(i, *v, "i: {}", i); + count += 1; + } + } + + // 由于shard分布可能有倾斜, 可以取出数量小于容量限制即可 + dbg!(count); + assert_eq!(true, count < capacity / charge); + + // 写入数量应该等于写入容量除以单个数据的大小 + assert_eq!(count, total_charge / charge); Ok(()) } #[test] fn test_insert_cache_multi_thread() -> Result<()> { - let mut cache = ShardLRUCache::new_with_arc(); + // todo 多线程写入 数据分组 + let capacity = 4_0000; + let thread_count: usize = 8; + let charge = 4; + let cache = ShardLRUCache::new_arc_with_capacity(capacity, thread_count); let mut thread_vec = vec![]; - let thread_count = 128; - // 创建5线程写入缓存 + // 创建多线程写入缓存 for i in 0..thread_count { let share_cache = cache.clone(); - let thread = thread::spawn(move || -> Result<()>{ + let thread_builder = thread::Builder::new().name("my-thread".to_string().to_owned() + i.to_string().as_str()); + let thread = thread_builder.spawn(move || -> Result<()>{ let key = Slice::from("test_key".to_string() + &i.to_string()); - share_cache.write()?.insert(&key, i, 4)?; + share_cache.insert(&key, i, charge)?; - println!("write thread {}, write value: {}", i, i); + // println!("write thread {}, write value: {}", i, i); Ok(()) }); thread_vec.push(thread); } for thread in thread_vec { - thread.join().unwrap()?; + thread?.join().unwrap()?; } let mut thread_vec = vec![]; - // 创建5线程读取缓存 + let in_cache_count = Arc::new(AtomicUsize::new(0)); + let out_cache_count = Arc::new(AtomicUsize::new(0)); + // 创建多线程读取缓存 for i in 0..thread_count { let share_cache = cache.clone(); + let share_in_cache_count = in_cache_count.clone(); + let share_out_cache_count = out_cache_count.clone(); let thread = thread::spawn(move || -> Result<()>{ let key = Slice::from("test_key".to_string() + &i.to_string()); - let read = share_cache.read()?.lookup(&key)?; - println!("read thread {}, read value: {}", i, read.clone().unwrap().read()?.value); - assert_eq!(true, read.is_some()); - assert_eq!(i, read.clone().unwrap().read()?.value); + let read = share_cache.lookup(&key)?; + if read.is_some() { + assert_eq!(i, *read.clone().unwrap().as_ref()); + share_in_cache_count.fetch_add(1, Ordering::Relaxed); + } else { + share_out_cache_count.fetch_add(1, Ordering::Relaxed); + } Ok(()) }); thread_vec.push(thread); @@ -342,20 +961,23 @@ fn test_insert_cache_multi_thread() -> Result<()> { thread.join().unwrap()?; } - // 线程全部执行完打印缓存信息 - println!("{:?}", cache); + println!("in cache count: {}", in_cache_count.load(Ordering::Acquire)); + println!("out cache count: {}", out_cache_count.load(Ordering::Acquire)); + let total_charge = cache.total_charge()?; + println!("thread_count: {}, charge: {}, capacity: {}, total_charge: {}", thread_count, charge, capacity, total_charge); + assert_eq!(true, charge * in_cache_count.load(Ordering::Acquire) < capacity); Ok(()) } #[test] fn test_erase_cache() -> Result<()> { - let mut cache = ShardLRUCache::new(); + let mut cache = ShardLRUCache::new_with_capacity(1000000000, 1024); let key = Slice::from("test_key"); cache.insert(&key, 10, 4)?; - println!("{:?}", cache); cache.erase(&key)?; - println!("{:?}", cache); + cache.insert(&key, 10, 4)?; + cache.erase(&key)?; let handle = cache.lookup(&key)?; println!("{:?}", handle); assert_eq!(true, handle.is_none()); @@ -364,6 +986,59 @@ fn test_erase_cache() -> Result<()> { } #[test] -fn test_clear_cache() -> Result<()> { - todo!() +fn test_prune() -> Result<()> { + let default_length = 1024; + let mut cache = ShardLRUCache::new_with_capacity(1000000000, default_length); + + let slots = cache.slots()?; + dbg!(slots); + + let count = 100_0000; + + let charge = 4; + println!("-------------------- before insert --------------------"); + for i in 0..count { + let key: Slice = ("key".to_owned() + &i.to_string()).into(); + cache.insert(&key, i, charge)?; + } + println!("-------------------- after insert --------------------"); + + + let total_charge = cache.total_charge()?; + dbg!(total_charge); + assert_eq!(charge * count, total_charge); + + for i in 0..count { + let key: Slice = ("key".to_owned() + &i.to_string()).into(); + let value = cache.lookup(&key)?; + assert_eq!(true, value.is_some(), "i: {}", i); + assert_eq!(i, *value.unwrap()); + } + + dbg!(cache.capacity()); + let slots = cache.slots()?; + dbg!(slots); + + println!("-------------------- before prune --------------------"); + cache.prune()?; + println!("-------------------- after prune --------------------"); + + let slots = cache.slots()?; + dbg!(slots); + assert_eq!(default_length, slots); + dbg!(cache.capacity()); + + // 清空后 总存储的数据量为0 + let total_charge = cache.total_charge()?; + dbg!(total_charge); + assert_eq!(0, total_charge); + + // 清空后 数据不能再查询出来 + for i in 0..count { + let key: Slice = ("key".to_owned() + &i.to_string()).into(); + let value = cache.lookup(&key)?; + assert_eq!(true, value.is_none(), "i: {}", i); + } + + Ok(()) } \ No newline at end of file diff --git a/src/util/coding.rs b/src/util/coding.rs index 4359aa2..e9fc98c 100644 --- a/src/util/coding.rs +++ b/src/util/coding.rs @@ -1,7 +1,6 @@ use std::{intrinsics, ptr}; use std::alloc::{alloc, Layout}; use std::ops::Deref; -use jemalloc_sys::malloc; use crate::util::coding::EncodeData::{Buffer, Slices, Vector}; use crate::util::coding::MutEncodeData::{MutBuffer, MutSlices, MutVector}; @@ -1861,7 +1860,7 @@ fn test_swap_bytes() { println!("value: {:?}, new_value: {:?}", value, new_value); assert_eq!(value, new_value); // 小端存储bytes - let mut buf = [0x01, 0x02, 0x03, 0x04]; + let buf = [0x01, 0x02, 0x03, 0x04]; let decode = unsafe { uncheck_decode_fixed32(&Buffer(&buf), 0) }; // 小端存储的0x01,0x02,0x03,0x04解出来的数据要等于0x04030201_u32 println!("value: {:?}, decode: {:?}", value, decode); -- Gitee