diff --git a/src/api/client_server/sync/msc4186.rs b/src/api/client_server/sync/msc4186.rs index 41785beb..f4e579bf 100644 --- a/src/api/client_server/sync/msc4186.rs +++ b/src/api/client_server/sync/msc4186.rs @@ -111,269 +111,30 @@ pub(crate) async fn sync_events_v5_route( )?; } - // Users that have left any encrypted rooms the sender was in - let mut left_encrypted_users = HashSet::new(); - let mut device_list_changes = HashSet::new(); - let mut device_list_left = HashSet::new(); + #[allow(clippy::if_then_some_else_none)] + let device_lists = if body.extensions.e2ee.enabled.unwrap_or(false) { + Some(get_e2ee_data(&sender_user, globalsince, &all_joined_rooms).await?) + } else { + None + }; - if body.extensions.e2ee.enabled.unwrap_or(false) { - // Look for device list updates of this account - device_list_changes.extend( - services() - .users - .keys_changed(sender_user.as_ref(), globalsince, None) - .filter_map(Result::ok), - ); - - for room_id in &all_joined_rooms { - let Some(current_shortstatehash) = - services().rooms.state.get_room_shortstatehash(room_id)? - else { - error!(%room_id, "Room has no state"); - continue; - }; - - let since_shortstatehash = services() - .rooms - .user - .get_token_shortstatehash(room_id, globalsince)?; - - let since_sender_member: Option = - since_shortstatehash - .and_then(|shortstatehash| { - services() - .rooms - .state_accessor - .state_get( - shortstatehash, - &StateEventType::RoomMember, - sender_user.as_str(), - ) - .transpose() - }) - .transpose()? - .and_then(|pdu| { - serde_json::from_str(pdu.content.get()) - .map_err(|_| { - Error::bad_database("Invalid PDU in database.") - }) - .ok() - }); - - let encrypted_room = services() - .rooms - .state_accessor - .state_get( - current_shortstatehash, - &StateEventType::RoomEncryption, - "", - )? - .is_some(); - - if let Some(since_shortstatehash) = since_shortstatehash { - // Skip if there are only timeline changes - if since_shortstatehash == current_shortstatehash { - continue; - } - - let since_encryption = - services().rooms.state_accessor.state_get( - since_shortstatehash, - &StateEventType::RoomEncryption, - "", - )?; - - let joined_since_last_sync = - since_sender_member.is_none_or(|member| { - member.membership != MembershipState::Join - }); - - let new_encrypted_room = - encrypted_room && since_encryption.is_none(); - if encrypted_room { - let current_state_ids = services() - .rooms - .state_accessor - .state_full_ids(current_shortstatehash) - .await?; - let since_state_ids = services() - .rooms - .state_accessor - .state_full_ids(since_shortstatehash) - .await?; - - for (key, event_id) in current_state_ids { - if since_state_ids.get(&key) != Some(&event_id) { - let Some(pdu) = - services().rooms.timeline.get_pdu(&event_id)? - else { - error!(%event_id, "Event in state not found"); - continue; - }; - if pdu.kind == TimelineEventType::RoomMember { - if let Some(state_key) = &pdu.state_key { - let user_id = - UserId::parse(state_key.clone()) - .map_err(|_| { - Error::bad_database( - "Invalid UserId in member \ - PDU.", - ) - })?; - - if user_id == sender_user { - continue; - } - - let new_membership = - serde_json::from_str::< - RoomMemberEventContent, - >( - pdu.content.get() - ) - .map_err(|_| { - Error::bad_database( - "Invalid PDU in database.", - ) - })? - .membership; - - match new_membership { - MembershipState::Join => { - // A new user joined an encrypted - // room - if !share_encrypted_room( - &sender_user, - &user_id, - room_id, - )? { - device_list_changes - .insert(user_id); - } - } - MembershipState::Leave => { - // Write down users that have left - // encrypted rooms we are in - left_encrypted_users - .insert(user_id); - } - _ => {} - } - } - } - } - } - if joined_since_last_sync || new_encrypted_room { - // If the user is in a new encrypted room, give them all - // joined users - device_list_changes.extend( - services() - .rooms - .state_cache - .room_members(room_id) - .flatten() - .filter(|user_id| { - // Don't send key updates from the sender to - // the sender - &sender_user != user_id - }) - .filter(|user_id| { - // Only send keys if the sender doesn't - // share an encrypted room with the target - // already - !share_encrypted_room( - &sender_user, - user_id, - room_id, - ) - .unwrap_or(false) - }), - ); - } - } - } - // Look for device list updates in this room - device_list_changes.extend( - services() - .users - .keys_changed(room_id.as_ref(), globalsince, None) - .filter_map(Result::ok), - ); - } - for user_id in left_encrypted_users { - let dont_share_encrypted_room = services() - .rooms - .user - .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])? - .filter_map(Result::ok) - .filter_map(|other_room_id| { - Some( - services() - .rooms - .state_accessor - .room_state_get( - &other_room_id, - &StateEventType::RoomEncryption, - "", - ) - .ok()? - .is_some(), - ) - }) - .all(|encrypted| !encrypted); - // If the user doesn't share an encrypted room with the target - // anymore, we need to tell them - if dont_share_encrypted_room { - device_list_left.insert(user_id); - } - } - } - - let mut lists = BTreeMap::new(); // and required state let mut todo_rooms: BTreeMap = BTreeMap::new(); - for (list_id, list) in body.lists { - trace!(list_id, ?list, "Collecting rooms in list"); - - if list.filters.and_then(|f| f.is_invite).unwrap_or(false) { - continue; - } - - let mut list_room_ids = BTreeSet::new(); - for (mut from, mut to) in list.ranges { - from = from.clamp( - uint!(0), - UInt::try_from(all_joined_rooms.len() - 1).unwrap_or(UInt::MAX), - ); - to = to.clamp( - from, - UInt::try_from(all_joined_rooms.len() - 1).unwrap_or(UInt::MAX), - ); - let room_ids = - all_joined_rooms[from.try_into().unwrap_or(usize::MAX) - ..=to.try_into().unwrap_or(usize::MAX)] - .to_vec(); - list_room_ids.extend(room_ids); - } - for room_id in &list_room_ids { - todo_rooms.entry(room_id.clone()).or_default().update( - list.room_details.required_state.clone(), - list.room_details.timeline_limit, + let lists = body + .lists + .into_iter() + .filter_map(|(list_id, list)| { + let rooms = rooms_in_list( + &list_id, + list, + &all_joined_rooms, &known_rooms, - room_id, - ); - } - let num_rooms = list_room_ids.len(); - trace!(list_id, num_rooms, "Done collecting rooms"); - - lists.insert( - list_id.clone(), - sync_events::v5::response::List { - count: UInt::try_from(num_rooms).unwrap_or(UInt::MAX), - }, - ); - } + &mut todo_rooms, + )?; + Some((list_id, rooms)) + }) + .collect(); for (room_id, room) in &body.room_subscriptions { if !services().rooms.metadata.exists(room_id)? { @@ -395,160 +156,10 @@ pub(crate) async fn sync_events_v5_route( ); let mut rooms = BTreeMap::new(); - for (room_id, todo_room) in &todo_rooms { - trace!( - room_id = room_id.as_str(), - ?todo_room, - "Processing matched room" - ); - let roomsincecount = PduCount::Normal(todo_room.roomsince); - - let (timeline_pdus, limited) = load_timeline( - &sender_user, - room_id, - roomsincecount, - todo_room.timeline_limit, - )?; - - if todo_room.roomsince != 0 && timeline_pdus.is_empty() { - trace!("No new timeline events, skipping"); - continue; + for (room_id, todo_room) in todo_rooms { + if let Some(room) = process_room(&sender_user, &room_id, &todo_room)? { + rooms.insert(room_id.clone(), room); } - - let prev_batch = timeline_pdus - .first() - .map(|(pdu_count, _)| match pdu_count { - PduCount::Backfilled(_) => { - error!("Timeline in backfill state?!"); - "0".to_owned() - } - PduCount::Normal(c) => c.to_string(), - }) - .or_else(|| { - (todo_room.roomsince != 0) - .then(|| todo_room.roomsince.to_string()) - }); - - let room_events: Vec<_> = timeline_pdus - .iter() - .map(|(_, pdu)| pdu.to_sync_room_event()) - .collect(); - - let required_state = todo_room - .required_state_request - .iter() - .filter_map(|state| { - services() - .rooms - .state_accessor - .room_state_get(room_id, &state.0, &state.1) - .ok() - .flatten() - .map(|state| state.to_sync_state_event()) - }) - .collect(); - - // Heroes - let heroes = services() - .rooms - .state_cache - .room_members(room_id) - .filter_map(Result::ok) - .filter(|member| member != &sender_user) - .filter_map(|member| { - services() - .rooms - .state_accessor - .get_member(room_id, &member) - .ok() - .flatten() - .map(|memberevent| { - ( - memberevent - .displayname - .unwrap_or_else(|| member.to_string()), - memberevent.avatar_url, - ) - }) - }) - .take(5) - .collect::>(); - let name = match &*heroes { - [] => None, - [(only, _)] => Some(only.clone()), - [firsts @ .., (last, _)] => Some({ - let firsts = firsts - .iter() - .map(|(name, _)| name.clone()) - .collect::>() - .join(", "); - - format!("{firsts} and {last}") - }), - }; - - let room = sync_events::v5::response::Room { - name: services().rooms.state_accessor.get_name(room_id)?.or(name), - avatar: if let [(_name, Some(avatar))] = &*heroes { - JsOption::Some(avatar.clone()) - } else { - match services().rooms.state_accessor.get_avatar(room_id)? { - JsOption::Some(avatar) => JsOption::from_option(avatar.url), - JsOption::Null => JsOption::Null, - JsOption::Undefined => JsOption::Undefined, - } - }, - initial: Some(todo_room.roomsince == 0), - is_dm: None, - invite_state: None, - unread_notifications: UnreadNotificationsCount { - highlight_count: Some( - services() - .rooms - .user - .highlight_count(&sender_user, room_id)? - .try_into() - .expect("notification count can't go that high"), - ), - notification_count: Some( - services() - .rooms - .user - .notification_count(&sender_user, room_id)? - .try_into() - .expect("notification count can't go that high"), - ), - }, - timeline: room_events, - required_state, - prev_batch, - limited, - joined_count: Some( - services() - .rooms - .state_cache - .room_joined_count(room_id)? - .map(UInt::new_saturating) - .unwrap_or(uint!(0)), - ), - invited_count: Some( - services() - .rooms - .state_cache - .room_invited_count(room_id)? - .map(UInt::new_saturating) - .unwrap_or(uint!(0)), - ), - // Count events in timeline greater than global sync counter - num_live: None, - // TODO - bump_stamp: None, - // TODO - heroes: None, - }; - trace!(room_id = room_id.as_str(), ?room, "Built room data"); - - rooms.insert(room_id.clone(), room); } if rooms @@ -589,10 +200,7 @@ pub(crate) async fn sync_events_v5_route( }) .transpose()?, e2ee: sync_events::v5::response::E2EE { - device_lists: DeviceLists { - changed: device_list_changes.into_iter().collect(), - left: device_list_left.into_iter().collect(), - }, + device_lists: device_lists.unwrap_or_default(), device_one_time_keys_count: services() .users .count_one_time_keys(&sender_user, &sender_device)?, @@ -627,3 +235,426 @@ pub(crate) async fn sync_events_v5_route( }, })) } + +#[allow(clippy::too_many_lines)] +#[tracing::instrument(skip_all)] +async fn get_e2ee_data( + sender_user: &UserId, + globalsince: u64, + all_joined_rooms: &[OwnedRoomId], +) -> Result { + // Users that have left any encrypted rooms the sender was in + let mut left_encrypted_users = HashSet::new(); + + // Look for device list updates of this account + let mut device_list_changes: HashSet<_> = services() + .users + .keys_changed(sender_user.as_ref(), globalsince, None) + .filter_map(Result::ok) + .collect(); + + for room_id in all_joined_rooms { + let Some(current_shortstatehash) = + services().rooms.state.get_room_shortstatehash(room_id)? + else { + error!(%room_id, "Room has no state"); + continue; + }; + + let since_shortstatehash = services() + .rooms + .user + .get_token_shortstatehash(room_id, globalsince)?; + + let since_sender_member: Option = + since_shortstatehash + .and_then(|shortstatehash| { + services() + .rooms + .state_accessor + .state_get( + shortstatehash, + &StateEventType::RoomMember, + sender_user.as_str(), + ) + .transpose() + }) + .transpose()? + .and_then(|pdu| { + serde_json::from_str(pdu.content.get()) + .map_err(|_| { + Error::bad_database("Invalid PDU in database.") + }) + .ok() + }); + + let encrypted_room = services() + .rooms + .state_accessor + .state_get( + current_shortstatehash, + &StateEventType::RoomEncryption, + "", + )? + .is_some(); + + if let Some(since_shortstatehash) = since_shortstatehash { + // Skip if there are only timeline changes + if since_shortstatehash == current_shortstatehash { + continue; + } + + let since_encryption = services().rooms.state_accessor.state_get( + since_shortstatehash, + &StateEventType::RoomEncryption, + "", + )?; + + let joined_since_last_sync = + since_sender_member.is_none_or(|member| { + member.membership != MembershipState::Join + }); + + let new_encrypted_room = + encrypted_room && since_encryption.is_none(); + if encrypted_room { + let current_state_ids = services() + .rooms + .state_accessor + .state_full_ids(current_shortstatehash) + .await?; + let since_state_ids = services() + .rooms + .state_accessor + .state_full_ids(since_shortstatehash) + .await?; + + for (key, event_id) in current_state_ids { + if since_state_ids.get(&key) != Some(&event_id) { + let Some(pdu) = + services().rooms.timeline.get_pdu(&event_id)? + else { + error!(%event_id, "Event in state not found"); + continue; + }; + if pdu.kind == TimelineEventType::RoomMember { + if let Some(state_key) = &pdu.state_key { + let user_id = UserId::parse(state_key.clone()) + .map_err(|_| { + Error::bad_database( + "Invalid UserId in member PDU.", + ) + })?; + + if user_id == sender_user { + continue; + } + + let new_membership = serde_json::from_str::< + RoomMemberEventContent, + >( + pdu.content.get() + ) + .map_err(|_| { + Error::bad_database( + "Invalid PDU in database.", + ) + })? + .membership; + + match new_membership { + MembershipState::Join => { + // A new user joined an encrypted + // room + if !share_encrypted_room( + sender_user, + &user_id, + room_id, + )? { + device_list_changes.insert(user_id); + } + } + MembershipState::Leave => { + // Write down users that have left + // encrypted rooms we are in + left_encrypted_users.insert(user_id); + } + _ => {} + } + } + } + } + } + if joined_since_last_sync || new_encrypted_room { + // If the user is in a new encrypted room, give them all + // joined users + device_list_changes.extend( + services() + .rooms + .state_cache + .room_members(room_id) + .flatten() + .filter(|user_id| { + // Don't send key updates from the sender to + // the sender + sender_user != user_id + }) + .filter(|user_id| { + // Only send keys if the sender doesn't + // share an encrypted room with the target + // already + !share_encrypted_room( + sender_user, + user_id, + room_id, + ) + .unwrap_or(false) + }), + ); + } + } + } + // Look for device list updates in this room + device_list_changes.extend( + services() + .users + .keys_changed(room_id.as_ref(), globalsince, None) + .filter_map(Result::ok), + ); + } + + let mut device_list_left = HashSet::new(); + for user_id in left_encrypted_users { + let dont_share_encrypted_room = services() + .rooms + .user + .get_shared_rooms(vec![sender_user.to_owned(), user_id.clone()])? + .filter_map(Result::ok) + .filter_map(|other_room_id| { + Some( + services() + .rooms + .state_accessor + .room_state_get( + &other_room_id, + &StateEventType::RoomEncryption, + "", + ) + .ok()? + .is_some(), + ) + }) + .all(|encrypted| !encrypted); + // If the user doesn't share an encrypted room with the target + // anymore, we need to tell them + if dont_share_encrypted_room { + device_list_left.insert(user_id); + } + } + + Ok(DeviceLists { + changed: device_list_changes.into_iter().collect(), + left: device_list_left.into_iter().collect(), + }) +} + +#[tracing::instrument( + skip_all, + fields(list_id = list_id, ?list), +)] +fn rooms_in_list( + list_id: &str, + list: sync_events::v5::request::List, + all_joined_rooms: &[OwnedRoomId], + known_rooms: &BTreeMap, + todo_rooms: &mut BTreeMap, +) -> Option { + trace!(list_id, ?list, "Collecting rooms in list"); + + if list.filters.and_then(|f| f.is_invite).unwrap_or(false) { + return None; + } + + let mut list_room_ids: BTreeSet = BTreeSet::new(); + for (mut from, mut to) in list.ranges { + from = from.clamp( + uint!(0), + UInt::try_from(all_joined_rooms.len() - 1).unwrap_or(UInt::MAX), + ); + to = to.clamp( + from, + UInt::try_from(all_joined_rooms.len() - 1).unwrap_or(UInt::MAX), + ); + let room_ids = all_joined_rooms[from.try_into().unwrap_or(usize::MAX) + ..=to.try_into().unwrap_or(usize::MAX)] + .to_vec(); + list_room_ids.extend(room_ids); + } + for room_id in &list_room_ids { + todo_rooms.entry(room_id.clone()).or_default().update( + list.room_details.required_state.clone(), + list.room_details.timeline_limit, + known_rooms, + room_id, + ); + } + let num_rooms = list_room_ids.len(); + trace!(list_id, num_rooms, "Done collecting rooms"); + + Some(sync_events::v5::response::List { + count: UInt::try_from(num_rooms).unwrap_or(UInt::MAX), + }) +} + +#[allow(clippy::too_many_lines)] +#[tracing::instrument(skip(sender_user))] +fn process_room( + sender_user: &UserId, + room_id: &RoomId, + todo_room: &TodoRoom, +) -> Result> { + let roomsincecount = PduCount::Normal(todo_room.roomsince); + + let (timeline_pdus, limited) = load_timeline( + sender_user, + room_id, + roomsincecount, + todo_room.timeline_limit, + )?; + + if todo_room.roomsince != 0 && timeline_pdus.is_empty() { + return Ok(None); + } + + let prev_batch = timeline_pdus + .first() + .map(|(pdu_count, _)| match pdu_count { + PduCount::Backfilled(_) => { + error!("Timeline in backfill state?!"); + "0".to_owned() + } + PduCount::Normal(c) => c.to_string(), + }) + .or_else(|| { + (todo_room.roomsince != 0).then(|| todo_room.roomsince.to_string()) + }); + + let room_events: Vec<_> = + timeline_pdus.iter().map(|(_, pdu)| pdu.to_sync_room_event()).collect(); + + let required_state = todo_room + .required_state_request + .iter() + .filter_map(|state| { + services() + .rooms + .state_accessor + .room_state_get(room_id, &state.0, &state.1) + .ok() + .flatten() + .map(|state| state.to_sync_state_event()) + }) + .collect(); + + // Heroes + let heroes = services() + .rooms + .state_cache + .room_members(room_id) + .filter_map(Result::ok) + .filter(|member| member != sender_user) + .filter_map(|member| { + services() + .rooms + .state_accessor + .get_member(room_id, &member) + .ok() + .flatten() + .map(|memberevent| { + ( + memberevent + .displayname + .unwrap_or_else(|| member.to_string()), + memberevent.avatar_url, + ) + }) + }) + .take(5) + .collect::>(); + let name = match &*heroes { + [] => None, + [(only, _)] => Some(only.clone()), + [firsts @ .., (last, _)] => Some({ + let firsts = firsts + .iter() + .map(|(name, _)| name.clone()) + .collect::>() + .join(", "); + + format!("{firsts} and {last}") + }), + }; + + let room = sync_events::v5::response::Room { + name: services().rooms.state_accessor.get_name(room_id)?.or(name), + avatar: if let [(_name, Some(avatar))] = &*heroes { + JsOption::Some(avatar.clone()) + } else { + match services().rooms.state_accessor.get_avatar(room_id)? { + JsOption::Some(avatar) => JsOption::from_option(avatar.url), + JsOption::Null => JsOption::Null, + JsOption::Undefined => JsOption::Undefined, + } + }, + initial: Some(todo_room.roomsince == 0), + is_dm: None, + invite_state: None, + unread_notifications: UnreadNotificationsCount { + highlight_count: Some( + services() + .rooms + .user + .highlight_count(sender_user, room_id)? + .try_into() + .expect("notification count can't go that high"), + ), + notification_count: Some( + services() + .rooms + .user + .notification_count(sender_user, room_id)? + .try_into() + .expect("notification count can't go that high"), + ), + }, + timeline: room_events, + required_state, + prev_batch, + limited, + joined_count: Some( + services() + .rooms + .state_cache + .room_joined_count(room_id)? + .map(UInt::new_saturating) + .unwrap_or(uint!(0)), + ), + invited_count: Some( + services() + .rooms + .state_cache + .room_invited_count(room_id)? + .map(UInt::new_saturating) + .unwrap_or(uint!(0)), + ), + // Count events in timeline greater than global sync counter + num_live: None, + // TODO + bump_stamp: None, + // TODO + heroes: None, + }; + trace!(?room, "Built room data"); + + Ok(Some(room)) +}