From ce7efc1effa4e2ae6c47760cbd4b74e18d0b26b2 Mon Sep 17 00:00:00 2001 From: Charles Hall Date: Tue, 8 Oct 2024 14:40:54 -0700 Subject: [PATCH] move lasttimelinecount_cache to service --- src/database.rs | 8 +--- src/database/key_value/rooms/timeline.rs | 49 +----------------------- src/service/rooms/timeline.rs | 42 ++++++++++++++++++-- src/service/rooms/timeline/data.rs | 7 ---- 4 files changed, 41 insertions(+), 65 deletions(-) diff --git a/src/database.rs b/src/database.rs index 66025b90..5205b9ca 100644 --- a/src/database.rs +++ b/src/database.rs @@ -4,7 +4,7 @@ use std::{ io::Write, mem::size_of, path::Path, - sync::{Arc, Mutex}, + sync::Arc, }; use ruma::{ @@ -21,7 +21,6 @@ use crate::{ rooms::{ short::{ShortEventId, ShortStateHash, ShortStateKey}, state_compressor::CompressedStateEvent, - timeline::PduCount, }, }, services, utils, Config, Error, Result, @@ -231,9 +230,6 @@ pub(crate) struct KeyValueDatabase { // Trees "owned" by `self::key_value::pusher` pub(super) senderkey_pusher: Arc, - - // Uncategorized trees - pub(super) lasttimelinecount_cache: Mutex>, } impl KeyValueDatabase { @@ -451,8 +447,6 @@ impl KeyValueDatabase { senderkey_pusher: builder.open_tree("senderkey_pusher")?, global: builder.open_tree("global")?, server_signingkeys: builder.open_tree("server_signingkeys")?, - - lasttimelinecount_cache: Mutex::new(HashMap::new()), }; Ok(db) diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs index 71b75652..b46e9bfe 100644 --- a/src/database/key_value/rooms/timeline.rs +++ b/src/database/key_value/rooms/timeline.rs @@ -1,59 +1,18 @@ -use std::{collections::hash_map, mem::size_of, sync::Arc}; +use std::{mem::size_of, sync::Arc}; use ruma::{ api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, }; use service::rooms::timeline::PduCount; -use tracing::error; use crate::{ database::KeyValueDatabase, - observability::{FoundIn, Lookup, METRICS}, service::{self, rooms::timeline::PduId}, services, utils, Error, PduEvent, Result, }; impl service::rooms::timeline::Data for KeyValueDatabase { - #[tracing::instrument(skip(self))] - fn last_timeline_count( - &self, - sender_user: &UserId, - room_id: &RoomId, - ) -> Result { - let lookup = Lookup::LastTimelineCount; - - match self - .lasttimelinecount_cache - .lock() - .unwrap() - .entry(room_id.to_owned()) - { - hash_map::Entry::Vacant(v) => { - if let Some(last_count) = self - .pdus_until(sender_user, room_id, PduCount::MAX)? - .find_map(|x| match x { - Ok(x) => Some(x), - Err(error) => { - error!(%error, "Bad pdu in pdus_since"); - None - } - }) - { - METRICS.record_lookup(lookup, FoundIn::Database); - Ok(*v.insert(last_count.0)) - } else { - METRICS.record_lookup(lookup, FoundIn::Nothing); - Ok(PduCount::Normal(0)) - } - } - hash_map::Entry::Occupied(o) => { - METRICS.record_lookup(lookup, FoundIn::Cache); - Ok(*o.get()) - } - } - } - /// Returns the `count` of this pdu's id. fn get_pdu_count(&self, event_id: &EventId) -> Result> { self.eventid_pduid @@ -180,7 +139,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase { pdu_id: &PduId, pdu: &PduEvent, json: &CanonicalJsonObject, - count: u64, ) -> Result<()> { self.pduid_pdu.insert( pdu_id.as_bytes(), @@ -188,11 +146,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase { .expect("CanonicalJsonObject is always a valid"), )?; - self.lasttimelinecount_cache - .lock() - .unwrap() - .insert(pdu.room_id.clone(), PduCount::Normal(count)); - self.eventid_pduid .insert(pdu.event_id.as_bytes(), pdu_id.as_bytes())?; self.eventid_outlierpdu.remove(pdu.event_id.as_bytes())?; diff --git a/src/service/rooms/timeline.rs b/src/service/rooms/timeline.rs index 341fb1c4..d70fd0c2 100644 --- a/src/service/rooms/timeline.rs +++ b/src/service/rooms/timeline.rs @@ -1,6 +1,6 @@ use std::{ cmp::Ordering, - collections::{BTreeMap, HashSet}, + collections::{hash_map, BTreeMap, HashMap, HashSet}, sync::{Arc, Mutex}, }; @@ -115,6 +115,7 @@ impl Ord for PduCount { pub(crate) struct Service { db: &'static dyn Data, pdu_cache: Mutex>>, + lasttimelinecount_cache: Mutex>, } impl Service { @@ -125,6 +126,7 @@ impl Service { Self { db, pdu_cache: Mutex::new(LruCache::new(pdu_cache_capacity)), + lasttimelinecount_cache: Mutex::new(HashMap::new()), } } @@ -145,7 +147,37 @@ impl Service { sender_user: &UserId, room_id: &RoomId, ) -> Result { - self.db.last_timeline_count(sender_user, room_id) + let lookup = Lookup::LastTimelineCount; + + match self + .lasttimelinecount_cache + .lock() + .unwrap() + .entry(room_id.to_owned()) + { + hash_map::Entry::Vacant(v) => { + if let Some(last_count) = self + .pdus_until(sender_user, room_id, PduCount::MAX)? + .find_map(|x| match x { + Ok(x) => Some(x), + Err(error) => { + error!(%error, "Bad pdu in pdus_since"); + None + } + }) + { + METRICS.record_lookup(lookup, FoundIn::Database); + Ok(*v.insert(last_count.0)) + } else { + METRICS.record_lookup(lookup, FoundIn::Nothing); + Ok(PduCount::Normal(0)) + } + } + hash_map::Entry::Occupied(o) => { + METRICS.record_lookup(lookup, FoundIn::Cache); + Ok(*o.get()) + } + } } /// Returns the `count` of this pdu's id. @@ -334,7 +366,11 @@ impl Service { let pdu_id = PduId::new(pdu_id); // Insert pdu - self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2)?; + self.db.append_pdu(&pdu_id, pdu, &pdu_json)?; + self.lasttimelinecount_cache + .lock() + .unwrap() + .insert(pdu.room_id.clone(), PduCount::Normal(count2)); drop(insert_token); diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index a9a85760..49b0c8a2 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -6,12 +6,6 @@ use super::PduCount; use crate::{service::rooms::timeline::PduId, PduEvent, Result}; pub(crate) trait Data: Send + Sync { - fn last_timeline_count( - &self, - sender_user: &UserId, - room_id: &RoomId, - ) -> Result; - /// Returns the `count` of this pdu's id. fn get_pdu_count(&self, event_id: &EventId) -> Result>; @@ -60,7 +54,6 @@ pub(crate) trait Data: Send + Sync { pdu_id: &PduId, pdu: &PduEvent, json: &CanonicalJsonObject, - count: u64, ) -> Result<()>; // Adds a new pdu to the backfilled timeline