diff --git a/Cargo.toml b/Cargo.toml index f923d9a5..74a24679 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,7 +121,7 @@ regex = "1.11.1" reqwest = { version = "0.12.22", default-features = false, features = ["http2", "rustls-tls-native-roots", "socks"] } ring = "0.17.14" rocksdb = { package = "rust-rocksdb", version = "0.36.0", features = ["lz4", "multi-threaded-cf", "zstd"], optional = true } -ruma = { git = "https://gitlab.computer.surgery/matrix/ruma.git", rev = "ruma-0.12.2+grapevine-1", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "server-util", "state-res", "unstable-msc2448", "unstable-msc3575", "ring-compat", "unstable-unspecified" ] } +ruma = { git = "https://gitlab.computer.surgery/matrix/ruma.git", rev = "ruma-0.12.2+grapevine-1", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "server-util", "state-res", "unstable-msc2448", "ring-compat", "unstable-unspecified" ] } rusqlite = { version = "0.34.0", optional = true, features = ["bundled"] } rustls = { version = "0.23.31", default-features = false, features = ["ring", "log", "logging", "std", "tls12"] } sd-notify = { version = "0.4.5", optional = true } diff --git a/book/changelog.md b/book/changelog.md index 3729bfbb..40a8abea 100644 --- a/book/changelog.md +++ b/book/changelog.md @@ -87,6 +87,8 @@ This will be the first release of Grapevine since it was forked from Conduit * Instead, it is now possible to configure each cache capacity individually. 10. Remove jemalloc support. ([!93](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/193)) +11. Removed support for MSC3575 (sliding sync), which has been closed. + ([!198](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/198)) ### Changed diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 2a17aa17..4ad37425 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -5,7 +5,6 @@ use crate::{ service::rooms::timeline::PduCount, services, Error, PduEvent, Result, }; -pub(crate) mod msc3575; pub(crate) mod v3; fn load_timeline( diff --git a/src/api/client_server/sync/msc3575.rs b/src/api/client_server/sync/msc3575.rs deleted file mode 100644 index 31333eaf..00000000 --- a/src/api/client_server/sync/msc3575.rs +++ /dev/null @@ -1,671 +0,0 @@ -//! [MSC3575], aka Sliding Sync, aka Sync v3 (even though the endpoint is called -//! /v4) support -//! -//! [MSC3575]: https://github.com/matrix-org/matrix-spec-proposals/pull/3575 - -use std::{ - collections::{BTreeMap, BTreeSet, HashSet}, - time::Duration, -}; - -use ruma::{ - api::client::{ - sync::sync_events::{ - self, v4::SlidingOp, DeviceLists, UnreadNotificationsCount, - }, - uiaa::UiaaResponse, - }, - events::{ - room::member::{MembershipState, RoomMemberEventContent}, - StateEventType, TimelineEventType, - }, - uint, JsOption, UInt, UserId, -}; -use tracing::{debug, error}; - -use super::{load_timeline, share_encrypted_room}; -use crate::{ - service::{account_data, rooms::timeline::PduCount}, - services, Ar, Error, Ra, Result, -}; - -#[allow(clippy::too_many_lines)] -pub(crate) async fn sync_events_v4_route( - body: Ar, -) -> Result, Ra> { - let sender_user = body.sender_user.expect("user is authenticated"); - let sender_device = body.sender_device.expect("user is authenticated"); - let mut body = body.body; - // Setup watchers, so if there's no response, we can wait for them - let watcher = services().globals.watch(&sender_user, &sender_device); - - let next_batch = services().globals.next_count()?; - - let globalsince = - body.pos.as_ref().and_then(|string| string.parse().ok()).unwrap_or(0); - - if globalsince == 0 { - if let Some(conn_id) = &body.conn_id { - services().users.forget_sync_request_connection( - sender_user.clone(), - sender_device.clone(), - conn_id.clone(), - ); - } - } - - // Get sticky parameters from cache - let known_rooms = services().users.update_sync_request_with_cache( - sender_user.clone(), - sender_device.clone(), - &mut body, - ); - - let all_joined_rooms = services() - .rooms - .state_cache - .rooms_joined(&sender_user) - .filter_map(Result::ok) - .collect::>(); - - if body.extensions.to_device.enabled.unwrap_or(false) { - services().users.remove_to_device_events( - &sender_user, - &sender_device, - globalsince, - )?; - } - - // Users that have left any encrypted rooms the sender was in - let mut left_encrypted_users = HashSet::new(); - let mut device_list_changes = HashSet::new(); - let mut device_list_left = HashSet::new(); - - if body.extensions.e2ee.enabled.unwrap_or(false) { - // Look for device list updates of this account - device_list_changes.extend( - services() - .users - .keys_changed(sender_user.as_ref(), globalsince, None) - .filter_map(Result::ok), - ); - - for room_id in &all_joined_rooms { - let Some(current_shortstatehash) = - services().rooms.state.get_room_shortstatehash(room_id)? - else { - error!(%room_id, "Room has no state"); - continue; - }; - - let since_shortstatehash = services() - .rooms - .user - .get_token_shortstatehash(room_id, globalsince)?; - - let since_sender_member: Option = - since_shortstatehash - .and_then(|shortstatehash| { - services() - .rooms - .state_accessor - .state_get( - shortstatehash, - &StateEventType::RoomMember, - sender_user.as_str(), - ) - .transpose() - }) - .transpose()? - .and_then(|pdu| { - serde_json::from_str(pdu.content.get()) - .map_err(|_| { - Error::bad_database("Invalid PDU in database.") - }) - .ok() - }); - - let encrypted_room = services() - .rooms - .state_accessor - .state_get( - current_shortstatehash, - &StateEventType::RoomEncryption, - "", - )? - .is_some(); - - if let Some(since_shortstatehash) = since_shortstatehash { - // Skip if there are only timeline changes - if since_shortstatehash == current_shortstatehash { - continue; - } - - let since_encryption = - services().rooms.state_accessor.state_get( - since_shortstatehash, - &StateEventType::RoomEncryption, - "", - )?; - - let joined_since_last_sync = - since_sender_member.is_none_or(|member| { - member.membership != MembershipState::Join - }); - - let new_encrypted_room = - encrypted_room && since_encryption.is_none(); - if encrypted_room { - let current_state_ids = services() - .rooms - .state_accessor - .state_full_ids(current_shortstatehash) - .await?; - let since_state_ids = services() - .rooms - .state_accessor - .state_full_ids(since_shortstatehash) - .await?; - - for (key, event_id) in current_state_ids { - if since_state_ids.get(&key) != Some(&event_id) { - let Some(pdu) = - services().rooms.timeline.get_pdu(&event_id)? - else { - error!(%event_id, "Event in state not found"); - continue; - }; - if pdu.kind == TimelineEventType::RoomMember { - if let Some(state_key) = &pdu.state_key { - let user_id = - UserId::parse(state_key.clone()) - .map_err(|_| { - Error::bad_database( - "Invalid UserId in member \ - PDU.", - ) - })?; - - if user_id == sender_user { - continue; - } - - let new_membership = - serde_json::from_str::< - RoomMemberEventContent, - >( - pdu.content.get() - ) - .map_err(|_| { - Error::bad_database( - "Invalid PDU in database.", - ) - })? - .membership; - - match new_membership { - MembershipState::Join => { - // A new user joined an encrypted - // room - if !share_encrypted_room( - &sender_user, - &user_id, - room_id, - )? { - device_list_changes - .insert(user_id); - } - } - MembershipState::Leave => { - // Write down users that have left - // encrypted rooms we are in - left_encrypted_users - .insert(user_id); - } - _ => {} - } - } - } - } - } - if joined_since_last_sync || new_encrypted_room { - // If the user is in a new encrypted room, give them all - // joined users - device_list_changes.extend( - services() - .rooms - .state_cache - .room_members(room_id) - .flatten() - .filter(|user_id| { - // Don't send key updates from the sender to - // the sender - &sender_user != user_id - }) - .filter(|user_id| { - // Only send keys if the sender doesn't - // share an encrypted room with the target - // already - !share_encrypted_room( - &sender_user, - user_id, - room_id, - ) - .unwrap_or(false) - }), - ); - } - } - } - // Look for device list updates in this room - device_list_changes.extend( - services() - .users - .keys_changed(room_id.as_ref(), globalsince, None) - .filter_map(Result::ok), - ); - } - for user_id in left_encrypted_users { - let dont_share_encrypted_room = services() - .rooms - .user - .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])? - .filter_map(Result::ok) - .filter_map(|other_room_id| { - Some( - services() - .rooms - .state_accessor - .room_state_get( - &other_room_id, - &StateEventType::RoomEncryption, - "", - ) - .ok()? - .is_some(), - ) - }) - .all(|encrypted| !encrypted); - // If the user doesn't share an encrypted room with the target - // anymore, we need to tell them - if dont_share_encrypted_room { - device_list_left.insert(user_id); - } - } - } - - let mut lists = BTreeMap::new(); - // and required state - let mut todo_rooms = BTreeMap::new(); - - for (list_id, list) in body.lists { - if list.filters.and_then(|f| f.is_invite).unwrap_or(false) { - continue; - } - - let mut new_known_rooms = BTreeSet::new(); - - lists.insert( - list_id.clone(), - sync_events::v4::SyncList { - ops: list - .ranges - .into_iter() - .map(|mut r| { - r.0 = r.0.clamp( - uint!(0), - UInt::try_from(all_joined_rooms.len() - 1) - .unwrap_or(UInt::MAX), - ); - r.1 = r.1.clamp( - r.0, - UInt::try_from(all_joined_rooms.len() - 1) - .unwrap_or(UInt::MAX), - ); - let room_ids = all_joined_rooms[r - .0 - .try_into() - .unwrap_or(usize::MAX) - ..=r.1.try_into().unwrap_or(usize::MAX)] - .to_vec(); - new_known_rooms.extend(room_ids.iter().cloned()); - for room_id in &room_ids { - let todo_room = todo_rooms - .entry(room_id.clone()) - .or_insert((BTreeSet::new(), 0, u64::MAX)); - let limit = list - .room_details - .timeline_limit - .map_or(10, u64::from) - .min(100); - todo_room.0.extend( - list.room_details - .required_state - .iter() - .cloned(), - ); - todo_room.1 = todo_room.1.max(limit); - // 0 means unknown because it got out of date - todo_room.2 = todo_room.2.min( - known_rooms - .get(&list_id) - .and_then(|k| k.get(room_id)) - .copied() - .unwrap_or(0), - ); - } - sync_events::v4::SyncOp { - op: SlidingOp::Sync, - range: Some(r), - index: None, - room_ids, - room_id: None, - } - }) - .collect(), - count: UInt::try_from(all_joined_rooms.len()) - .unwrap_or(UInt::MAX), - }, - ); - - if let Some(conn_id) = &body.conn_id { - services().users.update_sync_known_rooms( - sender_user.clone(), - sender_device.clone(), - conn_id.clone(), - list_id, - new_known_rooms, - globalsince, - ); - } - } - - let mut known_subscription_rooms = BTreeSet::new(); - for (room_id, room) in &body.room_subscriptions { - if !services().rooms.metadata.exists(room_id)? { - continue; - } - let todo_room = todo_rooms.entry(room_id.clone()).or_insert(( - BTreeSet::new(), - 0, - u64::MAX, - )); - let limit = room.timeline_limit.map_or(10, u64::from).min(100); - todo_room.0.extend(room.required_state.iter().cloned()); - todo_room.1 = todo_room.1.max(limit); - // 0 means unknown because it got out of date - todo_room.2 = todo_room.2.min( - known_rooms - .get("subscriptions") - .and_then(|k| k.get(room_id)) - .copied() - .unwrap_or(0), - ); - known_subscription_rooms.insert(room_id.clone()); - } - - for r in body.unsubscribe_rooms { - known_subscription_rooms.remove(&r); - body.room_subscriptions.remove(&r); - } - - if let Some(conn_id) = &body.conn_id { - services().users.update_sync_known_rooms( - sender_user.clone(), - sender_device.clone(), - conn_id.clone(), - "subscriptions".to_owned(), - known_subscription_rooms, - globalsince, - ); - } - - if let Some(conn_id) = &body.conn_id { - services().users.update_sync_subscriptions( - sender_user.clone(), - sender_device.clone(), - conn_id.clone(), - body.room_subscriptions, - ); - } - - let mut rooms = BTreeMap::new(); - for (room_id, (required_state_request, timeline_limit, roomsince)) in - &todo_rooms - { - let roomsincecount = PduCount::Normal(*roomsince); - - let (timeline_pdus, limited) = load_timeline( - &sender_user, - room_id, - roomsincecount, - *timeline_limit, - )?; - - if roomsince != &0 && timeline_pdus.is_empty() { - continue; - } - - let prev_batch = timeline_pdus - .first() - .map_or(Ok::<_, Error>(None), |(pdu_count, _)| { - Ok(Some(match pdu_count { - PduCount::Backfilled(_) => { - error!("Timeline in backfill state?!"); - "0".to_owned() - } - PduCount::Normal(c) => c.to_string(), - })) - })? - .or_else(|| (roomsince != &0).then(|| roomsince.to_string())); - - let room_events: Vec<_> = timeline_pdus - .iter() - .map(|(_, pdu)| pdu.to_sync_room_event()) - .collect(); - - let required_state = required_state_request - .iter() - .filter_map(|state| { - services() - .rooms - .state_accessor - .room_state_get(room_id, &state.0, &state.1) - .ok() - .flatten() - .map(|state| state.to_sync_state_event()) - }) - .collect(); - - // Heroes - let heroes = services() - .rooms - .state_cache - .room_members(room_id) - .filter_map(Result::ok) - .filter(|member| member != &sender_user) - .filter_map(|member| { - services() - .rooms - .state_accessor - .get_member(room_id, &member) - .ok() - .flatten() - .map(|memberevent| { - ( - memberevent - .displayname - .unwrap_or_else(|| member.to_string()), - memberevent.avatar_url, - ) - }) - }) - .take(5) - .collect::>(); - let name = match &*heroes { - [] => None, - [only] => Some(only.0.clone()), - [firsts @ .., last] => Some({ - let firsts = firsts - .iter() - .map(|h| h.0.clone()) - .collect::>() - .join(", "); - - format!("{firsts} and {}", last.0) - }), - }; - - let avatar = if let [only] = &*heroes { - only.1.clone() - } else { - None - }; - - rooms.insert( - room_id.clone(), - sync_events::v4::SlidingSyncRoom { - name: services() - .rooms - .state_accessor - .get_name(room_id)? - .or(name), - avatar: if let Some(avatar) = avatar { - JsOption::Some(avatar) - } else { - match services().rooms.state_accessor.get_avatar(room_id)? { - JsOption::Some(avatar) => { - JsOption::from_option(avatar.url) - } - JsOption::Null => JsOption::Null, - JsOption::Undefined => JsOption::Undefined, - } - }, - initial: Some(roomsince == &0), - is_dm: None, - invite_state: None, - unread_notifications: UnreadNotificationsCount { - highlight_count: Some( - services() - .rooms - .user - .highlight_count(&sender_user, room_id)? - .try_into() - .expect("notification count can't go that high"), - ), - notification_count: Some( - services() - .rooms - .user - .notification_count(&sender_user, room_id)? - .try_into() - .expect("notification count can't go that high"), - ), - }, - timeline: room_events, - required_state, - prev_batch, - limited, - joined_count: Some( - services() - .rooms - .state_cache - .room_joined_count(room_id)? - .map(UInt::new_saturating) - .unwrap_or(uint!(0)), - ), - invited_count: Some( - services() - .rooms - .state_cache - .room_invited_count(room_id)? - .map(UInt::new_saturating) - .unwrap_or(uint!(0)), - ), - // Count events in timeline greater than global sync counter - num_live: None, - timestamp: None, - // TODO - heroes: None, - }, - ); - } - - if rooms - .iter() - .all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty()) - { - // Hang a few seconds so requests are not spammed - // Stop hanging if new info arrives - let mut duration = body.timeout.unwrap_or(Duration::from_secs(30)); - if duration.as_secs() > 30 { - duration = Duration::from_secs(30); - } - match tokio::time::timeout(duration, watcher).await { - Ok(x) => x.expect("watcher should succeed"), - Err(error) => debug!(%error, "Timed out"), - }; - } - - Ok(Ra(sync_events::v4::Response { - initial: globalsince == 0, - txn_id: body.txn_id.clone(), - pos: next_batch.to_string(), - lists, - rooms, - extensions: sync_events::v4::Extensions { - to_device: body - .extensions - .to_device - .enabled - .unwrap_or(false) - .then(|| { - services() - .users - .get_to_device_events(&sender_user, &sender_device) - .map(|events| sync_events::v4::ToDevice { - events, - next_batch: next_batch.to_string(), - }) - }) - .transpose()?, - e2ee: sync_events::v4::E2EE { - device_lists: DeviceLists { - changed: device_list_changes.into_iter().collect(), - left: device_list_left.into_iter().collect(), - }, - device_one_time_keys_count: services() - .users - .count_one_time_keys(&sender_user, &sender_device)?, - // Fallback keys are not yet supported - device_unused_fallback_key_types: None, - }, - account_data: sync_events::v4::AccountData { - global: if body.extensions.account_data.enabled.unwrap_or(false) - { - services() - .account_data - .global_changes_since(&sender_user, globalsince)? - .into_iter() - .map(|(event_type, content)| { - account_data::raw_global_event_from_parts( - &event_type, - &content, - ) - }) - .collect() - } else { - Vec::new() - }, - rooms: BTreeMap::new(), - }, - receipts: sync_events::v4::Receipts { - rooms: BTreeMap::new(), - }, - typing: sync_events::v4::Typing { - rooms: BTreeMap::new(), - }, - }, - delta_token: None, - })) -} diff --git a/src/api/well_known.rs b/src/api/well_known.rs index 44210e55..edb5a347 100644 --- a/src/api/well_known.rs +++ b/src/api/well_known.rs @@ -37,14 +37,5 @@ pub(crate) async fn client(_: Ar) -> Ra { Ra(client::Response { homeserver: client::HomeserverInfo::new(base_url.clone()), identity_server: None, - sliding_sync_proxy: services() - .globals - .config - .server_discovery - .client - .advertise_sliding_sync - .then_some(client::SlidingSyncProxyInfo { - url: base_url, - }), }) } diff --git a/src/cli/serve.rs b/src/cli/serve.rs index 7a528748..f0e0f969 100644 --- a/src/cli/serve.rs +++ b/src/cli/serve.rs @@ -651,7 +651,6 @@ fn client_routes() -> Router { .ruma_route(c2s::get_state_events_route) .ruma_route(c2s::get_state_events_for_key_route) .ruma_route(c2s::v3::sync_events_route) - .ruma_route(c2s::msc3575::sync_events_v4_route) .ruma_route(c2s::get_context_route) .ruma_route(c2s::get_message_events_route) .ruma_route(c2s::search_events_route) diff --git a/src/config.rs b/src/config.rs index b597e4a9..5924d232 100644 --- a/src/config.rs +++ b/src/config.rs @@ -151,9 +151,6 @@ pub(crate) struct ServerServerDiscovery { pub(crate) struct ClientServerDiscovery { /// The base URL to make client-server API requests to pub(crate) base_url: Url, - - #[serde(default, rename = "advertise_buggy_sliding_sync")] - pub(crate) advertise_sliding_sync: bool, } #[derive(Debug, Deserialize)] diff --git a/src/service/rooms/state_accessor.rs b/src/service/rooms/state_accessor.rs index 3c859a01..78af94ed 100644 --- a/src/service/rooms/state_accessor.rs +++ b/src/service/rooms/state_accessor.rs @@ -7,7 +7,6 @@ use lru_cache::LruCache; use ruma::{ events::{ room::{ - avatar::RoomAvatarEventContent, history_visibility::{ HistoryVisibility, RoomHistoryVisibilityEventContent, }, @@ -18,8 +17,8 @@ use ruma::{ StateEventType, }, state_res::Event, - EventId, JsOption, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, - ServerName, UserId, + EventId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, + UserId, }; use serde_json::value::to_raw_value; use tracing::{error, warn}; @@ -508,23 +507,6 @@ impl Service { ) } - #[tracing::instrument(skip(self))] - pub(crate) fn get_avatar( - &self, - room_id: &RoomId, - ) -> Result> { - self.room_state_get(room_id, &StateEventType::RoomAvatar, "")?.map_or( - Ok(JsOption::Undefined), - |s| { - serde_json::from_str(s.content.get()).map_err(|_| { - Error::bad_database( - "Invalid room avatar event in database.", - ) - }) - }, - ) - } - // Allowed because this function uses `services()` #[allow(clippy::unused_self)] #[tracing::instrument(skip(self), ret(level = "trace"))] @@ -553,24 +535,6 @@ impl Service { .is_ok() } - #[tracing::instrument(skip(self))] - pub(crate) fn get_member( - &self, - room_id: &RoomId, - user_id: &UserId, - ) -> Result> { - self.room_state_get( - room_id, - &StateEventType::RoomMember, - user_id.as_str(), - )? - .map_or(Ok(None), |s| { - serde_json::from_str(s.content.get()).map_err(|_| { - Error::bad_database("Invalid room member event in database.") - }) - }) - } - /// Checks if a given user can redact a given event /// /// If `federation` is `true`, it allows redaction events from any user of diff --git a/src/service/users.rs b/src/service/users.rs index 9df6e835..dbacb715 100644 --- a/src/service/users.rs +++ b/src/service/users.rs @@ -1,23 +1,12 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - mem, - sync::{Arc, Mutex}, -}; +use std::{collections::BTreeMap, mem}; use ruma::{ - api::client::{ - device::Device, - filter::FilterDefinition, - sync::sync_events::{ - self, - v4::{ExtensionsConfig, SyncRequestList}, - }, - }, + api::client::{device::Device, filter::FilterDefinition}, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, events::AnyToDeviceEvent, serde::Raw, DeviceId, OneTimeKeyAlgorithm, OneTimeKeyName, OwnedDeviceId, OwnedKeyId, - OwnedMxcUri, OwnedOneTimeKeyId, OwnedRoomId, OwnedUserId, UInt, UserId, + OwnedMxcUri, OwnedOneTimeKeyId, OwnedUserId, UInt, UserId, }; use crate::{services, Error, Result}; @@ -26,30 +15,14 @@ mod data; pub(crate) use data::Data; -pub(crate) struct SlidingSyncCache { - lists: BTreeMap, - subscriptions: BTreeMap, - // For every room, the roomsince number - known_rooms: BTreeMap>, - extensions: ExtensionsConfig, -} - pub(crate) struct Service { pub(crate) db: &'static dyn Data, - #[allow(clippy::type_complexity)] - pub(crate) connections: Mutex< - BTreeMap< - (OwnedUserId, OwnedDeviceId, String), - Arc>, - >, - >, } impl Service { pub(crate) fn new(db: &'static dyn Data) -> Self { Self { db, - connections: Mutex::new(BTreeMap::new()), } } @@ -58,206 +31,6 @@ impl Service { self.db.exists(user_id) } - pub(crate) fn forget_sync_request_connection( - &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - conn_id: String, - ) { - self.connections.lock().unwrap().remove(&(user_id, device_id, conn_id)); - } - - #[allow(clippy::too_many_lines)] - pub(crate) fn update_sync_request_with_cache( - &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - request: &mut sync_events::v4::Request, - ) -> BTreeMap> { - let Some(conn_id) = request.conn_id.clone() else { - return BTreeMap::new(); - }; - - let mut cache = self.connections.lock().unwrap(); - let cached = Arc::clone( - cache.entry((user_id, device_id, conn_id)).or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { - lists: BTreeMap::new(), - subscriptions: BTreeMap::new(), - known_rooms: BTreeMap::new(), - extensions: ExtensionsConfig::default(), - })) - }), - ); - let cached = &mut cached.lock().unwrap(); - drop(cache); - - for (list_id, list) in &mut request.lists { - if let Some(cached_list) = cached.lists.get(list_id) { - if list.sort.is_empty() { - list.sort.clone_from(&cached_list.sort); - } - if list.room_details.required_state.is_empty() { - list.room_details - .required_state - .clone_from(&cached_list.room_details.required_state); - } - list.room_details.timeline_limit = list - .room_details - .timeline_limit - .or(cached_list.room_details.timeline_limit); - list.include_old_rooms = list - .include_old_rooms - .clone() - .or(cached_list.include_old_rooms.clone()); - match (&mut list.filters, cached_list.filters.clone()) { - (Some(list_filters), Some(cached_filters)) => { - list_filters.is_dm = - list_filters.is_dm.or(cached_filters.is_dm); - if list_filters.spaces.is_empty() { - list_filters.spaces = cached_filters.spaces; - } - list_filters.is_encrypted = list_filters - .is_encrypted - .or(cached_filters.is_encrypted); - list_filters.is_invite = - list_filters.is_invite.or(cached_filters.is_invite); - if list_filters.room_types.is_empty() { - list_filters.room_types = cached_filters.room_types; - } - if list_filters.not_room_types.is_empty() { - list_filters.not_room_types = - cached_filters.not_room_types; - } - list_filters.room_name_like = list_filters - .room_name_like - .clone() - .or(cached_filters.room_name_like); - if list_filters.tags.is_empty() { - list_filters.tags = cached_filters.tags; - } - if list_filters.not_tags.is_empty() { - list_filters.not_tags = cached_filters.not_tags; - } - } - (_, Some(cached_filters)) => { - list.filters = Some(cached_filters); - } - (Some(list_filters), _) => { - list.filters = Some(list_filters.clone()); - } - (..) => {} - } - if list.bump_event_types.is_empty() { - list.bump_event_types - .clone_from(&cached_list.bump_event_types); - } - } - cached.lists.insert(list_id.clone(), list.clone()); - } - - cached.subscriptions.extend( - request - .room_subscriptions - .iter() - .map(|(k, v)| (k.clone(), v.clone())), - ); - request.room_subscriptions.extend( - cached.subscriptions.iter().map(|(k, v)| (k.clone(), v.clone())), - ); - - request.extensions.e2ee.enabled = - request.extensions.e2ee.enabled.or(cached.extensions.e2ee.enabled); - - request.extensions.to_device.enabled = request - .extensions - .to_device - .enabled - .or(cached.extensions.to_device.enabled); - - request.extensions.account_data.enabled = request - .extensions - .account_data - .enabled - .or(cached.extensions.account_data.enabled); - request.extensions.account_data.lists = request - .extensions - .account_data - .lists - .clone() - .or(cached.extensions.account_data.lists.clone()); - request.extensions.account_data.rooms = request - .extensions - .account_data - .rooms - .clone() - .or(cached.extensions.account_data.rooms.clone()); - - cached.extensions = request.extensions.clone(); - - cached.known_rooms.clone() - } - - pub(crate) fn update_sync_subscriptions( - &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - conn_id: String, - subscriptions: BTreeMap, - ) { - let mut cache = self.connections.lock().unwrap(); - let cached = Arc::clone( - cache.entry((user_id, device_id, conn_id)).or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { - lists: BTreeMap::new(), - subscriptions: BTreeMap::new(), - known_rooms: BTreeMap::new(), - extensions: ExtensionsConfig::default(), - })) - }), - ); - let cached = &mut cached.lock().unwrap(); - drop(cache); - - cached.subscriptions = subscriptions; - } - - pub(crate) fn update_sync_known_rooms( - &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, - conn_id: String, - list_id: String, - new_cached_rooms: BTreeSet, - globalsince: u64, - ) { - let mut cache = self.connections.lock().unwrap(); - let cached = Arc::clone( - cache.entry((user_id, device_id, conn_id)).or_insert_with(|| { - Arc::new(Mutex::new(SlidingSyncCache { - lists: BTreeMap::new(), - subscriptions: BTreeMap::new(), - known_rooms: BTreeMap::new(), - extensions: ExtensionsConfig::default(), - })) - }), - ); - let cached = &mut cached.lock().unwrap(); - drop(cache); - - for (roomid, lastsince) in - cached.known_rooms.entry(list_id.clone()).or_default().iter_mut() - { - if !new_cached_rooms.contains(roomid) { - *lastsince = 0; - } - } - let list = cached.known_rooms.entry(list_id).or_default(); - for roomid in new_cached_rooms { - list.insert(roomid, globalsince); - } - } - /// Check if account is deactivated pub(crate) fn is_deactivated(&self, user_id: &UserId) -> Result { self.db.is_deactivated(user_id)