From ab15ec6c32f2e5463369fe7a29f5ea2e6d9c4f2d Mon Sep 17 00:00:00 2001 From: Tglman Date: Fri, 18 Jun 2021 00:38:32 +0100 Subject: [PATCH 1/4] feat: Integration with persy using background ops --- Cargo.toml | 5 + src/database.rs | 6 + src/database/abstraction.rs | 5 +- src/database/abstraction/persy.rs | 245 ++++++++++++++++++++++++++++++ src/error.rs | 15 ++ 5 files changed, 275 insertions(+), 1 deletion(-) create mode 100644 src/database/abstraction/persy.rs diff --git a/Cargo.toml b/Cargo.toml index c87d949c..2dbd3fd3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,10 @@ tokio = "1.11.0" # Used for storing data permanently sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } +persy = { git = "https://gitlab.com/tglman/persy.git", branch="master" , optional = true, features=["background_ops"] } +# Used by the persy write cache for background flush +timer = "0.2" +chrono = "0.4" # Used for the http request / response body type for Ruma endpoints used with reqwest bytes = "1.1.0" @@ -87,6 +91,7 @@ sha-1 = "0.9.8" [features] default = ["conduit_bin", "backend_sqlite", "backend_rocksdb"] backend_sled = ["sled"] +backend_persy = ["persy"] backend_sqlite = ["sqlite"] backend_heed = ["heed", "crossbeam"] backend_rocksdb = ["rocksdb"] diff --git a/src/database.rs b/src/database.rs index d688ff9f..c2cd9f29 100644 --- a/src/database.rs +++ b/src/database.rs @@ -255,6 +255,12 @@ impl Database { #[cfg(feature = "rocksdb")] Arc::new(Arc::::open(config)?) } + "persy" => { + #[cfg(not(feature = "persy"))] + return Err(Error::BadConfig("Database backend not found.")); + #[cfg(feature = "persy")] + Arc::new(Arc::::open(config)?) + } _ => { return Err(Error::BadConfig("Database backend not found.")); } diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 17bd971f..9a3771f3 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -15,7 +15,10 @@ pub mod heed; #[cfg(feature = "rocksdb")] pub mod rocksdb; -#[cfg(any(feature = "sqlite", feature = "rocksdb", feature = "heed"))] +#[cfg(feature = "persy")] +pub mod persy; + +#[cfg(any(feature = "sqlite", feature = "rocksdb", feature = "heed", feature="persy"))] pub mod watchers; pub trait DatabaseEngine: Send + Sync { diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs new file mode 100644 index 00000000..5d633ab4 --- /dev/null +++ b/src/database/abstraction/persy.rs @@ -0,0 +1,245 @@ +use crate::{ + database::{ + abstraction::{DatabaseEngine, Tree}, + Config, + }, + Result, +}; +use persy::{ByteVec, OpenOptions, Persy, Transaction, TransactionConfig, ValueMode}; + +use std::{ + collections::HashMap, + future::Future, + pin::Pin, + sync::{Arc, RwLock}, +}; + +use tokio::sync::oneshot::Sender; +use tracing::warn; + +pub struct PersyEngine { + persy: Persy, +} + +impl DatabaseEngine for PersyEngine { + fn open(config: &Config) -> Result> { + let mut cfg = persy::Config::new(); + cfg.change_cache_size((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64); + + let persy = OpenOptions::new() + .create(true) + .config(cfg) + .open(&format!("{}/db.persy", config.database_path))?; + Ok(Arc::new(PersyEngine { persy })) + } + + fn open_tree(self: &Arc, name: &'static str) -> Result> { + // Create if it doesn't exist + if !self.persy.exists_index(name)? { + let mut tx = self.persy.begin()?; + tx.create_index::(name, ValueMode::Replace)?; + tx.prepare()?.commit()?; + } + + Ok(Arc::new(PersyTree { + persy: self.persy.clone(), + name: name.to_owned(), + watchers: RwLock::new(HashMap::new()), + })) + } + + fn flush(self: &Arc) -> Result<()> { + Ok(()) + } +} + +pub struct PersyTree { + persy: Persy, + name: String, + watchers: RwLock, Vec>>>, +} + +impl PersyTree { + fn begin(&self) -> Result { + Ok(self + .persy + .begin_with(TransactionConfig::new().set_background_sync(true))?) + } +} + +impl Tree for PersyTree { + #[tracing::instrument(skip(self, key))] + fn get(&self, key: &[u8]) -> Result>> { + let result = self + .persy + .get::(&self.name, &ByteVec::from(key))? + .next() + .map(|v| (*v).to_owned()); + Ok(result) + } + + #[tracing::instrument(skip(self, key, value))] + fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + self.insert_batch(&mut Some((key.to_owned(), value.to_owned())).into_iter())?; + let watchers = self.watchers.read().unwrap(); + let mut triggered = Vec::new(); + + for length in 0..=key.len() { + if watchers.contains_key(&key[..length]) { + triggered.push(&key[..length]); + } + } + + drop(watchers); + + if !triggered.is_empty() { + let mut watchers = self.watchers.write().unwrap(); + for prefix in triggered { + if let Some(txs) = watchers.remove(prefix) { + for tx in txs { + let _ = tx.send(()); + } + } + } + } + Ok(()) + } + + #[tracing::instrument(skip(self, iter))] + fn insert_batch<'a>(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()> { + let mut tx = self.begin()?; + for (key, value) in iter { + tx.put::( + &self.name, + ByteVec::from(key.clone()), + ByteVec::from(value), + )?; + } + tx.prepare()?.commit()?; + Ok(()) + } + + #[tracing::instrument(skip(self, iter))] + fn increment_batch<'a>(&self, iter: &mut dyn Iterator>) -> Result<()> { + let mut tx = self.begin()?; + for key in iter { + let old = tx + .get::(&self.name, &ByteVec::from(key.clone()))? + .next() + .map(|v| (*v).to_owned()); + let new = crate::utils::increment(old.as_deref()).unwrap(); + tx.put::(&self.name, ByteVec::from(key), ByteVec::from(new))?; + } + tx.prepare()?.commit()?; + Ok(()) + } + + #[tracing::instrument(skip(self, key))] + fn remove(&self, key: &[u8]) -> Result<()> { + let mut tx = self.begin()?; + tx.remove::(&self.name, ByteVec::from(key), None)?; + tx.prepare()?.commit()?; + Ok(()) + } + + #[tracing::instrument(skip(self))] + fn iter<'a>(&'a self) -> Box, Vec)> + 'a> { + let iter = self.persy.range::(&self.name, ..); + match iter { + Ok(iter) => Box::new(iter.filter_map(|(k, v)| { + v.into_iter() + .map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) + .next() + })), + Err(e) => { + warn!("error iterating {:?}", e); + Box::new(std::iter::empty()) + } + } + } + + #[tracing::instrument(skip(self, from, backwards))] + fn iter_from<'a>( + &'a self, + from: &[u8], + backwards: bool, + ) -> Box, Vec)> + 'a> { + let range = if backwards { + self.persy + .range::(&self.name, ..=ByteVec::from(from)) + } else { + self.persy + .range::(&self.name, ByteVec::from(from)..) + }; + match range { + Ok(iter) => { + let map = iter.filter_map(|(k, v)| { + v.into_iter() + .map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) + .next() + }); + if backwards { + Box::new(map.rev()) + } else { + Box::new(map) + } + } + Err(e) => { + warn!("error iterating with prefix {:?}", e); + Box::new(std::iter::empty()) + } + } + } + + #[tracing::instrument(skip(self, key))] + fn increment(&self, key: &[u8]) -> Result> { + self.increment_batch(&mut Some(key.to_owned()).into_iter())?; + Ok(self.get(key)?.unwrap()) + } + + #[tracing::instrument(skip(self, prefix))] + fn scan_prefix<'a>( + &'a self, + prefix: Vec, + ) -> Box, Vec)> + 'a> { + let range_prefix = ByteVec::from(prefix.clone()); + let range = self + .persy + .range::(&self.name, range_prefix..); + + match range { + Ok(iter) => { + let owned_prefix = prefix.clone(); + Box::new( + iter.take_while(move |(k, _)| (*k).starts_with(&owned_prefix)) + .filter_map(|(k, v)| { + v.into_iter() + .map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) + .next() + }), + ) + } + Err(e) => { + warn!("error scanning prefix {:?}", e); + Box::new(std::iter::empty()) + } + } + } + + #[tracing::instrument(skip(self, prefix))] + fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.watchers + .write() + .unwrap() + .entry(prefix.to_vec()) + .or_default() + .push(tx); + + Box::pin(async move { + // Tx is never destroyed + rx.await.unwrap(); + }) + } +} diff --git a/src/error.rs b/src/error.rs index 4d427da4..5ffe48c9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,6 +8,9 @@ use ruma::{ use thiserror::Error; use tracing::warn; +#[cfg(feature = "persy")] +use persy::PersyError; + #[cfg(feature = "conduit_bin")] use { crate::RumaResponse, @@ -36,6 +39,9 @@ pub enum Error { #[from] source: rusqlite::Error, }, + #[cfg(feature = "persy")] + #[error("There was a problem with the connection to the persy database.")] + PersyError { source: PersyError }, #[cfg(feature = "heed")] #[error("There was a problem with the connection to the heed database: {error}")] HeedError { error: String }, @@ -142,3 +148,12 @@ where self.to_response().respond_to(r) } } + +#[cfg(feature = "persy")] +impl> From> for Error { + fn from(err: persy::PE) -> Self { + Error::PersyError { + source: err.error().into(), + } + } +} From 1cc41937bd9ae7679aa48c10074ac1041d3a94b5 Mon Sep 17 00:00:00 2001 From: Tglman Date: Thu, 23 Dec 2021 22:59:17 +0000 Subject: [PATCH 2/4] refactor:use generic watcher in persy implementation --- Cargo.lock | 46 ++++++++++++++++++++++++++++ Cargo.toml | 5 +--- src/database/abstraction/persy.rs | 50 ++++--------------------------- 3 files changed, 53 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d297102c..df37fd58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,6 +293,7 @@ dependencies = [ "opentelemetry", "opentelemetry-jaeger", "parking_lot", + "persy", "rand 0.8.4", "regex", "reqwest", @@ -374,6 +375,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" + [[package]] name = "crc32fast" version = "1.3.0" @@ -1651,6 +1667,21 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "persy" +version = "1.2.0" +source = "git+https://gitlab.com/tglman/persy.git?branch=master#ff102d6edeaf14d30a846c2e2376a814685d09e7" +dependencies = [ + "crc", + "data-encoding", + "fs2", + "linked-hash-map", + "rand 0.8.4", + "thiserror", + "unsigned-varint", + "zigzag", +] + [[package]] name = "pin-project" version = "1.0.10" @@ -3290,6 +3321,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f14ee04d9415b52b3aeab06258a3f07093182b88ba0f9b8d203f211a7a7d41c7" +[[package]] +name = "unsigned-varint" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d86a8dc7f45e4c1b0d30e43038c38f274e77af056aa5f74b93c2cf9eb3c1c836" + [[package]] name = "untrusted" version = "0.7.1" @@ -3532,6 +3569,15 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zigzag" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf" +dependencies = [ + "num-traits", +] + [[package]] name = "zstd" version = "0.9.2+zstd.1.5.1" diff --git a/Cargo.toml b/Cargo.toml index 2dbd3fd3..7c94a693 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,9 +29,6 @@ tokio = "1.11.0" sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } persy = { git = "https://gitlab.com/tglman/persy.git", branch="master" , optional = true, features=["background_ops"] } -# Used by the persy write cache for background flush -timer = "0.2" -chrono = "0.4" # Used for the http request / response body type for Ruma endpoints used with reqwest bytes = "1.1.0" @@ -91,7 +88,7 @@ sha-1 = "0.9.8" [features] default = ["conduit_bin", "backend_sqlite", "backend_rocksdb"] backend_sled = ["sled"] -backend_persy = ["persy"] +backend_persy = ["persy", "parking_lot"] backend_sqlite = ["sqlite"] backend_heed = ["heed", "crossbeam"] backend_rocksdb = ["rocksdb"] diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs index 5d633ab4..71efed3b 100644 --- a/src/database/abstraction/persy.rs +++ b/src/database/abstraction/persy.rs @@ -1,20 +1,14 @@ use crate::{ database::{ - abstraction::{DatabaseEngine, Tree}, + abstraction::{watchers::Watchers, DatabaseEngine, Tree}, Config, }, Result, }; use persy::{ByteVec, OpenOptions, Persy, Transaction, TransactionConfig, ValueMode}; -use std::{ - collections::HashMap, - future::Future, - pin::Pin, - sync::{Arc, RwLock}, -}; +use std::{future::Future, pin::Pin, sync::Arc}; -use tokio::sync::oneshot::Sender; use tracing::warn; pub struct PersyEngine { @@ -44,7 +38,7 @@ impl DatabaseEngine for PersyEngine { Ok(Arc::new(PersyTree { persy: self.persy.clone(), name: name.to_owned(), - watchers: RwLock::new(HashMap::new()), + watchers: Watchers::default(), })) } @@ -56,7 +50,7 @@ impl DatabaseEngine for PersyEngine { pub struct PersyTree { persy: Persy, name: String, - watchers: RwLock, Vec>>>, + watchers: Watchers, } impl PersyTree { @@ -81,27 +75,7 @@ impl Tree for PersyTree { #[tracing::instrument(skip(self, key, value))] fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { self.insert_batch(&mut Some((key.to_owned(), value.to_owned())).into_iter())?; - let watchers = self.watchers.read().unwrap(); - let mut triggered = Vec::new(); - - for length in 0..=key.len() { - if watchers.contains_key(&key[..length]) { - triggered.push(&key[..length]); - } - } - - drop(watchers); - - if !triggered.is_empty() { - let mut watchers = self.watchers.write().unwrap(); - for prefix in triggered { - if let Some(txs) = watchers.remove(prefix) { - for tx in txs { - let _ = tx.send(()); - } - } - } - } + self.watchers.wake(key); Ok(()) } @@ -228,18 +202,6 @@ impl Tree for PersyTree { #[tracing::instrument(skip(self, prefix))] fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.watchers - .write() - .unwrap() - .entry(prefix.to_vec()) - .or_default() - .push(tx); - - Box::pin(async move { - // Tx is never destroyed - rx.await.unwrap(); - }) + self.watchers.watch(prefix) } } From f9977ca64f84768c0a71535f6038f4a6487ddc17 Mon Sep 17 00:00:00 2001 From: Tglman Date: Thu, 13 Jan 2022 22:37:19 +0000 Subject: [PATCH 3/4] fix: changes to update to the last database engine trait definition --- src/database/abstraction.rs | 7 ++++++- src/database/abstraction/persy.rs | 12 ++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 9a3771f3..09081826 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -18,7 +18,12 @@ pub mod rocksdb; #[cfg(feature = "persy")] pub mod persy; -#[cfg(any(feature = "sqlite", feature = "rocksdb", feature = "heed", feature="persy"))] +#[cfg(any( + feature = "sqlite", + feature = "rocksdb", + feature = "heed", + feature = "persy" +))] pub mod watchers; pub trait DatabaseEngine: Send + Sync { diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs index 71efed3b..628cf32b 100644 --- a/src/database/abstraction/persy.rs +++ b/src/database/abstraction/persy.rs @@ -11,12 +11,12 @@ use std::{future::Future, pin::Pin, sync::Arc}; use tracing::warn; -pub struct PersyEngine { +pub struct Engine { persy: Persy, } -impl DatabaseEngine for PersyEngine { - fn open(config: &Config) -> Result> { +impl DatabaseEngine for Arc { + fn open(config: &Config) -> Result { let mut cfg = persy::Config::new(); cfg.change_cache_size((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64); @@ -24,10 +24,10 @@ impl DatabaseEngine for PersyEngine { .create(true) .config(cfg) .open(&format!("{}/db.persy", config.database_path))?; - Ok(Arc::new(PersyEngine { persy })) + Ok(Arc::new(Engine { persy })) } - fn open_tree(self: &Arc, name: &'static str) -> Result> { + fn open_tree(&self, name: &'static str) -> Result> { // Create if it doesn't exist if !self.persy.exists_index(name)? { let mut tx = self.persy.begin()?; @@ -42,7 +42,7 @@ impl DatabaseEngine for PersyEngine { })) } - fn flush(self: &Arc) -> Result<()> { + fn flush(&self) -> Result<()> { Ok(()) } } From c1cd4b5e26c68d1c5e91f85df2a65591f774d13c Mon Sep 17 00:00:00 2001 From: Tglman Date: Fri, 14 Jan 2022 21:00:13 +0000 Subject: [PATCH 4/4] chore: set the released version of persy in Cargo.toml --- Cargo.lock | 3 ++- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df37fd58..469c5663 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1670,7 +1670,8 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "persy" version = "1.2.0" -source = "git+https://gitlab.com/tglman/persy.git?branch=master#ff102d6edeaf14d30a846c2e2376a814685d09e7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c6aa7d7f093620a28b74fcf5f5da73ba17a9e52fcbbdbb4ecc89e61cb2d673" dependencies = [ "crc", "data-encoding", diff --git a/Cargo.toml b/Cargo.toml index 7c94a693..df879c36 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ tokio = "1.11.0" # Used for storing data permanently sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } -persy = { git = "https://gitlab.com/tglman/persy.git", branch="master" , optional = true, features=["background_ops"] } +persy = { version = "1.2" , optional = true, features=["background_ops"] } # Used for the http request / response body type for Ruma endpoints used with reqwest bytes = "1.1.0"