|
|
|
@ -10,6 +10,7 @@ use std::{
|
|
|
|
|
pin::Pin,
|
|
|
|
|
sync::Arc,
|
|
|
|
|
};
|
|
|
|
|
use thread_local::ThreadLocal;
|
|
|
|
|
use tokio::sync::oneshot::Sender;
|
|
|
|
|
use tracing::debug;
|
|
|
|
|
|
|
|
|
@ -40,6 +41,8 @@ impl<T> Drop for NonAliasingBox<T> {
|
|
|
|
|
|
|
|
|
|
pub struct Engine {
|
|
|
|
|
writer: Mutex<Connection>,
|
|
|
|
|
read_conn_tls: ThreadLocal<Connection>,
|
|
|
|
|
read_iterator_conn_tls: ThreadLocal<Connection>,
|
|
|
|
|
|
|
|
|
|
path: PathBuf,
|
|
|
|
|
cache_size_per_thread: u32,
|
|
|
|
@ -62,34 +65,14 @@ impl Engine {
|
|
|
|
|
self.writer.lock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn read_lock(&self) -> &'static Connection {
|
|
|
|
|
READ_CONNECTION.with(|cell| {
|
|
|
|
|
let connection = &mut cell.borrow_mut();
|
|
|
|
|
|
|
|
|
|
if (*connection).is_none() {
|
|
|
|
|
let c = Box::leak(Box::new(
|
|
|
|
|
Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap(),
|
|
|
|
|
));
|
|
|
|
|
**connection = Some(c);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
connection.unwrap()
|
|
|
|
|
})
|
|
|
|
|
fn read_lock<'a>(&'a self) -> &'a Connection {
|
|
|
|
|
self.read_conn_tls
|
|
|
|
|
.get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn read_lock_iterator(&self) -> &'static Connection {
|
|
|
|
|
READ_CONNECTION_ITERATOR.with(|cell| {
|
|
|
|
|
let connection = &mut cell.borrow_mut();
|
|
|
|
|
|
|
|
|
|
if (*connection).is_none() {
|
|
|
|
|
let c = Box::leak(Box::new(
|
|
|
|
|
Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap(),
|
|
|
|
|
));
|
|
|
|
|
**connection = Some(c);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
connection.unwrap()
|
|
|
|
|
})
|
|
|
|
|
fn read_lock_iterator<'a>(&'a self) -> &'a Connection {
|
|
|
|
|
self.read_iterator_conn_tls
|
|
|
|
|
.get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn flush_wal(self: &Arc<Self>) -> Result<()> {
|
|
|
|
@ -105,15 +88,18 @@ impl DatabaseEngine for Engine {
|
|
|
|
|
|
|
|
|
|
// calculates cache-size per permanent connection
|
|
|
|
|
// 1. convert MB to KiB
|
|
|
|
|
// 2. divide by permanent connections
|
|
|
|
|
// 2. divide by permanent connections + permanent iter connections + write connection
|
|
|
|
|
// 3. round down to nearest integer
|
|
|
|
|
let cache_size_per_thread: u32 =
|
|
|
|
|
((config.db_cache_capacity_mb * 1024.0) / (num_cpus::get().max(1) + 1) as f64) as u32;
|
|
|
|
|
let cache_size_per_thread: u32 = ((config.db_cache_capacity_mb * 1024.0)
|
|
|
|
|
/ ((num_cpus::get().max(1) * 2) + 1) as f64)
|
|
|
|
|
as u32;
|
|
|
|
|
|
|
|
|
|
let writer = Mutex::new(Self::prepare_conn(&path, cache_size_per_thread)?);
|
|
|
|
|
|
|
|
|
|
let arc = Arc::new(Engine {
|
|
|
|
|
writer,
|
|
|
|
|
read_conn_tls: ThreadLocal::new(),
|
|
|
|
|
read_iterator_conn_tls: ThreadLocal::new(),
|
|
|
|
|
path,
|
|
|
|
|
cache_size_per_thread,
|
|
|
|
|
});
|
|
|
|
|