use std::{ collections::{HashMap, HashSet}, fs, io::Write, mem::size_of, path::Path, sync::{Arc, Mutex, RwLock}, }; use lru_cache::LruCache; use ruma::{ events::{ push_rules::PushRulesEvent, GlobalAccountDataEventType, StateEventType, }, push::Ruleset, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, }; use tracing::{debug, error, info, info_span, warn, Instrument}; use crate::{ config::DatabaseBackend, service::{ media::MediaFileKey, rooms::{ short::{ShortEventId, ShortStateHash, ShortStateKey}, state_compressor::CompressedStateEvent, timeline::PduCount, }, }, services, utils, Config, Error, Result, }; pub(crate) mod abstraction; pub(crate) mod key_value; use abstraction::{KeyValueDatabaseEngine, KvTree}; pub(crate) struct KeyValueDatabase { db: Arc, // Trees "owned" by `self::key_value::globals` pub(super) global: Arc, pub(super) server_signingkeys: Arc, // Trees "owned" by `self::key_value::users` pub(super) userid_password: Arc, pub(super) userid_displayname: Arc, pub(super) userid_avatarurl: Arc, pub(super) userid_blurhash: Arc, pub(super) userdeviceid_token: Arc, // This is also used to check if a device exists pub(super) userdeviceid_metadata: Arc, // DevicelistVersion = u64 pub(super) userid_devicelistversion: Arc, pub(super) token_userdeviceid: Arc, // OneTimeKeyId = UserId + DeviceKeyId pub(super) onetimekeyid_onetimekeys: Arc, // LastOneTimeKeyUpdate = Count pub(super) userid_lastonetimekeyupdate: Arc, // KeyChangeId = UserId/RoomId + Count pub(super) keychangeid_userid: Arc, // KeyId = UserId + KeyId (depends on key type) pub(super) keyid_key: Arc, pub(super) userid_masterkeyid: Arc, pub(super) userid_selfsigningkeyid: Arc, pub(super) userid_usersigningkeyid: Arc, // UserFilterId = UserId + FilterId pub(super) userfilterid_filter: Arc, // ToDeviceId = UserId + DeviceId + Count pub(super) todeviceid_events: Arc, // Trees "owned" by `self::key_value::uiaa` // User-interactive authentication pub(super) userdevicesessionid_uiaainfo: Arc, // Trees "owned" by `self::key_value::rooms::edus` // ReadReceiptId = RoomId + Count + UserId pub(super) readreceiptid_readreceipt: Arc, // RoomUserId = Room + User, PrivateRead = Count pub(super) roomuserid_privateread: Arc, // LastPrivateReadUpdate = Count pub(super) roomuserid_lastprivatereadupdate: Arc, // PresenceId = RoomId + Count + UserId // This exists in the database already but is currently unused #[allow(dead_code)] pub(super) presenceid_presence: Arc, // LastPresenceUpdate = Count // This exists in the database already but is currently unused #[allow(dead_code)] pub(super) userid_lastpresenceupdate: Arc, // Trees "owned" by `self::key_value::rooms` // PduId = ShortRoomId + Count pub(super) pduid_pdu: Arc, pub(super) eventid_pduid: Arc, pub(super) roomid_pduleaves: Arc, pub(super) alias_roomid: Arc, // AliasId = RoomId + Count pub(super) aliasid_alias: Arc, pub(super) publicroomids: Arc, // ThreadId = RoomId + Count pub(super) threadid_userids: Arc, // TokenId = ShortRoomId + Token + PduIdCount pub(super) tokenids: Arc, /// Participating servers in a room. // RoomServerId = RoomId + ServerName pub(super) roomserverids: Arc, // ServerRoomId = ServerName + RoomId pub(super) serverroomids: Arc, pub(super) userroomid_joined: Arc, pub(super) roomuserid_joined: Arc, pub(super) roomid_joinedcount: Arc, pub(super) roomid_invitedcount: Arc, pub(super) roomuseroncejoinedids: Arc, // InviteState = Vec> pub(super) userroomid_invitestate: Arc, // InviteCount = Count pub(super) roomuserid_invitecount: Arc, pub(super) userroomid_leftstate: Arc, pub(super) roomuserid_leftcount: Arc, // Rooms where incoming federation handling is disabled pub(super) disabledroomids: Arc, // LazyLoadedIds = UserId + DeviceId + RoomId + LazyLoadedUserId pub(super) lazyloadedids: Arc, // NotifyCount = u64 pub(super) userroomid_notificationcount: Arc, // HightlightCount = u64 pub(super) userroomid_highlightcount: Arc, // LastNotificationRead = u64 pub(super) roomuserid_lastnotificationread: Arc, /// Remember the current state hash of a room. pub(super) roomid_shortstatehash: Arc, pub(super) roomsynctoken_shortstatehash: Arc, /// Remember the state hash at events in the past. pub(super) shorteventid_shortstatehash: Arc, /// `StateKey = EventType + StateKey`, `ShortStateKey = Count` pub(super) statekey_shortstatekey: Arc, pub(super) shortstatekey_statekey: Arc, pub(super) roomid_shortroomid: Arc, pub(super) shorteventid_eventid: Arc, pub(super) eventid_shorteventid: Arc, pub(super) statehash_shortstatehash: Arc, // StateDiff = parent (or 0) + (shortstatekey+shorteventid++) + 0_u64 + // (shortstatekey+shorteventid--) pub(super) shortstatehash_statediff: Arc, pub(super) shorteventid_authchain: Arc, /// `RoomId + EventId -> outlier PDU` /// /// Any pdu that has passed the steps 1-8 in the incoming event /// /federation/send/txn. pub(super) eventid_outlierpdu: Arc, pub(super) softfailedeventids: Arc, /// `ShortEventId + ShortEventId -> ()` pub(super) tofrom_relation: Arc, /// `RoomId + EventId -> Parent PDU EventId` pub(super) referencedevents: Arc, // Trees "owned" by `self::key_value::account_data` // RoomUserDataId = Room + User + Count + Type pub(super) roomuserdataid_accountdata: Arc, // RoomUserType = Room + User + Type pub(super) roomusertype_roomuserdataid: Arc, // Trees "owned" by `self::key_value::media` // MediaId = MXC + WidthHeight + ContentDisposition + ContentType pub(super) mediaid_file: Arc, // Trees "owned" by `self::key_value::key_backups` // BackupId = UserId + Version(Count) pub(super) backupid_algorithm: Arc, // BackupId = UserId + Version(Count) pub(super) backupid_etag: Arc, // BackupKeyId = UserId + Version + RoomId + SessionId pub(super) backupkeyid_backup: Arc, // Trees "owned" by `self::key_value::transaction_ids` // Response can be empty (/sendToDevice) or the event id (/send) pub(super) userdevicetxnid_response: Arc, // Trees "owned" by `self::key_value::sending` // EduCount: Count of last EDU sync pub(super) servername_educount: Arc, // ServernameEvent = (+ / $)SenderKey / ServerName / UserId + PduId / Id // (for edus), Data = EDU content pub(super) servernameevent_data: Arc, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / Id (for // edus), Data = EDU content pub(super) servercurrentevent_data: Arc, // Trees "owned" by `self::key_value::appservice` pub(super) id_appserviceregistrations: Arc, // Trees "owned" by `self::key_value::pusher` pub(super) senderkey_pusher: Arc, // Uncategorized trees pub(super) auth_chain_cache: Mutex, Arc>>>, pub(super) eventidshort_cache: Mutex>, pub(super) statekeyshort_cache: Mutex>, pub(super) shortstatekey_cache: Mutex>, pub(super) our_real_users_cache: RwLock>>>, pub(super) appservice_in_room_cache: RwLock>>, pub(super) lasttimelinecount_cache: Mutex>, } impl KeyValueDatabase { fn check_db_setup(config: &Config) -> Result<()> { let path = Path::new(&config.database.path); let sqlite_exists = path .join(format!( "{}.db", if config.conduit_compat { "conduit" } else { "grapevine" } )) .exists(); let rocksdb_exists = path.join("IDENTITY").exists(); let mut count = 0; if sqlite_exists { count += 1; } if rocksdb_exists { count += 1; } if count > 1 { warn!("Multiple databases at database_path detected"); return Ok(()); } let (backend_is_rocksdb, backend_is_sqlite): (bool, bool) = match config.database.backend { #[cfg(feature = "rocksdb")] DatabaseBackend::Rocksdb => (true, false), #[cfg(feature = "sqlite")] DatabaseBackend::Sqlite => (false, true), }; if sqlite_exists && !backend_is_sqlite { return Err(Error::bad_config( "Found sqlite at database_path, but is not specified in \ config.", )); } if rocksdb_exists && !backend_is_rocksdb { return Err(Error::bad_config( "Found rocksdb at database_path, but is not specified in \ config.", )); } Ok(()) } /// Load an existing database or create a new one, and initialize all /// services with the loaded database. #[cfg_attr( not(any(feature = "rocksdb", feature = "sqlite")), allow(unreachable_code) )] #[allow(clippy::too_many_lines)] pub(crate) fn load_or_create(config: &Config) -> Result { Self::check_db_setup(config)?; if !Path::new(&config.database.path).exists() { fs::create_dir_all(&config.database.path).map_err(|_| { Error::BadConfig( "Database folder doesn't exists and couldn't be created \ (e.g. due to missing permissions). Please create the \ database folder yourself.", ) })?; } #[cfg_attr( not(any(feature = "rocksdb", feature = "sqlite")), allow(unused_variables) )] let builder: Arc = match config.database.backend { #[cfg(feature = "sqlite")] DatabaseBackend::Sqlite => { Arc::new(Arc::::open(config)?) } #[cfg(feature = "rocksdb")] DatabaseBackend::Rocksdb => { Arc::new(Arc::::open(config)?) } }; let db = Self { db: builder.clone(), userid_password: builder.open_tree("userid_password")?, userid_displayname: builder.open_tree("userid_displayname")?, userid_avatarurl: builder.open_tree("userid_avatarurl")?, userid_blurhash: builder.open_tree("userid_blurhash")?, userdeviceid_token: builder.open_tree("userdeviceid_token")?, userdeviceid_metadata: builder .open_tree("userdeviceid_metadata")?, userid_devicelistversion: builder .open_tree("userid_devicelistversion")?, token_userdeviceid: builder.open_tree("token_userdeviceid")?, onetimekeyid_onetimekeys: builder .open_tree("onetimekeyid_onetimekeys")?, userid_lastonetimekeyupdate: builder .open_tree("userid_lastonetimekeyupdate")?, keychangeid_userid: builder.open_tree("keychangeid_userid")?, keyid_key: builder.open_tree("keyid_key")?, userid_masterkeyid: builder.open_tree("userid_masterkeyid")?, userid_selfsigningkeyid: builder .open_tree("userid_selfsigningkeyid")?, userid_usersigningkeyid: builder .open_tree("userid_usersigningkeyid")?, userfilterid_filter: builder.open_tree("userfilterid_filter")?, todeviceid_events: builder.open_tree("todeviceid_events")?, userdevicesessionid_uiaainfo: builder .open_tree("userdevicesessionid_uiaainfo")?, readreceiptid_readreceipt: builder .open_tree("readreceiptid_readreceipt")?, // "Private" read receipt roomuserid_privateread: builder .open_tree("roomuserid_privateread")?, roomuserid_lastprivatereadupdate: builder .open_tree("roomuserid_lastprivatereadupdate")?, presenceid_presence: builder.open_tree("presenceid_presence")?, userid_lastpresenceupdate: builder .open_tree("userid_lastpresenceupdate")?, pduid_pdu: builder.open_tree("pduid_pdu")?, eventid_pduid: builder.open_tree("eventid_pduid")?, roomid_pduleaves: builder.open_tree("roomid_pduleaves")?, alias_roomid: builder.open_tree("alias_roomid")?, aliasid_alias: builder.open_tree("aliasid_alias")?, publicroomids: builder.open_tree("publicroomids")?, threadid_userids: builder.open_tree("threadid_userids")?, tokenids: builder.open_tree("tokenids")?, roomserverids: builder.open_tree("roomserverids")?, serverroomids: builder.open_tree("serverroomids")?, userroomid_joined: builder.open_tree("userroomid_joined")?, roomuserid_joined: builder.open_tree("roomuserid_joined")?, roomid_joinedcount: builder.open_tree("roomid_joinedcount")?, roomid_invitedcount: builder.open_tree("roomid_invitedcount")?, roomuseroncejoinedids: builder .open_tree("roomuseroncejoinedids")?, userroomid_invitestate: builder .open_tree("userroomid_invitestate")?, roomuserid_invitecount: builder .open_tree("roomuserid_invitecount")?, userroomid_leftstate: builder.open_tree("userroomid_leftstate")?, roomuserid_leftcount: builder.open_tree("roomuserid_leftcount")?, disabledroomids: builder.open_tree("disabledroomids")?, lazyloadedids: builder.open_tree("lazyloadedids")?, userroomid_notificationcount: builder .open_tree("userroomid_notificationcount")?, userroomid_highlightcount: builder .open_tree("userroomid_highlightcount")?, roomuserid_lastnotificationread: builder .open_tree("userroomid_highlightcount")?, statekey_shortstatekey: builder .open_tree("statekey_shortstatekey")?, shortstatekey_statekey: builder .open_tree("shortstatekey_statekey")?, shorteventid_authchain: builder .open_tree("shorteventid_authchain")?, roomid_shortroomid: builder.open_tree("roomid_shortroomid")?, shortstatehash_statediff: builder .open_tree("shortstatehash_statediff")?, eventid_shorteventid: builder.open_tree("eventid_shorteventid")?, shorteventid_eventid: builder.open_tree("shorteventid_eventid")?, shorteventid_shortstatehash: builder .open_tree("shorteventid_shortstatehash")?, roomid_shortstatehash: builder .open_tree("roomid_shortstatehash")?, roomsynctoken_shortstatehash: builder .open_tree("roomsynctoken_shortstatehash")?, statehash_shortstatehash: builder .open_tree("statehash_shortstatehash")?, eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?, softfailedeventids: builder.open_tree("softfailedeventids")?, tofrom_relation: builder.open_tree("tofrom_relation")?, referencedevents: builder.open_tree("referencedevents")?, roomuserdataid_accountdata: builder .open_tree("roomuserdataid_accountdata")?, roomusertype_roomuserdataid: builder .open_tree("roomusertype_roomuserdataid")?, mediaid_file: builder.open_tree("mediaid_file")?, backupid_algorithm: builder.open_tree("backupid_algorithm")?, backupid_etag: builder.open_tree("backupid_etag")?, backupkeyid_backup: builder.open_tree("backupkeyid_backup")?, userdevicetxnid_response: builder .open_tree("userdevicetxnid_response")?, servername_educount: builder.open_tree("servername_educount")?, servernameevent_data: builder.open_tree("servernameevent_data")?, servercurrentevent_data: builder .open_tree("servercurrentevent_data")?, id_appserviceregistrations: builder .open_tree("id_appserviceregistrations")?, senderkey_pusher: builder.open_tree("senderkey_pusher")?, global: builder.open_tree("global")?, server_signingkeys: builder.open_tree("server_signingkeys")?, #[allow( clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation )] auth_chain_cache: Mutex::new(LruCache::new( (100_000.0 * config.cache_capacity_modifier) as usize, )), #[allow( clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation )] eventidshort_cache: Mutex::new(LruCache::new( (100_000.0 * config.cache_capacity_modifier) as usize, )), #[allow( clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation )] shortstatekey_cache: Mutex::new(LruCache::new( (100_000.0 * config.cache_capacity_modifier) as usize, )), #[allow( clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation )] statekeyshort_cache: Mutex::new(LruCache::new( (100_000.0 * config.cache_capacity_modifier) as usize, )), our_real_users_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()), lasttimelinecount_cache: Mutex::new(HashMap::new()), }; Ok(db) } /// Ensure that the database is at the current version, applying migrations /// if necessary. /// /// If it is not possible to migrate the database to the current version, /// returns an error. #[allow(clippy::too_many_lines)] pub(crate) async fn apply_migrations(&self) -> Result<()> { // If the database has any data, perform data migrations before starting let latest_database_version = 13; if services().users.count()? > 0 { // MIGRATIONS migration(1, || { for (roomserverid, _) in self.roomserverids.iter() { let mut parts = roomserverid.split(|&b| b == 0xFF); let room_id = parts.next().expect("split always returns one element"); let Some(servername) = parts.next() else { error!("Migration: Invalid roomserverid in db."); continue; }; let mut serverroomid = servername.to_vec(); serverroomid.push(0xFF); serverroomid.extend_from_slice(room_id); self.serverroomids.insert(&serverroomid, &[])?; } Ok(()) })?; migration(2, || { // We accidentally inserted hashed versions of "" into the db // instead of just "" for (userid, password) in self.userid_password.iter() { let password = utils::string_from_bytes(&password); let empty_hashed_password = password .map_or(false, |password| { utils::verify_password("", password) }); if empty_hashed_password { self.userid_password.insert(&userid, b"")?; } } Ok(()) })?; migration(3, || { // Move media to filesystem for (key, content) in self.mediaid_file.iter() { let key = MediaFileKey::new(key); if content.is_empty() { continue; } let path = services().globals.get_media_file(&key); let mut file = fs::File::create(path)?; file.write_all(&content)?; self.mediaid_file.insert(key.as_bytes(), &[])?; } Ok(()) })?; migration(4, || { // Add federated users to services() as deactivated for our_user in services().users.iter() { let our_user = our_user?; if services().users.is_deactivated(&our_user)? { continue; } for room in services().rooms.state_cache.rooms_joined(&our_user) { for user in services().rooms.state_cache.room_members(&room?) { let user = user?; if user.server_name() != services().globals.server_name() { info!(?user, "Migration: creating user"); services().users.create(&user, None)?; } } } } Ok(()) })?; migration(5, || { // Upgrade user data store for (roomuserdataid, _) in self.roomuserdataid_accountdata.iter() { let mut parts = roomuserdataid.split(|&b| b == 0xFF); let room_id = parts.next().unwrap(); let user_id = parts.next().unwrap(); let event_type = roomuserdataid.rsplit(|&b| b == 0xFF).next().unwrap(); let mut key = room_id.to_vec(); key.push(0xFF); key.extend_from_slice(user_id); key.push(0xFF); key.extend_from_slice(event_type); self.roomusertype_roomuserdataid .insert(&key, &roomuserdataid)?; } Ok(()) })?; migration(6, || { // Set room member count for (roomid, _) in self.roomid_shortstatehash.iter() { let string = utils::string_from_bytes(&roomid).unwrap(); let room_id = <&RoomId>::try_from(string.as_str()).unwrap(); services() .rooms .state_cache .update_joined_count(room_id)?; } Ok(()) })?; migration(7, || { // Upgrade state store let mut last_roomstates: HashMap = HashMap::new(); let mut current_sstatehash: Option = None; let mut current_room = None; let mut current_state = HashSet::new(); let mut counter = 0; let mut handle_state = |current_sstatehash: ShortStateHash, current_room: &RoomId, current_state: HashSet<_>, last_roomstates: &mut HashMap<_, _>| { counter += 1; let last_roomsstatehash = last_roomstates.get(current_room); let states_parents = last_roomsstatehash.map_or_else( || Ok(Vec::new()), |&last_roomsstatehash| { services() .rooms .state_compressor .load_shortstatehash_info( last_roomsstatehash, ) }, )?; let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last() { let statediffnew = current_state .difference(&parent_stateinfo.full_state) .copied() .collect::>(); let statediffremoved = parent_stateinfo .full_state .difference(¤t_state) .copied() .collect::>(); (statediffnew, statediffremoved) } else { (current_state, HashSet::new()) }; services() .rooms .state_compressor .save_state_from_diff( current_sstatehash, Arc::new(statediffnew), Arc::new(statediffremoved), // every state change is 2 event changes on // average 2, states_parents, )?; Ok::<_, Error>(()) }; for (k, seventid) in self.db.open_tree("stateid_shorteventid")?.iter() { let sstatehash = ShortStateHash::new( utils::u64_from_bytes(&k[0..size_of::()]) .expect("number of bytes is correct"), ); let sstatekey = ShortStateKey::new( utils::u64_from_bytes(&k[size_of::()..]) .expect("number of bytes is correct"), ); if Some(sstatehash) != current_sstatehash { if let Some(current_sstatehash) = current_sstatehash { handle_state( current_sstatehash, current_room.as_deref().unwrap(), current_state, &mut last_roomstates, )?; last_roomstates.insert( current_room.clone().unwrap(), current_sstatehash, ); } current_state = HashSet::new(); current_sstatehash = Some(sstatehash); let event_id = self .shorteventid_eventid .get(&seventid) .unwrap() .unwrap(); let string = utils::string_from_bytes(&event_id).unwrap(); let event_id = <&EventId>::try_from(string.as_str()).unwrap(); let pdu = services() .rooms .timeline .get_pdu(event_id) .unwrap() .unwrap(); if Some(&pdu.room_id) != current_room.as_ref() { current_room = Some(pdu.room_id.clone()); } } let seventid = ShortEventId::new( utils::u64_from_bytes(&seventid) .expect("number of bytes is correct"), ); current_state.insert(CompressedStateEvent { state: sstatekey, event: seventid, }); } if let Some(current_sstatehash) = current_sstatehash { handle_state( current_sstatehash, current_room.as_deref().unwrap(), current_state, &mut last_roomstates, )?; } Ok(()) })?; migration(8, || { // Generate short room ids for all rooms for (room_id, _) in self.roomid_shortstatehash.iter() { let shortroomid = services().globals.next_count()?.to_be_bytes(); self.roomid_shortroomid.insert(&room_id, &shortroomid)?; info!("Migration: 8"); } // Update pduids db layout let mut batch = self.pduid_pdu.iter().filter_map(|(key, v)| { if !key.starts_with(b"!") { return None; } let mut parts = key.splitn(2, |&b| b == 0xFF); let room_id = parts.next().unwrap(); let count = parts.next().unwrap(); let short_room_id = self .roomid_shortroomid .get(room_id) .unwrap() .expect("shortroomid should exist"); let mut new_key = short_room_id; new_key.extend_from_slice(count); Some((new_key, v)) }); self.pduid_pdu.insert_batch(&mut batch)?; let mut batch2 = self.eventid_pduid.iter().filter_map(|(k, value)| { if !value.starts_with(b"!") { return None; } let mut parts = value.splitn(2, |&b| b == 0xFF); let room_id = parts.next().unwrap(); let count = parts.next().unwrap(); let short_room_id = self .roomid_shortroomid .get(room_id) .unwrap() .expect("shortroomid should exist"); let mut new_value = short_room_id; new_value.extend_from_slice(count); Some((k, new_value)) }); self.eventid_pduid.insert_batch(&mut batch2)?; Ok(()) })?; migration(9, || { // Update tokenids db layout let mut iter = self .tokenids .iter() .filter_map(|(key, _)| { if !key.starts_with(b"!") { return None; } let mut parts = key.splitn(4, |&b| b == 0xFF); let room_id = parts.next().unwrap(); let word = parts.next().unwrap(); let _pdu_id_room = parts.next().unwrap(); let pdu_id_count = parts.next().unwrap(); let short_room_id = self .roomid_shortroomid .get(room_id) .unwrap() .expect("shortroomid should exist"); let mut new_key = short_room_id; new_key.extend_from_slice(word); new_key.push(0xFF); new_key.extend_from_slice(pdu_id_count); Some((new_key, Vec::new())) }) .peekable(); while iter.peek().is_some() { self.tokenids .insert_batch(&mut iter.by_ref().take(1000))?; debug!("Inserted smaller batch"); } info!("Deleting starts"); let batch2: Vec<_> = self .tokenids .iter() .filter_map(|(key, _)| key.starts_with(b"!").then_some(key)) .collect(); for key in batch2 { self.tokenids.remove(&key)?; } Ok(()) })?; migration(10, || { // Add other direction for shortstatekeys for (statekey, shortstatekey) in self.statekey_shortstatekey.iter() { self.shortstatekey_statekey .insert(&shortstatekey, &statekey)?; } // Force E2EE device list updates so we can send them over // federation for user_id in services().users.iter().filter_map(Result::ok) { services().users.mark_device_key_update(&user_id)?; } Ok(()) })?; migration(11, || { self.db .open_tree("userdevicesessionid_uiaarequest")? .clear()?; Ok(()) })?; migration(12, || { for username in services().users.list_local_users()? { let user = match UserId::parse_with_server_name( username.clone(), services().globals.server_name(), ) { Ok(u) => u, Err(error) => { warn!( %error, user_localpart = %username, "Invalid username", ); continue; } }; let raw_rules_list = services() .account_data .get( None, &user, GlobalAccountDataEventType::PushRules .to_string() .into(), ) .unwrap() .expect("Username is invalid"); let mut account_data = serde_json::from_str::( raw_rules_list.get(), ) .unwrap(); let rules_list = &mut account_data.content.global; //content rule { let content_rule_transformation = [ ".m.rules.contains_user_name", ".m.rule.contains_user_name", ]; let rule = rules_list .content .get(content_rule_transformation[0]); if rule.is_some() { let mut rule = rule.unwrap().clone(); content_rule_transformation[1] .clone_into(&mut rule.rule_id); rules_list .content .shift_remove(content_rule_transformation[0]); rules_list.content.insert(rule); } } //underride rules { let underride_rule_transformation = [ [".m.rules.call", ".m.rule.call"], [ ".m.rules.room_one_to_one", ".m.rule.room_one_to_one", ], [ ".m.rules.encrypted_room_one_to_one", ".m.rule.encrypted_room_one_to_one", ], [".m.rules.message", ".m.rule.message"], [".m.rules.encrypted", ".m.rule.encrypted"], ]; for transformation in underride_rule_transformation { let rule = rules_list.underride.get(transformation[0]); if let Some(rule) = rule { let mut rule = rule.clone(); transformation[1].clone_into(&mut rule.rule_id); rules_list .underride .shift_remove(transformation[0]); rules_list.underride.insert(rule); } } } services().account_data.update( None, &user, GlobalAccountDataEventType::PushRules .to_string() .into(), &serde_json::to_value(account_data) .expect("to json value always works"), )?; } Ok(()) })?; // This migration can be reused as-is anytime the server-default // rules are updated. migration(13, || { for username in services().users.list_local_users()? { let user = match UserId::parse_with_server_name( username.clone(), services().globals.server_name(), ) { Ok(u) => u, Err(error) => { warn!( %error, user_localpart = %username, "Invalid username", ); continue; } }; let raw_rules_list = services() .account_data .get( None, &user, GlobalAccountDataEventType::PushRules .to_string() .into(), ) .unwrap() .expect("Username is invalid"); let mut account_data = serde_json::from_str::( raw_rules_list.get(), ) .unwrap(); let user_default_rules = Ruleset::server_default(&user); account_data .content .global .update_with_server_default(user_default_rules); services().account_data.update( None, &user, GlobalAccountDataEventType::PushRules .to_string() .into(), &serde_json::to_value(account_data) .expect("to json value always works"), )?; } Ok(()) })?; assert_eq!( services().globals.database_version().unwrap(), latest_database_version, "database should be migrated to the current version", ); info!( backend = %services().globals.config.database.backend, version = latest_database_version, "Loaded database", ); } else { services() .globals .bump_database_version(latest_database_version)?; // Create the admin room and server user on first run services().admin.create_admin_room().await?; info!( backend = %services().globals.config.database.backend, version = latest_database_version, "Created new database", ); } Ok(()) } #[tracing::instrument] pub(crate) fn start_cleanup_task() { use std::time::{Duration, Instant}; #[cfg(unix)] use tokio::signal::unix::{signal, SignalKind}; use tokio::time::interval; let timer_interval = Duration::from_secs(u64::from( services().globals.config.cleanup_second_interval, )); tokio::spawn(async move { let mut i = interval(timer_interval); #[cfg(unix)] let mut s = signal(SignalKind::hangup()).unwrap(); loop { #[cfg(unix)] let msg = tokio::select! { _ = i.tick() => || { debug!("cleanup: Timer ticked"); }, _ = s.recv() => || { debug!("cleanup: Received SIGHUP"); }, }; #[cfg(not(unix))] let msg = { i.tick().await; || debug!("cleanup: Timer ticked") }; async { msg(); let start = Instant::now(); if let Err(error) = services().globals.cleanup() { error!(%error, "cleanup: Error"); } else { debug!(elapsed = ?start.elapsed(), "cleanup: Finished"); } } .instrument(info_span!("database_cleanup")) .await; } }); } } /// If the current version is older than `new_version`, execute a migration /// function. fn migration(new_version: u64, migration: F) -> Result<(), Error> where F: FnOnce() -> Result<(), Error>, { let current_version = services().globals.database_version()?; if current_version < new_version { migration()?; services().globals.bump_database_version(new_version)?; warn!("Migration: {current_version} -> {new_version} finished"); } Ok(()) }