diff --git a/Cargo.lock b/Cargo.lock
index 274e64f8..b29d6164 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2442,7 +2442,7 @@ dependencies = [
 [[package]]
 name = "ruma"
 version = "0.12.1"
-source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a"
+source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
 dependencies = [
  "assign",
  "js_int",
@@ -2461,7 +2461,7 @@ dependencies = [
 [[package]]
 name = "ruma-appservice-api"
 version = "0.12.1"
-source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a"
+source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
 dependencies = [
  "js_int",
  "ruma-common",
@@ -2473,7 +2473,7 @@ dependencies = [
 [[package]]
 name = "ruma-client-api"
 version = "0.20.1"
-source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a"
+source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
 dependencies = [
  "as_variant",
  "assign",
@@ -2496,7 +2496,7 @@ dependencies = [
 [[package]]
 name = "ruma-common"
 version = "0.15.1"
-source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a"
+source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
 dependencies = [
  "as_variant",
  "base64",
@@ -2527,7 +2527,7 @@ dependencies = [
 [[package]]
 name = "ruma-events"
 version = "0.30.1"
-source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a"
+source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
 dependencies = [
  "as_variant",
  "indexmap 2.10.0",
@@ -2550,7 +2550,7 @@ dependencies = [
 [[package]]
 name = "ruma-federation-api"
 version = "0.11.0"
-source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a"
+source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
 dependencies = [
  "bytes",
  "headers",
@@ -2572,7 +2572,7 @@ dependencies = [
 [[package]]
 name = "ruma-identifiers-validation"
 version = "0.10.1"
-source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a"
+source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
 dependencies = [
  "js_int",
  "thiserror 2.0.12",
@@ -2581,7 +2581,7 @@ dependencies = [
 [[package]]
 name = "ruma-macros"
 version = "0.15.1"
-source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a"
+source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
 dependencies = [
  "cfg-if",
  "proc-macro-crate",
@@ -2596,7 +2596,7 @@ dependencies = [
 [[package]]
 name = "ruma-push-gateway-api"
 version = "0.11.0"
-source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a"
+source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
 dependencies = [
  "js_int",
  "ruma-common",
@@ -2608,7 +2608,7 @@ dependencies = [
 [[package]]
 name = "ruma-signatures"
 version = "0.17.0"
-source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a"
+source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
 dependencies = [
  "base64",
  "ed25519-dalek",
@@ -2624,7 +2624,7 @@ dependencies = [
 [[package]]
 name = "ruma-state-res"
 version = "0.13.0"
-source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a"
+source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
 dependencies = [
  "js_int",
  "ruma-common",
diff --git a/Cargo.toml b/Cargo.toml
index bdea44fb..6222d501 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -144,8 +144,8 @@ trust-dns-resolver = "0.23.2"
 xdg = "2.5.2"
 
 [dependencies.ruma]
-git = "https://github.com/ruma/ruma.git"
-rev = "1387667de806c37a6d7f72125117009bd618e32a"
+git = "https://gitlab.computer.surgery/matrix/ruma.git"
+rev = "ruma-0.12.1+grapevine-1"
 features = [
     "compat-server-signing-key-version",
     "compat-empty-string-null",
@@ -165,6 +165,7 @@ features = [
     "state-res",
     "unstable-msc2448",
     "ring-compat",
+    "unstable-msc4186",
 ]
 
 [target.'cfg(unix)'.dependencies]
diff --git a/book/changelog.md b/book/changelog.md
index 40a8abea..3729bfbb 100644
--- a/book/changelog.md
+++ b/book/changelog.md
@@ -87,8 +87,6 @@ This will be the first release of Grapevine since it was forked from Conduit
    * Instead, it is now possible to configure each cache capacity
      individually.
 10. Remove jemalloc support.
     ([!93](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/193))
-11. Removed support for MSC3575 (sliding sync), which has been closed.
-    ([!198](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/198))
 
 ### Changed
 
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index 4ad37425..a2ed2c86 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -5,6 +5,7 @@ use crate::{
     service::rooms::timeline::PduCount, services, Error, PduEvent, Result,
 };
 
+pub(crate) mod msc4186;
 pub(crate) mod v3;
 
 fn load_timeline(
diff --git a/src/api/client_server/sync/msc4186.rs b/src/api/client_server/sync/msc4186.rs
new file mode 100644
index 00000000..ab5fa762
--- /dev/null
+++ b/src/api/client_server/sync/msc4186.rs
@@ -0,0 +1,1177 @@
+//! [MSC4186], aka Simplified Sliding Sync, aka Simplified [MSC3575], support
+//!
+//! [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186
+//! [MSC3575]: https://github.com/matrix-org/matrix-spec-proposals/pull/3575
+
+use std::{
+    collections::{BTreeMap, BTreeSet, HashSet},
+    time::Duration,
+};
+
+use ruma::{
+    api::client::{
+        sync::sync_events::{
+            self,
+            v5::{request::ListFilters, response::Hero},
+            DeviceLists, UnreadNotificationsCount,
+        },
+        uiaa::UiaaResponse,
+    },
+    events::{
+        direct::DirectEventContent,
+        receipt::ReceiptEventContent,
+        room::{
+            create::RoomCreateEventContent,
+            encryption::PossiblyRedactedRoomEncryptionEventContent,
+            member::{MembershipState, RoomMemberEventContent},
+        },
+        AnyStrippedStateEvent, AnySyncEphemeralRoomEvent,
+        PossiblyRedactedStateEventContent, StateEventType, StrippedStateEvent,
+        SyncEphemeralRoomEvent, TimelineEventType,
+    },
+    room::RoomType,
+    serde::Raw,
+    uint, JsOption, OwnedRoomId, RoomId, UInt, UserId,
+};
+use serde::de::DeserializeOwned;
+use tracing::{debug, error, field, trace, warn};
+
+use super::{load_timeline, share_encrypted_room};
+use crate::{
+    service::{
+        account_data,
+        rooms::{
+            short::ShortStateHash, state::ExtractType, timeline::PduCount,
+        },
+        users::ConnectionKey,
+    },
+    services, utils, Ar, Error, Ra, Result,
+};
+
+#[derive(Debug)]
+enum RequiredStateKeys {
+    All,
+    Selected(BTreeSet<String>),
+}
+impl RequiredStateKeys {
+    fn merge(&mut self, key: String) {
+        match self {
+            RequiredStateKeys::All => {
+                // nothing to do, we're already getting all keys
+            }
+            RequiredStateKeys::Selected(keys) => {
+                if key == "*" {
+                    *self = RequiredStateKeys::All;
+                } else {
+                    keys.insert(key);
+                }
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+struct RequiredState {
+    /// Indicates that a `("*", "*")` tuple was present in `required_state`.
+    /// When `true`, all state events are sent by default, except for state
+    /// event types that are present in `filters`, for which only the
+    /// requested state keys are sent.
+    all_events: bool,
+    filters: BTreeMap<StateEventType, RequiredStateKeys>,
+}
+impl RequiredState {
+    fn update(
+        &mut self,
+        required_state: Vec<(StateEventType, String)>,
+        sender_user: &UserId,
+    ) {
+        let contains_wildcard = required_state
+            .iter()
+            .any(|(typ, key)| typ.to_string() == "*" && key == "*");
+
+        let mut old_filters = None;
+        if contains_wildcard {
+            if self.all_events {
+                // filters already contains negative filters; remember them
+                // and only keep new negative filters that match ones that
+                // were already there previously
+                old_filters = Some(std::mem::take(&mut self.filters));
+            } else {
+                // clear existing positive filters
+                self.filters = BTreeMap::new();
+            }
+
+            self.all_events = true;
+        } else if self.all_events {
+            // all events were requested previously, don't add any additional
+            // positive filters
+            return;
+        }
+
+        for (typ, mut key) in required_state {
+            if typ.to_string() == "*" {
+                continue;
+            }
+            if key == "$ME" {
+                key = sender_user.to_string();
+            }
+
+            if let Some(old_filters) = old_filters.as_mut() {
+                // re-insert the old negative filter if it matches the new
+                // negative filter exactly
+                if let Some(old_filter) = old_filters.remove(&typ) {
+                    if let RequiredStateKeys::Selected(state_keys) = &old_filter
+                    {
+                        if state_keys.len() == 1 && state_keys.contains(&key) {
+                            self.filters.insert(typ, old_filter);
+                        }
+                    }
+                }
+            } else {
+                // add the key to the filter for this event type
+                self.filters
+                    .entry(typ)
+                    .or_insert_with(|| {
+                        RequiredStateKeys::Selected(BTreeSet::new())
+                    })
+                    .merge(key);
+            }
+        }
+    }
+
+    fn matches(&self, typ: &StateEventType, key: &str) -> bool {
+        match self.filters.get(typ) {
+            Some(keys) => match keys {
+                RequiredStateKeys::All => true,
+                RequiredStateKeys::Selected(keys) => keys.contains(key),
+            },
+            None => self.all_events,
+        }
+    }
+}
+
+#[derive(Debug)]
+struct TodoRoom {
+    required_state: RequiredState,
+    timeline_limit: u64,
+    roomsince: u64,
+}
+impl TodoRoom {
+    fn update(
+        &mut self,
+        required_state: Vec<(StateEventType, String)>,
+        timeline_limit: UInt,
+        known_rooms: &BTreeMap<OwnedRoomId, u64>,
+        room_id: &RoomId,
+        sender_user: &UserId,
+    ) {
+        self.required_state.update(required_state, sender_user);
+
+        self.timeline_limit =
+            self.timeline_limit.max(u64::from(timeline_limit).min(100));
+        // 0 means unknown because it got out of date
+        self.roomsince =
+            self.roomsince.min(known_rooms.get(room_id).copied().unwrap_or(0));
+    }
+}
+impl Default for TodoRoom {
+    fn default() -> Self {
+        Self {
+            required_state: RequiredState {
+                all_events: false,
+                filters: BTreeMap::new(),
+            },
+            timeline_limit: 0,
+            roomsince: u64::MAX,
+        }
+    }
+}
+
+fn is_dm_room(user: &UserId, room: &RoomId) -> Result<bool> {
+    let Some(event) =
+        services().account_data.get_global::<DirectEventContent>(user)?
+    else {
+        return Ok(false);
+    };
+
+    let event = event
+        .deserialize()
+        .map_err(|_| Error::bad_database("Invalid m.direct event"))?;
+
+    Ok(event.values().flatten().any(|r| r == room))
+}
+
+fn is_encrypted_room(current_shortstatehash: ShortStateHash) -> Result<bool> {
+    Ok(services()
+        .rooms
+        .state_accessor
+        .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
+        .is_some())
+}
+
+fn get_invite_state<T>(
+    invite_state: &[Raw<AnyStrippedStateEvent>],
+) -> Option<StrippedStateEvent<T>>
+where
+    T: PossiblyRedactedStateEventContent + DeserializeOwned,
+{
+    invite_state
+        .iter()
+        .find_map(|ev| ev.deserialize_as::<StrippedStateEvent<T>>().ok())
+}
+
+#[derive(Debug)]
+struct RoomData {
+    id: OwnedRoomId,
+    current_shortstatehash: ShortStateHash,
+    is_dm: bool,
+    is_encrypted: bool,
+    is_invite: bool,
+    room_type: Option<RoomType>,
+}
+
+impl RoomData {
+    #[tracing::instrument]
+    fn new(
+        id: OwnedRoomId,
+        user: &UserId,
+        invite_state: Option<&[Raw<AnyStrippedStateEvent>]>,
+    ) -> Result<Self> {
+        let current_shortstatehash = services()
+            .rooms
+            .state
+            .get_room_shortstatehash(&id)?
+            .ok_or_else(|| Error::bad_database("Room has no state"))?;
+
+        let room_type = if let Some(invite_state) = &invite_state {
+            get_invite_state::<RoomCreateEventContent>(invite_state)
+                .and_then(|e| e.content.room_type)
+        } else {
+            services().rooms.state.get_create_content::<ExtractType>(&id)?
+        };
+
+        let is_dm = match is_dm_room(user, &id) {
+            Ok(x) => x,
+            Err(error) => {
+                error!(%error, %user, "Invalid m.direct account data event");
+                false
+            }
+        };
+        let is_encrypted = if let Some(invite_state) = &invite_state {
+            get_invite_state::<PossiblyRedactedRoomEncryptionEventContent>(
+                invite_state,
+            )
+            .is_some()
+        } else {
+            is_encrypted_room(current_shortstatehash)?
+        };
+        let is_invite = invite_state.is_some();
+
+        Ok(Self {
+            id,
+            current_shortstatehash,
+            is_dm,
+            is_encrypted,
+            is_invite,
+            room_type,
+        })
+    }
+
+    #[tracing::instrument(skip(self), fields(room_id = self.id.as_str()))]
+    fn matches_filter(&self, filter_data: &ListFilters) -> Result<bool> {
+        if let Some(is_dm) = filter_data.is_dm {
+            if self.is_dm != is_dm {
+                return Ok(false);
+            }
+        }
+        if let Some(is_encrypted) = filter_data.is_encrypted {
+            if self.is_encrypted != is_encrypted {
+                return Ok(false);
+            }
+        }
+        if let Some(is_invite) = filter_data.is_invite {
+            if self.is_invite != is_invite {
+                return Ok(false);
+            }
+        }
+
+        let room_type = self.room_type.clone().into();
+        if filter_data.not_room_types.contains(&room_type) {
+            return Ok(false);
+        }
+        if !filter_data.room_types.is_empty()
+            && !filter_data.room_types.contains(&room_type)
+        {
+            return Ok(false);
+        }
+
+        Ok(true)
+    }
+}
+
+#[tracing::instrument(skip_all)]
+fn joined_rooms_data(sender_user: &UserId) -> Vec<RoomData> {
+    services()
+        .rooms
+        .state_cache
+        .rooms_joined(sender_user)
+        .filter_map(Result::ok)
+        .filter_map(move |id| {
+            RoomData::new(id.clone(), sender_user, None)
+                .inspect_err(|error| {
+                    error!(%error, room_id = %id, "Failed to get data for room, skipping");
+                })
+                .ok()
+        })
+        .collect()
+}
+
+#[tracing::instrument(skip_all)]
+fn invited_rooms_data(sender_user: &UserId) -> Vec<RoomData> {
+    services()
+        .rooms
+        .state_cache
+        .rooms_invited(sender_user)
+        .filter_map(Result::ok)
+        .filter_map(move |(id, invite_state)| {
+            RoomData::new(id.clone(), sender_user, Some(&invite_state))
+                .inspect_err(|error| {
+                    error!(
+                        %error, room_id = %id, "Failed to get data for room, skipping"
+                    );
+                })
+                .ok()
+        })
+        .collect()
+}
+
+#[allow(clippy::too_many_lines)]
+#[tracing::instrument(skip_all, fields(
+    pos,
+    next_batch,
+    connection_id = ?body.conn_id,
+))]
+pub(crate) async fn sync_events_v5_route(
+    body: Ar<sync_events::v5::Request>,
+) -> Result<Ra<sync_events::v5::Response>, Ra<UiaaResponse>> {
+    let current_span = tracing::Span::current();
+
+    let sender_user = body.sender_user.expect("user is authenticated");
+    let sender_device = body.sender_device.expect("user is authenticated");
+    let body = body.body;
+    // Setup watchers, so if there's no response, we can wait for them
+    let watcher = services().globals.watch(&sender_user, &sender_device);
+
+    let next_batch = services().globals.next_count()?;
+    current_span.record("next_batch", field::display(&next_batch));
+
+    current_span.record("pos", field::debug(&body.pos));
+    let globalsince =
+        body.pos.as_ref().and_then(|string| string.parse().ok()).unwrap_or(0);
+
+    let connection_key = ConnectionKey {
+        user: sender_user.clone(),
+        device: sender_device.clone(),
+        connection: body.conn_id.clone(),
+    };
+
+    if globalsince == 0 {
+        services().users.forget_sync_request_connection(&connection_key);
+    }
+
+    let known_rooms = services()
+        .users
+        .get_rooms_in_connection(connection_key.clone(), globalsince);
+
+    let all_joined_rooms = joined_rooms_data(&sender_user);
+
+    #[allow(clippy::if_then_some_else_none)]
+    let e2ee = if body.extensions.e2ee.enabled == Some(true) {
+        Some(sync_events::v5::response::E2EE {
+            device_lists: get_e2ee_data(
+                &sender_user,
+                globalsince,
+                &all_joined_rooms,
+            )
+            .await?,
+            // TODO: only include this field when it has changed
+            device_one_time_keys_count: services()
+                .users
+                .count_one_time_keys(&sender_user, &sender_device)?,
+            // Fallback keys are not yet supported
+            device_unused_fallback_key_types: None,
+        })
+    } else {
+        None
+    };
+
+    let mut all_rooms = all_joined_rooms;
+    all_rooms.extend(invited_rooms_data(&sender_user));
+    all_rooms.sort_by_key(|r| {
+        services().rooms.timeline.last_timeline_count(&sender_user, &r.id).ok()
+    });
+
+    let all_room_ids: Vec<_> = all_rooms.iter().map(|r| r.id.clone()).collect();
+    let all_room_ids: Vec<_> = all_room_ids.iter().map(|id| &**id).collect();
+
+    let mut todo_rooms: BTreeMap<OwnedRoomId, TodoRoom> = BTreeMap::new();
+
+    let lists = body
+        .lists
+        .into_iter()
+        .map(|(list_id, list)| {
+            let rooms = rooms_in_list(
+                &list_id,
+                list,
+                &all_rooms,
+                &all_room_ids,
+                &known_rooms,
+                &mut todo_rooms,
+                &sender_user,
+            );
+            (list_id, rooms)
+        })
+        .collect();
+
+    for (room_id, room) in &body.room_subscriptions {
+        if !services().rooms.metadata.exists(room_id)? {
+            warn!(room_id = room_id.as_str(), "Subscribed room does not exist");
+            continue;
+        }
+        todo_rooms.entry(room_id.clone()).or_default().update(
+            room.required_state.clone(),
+            room.timeline_limit,
+            &known_rooms,
+            room_id,
+            &sender_user,
+        );
+    }
+
+    services().users.update_sync_known_rooms(
+        connection_key.clone(),
+        todo_rooms.keys().cloned().collect(),
+        globalsince,
+        next_batch,
+    );
+
+    let to_device = if body.extensions.to_device.enabled == Some(true) {
+        if let Some(until) =
+            body.extensions.to_device.since.and_then(|s| s.parse().ok())
+        {
+            debug!(until, "Deleting to-device events");
+            services().users.remove_to_device_events(
+                &sender_user,
+                &sender_device,
+                until,
+            )?;
+        }
+
+        let events = services()
+            .users
+            .get_to_device_events(&sender_user, &sender_device)?;
+        if !events.is_empty() {
+            debug!(
+                events = utils::debug_slice_truncated(&events, 3),
+                "Got new to-device events"
+            );
+        }
+
+        Some(sync_events::v5::response::ToDevice {
+            events,
+            next_batch: next_batch.to_string(),
+        })
+    } else {
+        None
+    };
+
+    let account_data = if body.extensions.account_data.enabled == Some(true) {
+        let global = services()
+            .account_data
+            .global_changes_since(&sender_user, globalsince)?
+            .into_iter()
+            .map(|(event_type, content)| {
+                account_data::raw_global_event_from_parts(&event_type, &content)
+            })
+            .collect();
+
+        let mut rooms_account_data = BTreeMap::new();
+        for (room_id, todo_room) in &todo_rooms {
+            let account_data: Vec<_> = services()
+                .account_data
+                .room_changes_since(&sender_user, room_id, todo_room.roomsince)?
+                .into_iter()
+                .map(|(event_type, content)| {
+                    account_data::raw_room_event_from_parts(
+                        &event_type,
+                        &content,
+                    )
+                })
+                .collect();
+            if !account_data.is_empty() {
+                rooms_account_data.insert(room_id.clone(), account_data);
+            }
+        }
+
+        Some(sync_events::v5::response::AccountData {
+            global,
+            rooms: rooms_account_data,
+        })
+    } else {
+        None
+    };
+
+    #[allow(clippy::if_then_some_else_none)]
+    let receipts = if body.extensions.receipts.enabled == Some(true) {
+        let mut receipts = BTreeMap::new();
+        for (room_id, todo_room) in &todo_rooms {
+            let mut event_content: BTreeMap<_, BTreeMap<_, BTreeMap<_, _>>> =
+                BTreeMap::new();
+            for x in services()
+                .rooms
+                .edus
+                .read_receipt
+                .readreceipts_since(room_id, todo_room.roomsince)
+            {
+                let Ok((_user_id, _, edu)) = x else {
+                    // invalid DB entry
+                    continue;
+                };
+                let Ok(edu) = edu.deserialize() else {
+                    // invalid EDU JSON
+                    continue;
+                };
+                let AnySyncEphemeralRoomEvent::Receipt(edu) = edu else {
+                    // wrong EDU type
+                    continue;
+                };
+
+                // merge all receipt EDUs into one
+                for (event_id, receipts) in edu.content.0 {
+                    let entry = event_content.entry(event_id).or_default();
+                    for (typ, receipts) in receipts {
+                        let entry = entry.entry(typ).or_default();
+                        for (user, receipt) in receipts {
+                            entry.insert(user, receipt);
+                        }
+                    }
+                }
+            }
+
+            if !event_content.is_empty() {
+                let Ok(event) = Raw::new(&SyncEphemeralRoomEvent {
+                    content: ReceiptEventContent(event_content),
+                }) else {
+                    continue;
+                };
+                receipts.insert(room_id.clone(), event);
+            }
+        }
+
+        Some(sync_events::v5::response::Receipts {
+            rooms: receipts,
+        })
+    } else {
+        None
+    };
+
+    let typing = if body.extensions.typing.enabled == Some(true) {
+        let mut typing = BTreeMap::new();
+        for room_id in todo_rooms.keys() {
+            if services().rooms.edus.typing.last_typing_update(room_id).await?
+                > globalsince
+            {
+                let event =
+                    services().rooms.edus.typing.typings_all(room_id).await?;
+
+                let Ok(event) = Raw::new(&event) else {
+                    continue;
+                };
+                typing.insert(room_id.clone(), event);
+            }
+        }
+        Some(sync_events::v5::response::Typing {
+            rooms: typing,
+        })
+    } else {
+        None
+    };
+
+    let mut rooms = BTreeMap::new();
+    for (room_id, todo_room) in todo_rooms {
+        if let Some(room) =
+            process_room(&sender_user, &room_id, &todo_room, globalsince)?
+        {
+            rooms.insert(room_id.clone(), room);
+        }
+    }
+
+    let extensions = sync_events::v5::response::Extensions {
+        to_device,
+        e2ee: e2ee.unwrap_or_default(),
+        account_data: account_data.unwrap_or_default(),
+        receipts: receipts.unwrap_or_default(),
+        typing: typing.unwrap_or_default(),
+    };
+
+    let extensions_empty = extensions
+        .to_device
+        .as_ref()
+        .is_none_or(|to_device| to_device.events.is_empty())
+        && extensions.e2ee.device_lists.is_empty()
+        && extensions.account_data.is_empty()
+        && extensions.receipts.is_empty()
+        && extensions.typing.is_empty();
+
+    if rooms
+        .iter()
+        .all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty())
+        && extensions_empty
+    {
+        // Hang a few seconds so requests are not spammed
+        // Stop hanging if new info arrives
+        let mut duration = body.timeout.unwrap_or(Duration::from_secs(30));
+        if duration.as_secs() > 30 {
+            duration = Duration::from_secs(30);
+        }
+        match tokio::time::timeout(duration, watcher).await {
+            Ok(x) => x.expect("watcher should succeed"),
+            Err(error) => debug!(%error, "Timed out"),
+        };
+    }
+
+    Ok(Ra(sync_events::v5::Response {
+        txn_id: None,
+        pos: next_batch.to_string(),
+        lists,
+        rooms,
+        extensions,
+    }))
+}
+
+#[allow(clippy::too_many_lines)]
+#[tracing::instrument(skip_all)]
+async fn get_e2ee_data(
+    sender_user: &UserId,
+    globalsince: u64,
+    all_joined_rooms: &[RoomData],
+) -> Result<DeviceLists> {
+    // Users that have left any encrypted rooms the sender was in
+    let mut left_encrypted_users = HashSet::new();
+
+    // Look for device list updates of this account
+    let mut device_list_changes: HashSet<_> = services()
+        .users
+        .keys_changed(sender_user.as_ref(), globalsince, None)
+        .filter_map(Result::ok)
+        .collect();
+
+    for RoomData {
+        id: room_id,
+        current_shortstatehash,
+        is_encrypted,
+        ..
+    } in all_joined_rooms
+    {
+        let since_shortstatehash = services()
+            .rooms
+            .user
+            .get_token_shortstatehash(room_id, globalsince)?;
+
+        let since_sender_member: Option<RoomMemberEventContent> =
+            since_shortstatehash
+                .and_then(|shortstatehash| {
+                    services()
+                        .rooms
+                        .state_accessor
+                        .state_get(
+                            shortstatehash,
+                            &StateEventType::RoomMember,
+                            sender_user.as_str(),
+                        )
+                        .transpose()
+                })
+                .transpose()?
+                .and_then(|pdu| {
+                    serde_json::from_str(pdu.content.get())
+                        .map_err(|_| {
+                            Error::bad_database("Invalid PDU in database.")
+                        })
+                        .ok()
+                });
+
+        if let Some(since_shortstatehash) = since_shortstatehash {
+            // Skip if there are only timeline changes
+            if since_shortstatehash == *current_shortstatehash {
+                continue;
+            }
+
+            let since_encryption = services().rooms.state_accessor.state_get(
+                since_shortstatehash,
+                &StateEventType::RoomEncryption,
+                "",
+            )?;
+
+            let joined_since_last_sync =
+                since_sender_member.is_none_or(|member| {
+                    member.membership != MembershipState::Join
+                });
+
+            let new_encrypted_room =
+                *is_encrypted && since_encryption.is_none();
+            if *is_encrypted {
+                let current_state_ids = services()
+                    .rooms
+                    .state_accessor
+                    .state_full_ids(*current_shortstatehash)
+                    .await?;
+                let since_state_ids = services()
+                    .rooms
+                    .state_accessor
+                    .state_full_ids(since_shortstatehash)
+                    .await?;
+
+                for (key, event_id) in current_state_ids {
+                    if since_state_ids.get(&key) != Some(&event_id) {
+                        let Some(pdu) =
+                            services().rooms.timeline.get_pdu(&event_id)?
+                        else {
+                            error!(%event_id, "Event in state not found");
+                            continue;
+                        };
+                        if pdu.kind == TimelineEventType::RoomMember {
+                            if let Some(state_key) = &pdu.state_key {
+                                let user_id = UserId::parse(state_key.clone())
+                                    .map_err(|_| {
+                                        Error::bad_database(
+                                            "Invalid UserId in member PDU.",
+                                        )
+                                    })?;
+
+                                if user_id == sender_user {
+                                    continue;
+                                }
+
+                                let new_membership = serde_json::from_str::<
+                                    RoomMemberEventContent,
+                                >(
+                                    pdu.content.get()
+                                )
+                                .map_err(|_| {
+                                    Error::bad_database(
+                                        "Invalid PDU in database.",
+                                    )
+                                })?
+                                .membership;
+
+                                match new_membership {
+                                    MembershipState::Join => {
+                                        // A new user joined an encrypted
+                                        // room
+                                        if !share_encrypted_room(
+                                            sender_user,
+                                            &user_id,
+                                            room_id,
+                                        )? {
+                                            device_list_changes.insert(user_id);
+                                        }
+                                    }
+                                    MembershipState::Leave => {
+                                        // Write down users that have left
+                                        // encrypted rooms we are in
+                                        left_encrypted_users.insert(user_id);
+                                    }
+                                    _ => {}
+                                }
+                            }
+                        }
+                    }
+                }
+                if joined_since_last_sync || new_encrypted_room {
+                    // If the user is in a new encrypted room, give them all
+                    // joined users
+                    device_list_changes.extend(
+                        services()
+                            .rooms
+                            .state_cache
+                            .room_members(room_id)
+                            .flatten()
+                            .filter(|user_id| {
+                                // Don't send key updates from the sender to
+                                // the sender
+                                sender_user != user_id
+                            })
+                            .filter(|user_id| {
+                                // Only send keys if the sender doesn't
+                                // share an encrypted room with the target
+                                // already
+                                !share_encrypted_room(
+                                    sender_user,
+                                    user_id,
+                                    room_id,
+                                )
+                                .unwrap_or(false)
+                            }),
+                    );
+                }
+            }
+        }
+        // Look for device list updates in this room
+        device_list_changes.extend(
+            services()
+                .users
+                .keys_changed(room_id.as_ref(), globalsince, None)
+                .filter_map(Result::ok),
+        );
+    }
+
+    let mut device_list_left = HashSet::new();
+    for user_id in left_encrypted_users {
+        let dont_share_encrypted_room = services()
+            .rooms
+            .user
+            .get_shared_rooms(vec![sender_user.to_owned(), user_id.clone()])?
+            .filter_map(Result::ok)
+            .filter_map(|other_room_id| {
+                Some(
+                    services()
+                        .rooms
+                        .state_accessor
+                        .room_state_get(
+                            &other_room_id,
+                            &StateEventType::RoomEncryption,
+                            "",
+                        )
+                        .ok()?
+                        .is_some(),
+                )
+            })
+            .all(|encrypted| !encrypted);
+        // If the user doesn't share an encrypted room with the target
+        // anymore, we need to tell them
+        if dont_share_encrypted_room {
+            device_list_left.insert(user_id);
+        }
+    }
+
+    Ok(DeviceLists {
+        changed: device_list_changes.into_iter().collect(),
+        left: device_list_left.into_iter().collect(),
+    })
+}
+
+#[tracing::instrument(
+    skip_all,
+    fields(list_id = list_id, ?list),
+)]
+fn rooms_in_list(
+    list_id: &str,
+    list: sync_events::v5::request::List,
+    all_rooms: &[RoomData],
+    all_room_ids: &[&RoomId],
+    known_rooms: &BTreeMap<OwnedRoomId, u64>,
+    todo_rooms: &mut BTreeMap<OwnedRoomId, TodoRoom>,
+    sender_user: &UserId,
+) -> sync_events::v5::response::List {
+    trace!(list_id, ?list, "Collecting rooms in list");
+
+    let matching_room_ids_buf: Vec<&RoomId>;
+    let matching_room_ids = if let Some(filters) = list.filters.as_ref() {
+        matching_room_ids_buf = all_rooms
+            .iter()
+            .filter_map(|r| {
+                match r.matches_filter(filters) {
+                    Ok(pass) => pass.then_some(&*r.id),
+                    Err(error) => {
+                        warn!(%error, ?filters, room_id = r.id.as_str(), "Failed to evaluate list filter, skipping room");
+                        None
+                    }
+                }
+            })
+            .collect();
+        matching_room_ids_buf.as_slice()
+    } else {
+        all_room_ids
+    };
+
+    if !matching_room_ids.is_empty() {
+        let mut list_room_ids: BTreeSet<&RoomId> = BTreeSet::new();
+        for (from, to) in list.ranges {
+            let from = usize::try_from(from)
+                .unwrap_or(usize::MAX)
+                .clamp(0, matching_room_ids.len() - 1);
+            let to = usize::try_from(to)
+                .unwrap_or(usize::MAX)
+                .clamp(from, matching_room_ids.len() - 1);
+            list_room_ids.extend(&matching_room_ids[from..=to]);
+        }
+        for room_id in list_room_ids {
+            todo_rooms.entry(room_id.to_owned()).or_default().update(
+                list.room_details.required_state.clone(),
+                list.room_details.timeline_limit,
+                known_rooms,
+                room_id,
+                sender_user,
+            );
+        }
+    }
+
+    let num_rooms = matching_room_ids.len();
+    trace!(list_id, num_rooms, "Done collecting rooms");
+
+    sync_events::v5::response::List {
+        count: UInt::try_from(num_rooms).unwrap_or(UInt::MAX),
+    }
+}
+
+#[allow(clippy::too_many_lines)]
+#[tracing::instrument(skip(sender_user))]
+fn process_room(
+    sender_user: &UserId,
+    room_id: &RoomId,
+    todo_room: &TodoRoom,
+    globalsince: u64,
+) -> Result<Option<sync_events::v5::response::Room>> {
+    let roomsincecount = PduCount::Normal(todo_room.roomsince);
+
+    let (timeline_pdus, limited) = load_timeline(
+        sender_user,
+        room_id,
+        roomsincecount,
+        todo_room.timeline_limit,
+    )?;
+
+    if todo_room.roomsince != 0 && timeline_pdus.is_empty() {
+        return Ok(None);
+    }
+
+    let prev_batch = timeline_pdus
+        .first()
+        .map(|(pdu_count, _)| match pdu_count {
+            PduCount::Backfilled(_) => {
+                error!("Timeline in backfill state?!");
+                "0".to_owned()
+            }
+            PduCount::Normal(c) => c.to_string(),
+        })
+        .or_else(|| {
+            (todo_room.roomsince != 0).then(|| todo_room.roomsince.to_string())
+        });
+
+    // TODO: consider only message-like PDUs here, rather than all PDUs
+    let bump_stamp = match services()
+        .rooms
+        .timeline
+        .last_timeline_count(sender_user, room_id)?
+    {
+        PduCount::Backfilled(n) | PduCount::Normal(n) => {
+            Some(UInt::new_saturating(n))
+        }
+    };
+
+    let num_live = Some(
+        timeline_pdus
+            .iter()
+            .filter(|(pdu_count, _)| match pdu_count {
+                // TODO check logic
+                PduCount::Backfilled(_) => false,
+                PduCount::Normal(pdu_count) => {
+                    if globalsince == 0 {
+                        false
+                    } else {
+                        *pdu_count > globalsince
+                    }
+                }
+            })
+            .count()
+            .try_into()
+            .unwrap_or(UInt::MAX),
+    );
+
+    let room_events: Vec<_> =
+        timeline_pdus.iter().map(|(_, pdu)| pdu.to_sync_room_event()).collect();
+
+    let Some(current_shortstatehash) =
+        services().rooms.state.get_room_shortstatehash(room_id)?
+    else {
+        error!(%room_id, "Room has no state");
+        return Ok(None);
+    };
+
+    // TODO: invalidate current_shortstatehash and send down all(?) state
+    // events if the effective requested required_state changes between
+    // requests:
+    //
+    // > If new entries are added to required_state then the server must send
+    // > down matching current state events.
+
+    let need_scan = todo_room.required_state.all_events
+        || todo_room
+            .required_state
+            .filters
+            .iter()
+            .any(|(_, keys)| matches!(keys, RequiredStateKeys::All));
+    let required_state = if need_scan {
+        let full_state = services()
+            .rooms
+            .state_compressor
+            .load_shortstatehash_info(current_shortstatehash)?
+            .pop()
+            .expect("there is always one layer")
+            .full_state;
+        full_state
+            .iter()
+            .filter_map(|compressed| {
+                let Ok((typ, key)) = services()
+                    .rooms
+                    .short
+                    .get_statekey_from_short(compressed.state)
+                else {
+                    warn!(
+                        ?compressed,
+                        "Failed to get info for shortstatekey, skipping"
+                    );
+                    return None;
+                };
+
+                if !todo_room.required_state.matches(&typ, &key) {
+                    return None;
+                }
+
+                let shorteventid = compressed.event;
+                let pdu = match services()
+                    .rooms
+                    .short
+                    .get_eventid_from_short(shorteventid)
+                {
+                    Ok(event_id) => {
+                        services().rooms.timeline.get_pdu(&event_id)
+                    }
+                    Err(error) => {
+                        warn!(
+                            %error,
+                            %typ,
+                            key,
+                            ?shorteventid,
+                            "Failed to get event ID from short event ID"
+                        );
+                        return None;
+                    }
+                };
+                match pdu {
+                    Ok(Some(pdu)) => Some(pdu.to_sync_state_event()),
+                    Ok(None) => None,
+                    Err(error) => {
+                        warn!(%error, %typ, key, "Failed to get state PDU");
+                        None
+                    }
+                }
+            })
+            .collect()
+    } else {
+        todo_room
+            .required_state
+            .filters
+            .iter()
+            .flat_map(|(typ, keys)| {
+                let RequiredStateKeys::Selected(keys) = keys else {
+                    panic!(
+                        "wildcard key should have triggered a full state scan"
+                    );
+                };
+                keys.iter().filter_map(move |key| {
+                    match services().rooms.state_accessor.state_get(
+                        current_shortstatehash,
+                        typ,
+                        key,
+                    ) {
+                        Ok(Some(pdu)) => Some(pdu.to_sync_state_event()),
+                        Ok(None) => None,
+                        Err(error) => {
+                            warn!(%error, %typ, key, "Failed to get state PDU");
+                            None
+                        }
+                    }
+                })
+            })
+            .collect()
+    };
+
+    let name = services().rooms.state_accessor.get_name(room_id)?;
+    let heroes = name.is_none().then(|| {
+        services()
+            .rooms
+            .state_cache
+            .room_members(room_id)
+            .filter_map(Result::ok)
+            .filter(|member| member != sender_user)
+            .filter_map(|member| {
+                services()
+                    .rooms
+                    .state_accessor
+                    .get_member(room_id, &member)
+                    .ok()
+                    .flatten()
+                    .map(|memberevent| Hero {
+                        user_id: member,
+                        name: memberevent.displayname,
+                        avatar: memberevent.avatar_url,
+                    })
+            })
+            .take(5)
+            .collect::<Vec<_>>()
+    });
+
+    let room = sync_events::v5::response::Room {
+        name,
+        avatar: match services().rooms.state_accessor.get_avatar(room_id)? {
+            JsOption::Some(avatar) => JsOption::from_option(avatar.url),
+            JsOption::Null => JsOption::Null,
+            JsOption::Undefined => JsOption::Undefined,
+        },
+        initial: Some(todo_room.roomsince == 0),
+        is_dm: None,
+        invite_state: None,
+        unread_notifications: UnreadNotificationsCount {
+            highlight_count: Some(
+                services()
+                    .rooms
+                    .user
+                    .highlight_count(sender_user, room_id)?
+                    .try_into()
+                    .expect("notification count can't go that high"),
+            ),
+            notification_count: Some(
+                services()
+                    .rooms
+                    .user
+                    .notification_count(sender_user, room_id)?
+                    .try_into()
+                    .expect("notification count can't go that high"),
+            ),
+        },
+        timeline: room_events,
+        required_state,
+        prev_batch,
+        limited,
+        joined_count: Some(
+            services()
+                .rooms
+                .state_cache
+                .room_joined_count(room_id)?
+                .map(UInt::new_saturating)
+                .unwrap_or(uint!(0)),
+        ),
+        invited_count: Some(
+            services()
+                .rooms
+                .state_cache
+                .room_invited_count(room_id)?
+                .map(UInt::new_saturating)
+                .unwrap_or(uint!(0)),
+        ),
+        num_live,
+        bump_stamp,
+        heroes,
+    };
+    trace!(?room, "Built room data");
+
+    Ok(Some(room))
+}
diff --git a/src/api/client_server/unversioned.rs b/src/api/client_server/unversioned.rs
index 92ea88ed..5605f53c 100644
--- a/src/api/client_server/unversioned.rs
+++ b/src/api/client_server/unversioned.rs
@@ -32,6 +32,7 @@ pub(crate) async fn get_supported_versions_route(
         unstable_features: BTreeMap::from_iter([
             ("org.matrix.e2e_cross_signing".to_owned(), true),
             ("org.matrix.msc3916.stable".to_owned(), true),
+            ("org.matrix.simplified_msc3575".to_owned(), true),
         ]),
     };
 
diff --git a/src/cli/serve.rs b/src/cli/serve.rs
index f0e0f969..be781fe5 100644
--- a/src/cli/serve.rs
+++ b/src/cli/serve.rs
@@ -651,6 +651,7 @@ fn client_routes() -> Router {
         .ruma_route(c2s::get_state_events_route)
         .ruma_route(c2s::get_state_events_for_key_route)
         .ruma_route(c2s::v3::sync_events_route)
+        .ruma_route(c2s::msc4186::sync_events_v5_route)
         .ruma_route(c2s::get_context_route)
         .ruma_route(c2s::get_message_events_route)
         .ruma_route(c2s::search_events_route)
diff --git a/src/database/key_value/account_data.rs b/src/database/key_value/account_data.rs
index 49231a2b..101986f8 100644
--- a/src/database/key_value/account_data.rs
+++ b/src/database/key_value/account_data.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
 use ruma::{api::client::error::ErrorKind, RoomId, UserId};
 use serde::Deserialize;
 use serde_json::value::RawValue;
+use tracing::error;
 
 use crate::{
     database::KeyValueDatabase, service, services, utils, Error, Result,
@@ -16,16 +17,6 @@ impl service::account_data::Data for KeyValueDatabase {
         event_type: &str,
         data: &RawValue,
     ) -> Result<()> {
-        // Allowed because we just use this type to validate the schema, and
-        // don't read the fields.
-        #[allow(dead_code)]
-        #[derive(Deserialize)]
-        struct ExtractEventFields<'a> {
-            #[serde(rename = "type")]
-            event_type: &'a str,
-            content: &'a RawValue,
-        }
-
         let mut prefix = room_id
             .map(ToString::to_string)
             .unwrap_or_default()
@@ -44,11 +35,41 @@ impl service::account_data::Data for KeyValueDatabase {
         let mut key = prefix;
         key.extend_from_slice(event_type.as_bytes());
 
-        if serde_json::from_str::<ExtractEventFields<'_>>(data.get()).is_err() {
-            return Err(Error::BadRequest(
-                ErrorKind::InvalidParam,
-                "Account data doesn't have all required fields.",
-            ));
+        {
+            #[derive(Deserialize)]
+            struct ExtractEventFields<'a> {
+                #[serde(rename = "type")]
+                event_type: &'a str,
+                // Allowed because we just use this type to validate the schema
+                // and event type, and don't extract the content.
+                #[allow(dead_code)]
+                content: &'a RawValue,
+            }
+
+            let Ok(ExtractEventFields {
+                event_type: serialised_event_type,
+                ..
+            }) = serde_json::from_str(data.get())
+            else {
+                return Err(Error::BadRequest(
+                    ErrorKind::InvalidParam,
+                    "Account data doesn't have all required fields.",
+                ));
+            };
+
+            if serialised_event_type != event_type {
+                error!(
+                    %user_id,
+                    ?room_id,
+                    event_type,
+                    serialised_event_type,
+                    "Mismatch between discrete and serialised account data event type"
+                );
+                return Err(Error::BadRequest(
+                    ErrorKind::InvalidParam,
+                    "Account data event type mismatch.",
+                ));
+            }
         }
 
         self.roomuserdataid_accountdata
diff --git a/src/service/rooms/short.rs b/src/service/rooms/short.rs
index ce4d207c..35cef167 100644
--- a/src/service/rooms/short.rs
+++ b/src/service/rooms/short.rs
@@ -9,27 +9,56 @@ use crate::{
 };
 
 macro_rules! short_id_type {
-    ($name:ident) => {
-        #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
-        #[repr(transparent)]
-        pub(crate) struct $name(u64);
+    ($($(#[$doc:meta])* struct $name:ident(u64);)*) => {
+        $(
+            #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+            #[repr(transparent)]
+            $(#[$doc])*
+            pub(crate) struct $name(u64);
 
-        impl $name {
-            pub(crate) fn new(id: u64) -> Self {
-                Self(id)
-            }
+            impl $name {
+                pub(crate) fn new(id: u64) -> Self {
+                    Self(id)
+                }
 
-            pub(crate) fn get(&self) -> u64 {
-                self.0
+                pub(crate) fn get(&self) -> u64 {
+                    self.0
+                }
             }
-        }
+        )*
     };
 }
 
-short_id_type!(ShortRoomId);
-short_id_type!(ShortEventId);
-short_id_type!(ShortStateHash);
-short_id_type!(ShortStateKey);
+short_id_type!(
+    /// Interned [RoomId].
+    ///
+    /// Created using [`get_shortroomid()`](Service::get_shortroomid) or
+    /// [`get_or_create_shortroomid()`](Service::get_or_create_shortroomid).
+    struct ShortRoomId(u64);
+    /// Interned [EventId].
+    ///
+    /// Created using
+    /// [`get_or_create_shorteventid()`](Service::get_or_create_shorteventid),
+    /// resolved using
+    /// [`get_eventid_from_short()`](Service::get_eventid_from_short).
+    struct ShortEventId(u64);
+    /// Interned hash of concatenated state events.
+    ///
+    /// Equal state sets do not necessarily correspond to equal short state
+    /// hashes, because the calculated hash is dependent on `HashSet`
+    /// iteration order.
+    ///
+    /// Created using
+    /// [`get_or_create_shortstatehash()`](Service::get_or_create_shortstatehash).
+    struct ShortStateHash(u64);
+    /// Interned `(event type, state key)` tuple.
+    ///
+    /// Created using [`get_shortstatekey()`](Service::get_shortstatekey) or
+    /// [`get_or_create_shortstatekey()`](Service::get_or_create_shortstatekey),
+    /// resolved using
+    /// [`get_statekey_from_short()`](Service::get_statekey_from_short).
+    struct ShortStateKey(u64);
+);
 
 mod data;
diff --git a/src/service/rooms/state_accessor.rs b/src/service/rooms/state_accessor.rs
index 78af94ed..3c859a01 100644
--- a/src/service/rooms/state_accessor.rs
+++ b/src/service/rooms/state_accessor.rs
@@ -7,6 +7,7 @@ use lru_cache::LruCache;
 use ruma::{
     events::{
         room::{
+            avatar::RoomAvatarEventContent,
             history_visibility::{
                 HistoryVisibility, RoomHistoryVisibilityEventContent,
             },
@@ -17,8 +18,8 @@ use ruma::{
         StateEventType,
     },
     state_res::Event,
-    EventId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName,
-    UserId,
+    EventId, JsOption, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId,
+    ServerName, UserId,
 };
 use serde_json::value::to_raw_value;
 use tracing::{error, warn};
@@ -507,6 +508,23 @@ impl Service {
         )
     }
 
+    #[tracing::instrument(skip(self))]
+    pub(crate) fn get_avatar(
+        &self,
+        room_id: &RoomId,
+    ) -> Result<JsOption<RoomAvatarEventContent>> {
+        self.room_state_get(room_id, &StateEventType::RoomAvatar, "")?.map_or(
+            Ok(JsOption::Undefined),
+            |s| {
+                serde_json::from_str(s.content.get()).map_err(|_| {
+                    Error::bad_database(
+                        "Invalid room avatar event in database.",
+                    )
+                })
+            },
+        )
+    }
+
     // Allowed because this function uses `services()`
     #[allow(clippy::unused_self)]
     #[tracing::instrument(skip(self), ret(level = "trace"))]
@@ -535,6 +553,24 @@ impl Service {
         .is_ok()
     }
 
+    #[tracing::instrument(skip(self))]
+    pub(crate) fn get_member(
+        &self,
+        room_id: &RoomId,
+        user_id: &UserId,
+    ) -> Result<Option<RoomMemberEventContent>> {
+        self.room_state_get(
+            room_id,
+            &StateEventType::RoomMember,
+            user_id.as_str(),
+        )?
+        .map_or(Ok(None), |s| {
+            serde_json::from_str(s.content.get()).map_err(|_| {
+                Error::bad_database("Invalid room member event in database.")
+            })
+        })
+    }
+
     /// Checks if a given user can redact a given event
     ///
     /// If `federation` is `true`, it allows redaction events from any user of
diff --git a/src/service/users.rs b/src/service/users.rs
index dbacb715..192b0597 100644
--- a/src/service/users.rs
+++ b/src/service/users.rs
@@ -1,4 +1,8 @@
-use std::{collections::BTreeMap, mem};
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    mem,
+    sync::{Arc, Mutex},
+};
 
 use ruma::{
     api::client::{device::Device, filter::FilterDefinition},
@@ -6,7 +10,7 @@ use ruma::{
     events::AnyToDeviceEvent,
     serde::Raw,
     DeviceId, OneTimeKeyAlgorithm, OneTimeKeyName, OwnedDeviceId, OwnedKeyId,
-    OwnedMxcUri, OwnedOneTimeKeyId, OwnedUserId, UInt, UserId,
+    OwnedMxcUri, OwnedOneTimeKeyId, OwnedRoomId, OwnedUserId, UInt, UserId,
 };
 
 use crate::{services, Error, Result};
@@ -15,22 +19,133 @@ mod data;
 
 pub(crate) use data::Data;
 
+#[derive(Debug)]
+struct KnownRooms {
+    /// The `pos` value of the request that these `room_since` values apply to
+    pos: u64,
+    /// `since` values for rooms that have been sent previously
+    room_since: BTreeMap<OwnedRoomId, u64>,
+}
+
+#[derive(Debug, Default)]
+pub(crate) struct SlidingSyncCache {
+    /// `since` values for rooms in the current/previous request. Needed in
+    /// case the response doesn't arrive and the client requests the same
+    /// `pos` value again.
+    current_known_rooms: Option<KnownRooms>,
+    /// Overlay on top of `current_known_rooms` of `since` values for rooms in
+    /// the expected next request (where the `pos` value should be the
+    /// `pos` value from our response).
+    next_known_rooms: Option<KnownRooms>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
+pub(crate) struct ConnectionKey {
+    pub(crate) user: OwnedUserId,
+    pub(crate) device: OwnedDeviceId,
+    pub(crate) connection: Option<String>,
+}
+
 pub(crate) struct Service {
     pub(crate) db: &'static dyn Data,
+    #[allow(clippy::type_complexity)]
+    pub(crate) connections:
+        Mutex<BTreeMap<ConnectionKey, Arc<Mutex<SlidingSyncCache>>>>,
 }
 
 impl Service {
     pub(crate) fn new(db: &'static dyn Data) -> Self {
         Self {
             db,
+            connections: Mutex::new(BTreeMap::new()),
         }
     }
 
+    fn get_cache_entry(
+        &self,
+        key: ConnectionKey,
+    ) -> Arc<Mutex<SlidingSyncCache>> {
+        let mut cache = self.connections.lock().unwrap();
+        Arc::clone(cache.entry(key).or_default())
+    }
+
     /// Check if a user has an account on this homeserver.
     pub(crate) fn exists(&self, user_id: &UserId) -> Result<bool> {
         self.db.exists(user_id)
     }
 
+    pub(crate) fn forget_sync_request_connection(
+        &self,
+        connection_key: &ConnectionKey,
+    ) {
+        self.connections.lock().unwrap().remove(connection_key);
+    }
+
+    #[tracing::instrument(skip(self))]
+    pub(crate) fn get_rooms_in_connection(
+        &self,
+        connection_key: ConnectionKey,
+        pos: u64,
+    ) -> BTreeMap<OwnedRoomId, u64> {
+        let cached = self.get_cache_entry(connection_key);
+        let mut cached = cached.lock().unwrap();
+        let cached = &mut *cached;
+
+        let current_known_rooms =
+            cached.current_known_rooms.get_or_insert(KnownRooms {
+                pos,
+                room_since: BTreeMap::new(),
+            });
+
+        if current_known_rooms.pos == pos {
+            // Another request for a previous `pos` value, invalidate the next
+            // result
+            cached.next_known_rooms = None;
+        } else if let Some(next_known_rooms) =
+            cached.next_known_rooms.take().filter(|x| x.pos == pos)
+        {
+            // Merge overlay into current_known_rooms
+            current_known_rooms.pos = next_known_rooms.pos;
+            current_known_rooms.room_since.extend(next_known_rooms.room_since);
+        } else {
+            // Not a repeated request, and we don't have calculated values for
+            // a next request, start over
+            *current_known_rooms = KnownRooms {
+                pos,
+                room_since: BTreeMap::new(),
+            };
+        }
+
+        current_known_rooms.room_since.clone()
+    }
+
+    #[tracing::instrument(skip(self))]
+    pub(crate) fn update_sync_known_rooms(
+        &self,
+        connection_key: ConnectionKey,
+        new_cached_rooms: BTreeSet<OwnedRoomId>,
+        pos: u64,
+        next_batch: u64,
+    ) {
+        let cached = self.get_cache_entry(connection_key);
+        let mut cached = cached.lock().unwrap();
+
+        assert_eq!(
+            cached.current_known_rooms.as_ref().map(|x| x.pos),
+            Some(pos),
+            "current known rooms should match current request's pos"
+        );
+
+        // Add an overlay to the current request's known rooms
+        cached.next_known_rooms = Some(KnownRooms {
+            pos: next_batch,
+            room_since: new_cached_rooms
+                .into_iter()
+                .map(|room_id| (room_id, next_batch))
+                .collect(),
+        });
+    }
+
     /// Check if account is deactivated
     pub(crate) fn is_deactivated(&self, user_id: &UserId) -> Result<bool> {
         self.db.is_deactivated(user_id)
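
Note: the sketch below is an editor-added illustration, not part of the patch. It models the `pos` handshake that `SlidingSyncCache` implements in `src/service/users.rs`: the overlay in `next_known_rooms` is only promoted to `current_known_rooms` once the client echoes the `pos` we handed out, and a repeated `pos` (a retry after a lost response) discards the overlay. Room IDs are plain strings here and the cache covers a single connection.

    use std::collections::BTreeMap;

    #[derive(Debug, Default)]
    struct Cache {
        current: Option<(u64, BTreeMap<String, u64>)>, // (pos, room_since)
        next: Option<(u64, BTreeMap<String, u64>)>,    // overlay for next pos
    }

    impl Cache {
        // Mirrors `get_rooms_in_connection`: only advance to the overlay when
        // the client echoes the `pos` we handed out; a repeated `pos` means
        // the previous response was lost, so the overlay must be discarded.
        fn rooms_for(&mut self, pos: u64) -> BTreeMap<String, u64> {
            let current =
                self.current.get_or_insert_with(|| (pos, BTreeMap::new()));
            if current.0 == pos {
                self.next = None; // retry of the same request
            } else if let Some(next) = self.next.take().filter(|n| n.0 == pos) {
                current.0 = next.0;
                current.1.extend(next.1); // client acknowledged our response
            } else {
                *current = (pos, BTreeMap::new()); // unknown pos, start over
            }
            current.1.clone()
        }

        // Mirrors `update_sync_known_rooms`: record what the *next* request
        // (carrying `next_batch` as its `pos`) may assume the client has seen.
        fn record(&mut self, rooms: &[&str], next_batch: u64) {
            self.next = Some((
                next_batch,
                rooms.iter().map(|r| (r.to_string(), next_batch)).collect(),
            ));
        }
    }

    fn main() {
        let mut cache = Cache::default();
        assert!(cache.rooms_for(0).is_empty()); // initial request
        cache.record(&["!a:example.org"], 7); // we respond with pos = "7"
        // Client acks by sending pos=7, so "!a" is now a known room:
        assert_eq!(cache.rooms_for(7).get("!a:example.org"), Some(&7));
        cache.record(&["!b:example.org"], 9);
        // Response lost; client retries pos=7 and the overlay is dropped:
        assert!(!cache.rooms_for(7).contains_key("!b:example.org"));
    }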
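Note: likewise illustration only, not part of the patch. It models the `required_state` filter semantics implemented by `RequiredState` in `msc4186.rs`: a `("*", "*")` entry sets `all_events` and turns `filters` into a list of exceptions for which only the selected state keys are sent, with `$ME` having already been expanded to the requesting user. Event types are plain strings here instead of `StateEventType`.

    use std::collections::{BTreeMap, BTreeSet};

    enum Keys {
        All,
        Selected(BTreeSet<String>),
    }

    struct Filter {
        all_events: bool,
        filters: BTreeMap<String, Keys>,
    }

    impl Filter {
        // Mirrors `RequiredState::matches`: an explicit entry wins; otherwise
        // fall back to whether a `("*", "*")` wildcard was requested.
        fn matches(&self, typ: &str, key: &str) -> bool {
            match self.filters.get(typ) {
                Some(Keys::All) => true,
                Some(Keys::Selected(keys)) => keys.contains(key),
                None => self.all_events,
            }
        }
    }

    fn main() {
        // required_state = [("*", "*"), ("m.room.member", "$ME")] sends
        // everything except member events, of which only the requester's
        // own member event is included.
        let filter = Filter {
            all_events: true,
            filters: BTreeMap::from([(
                "m.room.member".to_owned(),
                Keys::Selected(BTreeSet::from(["@me:example.org".to_owned()])),
            )]),
        };
        assert!(filter.matches("m.room.topic", ""));
        assert!(filter.matches("m.room.member", "@me:example.org"));
        assert!(!filter.matches("m.room.member", "@other:example.org"));
    }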