From 991f1e2c0d9c14f80b2c4e3cf4338722e528f78f Mon Sep 17 00:00:00 2001 From: Lambda Date: Sun, 10 Aug 2025 17:25:47 +0000 Subject: [PATCH] MSC3575: factor out ConnectionKey --- src/api/client_server/sync/msc3575.rs | 47 ++++++-------- src/service/users.rs | 88 +++++++++------------------ 2 files changed, 50 insertions(+), 85 deletions(-) diff --git a/src/api/client_server/sync/msc3575.rs b/src/api/client_server/sync/msc3575.rs index 55ed37a8..ed8fafe3 100644 --- a/src/api/client_server/sync/msc3575.rs +++ b/src/api/client_server/sync/msc3575.rs @@ -25,7 +25,7 @@ use tracing::{debug, error}; use super::{load_timeline, share_encrypted_room}; use crate::{ - service::{account_data, rooms::timeline::PduCount}, + service::{account_data, rooms::timeline::PduCount, users::ConnectionKey}, services, Ar, Error, Ra, Result, }; @@ -76,22 +76,22 @@ pub(crate) async fn sync_events_v4_route( let globalsince = body.pos.as_ref().and_then(|string| string.parse().ok()).unwrap_or(0); + let connection_key = ConnectionKey { + user: sender_user.clone(), + device: sender_device.clone(), + connection: body.conn_id.clone(), + }; + if globalsince == 0 { - if let Some(conn_id) = &body.conn_id { - services().users.forget_sync_request_connection( - sender_user.clone(), - sender_device.clone(), - conn_id.clone(), - ); + if body.conn_id.is_some() { + services().users.forget_sync_request_connection(&connection_key); } } // Get sticky parameters from cache - let known_rooms = services().users.update_sync_request_with_cache( - sender_user.clone(), - sender_device.clone(), - &mut body, - ); + let known_rooms = services() + .users + .update_sync_request_with_cache(connection_key.clone(), &mut body); let all_joined_rooms = services() .rooms @@ -379,11 +379,9 @@ pub(crate) async fn sync_events_v4_route( }, ); - if let Some(conn_id) = &body.conn_id { + if body.conn_id.is_some() { services().users.update_sync_known_rooms( - sender_user.clone(), - sender_device.clone(), - conn_id.clone(), + connection_key.clone(), list_id, new_known_rooms, globalsince, @@ -410,24 +408,19 @@ pub(crate) async fn sync_events_v4_route( body.room_subscriptions.remove(&r); } - if let Some(conn_id) = &body.conn_id { + if body.conn_id.is_some() { services().users.update_sync_known_rooms( - sender_user.clone(), - sender_device.clone(), - conn_id.clone(), + connection_key.clone(), "subscriptions".to_owned(), known_subscription_rooms, globalsince, ); } - if let Some(conn_id) = &body.conn_id { - services().users.update_sync_subscriptions( - sender_user.clone(), - sender_device.clone(), - conn_id.clone(), - body.room_subscriptions, - ); + if body.conn_id.is_some() { + services() + .users + .update_sync_subscriptions(connection_key, body.room_subscriptions); } let mut rooms = BTreeMap::new(); diff --git a/src/service/users.rs b/src/service/users.rs index 9df6e835..a30de193 100644 --- a/src/service/users.rs +++ b/src/service/users.rs @@ -26,6 +26,7 @@ mod data; pub(crate) use data::Data; +#[derive(Debug, Default)] pub(crate) struct SlidingSyncCache { lists: BTreeMap, subscriptions: BTreeMap, @@ -34,15 +35,18 @@ pub(crate) struct SlidingSyncCache { extensions: ExtensionsConfig, } +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct ConnectionKey { + pub(crate) user: OwnedUserId, + pub(crate) device: OwnedDeviceId, + pub(crate) connection: Option, +} + pub(crate) struct Service { pub(crate) db: &'static dyn Data, #[allow(clippy::type_complexity)] - pub(crate) connections: Mutex< - BTreeMap< - (OwnedUserId, OwnedDeviceId, String), - Arc>, - >, - >, + pub(crate) connections: + Mutex>>>, } impl Service { @@ -53,6 +57,14 @@ impl Service { } } + fn get_cache_entry( + &self, + key: ConnectionKey, + ) -> Arc> { + let mut cache = self.connections.lock().unwrap(); + Arc::clone(cache.entry(key).or_default()) + } + /// Check if a user has an account on this homeserver. pub(crate) fn exists(&self, user_id: &UserId) -> Result { self.db.exists(user_id) @@ -60,37 +72,23 @@ impl Service { pub(crate) fn forget_sync_request_connection( &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - conn_id: String, + connection_key: &ConnectionKey, ) { - self.connections.lock().unwrap().remove(&(user_id, device_id, conn_id)); + self.connections.lock().unwrap().remove(connection_key); } #[allow(clippy::too_many_lines)] pub(crate) fn update_sync_request_with_cache( &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, + connection_key: ConnectionKey, request: &mut sync_events::v4::Request, ) -> BTreeMap> { - let Some(conn_id) = request.conn_id.clone() else { + if connection_key.connection.is_none() { return BTreeMap::new(); }; - let mut cache = self.connections.lock().unwrap(); - let cached = Arc::clone( - cache.entry((user_id, device_id, conn_id)).or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { - lists: BTreeMap::new(), - subscriptions: BTreeMap::new(), - known_rooms: BTreeMap::new(), - extensions: ExtensionsConfig::default(), - })) - }), - ); - let cached = &mut cached.lock().unwrap(); - drop(cache); + let cached = self.get_cache_entry(connection_key); + let mut cached = cached.lock().unwrap(); for (list_id, list) in &mut request.lists { if let Some(cached_list) = cached.lists.get(list_id) { @@ -200,50 +198,24 @@ impl Service { pub(crate) fn update_sync_subscriptions( &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - conn_id: String, + connection_key: ConnectionKey, subscriptions: BTreeMap, ) { - let mut cache = self.connections.lock().unwrap(); - let cached = Arc::clone( - cache.entry((user_id, device_id, conn_id)).or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { - lists: BTreeMap::new(), - subscriptions: BTreeMap::new(), - known_rooms: BTreeMap::new(), - extensions: ExtensionsConfig::default(), - })) - }), - ); - let cached = &mut cached.lock().unwrap(); - drop(cache); + let cached = self.get_cache_entry(connection_key); + let mut cached = cached.lock().unwrap(); cached.subscriptions = subscriptions; } pub(crate) fn update_sync_known_rooms( &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - conn_id: String, + connection_key: ConnectionKey, list_id: String, new_cached_rooms: BTreeSet, globalsince: u64, ) { - let mut cache = self.connections.lock().unwrap(); - let cached = Arc::clone( - cache.entry((user_id, device_id, conn_id)).or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { - lists: BTreeMap::new(), - subscriptions: BTreeMap::new(), - known_rooms: BTreeMap::new(), - extensions: ExtensionsConfig::default(), - })) - }), - ); - let cached = &mut cached.lock().unwrap(); - drop(cache); + let cached = self.get_cache_entry(connection_key); + let mut cached = cached.lock().unwrap(); for (roomid, lastsince) in cached.known_rooms.entry(list_id.clone()).or_default().iter_mut()