diff --git a/src/database.rs b/src/database.rs index 445c8a6a..6156c7e0 100644 --- a/src/database.rs +++ b/src/database.rs @@ -27,7 +27,14 @@ use tracing::{debug, error, info, info_span, warn, Instrument}; use crate::{ config::DatabaseBackend, observability::FilterReloadHandles, - service::{media::MediaFileKey, rooms::timeline::PduCount}, + service::{ + media::MediaFileKey, + rooms::{ + short::{ShortEventId, ShortStateHash, ShortStateKey}, + state_compressor::CompressedStateEvent, + timeline::PduCount, + }, + }, services, utils, Config, Error, PduEvent, Result, Services, SERVICES, }; @@ -236,13 +243,14 @@ pub(crate) struct KeyValueDatabase { // Uncategorized trees pub(super) pdu_cache: Mutex>>, - pub(super) shorteventid_cache: Mutex>>, - pub(super) auth_chain_cache: Mutex, Arc>>>, - pub(super) eventidshort_cache: Mutex>, + pub(super) shorteventid_cache: Mutex>>, + pub(super) auth_chain_cache: + Mutex, Arc>>>, + pub(super) eventidshort_cache: Mutex>, pub(super) statekeyshort_cache: - Mutex>, + Mutex>, pub(super) shortstatekey_cache: - Mutex>, + Mutex>, pub(super) our_real_users_cache: RwLock>>>, pub(super) appservice_in_room_cache: @@ -695,15 +703,15 @@ impl KeyValueDatabase { if services().globals.database_version()? < 7 { // Upgrade state store - let mut last_roomstates: HashMap = + let mut last_roomstates: HashMap = HashMap::new(); - let mut current_sstatehash: Option = None; + let mut current_sstatehash: Option = None; let mut current_room = None; let mut current_state = HashSet::new(); let mut counter = 0; let mut handle_state = - |current_sstatehash: u64, + |current_sstatehash: ShortStateHash, current_room: &RoomId, current_state: HashSet<_>, last_roomstates: &mut HashMap<_, _>| { @@ -762,10 +770,14 @@ impl KeyValueDatabase { for (k, seventid) in db.db.open_tree("stateid_shorteventid")?.iter() { - let sstatehash = + let sstatehash = ShortStateHash::new( utils::u64_from_bytes(&k[0..size_of::()]) - .expect("number of bytes is correct"); - let sstatekey = k[size_of::()..].to_vec(); + .expect("number of bytes is correct"), + ); + let sstatekey = ShortStateKey::new( + utils::u64_from_bytes(&k[size_of::()..]) + .expect("number of bytes is correct"), + ); if Some(sstatehash) != current_sstatehash { if let Some(current_sstatehash) = current_sstatehash { handle_state( @@ -803,10 +815,14 @@ impl KeyValueDatabase { } } - let mut val = sstatekey; - val.extend_from_slice(&seventid); - current_state - .insert(val.try_into().expect("size is correct")); + let seventid = ShortEventId::new( + utils::u64_from_bytes(&seventid) + .expect("number of bytes is correct"), + ); + current_state.insert(CompressedStateEvent { + state: sstatekey, + event: seventid, + }); } if let Some(current_sstatehash) = current_sstatehash { diff --git a/src/database/key_value/globals.rs b/src/database/key_value/globals.rs index b2594226..819ef98c 100644 --- a/src/database/key_value/globals.rs +++ b/src/database/key_value/globals.rs @@ -74,6 +74,7 @@ impl service::globals::Data for KeyValueDatabase { .ok() .flatten() .expect("room exists") + .get() .to_be_bytes() .to_vec(); diff --git a/src/database/key_value/rooms/auth_chain.rs b/src/database/key_value/rooms/auth_chain.rs index 896a0f40..b29149f8 100644 --- a/src/database/key_value/rooms/auth_chain.rs +++ b/src/database/key_value/rooms/auth_chain.rs @@ -3,15 +3,16 @@ use std::{collections::HashSet, mem::size_of, sync::Arc}; use crate::{ database::KeyValueDatabase, observability::{FoundIn, Lookup, METRICS}, - service, utils, Result, + service::{self, rooms::short::ShortEventId}, + utils, Result, }; impl service::rooms::auth_chain::Data for KeyValueDatabase { #[tracing::instrument(skip(self, key))] fn get_cached_eventid_authchain( &self, - key: &[u64], - ) -> Result>>> { + key: &[ShortEventId], + ) -> Result>>> { let lookup = Lookup::AuthChain; // Check RAM cache @@ -26,13 +27,15 @@ impl service::rooms::auth_chain::Data for KeyValueDatabase { // Check DB cache let chain = self .shorteventid_authchain - .get(&key[0].to_be_bytes())? + .get(&key[0].get().to_be_bytes())? .map(|chain| { chain .chunks_exact(size_of::()) .map(|chunk| { - utils::u64_from_bytes(chunk) - .expect("byte length is correct") + ShortEventId::new( + utils::u64_from_bytes(chunk) + .expect("byte length is correct"), + ) }) .collect() }); @@ -57,16 +60,16 @@ impl service::rooms::auth_chain::Data for KeyValueDatabase { fn cache_auth_chain( &self, - key: Vec, - auth_chain: Arc>, + key: Vec, + auth_chain: Arc>, ) -> Result<()> { // Only persist single events in db if key.len() == 1 { self.shorteventid_authchain.insert( - &key[0].to_be_bytes(), + &key[0].get().to_be_bytes(), &auth_chain .iter() - .flat_map(|s| s.to_be_bytes().to_vec()) + .flat_map(|s| s.get().to_be_bytes().to_vec()) .collect::>(), )?; } diff --git a/src/database/key_value/rooms/metadata.rs b/src/database/key_value/rooms/metadata.rs index ab7a5cb8..dc8c7a19 100644 --- a/src/database/key_value/rooms/metadata.rs +++ b/src/database/key_value/rooms/metadata.rs @@ -8,7 +8,7 @@ impl service::rooms::metadata::Data for KeyValueDatabase { #[tracing::instrument(skip(self))] fn exists(&self, room_id: &RoomId) -> Result { let prefix = match services().rooms.short.get_shortroomid(room_id)? { - Some(b) => b.to_be_bytes().to_vec(), + Some(b) => b.get().to_be_bytes().to_vec(), None => return Ok(false), }; diff --git a/src/database/key_value/rooms/pdu_metadata.rs b/src/database/key_value/rooms/pdu_metadata.rs index 32689bd4..5fc6a897 100644 --- a/src/database/key_value/rooms/pdu_metadata.rs +++ b/src/database/key_value/rooms/pdu_metadata.rs @@ -6,7 +6,10 @@ use crate::{ database::KeyValueDatabase, service::{ self, - rooms::timeline::{PduCount, PduId}, + rooms::{ + short::ShortRoomId, + timeline::{PduCount, PduId}, + }, }, services, utils, Error, PduEvent, Result, }; @@ -22,7 +25,7 @@ impl service::rooms::pdu_metadata::Data for KeyValueDatabase { fn relations_until<'a>( &'a self, user_id: &'a UserId, - shortroomid: u64, + shortroomid: ShortRoomId, target: u64, until: PduCount, ) -> Result> + 'a>> @@ -51,7 +54,7 @@ impl service::rooms::pdu_metadata::Data for KeyValueDatabase { Error::bad_database("Invalid count in tofrom_relation.") })?; - let mut pduid = shortroomid.to_be_bytes().to_vec(); + let mut pduid = shortroomid.get().to_be_bytes().to_vec(); pduid.extend_from_slice(&from.to_be_bytes()); let pduid = PduId::new(pduid); diff --git a/src/database/key_value/rooms/search.rs b/src/database/key_value/rooms/search.rs index fa48965d..510feca5 100644 --- a/src/database/key_value/rooms/search.rs +++ b/src/database/key_value/rooms/search.rs @@ -2,7 +2,10 @@ use ruma::RoomId; use crate::{ database::KeyValueDatabase, - service::{self, rooms::timeline::PduId}, + service::{ + self, + rooms::{short::ShortRoomId, timeline::PduId}, + }, services, utils, Result, }; @@ -21,12 +24,12 @@ impl service::rooms::search::Data for KeyValueDatabase { #[tracing::instrument(skip(self))] fn index_pdu( &self, - shortroomid: u64, + shortroomid: ShortRoomId, pdu_id: &PduId, message_body: &str, ) -> Result<()> { let mut batch = tokenize(message_body).map(|word| { - let mut key = shortroomid.to_be_bytes().to_vec(); + let mut key = shortroomid.get().to_be_bytes().to_vec(); key.extend_from_slice(word.as_bytes()); key.push(0xFF); // TODO: currently we save the room id a second time here @@ -40,12 +43,12 @@ impl service::rooms::search::Data for KeyValueDatabase { #[tracing::instrument(skip(self))] fn deindex_pdu( &self, - shortroomid: u64, + shortroomid: ShortRoomId, pdu_id: &PduId, message_body: &str, ) -> Result<()> { let batch = tokenize(message_body).map(|word| { - let mut key = shortroomid.to_be_bytes().to_vec(); + let mut key = shortroomid.get().to_be_bytes().to_vec(); key.extend_from_slice(word.as_bytes()); key.push(0xFF); // TODO: currently we save the room id a second time here @@ -73,6 +76,7 @@ impl service::rooms::search::Data for KeyValueDatabase { .short .get_shortroomid(room_id)? .expect("room exists") + .get() .to_be_bytes() .to_vec(); diff --git a/src/database/key_value/rooms/short.rs b/src/database/key_value/rooms/short.rs index c665c092..ca499ed2 100644 --- a/src/database/key_value/rooms/short.rs +++ b/src/database/key_value/rooms/short.rs @@ -5,12 +5,21 @@ use ruma::{events::StateEventType, EventId, RoomId}; use crate::{ database::KeyValueDatabase, observability::{FoundIn, Lookup, METRICS}, - service, services, utils, Error, Result, + service::{ + self, + rooms::short::{ + ShortEventId, ShortRoomId, ShortStateHash, ShortStateKey, + }, + }, + services, utils, Error, Result, }; impl service::rooms::short::Data for KeyValueDatabase { #[tracing::instrument(skip(self))] - fn get_or_create_shorteventid(&self, event_id: &EventId) -> Result { + fn get_or_create_shorteventid( + &self, + event_id: &EventId, + ) -> Result { let lookup = Lookup::CreateEventIdToShort; if let Some(short) = @@ -39,6 +48,8 @@ impl service::rooms::short::Data for KeyValueDatabase { shorteventid }; + let short = ShortEventId::new(short); + self.eventidshort_cache .lock() .unwrap() @@ -52,7 +63,7 @@ impl service::rooms::short::Data for KeyValueDatabase { &self, event_type: &StateEventType, state_key: &str, - ) -> Result> { + ) -> Result> { let lookup = Lookup::StateKeyToShort; if let Some(short) = self @@ -73,9 +84,11 @@ impl service::rooms::short::Data for KeyValueDatabase { .statekey_shortstatekey .get(&db_key)? .map(|shortstatekey| { - utils::u64_from_bytes(&shortstatekey).map_err(|_| { - Error::bad_database("Invalid shortstatekey in db.") - }) + utils::u64_from_bytes(&shortstatekey) + .map_err(|_| { + Error::bad_database("Invalid shortstatekey in db.") + }) + .map(ShortStateKey::new) }) .transpose()?; @@ -98,7 +111,7 @@ impl service::rooms::short::Data for KeyValueDatabase { &self, event_type: &StateEventType, state_key: &str, - ) -> Result { + ) -> Result { let lookup = Lookup::CreateStateKeyToShort; if let Some(short) = self @@ -134,6 +147,8 @@ impl service::rooms::short::Data for KeyValueDatabase { shortstatekey }; + let short = ShortStateKey::new(short); + self.statekeyshort_cache .lock() .unwrap() @@ -145,7 +160,7 @@ impl service::rooms::short::Data for KeyValueDatabase { #[tracing::instrument(skip(self))] fn get_eventid_from_short( &self, - shorteventid: u64, + shorteventid: ShortEventId, ) -> Result> { let lookup = Lookup::ShortToEventId; @@ -158,7 +173,7 @@ impl service::rooms::short::Data for KeyValueDatabase { let bytes = self .shorteventid_eventid - .get(&shorteventid.to_be_bytes())? + .get(&shorteventid.get().to_be_bytes())? .ok_or_else(|| { Error::bad_database("Shorteventid does not exist") })?; @@ -187,7 +202,7 @@ impl service::rooms::short::Data for KeyValueDatabase { #[tracing::instrument(skip(self))] fn get_statekey_from_short( &self, - shortstatekey: u64, + shortstatekey: ShortStateKey, ) -> Result<(StateEventType, String)> { let lookup = Lookup::ShortToStateKey; @@ -200,7 +215,7 @@ impl service::rooms::short::Data for KeyValueDatabase { let bytes = self .shortstatekey_statekey - .get(&shortstatekey.to_be_bytes())? + .get(&shortstatekey.get().to_be_bytes())? .ok_or_else(|| { Error::bad_database("Shortstatekey does not exist") })?; @@ -244,51 +259,56 @@ impl service::rooms::short::Data for KeyValueDatabase { fn get_or_create_shortstatehash( &self, state_hash: &[u8], - ) -> Result<(u64, bool)> { - Ok( - if let Some(shortstatehash) = - self.statehash_shortstatehash.get(state_hash)? - { - ( - utils::u64_from_bytes(&shortstatehash).map_err(|_| { - Error::bad_database("Invalid shortstatehash in db.") - })?, - true, - ) - } else { - let shortstatehash = services().globals.next_count()?; - self.statehash_shortstatehash - .insert(state_hash, &shortstatehash.to_be_bytes())?; - (shortstatehash, false) - }, - ) + ) -> Result<(ShortStateHash, bool)> { + let (short, existed) = if let Some(shortstatehash) = + self.statehash_shortstatehash.get(state_hash)? + { + ( + utils::u64_from_bytes(&shortstatehash).map_err(|_| { + Error::bad_database("Invalid shortstatehash in db.") + })?, + true, + ) + } else { + let shortstatehash = services().globals.next_count()?; + self.statehash_shortstatehash + .insert(state_hash, &shortstatehash.to_be_bytes())?; + (shortstatehash, false) + }; + + Ok((ShortStateHash::new(short), existed)) } - fn get_shortroomid(&self, room_id: &RoomId) -> Result> { + fn get_shortroomid(&self, room_id: &RoomId) -> Result> { self.roomid_shortroomid .get(room_id.as_bytes())? .map(|bytes| { - utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database("Invalid shortroomid in db.") - }) + utils::u64_from_bytes(&bytes) + .map_err(|_| { + Error::bad_database("Invalid shortroomid in db.") + }) + .map(ShortRoomId::new) }) .transpose() } - fn get_or_create_shortroomid(&self, room_id: &RoomId) -> Result { - Ok( - if let Some(short) = - self.roomid_shortroomid.get(room_id.as_bytes())? - { - utils::u64_from_bytes(&short).map_err(|_| { - Error::bad_database("Invalid shortroomid in db.") - })? - } else { - let short = services().globals.next_count()?; - self.roomid_shortroomid - .insert(room_id.as_bytes(), &short.to_be_bytes())?; - short - }, - ) + fn get_or_create_shortroomid( + &self, + room_id: &RoomId, + ) -> Result { + let short = if let Some(short) = + self.roomid_shortroomid.get(room_id.as_bytes())? + { + utils::u64_from_bytes(&short).map_err(|_| { + Error::bad_database("Invalid shortroomid in db.") + })? + } else { + let short = services().globals.next_count()?; + self.roomid_shortroomid + .insert(room_id.as_bytes(), &short.to_be_bytes())?; + short + }; + + Ok(ShortRoomId::new(short)) } } diff --git a/src/database/key_value/rooms/state.rs b/src/database/key_value/rooms/state.rs index a3246878..b7cbcb39 100644 --- a/src/database/key_value/rooms/state.rs +++ b/src/database/key_value/rooms/state.rs @@ -4,21 +4,30 @@ use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId}; use crate::{ database::KeyValueDatabase, - service::{self, globals::marker}, + service::{ + self, + globals::marker, + rooms::short::{ShortEventId, ShortStateHash}, + }, utils::{self, on_demand_hashmap::KeyToken}, Error, Result, }; impl service::rooms::state::Data for KeyValueDatabase { - fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result> { + fn get_room_shortstatehash( + &self, + room_id: &RoomId, + ) -> Result> { self.roomid_shortstatehash.get(room_id.as_bytes())?.map_or( Ok(None), |bytes| { - Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database( - "Invalid shortstatehash in roomid_shortstatehash", - ) - })?)) + Ok(Some(ShortStateHash::new( + utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database( + "Invalid shortstatehash in roomid_shortstatehash", + ) + })?, + ))) }, ) } @@ -26,21 +35,23 @@ impl service::rooms::state::Data for KeyValueDatabase { fn set_room_state( &self, room_id: &KeyToken, - new_shortstatehash: u64, + new_shortstatehash: ShortStateHash, ) -> Result<()> { - self.roomid_shortstatehash - .insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?; + self.roomid_shortstatehash.insert( + room_id.as_bytes(), + &new_shortstatehash.get().to_be_bytes(), + )?; Ok(()) } fn set_event_state( &self, - shorteventid: u64, - shortstatehash: u64, + shorteventid: ShortEventId, + shortstatehash: ShortStateHash, ) -> Result<()> { self.shorteventid_shortstatehash.insert( - &shorteventid.to_be_bytes(), - &shortstatehash.to_be_bytes(), + &shorteventid.get().to_be_bytes(), + &shortstatehash.get().to_be_bytes(), )?; Ok(()) } diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index 39b7d1d6..b2265ba2 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -4,16 +4,20 @@ use async_trait::async_trait; use ruma::{events::StateEventType, EventId, RoomId}; use crate::{ - database::KeyValueDatabase, service, services, utils, Error, PduEvent, - Result, + database::KeyValueDatabase, + service::{ + self, + rooms::short::{ShortStateHash, ShortStateKey}, + }, + services, utils, Error, PduEvent, Result, }; #[async_trait] impl service::rooms::state_accessor::Data for KeyValueDatabase { async fn state_full_ids( &self, - shortstatehash: u64, - ) -> Result>> { + shortstatehash: ShortStateHash, + ) -> Result>> { let full_state = services() .rooms .state_compressor @@ -40,7 +44,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { async fn state_full( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, ) -> Result>> { let full_state = services() .rooms @@ -87,7 +91,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { /// `state_key`). fn state_get_id( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, ) -> Result>> { @@ -105,7 +109,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { .full_state; Ok(full_state .iter() - .find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes())) + .find(|compressed| compressed.state == shortstatekey) .and_then(|compressed| { services() .rooms @@ -120,7 +124,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { /// `state_key`). fn state_get( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, ) -> Result>> { @@ -131,19 +135,24 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { } /// Returns the state hash for this pdu. - fn pdu_shortstatehash(&self, event_id: &EventId) -> Result> { + fn pdu_shortstatehash( + &self, + event_id: &EventId, + ) -> Result> { self.eventid_shorteventid.get(event_id.as_bytes())?.map_or( Ok(None), |shorteventid| { self.shorteventid_shortstatehash .get(&shorteventid)? .map(|bytes| { - utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database( - "Invalid shortstatehash bytes in \ - shorteventid_shortstatehash", - ) - }) + utils::u64_from_bytes(&bytes) + .map_err(|_| { + Error::bad_database( + "Invalid shortstatehash bytes in \ + shorteventid_shortstatehash", + ) + }) + .map(ShortStateHash::new) }) .transpose() }, diff --git a/src/database/key_value/rooms/state_compressor.rs b/src/database/key_value/rooms/state_compressor.rs index 7fb24698..4f487ce2 100644 --- a/src/database/key_value/rooms/state_compressor.rs +++ b/src/database/key_value/rooms/state_compressor.rs @@ -2,19 +2,28 @@ use std::{collections::HashSet, mem::size_of, sync::Arc}; use crate::{ database::KeyValueDatabase, - service::{self, rooms::state_compressor::data::StateDiff}, + service::{ + self, + rooms::{ + short::ShortStateHash, + state_compressor::{data::StateDiff, CompressedStateEvent}, + }, + }, utils, Error, Result, }; impl service::rooms::state_compressor::Data for KeyValueDatabase { - fn get_statediff(&self, shortstatehash: u64) -> Result { + fn get_statediff( + &self, + shortstatehash: ShortStateHash, + ) -> Result { let value = self .shortstatehash_statediff - .get(&shortstatehash.to_be_bytes())? + .get(&shortstatehash.get().to_be_bytes())? .ok_or_else(|| Error::bad_database("State hash does not exist"))?; let parent = utils::u64_from_bytes(&value[0..size_of::()]) .expect("bytes have right length"); - let parent = (parent != 0).then_some(parent); + let parent = (parent != 0).then_some(ShortStateHash::new(parent)); let mut add_mode = true; let mut added = HashSet::new(); @@ -28,10 +37,13 @@ impl service::rooms::state_compressor::Data for KeyValueDatabase { continue; } if add_mode { - added.insert(v.try_into().expect("we checked the size above")); + added.insert(CompressedStateEvent::from_bytes( + v.try_into().expect("we checked the size above"), + )); } else { - removed - .insert(v.try_into().expect("we checked the size above")); + removed.insert(CompressedStateEvent::from_bytes( + v.try_into().expect("we checked the size above"), + )); } i += 2 * size_of::(); } @@ -45,22 +57,23 @@ impl service::rooms::state_compressor::Data for KeyValueDatabase { fn save_statediff( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, diff: StateDiff, ) -> Result<()> { - let mut value = diff.parent.unwrap_or(0).to_be_bytes().to_vec(); + let mut value = + diff.parent.map_or(0, |h| h.get()).to_be_bytes().to_vec(); for new in diff.added.iter() { - value.extend_from_slice(&new[..]); + value.extend_from_slice(&new.as_bytes()); } if !diff.removed.is_empty() { value.extend_from_slice(&0_u64.to_be_bytes()); for removed in diff.removed.iter() { - value.extend_from_slice(&removed[..]); + value.extend_from_slice(&removed.as_bytes()); } } self.shortstatehash_statediff - .insert(&shortstatehash.to_be_bytes(), &value) + .insert(&shortstatehash.get().to_be_bytes(), &value) } } diff --git a/src/database/key_value/rooms/threads.rs b/src/database/key_value/rooms/threads.rs index bb9a1712..fe762fa0 100644 --- a/src/database/key_value/rooms/threads.rs +++ b/src/database/key_value/rooms/threads.rs @@ -24,6 +24,7 @@ impl service::rooms::threads::Data for KeyValueDatabase { .short .get_shortroomid(room_id)? .expect("room exists") + .get() .to_be_bytes() .to_vec(); diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs index 951975e5..656598db 100644 --- a/src/database/key_value/rooms/timeline.rs +++ b/src/database/key_value/rooms/timeline.rs @@ -383,6 +383,7 @@ fn count_to_id( .ok_or_else(|| { Error::bad_database("Looked for bad shortroomid in timeline") })? + .get() .to_be_bytes() .to_vec(); let mut pdu_id = prefix.clone(); diff --git a/src/database/key_value/rooms/user.rs b/src/database/key_value/rooms/user.rs index 53461d27..b474def3 100644 --- a/src/database/key_value/rooms/user.rs +++ b/src/database/key_value/rooms/user.rs @@ -1,7 +1,9 @@ use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId}; use crate::{ - database::KeyValueDatabase, service, services, utils, Error, Result, + database::KeyValueDatabase, + service::{self, rooms::short::ShortStateHash}, + services, utils, Error, Result, }; impl service::rooms::user::Data for KeyValueDatabase { @@ -95,7 +97,7 @@ impl service::rooms::user::Data for KeyValueDatabase { &self, room_id: &RoomId, token: u64, - shortstatehash: u64, + shortstatehash: ShortStateHash, ) -> Result<()> { let shortroomid = services() .rooms @@ -103,36 +105,38 @@ impl service::rooms::user::Data for KeyValueDatabase { .get_shortroomid(room_id)? .expect("room exists"); - let mut key = shortroomid.to_be_bytes().to_vec(); + let mut key = shortroomid.get().to_be_bytes().to_vec(); key.extend_from_slice(&token.to_be_bytes()); self.roomsynctoken_shortstatehash - .insert(&key, &shortstatehash.to_be_bytes()) + .insert(&key, &shortstatehash.get().to_be_bytes()) } fn get_token_shortstatehash( &self, room_id: &RoomId, token: u64, - ) -> Result> { + ) -> Result> { let shortroomid = services() .rooms .short .get_shortroomid(room_id)? .expect("room exists"); - let mut key = shortroomid.to_be_bytes().to_vec(); + let mut key = shortroomid.get().to_be_bytes().to_vec(); key.extend_from_slice(&token.to_be_bytes()); self.roomsynctoken_shortstatehash .get(&key)? .map(|bytes| { - utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database( - "Invalid shortstatehash in \ - roomsynctoken_shortstatehash", - ) - }) + utils::u64_from_bytes(&bytes) + .map_err(|_| { + Error::bad_database( + "Invalid shortstatehash in \ + roomsynctoken_shortstatehash", + ) + }) + .map(ShortStateHash::new) }) .transpose() } diff --git a/src/service/rooms/auth_chain.rs b/src/service/rooms/auth_chain.rs index b45fab8e..5dcf9275 100644 --- a/src/service/rooms/auth_chain.rs +++ b/src/service/rooms/auth_chain.rs @@ -8,6 +8,7 @@ pub(crate) use data::Data; use ruma::{api::client::error::ErrorKind, EventId, RoomId}; use tracing::{debug, error, warn}; +use super::short::ShortEventId; use crate::{services, utils::debug_slice_truncated, Error, Result}; pub(crate) struct Service { @@ -17,16 +18,16 @@ pub(crate) struct Service { impl Service { pub(crate) fn get_cached_eventid_authchain( &self, - key: &[u64], - ) -> Result>>> { + key: &[ShortEventId], + ) -> Result>>> { self.db.get_cached_eventid_authchain(key) } #[tracing::instrument(skip(self))] pub(crate) fn cache_auth_chain( &self, - key: Vec, - auth_chain: Arc>, + key: Vec, + auth_chain: Arc>, ) -> Result<()> { self.db.cache_auth_chain(key, auth_chain) } @@ -51,7 +52,7 @@ impl Service { // I'm afraid to change this in case there is accidental reliance on // the truncation #[allow(clippy::as_conversions, clippy::cast_possible_truncation)] - let bucket_id = (short % NUM_BUCKETS as u64) as usize; + let bucket_id = (short.get() % NUM_BUCKETS as u64) as usize; buckets[bucket_id].insert((short, id.clone())); i += 1; if i % 100 == 0 { @@ -68,7 +69,7 @@ impl Service { continue; } - let chunk_key: Vec = + let chunk_key: Vec<_> = chunk.iter().map(|(short, _)| short).copied().collect(); if let Some(cached) = self.get_cached_eventid_authchain(&chunk_key)? @@ -139,7 +140,7 @@ impl Service { &self, room_id: &RoomId, event_id: &EventId, - ) -> Result> { + ) -> Result> { let mut todo = vec![Arc::from(event_id)]; let mut found = HashSet::new(); diff --git a/src/service/rooms/auth_chain/data.rs b/src/service/rooms/auth_chain/data.rs index 5ecaee34..5d01b1e2 100644 --- a/src/service/rooms/auth_chain/data.rs +++ b/src/service/rooms/auth_chain/data.rs @@ -1,15 +1,15 @@ use std::{collections::HashSet, sync::Arc}; -use crate::Result; +use crate::{service::rooms::short::ShortEventId, Result}; pub(crate) trait Data: Send + Sync { fn get_cached_eventid_authchain( &self, - shorteventid: &[u64], - ) -> Result>>>; + shorteventid: &[ShortEventId], + ) -> Result>>>; fn cache_auth_chain( &self, - shorteventid: Vec, - auth_chain: Arc>, + shorteventid: Vec, + auth_chain: Arc>, ) -> Result<()>; } diff --git a/src/service/rooms/event_handler.rs b/src/service/rooms/event_handler.rs index 6bf29e8e..cd9ca375 100644 --- a/src/service/rooms/event_handler.rs +++ b/src/service/rooms/event_handler.rs @@ -37,7 +37,10 @@ use serde_json::value::RawValue as RawJsonValue; use tokio::sync::{RwLock, RwLockWriteGuard, Semaphore}; use tracing::{debug, error, info, trace, warn}; -use super::{state_compressor::CompressedStateEvent, timeline::PduId}; +use super::{ + short::ShortStateKey, state_compressor::CompressedStateEvent, + timeline::PduId, +}; use crate::{ service::{globals::SigningKeys, pdu}, services, @@ -1120,7 +1123,7 @@ impl Service { &self, room_id: &RoomId, room_version_id: &RoomVersionId, - incoming_state: HashMap>, + incoming_state: HashMap>, ) -> Result>> { debug!("Loading current room state ids"); let current_sstatehash = services() diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index 9aace01f..baab44a4 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -2,7 +2,10 @@ use std::sync::Arc; use ruma::{EventId, RoomId, UserId}; -use crate::{service::rooms::timeline::PduCount, PduEvent, Result}; +use crate::{ + service::rooms::{short::ShortRoomId, timeline::PduCount}, + PduEvent, Result, +}; pub(crate) trait Data: Send + Sync { fn add_relation(&self, from: u64, to: u64) -> Result<()>; @@ -10,7 +13,7 @@ pub(crate) trait Data: Send + Sync { fn relations_until<'a>( &'a self, user_id: &'a UserId, - room_id: u64, + room_id: ShortRoomId, target: u64, until: PduCount, ) -> Result> + 'a>>; diff --git a/src/service/rooms/search/data.rs b/src/service/rooms/search/data.rs index 515b7d04..2372a9aa 100644 --- a/src/service/rooms/search/data.rs +++ b/src/service/rooms/search/data.rs @@ -1,18 +1,21 @@ use ruma::RoomId; -use crate::{service::rooms::timeline::PduId, Result}; +use crate::{ + service::rooms::{short::ShortRoomId, timeline::PduId}, + Result, +}; pub(crate) trait Data: Send + Sync { fn index_pdu( &self, - shortroomid: u64, + shortroomid: ShortRoomId, pdu_id: &PduId, message_body: &str, ) -> Result<()>; fn deindex_pdu( &self, - shortroomid: u64, + shortroomid: ShortRoomId, pdu_id: &PduId, message_body: &str, ) -> Result<()>; diff --git a/src/service/rooms/short.rs b/src/service/rooms/short.rs index 411199f4..97a7edc8 100644 --- a/src/service/rooms/short.rs +++ b/src/service/rooms/short.rs @@ -1,4 +1,27 @@ mod data; +macro_rules! short_id_type { + ($name:ident) => { + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] + #[repr(transparent)] + pub(crate) struct $name(u64); + + impl $name { + pub(crate) fn new(id: u64) -> Self { + Self(id) + } + + pub(crate) fn get(&self) -> u64 { + self.0 + } + } + }; +} + +short_id_type!(ShortRoomId); +short_id_type!(ShortEventId); +short_id_type!(ShortStateHash); +short_id_type!(ShortStateKey); + pub(crate) use data::Data; pub(crate) type Service = &'static dyn Data; diff --git a/src/service/rooms/short/data.rs b/src/service/rooms/short/data.rs index dcde51c3..3650a0b1 100644 --- a/src/service/rooms/short/data.rs +++ b/src/service/rooms/short/data.rs @@ -2,38 +2,47 @@ use std::sync::Arc; use ruma::{events::StateEventType, EventId, RoomId}; +use super::{ShortEventId, ShortRoomId, ShortStateHash, ShortStateKey}; use crate::Result; pub(crate) trait Data: Send + Sync { - fn get_or_create_shorteventid(&self, event_id: &EventId) -> Result; + fn get_or_create_shorteventid( + &self, + event_id: &EventId, + ) -> Result; fn get_shortstatekey( &self, event_type: &StateEventType, state_key: &str, - ) -> Result>; + ) -> Result>; fn get_or_create_shortstatekey( &self, event_type: &StateEventType, state_key: &str, - ) -> Result; + ) -> Result; - fn get_eventid_from_short(&self, shorteventid: u64) - -> Result>; + fn get_eventid_from_short( + &self, + shorteventid: ShortEventId, + ) -> Result>; fn get_statekey_from_short( &self, - shortstatekey: u64, + shortstatekey: ShortStateKey, ) -> Result<(StateEventType, String)>; /// Returns `(shortstatehash, already_existed)` fn get_or_create_shortstatehash( &self, state_hash: &[u8], - ) -> Result<(u64, bool)>; + ) -> Result<(ShortStateHash, bool)>; - fn get_shortroomid(&self, room_id: &RoomId) -> Result>; + fn get_shortroomid(&self, room_id: &RoomId) -> Result>; - fn get_or_create_shortroomid(&self, room_id: &RoomId) -> Result; + fn get_or_create_shortroomid( + &self, + room_id: &RoomId, + ) -> Result; } diff --git a/src/service/rooms/state.rs b/src/service/rooms/state.rs index 485d2b8c..2ff970bb 100644 --- a/src/service/rooms/state.rs +++ b/src/service/rooms/state.rs @@ -18,7 +18,7 @@ use ruma::{ use serde::Deserialize; use tracing::warn; -use super::state_compressor::CompressedStateEvent; +use super::{short::ShortStateHash, state_compressor::CompressedStateEvent}; use crate::{ service::globals::marker, services, @@ -38,7 +38,7 @@ impl Service { pub(crate) async fn force_state( &self, room_id: &KeyToken, - shortstatehash: u64, + shortstatehash: ShortStateHash, statediffnew: Arc>, _statediffremoved: Arc>, ) -> Result<()> { @@ -126,15 +126,16 @@ impl Service { event_id: &EventId, room_id: &RoomId, state_ids_compressed: Arc>, - ) -> Result { + ) -> Result { let shorteventid = services().rooms.short.get_or_create_shorteventid(event_id)?; let previous_shortstatehash = self.db.get_room_shortstatehash(room_id)?; - let state_hash = - calculate_hash(state_ids_compressed.iter().map(|s| &s[..])); + let state_hash = calculate_hash( + state_ids_compressed.iter().map(CompressedStateEvent::as_bytes), + ); let (shortstatehash, already_existed) = services().rooms.short.get_or_create_shortstatehash(&state_hash)?; @@ -187,7 +188,10 @@ impl Service { /// This adds all current state events (not including the incoming event) /// to `stateid_pduid` and adds the incoming event to `eventid_statehash`. #[tracing::instrument(skip(self, new_pdu))] - pub(crate) fn append_to_state(&self, new_pdu: &PduEvent) -> Result { + pub(crate) fn append_to_state( + &self, + new_pdu: &PduEvent, + ) -> Result { let shorteventid = services() .rooms .short @@ -225,9 +229,9 @@ impl Service { let replaces = states_parents .last() .map(|info| { - info.full_state.iter().find(|bytes| { - bytes.starts_with(&shortstatekey.to_be_bytes()) - }) + info.full_state + .iter() + .find(|compressed| compressed.state == shortstatekey) }) .unwrap_or_default(); @@ -236,7 +240,8 @@ impl Service { } // TODO: statehash with deterministic inputs - let shortstatehash = services().globals.next_count()?; + let shortstatehash = + ShortStateHash::new(services().globals.next_count()?); let mut statediffnew = HashSet::new(); statediffnew.insert(new); @@ -320,7 +325,7 @@ impl Service { pub(crate) fn set_room_state( &self, room_id: &KeyToken, - shortstatehash: u64, + shortstatehash: ShortStateHash, ) -> Result<()> { self.db.set_room_state(room_id, shortstatehash) } @@ -362,7 +367,7 @@ impl Service { pub(crate) fn get_room_shortstatehash( &self, room_id: &RoomId, - ) -> Result> { + ) -> Result> { self.db.get_room_shortstatehash(room_id) } diff --git a/src/service/rooms/state/data.rs b/src/service/rooms/state/data.rs index 6f15b004..452c3b1b 100644 --- a/src/service/rooms/state/data.rs +++ b/src/service/rooms/state/data.rs @@ -3,25 +3,33 @@ use std::{collections::HashSet, sync::Arc}; use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId}; use crate::{ - service::globals::marker, utils::on_demand_hashmap::KeyToken, Result, + service::{ + globals::marker, + rooms::short::{ShortEventId, ShortStateHash}, + }, + utils::on_demand_hashmap::KeyToken, + Result, }; pub(crate) trait Data: Send + Sync { /// Returns the last state hash key added to the db for the given room. - fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result>; + fn get_room_shortstatehash( + &self, + room_id: &RoomId, + ) -> Result>; /// Set the state hash to a new version, but does not update `state_cache`. fn set_room_state( &self, room_id: &KeyToken, - new_shortstatehash: u64, + new_shortstatehash: ShortStateHash, ) -> Result<()>; /// Associates a state with an event. fn set_event_state( &self, - shorteventid: u64, - shortstatehash: u64, + shorteventid: ShortEventId, + shortstatehash: ShortStateHash, ) -> Result<()>; /// Returns all events we would send as the `prev_events` of the next event. diff --git a/src/service/rooms/state_accessor.rs b/src/service/rooms/state_accessor.rs index b9e9f702..f27a69ee 100644 --- a/src/service/rooms/state_accessor.rs +++ b/src/service/rooms/state_accessor.rs @@ -26,6 +26,7 @@ use ruma::{ use serde_json::value::to_raw_value; use tracing::{error, warn}; +use super::short::{ShortStateHash, ShortStateKey}; use crate::{ observability::{FoundIn, Lookup, METRICS}, service::{globals::marker, pdu::PduBuilder}, @@ -37,8 +38,9 @@ use crate::{ pub(crate) struct Service { pub(crate) db: &'static dyn Data, pub(crate) server_visibility_cache: - Mutex>, - pub(crate) user_visibility_cache: Mutex>, + Mutex>, + pub(crate) user_visibility_cache: + Mutex>, } impl Service { @@ -47,15 +49,15 @@ impl Service { #[tracing::instrument(skip(self))] pub(crate) async fn state_full_ids( &self, - shortstatehash: u64, - ) -> Result>> { + shortstatehash: ShortStateHash, + ) -> Result>> { self.db.state_full_ids(shortstatehash).await } #[tracing::instrument(skip(self))] pub(crate) async fn state_full( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, ) -> Result>> { self.db.state_full(shortstatehash).await } @@ -65,7 +67,7 @@ impl Service { #[tracing::instrument(skip(self))] pub(crate) fn state_get_id( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, ) -> Result>> { @@ -77,7 +79,7 @@ impl Service { #[tracing::instrument(skip(self))] pub(crate) fn state_get( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, ) -> Result>> { @@ -88,7 +90,7 @@ impl Service { #[tracing::instrument(skip(self))] fn user_membership( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, user_id: &UserId, ) -> Result { self.state_get( @@ -109,7 +111,11 @@ impl Service { /// The user was a joined member at this state (potentially in the past) #[tracing::instrument(skip(self), ret(level = "trace"))] - fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> bool { + fn user_was_joined( + &self, + shortstatehash: ShortStateHash, + user_id: &UserId, + ) -> bool { self.user_membership(shortstatehash, user_id) .is_ok_and(|s| s == MembershipState::Join) } @@ -117,7 +123,11 @@ impl Service { /// The user was an invited or joined room member at this state (potentially /// in the past) #[tracing::instrument(skip(self), ret(level = "trace"))] - fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> bool { + fn user_was_invited( + &self, + shortstatehash: ShortStateHash, + user_id: &UserId, + ) -> bool { self.user_membership(shortstatehash, user_id).is_ok_and(|s| { s == MembershipState::Join || s == MembershipState::Invite }) @@ -315,7 +325,7 @@ impl Service { pub(crate) fn pdu_shortstatehash( &self, event_id: &EventId, - ) -> Result> { + ) -> Result> { self.db.pdu_shortstatehash(event_id) } diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs index 68214f1d..4fd1b2af 100644 --- a/src/service/rooms/state_accessor/data.rs +++ b/src/service/rooms/state_accessor/data.rs @@ -3,7 +3,10 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use ruma::{events::StateEventType, EventId, RoomId}; -use crate::{PduEvent, Result}; +use crate::{ + service::rooms::short::{ShortStateHash, ShortStateKey}, + PduEvent, Result, +}; #[async_trait] pub(crate) trait Data: Send + Sync { @@ -11,19 +14,19 @@ pub(crate) trait Data: Send + Sync { /// with state_hash, this gives the full state for the given state_hash. async fn state_full_ids( &self, - shortstatehash: u64, - ) -> Result>>; + shortstatehash: ShortStateHash, + ) -> Result>>; async fn state_full( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, ) -> Result>>; /// Returns a single PDU from `room_id` with key (`event_type`, /// `state_key`). fn state_get_id( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, ) -> Result>>; @@ -32,13 +35,16 @@ pub(crate) trait Data: Send + Sync { /// `state_key`). fn state_get( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, ) -> Result>>; /// Returns the state hash for this pdu. - fn pdu_shortstatehash(&self, event_id: &EventId) -> Result>; + fn pdu_shortstatehash( + &self, + event_id: &EventId, + ) -> Result>; /// Returns the full room state. async fn room_state_full( diff --git a/src/service/rooms/state_compressor.rs b/src/service/rooms/state_compressor.rs index a545cf5e..42dc8878 100644 --- a/src/service/rooms/state_compressor.rs +++ b/src/service/rooms/state_compressor.rs @@ -1,4 +1,5 @@ use std::{ + array, collections::HashSet, mem::size_of, sync::{Arc, Mutex}, @@ -17,9 +18,11 @@ pub(crate) mod data; pub(crate) use data::Data; use data::StateDiff; +use super::short::{ShortEventId, ShortStateHash, ShortStateKey}; + #[derive(Clone)] pub(crate) struct CompressedStateLayer { - pub(crate) shortstatehash: u64, + pub(crate) shortstatehash: ShortStateHash, pub(crate) full_state: Arc>, pub(crate) added: Arc>, pub(crate) removed: Arc>, @@ -29,10 +32,45 @@ pub(crate) struct Service { pub(crate) db: &'static dyn Data, #[allow(clippy::type_complexity)] - pub(crate) stateinfo_cache: Mutex>>, + pub(crate) stateinfo_cache: + Mutex>>, } -pub(crate) type CompressedStateEvent = [u8; 2 * size_of::()]; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct CompressedStateEvent { + pub(crate) state: ShortStateKey, + pub(crate) event: ShortEventId, +} + +impl CompressedStateEvent { + pub(crate) fn as_bytes( + &self, + ) -> [u8; size_of::() + size_of::()] { + let mut bytes = self + .state + .get() + .to_be_bytes() + .into_iter() + .chain(self.event.get().to_be_bytes()); + array::from_fn(|_| bytes.next().unwrap()) + } + + pub(crate) fn from_bytes( + bytes: [u8; size_of::() + size_of::()], + ) -> Self { + let state = ShortStateKey::new(u64::from_be_bytes( + bytes[0..8].try_into().unwrap(), + )); + let event = ShortEventId::new(u64::from_be_bytes( + bytes[8..16].try_into().unwrap(), + )); + + Self { + state, + event, + } + } +} impl Service { /// Returns a stack with info on shortstatehash, full state, added diff and @@ -41,7 +79,7 @@ impl Service { #[tracing::instrument(skip(self))] pub(crate) fn load_shortstatehash_info( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, ) -> Result> { let lookup = Lookup::StateInfo; @@ -96,18 +134,16 @@ impl Service { #[allow(clippy::unused_self)] pub(crate) fn compress_state_event( &self, - shortstatekey: u64, + shortstatekey: ShortStateKey, event_id: &EventId, ) -> Result { - let mut v = shortstatekey.to_be_bytes().to_vec(); - v.extend_from_slice( - &services() + Ok(CompressedStateEvent { + state: shortstatekey, + event: services() .rooms .short - .get_or_create_shorteventid(event_id)? - .to_be_bytes(), - ); - Ok(v.try_into().expect("we checked the size above")) + .get_or_create_shorteventid(event_id)?, + }) } /// Returns shortstatekey, event id @@ -116,14 +152,13 @@ impl Service { pub(crate) fn parse_compressed_state_event( &self, compressed_event: &CompressedStateEvent, - ) -> Result<(u64, Arc)> { + ) -> Result<(ShortStateKey, Arc)> { Ok(( - utils::u64_from_bytes(&compressed_event[0..size_of::()]) - .expect("bytes have right length"), - services().rooms.short.get_eventid_from_short( - utils::u64_from_bytes(&compressed_event[size_of::()..]) - .expect("bytes have right length"), - )?, + compressed_event.state, + services() + .rooms + .short + .get_eventid_from_short(compressed_event.event)?, )) } @@ -155,7 +190,7 @@ impl Service { ))] pub(crate) fn save_state_from_diff( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, statediffnew: Arc>, statediffremoved: Arc>, diff_to_sibling: usize, @@ -275,7 +310,7 @@ impl Service { room_id: &RoomId, new_state_ids_compressed: Arc>, ) -> Result<( - u64, + ShortStateHash, Arc>, Arc>, )> { @@ -283,7 +318,7 @@ impl Service { services().rooms.state.get_room_shortstatehash(room_id)?; let state_hash = utils::calculate_hash( - new_state_ids_compressed.iter().map(|bytes| &bytes[..]), + new_state_ids_compressed.iter().map(CompressedStateEvent::as_bytes), ); let (new_shortstatehash, already_existed) = diff --git a/src/service/rooms/state_compressor/data.rs b/src/service/rooms/state_compressor/data.rs index 3d9ffc19..aafaf009 100644 --- a/src/service/rooms/state_compressor/data.rs +++ b/src/service/rooms/state_compressor/data.rs @@ -1,19 +1,22 @@ use std::{collections::HashSet, sync::Arc}; use super::CompressedStateEvent; -use crate::Result; +use crate::{service::rooms::short::ShortStateHash, Result}; pub(crate) struct StateDiff { - pub(crate) parent: Option, + pub(crate) parent: Option, pub(crate) added: Arc>, pub(crate) removed: Arc>, } pub(crate) trait Data: Send + Sync { - fn get_statediff(&self, shortstatehash: u64) -> Result; + fn get_statediff( + &self, + shortstatehash: ShortStateHash, + ) -> Result; fn save_statediff( &self, - shortstatehash: u64, + shortstatehash: ShortStateHash, diff: StateDiff, ) -> Result<()>; } diff --git a/src/service/rooms/timeline.rs b/src/service/rooms/timeline.rs index 8df9c410..257b2fac 100644 --- a/src/service/rooms/timeline.rs +++ b/src/service/rooms/timeline.rs @@ -31,7 +31,7 @@ use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use tokio::sync::RwLock; use tracing::{error, info, warn}; -use super::state_compressor::CompressedStateEvent; +use super::{short::ShortRoomId, state_compressor::CompressedStateEvent}; use crate::{ api::server_server, service::{ @@ -297,7 +297,7 @@ impl Service { .reset_notification_counts(&pdu.sender, &pdu.room_id)?; let count2 = services().globals.next_count()?; - let mut pdu_id = shortroomid.to_be_bytes().to_vec(); + let mut pdu_id = shortroomid.get().to_be_bytes().to_vec(); pdu_id.extend_from_slice(&count2.to_be_bytes()); let pdu_id = PduId::new(pdu_id); @@ -1194,7 +1194,7 @@ impl Service { &self, event_id: &EventId, reason: &PduEvent, - shortroomid: u64, + shortroomid: ShortRoomId, ) -> Result<()> { // TODO: Don't reserialize, keep original json if let Some(pdu_id) = self.get_pdu_id(event_id)? { @@ -1359,7 +1359,7 @@ impl Service { .await; let count = services().globals.next_count()?; - let mut pdu_id = shortroomid.to_be_bytes().to_vec(); + let mut pdu_id = shortroomid.get().to_be_bytes().to_vec(); pdu_id.extend_from_slice(&0_u64.to_be_bytes()); pdu_id.extend_from_slice(&(u64::MAX - count).to_be_bytes()); let pdu_id = PduId::new(pdu_id); diff --git a/src/service/rooms/user/data.rs b/src/service/rooms/user/data.rs index bfcb8f70..d948e3e0 100644 --- a/src/service/rooms/user/data.rs +++ b/src/service/rooms/user/data.rs @@ -1,6 +1,6 @@ use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId}; -use crate::Result; +use crate::{service::rooms::short::ShortStateHash, Result}; pub(crate) trait Data: Send + Sync { fn reset_notification_counts( @@ -32,14 +32,14 @@ pub(crate) trait Data: Send + Sync { &self, room_id: &RoomId, token: u64, - shortstatehash: u64, + shortstatehash: ShortStateHash, ) -> Result<()>; fn get_token_shortstatehash( &self, room_id: &RoomId, token: u64, - ) -> Result>; + ) -> Result>; fn get_shared_rooms<'a>( &'a self,