diff --git a/Cargo.toml b/Cargo.toml index 02159e31..ceae6ae9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,7 @@ sha-1 = "0.9.8" default = ["conduit_bin", "backend_sqlite"] backend_sled = ["sled"] backend_sqlite = ["sqlite"] -backend_heed = ["heed", "crossbeam"] +backend_heed = ["heed", "crossbeam", "parking_lot"] sqlite = ["rusqlite", "parking_lot", "crossbeam", "tokio/signal"] conduit_bin = [] # TODO: add rocket to this when it is optional diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 11bbc3b1..67b80d1a 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -12,6 +12,9 @@ pub mod sqlite; #[cfg(feature = "heed")] pub mod heed; +#[cfg(any(feature = "sqlite", feature = "heed"))] +pub mod watchers; + pub trait DatabaseEngine: Sized { fn open(config: &Config) -> Result>; fn open_tree(self: &Arc, name: &'static str) -> Result>; diff --git a/src/database/abstraction/heed.rs b/src/database/abstraction/heed.rs index e767e22b..83dafc57 100644 --- a/src/database/abstraction/heed.rs +++ b/src/database/abstraction/heed.rs @@ -1,15 +1,13 @@ -use super::super::Config; +use super::{super::Config, watchers::Watchers}; use crossbeam::channel::{bounded, Sender as ChannelSender}; use threadpool::ThreadPool; use crate::{Error, Result}; use std::{ - collections::HashMap, future::Future, pin::Pin, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex}, }; -use tokio::sync::oneshot::Sender; use super::{DatabaseEngine, Tree}; @@ -23,7 +21,7 @@ pub struct Engine { pub struct EngineTree { engine: Arc, tree: Arc, - watchers: RwLock, Vec>>>, + watchers: Watchers, } fn convert_error(error: heed::Error) -> Error { @@ -60,7 +58,7 @@ impl DatabaseEngine for Engine { .create_database(Some(name)) .map_err(convert_error)?, ), - watchers: RwLock::new(HashMap::new()), + watchers: Default::default(), })) } @@ -145,29 +143,7 @@ impl Tree for EngineTree { .put(&mut txn, &key, &value) .map_err(convert_error)?; txn.commit().map_err(convert_error)?; - - let watchers = self.watchers.read().unwrap(); - let mut triggered = Vec::new(); - - for length in 0..=key.len() { - if watchers.contains_key(&key[..length]) { - triggered.push(&key[..length]); - } - } - - drop(watchers); - - if !triggered.is_empty() { - let mut watchers = self.watchers.write().unwrap(); - for prefix in triggered { - if let Some(txs) = watchers.remove(prefix) { - for tx in txs { - let _ = tx.send(()); - } - } - } - }; - + self.watchers.wake(key); Ok(()) } @@ -223,18 +199,6 @@ impl Tree for EngineTree { #[tracing::instrument(skip(self, prefix))] fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.watchers - .write() - .unwrap() - .entry(prefix.to_vec()) - .or_default() - .push(tx); - - Box::pin(async move { - // Tx is never destroyed - rx.await.unwrap(); - }) + self.watchers.watch(prefix) } } diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 1d2038c5..1e6a2d89 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -1,17 +1,15 @@ -use super::{DatabaseEngine, Tree}; +use super::{watchers::Watchers, DatabaseEngine, Tree}; use crate::{database::Config, Result}; -use parking_lot::{Mutex, MutexGuard, RwLock}; +use parking_lot::{Mutex, MutexGuard}; use rusqlite::{Connection, DatabaseName::Main, OptionalExtension}; use std::{ cell::RefCell, - collections::{hash_map, HashMap}, future::Future, path::{Path, PathBuf}, pin::Pin, sync::Arc, }; use thread_local::ThreadLocal; -use tokio::sync::watch; use tracing::debug; thread_local! { @@ -113,7 +111,7 @@ impl DatabaseEngine for Engine { Ok(Arc::new(SqliteTable { engine: Arc::clone(self), name: name.to_owned(), - watchers: RwLock::new(HashMap::new()), + watchers: Watchers::default(), })) } @@ -126,7 +124,7 @@ impl DatabaseEngine for Engine { pub struct SqliteTable { engine: Arc, name: String, - watchers: RwLock, (watch::Sender<()>, watch::Receiver<()>)>>, + watchers: Watchers, } type TupleOfBytes = (Vec, Vec); @@ -200,27 +198,7 @@ impl Tree for SqliteTable { let guard = self.engine.write_lock(); self.insert_with_guard(&guard, key, value)?; drop(guard); - - let watchers = self.watchers.read(); - let mut triggered = Vec::new(); - - for length in 0..=key.len() { - if watchers.contains_key(&key[..length]) { - triggered.push(&key[..length]); - } - } - - drop(watchers); - - if !triggered.is_empty() { - let mut watchers = self.watchers.write(); - for prefix in triggered { - if let Some(tx) = watchers.remove(prefix) { - let _ = tx.0.send(()); - } - } - }; - + self.watchers.wake(key); Ok(()) } @@ -365,19 +343,7 @@ impl Tree for SqliteTable { #[tracing::instrument(skip(self, prefix))] fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - let mut rx = match self.watchers.write().entry(prefix.to_vec()) { - hash_map::Entry::Occupied(o) => o.get().1.clone(), - hash_map::Entry::Vacant(v) => { - let (tx, rx) = tokio::sync::watch::channel(()); - v.insert((tx, rx.clone())); - rx - } - }; - - Box::pin(async move { - // Tx is never destroyed - rx.changed().await.unwrap(); - }) + self.watchers.watch(prefix) } #[tracing::instrument(skip(self))] diff --git a/src/database/abstraction/watchers.rs b/src/database/abstraction/watchers.rs new file mode 100644 index 00000000..404f3f06 --- /dev/null +++ b/src/database/abstraction/watchers.rs @@ -0,0 +1,54 @@ +use parking_lot::RwLock; +use std::{ + collections::{hash_map, HashMap}, + future::Future, + pin::Pin, +}; +use tokio::sync::watch; + +#[derive(Default)] +pub(super) struct Watchers { + watchers: RwLock, (watch::Sender<()>, watch::Receiver<()>)>>, +} + +impl Watchers { + pub(super) fn watch<'a>( + &'a self, + prefix: &[u8], + ) -> Pin + Send + 'a>> { + let mut rx = match self.watchers.write().entry(prefix.to_vec()) { + hash_map::Entry::Occupied(o) => o.get().1.clone(), + hash_map::Entry::Vacant(v) => { + let (tx, rx) = tokio::sync::watch::channel(()); + v.insert((tx, rx.clone())); + rx + } + }; + + Box::pin(async move { + // Tx is never destroyed + rx.changed().await.unwrap(); + }) + } + pub(super) fn wake(&self, key: &[u8]) { + let watchers = self.watchers.read(); + let mut triggered = Vec::new(); + + for length in 0..=key.len() { + if watchers.contains_key(&key[..length]) { + triggered.push(&key[..length]); + } + } + + drop(watchers); + + if !triggered.is_empty() { + let mut watchers = self.watchers.write(); + for prefix in triggered { + if let Some(tx) = watchers.remove(prefix) { + let _ = tx.0.send(()); + } + } + }; + } +}