diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index 3ff6ab86..825c02e0 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -22,14 +22,20 @@ impl DatabaseEngine for Engine { fn open(config: &Config) -> Result> { let mut db_opts = rocksdb::Options::default(); db_opts.create_if_missing(true); - db_opts.set_max_open_files(16); + db_opts.set_max_open_files(512); db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); - db_opts.set_target_file_size_base(256 << 20); - db_opts.set_write_buffer_size(256 << 20); + db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd); + db_opts.set_target_file_size_base(2 << 22); + db_opts.set_max_bytes_for_level_base(2 << 24); + db_opts.set_max_bytes_for_level_multiplier(2.0); + db_opts.set_num_levels(8); + db_opts.set_write_buffer_size(2 << 27); + + let rocksdb_cache = rocksdb::Cache::new_lru_cache((config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize).unwrap(); let mut block_based_options = rocksdb::BlockBasedOptions::default(); - block_based_options.set_block_size(512 << 10); + block_based_options.set_block_size(2 << 19); + block_based_options.set_block_cache(&rocksdb_cache); db_opts.set_block_based_table_factory(&block_based_options); let cfs = rocksdb::DBWithThreadMode::::list_cf( @@ -45,7 +51,6 @@ impl DatabaseEngine for Engine { let mut options = rocksdb::Options::default(); let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1); options.set_prefix_extractor(prefix_extractor); - options.set_merge_operator_associative("increment", utils::increment_rocksdb); rocksdb::ColumnFamilyDescriptor::new(name, options) }), @@ -63,7 +68,6 @@ impl DatabaseEngine for Engine { let mut options = rocksdb::Options::default(); let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1); options.set_prefix_extractor(prefix_extractor); - options.set_merge_operator_associative("increment", utils::increment_rocksdb); let _ = self.rocks.create_cf(name, &options); println!("created cf"); diff --git a/src/server_server.rs b/src/server_server.rs index 594152ae..d6bc9b91 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1392,12 +1392,11 @@ async fn upgrade_outlier_to_timeline_pdu( let mut starting_events = Vec::with_capacity(leaf_state.len()); for (k, id) in leaf_state { - let k = db - .rooms - .get_statekey_from_short(k) - .map_err(|_| "Failed to get_statekey_from_short.".to_owned())?; - - state.insert(k, id.clone()); + if let Ok(k) = db.rooms.get_statekey_from_short(k) { + state.insert(k, id.clone()); + } else { + warn!("Failed to get_statekey_from_short."); + } starting_events.push(id); } @@ -1755,11 +1754,16 @@ async fn upgrade_outlier_to_timeline_pdu( .into_iter() .map(|map| { map.into_iter() - .map(|(k, id)| db.rooms.get_statekey_from_short(k).map(|k| (k, id))) - .collect::>>() + .filter_map(|(k, id)| { + db.rooms + .get_statekey_from_short(k) + .map(|k| (k, id)) + .map_err(|e| warn!("Failed to get_statekey_from_short: {}", e)) + .ok() + }) + .collect::>() }) - .collect::>() - .map_err(|_| "Failed to get_statekey_from_short.".to_owned())?; + .collect(); let state = match state_res::resolve( room_version_id, @@ -1871,73 +1875,78 @@ pub(crate) fn fetch_and_handle_outliers<'a>( // a. Look in the main timeline (pduid_pdu tree) // b. Look at outlier pdu tree // (get_pdu_json checks both) - let local_pdu = db.rooms.get_pdu(id); - let pdu = match local_pdu { - Ok(Some(pdu)) => { + if let Ok(Some(local_pdu)) = db.rooms.get_pdu(id) { + trace!("Found {} in db", id); + pdus.push((local_pdu, None)); + } + + // c. Ask origin server over federation + // We also handle its auth chain here so we don't get a stack overflow in + // handle_outlier_pdu. + let mut todo_auth_events = vec![id]; + let mut events_in_reverse_order = Vec::new(); + while let Some(next_id) = todo_auth_events.pop() { + if let Ok(Some(_)) = db.rooms.get_pdu(next_id) { trace!("Found {} in db", id); - (pdu, None) + continue; } - Ok(None) => { - // c. Ask origin server over federation - warn!("Fetching {} over federation.", id); - match db - .sending - .send_federation_request( - &db.globals, - origin, - get_event::v1::Request { event_id: id }, - ) - .await - { - Ok(res) => { - warn!("Got {} over federation", id); - let (calculated_event_id, value) = - match crate::pdu::gen_event_id_canonical_json(&res.pdu) { - Ok(t) => t, - Err(_) => { - back_off((**id).to_owned()); - continue; - } - }; - if calculated_event_id != **id { - warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}", - id, calculated_event_id, &res.pdu); - } - - // This will also fetch the auth chain - match handle_outlier_pdu( - origin, - create_event, - id, - room_id, - value.clone(), - db, - pub_key_map, - ) - .await - { - Ok((pdu, json)) => (pdu, Some(json)), - Err(e) => { - warn!("Authentication of event {} failed: {:?}", id, e); - back_off((**id).to_owned()); + warn!("Fetching {} over federation.", next_id); + match db + .sending + .send_federation_request( + &db.globals, + origin, + get_event::v1::Request { event_id: next_id }, + ) + .await + { + Ok(res) => { + warn!("Got {} over federation", next_id); + let (calculated_event_id, value) = + match crate::pdu::gen_event_id_canonical_json(&res.pdu) { + Ok(t) => t, + Err(_) => { + back_off((**next_id).to_owned()); continue; } - } - } - Err(_) => { - warn!("Failed to fetch event: {}", id); - back_off((**id).to_owned()); - continue; + }; + + if calculated_event_id != **next_id { + warn!("Server didn't return event id we requested: requested: {}, we got {}. Event: {:?}", + next_id, calculated_event_id, &res.pdu); } + + events_in_reverse_order.push((next_id, value)); + } + Err(_) => { + warn!("Failed to fetch event: {}", next_id); + back_off((**next_id).to_owned()); } } - Err(e) => { - warn!("Error loading {}: {}", id, e); - continue; + } + + while let Some((next_id, value)) = events_in_reverse_order.pop() { + match handle_outlier_pdu( + origin, + create_event, + next_id, + room_id, + value.clone(), + db, + pub_key_map, + ) + .await + { + Ok((pdu, json)) => { + pdus.push((pdu, Some(json))); + } + Err(e) => { + warn!("Authentication of event {} failed: {:?}", next_id, e); + back_off((**next_id).to_owned()); + } } - }; - pdus.push(pdu); + } } pdus })