// Review notes for this patch. They are free text placed ahead of the first `diff --git`
// header; no hunk content below has been altered.
//
// Purpose: adds an optional `persy` storage backend.
//   * Cargo.lock / Cargo.toml: optional dependency `persy` 1.2 with the `background_ops`
//     feature, a `backend_persy = ["persy", "parking_lot"]` feature, and lock entries for
//     `persy`, `crc`, `crc-catalog`, `unsigned-varint` and `zigzag`.
//   * src/database.rs: a `"persy"` match arm that returns
//     `Error::BadConfig("Database backend not found.")` when the `persy` feature is off and
//     otherwise opens the persy engine.
//   * src/database/abstraction.rs: declares `pub mod persy` behind the `persy` feature and adds
//     that feature to the `cfg(any(..))` list gating `pub mod watchers`.
//   * src/database/abstraction/persy.rs (new file): `Engine` wraps a `Persy` handle; `PersyTree`
//     holds a `Persy` clone, the index name and a `Watchers`, and implements `Tree`.
//   * src/error.rs: `Error::PersyError { source }` plus a `From` conversion from `persy::PE`
//     that stores `err.error().into()`.
//
// Behaviour visible in persy.rs:
//   * `open` builds a `persy::Config`, sets the cache size to
//     `(config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64`, and opens
//     `"{database_path}/db.persy"` with `.create(true)`.
//   * `open_tree` checks `exists_index(name)`; when false it creates the index with
//     `ValueMode::Replace` in its own transaction, then returns a `PersyTree` with fresh
//     `Watchers::default()`.
//   * `flush` returns `Ok(())` without calling into persy.
//   * `PersyTree::begin` starts every write transaction with
//     `TransactionConfig::new().set_background_sync(true)`.
//   * `get` returns the first value yielded for the key, copied into an owned buffer.
//   * `insert` delegates to `insert_batch` with a one-element iterator and then calls
//     `watchers.wake(key)`. `insert_batch`, `increment_batch` and `remove` do not call `wake`.
//   * `increment_batch` reads the old value through the transaction (`tx.get`), passes it to
//     `crate::utils::increment`, and writes the result back in the same transaction.
//   * `iter`, `iter_from` and `scan_prefix` yield, per key, only the first value of that key's
//     value iterator (`.next()`); keys with no value are dropped by `filter_map`. If `range`
//     returns an error they log it with `warn!` and return an empty iterator.
//   * `iter_from` uses `..=from` and reverses the iterator when `backwards` is true, otherwise
//     `from..`.
//   * `scan_prefix` starts the range at `prefix` and stops at the first key that does not
//     `starts_with` it.
//   * `increment` runs `increment_batch` (which commits) and then reads the key back with
//     `get`, unwrapping the `Option`.
//
// NOTE(review): generic argument lists look stripped throughout (`Arc::::open`, `Result>>`,
//   `create_index::(`, `impl> From>`), and the diff is collapsed onto a few physical lines.
//   Presumably an extraction artifact; confirm against the original commit before applying.
// NOTE(review): `increment` reads the value back outside the committed transaction. It looks
//   like a concurrent writer could change the key in between; confirm whether callers need the
//   exact value this call wrote.
// NOTE(review): `crate::utils::increment(..).unwrap()` and `self.get(key)?.unwrap()` panic on
//   `None`. Whether `None` can occur is not visible in this patch; verify.
// NOTE(review): the `PersyError` display message does not interpolate its `source`, unlike the
//   neighbouring heed variant, which prints `{error}`.
// NOTE(review): `backend_persy` enables `parking_lot`, which persy.rs itself never references.
//   Presumably `watchers` needs it; confirm.
// NOTE(review): `insert_batch` calls `key.clone()` although `key` is not used again in the
//   loop body.
diff --git a/Cargo.lock b/Cargo.lock index 5be10f14..8fe767e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,6 +293,7 @@ dependencies = [ "opentelemetry", "opentelemetry-jaeger", "parking_lot", + "persy", "rand 0.8.4", "regex", "reqwest", @@ -374,6 +375,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" + [[package]] name = "crc32fast" version = "1.3.0" @@ -1651,6 +1667,22 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "persy" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c6aa7d7f093620a28b74fcf5f5da73ba17a9e52fcbbdbb4ecc89e61cb2d673" +dependencies = [ + "crc", + "data-encoding", + "fs2", + "linked-hash-map", + "rand 0.8.4", + "thiserror", + "unsigned-varint", + "zigzag", +] + [[package]] name = "pin-project" version = "1.0.10" @@ -3293,6 +3325,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f14ee04d9415b52b3aeab06258a3f07093182b88ba0f9b8d203f211a7a7d41c7" +[[package]] +name = "unsigned-varint" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d86a8dc7f45e4c1b0d30e43038c38f274e77af056aa5f74b93c2cf9eb3c1c836" + [[package]] name = "untrusted" version = "0.7.1" @@ -3544,6 +3582,15 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zigzag" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum =
"70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf" +dependencies = [ + "num-traits", +] + [[package]] name = "zstd" version = "0.9.2+zstd.1.5.1" diff --git a/Cargo.toml b/Cargo.toml index f335873a..e3614ec4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ tokio = "1.11.0" # Used for storing data permanently sled = { version = "0.34.6", features = ["compression", "no_metrics"], optional = true } #sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] } +persy = { version = "1.2" , optional = true, features=["background_ops"] } # Used for the http request / response body type for Ruma endpoints used with reqwest bytes = "1.1.0" @@ -89,6 +90,7 @@ sha-1 = "0.9.8" [features] default = ["conduit_bin", "backend_sqlite", "backend_rocksdb"] backend_sled = ["sled"] +backend_persy = ["persy", "parking_lot"] backend_sqlite = ["sqlite"] backend_heed = ["heed", "crossbeam"] backend_rocksdb = ["rocksdb"] diff --git a/src/database.rs b/src/database.rs index 7a4ddc66..4f230f32 100644 --- a/src/database.rs +++ b/src/database.rs @@ -255,6 +255,12 @@ impl Database { #[cfg(feature = "rocksdb")] Arc::new(Arc::::open(config)?) } + "persy" => { + #[cfg(not(feature = "persy"))] + return Err(Error::BadConfig("Database backend not found.")); + #[cfg(feature = "persy")] + Arc::new(Arc::::open(config)?)
+ } _ => { return Err(Error::BadConfig("Database backend not found.")); } diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 321b064f..74f3a45a 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -15,7 +15,15 @@ pub mod heed; #[cfg(feature = "rocksdb")] pub mod rocksdb; -#[cfg(any(feature = "sqlite", feature = "rocksdb", feature = "heed"))] +#[cfg(feature = "persy")] +pub mod persy; + +#[cfg(any( + feature = "sqlite", + feature = "rocksdb", + feature = "heed", + feature = "persy" +))] pub mod watchers; pub trait DatabaseEngine: Send + Sync { diff --git a/src/database/abstraction/persy.rs b/src/database/abstraction/persy.rs new file mode 100644 index 00000000..628cf32b --- /dev/null +++ b/src/database/abstraction/persy.rs @@ -0,0 +1,207 @@ +use crate::{ + database::{ + abstraction::{watchers::Watchers, DatabaseEngine, Tree}, + Config, + }, + Result, +}; +use persy::{ByteVec, OpenOptions, Persy, Transaction, TransactionConfig, ValueMode}; + +use std::{future::Future, pin::Pin, sync::Arc}; + +use tracing::warn; + +pub struct Engine { + persy: Persy, +} + +impl DatabaseEngine for Arc { + fn open(config: &Config) -> Result { + let mut cfg = persy::Config::new(); + cfg.change_cache_size((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64); + + let persy = OpenOptions::new() + .create(true) + .config(cfg) + .open(&format!("{}/db.persy", config.database_path))?; + Ok(Arc::new(Engine { persy })) + } + + fn open_tree(&self, name: &'static str) -> Result> { + // Create if it doesn't exist + if !self.persy.exists_index(name)?
{ + let mut tx = self.persy.begin()?; + tx.create_index::(name, ValueMode::Replace)?; + tx.prepare()?.commit()?; + } + + Ok(Arc::new(PersyTree { + persy: self.persy.clone(), + name: name.to_owned(), + watchers: Watchers::default(), + })) + } + + fn flush(&self) -> Result<()> { + Ok(()) + } +} + +pub struct PersyTree { + persy: Persy, + name: String, + watchers: Watchers, +} + +impl PersyTree { + fn begin(&self) -> Result { + Ok(self + .persy + .begin_with(TransactionConfig::new().set_background_sync(true))?) + } +} + +impl Tree for PersyTree { + #[tracing::instrument(skip(self, key))] + fn get(&self, key: &[u8]) -> Result>> { + let result = self + .persy + .get::(&self.name, &ByteVec::from(key))? + .next() + .map(|v| (*v).to_owned()); + Ok(result) + } + + #[tracing::instrument(skip(self, key, value))] + fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + self.insert_batch(&mut Some((key.to_owned(), value.to_owned())).into_iter())?; + self.watchers.wake(key); + Ok(()) + } + + #[tracing::instrument(skip(self, iter))] + fn insert_batch<'a>(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()> { + let mut tx = self.begin()?; + for (key, value) in iter { + tx.put::( + &self.name, + ByteVec::from(key.clone()), + ByteVec::from(value), + )?; + } + tx.prepare()?.commit()?; + Ok(()) + } + + #[tracing::instrument(skip(self, iter))] + fn increment_batch<'a>(&self, iter: &mut dyn Iterator>) -> Result<()> { + let mut tx = self.begin()?; + for key in iter { + let old = tx + .get::(&self.name, &ByteVec::from(key.clone()))?
+ .next() + .map(|v| (*v).to_owned()); + let new = crate::utils::increment(old.as_deref()).unwrap(); + tx.put::(&self.name, ByteVec::from(key), ByteVec::from(new))?; + } + tx.prepare()?.commit()?; + Ok(()) + } + + #[tracing::instrument(skip(self, key))] + fn remove(&self, key: &[u8]) -> Result<()> { + let mut tx = self.begin()?; + tx.remove::(&self.name, ByteVec::from(key), None)?; + tx.prepare()?.commit()?; + Ok(()) + } + + #[tracing::instrument(skip(self))] + fn iter<'a>(&'a self) -> Box, Vec)> + 'a> { + let iter = self.persy.range::(&self.name, ..); + match iter { + Ok(iter) => Box::new(iter.filter_map(|(k, v)| { + v.into_iter() + .map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) + .next() + })), + Err(e) => { + warn!("error iterating {:?}", e); + Box::new(std::iter::empty()) + } + } + } + + #[tracing::instrument(skip(self, from, backwards))] + fn iter_from<'a>( + &'a self, + from: &[u8], + backwards: bool, + ) -> Box, Vec)> + 'a> { + let range = if backwards { + self.persy + .range::(&self.name, ..=ByteVec::from(from)) + } else { + self.persy + .range::(&self.name, ByteVec::from(from)..)
+ }; + match range { + Ok(iter) => { + let map = iter.filter_map(|(k, v)| { + v.into_iter() + .map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) + .next() + }); + if backwards { + Box::new(map.rev()) + } else { + Box::new(map) + } + } + Err(e) => { + warn!("error iterating with prefix {:?}", e); + Box::new(std::iter::empty()) + } + } + } + + #[tracing::instrument(skip(self, key))] + fn increment(&self, key: &[u8]) -> Result> { + self.increment_batch(&mut Some(key.to_owned()).into_iter())?; + Ok(self.get(key)?.unwrap()) + } + + #[tracing::instrument(skip(self, prefix))] + fn scan_prefix<'a>( + &'a self, + prefix: Vec, + ) -> Box, Vec)> + 'a> { + let range_prefix = ByteVec::from(prefix.clone()); + let range = self + .persy + .range::(&self.name, range_prefix..); + + match range { + Ok(iter) => { + let owned_prefix = prefix.clone(); + Box::new( + iter.take_while(move |(k, _)| (*k).starts_with(&owned_prefix)) + .filter_map(|(k, v)| { + v.into_iter() + .map(|val| ((*k).to_owned().into(), (*val).to_owned().into())) + .next() + }), + ) + } + Err(e) => { + warn!("error scanning prefix {:?}", e); + Box::new(std::iter::empty()) + } + } + } + + #[tracing::instrument(skip(self, prefix))] + fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { + self.watchers.watch(prefix) + } +} diff --git a/src/error.rs b/src/error.rs index 4d427da4..5ffe48c9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,6 +8,9 @@ use ruma::{ use thiserror::Error; use tracing::warn; +#[cfg(feature = "persy")] +use persy::PersyError; + #[cfg(feature = "conduit_bin")] use { crate::RumaResponse, @@ -36,6 +39,9 @@ pub enum Error { #[from] source: rusqlite::Error, }, + #[cfg(feature = "persy")] + #[error("There was a problem with the connection to the persy database.")] + PersyError { source: PersyError }, #[cfg(feature = "heed")] #[error("There was a problem with the connection to the heed database: {error}")] HeedError { error: String }, @@ -142,3 +148,12 @@ where
self.to_response().respond_to(r) } } + +#[cfg(feature = "persy")] +impl> From> for Error { + fn from(err: persy::PE) -> Self { + Error::PersyError { + source: err.error().into(), + } + } +}