improvement: better account data implementation

merge-requests/140/head
Timo Kösters 3 years ago
parent 5c776e9ba7
commit 5df6b8cd5f
No known key found for this signature in database
GPG Key ID: 24DA7517711A2BA4

10
Cargo.lock generated

@ -944,7 +944,7 @@ dependencies = [
[[package]] [[package]]
name = "heed" name = "heed"
version = "0.10.6" version = "0.10.6"
source = "git+https://github.com/Kerollmops/heed.git?rev=b235e9c3e9984737c967b5de1014b48f125dc28b#b235e9c3e9984737c967b5de1014b48f125dc28b" source = "git+https://github.com/timokoesters/heed.git?rev=c6b149fd5621999b0d5ef0c28e199015cfc60fa1#c6b149fd5621999b0d5ef0c28e199015cfc60fa1"
dependencies = [ dependencies = [
"bytemuck", "bytemuck",
"byteorder", "byteorder",
@ -962,12 +962,12 @@ dependencies = [
[[package]] [[package]]
name = "heed-traits" name = "heed-traits"
version = "0.7.0" version = "0.7.0"
source = "git+https://github.com/Kerollmops/heed.git?rev=b235e9c3e9984737c967b5de1014b48f125dc28b#b235e9c3e9984737c967b5de1014b48f125dc28b" source = "git+https://github.com/timokoesters/heed.git?rev=c6b149fd5621999b0d5ef0c28e199015cfc60fa1#c6b149fd5621999b0d5ef0c28e199015cfc60fa1"
[[package]] [[package]]
name = "heed-types" name = "heed-types"
version = "0.7.2" version = "0.7.2"
source = "git+https://github.com/Kerollmops/heed.git?rev=b235e9c3e9984737c967b5de1014b48f125dc28b#b235e9c3e9984737c967b5de1014b48f125dc28b" source = "git+https://github.com/timokoesters/heed.git?rev=c6b149fd5621999b0d5ef0c28e199015cfc60fa1#c6b149fd5621999b0d5ef0c28e199015cfc60fa1"
dependencies = [ dependencies = [
"bincode", "bincode",
"bytemuck", "bytemuck",
@ -2586,9 +2586,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.65" version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28c5e91e4240b46c4c19219d6cc84784444326131a4210f496f948d5cc827a29" checksum = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127"
dependencies = [ dependencies = [
"itoa", "itoa",
"ryu", "ryu",

@ -79,10 +79,10 @@ parking_lot = { version = "0.11.1", optional = true }
crossbeam = { version = "0.8.1", optional = true } crossbeam = { version = "0.8.1", optional = true }
num_cpus = "1.13.0" num_cpus = "1.13.0"
threadpool = "1.8.1" threadpool = "1.8.1"
heed = { git = "https://github.com/Kerollmops/heed.git", rev = "b235e9c3e9984737c967b5de1014b48f125dc28b", optional = true } heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true }
[features] [features]
default = ["conduit_bin", "backend_heed"] default = ["conduit_bin", "backend_sqlite"]
backend_sled = ["sled"] backend_sled = ["sled"]
backend_rocksdb = ["rocksdb"] backend_rocksdb = ["rocksdb"]
backend_sqlite = ["sqlite"] backend_sqlite = ["sqlite"]

@ -15,7 +15,7 @@ use ruma::{
RoomAliasId, RoomId, RoomVersionId, RoomAliasId, RoomId, RoomVersionId,
}; };
use std::{cmp::max, collections::BTreeMap, convert::TryFrom, sync::Arc}; use std::{cmp::max, collections::BTreeMap, convert::TryFrom, sync::Arc};
use tracing::info; use tracing::{info, warn};
#[cfg(feature = "conduit_bin")] #[cfg(feature = "conduit_bin")]
use rocket::{get, post}; use rocket::{get, post};
@ -233,7 +233,8 @@ pub async fn create_room_route(
// 5. Events listed in initial_state // 5. Events listed in initial_state
for event in &body.initial_state { for event in &body.initial_state {
let pdu_builder = PduBuilder::from(event.deserialize().map_err(|_| { let pdu_builder = PduBuilder::from(event.deserialize().map_err(|e| {
warn!("Invalid initial state event: {:?}", e);
Error::BadRequest(ErrorKind::InvalidParam, "Invalid initial state event.") Error::BadRequest(ErrorKind::InvalidParam, "Invalid initial state event.")
})?); })?);

@ -189,24 +189,28 @@ impl Database {
} }
fn check_sled_or_sqlite_db(config: &Config) -> Result<()> { fn check_sled_or_sqlite_db(config: &Config) -> Result<()> {
#[cfg(feature = "backend_sqlite")]
{
let path = Path::new(&config.database_path); let path = Path::new(&config.database_path);
let sled_exists = path.join("db").exists(); let sled_exists = path.join("db").exists();
let sqlite_exists = path.join("conduit.db").exists(); let sqlite_exists = path.join("conduit.db").exists();
// TODO: heed
if sled_exists { if sled_exists {
if sqlite_exists { if sqlite_exists {
// most likely an in-place directory, only warn // most likely an in-place directory, only warn
warn!("Both sled and sqlite databases are detected in database directory"); warn!("Both sled and sqlite databases are detected in database directory");
warn!("Currently running from the sqlite database, but consider removing sled database files to free up space") warn!("Currently running from the sqlite database, but consider removing sled database files to free up space")
} else { } else {
error!("Sled database detected, conduit now uses sqlite for database operations"); error!(
"Sled database detected, conduit now uses sqlite for database operations"
);
error!("This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite"); error!("This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite");
return Err(Error::bad_config( return Err(Error::bad_config(
"sled database detected, migrate to sqlite", "sled database detected, migrate to sqlite",
)); ));
} }
} }
}
Ok(()) Ok(())
} }
@ -298,6 +302,7 @@ impl Database {
}, },
account_data: account_data::AccountData { account_data: account_data::AccountData {
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,
roomusertype_roomuserdataid: builder.open_tree("roomusertype_roomuserdataid")?,
}, },
media: media::Media { media: media::Media {
mediaid_file: builder.open_tree("mediaid_file")?, mediaid_file: builder.open_tree("mediaid_file")?,
@ -420,6 +425,30 @@ impl Database {
println!("Migration: 3 -> 4 finished"); println!("Migration: 3 -> 4 finished");
} }
if db.globals.database_version()? < 5 {
// Upgrade user data store
for (roomuserdataid, _) in db.account_data.roomuserdataid_accountdata.iter() {
let mut parts = roomuserdataid.split(|&b| b == 0xff);
let user_id = parts.next().unwrap();
let room_id = parts.next().unwrap();
let event_type = roomuserdataid.rsplit(|&b| b == 0xff).next().unwrap();
let mut key = room_id.to_vec();
key.push(0xff);
key.extend_from_slice(user_id);
key.push(0xff);
key.extend_from_slice(event_type);
db.account_data
.roomusertype_roomuserdataid
.insert(&key, &roomuserdataid)?;
}
db.globals.bump_database_version(5)?;
println!("Migration: 4 -> 5 finished");
}
} }
let guard = db.read().await; let guard = db.read().await;
@ -516,7 +545,7 @@ impl Database {
futures.push( futures.push(
self.account_data self.account_data
.roomuserdataid_accountdata .roomusertype_roomuserdataid
.watch_prefix(&roomuser_prefix), .watch_prefix(&roomuser_prefix),
); );
} }
@ -526,7 +555,7 @@ impl Database {
futures.push( futures.push(
self.account_data self.account_data
.roomuserdataid_accountdata .roomusertype_roomuserdataid
.watch_prefix(&globaluserdata_prefix), .watch_prefix(&globaluserdata_prefix),
); );

@ -27,7 +27,6 @@ pub struct EngineTree {
} }
fn convert_error(error: heed::Error) -> Error { fn convert_error(error: heed::Error) -> Error {
panic!(error.to_string());
Error::HeedError { Error::HeedError {
error: error.to_string(), error: error.to_string(),
} }
@ -40,8 +39,8 @@ impl DatabaseEngine for Engine {
env_builder.max_readers(126); env_builder.max_readers(126);
env_builder.max_dbs(128); env_builder.max_dbs(128);
unsafe { unsafe {
env_builder.flag(heed::flags::Flags::MdbNoSync); env_builder.flag(heed::flags::Flags::MdbWriteMap);
env_builder.flag(heed::flags::Flags::MdbNoMetaSync); env_builder.flag(heed::flags::Flags::MdbMapAsync);
} }
Ok(Arc::new(Engine { Ok(Arc::new(Engine {
@ -79,7 +78,7 @@ impl EngineTree {
from: Vec<u8>, from: Vec<u8>,
backwards: bool, backwards: bool,
) -> Box<dyn Iterator<Item = TupleOfBytes> + Send + Sync> { ) -> Box<dyn Iterator<Item = TupleOfBytes> + Send + Sync> {
let (s, r) = bounded::<TupleOfBytes>(5); let (s, r) = bounded::<TupleOfBytes>(100);
let engine = Arc::clone(&self.engine); let engine = Arc::clone(&self.engine);
let lock = self.engine.iter_pool.lock().unwrap(); let lock = self.engine.iter_pool.lock().unwrap();

@ -12,6 +12,7 @@ use super::abstraction::Tree;
pub struct AccountData { pub struct AccountData {
pub(super) roomuserdataid_accountdata: Arc<dyn Tree>, // RoomUserDataId = Room + User + Count + Type pub(super) roomuserdataid_accountdata: Arc<dyn Tree>, // RoomUserDataId = Room + User + Count + Type
pub(super) roomusertype_roomuserdataid: Arc<dyn Tree>, // RoomUserType = Room + User + Type
} }
impl AccountData { impl AccountData {
@ -34,15 +35,13 @@ impl AccountData {
prefix.extend_from_slice(&user_id.as_bytes()); prefix.extend_from_slice(&user_id.as_bytes());
prefix.push(0xff); prefix.push(0xff);
// Remove old entry let mut roomuserdataid = prefix.clone();
if let Some((old_key, _)) = self.find_event(room_id, user_id, &event_type)? { roomuserdataid.extend_from_slice(&globals.next_count()?.to_be_bytes());
self.roomuserdataid_accountdata.remove(&old_key)?; roomuserdataid.push(0xff);
} roomuserdataid.extend_from_slice(&event_type.as_bytes());
let mut key = prefix; let mut key = prefix.clone();
key.extend_from_slice(&globals.next_count()?.to_be_bytes()); key.extend_from_slice(event_type.as_bytes());
key.push(0xff);
key.extend_from_slice(event_type.as_ref().as_bytes());
let json = serde_json::to_value(data).expect("all types here can be serialized"); // TODO: maybe add error handling let json = serde_json::to_value(data).expect("all types here can be serialized"); // TODO: maybe add error handling
if json.get("type").is_none() || json.get("content").is_none() { if json.get("type").is_none() || json.get("content").is_none() {
@ -53,10 +52,20 @@ impl AccountData {
} }
self.roomuserdataid_accountdata.insert( self.roomuserdataid_accountdata.insert(
&key, &roomuserdataid,
&serde_json::to_vec(&json).expect("to_vec always works on json values"), &serde_json::to_vec(&json).expect("to_vec always works on json values"),
)?; )?;
let prev = self.roomusertype_roomuserdataid.get(&key)?;
self.roomusertype_roomuserdataid
.insert(&key, &roomuserdataid)?;
// Remove old entry
if let Some(prev) = prev {
self.roomuserdataid_accountdata.remove(&prev)?;
}
Ok(()) Ok(())
} }
@ -68,9 +77,27 @@ impl AccountData {
user_id: &UserId, user_id: &UserId,
kind: EventType, kind: EventType,
) -> Result<Option<T>> { ) -> Result<Option<T>> {
self.find_event(room_id, user_id, &kind)? let mut key = room_id
.map(|(_, v)| { .map(|r| r.to_string())
serde_json::from_slice(&v).map_err(|_| Error::bad_database("could not deserialize")) .unwrap_or_default()
.as_bytes()
.to_vec();
key.push(0xff);
key.extend_from_slice(&user_id.as_bytes());
key.push(0xff);
key.extend_from_slice(kind.as_ref().as_bytes());
self.roomusertype_roomuserdataid
.get(&key)?
.and_then(|roomuserdataid| {
self.roomuserdataid_accountdata
.get(&roomuserdataid)
.transpose()
})
.transpose()?
.map(|data| {
serde_json::from_slice(&data)
.map_err(|_| Error::bad_database("could not deserialize"))
}) })
.transpose() .transpose()
} }
@ -123,37 +150,4 @@ impl AccountData {
Ok(userdata) Ok(userdata)
} }
#[tracing::instrument(skip(self, room_id, user_id, kind))]
fn find_event(
&self,
room_id: Option<&RoomId>,
user_id: &UserId,
kind: &EventType,
) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
let mut prefix = room_id
.map(|r| r.to_string())
.unwrap_or_default()
.as_bytes()
.to_vec();
prefix.push(0xff);
prefix.extend_from_slice(&user_id.as_bytes());
prefix.push(0xff);
let mut last_possible_key = prefix.clone();
last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes());
let kind = kind.clone();
Ok(self
.roomuserdataid_accountdata
.iter_from(&last_possible_key, true)
.take_while(move |(k, _)| k.starts_with(&prefix))
.find(move |(k, _)| {
k.rsplit(|&b| b == 0xff)
.next()
.map(|current_event_type| current_event_type == kind.as_ref().as_bytes())
.unwrap_or(false)
}))
}
} }

@ -9,10 +9,11 @@ use ruma::{
}, },
IncomingResponse, OutgoingRequest, SendAccessToken, IncomingResponse, OutgoingRequest, SendAccessToken,
}, },
events::{room::power_levels::PowerLevelsEventContent, EventType}, events::{room::power_levels::PowerLevelsEventContent, AnySyncRoomEvent, EventType},
identifiers::RoomName, identifiers::RoomName,
push::{Action, PushConditionRoomCtx, PushFormat, Ruleset, Tweak}, push::{Action, PushConditionRoomCtx, PushFormat, Ruleset, Tweak},
uint, UInt, UserId, serde::Raw,
uint, RoomId, UInt, UserId,
}; };
use tracing::{error, info, warn}; use tracing::{error, info, warn};
@ -172,7 +173,24 @@ pub async fn send_push_notice(
let mut notify = None; let mut notify = None;
let mut tweaks = Vec::new(); let mut tweaks = Vec::new();
for action in get_actions(user, &ruleset, pdu, db)? { let power_levels: PowerLevelsEventContent = db
.rooms
.room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")?
.map(|ev| {
serde_json::from_value(ev.content.clone())
.map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
})
.transpose()?
.unwrap_or_default();
for action in get_actions(
user,
&ruleset,
&power_levels,
&pdu.to_sync_room_event(),
&pdu.room_id,
db,
)? {
let n = match action { let n = match action {
Action::DontNotify => false, Action::DontNotify => false,
// TODO: Implement proper support for coalesce // TODO: Implement proper support for coalesce
@ -204,32 +222,24 @@ pub async fn send_push_notice(
pub fn get_actions<'a>( pub fn get_actions<'a>(
user: &UserId, user: &UserId,
ruleset: &'a Ruleset, ruleset: &'a Ruleset,
pdu: &PduEvent, power_levels: &PowerLevelsEventContent,
pdu: &Raw<AnySyncRoomEvent>,
room_id: &RoomId,
db: &Database, db: &Database,
) -> Result<&'a [Action]> { ) -> Result<&'a [Action]> {
let power_levels: PowerLevelsEventContent = db
.rooms
.room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")?
.map(|ev| {
serde_json::from_value(ev.content.clone())
.map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
})
.transpose()?
.unwrap_or_default();
let ctx = PushConditionRoomCtx { let ctx = PushConditionRoomCtx {
room_id: pdu.room_id.clone(), room_id: room_id.clone(),
member_count: 10_u32.into(), // TODO: get member count efficiently member_count: 10_u32.into(), // TODO: get member count efficiently
user_display_name: db user_display_name: db
.users .users
.displayname(&user)? .displayname(&user)?
.unwrap_or_else(|| user.localpart().to_owned()), .unwrap_or_else(|| user.localpart().to_owned()),
users_power_levels: power_levels.users, users_power_levels: power_levels.users.clone(),
default_power_level: power_levels.users_default, default_power_level: power_levels.users_default,
notification_power_levels: power_levels.notifications, notification_power_levels: power_levels.notifications.clone(),
}; };
Ok(ruleset.get_actions(&pdu.to_sync_room_event(), &ctx)) Ok(ruleset.get_actions(pdu, &ctx))
} }
#[tracing::instrument(skip(unread, pusher, tweaks, event, db))] #[tracing::instrument(skip(unread, pusher, tweaks, event, db))]

@ -12,7 +12,9 @@ use ruma::{
api::{client::error::ErrorKind, federation}, api::{client::error::ErrorKind, federation},
events::{ events::{
ignored_user_list, push_rules, ignored_user_list, push_rules,
room::{create::CreateEventContent, member, message}, room::{
create::CreateEventContent, member, message, power_levels::PowerLevelsEventContent,
},
AnyStrippedStateEvent, AnySyncStateEvent, EventType, AnyStrippedStateEvent, AnySyncStateEvent, EventType,
}, },
push::{self, Action, Tweak}, push::{self, Action, Tweak},
@ -760,6 +762,18 @@ impl Rooms {
.insert(pdu.event_id.as_bytes(), &pdu_id)?; .insert(pdu.event_id.as_bytes(), &pdu_id)?;
// See if the event matches any known pushers // See if the event matches any known pushers
let power_levels: PowerLevelsEventContent = db
.rooms
.room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")?
.map(|ev| {
serde_json::from_value(ev.content.clone())
.map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
})
.transpose()?
.unwrap_or_default();
let sync_pdu = pdu.to_sync_room_event();
for user in db for user in db
.rooms .rooms
.room_members(&pdu.room_id) .room_members(&pdu.room_id)
@ -781,7 +795,14 @@ impl Rooms {
let mut highlight = false; let mut highlight = false;
let mut notify = false; let mut notify = false;
for action in pusher::get_actions(&user, &rules_for_user, pdu, db)? { for action in pusher::get_actions(
&user,
&rules_for_user,
&power_levels,
&sync_pdu,
&pdu.room_id,
db,
)? {
match action { match action {
Action::DontNotify => notify = false, Action::DontNotify => notify = false,
// TODO: Implement proper support for coalesce // TODO: Implement proper support for coalesce

Loading…
Cancel
Save