feat: database backend selection at runtime

merge-requests/217/head
Timo Kösters 3 years ago
parent 4f39d36e98
commit fa6d7f7ccd
No known key found for this signature in database
GPG Key ID: 356E705610F626D5

@ -85,7 +85,7 @@ hmac = "0.11.0"
sha-1 = "0.9.8" sha-1 = "0.9.8"
[features] [features]
default = ["conduit_bin", "backend_rocksdb"] default = ["conduit_bin", "backend_sqlite", "backend_rocksdb"]
backend_sled = ["sled"] backend_sled = ["sled"]
backend_sqlite = ["sqlite"] backend_sqlite = ["sqlite"]
backend_heed = ["heed", "crossbeam"] backend_heed = ["heed", "crossbeam"]

@ -1,11 +1,15 @@
[global] [global]
# The server_name is the name of this server. It is used as a suffix for user # The server_name is the pretty name of this server. It is used as a suffix for user
# and room ids. Examples: matrix.org, conduit.rs # and room ids. Examples: matrix.org, conduit.rs
# The Conduit server needs to be reachable at https://your.server.name/ on port
# 443 (client-server) and 8448 (federation) OR you can create /.well-known # The Conduit server needs all /_matrix/ requests to be reachable at
# files to redirect requests. See # https://your.server.name/ on port 443 (client-server) and 8448 (federation).
# If that's not possible for you, you can create /.well-known files to redirect
# requests. See
# https://matrix.org/docs/spec/client_server/latest#get-well-known-matrix-client # https://matrix.org/docs/spec/client_server/latest#get-well-known-matrix-client
# and https://matrix.org/docs/spec/server_server/r0.1.4#get-well-known-matrix-server # and
# https://matrix.org/docs/spec/server_server/r0.1.4#get-well-known-matrix-server
# for more information # for more information
# YOU NEED TO EDIT THIS # YOU NEED TO EDIT THIS
@ -13,6 +17,7 @@
# This is the only directory where Conduit will save its data # This is the only directory where Conduit will save its data
database_path = "/var/lib/conduit/" database_path = "/var/lib/conduit/"
database_backend = "rocksdb"
# The port Conduit will be running on. You need to set up a reverse proxy in # The port Conduit will be running on. You need to set up a reverse proxy in
# your web server (e.g. apache or nginx), so all requests to /_matrix on port # your web server (e.g. apache or nginx), so all requests to /_matrix on port

