diff --git a/src/database.rs b/src/database.rs index 2d7886e6..9452e638 100644 --- a/src/database.rs +++ b/src/database.rs @@ -53,6 +53,10 @@ pub struct Config { sqlite_wal_clean_second_interval: u32, #[serde(default = "default_sqlite_wal_clean_second_timeout")] sqlite_wal_clean_second_timeout: u32, + #[serde(default = "default_sqlite_spillover_reap_fraction")] + sqlite_spillover_reap_fraction: f64, + #[serde(default = "default_sqlite_spillover_reap_interval_secs")] + sqlite_spillover_reap_interval_secs: u32, #[serde(default = "default_max_request_size")] max_request_size: u32, #[serde(default = "default_max_concurrent_requests")] @@ -121,6 +125,14 @@ fn default_sqlite_wal_clean_second_timeout() -> u32 { 2 } +fn default_sqlite_spillover_reap_fraction() -> f64 { + 0.5 +} + +fn default_sqlite_spillover_reap_interval_secs() -> u32 { + 60 +} + fn default_max_request_size() -> u32 { 20 * 1024 * 1024 // Default to 20 MB } @@ -420,7 +432,10 @@ impl Database { drop(guard); #[cfg(feature = "sqlite")] - Self::start_wal_clean_task(&db, &config).await; + { + Self::start_wal_clean_task(&db, &config).await; + Self::start_spillover_reap_task(builder, &config).await; + } Ok(db) } @@ -541,6 +556,32 @@ impl Database { self._db.flush_wal() } + #[cfg(feature = "sqlite")] + pub async fn start_spillover_reap_task(engine: Arc, config: &Config) { + let fraction = config.sqlite_spillover_reap_fraction.clamp(0.01, 1.0); + let interval_secs = config.sqlite_spillover_reap_interval_secs as u64; + + let weak = Arc::downgrade(&engine); + + tokio::spawn(async move { + use tokio::time::interval; + + use std::{sync::Weak, time::Duration}; + + let mut i = interval(Duration::from_secs(interval_secs)); + + loop { + i.tick().await; + + if let Some(arc) = Weak::upgrade(&weak) { + arc.reap_spillover_by_fraction(fraction); + } else { + break; + } + } + }); + } + #[cfg(feature = "sqlite")] pub async fn start_wal_clean_task(lock: &Arc>, config: &Config) { use tokio::time::{interval, timeout}; diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 25d236a9..8100ed91 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -1,3 +1,11 @@ +use super::{DatabaseEngine, Tree}; +use crate::{database::Config, Result}; +use crossbeam::channel::{ + bounded, unbounded, Receiver as ChannelReceiver, Sender as ChannelSender, TryRecvError, +}; +use log::debug; +use parking_lot::{Mutex, MutexGuard, RwLock}; +use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension}; use std::{ collections::BTreeMap, future::Future, @@ -8,33 +16,12 @@ use std::{ thread, time::{Duration, Instant}, }; - -use crate::{database::Config, Result}; - -use super::{DatabaseEngine, Tree}; - -use log::debug; - -use crossbeam::channel::{bounded, Sender as ChannelSender}; -use parking_lot::{Mutex, MutexGuard, RwLock}; -use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension}; - use tokio::sync::oneshot::Sender; -// const SQL_CREATE_TABLE: &str = -// "CREATE TABLE IF NOT EXISTS {} {{ \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL }}"; -// const SQL_SELECT: &str = "SELECT value FROM {} WHERE key = ?"; -// const SQL_INSERT: &str = "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)"; -// const SQL_DELETE: &str = "DELETE FROM {} WHERE key = ?"; -// const SQL_SELECT_ITER: &str = "SELECT key, value FROM {}"; -// const SQL_SELECT_PREFIX: &str = "SELECT key, value FROM {} WHERE key LIKE ?||'%' ORDER BY key ASC"; -// const SQL_SELECT_ITER_FROM_FORWARDS: &str = "SELECT key, value FROM {} WHERE key >= ? ORDER BY ASC"; -// const SQL_SELECT_ITER_FROM_BACKWARDS: &str = -// "SELECT key, value FROM {} WHERE key <= ? ORDER BY DESC"; - struct Pool { writer: Mutex, readers: Vec>, + spills: ConnectionRecycler, spill_tracker: Arc<()>, path: PathBuf, } @@ -43,7 +30,7 @@ pub const MILLI: Duration = Duration::from_millis(1); enum HoldingConn<'a> { FromGuard(MutexGuard<'a, Connection>), - FromOwned(Connection, Arc<()>), + FromRecycled(RecycledConn, Arc<()>), } impl<'a> Deref for HoldingConn<'a> { @@ -52,7 +39,57 @@ impl<'a> Deref for HoldingConn<'a> { fn deref(&self) -> &Self::Target { match self { HoldingConn::FromGuard(guard) => guard.deref(), - HoldingConn::FromOwned(conn, _) => conn, + HoldingConn::FromRecycled(conn, _) => conn.deref(), + } + } +} + +struct ConnectionRecycler(ChannelSender, ChannelReceiver); + +impl ConnectionRecycler { + fn new() -> Self { + let (s, r) = unbounded(); + Self(s, r) + } + + fn recycle(&self, conn: Connection) -> RecycledConn { + let sender = self.0.clone(); + + RecycledConn(Some(conn), sender) + } + + fn try_take(&self) -> Option { + match self.1.try_recv() { + Ok(conn) => Some(conn), + Err(TryRecvError::Empty) => None, + // as this is pretty impossible, a panic is warranted if it ever occurs + Err(TryRecvError::Disconnected) => panic!("Receiving channel was disconnected. A a sender is owned by the current struct, this should never happen(!!!)") + } + } +} + +struct RecycledConn( + Option, // To allow moving out of the struct when `Drop` is called. + ChannelSender, +); + +impl Deref for RecycledConn { + type Target = Connection; + + fn deref(&self) -> &Self::Target { + self.0 + .as_ref() + .expect("RecycledConn does not have a connection in Option<>") + } +} + +impl Drop for RecycledConn { + fn drop(&mut self) { + if let Some(conn) = self.0.take() { + log::debug!("Recycled connection"); + if let Err(e) = self.1.send(conn) { + log::warn!("Recycling a connection led to the following error: {:?}", e) + } } } } @@ -76,6 +113,7 @@ impl Pool { Ok(Self { writer, readers, + spills: ConnectionRecycler::new(), spill_tracker: Arc::new(()), path: path.as_ref().to_path_buf(), }) @@ -104,24 +142,41 @@ impl Pool { } fn read_lock(&self) -> HoldingConn<'_> { + // First try to get a connection from the permanent pool for r in &self.readers { if let Some(reader) = r.try_lock() { return HoldingConn::FromGuard(reader); } } - let spill_arc = self.spill_tracker.clone(); - let now_count = Arc::strong_count(&spill_arc) - 1 /* because one is held by the pool */; + log::debug!("read_lock: All permanent readers locked, obtaining spillover reader..."); - log::warn!("read_lock: all readers locked, creating spillover reader..."); + // We didn't get a connection from the permanent pool, so we'll dumpster-dive for recycled connections. + // Either we have a connection or we dont, if we don't, we make a new one. + let conn = match self.spills.try_take() { + Some(conn) => conn, + None => { + log::debug!("read_lock: No recycled connections left, creating new one..."); + Self::prepare_conn(&self.path, None).unwrap() + } + }; - if now_count > 1 { - log::warn!("read_lock: now {} spillover readers exist", now_count); - } + // Clone the spill Arc to mark how many spilled connections actually exist. + let spill_arc = Arc::clone(&self.spill_tracker); - let spilled = Self::prepare_conn(&self.path, None).unwrap(); + // Get a sense of how many connections exist now. + let now_count = Arc::strong_count(&spill_arc) - 1 /* because one is held by the pool */; - HoldingConn::FromOwned(spilled, spill_arc) + // If the spillover readers are more than the number of total readers, there might be a problem. + if now_count > self.readers.len() { + log::warn!( + "Database is under high load. Consider increasing sqlite_read_pool_size ({} spillover readers exist)", + now_count + ); + } + + // Return the recyclable connection. + HoldingConn::FromRecycled(self.spills.recycle(conn), spill_arc) } } @@ -188,6 +243,24 @@ impl Engine { ) .map_err(Into::into) } + + // Reaps (at most) (.len() * `fraction`) (rounded down, min 1) connections. + pub fn reap_spillover_by_fraction(&self, fraction: f64) { + let mut reaped = 0; + + let spill_amount = self.pool.spills.1.len() as f64; + let fraction = fraction.clamp(0.01, 1.0); + + let amount = (spill_amount * fraction).max(1.0) as u32; + + for _ in 0..amount { + if self.pool.spills.try_take().is_some() { + reaped += 1; + } + } + + log::debug!("Reaped {} connections", reaped); + } } pub struct SqliteTable {