//! [MSC4186], aka Simplified Sliding Sync, aka Simplified [MSC3575], support //! //! [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186 //! [MSC3575]: https://github.com/matrix-org/matrix-spec-proposals/pull/3575 use std::{ collections::{BTreeMap, BTreeSet, HashSet}, time::Duration, }; use ruma::{ api::client::{ sync::sync_events::{ self, v5::{request::ListFilters, response::Hero}, DeviceLists, UnreadNotificationsCount, }, uiaa::UiaaResponse, }, events::{ direct::DirectEventContent, room::{ create::RoomCreateEventContent, encryption::PossiblyRedactedRoomEncryptionEventContent, member::{MembershipState, RoomMemberEventContent}, }, AnyStrippedStateEvent, PossiblyRedactedStateEventContent, StateEventType, StrippedStateEvent, TimelineEventType, }, room::RoomType, serde::Raw, uint, JsOption, OwnedRoomId, RoomId, UInt, UserId, }; use serde::de::DeserializeOwned; use tracing::{debug, error, field, trace, warn}; use super::{load_timeline, share_encrypted_room}; use crate::{ service::{ account_data, rooms::{ short::ShortStateHash, state::ExtractType, timeline::PduCount, }, users::ConnectionKey, }, services, utils, Ar, Error, Ra, Result, }; #[derive(Debug)] enum RequiredStateKeys { All, Selected(BTreeSet), } impl RequiredStateKeys { fn merge(&mut self, key: String) { match self { RequiredStateKeys::All => { // nothing to do, we're already getting all keys } RequiredStateKeys::Selected(keys) => { if key == "*" { *self = RequiredStateKeys::All; } else { keys.insert(key); } } } } } #[derive(Debug)] struct RequiredState { /// Indicates that a `("*", "*")` tuple was present in `required_state`. /// When `true`, all state events are sent by default, except for state /// event types that are present in `filters`, for which only the /// request state keys are sent. all_events: bool, filters: BTreeMap, } impl RequiredState { fn update( &mut self, required_state: Vec<(StateEventType, String)>, sender_user: &UserId, ) { let contains_wildcard = required_state .iter() .any(|(typ, key)| typ.to_string() == "*" && key == "*"); let mut old_filters = None; if contains_wildcard { if self.all_events { // filters already contains existing negative filters, remember // them and only apply new filters that were // already there previously old_filters = Some(std::mem::take(&mut self.filters)); } else { // clear existing positive filters self.filters = BTreeMap::new(); } self.all_events = true; } else if self.all_events { // all events were requested previously, don't add any additional // positive filters return; } for (typ, mut key) in required_state { if typ.to_string() == "*" { continue; } if key == "$ME" { key = sender_user.to_string(); } if let Some(old_filters) = old_filters.as_mut() { // re-insert the old negative filter if it matches the new // negative filter exactly if let Some(old_filter) = old_filters.remove(&typ) { if let RequiredStateKeys::Selected(state_keys) = &old_filter { if state_keys.len() == 1 && state_keys.contains(&key) { self.filters.insert(typ, old_filter); } } } } else { // add the key to the filter for this event type self.filters .entry(typ) .or_insert_with(|| { RequiredStateKeys::Selected(BTreeSet::new()) }) .merge(key); } } } fn matches(&self, typ: &StateEventType, key: &str) -> bool { match self.filters.get(typ) { Some(keys) => match keys { RequiredStateKeys::All => true, RequiredStateKeys::Selected(keys) => keys.contains(key), }, None => self.all_events, } } } #[derive(Debug)] struct TodoRoom { required_state: RequiredState, timeline_limit: u64, roomsince: u64, } impl TodoRoom { fn update( &mut self, required_state: Vec<(StateEventType, String)>, timeline_limit: UInt, known_rooms: &BTreeMap, room_id: &RoomId, sender_user: &UserId, ) { self.required_state.update(required_state, sender_user); self.timeline_limit = self.timeline_limit.max(u64::from(timeline_limit).min(100)); // 0 means unknown because it got out of date self.roomsince = self.roomsince.min(known_rooms.get(room_id).copied().unwrap_or(0)); } } impl Default for TodoRoom { fn default() -> Self { Self { required_state: RequiredState { all_events: false, filters: BTreeMap::new(), }, timeline_limit: 0, roomsince: u64::MAX, } } } fn is_dm_room(user: &UserId, room: &RoomId) -> Result { let Some(event) = services().account_data.get_global::(user)? else { return Ok(false); }; let event = event .deserialize() .map_err(|_| Error::bad_database("Invalid m.direct event"))?; Ok(event.values().flatten().any(|r| r == room)) } fn is_encrypted_room(current_shortstatehash: ShortStateHash) -> Result { Ok(services() .rooms .state_accessor .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")? .is_some()) } fn get_invite_state( invite_state: &[Raw], ) -> Option> where T: PossiblyRedactedStateEventContent + DeserializeOwned, { invite_state .iter() .find_map(|ev| ev.deserialize_as::>().ok()) } #[derive(Debug)] struct RoomData { id: OwnedRoomId, current_shortstatehash: ShortStateHash, is_dm: bool, is_encrypted: bool, is_invite: bool, room_type: Option, } impl RoomData { #[tracing::instrument] fn new( id: OwnedRoomId, user: &UserId, invite_state: Option<&[Raw]>, ) -> Result { let current_shortstatehash = services() .rooms .state .get_room_shortstatehash(&id)? .ok_or_else(|| Error::bad_database("Room has no state"))?; let room_type = if let Some(invite_state) = &invite_state { get_invite_state::(invite_state) .and_then(|e| e.content.room_type) } else { services().rooms.state.get_create_content::(&id)? }; let is_dm = match is_dm_room(user, &id) { Ok(x) => x, Err(error) => { error!(%error, %user, "Invalid m.direct account data event"); false } }; let is_encrypted = if let Some(invite_state) = &invite_state { get_invite_state::( invite_state, ) .is_some() } else { is_encrypted_room(current_shortstatehash)? }; let is_invite = invite_state.is_some(); Ok(Self { id, current_shortstatehash, is_dm, is_encrypted, is_invite, room_type, }) } #[tracing::instrument(skip(self), fields(room_id = self.id.as_str()))] fn matches_filter(&self, filter_data: &ListFilters) -> Result { if let Some(is_dm) = filter_data.is_dm { if self.is_dm != is_dm { return Ok(false); } } if let Some(is_encrypted) = filter_data.is_encrypted { if self.is_encrypted != is_encrypted { return Ok(false); } } if let Some(is_invite) = filter_data.is_invite { if self.is_invite != is_invite { return Ok(false); } } let room_type = self.room_type.clone().into(); if filter_data.not_room_types.contains(&room_type) { return Ok(false); } if !filter_data.room_types.is_empty() && !filter_data.room_types.contains(&room_type) { return Ok(false); } Ok(true) } } #[tracing::instrument(skip_all)] fn joined_rooms_data(sender_user: &UserId) -> Vec { services() .rooms .state_cache .rooms_joined(sender_user) .filter_map(Result::ok) .filter_map(move |id| { RoomData::new(id.clone(), sender_user, None) .inspect_err(|error| { error!(%error, room_id = %id, "Failed to get data for room, skipping"); }) .ok() }).collect() } #[tracing::instrument(skip_all)] fn invited_rooms_data(sender_user: &UserId) -> Vec { services() .rooms .state_cache .rooms_invited(sender_user) .filter_map(Result::ok) .filter_map(move |(id, invite_state)| { RoomData::new(id.clone(), sender_user, Some(&invite_state)) .inspect_err(|error| { error!( %error, room_id = %id, "Failed to get data for room, skipping" ); }) .ok() }) .collect() } #[allow(clippy::too_many_lines)] #[tracing::instrument(skip_all, fields( pos, next_batch, connection_id = ?body.conn_id, ))] pub(crate) async fn sync_events_v5_route( body: Ar, ) -> Result, Ra> { let current_span = tracing::Span::current(); let sender_user = body.sender_user.expect("user is authenticated"); let sender_device = body.sender_device.expect("user is authenticated"); let body = body.body; // Setup watchers, so if there's no response, we can wait for them let watcher = services().globals.watch(&sender_user, &sender_device); let next_batch = services().globals.next_count()?; current_span.record("next_batch", field::display(&next_batch)); current_span.record("pos", field::debug(&body.pos)); let globalsince = body.pos.as_ref().and_then(|string| string.parse().ok()).unwrap_or(0); let connection_key = ConnectionKey { user: sender_user.clone(), device: sender_device.clone(), connection: body.conn_id.clone(), }; if globalsince == 0 { services().users.forget_sync_request_connection(&connection_key); } let known_rooms = services() .users .get_rooms_in_connection(connection_key.clone(), globalsince); let all_joined_rooms = joined_rooms_data(&sender_user); if body.extensions.to_device.enabled.unwrap_or(false) { services().users.remove_to_device_events( &sender_user, &sender_device, globalsince, )?; } #[allow(clippy::if_then_some_else_none)] let e2ee = if body.extensions.e2ee.enabled == Some(true) { Some(sync_events::v5::response::E2EE { device_lists: get_e2ee_data( &sender_user, globalsince, &all_joined_rooms, ) .await?, device_one_time_keys_count: services() .users .count_one_time_keys(&sender_user, &sender_device)?, // Fallback keys are not yet supported device_unused_fallback_key_types: None, }) } else { None }; let mut all_rooms = all_joined_rooms; all_rooms.extend(invited_rooms_data(&sender_user)); let all_room_ids: Vec<_> = all_rooms.iter().map(|r| r.id.clone()).collect(); let all_room_ids: Vec<_> = all_room_ids.iter().map(|id| &**id).collect(); let mut todo_rooms: BTreeMap = BTreeMap::new(); let lists = body .lists .into_iter() .map(|(list_id, list)| { let rooms = rooms_in_list( &list_id, list, &all_rooms, &all_room_ids, &known_rooms, &mut todo_rooms, &sender_user, ); (list_id, rooms) }) .collect(); for (room_id, room) in &body.room_subscriptions { if !services().rooms.metadata.exists(room_id)? { warn!(room_id = room_id.as_str(), "Subscribed room does not exist"); continue; } todo_rooms.entry(room_id.clone()).or_default().update( room.required_state.clone(), room.timeline_limit, &known_rooms, room_id, &sender_user, ); } services().users.update_sync_known_rooms( connection_key.clone(), todo_rooms.keys().cloned().collect(), globalsince, next_batch, ); let to_device = if body.extensions.to_device.enabled == Some(true) { let events = services() .users .get_to_device_events(&sender_user, &sender_device)?; if !events.is_empty() { debug!( events = utils::debug_slice_truncated(&events, 3), "Got new to-device events" ); } Some(sync_events::v5::response::ToDevice { events, next_batch: next_batch.to_string(), }) } else { None }; let mut rooms = BTreeMap::new(); for (room_id, todo_room) in todo_rooms { if let Some(room) = process_room(&sender_user, &room_id, &todo_room)? { rooms.insert(room_id.clone(), room); } } if rooms .iter() .all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty()) { // Hang a few seconds so requests are not spammed // Stop hanging if new info arrives let mut duration = body.timeout.unwrap_or(Duration::from_secs(30)); if duration.as_secs() > 30 { duration = Duration::from_secs(30); } match tokio::time::timeout(duration, watcher).await { Ok(x) => x.expect("watcher should succeed"), Err(error) => debug!(%error, "Timed out"), }; } Ok(Ra(sync_events::v5::Response { txn_id: body.txn_id.clone(), pos: next_batch.to_string(), lists, rooms, extensions: sync_events::v5::response::Extensions { to_device, e2ee: e2ee.unwrap_or_default(), account_data: sync_events::v5::response::AccountData { global: if body.extensions.account_data.enabled.unwrap_or(false) { services() .account_data .global_changes_since(&sender_user, globalsince)? .into_iter() .map(|(event_type, content)| { account_data::raw_global_event_from_parts( &event_type, &content, ) }) .collect() } else { Vec::new() }, rooms: BTreeMap::new(), }, receipts: sync_events::v5::response::Receipts { rooms: BTreeMap::new(), }, typing: sync_events::v5::response::Typing { rooms: BTreeMap::new(), }, }, })) } #[allow(clippy::too_many_lines)] #[tracing::instrument(skip_all)] async fn get_e2ee_data( sender_user: &UserId, globalsince: u64, all_joined_rooms: &[RoomData], ) -> Result { // Users that have left any encrypted rooms the sender was in let mut left_encrypted_users = HashSet::new(); // Look for device list updates of this account let mut device_list_changes: HashSet<_> = services() .users .keys_changed(sender_user.as_ref(), globalsince, None) .filter_map(Result::ok) .collect(); for RoomData { id: room_id, current_shortstatehash, is_encrypted, .. } in all_joined_rooms { let since_shortstatehash = services() .rooms .user .get_token_shortstatehash(room_id, globalsince)?; let since_sender_member: Option = since_shortstatehash .and_then(|shortstatehash| { services() .rooms .state_accessor .state_get( shortstatehash, &StateEventType::RoomMember, sender_user.as_str(), ) .transpose() }) .transpose()? .and_then(|pdu| { serde_json::from_str(pdu.content.get()) .map_err(|_| { Error::bad_database("Invalid PDU in database.") }) .ok() }); if let Some(since_shortstatehash) = since_shortstatehash { // Skip if there are only timeline changes if since_shortstatehash == *current_shortstatehash { continue; } let since_encryption = services().rooms.state_accessor.state_get( since_shortstatehash, &StateEventType::RoomEncryption, "", )?; let joined_since_last_sync = since_sender_member.is_none_or(|member| { member.membership != MembershipState::Join }); let new_encrypted_room = *is_encrypted && since_encryption.is_none(); if *is_encrypted { let current_state_ids = services() .rooms .state_accessor .state_full_ids(*current_shortstatehash) .await?; let since_state_ids = services() .rooms .state_accessor .state_full_ids(since_shortstatehash) .await?; for (key, event_id) in current_state_ids { if since_state_ids.get(&key) != Some(&event_id) { let Some(pdu) = services().rooms.timeline.get_pdu(&event_id)? else { error!(%event_id, "Event in state not found"); continue; }; if pdu.kind == TimelineEventType::RoomMember { if let Some(state_key) = &pdu.state_key { let user_id = UserId::parse(state_key.clone()) .map_err(|_| { Error::bad_database( "Invalid UserId in member PDU.", ) })?; if user_id == sender_user { continue; } let new_membership = serde_json::from_str::< RoomMemberEventContent, >( pdu.content.get() ) .map_err(|_| { Error::bad_database( "Invalid PDU in database.", ) })? .membership; match new_membership { MembershipState::Join => { // A new user joined an encrypted // room if !share_encrypted_room( sender_user, &user_id, room_id, )? { device_list_changes.insert(user_id); } } MembershipState::Leave => { // Write down users that have left // encrypted rooms we are in left_encrypted_users.insert(user_id); } _ => {} } } } } } if joined_since_last_sync || new_encrypted_room { // If the user is in a new encrypted room, give them all // joined users device_list_changes.extend( services() .rooms .state_cache .room_members(room_id) .flatten() .filter(|user_id| { // Don't send key updates from the sender to // the sender sender_user != user_id }) .filter(|user_id| { // Only send keys if the sender doesn't // share an encrypted room with the target // already !share_encrypted_room( sender_user, user_id, room_id, ) .unwrap_or(false) }), ); } } } // Look for device list updates in this room device_list_changes.extend( services() .users .keys_changed(room_id.as_ref(), globalsince, None) .filter_map(Result::ok), ); } let mut device_list_left = HashSet::new(); for user_id in left_encrypted_users { let dont_share_encrypted_room = services() .rooms .user .get_shared_rooms(vec![sender_user.to_owned(), user_id.clone()])? .filter_map(Result::ok) .filter_map(|other_room_id| { Some( services() .rooms .state_accessor .room_state_get( &other_room_id, &StateEventType::RoomEncryption, "", ) .ok()? .is_some(), ) }) .all(|encrypted| !encrypted); // If the user doesn't share an encrypted room with the target // anymore, we need to tell them if dont_share_encrypted_room { device_list_left.insert(user_id); } } Ok(DeviceLists { changed: device_list_changes.into_iter().collect(), left: device_list_left.into_iter().collect(), }) } #[tracing::instrument( skip_all, fields(list_id = list_id, ?list), )] fn rooms_in_list( list_id: &str, list: sync_events::v5::request::List, all_rooms: &[RoomData], all_room_ids: &[&RoomId], known_rooms: &BTreeMap, todo_rooms: &mut BTreeMap, sender_user: &UserId, ) -> sync_events::v5::response::List { trace!(list_id, ?list, "Collecting rooms in list"); let matching_room_ids_buf: Vec<&RoomId>; let matching_room_ids = if let Some(filters) = list.filters.as_ref() { matching_room_ids_buf = all_rooms .iter() .filter_map(|r| { match r.matches_filter(filters) { Ok(pass) => pass.then_some(&*r.id), Err(error) => { warn!(%error, ?filters, room_id=r.id.as_str(), "Failed to evaluate list filter, skipping room"); None } } }) .collect(); matching_room_ids_buf.as_slice() } else { all_room_ids }; if !matching_room_ids.is_empty() { let mut list_room_ids: BTreeSet<&RoomId> = BTreeSet::new(); for (from, to) in list.ranges { let from = usize::try_from(from) .unwrap_or(usize::MAX) .clamp(0, matching_room_ids.len() - 1); let to = usize::try_from(to) .unwrap_or(usize::MAX) .clamp(from, matching_room_ids.len() - 1); list_room_ids.extend(&matching_room_ids[from..=to]); } for room_id in list_room_ids { todo_rooms.entry(room_id.to_owned()).or_default().update( list.room_details.required_state.clone(), list.room_details.timeline_limit, known_rooms, room_id, sender_user, ); } } let num_rooms = matching_room_ids.len(); trace!(list_id, num_rooms, "Done collecting rooms"); sync_events::v5::response::List { count: UInt::try_from(num_rooms).unwrap_or(UInt::MAX), } } #[allow(clippy::too_many_lines)] #[tracing::instrument(skip(sender_user))] fn process_room( sender_user: &UserId, room_id: &RoomId, todo_room: &TodoRoom, ) -> Result> { let roomsincecount = PduCount::Normal(todo_room.roomsince); let (timeline_pdus, limited) = load_timeline( sender_user, room_id, roomsincecount, todo_room.timeline_limit, )?; if todo_room.roomsince != 0 && timeline_pdus.is_empty() { return Ok(None); } let prev_batch = timeline_pdus .first() .map(|(pdu_count, _)| match pdu_count { PduCount::Backfilled(_) => { error!("Timeline in backfill state?!"); "0".to_owned() } PduCount::Normal(c) => c.to_string(), }) .or_else(|| { (todo_room.roomsince != 0).then(|| todo_room.roomsince.to_string()) }); // TODO: consider only message-like PDUs here, rather than all PDUs let bump_stamp = match services() .rooms .timeline .last_timeline_count(sender_user, room_id)? { PduCount::Backfilled(n) | PduCount::Normal(n) => { Some(UInt::new_saturating(n)) } }; let room_events: Vec<_> = timeline_pdus.iter().map(|(_, pdu)| pdu.to_sync_room_event()).collect(); let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? else { error!(%room_id, "Room has no state"); return Ok(None); }; let need_scan = todo_room.required_state.all_events || todo_room .required_state .filters .iter() .any(|(_, keys)| matches!(keys, RequiredStateKeys::All)); let required_state = if need_scan { let full_state = services() .rooms .state_compressor .load_shortstatehash_info(current_shortstatehash)? .pop() .expect("there is always one layer") .full_state; full_state .iter() .filter_map(|compressed| { let Ok((typ, key)) = services() .rooms .short .get_statekey_from_short(compressed.state) else { warn!( ?compressed, "Failed to get info for shortstatekey, skipping" ); return None; }; if !todo_room.required_state.matches(&typ, &key) { return None; } let shorteventid = compressed.event; let pdu = match services() .rooms .short .get_eventid_from_short(shorteventid) { Ok(event_id) => { services().rooms.timeline.get_pdu(&event_id) } Err(error) => { warn!( %error, %typ, key, ?shorteventid, "Failed to get event ID from short event ID" ); return None; } }; match pdu { Ok(Some(pdu)) => Some(pdu.to_sync_state_event()), Ok(None) => None, Err(error) => { warn!(%error, %typ, key, "Failed to get state PDU"); None } } }) .collect() } else { todo_room .required_state .filters .iter() .flat_map(|(typ, keys)| { let RequiredStateKeys::Selected(keys) = keys else { panic!( "wildcard key should have triggered a full state scan" ); }; keys.iter().filter_map(move |key| { match services().rooms.state_accessor.state_get( current_shortstatehash, typ, key, ) { Ok(Some(pdu)) => Some(pdu.to_sync_state_event()), Ok(None) => None, Err(error) => { warn!(%error, %typ, key, "Failed to get state PDU"); None } } }) }) .collect() }; let name = services().rooms.state_accessor.get_name(room_id)?; let heroes = name.is_none().then(|| { services() .rooms .state_cache .room_members(room_id) .filter_map(Result::ok) .filter(|member| member != sender_user) .filter_map(|member| { services() .rooms .state_accessor .get_member(room_id, &member) .ok() .flatten() .map(|memberevent| Hero { user_id: member, name: memberevent.displayname, avatar: memberevent.avatar_url, }) }) .take(5) .collect::>() }); let room = sync_events::v5::response::Room { name, avatar: match services().rooms.state_accessor.get_avatar(room_id)? { JsOption::Some(avatar) => JsOption::from_option(avatar.url), JsOption::Null => JsOption::Null, JsOption::Undefined => JsOption::Undefined, }, initial: Some(todo_room.roomsince == 0), is_dm: None, invite_state: None, unread_notifications: UnreadNotificationsCount { highlight_count: Some( services() .rooms .user .highlight_count(sender_user, room_id)? .try_into() .expect("notification count can't go that high"), ), notification_count: Some( services() .rooms .user .notification_count(sender_user, room_id)? .try_into() .expect("notification count can't go that high"), ), }, timeline: room_events, required_state, prev_batch, limited, joined_count: Some( services() .rooms .state_cache .room_joined_count(room_id)? .map(UInt::new_saturating) .unwrap_or(uint!(0)), ), invited_count: Some( services() .rooms .state_cache .room_invited_count(room_id)? .map(UInt::new_saturating) .unwrap_or(uint!(0)), ), // Count events in timeline greater than global sync counter num_live: None, bump_stamp, heroes, }; trace!(?room, "Built room data"); Ok(Some(room)) }