From dd705a2bb205b84a1f21c611392a1391f4751e78 Mon Sep 17 00:00:00 2001 From: Lambda Date: Sun, 10 Aug 2025 10:49:50 +0000 Subject: [PATCH] Revert "Remove support for MSC3575 (sliding sync)" This reverts commit d87848b9a6073d8497a5f860876bc8dec858dda7. --- Cargo.toml | 1 + book/changelog.md | 2 - src/api/client_server/sync.rs | 1 + src/api/client_server/sync/msc3575.rs | 671 ++++++++++++++++++++++++++ src/api/well_known.rs | 9 + src/cli/serve.rs | 1 + src/config.rs | 3 + src/service/rooms/state_accessor.rs | 40 +- src/service/users.rs | 233 ++++++++- 9 files changed, 954 insertions(+), 7 deletions(-) create mode 100644 src/api/client_server/sync/msc3575.rs diff --git a/Cargo.toml b/Cargo.toml index bdea44fb..6cfd3f65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -165,6 +165,7 @@ features = [ "state-res", "unstable-msc2448", "ring-compat", + "unstable-msc3575", ] [target.'cfg(unix)'.dependencies] diff --git a/book/changelog.md b/book/changelog.md index 40a8abea..3729bfbb 100644 --- a/book/changelog.md +++ b/book/changelog.md @@ -87,8 +87,6 @@ This will be the first release of Grapevine since it was forked from Conduit * Instead, it is now possible to configure each cache capacity individually. 10. Remove jemalloc support. ([!93](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/193)) -11. Removed support for MSC3575 (sliding sync), which has been closed. - ([!198](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/198)) ### Changed diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 4ad37425..2a17aa17 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -5,6 +5,7 @@ use crate::{ service::rooms::timeline::PduCount, services, Error, PduEvent, Result, }; +pub(crate) mod msc3575; pub(crate) mod v3; fn load_timeline( diff --git a/src/api/client_server/sync/msc3575.rs b/src/api/client_server/sync/msc3575.rs new file mode 100644 index 00000000..31333eaf --- /dev/null +++ b/src/api/client_server/sync/msc3575.rs @@ -0,0 +1,671 @@ +//! [MSC3575], aka Sliding Sync, aka Sync v3 (even though the endpoint is called +//! /v4) support +//! +//! [MSC3575]: https://github.com/matrix-org/matrix-spec-proposals/pull/3575 + +use std::{ + collections::{BTreeMap, BTreeSet, HashSet}, + time::Duration, +}; + +use ruma::{ + api::client::{ + sync::sync_events::{ + self, v4::SlidingOp, DeviceLists, UnreadNotificationsCount, + }, + uiaa::UiaaResponse, + }, + events::{ + room::member::{MembershipState, RoomMemberEventContent}, + StateEventType, TimelineEventType, + }, + uint, JsOption, UInt, UserId, +}; +use tracing::{debug, error}; + +use super::{load_timeline, share_encrypted_room}; +use crate::{ + service::{account_data, rooms::timeline::PduCount}, + services, Ar, Error, Ra, Result, +}; + +#[allow(clippy::too_many_lines)] +pub(crate) async fn sync_events_v4_route( + body: Ar, +) -> Result, Ra> { + let sender_user = body.sender_user.expect("user is authenticated"); + let sender_device = body.sender_device.expect("user is authenticated"); + let mut body = body.body; + // Setup watchers, so if there's no response, we can wait for them + let watcher = services().globals.watch(&sender_user, &sender_device); + + let next_batch = services().globals.next_count()?; + + let globalsince = + body.pos.as_ref().and_then(|string| string.parse().ok()).unwrap_or(0); + + if globalsince == 0 { + if let Some(conn_id) = &body.conn_id { + services().users.forget_sync_request_connection( + sender_user.clone(), + sender_device.clone(), + conn_id.clone(), + ); + } + } + + // Get sticky parameters from cache + let known_rooms = services().users.update_sync_request_with_cache( + sender_user.clone(), + sender_device.clone(), + &mut body, + ); + + let all_joined_rooms = services() + .rooms + .state_cache + .rooms_joined(&sender_user) + .filter_map(Result::ok) + .collect::>(); + + if body.extensions.to_device.enabled.unwrap_or(false) { + services().users.remove_to_device_events( + &sender_user, + &sender_device, + globalsince, + )?; + } + + // Users that have left any encrypted rooms the sender was in + let mut left_encrypted_users = HashSet::new(); + let mut device_list_changes = HashSet::new(); + let mut device_list_left = HashSet::new(); + + if body.extensions.e2ee.enabled.unwrap_or(false) { + // Look for device list updates of this account + device_list_changes.extend( + services() + .users + .keys_changed(sender_user.as_ref(), globalsince, None) + .filter_map(Result::ok), + ); + + for room_id in &all_joined_rooms { + let Some(current_shortstatehash) = + services().rooms.state.get_room_shortstatehash(room_id)? + else { + error!(%room_id, "Room has no state"); + continue; + }; + + let since_shortstatehash = services() + .rooms + .user + .get_token_shortstatehash(room_id, globalsince)?; + + let since_sender_member: Option = + since_shortstatehash + .and_then(|shortstatehash| { + services() + .rooms + .state_accessor + .state_get( + shortstatehash, + &StateEventType::RoomMember, + sender_user.as_str(), + ) + .transpose() + }) + .transpose()? + .and_then(|pdu| { + serde_json::from_str(pdu.content.get()) + .map_err(|_| { + Error::bad_database("Invalid PDU in database.") + }) + .ok() + }); + + let encrypted_room = services() + .rooms + .state_accessor + .state_get( + current_shortstatehash, + &StateEventType::RoomEncryption, + "", + )? + .is_some(); + + if let Some(since_shortstatehash) = since_shortstatehash { + // Skip if there are only timeline changes + if since_shortstatehash == current_shortstatehash { + continue; + } + + let since_encryption = + services().rooms.state_accessor.state_get( + since_shortstatehash, + &StateEventType::RoomEncryption, + "", + )?; + + let joined_since_last_sync = + since_sender_member.is_none_or(|member| { + member.membership != MembershipState::Join + }); + + let new_encrypted_room = + encrypted_room && since_encryption.is_none(); + if encrypted_room { + let current_state_ids = services() + .rooms + .state_accessor + .state_full_ids(current_shortstatehash) + .await?; + let since_state_ids = services() + .rooms + .state_accessor + .state_full_ids(since_shortstatehash) + .await?; + + for (key, event_id) in current_state_ids { + if since_state_ids.get(&key) != Some(&event_id) { + let Some(pdu) = + services().rooms.timeline.get_pdu(&event_id)? + else { + error!(%event_id, "Event in state not found"); + continue; + }; + if pdu.kind == TimelineEventType::RoomMember { + if let Some(state_key) = &pdu.state_key { + let user_id = + UserId::parse(state_key.clone()) + .map_err(|_| { + Error::bad_database( + "Invalid UserId in member \ + PDU.", + ) + })?; + + if user_id == sender_user { + continue; + } + + let new_membership = + serde_json::from_str::< + RoomMemberEventContent, + >( + pdu.content.get() + ) + .map_err(|_| { + Error::bad_database( + "Invalid PDU in database.", + ) + })? + .membership; + + match new_membership { + MembershipState::Join => { + // A new user joined an encrypted + // room + if !share_encrypted_room( + &sender_user, + &user_id, + room_id, + )? { + device_list_changes + .insert(user_id); + } + } + MembershipState::Leave => { + // Write down users that have left + // encrypted rooms we are in + left_encrypted_users + .insert(user_id); + } + _ => {} + } + } + } + } + } + if joined_since_last_sync || new_encrypted_room { + // If the user is in a new encrypted room, give them all + // joined users + device_list_changes.extend( + services() + .rooms + .state_cache + .room_members(room_id) + .flatten() + .filter(|user_id| { + // Don't send key updates from the sender to + // the sender + &sender_user != user_id + }) + .filter(|user_id| { + // Only send keys if the sender doesn't + // share an encrypted room with the target + // already + !share_encrypted_room( + &sender_user, + user_id, + room_id, + ) + .unwrap_or(false) + }), + ); + } + } + } + // Look for device list updates in this room + device_list_changes.extend( + services() + .users + .keys_changed(room_id.as_ref(), globalsince, None) + .filter_map(Result::ok), + ); + } + for user_id in left_encrypted_users { + let dont_share_encrypted_room = services() + .rooms + .user + .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])? + .filter_map(Result::ok) + .filter_map(|other_room_id| { + Some( + services() + .rooms + .state_accessor + .room_state_get( + &other_room_id, + &StateEventType::RoomEncryption, + "", + ) + .ok()? + .is_some(), + ) + }) + .all(|encrypted| !encrypted); + // If the user doesn't share an encrypted room with the target + // anymore, we need to tell them + if dont_share_encrypted_room { + device_list_left.insert(user_id); + } + } + } + + let mut lists = BTreeMap::new(); + // and required state + let mut todo_rooms = BTreeMap::new(); + + for (list_id, list) in body.lists { + if list.filters.and_then(|f| f.is_invite).unwrap_or(false) { + continue; + } + + let mut new_known_rooms = BTreeSet::new(); + + lists.insert( + list_id.clone(), + sync_events::v4::SyncList { + ops: list + .ranges + .into_iter() + .map(|mut r| { + r.0 = r.0.clamp( + uint!(0), + UInt::try_from(all_joined_rooms.len() - 1) + .unwrap_or(UInt::MAX), + ); + r.1 = r.1.clamp( + r.0, + UInt::try_from(all_joined_rooms.len() - 1) + .unwrap_or(UInt::MAX), + ); + let room_ids = all_joined_rooms[r + .0 + .try_into() + .unwrap_or(usize::MAX) + ..=r.1.try_into().unwrap_or(usize::MAX)] + .to_vec(); + new_known_rooms.extend(room_ids.iter().cloned()); + for room_id in &room_ids { + let todo_room = todo_rooms + .entry(room_id.clone()) + .or_insert((BTreeSet::new(), 0, u64::MAX)); + let limit = list + .room_details + .timeline_limit + .map_or(10, u64::from) + .min(100); + todo_room.0.extend( + list.room_details + .required_state + .iter() + .cloned(), + ); + todo_room.1 = todo_room.1.max(limit); + // 0 means unknown because it got out of date + todo_room.2 = todo_room.2.min( + known_rooms + .get(&list_id) + .and_then(|k| k.get(room_id)) + .copied() + .unwrap_or(0), + ); + } + sync_events::v4::SyncOp { + op: SlidingOp::Sync, + range: Some(r), + index: None, + room_ids, + room_id: None, + } + }) + .collect(), + count: UInt::try_from(all_joined_rooms.len()) + .unwrap_or(UInt::MAX), + }, + ); + + if let Some(conn_id) = &body.conn_id { + services().users.update_sync_known_rooms( + sender_user.clone(), + sender_device.clone(), + conn_id.clone(), + list_id, + new_known_rooms, + globalsince, + ); + } + } + + let mut known_subscription_rooms = BTreeSet::new(); + for (room_id, room) in &body.room_subscriptions { + if !services().rooms.metadata.exists(room_id)? { + continue; + } + let todo_room = todo_rooms.entry(room_id.clone()).or_insert(( + BTreeSet::new(), + 0, + u64::MAX, + )); + let limit = room.timeline_limit.map_or(10, u64::from).min(100); + todo_room.0.extend(room.required_state.iter().cloned()); + todo_room.1 = todo_room.1.max(limit); + // 0 means unknown because it got out of date + todo_room.2 = todo_room.2.min( + known_rooms + .get("subscriptions") + .and_then(|k| k.get(room_id)) + .copied() + .unwrap_or(0), + ); + known_subscription_rooms.insert(room_id.clone()); + } + + for r in body.unsubscribe_rooms { + known_subscription_rooms.remove(&r); + body.room_subscriptions.remove(&r); + } + + if let Some(conn_id) = &body.conn_id { + services().users.update_sync_known_rooms( + sender_user.clone(), + sender_device.clone(), + conn_id.clone(), + "subscriptions".to_owned(), + known_subscription_rooms, + globalsince, + ); + } + + if let Some(conn_id) = &body.conn_id { + services().users.update_sync_subscriptions( + sender_user.clone(), + sender_device.clone(), + conn_id.clone(), + body.room_subscriptions, + ); + } + + let mut rooms = BTreeMap::new(); + for (room_id, (required_state_request, timeline_limit, roomsince)) in + &todo_rooms + { + let roomsincecount = PduCount::Normal(*roomsince); + + let (timeline_pdus, limited) = load_timeline( + &sender_user, + room_id, + roomsincecount, + *timeline_limit, + )?; + + if roomsince != &0 && timeline_pdus.is_empty() { + continue; + } + + let prev_batch = timeline_pdus + .first() + .map_or(Ok::<_, Error>(None), |(pdu_count, _)| { + Ok(Some(match pdu_count { + PduCount::Backfilled(_) => { + error!("Timeline in backfill state?!"); + "0".to_owned() + } + PduCount::Normal(c) => c.to_string(), + })) + })? + .or_else(|| (roomsince != &0).then(|| roomsince.to_string())); + + let room_events: Vec<_> = timeline_pdus + .iter() + .map(|(_, pdu)| pdu.to_sync_room_event()) + .collect(); + + let required_state = required_state_request + .iter() + .filter_map(|state| { + services() + .rooms + .state_accessor + .room_state_get(room_id, &state.0, &state.1) + .ok() + .flatten() + .map(|state| state.to_sync_state_event()) + }) + .collect(); + + // Heroes + let heroes = services() + .rooms + .state_cache + .room_members(room_id) + .filter_map(Result::ok) + .filter(|member| member != &sender_user) + .filter_map(|member| { + services() + .rooms + .state_accessor + .get_member(room_id, &member) + .ok() + .flatten() + .map(|memberevent| { + ( + memberevent + .displayname + .unwrap_or_else(|| member.to_string()), + memberevent.avatar_url, + ) + }) + }) + .take(5) + .collect::>(); + let name = match &*heroes { + [] => None, + [only] => Some(only.0.clone()), + [firsts @ .., last] => Some({ + let firsts = firsts + .iter() + .map(|h| h.0.clone()) + .collect::>() + .join(", "); + + format!("{firsts} and {}", last.0) + }), + }; + + let avatar = if let [only] = &*heroes { + only.1.clone() + } else { + None + }; + + rooms.insert( + room_id.clone(), + sync_events::v4::SlidingSyncRoom { + name: services() + .rooms + .state_accessor + .get_name(room_id)? + .or(name), + avatar: if let Some(avatar) = avatar { + JsOption::Some(avatar) + } else { + match services().rooms.state_accessor.get_avatar(room_id)? { + JsOption::Some(avatar) => { + JsOption::from_option(avatar.url) + } + JsOption::Null => JsOption::Null, + JsOption::Undefined => JsOption::Undefined, + } + }, + initial: Some(roomsince == &0), + is_dm: None, + invite_state: None, + unread_notifications: UnreadNotificationsCount { + highlight_count: Some( + services() + .rooms + .user + .highlight_count(&sender_user, room_id)? + .try_into() + .expect("notification count can't go that high"), + ), + notification_count: Some( + services() + .rooms + .user + .notification_count(&sender_user, room_id)? + .try_into() + .expect("notification count can't go that high"), + ), + }, + timeline: room_events, + required_state, + prev_batch, + limited, + joined_count: Some( + services() + .rooms + .state_cache + .room_joined_count(room_id)? + .map(UInt::new_saturating) + .unwrap_or(uint!(0)), + ), + invited_count: Some( + services() + .rooms + .state_cache + .room_invited_count(room_id)? + .map(UInt::new_saturating) + .unwrap_or(uint!(0)), + ), + // Count events in timeline greater than global sync counter + num_live: None, + timestamp: None, + // TODO + heroes: None, + }, + ); + } + + if rooms + .iter() + .all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty()) + { + // Hang a few seconds so requests are not spammed + // Stop hanging if new info arrives + let mut duration = body.timeout.unwrap_or(Duration::from_secs(30)); + if duration.as_secs() > 30 { + duration = Duration::from_secs(30); + } + match tokio::time::timeout(duration, watcher).await { + Ok(x) => x.expect("watcher should succeed"), + Err(error) => debug!(%error, "Timed out"), + }; + } + + Ok(Ra(sync_events::v4::Response { + initial: globalsince == 0, + txn_id: body.txn_id.clone(), + pos: next_batch.to_string(), + lists, + rooms, + extensions: sync_events::v4::Extensions { + to_device: body + .extensions + .to_device + .enabled + .unwrap_or(false) + .then(|| { + services() + .users + .get_to_device_events(&sender_user, &sender_device) + .map(|events| sync_events::v4::ToDevice { + events, + next_batch: next_batch.to_string(), + }) + }) + .transpose()?, + e2ee: sync_events::v4::E2EE { + device_lists: DeviceLists { + changed: device_list_changes.into_iter().collect(), + left: device_list_left.into_iter().collect(), + }, + device_one_time_keys_count: services() + .users + .count_one_time_keys(&sender_user, &sender_device)?, + // Fallback keys are not yet supported + device_unused_fallback_key_types: None, + }, + account_data: sync_events::v4::AccountData { + global: if body.extensions.account_data.enabled.unwrap_or(false) + { + services() + .account_data + .global_changes_since(&sender_user, globalsince)? + .into_iter() + .map(|(event_type, content)| { + account_data::raw_global_event_from_parts( + &event_type, + &content, + ) + }) + .collect() + } else { + Vec::new() + }, + rooms: BTreeMap::new(), + }, + receipts: sync_events::v4::Receipts { + rooms: BTreeMap::new(), + }, + typing: sync_events::v4::Typing { + rooms: BTreeMap::new(), + }, + }, + delta_token: None, + })) +} diff --git a/src/api/well_known.rs b/src/api/well_known.rs index edb5a347..44210e55 100644 --- a/src/api/well_known.rs +++ b/src/api/well_known.rs @@ -37,5 +37,14 @@ pub(crate) async fn client(_: Ar) -> Ra { Ra(client::Response { homeserver: client::HomeserverInfo::new(base_url.clone()), identity_server: None, + sliding_sync_proxy: services() + .globals + .config + .server_discovery + .client + .advertise_sliding_sync + .then_some(client::SlidingSyncProxyInfo { + url: base_url, + }), }) } diff --git a/src/cli/serve.rs b/src/cli/serve.rs index f0e0f969..7a528748 100644 --- a/src/cli/serve.rs +++ b/src/cli/serve.rs @@ -651,6 +651,7 @@ fn client_routes() -> Router { .ruma_route(c2s::get_state_events_route) .ruma_route(c2s::get_state_events_for_key_route) .ruma_route(c2s::v3::sync_events_route) + .ruma_route(c2s::msc3575::sync_events_v4_route) .ruma_route(c2s::get_context_route) .ruma_route(c2s::get_message_events_route) .ruma_route(c2s::search_events_route) diff --git a/src/config.rs b/src/config.rs index 5924d232..b597e4a9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -151,6 +151,9 @@ pub(crate) struct ServerServerDiscovery { pub(crate) struct ClientServerDiscovery { /// The base URL to make client-server API requests to pub(crate) base_url: Url, + + #[serde(default, rename = "advertise_buggy_sliding_sync")] + pub(crate) advertise_sliding_sync: bool, } #[derive(Debug, Deserialize)] diff --git a/src/service/rooms/state_accessor.rs b/src/service/rooms/state_accessor.rs index 78af94ed..3c859a01 100644 --- a/src/service/rooms/state_accessor.rs +++ b/src/service/rooms/state_accessor.rs @@ -7,6 +7,7 @@ use lru_cache::LruCache; use ruma::{ events::{ room::{ + avatar::RoomAvatarEventContent, history_visibility::{ HistoryVisibility, RoomHistoryVisibilityEventContent, }, @@ -17,8 +18,8 @@ use ruma::{ StateEventType, }, state_res::Event, - EventId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, - UserId, + EventId, JsOption, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, + ServerName, UserId, }; use serde_json::value::to_raw_value; use tracing::{error, warn}; @@ -507,6 +508,23 @@ impl Service { ) } + #[tracing::instrument(skip(self))] + pub(crate) fn get_avatar( + &self, + room_id: &RoomId, + ) -> Result> { + self.room_state_get(room_id, &StateEventType::RoomAvatar, "")?.map_or( + Ok(JsOption::Undefined), + |s| { + serde_json::from_str(s.content.get()).map_err(|_| { + Error::bad_database( + "Invalid room avatar event in database.", + ) + }) + }, + ) + } + // Allowed because this function uses `services()` #[allow(clippy::unused_self)] #[tracing::instrument(skip(self), ret(level = "trace"))] @@ -535,6 +553,24 @@ impl Service { .is_ok() } + #[tracing::instrument(skip(self))] + pub(crate) fn get_member( + &self, + room_id: &RoomId, + user_id: &UserId, + ) -> Result> { + self.room_state_get( + room_id, + &StateEventType::RoomMember, + user_id.as_str(), + )? + .map_or(Ok(None), |s| { + serde_json::from_str(s.content.get()).map_err(|_| { + Error::bad_database("Invalid room member event in database.") + }) + }) + } + /// Checks if a given user can redact a given event /// /// If `federation` is `true`, it allows redaction events from any user of diff --git a/src/service/users.rs b/src/service/users.rs index dbacb715..9df6e835 100644 --- a/src/service/users.rs +++ b/src/service/users.rs @@ -1,12 +1,23 @@ -use std::{collections::BTreeMap, mem}; +use std::{ + collections::{BTreeMap, BTreeSet}, + mem, + sync::{Arc, Mutex}, +}; use ruma::{ - api::client::{device::Device, filter::FilterDefinition}, + api::client::{ + device::Device, + filter::FilterDefinition, + sync::sync_events::{ + self, + v4::{ExtensionsConfig, SyncRequestList}, + }, + }, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, events::AnyToDeviceEvent, serde::Raw, DeviceId, OneTimeKeyAlgorithm, OneTimeKeyName, OwnedDeviceId, OwnedKeyId, - OwnedMxcUri, OwnedOneTimeKeyId, OwnedUserId, UInt, UserId, + OwnedMxcUri, OwnedOneTimeKeyId, OwnedRoomId, OwnedUserId, UInt, UserId, }; use crate::{services, Error, Result}; @@ -15,14 +26,30 @@ mod data; pub(crate) use data::Data; +pub(crate) struct SlidingSyncCache { + lists: BTreeMap, + subscriptions: BTreeMap, + // For every room, the roomsince number + known_rooms: BTreeMap>, + extensions: ExtensionsConfig, +} + pub(crate) struct Service { pub(crate) db: &'static dyn Data, + #[allow(clippy::type_complexity)] + pub(crate) connections: Mutex< + BTreeMap< + (OwnedUserId, OwnedDeviceId, String), + Arc>, + >, + >, } impl Service { pub(crate) fn new(db: &'static dyn Data) -> Self { Self { db, + connections: Mutex::new(BTreeMap::new()), } } @@ -31,6 +58,206 @@ impl Service { self.db.exists(user_id) } + pub(crate) fn forget_sync_request_connection( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + conn_id: String, + ) { + self.connections.lock().unwrap().remove(&(user_id, device_id, conn_id)); + } + + #[allow(clippy::too_many_lines)] + pub(crate) fn update_sync_request_with_cache( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + request: &mut sync_events::v4::Request, + ) -> BTreeMap> { + let Some(conn_id) = request.conn_id.clone() else { + return BTreeMap::new(); + }; + + let mut cache = self.connections.lock().unwrap(); + let cached = Arc::clone( + cache.entry((user_id, device_id, conn_id)).or_insert_with(|| { + Arc::new(Mutex::new(SlidingSyncCache { + lists: BTreeMap::new(), + subscriptions: BTreeMap::new(), + known_rooms: BTreeMap::new(), + extensions: ExtensionsConfig::default(), + })) + }), + ); + let cached = &mut cached.lock().unwrap(); + drop(cache); + + for (list_id, list) in &mut request.lists { + if let Some(cached_list) = cached.lists.get(list_id) { + if list.sort.is_empty() { + list.sort.clone_from(&cached_list.sort); + } + if list.room_details.required_state.is_empty() { + list.room_details + .required_state + .clone_from(&cached_list.room_details.required_state); + } + list.room_details.timeline_limit = list + .room_details + .timeline_limit + .or(cached_list.room_details.timeline_limit); + list.include_old_rooms = list + .include_old_rooms + .clone() + .or(cached_list.include_old_rooms.clone()); + match (&mut list.filters, cached_list.filters.clone()) { + (Some(list_filters), Some(cached_filters)) => { + list_filters.is_dm = + list_filters.is_dm.or(cached_filters.is_dm); + if list_filters.spaces.is_empty() { + list_filters.spaces = cached_filters.spaces; + } + list_filters.is_encrypted = list_filters + .is_encrypted + .or(cached_filters.is_encrypted); + list_filters.is_invite = + list_filters.is_invite.or(cached_filters.is_invite); + if list_filters.room_types.is_empty() { + list_filters.room_types = cached_filters.room_types; + } + if list_filters.not_room_types.is_empty() { + list_filters.not_room_types = + cached_filters.not_room_types; + } + list_filters.room_name_like = list_filters + .room_name_like + .clone() + .or(cached_filters.room_name_like); + if list_filters.tags.is_empty() { + list_filters.tags = cached_filters.tags; + } + if list_filters.not_tags.is_empty() { + list_filters.not_tags = cached_filters.not_tags; + } + } + (_, Some(cached_filters)) => { + list.filters = Some(cached_filters); + } + (Some(list_filters), _) => { + list.filters = Some(list_filters.clone()); + } + (..) => {} + } + if list.bump_event_types.is_empty() { + list.bump_event_types + .clone_from(&cached_list.bump_event_types); + } + } + cached.lists.insert(list_id.clone(), list.clone()); + } + + cached.subscriptions.extend( + request + .room_subscriptions + .iter() + .map(|(k, v)| (k.clone(), v.clone())), + ); + request.room_subscriptions.extend( + cached.subscriptions.iter().map(|(k, v)| (k.clone(), v.clone())), + ); + + request.extensions.e2ee.enabled = + request.extensions.e2ee.enabled.or(cached.extensions.e2ee.enabled); + + request.extensions.to_device.enabled = request + .extensions + .to_device + .enabled + .or(cached.extensions.to_device.enabled); + + request.extensions.account_data.enabled = request + .extensions + .account_data + .enabled + .or(cached.extensions.account_data.enabled); + request.extensions.account_data.lists = request + .extensions + .account_data + .lists + .clone() + .or(cached.extensions.account_data.lists.clone()); + request.extensions.account_data.rooms = request + .extensions + .account_data + .rooms + .clone() + .or(cached.extensions.account_data.rooms.clone()); + + cached.extensions = request.extensions.clone(); + + cached.known_rooms.clone() + } + + pub(crate) fn update_sync_subscriptions( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + conn_id: String, + subscriptions: BTreeMap, + ) { + let mut cache = self.connections.lock().unwrap(); + let cached = Arc::clone( + cache.entry((user_id, device_id, conn_id)).or_insert_with(|| { + Arc::new(Mutex::new(SlidingSyncCache { + lists: BTreeMap::new(), + subscriptions: BTreeMap::new(), + known_rooms: BTreeMap::new(), + extensions: ExtensionsConfig::default(), + })) + }), + ); + let cached = &mut cached.lock().unwrap(); + drop(cache); + + cached.subscriptions = subscriptions; + } + + pub(crate) fn update_sync_known_rooms( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + conn_id: String, + list_id: String, + new_cached_rooms: BTreeSet, + globalsince: u64, + ) { + let mut cache = self.connections.lock().unwrap(); + let cached = Arc::clone( + cache.entry((user_id, device_id, conn_id)).or_insert_with(|| { + Arc::new(Mutex::new(SlidingSyncCache { + lists: BTreeMap::new(), + subscriptions: BTreeMap::new(), + known_rooms: BTreeMap::new(), + extensions: ExtensionsConfig::default(), + })) + }), + ); + let cached = &mut cached.lock().unwrap(); + drop(cache); + + for (roomid, lastsince) in + cached.known_rooms.entry(list_id.clone()).or_default().iter_mut() + { + if !new_cached_rooms.contains(roomid) { + *lastsince = 0; + } + } + let list = cached.known_rooms.entry(list_id).or_default(); + for roomid in new_cached_rooms { + list.insert(roomid, globalsince); + } + } + /// Check if account is deactivated pub(crate) fn is_deactivated(&self, user_id: &UserId) -> Result { self.db.is_deactivated(user_id)