diff --git a/Cargo.lock b/Cargo.lock index d297102c..df37fd58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,6 +293,7 @@ dependencies = [ "opentelemetry", "opentelemetry-jaeger", "parking_lot", + "persy", "rand 0.8.4", "regex", "reqwest", @@ -374,6 +375,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" + [[package]] name = "crc32fast" version = "1.3.0" @@ -1651,6 +1667,21 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "persy" +version = "1.2.0" +source = "git+https://gitlab.com/tglman/persy.git?branch=master#ff102d6edeaf14d30a846c2e2376a814685d09e7" +dependencies = [ + "crc", + "data-encoding", + "fs2", + "linked-hash-map", + "rand 0.8.4", + "thiserror", + "unsigned-varint", + "zigzag", +] + [[package]] name = "pin-project" version = "1.0.10" @@ -3290,6 +3321,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f14ee04d9415b52b3aeab06258a3f07093182b88ba0f9b8d203f211a7a7d41c7" +[[package]] +name = "unsigned-varint" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d86a8dc7f45e4c1b0d30e43038c38f274e77af056aa5f74b93c2cf9eb3c1c836" + [[package]] name = "untrusted" version = "0.7.1" @@ -3532,6 +3569,15 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zigzag" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf" +dependencies = [ + "num-traits", +] + [[package]] name = "zstd" version = "0.9.2+zstd.1.5.1" diff --git a/Cargo.toml b/Cargo.toml index 2dbd3fd3..7c94a693 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,9 +29,6 @@ tokio = "1.11.0" sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } persy = { git = "https://gitlab.com/tglman/persy.git", branch="master" , optional = true, features=["background_ops"] } -# Used by the persy write cache for background flush -timer = "0.2" -chrono = "0.4" # Used for the http request / response body type for Ruma endpoints used with reqwest bytes = "1.1.0" @@ -91,7 +88,7 @@ sha-1 = "0.9.8" [features] default = ["conduit_bin", "backend_sqlite", "backend_rocksdb"] backend_sled = ["sled"] -backend_persy = ["persy"] +backend_persy = ["persy", "parking_lot"] backend_sqlite = ["sqlite"] backend_heed = ["heed", "crossbeam"] backend_rocksdb = ["rocksdb"] diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs index 5d633ab4..71efed3b 100644 --- a/src/database/abstraction/persy.rs +++ b/src/database/abstraction/persy.rs @@ -1,20 +1,14 @@ use crate::{ database::{ - abstraction::{DatabaseEngine, Tree}, + abstraction::{watchers::Watchers, DatabaseEngine, Tree}, Config, }, Result, }; use persy::{ByteVec, OpenOptions, Persy, Transaction, TransactionConfig, ValueMode}; -use std::{ - collections::HashMap, - future::Future, - pin::Pin, - sync::{Arc, RwLock}, -}; +use std::{future::Future, pin::Pin, sync::Arc}; -use tokio::sync::oneshot::Sender; use tracing::warn; pub struct PersyEngine { @@ -44,7 +38,7 @@ impl DatabaseEngine for PersyEngine { Ok(Arc::new(PersyTree { persy: self.persy.clone(), name: name.to_owned(), - watchers: RwLock::new(HashMap::new()), + watchers: Watchers::default(), })) } @@ -56,7 +50,7 @@ impl DatabaseEngine for PersyEngine { pub struct PersyTree { persy: Persy, name: String, - watchers: RwLock, Vec>>>, + watchers: Watchers, } impl PersyTree { @@ -81,27 +75,7 @@ impl Tree for PersyTree { #[tracing::instrument(skip(self, key, value))] fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { self.insert_batch(&mut Some((key.to_owned(), value.to_owned())).into_iter())?; - let watchers = self.watchers.read().unwrap(); - let mut triggered = Vec::new(); - - for length in 0..=key.len() { - if watchers.contains_key(&key[..length]) { - triggered.push(&key[..length]); - } - } - - drop(watchers); - - if !triggered.is_empty() { - let mut watchers = self.watchers.write().unwrap(); - for prefix in triggered { - if let Some(txs) = watchers.remove(prefix) { - for tx in txs { - let _ = tx.send(()); - } - } - } - } + self.watchers.wake(key); Ok(()) } @@ -228,18 +202,6 @@ impl Tree for PersyTree { #[tracing::instrument(skip(self, prefix))] fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.watchers - .write() - .unwrap() - .entry(prefix.to_vec()) - .or_default() - .push(tx); - - Box::pin(async move { - // Tx is never destroyed - rx.await.unwrap(); - }) + self.watchers.watch(prefix) } }