move our_real_users_cache to service

This commit is contained in:
Charles Hall 2024-10-08 15:21:07 -07:00
parent d3b62e598d
commit 9d62865b28
No known key found for this signature in database
GPG key ID: 7B8E0645816E07CF
5 changed files with 56 additions and 47 deletions

View file

@ -10,7 +10,7 @@ use std::{
use ruma::{ use ruma::{
events::{push_rules::PushRulesEvent, GlobalAccountDataEventType}, events::{push_rules::PushRulesEvent, GlobalAccountDataEventType},
push::Ruleset, push::Ruleset,
EventId, OwnedRoomId, OwnedUserId, RoomId, UserId, EventId, OwnedRoomId, RoomId, UserId,
}; };
use tracing::{debug, error, info, info_span, warn, Instrument}; use tracing::{debug, error, info, info_span, warn, Instrument};
@ -233,8 +233,6 @@ pub(crate) struct KeyValueDatabase {
pub(super) senderkey_pusher: Arc<dyn KvTree>, pub(super) senderkey_pusher: Arc<dyn KvTree>,
// Uncategorized trees // Uncategorized trees
pub(super) our_real_users_cache:
RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>,
pub(super) appservice_in_room_cache: pub(super) appservice_in_room_cache:
RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>, RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>,
pub(super) lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>, pub(super) lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>,
@ -456,7 +454,6 @@ impl KeyValueDatabase {
global: builder.open_tree("global")?, global: builder.open_tree("global")?,
server_signingkeys: builder.open_tree("server_signingkeys")?, server_signingkeys: builder.open_tree("server_signingkeys")?,
our_real_users_cache: RwLock::new(HashMap::new()),
appservice_in_room_cache: RwLock::new(HashMap::new()), appservice_in_room_cache: RwLock::new(HashMap::new()),
lasttimelinecount_cache: Mutex::new(HashMap::new()), lasttimelinecount_cache: Mutex::new(HashMap::new()),
}; };

View file

@ -1,4 +1,4 @@
use std::{collections::HashSet, sync::Arc}; use std::collections::HashSet;
use ruma::{ use ruma::{
events::{AnyStrippedStateEvent, AnySyncStateEvent}, events::{AnyStrippedStateEvent, AnySyncStateEvent},
@ -101,7 +101,10 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
Ok(()) Ok(())
} }
fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { fn update_joined_count(
&self,
room_id: &RoomId,
) -> Result<HashSet<OwnedUserId>> {
let mut joinedcount = 0_u64; let mut joinedcount = 0_u64;
let mut invitedcount = 0_u64; let mut invitedcount = 0_u64;
let mut joined_servers = HashSet::new(); let mut joined_servers = HashSet::new();
@ -129,11 +132,6 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
self.roomid_invitedcount self.roomid_invitedcount
.insert(room_id.as_bytes(), &invitedcount.to_be_bytes())?; .insert(room_id.as_bytes(), &invitedcount.to_be_bytes())?;
self.our_real_users_cache
.write()
.unwrap()
.insert(room_id.to_owned(), Arc::new(real_users));
for old_joined_server in for old_joined_server in
self.room_servers(room_id).filter_map(Result::ok) self.room_servers(room_id).filter_map(Result::ok)
{ {
@ -168,28 +166,7 @@ impl service::rooms::state_cache::Data for KeyValueDatabase {
self.appservice_in_room_cache.write().unwrap().remove(room_id); self.appservice_in_room_cache.write().unwrap().remove(room_id);
Ok(()) Ok(real_users)
}
#[tracing::instrument(skip(self))]
fn get_our_real_users(
&self,
room_id: &RoomId,
) -> Result<Arc<HashSet<OwnedUserId>>> {
let lookup = Lookup::OurRealUsers;
let maybe =
self.our_real_users_cache.read().unwrap().get(room_id).cloned();
if let Some(users) = maybe {
METRICS.record_lookup(lookup, FoundIn::Cache);
Ok(users)
} else {
self.update_joined_count(room_id)?;
METRICS.record_lookup(lookup, FoundIn::Database);
Ok(Arc::clone(
self.our_real_users_cache.read().unwrap().get(room_id).unwrap(),
))
}
} }
#[tracing::instrument( #[tracing::instrument(

View file

@ -159,9 +159,7 @@ impl Services {
(100.0 * config.cache_capacity_modifier) as usize, (100.0 * config.cache_capacity_modifier) as usize,
)), )),
}, },
state_cache: rooms::state_cache::Service { state_cache: rooms::state_cache::Service::new(db),
db,
},
state_compressor: rooms::state_compressor::Service { state_compressor: rooms::state_compressor::Service {
db, db,
#[allow( #[allow(

View file

@ -1,4 +1,7 @@
use std::{collections::HashSet, sync::Arc}; use std::{
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
};
use ruma::{ use ruma::{
events::{ events::{
@ -12,19 +15,32 @@ use ruma::{
}; };
use tracing::warn; use tracing::warn;
use crate::{service::appservice::RegistrationInfo, services, Error, Result}; use crate::{
observability::{FoundIn, Lookup, METRICS},
service::appservice::RegistrationInfo,
services, Error, Result,
};
mod data; mod data;
pub(crate) use data::Data; pub(crate) use data::Data;
pub(crate) struct Service { pub(crate) struct Service {
pub(crate) db: &'static dyn Data, db: &'static dyn Data,
our_real_users_cache:
RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>,
} }
type RoomsLeft = (OwnedRoomId, Vec<Raw<AnySyncStateEvent>>); type RoomsLeft = (OwnedRoomId, Vec<Raw<AnySyncStateEvent>>);
impl Service { impl Service {
pub(crate) fn new(db: &'static dyn Data) -> Self {
Self {
db,
our_real_users_cache: RwLock::new(HashMap::new()),
}
}
/// Update current membership data. /// Update current membership data.
#[tracing::instrument(skip(self, last_state))] #[tracing::instrument(skip(self, last_state))]
pub(crate) fn update_membership( pub(crate) fn update_membership(
@ -283,8 +299,18 @@ impl Service {
} }
#[tracing::instrument(skip(self, room_id))] #[tracing::instrument(skip(self, room_id))]
pub(crate) fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { pub(crate) fn update_joined_count(
self.db.update_joined_count(room_id) &self,
room_id: &RoomId,
) -> Result<Arc<HashSet<OwnedUserId>>> {
let our_real_users = Arc::new(self.db.update_joined_count(room_id)?);
self.our_real_users_cache
.write()
.unwrap()
.insert(room_id.to_owned(), our_real_users.clone());
Ok(our_real_users)
} }
#[tracing::instrument(skip(self, room_id))] #[tracing::instrument(skip(self, room_id))]
@ -292,7 +318,20 @@ impl Service {
&self, &self,
room_id: &RoomId, room_id: &RoomId,
) -> Result<Arc<HashSet<OwnedUserId>>> { ) -> Result<Arc<HashSet<OwnedUserId>>> {
self.db.get_our_real_users(room_id) let lookup = Lookup::OurRealUsers;
if let Some(our_real_users) =
self.our_real_users_cache.read().unwrap().get(room_id).cloned()
{
METRICS.record_lookup(lookup, FoundIn::Cache);
return Ok(our_real_users);
}
let our_real_users = self.update_joined_count(room_id)?;
METRICS.record_lookup(lookup, FoundIn::Database);
Ok(our_real_users)
} }
#[tracing::instrument(skip(self, room_id, appservice))] #[tracing::instrument(skip(self, room_id, appservice))]

View file

@ -1,4 +1,4 @@
use std::{collections::HashSet, sync::Arc}; use std::collections::HashSet;
use ruma::{ use ruma::{
events::{AnyStrippedStateEvent, AnySyncStateEvent}, events::{AnyStrippedStateEvent, AnySyncStateEvent},
@ -23,12 +23,10 @@ pub(crate) trait Data: Send + Sync {
) -> Result<()>; ) -> Result<()>;
fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>; fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>;
fn update_joined_count(&self, room_id: &RoomId) -> Result<()>; fn update_joined_count(
fn get_our_real_users(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
) -> Result<Arc<HashSet<OwnedUserId>>>; ) -> Result<HashSet<OwnedUserId>>;
fn appservice_in_room( fn appservice_in_room(
&self, &self,