diff --git a/src/client_server/account.rs b/src/client_server/account.rs index fad59c37..81119ba3 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -241,6 +241,7 @@ pub async fn register_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -264,6 +265,7 @@ pub async fn register_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -300,6 +302,7 @@ pub async fn register_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -319,6 +322,7 @@ pub async fn register_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -340,6 +344,7 @@ pub async fn register_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -359,6 +364,7 @@ pub async fn register_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -380,6 +386,7 @@ pub async fn register_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -398,6 +405,7 @@ pub async fn register_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -422,6 +430,7 @@ pub async fn register_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -447,6 +456,7 @@ pub async fn register_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; db.rooms.build_and_append_pdu( @@ -468,6 +478,7 @@ pub async fn register_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -494,6 +505,7 @@ pub async fn register_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; } @@ -666,6 +678,7 @@ pub async fn deactivate_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; } diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index d79079db..25cad85c 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -124,6 +124,7 @@ pub async fn leave_room_route( &body.room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -162,6 +163,7 @@ pub async fn invite_user_route( &body.room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -216,6 +218,7 @@ pub async fn kick_user_route( &body.room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -274,6 +277,7 @@ pub async fn ban_user_route( &body.room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -324,6 +328,7 @@ pub async fn unban_user_route( &body.room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -673,7 +678,7 @@ async fn join_room_by_id_helper( pdu_id.clone().into(), &db.globals, &db.account_data, - &db.sending, + &db.admin, )?; if state_events.contains(ev_id) { @@ -703,6 +708,7 @@ async fn join_room_by_id_helper( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; } diff --git a/src/client_server/message.rs b/src/client_server/message.rs index f9c8ba10..327b9ab2 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -67,6 +67,7 @@ pub async fn send_message_event_route( &body.room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index 3fa1da65..22d13cbd 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -65,6 +65,7 @@ pub async fn set_displayname_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -160,6 +161,7 @@ pub async fn set_avatar_url_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; diff --git a/src/client_server/redact.rs b/src/client_server/redact.rs index 486eb6c8..6f7728a3 100644 --- a/src/client_server/redact.rs +++ b/src/client_server/redact.rs @@ -33,6 +33,7 @@ pub async fn redact_event_route( &body.room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; diff --git a/src/client_server/room.rs b/src/client_server/room.rs index eeab68b2..fdc9529a 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -65,6 +65,7 @@ pub async fn create_room_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -88,6 +89,7 @@ pub async fn create_room_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -131,6 +133,7 @@ pub async fn create_room_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -165,6 +168,7 @@ pub async fn create_room_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -184,6 +188,7 @@ pub async fn create_room_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -211,6 +216,7 @@ pub async fn create_room_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -232,6 +238,7 @@ pub async fn create_room_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; } @@ -255,6 +262,7 @@ pub async fn create_room_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; } @@ -275,6 +283,7 @@ pub async fn create_room_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; } @@ -300,6 +309,7 @@ pub async fn create_room_route( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; } @@ -387,6 +397,7 @@ pub async fn upgrade_room_route( &body.room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -429,6 +440,7 @@ pub async fn upgrade_room_route( &replacement_room, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -452,6 +464,7 @@ pub async fn upgrade_room_route( &replacement_room, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; @@ -487,6 +500,7 @@ pub async fn upgrade_room_route( &replacement_room, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; } @@ -532,6 +546,7 @@ pub async fn upgrade_room_route( &body.room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; diff --git a/src/client_server/state.rs b/src/client_server/state.rs index dbc7fdd4..ca6bdf7e 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -284,6 +284,7 @@ pub async fn send_state_event_for_key_helper( &room_id, &db.globals, &db.sending, + &db.admin, &db.account_data, )?; diff --git a/src/database.rs b/src/database.rs index 3b0bd6fa..51c3895a 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,4 +1,5 @@ pub mod account_data; +pub mod admin; pub mod globals; pub mod key_backups; pub mod media; @@ -12,10 +13,14 @@ use crate::{Error, Result}; use directories::ProjectDirs; use futures::StreamExt; use log::info; -use rocket::{futures, Config}; +use rocket::{ + futures::{self, channel::mpsc}, + Config, +}; use ruma::{DeviceId, UserId}; use std::{convert::TryFrom, fs::remove_dir_all}; +#[derive(Clone)] pub struct Database { pub globals: globals::Globals, pub users: users::Users, @@ -26,6 +31,7 @@ pub struct Database { pub key_backups: key_backups::KeyBackups, pub transaction_ids: transaction_ids::TransactionIds, pub sending: sending::Sending, + pub admin: admin::Admin, pub _db: sled::Db, } @@ -80,7 +86,9 @@ impl Database { info!("Opened sled database at {}", path); - Ok(Self { + let (admin_sender, admin_receiver) = mpsc::unbounded(); + + let db = Self { globals: globals::Globals::load(db.open_tree("global")?, config)?, users: users::Users { userid_password: db.open_tree("userid_password")?, @@ -152,8 +160,15 @@ impl Database { servernamepduids: db.open_tree("servernamepduids")?, servercurrentpdus: db.open_tree("servercurrentpdus")?, }, + admin: admin::Admin { + sender: admin_sender, + }, _db: db, - }) + }; + + db.admin.start_handler(db.clone(), admin_receiver); + + Ok(db) } pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) { diff --git a/src/database/account_data.rs b/src/database/account_data.rs index a9171235..9a6a050f 100644 --- a/src/database/account_data.rs +++ b/src/database/account_data.rs @@ -8,6 +8,7 @@ use serde::{de::DeserializeOwned, Serialize}; use sled::IVec; use std::{collections::HashMap, convert::TryFrom}; +#[derive(Clone)] pub struct AccountData { pub(super) roomuserdataid_accountdata: sled::Tree, // RoomUserDataId = Room + User + Count + Type } diff --git a/src/database/admin.rs b/src/database/admin.rs new file mode 100644 index 00000000..f8b23855 --- /dev/null +++ b/src/database/admin.rs @@ -0,0 +1,74 @@ +use std::convert::{TryFrom, TryInto}; + +use crate::{pdu::PduBuilder, Error}; +use rocket::futures::{channel::mpsc, stream::StreamExt}; +use ruma::{events::room::message, events::EventType, UserId}; +use tokio::select; + +pub enum AdminCommand { + SendTextMessage(message::TextMessageEventContent), +} + +#[derive(Clone)] +pub struct Admin { + pub sender: mpsc::UnboundedSender, +} + +impl Admin { + pub fn start_handler( + &self, + db: super::Database, + mut receiver: mpsc::UnboundedReceiver, + ) { + tokio::spawn(async move { + // TODO: Use futures when we have long admin commands + //let mut futures = FuturesUnordered::new(); + + let conduit_user = UserId::try_from(format!("@conduit:{}", db.globals.server_name())) + .expect("@conduit:server_name is valid"); + + let conduit_room = db + .rooms + .id_from_alias( + &format!("#admins:{}", db.globals.server_name()) + .try_into() + .expect("#admins:server_name is a valid room alias"), + ) + .unwrap() + .ok_or_else(|| Error::BadConfig("Conduit instance does not have an #admins room.")) + .unwrap(); + + loop { + select! { + Some(event) = receiver.next() => { + match event { + AdminCommand::SendTextMessage(message) => { + println!("{:?}", message); + + db.rooms.build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMessage, + content: serde_json::to_value(message).expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + &conduit_user, + &conduit_room, + &db.globals, + &db.sending, + &db.admin, + &db.account_data, + ).unwrap(); + } + } + } + } + } + }); + } + + pub fn send(&self, command: AdminCommand) { + self.sender.unbounded_send(command).unwrap() + } +} diff --git a/src/database/key_backups.rs b/src/database/key_backups.rs index 1ce75955..a50e45eb 100644 --- a/src/database/key_backups.rs +++ b/src/database/key_backups.rs @@ -8,6 +8,7 @@ use ruma::{ }; use std::{collections::BTreeMap, convert::TryFrom}; +#[derive(Clone)] pub struct KeyBackups { pub(super) backupid_algorithm: sled::Tree, // BackupId = UserId + Version(Count) pub(super) backupid_etag: sled::Tree, // BackupId = UserId + Version(Count) diff --git a/src/database/media.rs b/src/database/media.rs index 3ecf4bd9..8c59aa4d 100644 --- a/src/database/media.rs +++ b/src/database/media.rs @@ -9,6 +9,7 @@ pub struct FileMeta { pub file: Vec, } +#[derive(Clone)] pub struct Media { pub(super) mediaid_file: sled::Tree, // MediaId = MXC + WidthHeight + Filename + ContentType } diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 05abe03e..8ab900fd 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -27,6 +27,8 @@ use std::{ sync::Arc, }; +use super::admin::AdminCommand; + /// The unique identifier of each state group. /// /// This is created when a state group is added to the database by @@ -443,7 +445,7 @@ impl Rooms { pdu_id: IVec, globals: &super::globals::Globals, account_data: &super::account_data::AccountData, - sending: &super::sending::Sending, + admin: &super::admin::Admin, ) -> Result<()> { self.replace_pdu_leaves(&pdu.room_id, &pdu.event_id)?; @@ -514,28 +516,13 @@ impl Rooms { if let Some(command) = parts.next() { let args = parts.collect::>(); - self.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMessage, - content: serde_json::to_value( - message::TextMessageEventContent { - body: format!("Command: {}, Args: {:?}", command, args), - formatted: None, - relates_to: None, - }, - ) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, + admin.send(AdminCommand::SendTextMessage( + message::TextMessageEventContent { + body: format!("Command: {}, Args: {:?}", command, args), + formatted: None, + relates_to: None, }, - &UserId::try_from(format!("@conduit:{}", globals.server_name())) - .expect("@conduit:server_name is valid"), - &pdu.room_id, - &globals, - &sending, - &account_data, - )?; + )); } } } @@ -612,6 +599,7 @@ impl Rooms { room_id: &RoomId, globals: &super::globals::Globals, sending: &super::sending::Sending, + admin: &super::admin::Admin, account_data: &super::account_data::AccountData, ) -> Result { let PduBuilder { @@ -849,7 +837,7 @@ impl Rooms { pdu_id.clone().into(), globals, account_data, - sending, + admin, )?; for server in self diff --git a/src/database/sending.rs b/src/database/sending.rs index 597778f2..e3fca4f0 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -8,6 +8,7 @@ use ruma::{api::federation, ServerName}; use sled::IVec; use tokio::select; +#[derive(Clone)] pub struct Sending { /// The state for a given state hash. pub(super) servernamepduids: sled::Tree, // ServernamePduId = ServerName + PduId @@ -54,7 +55,7 @@ impl Sending { )) }) .filter_map(|r| r.ok()) - .filter(|pdu| !pdu.is_empty()) // Skip reservation key + .filter(|(_, pdu)| !pdu.is_empty()) // Skip reservation key .take(50) // This should not contain more than 50 anyway { diff --git a/src/database/transaction_ids.rs b/src/database/transaction_ids.rs index 9485b361..7c0eb98b 100644 --- a/src/database/transaction_ids.rs +++ b/src/database/transaction_ids.rs @@ -2,6 +2,7 @@ use crate::Result; use ruma::{DeviceId, UserId}; use sled::IVec; +#[derive(Clone)] pub struct TransactionIds { pub(super) userdevicetxnid_response: sled::Tree, // Response can be empty (/sendToDevice) or the event id (/send) } diff --git a/src/database/uiaa.rs b/src/database/uiaa.rs index e318f436..381a7016 100644 --- a/src/database/uiaa.rs +++ b/src/database/uiaa.rs @@ -7,6 +7,7 @@ use ruma::{ DeviceId, UserId, }; +#[derive(Clone)] pub struct Uiaa { pub(super) userdeviceid_uiaainfo: sled::Tree, // User-interactive authentication } diff --git a/src/database/users.rs b/src/database/users.rs index 0d35e362..2a039602 100644 --- a/src/database/users.rs +++ b/src/database/users.rs @@ -14,6 +14,7 @@ use ruma::{ }; use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime}; +#[derive(Clone)] pub struct Users { pub(super) userid_password: sled::Tree, pub(super) userid_displayname: sled::Tree, diff --git a/src/server_server.rs b/src/server_server.rs index ccb13994..0f24e153 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -404,7 +404,7 @@ pub fn send_transaction_message_route<'a>( pdu_id.clone().into(), &db.globals, &db.account_data, - &db.sending, + &db.admin, )?; } }