fix: make appservices more efficient

merge-requests/172/head
Timo Kösters 3 years ago
parent 1601027605
commit 632a1343eb
No known key found for this signature in database
GPG Key ID: 24DA7517711A2BA4

@ -246,12 +246,6 @@ async fn sync_helper(
.current_shortstatehash(&room_id)? .current_shortstatehash(&room_id)?
.expect("All rooms have state"); .expect("All rooms have state");
let pdus_after_since = db
.rooms
.pdus_after(&sender_user, &room_id, since)?
.next()
.is_some();
let since_shortstatehash = db.rooms.get_token_shortstatehash(&room_id, since)?; let since_shortstatehash = db.rooms.get_token_shortstatehash(&room_id, since)?;
// Calculates joined_member_count, invited_member_count and heroes // Calculates joined_member_count, invited_member_count and heroes
@ -341,7 +335,7 @@ async fn sync_helper(
true, true,
state_events, state_events,
) )
} else if !pdus_after_since && since_shortstatehash == Some(current_shortstatehash) { } else if timeline_pdus.len() == 0 && since_shortstatehash == Some(current_shortstatehash) {
// No state changes // No state changes
(Vec::new(), None, None, false, Vec::new()) (Vec::new(), None, None, false, Vec::new())
} else { } else {
@ -401,10 +395,7 @@ async fn sync_helper(
let send_member_count = state_events let send_member_count = state_events
.iter() .iter()
.any(|event| event.kind == EventType::RoomMember) .any(|event| event.kind == EventType::RoomMember);
|| timeline_pdus.iter().any(|(_, event)| {
event.state_key.is_some() && event.kind == EventType::RoomMember
});
if encrypted_room { if encrypted_room {
for state_event in &state_events { for state_event in &state_events {

@ -287,8 +287,9 @@ impl Database {
eventidshort_cache: Mutex::new(LruCache::new(1_000_000)), eventidshort_cache: Mutex::new(LruCache::new(1_000_000)),
shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)), shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)),
statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)), statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)),
stateinfo_cache: Mutex::new(LruCache::new(1000)),
our_real_users_cache: RwLock::new(HashMap::new()), our_real_users_cache: RwLock::new(HashMap::new()),
appservice_in_room_cache: RwLock::new(HashMap::new()),
stateinfo_cache: Mutex::new(LruCache::new(1000)),
}, },
account_data: account_data::AccountData { account_data: account_data::AccountData {
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,

@ -148,6 +148,7 @@ type TupleOfBytes = (Vec<u8>, Vec<u8>);
impl SqliteTable { impl SqliteTable {
#[tracing::instrument(skip(self, guard, key))] #[tracing::instrument(skip(self, guard, key))]
fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> { fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> {
//dbg!(&self.name);
Ok(guard Ok(guard
.prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())? .prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())?
.query_row([key], |row| row.get(0)) .query_row([key], |row| row.get(0))
@ -156,6 +157,7 @@ impl SqliteTable {
#[tracing::instrument(skip(self, guard, key, value))] #[tracing::instrument(skip(self, guard, key, value))]
fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> { fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> {
//dbg!(&self.name);
guard.execute( guard.execute(
format!( format!(
"INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)", "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)",
@ -182,11 +184,16 @@ impl SqliteTable {
let statement_ref = NonAliasingBox(statement); let statement_ref = NonAliasingBox(statement);
//let name = self.name.clone();
let iterator = Box::new( let iterator = Box::new(
statement statement
.query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap() .unwrap()
.map(|r| r.unwrap()), .map(move |r| {
//dbg!(&name);
r.unwrap()
}),
); );
Box::new(PreparedStatementIterator { Box::new(PreparedStatementIterator {
@ -294,6 +301,8 @@ impl Tree for SqliteTable {
let guard = self.engine.read_lock_iterator(); let guard = self.engine.read_lock_iterator();
let from = from.to_vec(); // TODO change interface? let from = from.to_vec(); // TODO change interface?
//let name = self.name.clone();
if backwards { if backwards {
let statement = Box::leak(Box::new( let statement = Box::leak(Box::new(
guard guard
@ -310,7 +319,10 @@ impl Tree for SqliteTable {
statement statement
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap() .unwrap()
.map(|r| r.unwrap()), .map(move |r| {
//dbg!(&name);
r.unwrap()
}),
); );
Box::new(PreparedStatementIterator { Box::new(PreparedStatementIterator {
iterator, iterator,
@ -332,7 +344,10 @@ impl Tree for SqliteTable {
statement statement
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap() .unwrap()
.map(|r| r.unwrap()), .map(move |r| {
//dbg!(&name);
r.unwrap()
}),
); );
Box::new(PreparedStatementIterator { Box::new(PreparedStatementIterator {

@ -103,6 +103,7 @@ pub struct Rooms {
pub(super) statekeyshort_cache: Mutex<LruCache<(EventType, String), u64>>, pub(super) statekeyshort_cache: Mutex<LruCache<(EventType, String), u64>>,
pub(super) shortstatekey_cache: Mutex<LruCache<u64, (EventType, String)>>, pub(super) shortstatekey_cache: Mutex<LruCache<u64, (EventType, String)>>,
pub(super) our_real_users_cache: RwLock<HashMap<RoomId, Arc<HashSet<UserId>>>>, pub(super) our_real_users_cache: RwLock<HashMap<RoomId, Arc<HashSet<UserId>>>>,
pub(super) appservice_in_room_cache: RwLock<HashMap<RoomId, HashMap<String, bool>>>,
pub(super) stateinfo_cache: Mutex< pub(super) stateinfo_cache: Mutex<
LruCache< LruCache<
u64, u64,
@ -2110,6 +2111,11 @@ impl Rooms {
} }
for appservice in db.appservice.all()? { for appservice in db.appservice.all()? {
if self.appservice_in_room(room_id, &appservice, db)? {
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
continue;
}
if let Some(namespaces) = appservice.1.get("namespaces") { if let Some(namespaces) = appservice.1.get("namespaces") {
let users = namespaces let users = namespaces
.get("users") .get("users")
@ -2133,17 +2139,6 @@ impl Rooms {
.get("rooms") .get("rooms")
.and_then(|rooms| rooms.as_sequence()); .and_then(|rooms| rooms.as_sequence());
let bridge_user_id = appservice
.1
.get("sender_localpart")
.and_then(|string| string.as_str())
.and_then(|string| {
UserId::parse_with_server_name(string, db.globals.server_name()).ok()
});
let user_is_joined =
|bridge_user_id| self.is_joined(&bridge_user_id, room_id).unwrap_or(false);
let matching_users = |users: &Regex| { let matching_users = |users: &Regex| {
users.is_match(pdu.sender.as_str()) users.is_match(pdu.sender.as_str())
|| pdu.kind == EventType::RoomMember || pdu.kind == EventType::RoomMember
@ -2151,9 +2146,6 @@ impl Rooms {
.state_key .state_key
.as_ref() .as_ref()
.map_or(false, |state_key| users.is_match(&state_key)) .map_or(false, |state_key| users.is_match(&state_key))
|| self.room_members(&room_id).any(|userid| {
userid.map_or(false, |userid| users.is_match(userid.as_str()))
})
}; };
let matching_aliases = |aliases: &Regex| { let matching_aliases = |aliases: &Regex| {
self.room_aliases(&room_id) self.room_aliases(&room_id)
@ -2161,8 +2153,7 @@ impl Rooms {
.any(|room_alias| aliases.is_match(room_alias.as_str())) .any(|room_alias| aliases.is_match(room_alias.as_str()))
}; };
if bridge_user_id.map_or(false, user_is_joined) if aliases.iter().any(matching_aliases)
|| aliases.iter().any(matching_aliases)
|| rooms.map_or(false, |rooms| rooms.contains(&room_id.as_str().into())) || rooms.map_or(false, |rooms| rooms.contains(&room_id.as_str().into()))
|| users.iter().any(matching_users) || users.iter().any(matching_users)
{ {
@ -2579,6 +2570,11 @@ impl Rooms {
self.serverroomids.insert(&serverroom_id, &[])?; self.serverroomids.insert(&serverroom_id, &[])?;
} }
self.appservice_in_room_cache
.write()
.unwrap()
.remove(room_id);
Ok(()) Ok(())
} }
@ -2608,6 +2604,65 @@ impl Rooms {
} }
} }
#[tracing::instrument(skip(self, room_id, appservice, db))]
pub fn appservice_in_room(
&self,
room_id: &RoomId,
appservice: &(String, serde_yaml::Value),
db: &Database,
) -> Result<bool> {
let maybe = self
.appservice_in_room_cache
.read()
.unwrap()
.get(room_id)
.and_then(|map| map.get(&appservice.0))
.copied();
if let Some(b) = maybe {
Ok(b)
} else {
if let Some(namespaces) = appservice.1.get("namespaces") {
let users = namespaces
.get("users")
.and_then(|users| users.as_sequence())
.map_or_else(Vec::new, |users| {
users
.iter()
.filter_map(|users| Regex::new(users.get("regex")?.as_str()?).ok())
.collect::<Vec<_>>()
});
let bridge_user_id = appservice
.1
.get("sender_localpart")
.and_then(|string| string.as_str())
.and_then(|string| {
UserId::parse_with_server_name(string, db.globals.server_name()).ok()
});
let in_room = bridge_user_id
.map_or(false, |id| self.is_joined(&id, room_id).unwrap_or(false))
|| self.room_members(&room_id).any(|userid| {
userid.map_or(false, |userid| {
users.iter().any(|r| r.is_match(userid.as_str()))
})
});
self.appservice_in_room_cache
.write()
.unwrap()
.entry(room_id.clone())
.or_default()
.insert(appservice.0.clone(), in_room);
Ok(in_room)
} else {
Ok(false)
}
}
}
#[tracing::instrument(skip(self, db))] #[tracing::instrument(skip(self, db))]
pub async fn leave_room( pub async fn leave_room(
&self, &self,

@ -48,7 +48,7 @@ use ruma::{
state_res::{self, RoomVersion, StateMap}, state_res::{self, RoomVersion, StateMap},
to_device::DeviceIdOrAllDevices, to_device::DeviceIdOrAllDevices,
uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName, uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName,
ServerSigningKeyId, UserId, ServerSigningKeyId,
}; };
use std::{ use std::{
collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap, HashSet}, collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
@ -2017,6 +2017,11 @@ fn append_incoming_pdu(
)?; )?;
for appservice in db.appservice.all()? { for appservice in db.appservice.all()? {
if db.rooms.appservice_in_room(&pdu.room_id, &appservice, db)? {
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
continue;
}
if let Some(namespaces) = appservice.1.get("namespaces") { if let Some(namespaces) = appservice.1.get("namespaces") {
let users = namespaces let users = namespaces
.get("users") .get("users")
@ -2029,45 +2034,35 @@ fn append_incoming_pdu(
}); });
let aliases = namespaces let aliases = namespaces
.get("aliases") .get("aliases")
.and_then(|users| users.get("regex")) .and_then(|aliases| aliases.as_sequence())
.and_then(|regex| regex.as_str()) .map_or_else(Vec::new, |aliases| {
.and_then(|regex| Regex::new(regex).ok()); aliases
.iter()
.filter_map(|aliases| Regex::new(aliases.get("regex")?.as_str()?).ok())
.collect::<Vec<_>>()
});
let rooms = namespaces let rooms = namespaces
.get("rooms") .get("rooms")
.and_then(|rooms| rooms.as_sequence()); .and_then(|rooms| rooms.as_sequence());
let room_aliases = db.rooms.room_aliases(&pdu.room_id); let matching_users = |users: &Regex| {
let bridge_user_id = appservice
.1
.get("sender_localpart")
.and_then(|string| string.as_str())
.and_then(|string| {
UserId::parse_with_server_name(string, db.globals.server_name()).ok()
});
#[allow(clippy::blocks_in_if_conditions)]
if bridge_user_id.map_or(false, |bridge_user_id| {
db.rooms
.is_joined(&bridge_user_id, &pdu.room_id)
.unwrap_or(false)
}) || users.iter().any(|users| {
users.is_match(pdu.sender.as_str()) users.is_match(pdu.sender.as_str())
|| pdu.kind == EventType::RoomMember || pdu.kind == EventType::RoomMember
&& pdu && pdu
.state_key .state_key
.as_ref() .as_ref()
.map_or(false, |state_key| users.is_match(&state_key)) .map_or(false, |state_key| users.is_match(&state_key))
}) || aliases.map_or(false, |aliases| { };
room_aliases let matching_aliases = |aliases: &Regex| {
db.rooms
.room_aliases(&pdu.room_id)
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.any(|room_alias| aliases.is_match(room_alias.as_str())) .any(|room_alias| aliases.is_match(room_alias.as_str()))
}) || rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into())) };
|| db
.rooms if aliases.iter().any(matching_aliases)
.room_members(&pdu.room_id) || rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into()))
.filter_map(|r| r.ok()) || users.iter().any(matching_users)
.any(|member| users.iter().any(|regex| regex.is_match(member.as_str())))
{ {
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
} }

Loading…
Cancel
Save