From 0f2dc9a239d8dd031ce84193b7a00544e87f8d34 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Mon, 19 Jul 2021 15:56:20 +0200 Subject: [PATCH 1/6] add stuff and bits --- src/database.rs | 46 +++++++++- src/database/abstraction/sqlite.rs | 134 ++++++++++++++++++++++------- 2 files changed, 148 insertions(+), 32 deletions(-) diff --git a/src/database.rs b/src/database.rs index 2d7886e6..82560db1 100644 --- a/src/database.rs +++ b/src/database.rs @@ -53,6 +53,10 @@ pub struct Config { sqlite_wal_clean_second_interval: u32, #[serde(default = "default_sqlite_wal_clean_second_timeout")] sqlite_wal_clean_second_timeout: u32, + #[serde(default = "default_sqlite_spillover_reap_chunk")] + sqlite_spillover_reap_chunk: u32, + #[serde(default = "default_sqlite_spillover_reap_interval_secs")] + sqlite_spillover_reap_interval_secs: u32, #[serde(default = "default_max_request_size")] max_request_size: u32, #[serde(default = "default_max_concurrent_requests")] @@ -121,6 +125,14 @@ fn default_sqlite_wal_clean_second_timeout() -> u32 { 2 } +fn default_sqlite_spillover_reap_chunk() -> u32 { + 5 +} + +fn default_sqlite_spillover_reap_interval_secs() -> u32 { + 10 +} + fn default_max_request_size() -> u32 { 20 * 1024 * 1024 // Default to 20 MB } @@ -420,7 +432,10 @@ impl Database { drop(guard); #[cfg(feature = "sqlite")] - Self::start_wal_clean_task(&db, &config).await; + { + Self::start_wal_clean_task(&db, &config).await; + Self::start_spillover_reap_task(builder, &config).await; + } Ok(db) } @@ -541,6 +556,35 @@ impl Database { self._db.flush_wal() } + #[cfg(feature = "sqlite")] + pub async fn start_spillover_reap_task(engine: Arc, config: &Config) { + let chunk_size = match config.sqlite_spillover_reap_chunk { + 0 => None, // zero means no chunking, reap everything + a @ _ => Some(a), + }; + let interval_secs = config.sqlite_spillover_reap_interval_secs as u64; + + let weak = Arc::downgrade(&engine); + + tokio::spawn(async move { + use tokio::time::interval; + + use std::{sync::Weak, time::Duration}; + + let mut i = interval(Duration::from_secs(interval_secs)); + + loop { + i.tick().await; + + if let Some(arc) = Weak::upgrade(&weak) { + arc.reap_spillover(chunk_size); + } else { + break; + } + } + }); + } + #[cfg(feature = "sqlite")] pub async fn start_wal_clean_task(lock: &Arc>, config: &Config) { use tokio::time::{interval, timeout}; diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 25d236a9..ac92a45f 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -1,3 +1,11 @@ +use super::{DatabaseEngine, Tree}; +use crate::{database::Config, Result}; +use crossbeam::channel::{ + bounded, unbounded, Receiver as ChannelReceiver, Sender as ChannelSender, TryRecvError, +}; +use log::debug; +use parking_lot::{Mutex, MutexGuard, RwLock}; +use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension}; use std::{ collections::BTreeMap, future::Future, @@ -8,33 +16,12 @@ use std::{ thread, time::{Duration, Instant}, }; - -use crate::{database::Config, Result}; - -use super::{DatabaseEngine, Tree}; - -use log::debug; - -use crossbeam::channel::{bounded, Sender as ChannelSender}; -use parking_lot::{Mutex, MutexGuard, RwLock}; -use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension}; - use tokio::sync::oneshot::Sender; -// const SQL_CREATE_TABLE: &str = -// "CREATE TABLE IF NOT EXISTS {} {{ \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL }}"; -// const SQL_SELECT: &str = "SELECT value FROM {} WHERE key = ?"; -// const SQL_INSERT: &str = "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)"; -// const SQL_DELETE: &str = "DELETE FROM {} WHERE key = ?"; -// const SQL_SELECT_ITER: &str = "SELECT key, value FROM {}"; -// const SQL_SELECT_PREFIX: &str = "SELECT key, value FROM {} WHERE key LIKE ?||'%' ORDER BY key ASC"; -// const SQL_SELECT_ITER_FROM_FORWARDS: &str = "SELECT key, value FROM {} WHERE key >= ? ORDER BY ASC"; -// const SQL_SELECT_ITER_FROM_BACKWARDS: &str = -// "SELECT key, value FROM {} WHERE key <= ? ORDER BY DESC"; - struct Pool { writer: Mutex, readers: Vec>, + spills: ConnectionRecycler, spill_tracker: Arc<()>, path: PathBuf, } @@ -43,7 +30,7 @@ pub const MILLI: Duration = Duration::from_millis(1); enum HoldingConn<'a> { FromGuard(MutexGuard<'a, Connection>), - FromOwned(Connection, Arc<()>), + FromRecycled(RecycledConn, Arc<()>), } impl<'a> Deref for HoldingConn<'a> { @@ -52,7 +39,57 @@ impl<'a> Deref for HoldingConn<'a> { fn deref(&self) -> &Self::Target { match self { HoldingConn::FromGuard(guard) => guard.deref(), - HoldingConn::FromOwned(conn, _) => conn, + HoldingConn::FromRecycled(conn, _) => conn.deref(), + } + } +} + +struct ConnectionRecycler(ChannelSender, ChannelReceiver); + +impl ConnectionRecycler { + fn new() -> Self { + let (s, r) = unbounded(); + Self(s, r) + } + + fn recycle(&self, conn: Connection) -> RecycledConn { + let sender = self.0.clone(); + + RecycledConn(Some(conn), sender) + } + + fn try_take(&self) -> Option { + match self.1.try_recv() { + Ok(conn) => Some(conn), + Err(TryRecvError::Empty) => None, + // as this is pretty impossible, a panic is warranted if it ever occurs + Err(TryRecvError::Disconnected) => panic!("Receiving channel was disconnected. A a sender is owned by the current struct, this should never happen(!!!)") + } + } +} + +struct RecycledConn( + Option, // To allow moving out of the struct when `Drop` is called. + ChannelSender, +); + +impl Deref for RecycledConn { + type Target = Connection; + + fn deref(&self) -> &Self::Target { + self.0 + .as_ref() + .expect("RecycledConn does not have a connection in Option<>") + } +} + +impl Drop for RecycledConn { + fn drop(&mut self) { + if let Some(conn) = self.0.take() { + log::debug!("Recycled connection"); + if let Err(e) = self.1.send(conn) { + log::warn!("Recycling a connection led to the following error: {:?}", e) + } } } } @@ -76,6 +113,7 @@ impl Pool { Ok(Self { writer, readers, + spills: ConnectionRecycler::new(), spill_tracker: Arc::new(()), path: path.as_ref().to_path_buf(), }) @@ -104,24 +142,38 @@ impl Pool { } fn read_lock(&self) -> HoldingConn<'_> { + // First try to get a connection from the permanent pool for r in &self.readers { if let Some(reader) = r.try_lock() { return HoldingConn::FromGuard(reader); } } - let spill_arc = self.spill_tracker.clone(); + // We didn't get a connection from the permanent pool, so we'll dumpster-dive for recycled connections. + // Either we have a connection or we dont, if we don't, we make a new one. + let conn = match self.spills.try_take() { + Some(conn) => conn, + None => Self::prepare_conn(&self.path, None).unwrap(), + }; + + // Clone the spill Arc to mark how many spilled connections actually exist. + let spill_arc = Arc::clone(&self.spill_tracker); + + // Get a sense of how many connections exist now. let now_count = Arc::strong_count(&spill_arc) - 1 /* because one is held by the pool */; - log::warn!("read_lock: all readers locked, creating spillover reader..."); + log::debug!("read_lock: all readers locked, creating spillover reader..."); - if now_count > 1 { - log::warn!("read_lock: now {} spillover readers exist", now_count); + // If the spillover readers are more than the number of total readers, there might be a problem. + if now_count > self.readers.len() { + log::warn!( + "read_lock: possible high load; now {} spillover readers exist", + now_count + ); } - let spilled = Self::prepare_conn(&self.path, None).unwrap(); - - HoldingConn::FromOwned(spilled, spill_arc) + // Return the recyclable connection. + HoldingConn::FromRecycled(self.spills.recycle(conn), spill_arc) } } @@ -188,6 +240,26 @@ impl Engine { ) .map_err(Into::into) } + + // Reaps (at most) X amount of connections if `amount` is Some. + // If none, reaps all currently idle connections. + pub fn reap_spillover(&self, amount: Option) { + let mut reaped = 0; + + if let Some(amount) = amount { + for _ in 0..amount { + if self.pool.spills.try_take().is_some() { + reaped += 1; + } + } + } else { + while let Some(_) = self.pool.spills.try_take() { + reaped += 1; + } + } + + log::debug!("Reaped {} connections", reaped); + } } pub struct SqliteTable { From 7e579f8d346d04bdcde32917cf296a9a7ef4f582 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Mon, 19 Jul 2021 16:25:41 +0200 Subject: [PATCH 2/6] change to fraction-based approach --- src/database.rs | 19 +++++++++---------- src/database/abstraction/sqlite.rs | 18 +++++++----------- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/src/database.rs b/src/database.rs index 82560db1..f7c3d9d3 100644 --- a/src/database.rs +++ b/src/database.rs @@ -53,8 +53,8 @@ pub struct Config { sqlite_wal_clean_second_interval: u32, #[serde(default = "default_sqlite_wal_clean_second_timeout")] sqlite_wal_clean_second_timeout: u32, - #[serde(default = "default_sqlite_spillover_reap_chunk")] - sqlite_spillover_reap_chunk: u32, + #[serde(default = "default_sqlite_spillover_reap_fraction")] + sqlite_spillover_reap_fraction: u32, #[serde(default = "default_sqlite_spillover_reap_interval_secs")] sqlite_spillover_reap_interval_secs: u32, #[serde(default = "default_max_request_size")] @@ -125,12 +125,12 @@ fn default_sqlite_wal_clean_second_timeout() -> u32 { 2 } -fn default_sqlite_spillover_reap_chunk() -> u32 { - 5 +fn default_sqlite_spillover_reap_fraction() -> u32 { + 2 } fn default_sqlite_spillover_reap_interval_secs() -> u32 { - 10 + 60 } fn default_max_request_size() -> u32 { @@ -558,10 +558,9 @@ impl Database { #[cfg(feature = "sqlite")] pub async fn start_spillover_reap_task(engine: Arc, config: &Config) { - let chunk_size = match config.sqlite_spillover_reap_chunk { - 0 => None, // zero means no chunking, reap everything - a @ _ => Some(a), - }; + use std::convert::TryInto; + + let fraction_factor = config.sqlite_spillover_reap_fraction.max(1).try_into().unwrap(/* We just converted it to be at least 1 */); let interval_secs = config.sqlite_spillover_reap_interval_secs as u64; let weak = Arc::downgrade(&engine); @@ -577,7 +576,7 @@ impl Database { i.tick().await; if let Some(arc) = Weak::upgrade(&weak) { - arc.reap_spillover(chunk_size); + arc.reap_spillover_by_fraction(fraction_factor); } else { break; } diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index ac92a45f..e4acdbb5 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -9,6 +9,7 @@ use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension}; use std::{ collections::BTreeMap, future::Future, + num::NonZeroU32, ops::Deref, path::{Path, PathBuf}, pin::Pin, @@ -241,19 +242,14 @@ impl Engine { .map_err(Into::into) } - // Reaps (at most) X amount of connections if `amount` is Some. - // If none, reaps all currently idle connections. - pub fn reap_spillover(&self, amount: Option) { + // Reaps (at most) (.len() / `fraction`) (rounded down, min 1) connections. + pub fn reap_spillover_by_fraction(&self, fraction: NonZeroU32) { let mut reaped = 0; - if let Some(amount) = amount { - for _ in 0..amount { - if self.pool.spills.try_take().is_some() { - reaped += 1; - } - } - } else { - while let Some(_) = self.pool.spills.try_take() { + let amount = ((self.pool.spills.1.len() as u32) / fraction).max(1); + + for _ in 0..amount { + if self.pool.spills.try_take().is_some() { reaped += 1; } } From 79bf7fc597aa71419ffcf74f863225b7d12d82a6 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Mon, 19 Jul 2021 16:46:59 +0200 Subject: [PATCH 3/6] some logging shuffling --- src/database/abstraction/sqlite.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index e4acdbb5..14306e19 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -150,11 +150,16 @@ impl Pool { } } + log::debug!("read_lock: All permanent readers locked, obtaining spillover reader..."); + // We didn't get a connection from the permanent pool, so we'll dumpster-dive for recycled connections. // Either we have a connection or we dont, if we don't, we make a new one. let conn = match self.spills.try_take() { Some(conn) => conn, - None => Self::prepare_conn(&self.path, None).unwrap(), + None => { + log::debug!("read_lock: No recycled connections left, creating new one..."); + Self::prepare_conn(&self.path, None).unwrap() + } }; // Clone the spill Arc to mark how many spilled connections actually exist. @@ -163,8 +168,6 @@ impl Pool { // Get a sense of how many connections exist now. let now_count = Arc::strong_count(&spill_arc) - 1 /* because one is held by the pool */; - log::debug!("read_lock: all readers locked, creating spillover reader..."); - // If the spillover readers are more than the number of total readers, there might be a problem. if now_count > self.readers.len() { log::warn!( From e7a51c07d0612b257b631babc0967923ccc3469c Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Mon, 19 Jul 2021 17:17:10 +0200 Subject: [PATCH 4/6] log change feedback --- src/database/abstraction/sqlite.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 14306e19..445093a0 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -171,7 +171,7 @@ impl Pool { // If the spillover readers are more than the number of total readers, there might be a problem. if now_count > self.readers.len() { log::warn!( - "read_lock: possible high load; now {} spillover readers exist", + "Database is under high load. Consider increasing sqlite_read_pool_size ({} spillover readers exist)", now_count ); } From ec44f3d568fd0093c7cfab821d7a43eb01af4843 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 20 Jul 2021 10:47:36 +0200 Subject: [PATCH 5/6] change to f64 --- src/database.rs | 10 ++++------ src/database/abstraction/sqlite.rs | 8 +++++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/database.rs b/src/database.rs index f7c3d9d3..34bce734 100644 --- a/src/database.rs +++ b/src/database.rs @@ -54,7 +54,7 @@ pub struct Config { #[serde(default = "default_sqlite_wal_clean_second_timeout")] sqlite_wal_clean_second_timeout: u32, #[serde(default = "default_sqlite_spillover_reap_fraction")] - sqlite_spillover_reap_fraction: u32, + sqlite_spillover_reap_fraction: f64, #[serde(default = "default_sqlite_spillover_reap_interval_secs")] sqlite_spillover_reap_interval_secs: u32, #[serde(default = "default_max_request_size")] @@ -125,8 +125,8 @@ fn default_sqlite_wal_clean_second_timeout() -> u32 { 2 } -fn default_sqlite_spillover_reap_fraction() -> u32 { - 2 +fn default_sqlite_spillover_reap_fraction() -> f64 { + 2.0 } fn default_sqlite_spillover_reap_interval_secs() -> u32 { @@ -558,9 +558,7 @@ impl Database { #[cfg(feature = "sqlite")] pub async fn start_spillover_reap_task(engine: Arc, config: &Config) { - use std::convert::TryInto; - - let fraction_factor = config.sqlite_spillover_reap_fraction.max(1).try_into().unwrap(/* We just converted it to be at least 1 */); + let fraction_factor = config.sqlite_spillover_reap_fraction.max(1.0); let interval_secs = config.sqlite_spillover_reap_interval_secs as u64; let weak = Arc::downgrade(&engine); diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 445093a0..f7c178fc 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -9,7 +9,6 @@ use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension}; use std::{ collections::BTreeMap, future::Future, - num::NonZeroU32, ops::Deref, path::{Path, PathBuf}, pin::Pin, @@ -246,10 +245,13 @@ impl Engine { } // Reaps (at most) (.len() / `fraction`) (rounded down, min 1) connections. - pub fn reap_spillover_by_fraction(&self, fraction: NonZeroU32) { + pub fn reap_spillover_by_fraction(&self, fraction: f64) { let mut reaped = 0; - let amount = ((self.pool.spills.1.len() as u32) / fraction).max(1); + let spill_amount = self.pool.spills.1.len() as f64; + let fraction = fraction.max(1.0 /* Can never be too sure */); + + let amount = (spill_amount / fraction).max(1.0) as u32; for _ in 0..amount { if self.pool.spills.try_take().is_some() { From d253f9236a87f0571c8561290cebda0437885a62 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 20 Jul 2021 11:01:35 +0200 Subject: [PATCH 6/6] change fraction type --- src/database.rs | 6 +++--- src/database/abstraction/sqlite.rs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/database.rs b/src/database.rs index 34bce734..9452e638 100644 --- a/src/database.rs +++ b/src/database.rs @@ -126,7 +126,7 @@ fn default_sqlite_wal_clean_second_timeout() -> u32 { } fn default_sqlite_spillover_reap_fraction() -> f64 { - 2.0 + 0.5 } fn default_sqlite_spillover_reap_interval_secs() -> u32 { @@ -558,7 +558,7 @@ impl Database { #[cfg(feature = "sqlite")] pub async fn start_spillover_reap_task(engine: Arc, config: &Config) { - let fraction_factor = config.sqlite_spillover_reap_fraction.max(1.0); + let fraction = config.sqlite_spillover_reap_fraction.clamp(0.01, 1.0); let interval_secs = config.sqlite_spillover_reap_interval_secs as u64; let weak = Arc::downgrade(&engine); @@ -574,7 +574,7 @@ impl Database { i.tick().await; if let Some(arc) = Weak::upgrade(&weak) { - arc.reap_spillover_by_fraction(fraction_factor); + arc.reap_spillover_by_fraction(fraction); } else { break; } diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index f7c178fc..8100ed91 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -244,14 +244,14 @@ impl Engine { .map_err(Into::into) } - // Reaps (at most) (.len() / `fraction`) (rounded down, min 1) connections. + // Reaps (at most) (.len() * `fraction`) (rounded down, min 1) connections. pub fn reap_spillover_by_fraction(&self, fraction: f64) { let mut reaped = 0; let spill_amount = self.pool.spills.1.len() as f64; - let fraction = fraction.max(1.0 /* Can never be too sure */); + let fraction = fraction.clamp(0.01, 1.0); - let amount = (spill_amount / fraction).max(1.0) as u32; + let amount = (spill_amount * fraction).max(1.0) as u32; for _ in 0..amount { if self.pool.spills.try_take().is_some() {