mirror of
https://gitlab.computer.surgery/matrix/grapevine.git
synced 2025-12-17 07:41:23 +01:00
move pdu_cache to service
This commit is contained in:
parent
fb534d8140
commit
7563360bee
6 changed files with 44 additions and 40 deletions
|
|
@ -49,7 +49,7 @@ pub(crate) struct Config {
|
||||||
#[serde(default = "default_cache_capacity_modifier")]
|
#[serde(default = "default_cache_capacity_modifier")]
|
||||||
pub(crate) cache_capacity_modifier: f64,
|
pub(crate) cache_capacity_modifier: f64,
|
||||||
#[serde(default = "default_pdu_cache_capacity")]
|
#[serde(default = "default_pdu_cache_capacity")]
|
||||||
pub(crate) pdu_cache_capacity: u32,
|
pub(crate) pdu_cache_capacity: usize,
|
||||||
#[serde(default = "default_cleanup_second_interval")]
|
#[serde(default = "default_cleanup_second_interval")]
|
||||||
pub(crate) cleanup_second_interval: u32,
|
pub(crate) cleanup_second_interval: u32,
|
||||||
#[serde(default = "default_max_request_size")]
|
#[serde(default = "default_max_request_size")]
|
||||||
|
|
@ -390,7 +390,7 @@ fn default_rocksdb_max_open_files() -> i32 {
|
||||||
1000
|
1000
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_pdu_cache_capacity() -> u32 {
|
fn default_pdu_cache_capacity() -> usize {
|
||||||
150_000
|
150_000
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ use crate::{
|
||||||
timeline::PduCount,
|
timeline::PduCount,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
services, utils, Config, Error, PduEvent, Result,
|
services, utils, Config, Error, Result,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub(crate) mod abstraction;
|
pub(crate) mod abstraction;
|
||||||
|
|
@ -236,7 +236,6 @@ pub(crate) struct KeyValueDatabase {
|
||||||
pub(super) senderkey_pusher: Arc<dyn KvTree>,
|
pub(super) senderkey_pusher: Arc<dyn KvTree>,
|
||||||
|
|
||||||
// Uncategorized trees
|
// Uncategorized trees
|
||||||
pub(super) pdu_cache: Mutex<LruCache<OwnedEventId, Arc<PduEvent>>>,
|
|
||||||
pub(super) shorteventid_cache: Mutex<LruCache<ShortEventId, Arc<EventId>>>,
|
pub(super) shorteventid_cache: Mutex<LruCache<ShortEventId, Arc<EventId>>>,
|
||||||
pub(super) auth_chain_cache:
|
pub(super) auth_chain_cache:
|
||||||
Mutex<LruCache<Vec<ShortEventId>, Arc<HashSet<ShortEventId>>>>,
|
Mutex<LruCache<Vec<ShortEventId>, Arc<HashSet<ShortEventId>>>>,
|
||||||
|
|
@ -468,12 +467,6 @@ impl KeyValueDatabase {
|
||||||
global: builder.open_tree("global")?,
|
global: builder.open_tree("global")?,
|
||||||
server_signingkeys: builder.open_tree("server_signingkeys")?,
|
server_signingkeys: builder.open_tree("server_signingkeys")?,
|
||||||
|
|
||||||
pdu_cache: Mutex::new(LruCache::new(
|
|
||||||
config
|
|
||||||
.pdu_cache_capacity
|
|
||||||
.try_into()
|
|
||||||
.expect("pdu cache capacity fits into usize"),
|
|
||||||
)),
|
|
||||||
#[allow(
|
#[allow(
|
||||||
clippy::as_conversions,
|
clippy::as_conversions,
|
||||||
clippy::cast_sign_loss,
|
clippy::cast_sign_loss,
|
||||||
|
|
|
||||||
|
|
@ -132,14 +132,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
|
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
fn get_pdu(&self, event_id: &EventId) -> Result<Option<Arc<PduEvent>>> {
|
fn get_pdu(&self, event_id: &EventId) -> Result<Option<Arc<PduEvent>>> {
|
||||||
let lookup = Lookup::Pdu;
|
Ok(self
|
||||||
|
|
||||||
if let Some(p) = self.pdu_cache.lock().unwrap().get_mut(event_id) {
|
|
||||||
METRICS.record_lookup(lookup, FoundIn::Cache);
|
|
||||||
return Ok(Some(Arc::clone(p)));
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(pdu) = self
|
|
||||||
.get_non_outlier_pdu(event_id)?
|
.get_non_outlier_pdu(event_id)?
|
||||||
.map_or_else(
|
.map_or_else(
|
||||||
|| {
|
|| {
|
||||||
|
|
@ -154,18 +147,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
},
|
},
|
||||||
|x| Ok(Some(x)),
|
|x| Ok(Some(x)),
|
||||||
)?
|
)?
|
||||||
.map(Arc::new)
|
.map(Arc::new))
|
||||||
{
|
|
||||||
METRICS.record_lookup(lookup, FoundIn::Database);
|
|
||||||
self.pdu_cache
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.insert(event_id.to_owned(), Arc::clone(&pdu));
|
|
||||||
Ok(Some(pdu))
|
|
||||||
} else {
|
|
||||||
METRICS.record_lookup(lookup, FoundIn::Nothing);
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the pdu.
|
/// Returns the pdu.
|
||||||
|
|
@ -241,7 +223,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
&self,
|
&self,
|
||||||
pdu_id: &PduId,
|
pdu_id: &PduId,
|
||||||
pdu_json: &CanonicalJsonObject,
|
pdu_json: &CanonicalJsonObject,
|
||||||
pdu: &PduEvent,
|
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
if self.pduid_pdu.get(pdu_id.as_bytes())?.is_some() {
|
if self.pduid_pdu.get(pdu_id.as_bytes())?.is_some() {
|
||||||
self.pduid_pdu.insert(
|
self.pduid_pdu.insert(
|
||||||
|
|
@ -256,8 +237,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
self.pdu_cache.lock().unwrap().remove(&(*pdu.event_id).to_owned());
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -130,9 +130,10 @@ impl Services {
|
||||||
(100.0 * config.cache_capacity_modifier) as usize,
|
(100.0 * config.cache_capacity_modifier) as usize,
|
||||||
)),
|
)),
|
||||||
},
|
},
|
||||||
timeline: rooms::timeline::Service {
|
timeline: rooms::timeline::Service::new(
|
||||||
db,
|
db,
|
||||||
},
|
config.pdu_cache_capacity,
|
||||||
|
),
|
||||||
threads: rooms::threads::Service {
|
threads: rooms::threads::Service {
|
||||||
db,
|
db,
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,10 @@
|
||||||
use std::{
|
use std::{
|
||||||
cmp::Ordering,
|
cmp::Ordering,
|
||||||
collections::{BTreeMap, HashSet},
|
collections::{BTreeMap, HashSet},
|
||||||
sync::Arc,
|
sync::{Arc, Mutex},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use lru_cache::LruCache;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::{client::error::ErrorKind, federation},
|
api::{client::error::ErrorKind, federation},
|
||||||
canonical_json::to_canonical_value,
|
canonical_json::to_canonical_value,
|
||||||
|
|
@ -30,6 +31,7 @@ use tracing::{error, info, warn};
|
||||||
use super::{short::ShortRoomId, state_compressor::CompressedStateEvent};
|
use super::{short::ShortRoomId, state_compressor::CompressedStateEvent};
|
||||||
use crate::{
|
use crate::{
|
||||||
api::server_server,
|
api::server_server,
|
||||||
|
observability::{FoundIn, Lookup, METRICS},
|
||||||
service::{
|
service::{
|
||||||
appservice::NamespaceRegex,
|
appservice::NamespaceRegex,
|
||||||
globals::{marker, SigningKeys},
|
globals::{marker, SigningKeys},
|
||||||
|
|
@ -111,10 +113,21 @@ impl Ord for PduCount {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct Service {
|
pub(crate) struct Service {
|
||||||
pub(crate) db: &'static dyn Data,
|
db: &'static dyn Data,
|
||||||
|
pdu_cache: Mutex<LruCache<OwnedEventId, Arc<PduEvent>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
|
pub(crate) fn new(
|
||||||
|
db: &'static dyn Data,
|
||||||
|
pdu_cache_capacity: usize,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
db,
|
||||||
|
pdu_cache: Mutex::new(LruCache::new(pdu_cache_capacity)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub(crate) fn first_pdu_in_room(
|
pub(crate) fn first_pdu_in_room(
|
||||||
&self,
|
&self,
|
||||||
|
|
@ -174,7 +187,24 @@ impl Service {
|
||||||
&self,
|
&self,
|
||||||
event_id: &EventId,
|
event_id: &EventId,
|
||||||
) -> Result<Option<Arc<PduEvent>>> {
|
) -> Result<Option<Arc<PduEvent>>> {
|
||||||
self.db.get_pdu(event_id)
|
let lookup = Lookup::Pdu;
|
||||||
|
|
||||||
|
if let Some(p) = self.pdu_cache.lock().unwrap().get_mut(event_id) {
|
||||||
|
METRICS.record_lookup(lookup, FoundIn::Cache);
|
||||||
|
return Ok(Some(Arc::clone(p)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(pdu) = self.db.get_pdu(event_id)? {
|
||||||
|
METRICS.record_lookup(lookup, FoundIn::Database);
|
||||||
|
self.pdu_cache
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.insert(event_id.to_owned(), Arc::clone(&pdu));
|
||||||
|
Ok(Some(pdu))
|
||||||
|
} else {
|
||||||
|
METRICS.record_lookup(lookup, FoundIn::Nothing);
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the pdu.
|
/// Returns the pdu.
|
||||||
|
|
@ -203,7 +233,9 @@ impl Service {
|
||||||
pdu_json: &CanonicalJsonObject,
|
pdu_json: &CanonicalJsonObject,
|
||||||
pdu: &PduEvent,
|
pdu: &PduEvent,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
self.db.replace_pdu(pdu_id, pdu_json, pdu)
|
self.db.replace_pdu(pdu_id, pdu_json)?;
|
||||||
|
self.pdu_cache.lock().unwrap().remove(&(*pdu.event_id).to_owned());
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a new persisted data unit and adds it to a room.
|
/// Creates a new persisted data unit and adds it to a room.
|
||||||
|
|
|
||||||
|
|
@ -76,7 +76,6 @@ pub(crate) trait Data: Send + Sync {
|
||||||
&self,
|
&self,
|
||||||
pdu_id: &PduId,
|
pdu_id: &PduId,
|
||||||
pdu_json: &CanonicalJsonObject,
|
pdu_json: &CanonicalJsonObject,
|
||||||
pdu: &PduEvent,
|
|
||||||
) -> Result<()>;
|
) -> Result<()>;
|
||||||
|
|
||||||
/// Returns an iterator over all events and their tokens in a room that
|
/// Returns an iterator over all events and their tokens in a room that
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue