MSC3575: factor out ConnectionKey

This commit is contained in:
Lambda 2025-08-10 17:25:47 +00:00
parent 3bbee92db4
commit 991f1e2c0d
2 changed files with 50 additions and 85 deletions

View file

@ -25,7 +25,7 @@ use tracing::{debug, error};
use super::{load_timeline, share_encrypted_room}; use super::{load_timeline, share_encrypted_room};
use crate::{ use crate::{
service::{account_data, rooms::timeline::PduCount}, service::{account_data, rooms::timeline::PduCount, users::ConnectionKey},
services, Ar, Error, Ra, Result, services, Ar, Error, Ra, Result,
}; };
@ -76,22 +76,22 @@ pub(crate) async fn sync_events_v4_route(
let globalsince = let globalsince =
body.pos.as_ref().and_then(|string| string.parse().ok()).unwrap_or(0); body.pos.as_ref().and_then(|string| string.parse().ok()).unwrap_or(0);
let connection_key = ConnectionKey {
user: sender_user.clone(),
device: sender_device.clone(),
connection: body.conn_id.clone(),
};
if globalsince == 0 { if globalsince == 0 {
if let Some(conn_id) = &body.conn_id { if body.conn_id.is_some() {
services().users.forget_sync_request_connection( services().users.forget_sync_request_connection(&connection_key);
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
);
} }
} }
// Get sticky parameters from cache // Get sticky parameters from cache
let known_rooms = services().users.update_sync_request_with_cache( let known_rooms = services()
sender_user.clone(), .users
sender_device.clone(), .update_sync_request_with_cache(connection_key.clone(), &mut body);
&mut body,
);
let all_joined_rooms = services() let all_joined_rooms = services()
.rooms .rooms
@ -379,11 +379,9 @@ pub(crate) async fn sync_events_v4_route(
}, },
); );
if let Some(conn_id) = &body.conn_id { if body.conn_id.is_some() {
services().users.update_sync_known_rooms( services().users.update_sync_known_rooms(
sender_user.clone(), connection_key.clone(),
sender_device.clone(),
conn_id.clone(),
list_id, list_id,
new_known_rooms, new_known_rooms,
globalsince, globalsince,
@ -410,24 +408,19 @@ pub(crate) async fn sync_events_v4_route(
body.room_subscriptions.remove(&r); body.room_subscriptions.remove(&r);
} }
if let Some(conn_id) = &body.conn_id { if body.conn_id.is_some() {
services().users.update_sync_known_rooms( services().users.update_sync_known_rooms(
sender_user.clone(), connection_key.clone(),
sender_device.clone(),
conn_id.clone(),
"subscriptions".to_owned(), "subscriptions".to_owned(),
known_subscription_rooms, known_subscription_rooms,
globalsince, globalsince,
); );
} }
if let Some(conn_id) = &body.conn_id { if body.conn_id.is_some() {
services().users.update_sync_subscriptions( services()
sender_user.clone(), .users
sender_device.clone(), .update_sync_subscriptions(connection_key, body.room_subscriptions);
conn_id.clone(),
body.room_subscriptions,
);
} }
let mut rooms = BTreeMap::new(); let mut rooms = BTreeMap::new();

View file

@ -26,6 +26,7 @@ mod data;
pub(crate) use data::Data; pub(crate) use data::Data;
#[derive(Debug, Default)]
pub(crate) struct SlidingSyncCache { pub(crate) struct SlidingSyncCache {
lists: BTreeMap<String, SyncRequestList>, lists: BTreeMap<String, SyncRequestList>,
subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>, subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>,
@ -34,15 +35,18 @@ pub(crate) struct SlidingSyncCache {
extensions: ExtensionsConfig, extensions: ExtensionsConfig,
} }
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct ConnectionKey {
pub(crate) user: OwnedUserId,
pub(crate) device: OwnedDeviceId,
pub(crate) connection: Option<String>,
}
pub(crate) struct Service { pub(crate) struct Service {
pub(crate) db: &'static dyn Data, pub(crate) db: &'static dyn Data,
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
pub(crate) connections: Mutex< pub(crate) connections:
BTreeMap< Mutex<BTreeMap<ConnectionKey, Arc<Mutex<SlidingSyncCache>>>>,
(OwnedUserId, OwnedDeviceId, String),
Arc<Mutex<SlidingSyncCache>>,
>,
>,
} }
impl Service { impl Service {
@ -53,6 +57,14 @@ impl Service {
} }
} }
fn get_cache_entry(
&self,
key: ConnectionKey,
) -> Arc<Mutex<SlidingSyncCache>> {
let mut cache = self.connections.lock().unwrap();
Arc::clone(cache.entry(key).or_default())
}
/// Check if a user has an account on this homeserver. /// Check if a user has an account on this homeserver.
pub(crate) fn exists(&self, user_id: &UserId) -> Result<bool> { pub(crate) fn exists(&self, user_id: &UserId) -> Result<bool> {
self.db.exists(user_id) self.db.exists(user_id)
@ -60,37 +72,23 @@ impl Service {
pub(crate) fn forget_sync_request_connection( pub(crate) fn forget_sync_request_connection(
&self, &self,
user_id: OwnedUserId, connection_key: &ConnectionKey,
device_id: OwnedDeviceId,
conn_id: String,
) { ) {
self.connections.lock().unwrap().remove(&(user_id, device_id, conn_id)); self.connections.lock().unwrap().remove(connection_key);
} }
#[allow(clippy::too_many_lines)] #[allow(clippy::too_many_lines)]
pub(crate) fn update_sync_request_with_cache( pub(crate) fn update_sync_request_with_cache(
&self, &self,
user_id: OwnedUserId, connection_key: ConnectionKey,
device_id: OwnedDeviceId,
request: &mut sync_events::v4::Request, request: &mut sync_events::v4::Request,
) -> BTreeMap<String, BTreeMap<OwnedRoomId, u64>> { ) -> BTreeMap<String, BTreeMap<OwnedRoomId, u64>> {
let Some(conn_id) = request.conn_id.clone() else { if connection_key.connection.is_none() {
return BTreeMap::new(); return BTreeMap::new();
}; };
let mut cache = self.connections.lock().unwrap(); let cached = self.get_cache_entry(connection_key);
let cached = Arc::clone( let mut cached = cached.lock().unwrap();
cache.entry((user_id, device_id, conn_id)).or_insert_with(|| {
Arc::new(Mutex::new(SlidingSyncCache {
lists: BTreeMap::new(),
subscriptions: BTreeMap::new(),
known_rooms: BTreeMap::new(),
extensions: ExtensionsConfig::default(),
}))
}),
);
let cached = &mut cached.lock().unwrap();
drop(cache);
for (list_id, list) in &mut request.lists { for (list_id, list) in &mut request.lists {
if let Some(cached_list) = cached.lists.get(list_id) { if let Some(cached_list) = cached.lists.get(list_id) {
@ -200,50 +198,24 @@ impl Service {
pub(crate) fn update_sync_subscriptions( pub(crate) fn update_sync_subscriptions(
&self, &self,
user_id: OwnedUserId, connection_key: ConnectionKey,
device_id: OwnedDeviceId,
conn_id: String,
subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>, subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>,
) { ) {
let mut cache = self.connections.lock().unwrap(); let cached = self.get_cache_entry(connection_key);
let cached = Arc::clone( let mut cached = cached.lock().unwrap();
cache.entry((user_id, device_id, conn_id)).or_insert_with(|| {
Arc::new(Mutex::new(SlidingSyncCache {
lists: BTreeMap::new(),
subscriptions: BTreeMap::new(),
known_rooms: BTreeMap::new(),
extensions: ExtensionsConfig::default(),
}))
}),
);
let cached = &mut cached.lock().unwrap();
drop(cache);
cached.subscriptions = subscriptions; cached.subscriptions = subscriptions;
} }
pub(crate) fn update_sync_known_rooms( pub(crate) fn update_sync_known_rooms(
&self, &self,
user_id: OwnedUserId, connection_key: ConnectionKey,
device_id: OwnedDeviceId,
conn_id: String,
list_id: String, list_id: String,
new_cached_rooms: BTreeSet<OwnedRoomId>, new_cached_rooms: BTreeSet<OwnedRoomId>,
globalsince: u64, globalsince: u64,
) { ) {
let mut cache = self.connections.lock().unwrap(); let cached = self.get_cache_entry(connection_key);
let cached = Arc::clone( let mut cached = cached.lock().unwrap();
cache.entry((user_id, device_id, conn_id)).or_insert_with(|| {
Arc::new(Mutex::new(SlidingSyncCache {
lists: BTreeMap::new(),
subscriptions: BTreeMap::new(),
known_rooms: BTreeMap::new(),
extensions: ExtensionsConfig::default(),
}))
}),
);
let cached = &mut cached.lock().unwrap();
drop(cache);
for (roomid, lastsince) in for (roomid, lastsince) in
cached.known_rooms.entry(list_id.clone()).or_default().iter_mut() cached.known_rooms.entry(list_id.clone()).or_default().iter_mut()