From fa6d7f7ccd14426f1fc2d802fff021b06f39bf02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sun, 9 Jan 2022 16:44:44 +0100 Subject: [PATCH] feat: database backend selection at runtime --- Cargo.toml | 2 +- conduit-example.toml | 15 ++- src/database.rs | 142 ++++++++++++++++------------ src/database/abstraction.rs | 13 ++- src/database/abstraction/rocksdb.rs | 9 +- src/database/abstraction/sqlite.rs | 14 ++- src/utils.rs | 11 --- 7 files changed, 117 insertions(+), 89 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6241b6a8..c898d4d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ hmac = "0.11.0" sha-1 = "0.9.8" [features] -default = ["conduit_bin", "backend_rocksdb"] +default = ["conduit_bin", "backend_sqlite", "backend_rocksdb"] backend_sled = ["sled"] backend_sqlite = ["sqlite"] backend_heed = ["heed", "crossbeam"] diff --git a/conduit-example.toml b/conduit-example.toml index 4275f528..c0274a4d 100644 --- a/conduit-example.toml +++ b/conduit-example.toml @@ -1,11 +1,15 @@ [global] -# The server_name is the name of this server. It is used as a suffix for user +# The server_name is the pretty name of this server. It is used as a suffix for user # and room ids. Examples: matrix.org, conduit.rs -# The Conduit server needs to be reachable at https://your.server.name/ on port -# 443 (client-server) and 8448 (federation) OR you can create /.well-known -# files to redirect requests. See + +# The Conduit server needs all /_matrix/ requests to be reachable at +# https://your.server.name/ on port 443 (client-server) and 8448 (federation). + +# If that's not possible for you, you can create /.well-known files to redirect +# requests. See # https://matrix.org/docs/spec/client_server/latest#get-well-known-matrix-client -# and https://matrix.org/docs/spec/server_server/r0.1.4#get-well-known-matrix-server +# and +# https://matrix.org/docs/spec/server_server/r0.1.4#get-well-known-matrix-server # for more information # YOU NEED TO EDIT THIS @@ -13,6 +17,7 @@ # This is the only directory where Conduit will save its data database_path = "/var/lib/conduit/" +database_backend = "rocksdb" # The port Conduit will be running on. You need to set up a reverse proxy in # your web server (e.g. apache or nginx), so all requests to /_matrix on port diff --git a/src/database.rs b/src/database.rs index ddf701bb..c2b3e2b9 100644 --- a/src/database.rs +++ b/src/database.rs @@ -44,13 +44,15 @@ use self::proxy::ProxyConfig; #[derive(Clone, Debug, Deserialize)] pub struct Config { server_name: Box, + #[serde(default = "default_database_backend")] + database_backend: String, database_path: String, #[serde(default = "default_db_cache_capacity_mb")] db_cache_capacity_mb: f64, #[serde(default = "default_pdu_cache_capacity")] pdu_cache_capacity: u32, - #[serde(default = "default_sqlite_wal_clean_second_interval")] - sqlite_wal_clean_second_interval: u32, + #[serde(default = "default_cleanup_second_interval")] + cleanup_second_interval: u32, #[serde(default = "default_max_request_size")] max_request_size: u32, #[serde(default = "default_max_concurrent_requests")] @@ -117,6 +119,10 @@ fn true_fn() -> bool { true } +fn default_database_backend() -> String { + "sqlite".to_owned() +} + fn default_db_cache_capacity_mb() -> f64 { 200.0 } @@ -125,7 +131,7 @@ fn default_pdu_cache_capacity() -> u32 { 100_000 } -fn default_sqlite_wal_clean_second_interval() -> u32 { +fn default_cleanup_second_interval() -> u32 { 1 * 60 // every minute } @@ -145,20 +151,8 @@ fn default_turn_ttl() -> u64 { 60 * 60 * 24 } -#[cfg(feature = "sled")] -pub type Engine = abstraction::sled::Engine; - -#[cfg(feature = "sqlite")] -pub type Engine = abstraction::sqlite::Engine; - -#[cfg(feature = "heed")] -pub type Engine = abstraction::heed::Engine; - -#[cfg(feature = "rocksdb")] -pub type Engine = abstraction::rocksdb::Engine; - pub struct Database { - _db: Arc, + _db: Arc, pub globals: globals::Globals, pub users: users::Users, pub uiaa: uiaa::Uiaa, @@ -186,27 +180,53 @@ impl Database { Ok(()) } - fn check_sled_or_sqlite_db(config: &Config) -> Result<()> { - #[cfg(feature = "backend_sqlite")] - { - let path = Path::new(&config.database_path); - - let sled_exists = path.join("db").exists(); - let sqlite_exists = path.join("conduit.db").exists(); - if sled_exists { - if sqlite_exists { - // most likely an in-place directory, only warn - warn!("Both sled and sqlite databases are detected in database directory"); - warn!("Currently running from the sqlite database, but consider removing sled database files to free up space") - } else { - error!( - "Sled database detected, conduit now uses sqlite for database operations" - ); - error!("This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite"); - return Err(Error::bad_config( - "sled database detected, migrate to sqlite", - )); - } + fn check_db_setup(config: &Config) -> Result<()> { + let path = Path::new(&config.database_path); + + let sled_exists = path.join("db").exists(); + let sqlite_exists = path.join("conduit.db").exists(); + let rocksdb_exists = path.join("IDENTITY").exists(); + + let mut count = 0; + + if sled_exists { + count += 1; + } + + if sqlite_exists { + count += 1; + } + + if rocksdb_exists { + count += 1; + } + + if count > 1 { + warn!("Multiple databases at database_path detected"); + return Ok(()); + } + + if sled_exists { + if config.database_backend != "sled" { + return Err(Error::bad_config( + "Found sled at database_path, but is not specified in config.", + )); + } + } + + if sqlite_exists { + if config.database_backend != "sqlite" { + return Err(Error::bad_config( + "Found sqlite at database_path, but is not specified in config.", + )); + } + } + + if rocksdb_exists { + if config.database_backend != "rocksdb" { + return Err(Error::bad_config( + "Found rocksdb at database_path, but is not specified in config.", + )); } } @@ -215,14 +235,30 @@ impl Database { /// Load an existing database or create a new one. pub async fn load_or_create(config: &Config) -> Result>> { - Self::check_sled_or_sqlite_db(config)?; + Self::check_db_setup(config)?; if !Path::new(&config.database_path).exists() { std::fs::create_dir_all(&config.database_path) .map_err(|_| Error::BadConfig("Database folder doesn't exists and couldn't be created (e.g. due to missing permissions). Please create the database folder yourself."))?; } - let builder = Engine::open(config)?; + let builder: Arc = match &*config.database_backend { + "sqlite" => { + #[cfg(not(feature = "sqlite"))] + return Err(Error::BadConfig("Database backend not found.")); + #[cfg(feature = "sqlite")] + Arc::new(Arc::::open(config)?) + } + "rocksdb" => { + #[cfg(not(feature = "rocksdb"))] + return Err(Error::BadConfig("Database backend not found.")); + #[cfg(feature = "rocksdb")] + Arc::new(Arc::::open(config)?) + } + _ => { + return Err(Error::BadConfig("Database backend not found.")); + } + }; if config.max_request_size < 1024 { eprintln!("ERROR: Max request size is less than 1KB. Please increase it."); @@ -784,10 +820,7 @@ impl Database { drop(guard); - #[cfg(feature = "sqlite")] - { - Self::start_wal_clean_task(Arc::clone(&db), config).await; - } + Self::start_cleanup_task(Arc::clone(&db), config).await; Ok(db) } @@ -925,15 +958,8 @@ impl Database { res } - #[cfg(feature = "sqlite")] - #[tracing::instrument(skip(self))] - pub fn flush_wal(&self) -> Result<()> { - self._db.flush_wal() - } - - #[cfg(feature = "sqlite")] #[tracing::instrument(skip(db, config))] - pub async fn start_wal_clean_task(db: Arc>, config: &Config) { + pub async fn start_cleanup_task(db: Arc>, config: &Config) { use tokio::time::interval; #[cfg(unix)] @@ -942,7 +968,7 @@ impl Database { use std::time::{Duration, Instant}; - let timer_interval = Duration::from_secs(config.sqlite_wal_clean_second_interval as u64); + let timer_interval = Duration::from_secs(config.cleanup_second_interval as u64); tokio::spawn(async move { let mut i = interval(timer_interval); @@ -953,23 +979,23 @@ impl Database { #[cfg(unix)] tokio::select! { _ = i.tick() => { - info!("wal-trunc: Timer ticked"); + info!("cleanup: Timer ticked"); } _ = s.recv() => { - info!("wal-trunc: Received SIGHUP"); + info!("cleanup: Received SIGHUP"); } }; #[cfg(not(unix))] { i.tick().await; - info!("wal-trunc: Timer ticked") + info!("cleanup: Timer ticked") } let start = Instant::now(); - if let Err(e) = db.read().await.flush_wal() { - error!("wal-trunc: Errored: {}", e); + if let Err(e) = db.read().await._db.cleanup() { + error!("cleanup: Errored: {}", e); } else { - info!("wal-trunc: Flushed in {:?}", start.elapsed()); + info!("cleanup: Finished in {:?}", start.elapsed()); } } }); diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index a347f831..45627bbc 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -18,10 +18,15 @@ pub mod rocksdb; #[cfg(any(feature = "sqlite", feature = "rocksdb", feature = "heed"))] pub mod watchers; -pub trait DatabaseEngine: Sized { - fn open(config: &Config) -> Result>; - fn open_tree(self: &Arc, name: &'static str) -> Result>; - fn flush(self: &Arc) -> Result<()>; +pub trait DatabaseEngine: Send + Sync { + fn open(config: &Config) -> Result + where + Self: Sized; + fn open_tree(&self, name: &'static str) -> Result>; + fn flush(self: &Self) -> Result<()>; + fn cleanup(self: &Self) -> Result<()> { + Ok(()) + } } pub trait Tree: Send + Sync { diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index 397047bd..a41ed1fb 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -14,8 +14,8 @@ pub struct RocksDbEngineTree<'a> { write_lock: RwLock<()> } -impl DatabaseEngine for Engine { - fn open(config: &Config) -> Result> { +impl DatabaseEngine for Arc { + fn open(config: &Config) -> Result { let mut db_opts = rocksdb::Options::default(); db_opts.create_if_missing(true); db_opts.set_max_open_files(512); @@ -60,7 +60,7 @@ impl DatabaseEngine for Engine { })) } - fn open_tree(self: &Arc, name: &'static str) -> Result> { + fn open_tree(&self, name: &'static str) -> Result> { if !self.old_cfs.contains(&name.to_owned()) { // Create if it didn't exist let mut options = rocksdb::Options::default(); @@ -68,7 +68,6 @@ impl DatabaseEngine for Engine { options.set_prefix_extractor(prefix_extractor); let _ = self.rocks.create_cf(name, &options); - println!("created cf"); } Ok(Arc::new(RocksDbEngineTree { @@ -79,7 +78,7 @@ impl DatabaseEngine for Engine { })) } - fn flush(self: &Arc) -> Result<()> { + fn flush(&self) -> Result<()> { // TODO? Ok(()) } diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 31875667..d4fd0bdd 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -80,8 +80,8 @@ impl Engine { } } -impl DatabaseEngine for Engine { - fn open(config: &Config) -> Result> { +impl DatabaseEngine for Arc { + fn open(config: &Config) -> Result { let path = Path::new(&config.database_path).join("conduit.db"); // calculates cache-size per permanent connection @@ -92,7 +92,7 @@ impl DatabaseEngine for Engine { / ((num_cpus::get().max(1) * 2) + 1) as f64) as u32; - let writer = Mutex::new(Self::prepare_conn(&path, cache_size_per_thread)?); + let writer = Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?); let arc = Arc::new(Engine { writer, @@ -105,7 +105,7 @@ impl DatabaseEngine for Engine { Ok(arc) } - fn open_tree(self: &Arc, name: &str) -> Result> { + fn open_tree(&self, name: &str) -> Result> { self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?; Ok(Arc::new(SqliteTable { @@ -115,10 +115,14 @@ impl DatabaseEngine for Engine { })) } - fn flush(self: &Arc) -> Result<()> { + fn flush(&self) -> Result<()> { // we enabled PRAGMA synchronous=normal, so this should not be necessary Ok(()) } + + fn cleanup(&self) -> Result<()> { + self.flush_wal() + } } pub struct SqliteTable { diff --git a/src/utils.rs b/src/utils.rs index 4702d051..26d71a8c 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -29,17 +29,6 @@ pub fn increment(old: Option<&[u8]>) -> Option> { Some(number.to_be_bytes().to_vec()) } -#[cfg(feature = "rocksdb")] -pub fn increment_rocksdb( - _new_key: &[u8], - old: Option<&[u8]>, - _operands: &mut rocksdb::MergeOperands, -) -> Option> { - dbg!(_new_key); - dbg!(old); - increment(old) -} - pub fn generate_keypair() -> Vec { let mut value = random_string(8).as_bytes().to_vec(); value.push(0xff);