@ -44,13 +44,15 @@ use self::proxy::ProxyConfig;
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]
pub struct Config { pub struct Config {
server_name: Box<ServerName>, server_name: Box<ServerName>,
#[serde(default = "default_database_backend")]
database_backend: String,
database_path: String, database_path: String,
#[serde(default = "default_db_cache_capacity_mb")] #[serde(default = "default_db_cache_capacity_mb")]
db_cache_capacity_mb: f64, db_cache_capacity_mb: f64,
#[serde(default = "default_pdu_cache_capacity")] #[serde(default = "default_pdu_cache_capacity")]
pdu_cache_capacity: u32, pdu_cache_capacity: u32,
#[serde(default = "default_sqlite_wal_clean_second_interval")] #[serde(default = "default_cleanup_second_interval")]
sqlite_wal_clean_second_interval: u32, cleanup_second_interval: u32,
#[serde(default = "default_max_request_size")] #[serde(default = "default_max_request_size")]
max_request_size: u32, max_request_size: u32,
#[serde(default = "default_max_concurrent_requests")] #[serde(default = "default_max_concurrent_requests")]
@ -117,6 +119,10 @@ fn true_fn() -> bool {
true true
} }
fn default_database_backend() -> String {
"sqlite".to_owned()
}
fn default_db_cache_capacity_mb() -> f64 { fn default_db_cache_capacity_mb() -> f64 {
200.0 200.0
} }
@ -125,7 +131,7 @@ fn default_pdu_cache_capacity() -> u32 {
100_000 100_000
} }
fn default_sqlite_wal_clean_second_interval() -> u32 { fn default_cleanup_second_interval() -> u32 {
1 * 60 // every minute 1 * 60 // every minute
} }
@ -145,20 +151,8 @@ fn default_turn_ttl() -> u64 {
60 * 60 * 24 60 * 60 * 24
} }
#[cfg(feature = "sled")]
pub type Engine = abstraction::sled::Engine;
#[cfg(feature = "sqlite")]
pub type Engine = abstraction::sqlite::Engine;
#[cfg(feature = "heed")]
pub type Engine = abstraction::heed::Engine;
#[cfg(feature = "rocksdb")]
pub type Engine = abstraction::rocksdb::Engine;
pub struct Database { pub struct Database {
_db: Arc<Engine>, _db: Arc<dyn DatabaseEngine>,
pub globals: globals::Globals, pub globals: globals::Globals,
pub users: users::Users, pub users: users::Users,
pub uiaa: uiaa::Uiaa, pub uiaa: uiaa::Uiaa,
@ -186,27 +180,53 @@ impl Database {
Ok(()) Ok(())
} }
fn check_sled_or_sqlite_db(config: &Config) -> Result<()> { fn check_db_setup(config: &Config) -> Result<()> {
#[cfg(feature = "backend_sqlite")] let path = Path::new(&config.database_path);
{
let path = Path::new(&config.database_path); let sled_exists = path.join("db").exists();
let sqlite_exists = path.join("conduit.db").exists();
let sled_exists = path.join("db").exists(); let rocksdb_exists = path.join("IDENTITY").exists();
let sqlite_exists = path.join("conduit.db").exists();
if sled_exists { let mut count = 0;
if sqlite_exists {
// most likely an in-place directory, only warn if sled_exists {
warn!("Both sled and sqlite databases are detected in database directory"); count += 1;
warn!("Currently running from the sqlite database, but consider removing sled database files to free up space") }
} else {
error!( if sqlite_exists {
"Sled database detected, conduit now uses sqlite for database operations" count += 1;
); }
error!("This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite");
return Err(Error::bad_config( if rocksdb_exists {
"sled database detected, migrate to sqlite", count += 1;
)); }
}
if count > 1 {
warn!("Multiple databases at database_path detected");
return Ok(());
}
if sled_exists {
if config.database_backend != "sled" {
return Err(Error::bad_config(
"Found sled at database_path, but is not specified in config.",
));
}
}
if sqlite_exists {
if config.database_backend != "sqlite" {
return Err(Error::bad_config(
"Found sqlite at database_path, but is not specified in config.",
));
}
}
if rocksdb_exists {
if config.database_backend != "rocksdb" {
return Err(Error::bad_config(
"Found rocksdb at database_path, but is not specified in config.",
));
} }
} }
@ -215,14 +235,30 @@ impl Database {
/// Load an existing database or create a new one. /// Load an existing database or create a new one.
pub async fn load_or_create(config: &Config) -> Result<Arc<TokioRwLock<Self>>> { pub async fn load_or_create(config: &Config) -> Result<Arc<TokioRwLock<Self>>> {
Self::check_sled_or_sqlite_db(config)?; Self::check_db_setup(config)?;
if !Path::new(&config.database_path).exists() { if !Path::new(&config.database_path).exists() {
std::fs::create_dir_all(&config.database_path) std::fs::create_dir_all(&config.database_path)
.map_err(|_| Error::BadConfig("Database folder doesn't exists and couldn't be created (e.g. due to missing permissions). Please create the database folder yourself."))?; .map_err(|_| Error::BadConfig("Database folder doesn't exists and couldn't be created (e.g. due to missing permissions). Please create the database folder yourself."))?;
} }
let builder = Engine::open(config)?; let builder: Arc<dyn DatabaseEngine> = match &*config.database_backend {
"sqlite" => {
#[cfg(not(feature = "sqlite"))]
return Err(Error::BadConfig("Database backend not found."));
#[cfg(feature = "sqlite")]
Arc::new(Arc::<abstraction::sqlite::Engine>::open(config)?)
}
"rocksdb" => {
#[cfg(not(feature = "rocksdb"))]
return Err(Error::BadConfig("Database backend not found."));
#[cfg(feature = "rocksdb")]
Arc::new(Arc::<abstraction::rocksdb::Engine>::open(config)?)
}
_ => {
return Err(Error::BadConfig("Database backend not found."));
}
};
if config.max_request_size < 1024 { if config.max_request_size < 1024 {
eprintln!("ERROR: Max request size is less than 1KB. Please increase it."); eprintln!("ERROR: Max request size is less than 1KB. Please increase it.");
@ -784,10 +820,7 @@ impl Database {
drop(guard); drop(guard);
#[cfg(feature = "sqlite")] Self::start_cleanup_task(Arc::clone(&db), config).await;
{
Self::start_wal_clean_task(Arc::clone(&db), config).await;
}
Ok(db) Ok(db)
} }
@ -925,15 +958,8 @@ impl Database {
res res
} }
#[cfg(feature = "sqlite")]
#[tracing::instrument(skip(self))]
pub fn flush_wal(&self) -> Result<()> {
self._db.flush_wal()
}
#[cfg(feature = "sqlite")]
#[tracing::instrument(skip(db, config))] #[tracing::instrument(skip(db, config))]
pub async fn start_wal_clean_task(db: Arc<TokioRwLock<Self>>, config: &Config) { pub async fn start_cleanup_task(db: Arc<TokioRwLock<Self>>, config: &Config) {
use tokio::time::interval; use tokio::time::interval;
#[cfg(unix)] #[cfg(unix)]
@ -942,7 +968,7 @@ impl Database {
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
let timer_interval = Duration::from_secs(config.sqlite_wal_clean_second_interval as u64); let timer_interval = Duration::from_secs(config.cleanup_second_interval as u64);
tokio::spawn(async move { tokio::spawn(async move {
let mut i = interval(timer_interval); let mut i = interval(timer_interval);
@ -953,23 +979,23 @@ impl Database {
#[cfg(unix)] #[cfg(unix)]
tokio::select! { tokio::select! {
_ = i.tick() => { _ = i.tick() => {
info!("wal-trunc: Timer ticked"); info!("cleanup: Timer ticked");
} }
_ = s.recv() => { _ = s.recv() => {
info!("wal-trunc: Received SIGHUP"); info!("cleanup: Received SIGHUP");
} }
}; };
#[cfg(not(unix))] #[cfg(not(unix))]
{ {
i.tick().await; i.tick().await;
info!("wal-trunc: Timer ticked") info!("cleanup: Timer ticked")
} }
let start = Instant::now(); let start = Instant::now();
if let Err(e) = db.read().await.flush_wal() { if let Err(e) = db.read().await._db.cleanup() {
error!("wal-trunc: Errored: {}", e); error!("cleanup: Errored: {}", e);
} else { } else {
info!("wal-trunc: Flushed in {:?}", start.elapsed()); info!("cleanup: Finished in {:?}", start.elapsed());
} }
} }
}); });

@ -18,10 +18,15 @@ pub mod rocksdb;
#[cfg(any(feature = "sqlite", feature = "rocksdb", feature = "heed"))] #[cfg(any(feature = "sqlite", feature = "rocksdb", feature = "heed"))]
pub mod watchers; pub mod watchers;
pub trait DatabaseEngine: Sized { pub trait DatabaseEngine: Send + Sync {
fn open(config: &Config) -> Result<Arc<Self>>; fn open(config: &Config) -> Result<Self>
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>>; where
fn flush(self: &Arc<Self>) -> Result<()>; Self: Sized;
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn Tree>>;
fn flush(self: &Self) -> Result<()>;
fn cleanup(self: &Self) -> Result<()> {
Ok(())
}
} }
pub trait Tree: Send + Sync { pub trait Tree: Send + Sync {

@ -14,8 +14,8 @@ pub struct RocksDbEngineTree<'a> {
write_lock: RwLock<()> write_lock: RwLock<()>
} }
impl DatabaseEngine for Engine { impl DatabaseEngine for Arc<Engine> {
fn open(config: &Config) -> Result<Arc<Self>> { fn open(config: &Config) -> Result<Self> {
let mut db_opts = rocksdb::Options::default(); let mut db_opts = rocksdb::Options::default();
db_opts.create_if_missing(true); db_opts.create_if_missing(true);
db_opts.set_max_open_files(512); db_opts.set_max_open_files(512);
@ -60,7 +60,7 @@ impl DatabaseEngine for Engine {
})) }))
} }
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> { fn open_tree(&self, name: &'static str) -> Result<Arc<dyn Tree>> {
if !self.old_cfs.contains(&name.to_owned()) { if !self.old_cfs.contains(&name.to_owned()) {
// Create if it didn't exist // Create if it didn't exist
let mut options = rocksdb::Options::default(); let mut options = rocksdb::Options::default();
@ -68,7 +68,6 @@ impl DatabaseEngine for Engine {
options.set_prefix_extractor(prefix_extractor); options.set_prefix_extractor(prefix_extractor);
let _ = self.rocks.create_cf(name, &options); let _ = self.rocks.create_cf(name, &options);
println!("created cf");
} }
Ok(Arc::new(RocksDbEngineTree { Ok(Arc::new(RocksDbEngineTree {
@ -79,7 +78,7 @@ impl DatabaseEngine for Engine {
})) }))
} }
fn flush(self: &Arc<Self>) -> Result<()> { fn flush(&self) -> Result<()> {
// TODO? // TODO?
Ok(()) Ok(())
} }

@ -80,8 +80,8 @@ impl Engine {
} }
} }
impl DatabaseEngine for Engine { impl DatabaseEngine for Arc<Engine> {
fn open(config: &Config) -> Result<Arc<Self>> { fn open(config: &Config) -> Result<Self> {
let path = Path::new(&config.database_path).join("conduit.db"); let path = Path::new(&config.database_path).join("conduit.db");
// calculates cache-size per permanent connection // calculates cache-size per permanent connection
@ -92,7 +92,7 @@ impl DatabaseEngine for Engine {
/ ((num_cpus::get().max(1) * 2) + 1) as f64) / ((num_cpus::get().max(1) * 2) + 1) as f64)
as u32; as u32;
let writer = Mutex::new(Self::prepare_conn(&path, cache_size_per_thread)?); let writer = Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?);
let arc = Arc::new(Engine { let arc = Arc::new(Engine {
writer, writer,
@ -105,7 +105,7 @@ impl DatabaseEngine for Engine {
Ok(arc) Ok(arc)
} }
fn open_tree(self: &Arc<Self>, name: &str) -> Result<Arc<dyn Tree>> { fn open_tree(&self, name: &str) -> Result<Arc<dyn Tree>> {
self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?; self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?;
Ok(Arc::new(SqliteTable { Ok(Arc::new(SqliteTable {
@ -115,10 +115,14 @@ impl DatabaseEngine for Engine {
})) }))
} }
fn flush(self: &Arc<Self>) -> Result<()> { fn flush(&self) -> Result<()> {
// we enabled PRAGMA synchronous=normal, so this should not be necessary // we enabled PRAGMA synchronous=normal, so this should not be necessary
Ok(()) Ok(())
} }
fn cleanup(&self) -> Result<()> {
self.flush_wal()
}
} }
pub struct SqliteTable { pub struct SqliteTable {

@ -29,17 +29,6 @@ pub fn increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
Some(number.to_be_bytes().to_vec()) Some(number.to_be_bytes().to_vec())
} }
#[cfg(feature = "rocksdb")]
pub fn increment_rocksdb(
_new_key: &[u8],
old: Option<&[u8]>,
_operands: &mut rocksdb::MergeOperands,
) -> Option<Vec<u8>> {
dbg!(_new_key);
dbg!(old);
increment(old)
}
pub fn generate_keypair() -> Vec<u8> { pub fn generate_keypair() -> Vec<u8> {
let mut value = random_string(8).as_bytes().to_vec(); let mut value = random_string(8).as_bytes().to_vec();
value.push(0xff); value.push(0xff);

Loading…
Cancel
Save