diff --git a/src/api/client_server/sync/v3.rs b/src/api/client_server/sync/v3.rs index d3c596d8..65830053 100644 --- a/src/api/client_server/sync/v3.rs +++ b/src/api/client_server/sync/v3.rs @@ -21,7 +21,7 @@ use ruma::{ room::member::{MembershipState, RoomMemberEventContent}, StateEventType, TimelineEventType, }, - uint, DeviceId, EventId, OwnedUserId, RoomId, UInt, UserId, + uint, DeviceId, EventId, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId, }; use tracing::{debug, error, field}; @@ -174,229 +174,14 @@ pub(crate) async fn sync_events_route( .filter_map(Result::ok), ); - let mut joined_rooms = BTreeMap::new(); - let all_joined_rooms = services() - .rooms - .state_cache - .rooms_joined(ctx.sender_user) - .collect::>(); - for room_id in all_joined_rooms { - let room_id = room_id?; - if let Ok(joined_room) = load_joined_room( - &ctx, - &room_id, - &mut device_list_updates, - &mut left_encrypted_users, - ) - .await - { - if !joined_room.is_empty() { - joined_rooms.insert(room_id.clone(), joined_room); - } - } - } - - let mut left_rooms = BTreeMap::new(); - let all_left_rooms: Vec<_> = - services().rooms.state_cache.rooms_left(ctx.sender_user).collect(); - for result in all_left_rooms { - let (room_id, _) = result?; - - { - // Get and drop the lock to wait for remaining operations to finish - let room_token = services() - .globals - .roomid_mutex_insert - .lock_key(room_id.clone()) - .await; - drop(room_token); - } - - let left_count = services() - .rooms - .state_cache - .get_left_count(&room_id, ctx.sender_user)?; - - // Left before last sync - if Some(ctx.since) >= left_count { - continue; - } - - if !services().rooms.metadata.exists(&room_id)? { - // This is just a rejected invite, not a room we know - let event = PduEvent { - event_id: EventId::new(services().globals.server_name()).into(), - sender: ctx.sender_user.to_owned(), - origin_server_ts: utils::millis_since_unix_epoch() - .try_into() - .expect("Timestamp is valid js_int value"), - kind: TimelineEventType::RoomMember, - content: serde_json::from_str(r#"{ "membership": "leave"}"#) - .unwrap(), - state_key: Some(ctx.sender_user.to_string()), - unsigned: None, - // The following keys are dropped on conversion - room_id: room_id.clone(), - prev_events: vec![], - depth: uint!(1), - auth_events: vec![], - redacts: None, - hashes: EventHash { - sha256: String::new(), - }, - signatures: None, - }; - - left_rooms.insert( - room_id, - LeftRoom { - account_data: RoomAccountData { - events: Vec::new(), - }, - timeline: Timeline { - limited: false, - prev_batch: Some(ctx.next_batch_string.clone()), - events: Vec::new(), - }, - state: State { - events: vec![event.to_sync_state_event()], - }, - }, - ); - - continue; - } - - let mut left_state_events = Vec::new(); - - let since_shortstatehash = services() - .rooms - .user - .get_token_shortstatehash(&room_id, ctx.since)?; - - let since_state_ids = match since_shortstatehash { - Some(s) => { - services().rooms.state_accessor.state_full_ids(s).await? - } - None => HashMap::new(), - }; - - let Some(left_event_id) = - services().rooms.state_accessor.room_state_get_id( - &room_id, - &StateEventType::RoomMember, - ctx.sender_user.as_str(), - )? - else { - error!("Left room but no left state event"); - continue; - }; - - let Some(left_shortstatehash) = services() - .rooms - .state_accessor - .pdu_shortstatehash(&left_event_id)? - else { - error!("Leave event has no state"); - continue; - }; - - let mut left_state_ids = services() - .rooms - .state_accessor - .state_full_ids(left_shortstatehash) - .await?; - - let leave_shortstatekey = - services().rooms.short.get_or_create_shortstatekey( - &StateEventType::RoomMember, - ctx.sender_user.as_str(), - )?; - - left_state_ids.insert(leave_shortstatekey, left_event_id); - - let mut i = 0; - for (key, event_id) in left_state_ids { - if ctx.full_state || since_state_ids.get(&key) != Some(&event_id) { - let (event_type, state_key) = - services().rooms.short.get_statekey_from_short(key)?; - - if !ctx.lazy_load_enabled - || event_type != StateEventType::RoomMember - || ctx.full_state - // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565 - || *ctx.sender_user == state_key - { - let Some(pdu) = - services().rooms.timeline.get_pdu(&event_id)? - else { - error!(%event_id, "Event in state not found"); - continue; - }; - - left_state_events.push(pdu.to_sync_state_event()); - - i += 1; - if i % 100 == 0 { - tokio::task::yield_now().await; - } - } - } - } - - left_rooms.insert( - room_id.clone(), - LeftRoom { - account_data: RoomAccountData { - events: Vec::new(), - }, - timeline: Timeline { - limited: false, - prev_batch: Some(ctx.next_batch_string.clone()), - events: Vec::new(), - }, - state: State { - events: left_state_events, - }, - }, - ); - } - - let mut invited_rooms = BTreeMap::new(); - let all_invited_rooms: Vec<_> = - services().rooms.state_cache.rooms_invited(ctx.sender_user).collect(); - for result in all_invited_rooms { - let (room_id, invite_state_events) = result?; - - { - // Get and drop the lock to wait for remaining operations to finish - let room_token = services() - .globals - .roomid_mutex_insert - .lock_key(room_id.clone()) - .await; - drop(room_token); - } - - let invite_count = services() - .rooms - .state_cache - .get_invite_count(&room_id, ctx.sender_user)?; - - // Invited before last sync - if Some(ctx.since) >= invite_count { - continue; - } - - invited_rooms.insert( - room_id.clone(), - InvitedRoom { - invite_state: InviteState { - events: invite_state_events, - }, - }, - ); - } + let joined_rooms = collect_joined_rooms( + &ctx, + &mut device_list_updates, + &mut left_encrypted_users, + ) + .await?; + let left_rooms = collect_left_rooms(&ctx).await?; + let invited_rooms = collect_invited_rooms(&ctx).await?; for user_id in left_encrypted_users { let dont_share_encrypted_room = services() @@ -500,6 +285,37 @@ pub(crate) async fn sync_events_route( Ok(Ra(response)) } +#[tracing::instrument(skip_all)] +async fn collect_joined_rooms( + ctx: &SyncContext<'_>, + device_list_updates: &mut HashSet, + left_encrypted_users: &mut HashSet, +) -> Result> { + let mut joined_rooms = BTreeMap::new(); + let all_joined_rooms = services() + .rooms + .state_cache + .rooms_joined(ctx.sender_user) + .collect::>(); + for room_id in all_joined_rooms { + let room_id = room_id?; + if let Ok(joined_room) = load_joined_room( + ctx, + &room_id, + device_list_updates, + left_encrypted_users, + ) + .await + { + if !joined_room.is_empty() { + joined_rooms.insert(room_id.clone(), joined_room); + } + } + } + + Ok(joined_rooms) +} + #[tracing::instrument(skip_all, fields(room_id = %room_id))] #[allow(clippy::too_many_arguments, clippy::too_many_lines)] async fn load_joined_room( @@ -1104,3 +920,219 @@ async fn load_joined_room( unread_thread_notifications: BTreeMap::new(), }) } + +#[tracing::instrument(skip_all)] +async fn collect_left_rooms( + ctx: &SyncContext<'_>, +) -> Result> { + let mut left_rooms = BTreeMap::new(); + let all_left_rooms: Vec<_> = + services().rooms.state_cache.rooms_left(ctx.sender_user).collect(); + for result in all_left_rooms { + let (room_id, _) = result?; + + { + // Get and drop the lock to wait for remaining operations to finish + let room_token = services() + .globals + .roomid_mutex_insert + .lock_key(room_id.clone()) + .await; + drop(room_token); + } + + let left_count = services() + .rooms + .state_cache + .get_left_count(&room_id, ctx.sender_user)?; + + // Left before last sync + if Some(ctx.since) >= left_count { + continue; + } + + if !services().rooms.metadata.exists(&room_id)? { + // This is just a rejected invite, not a room we know + let event = PduEvent { + event_id: EventId::new(services().globals.server_name()).into(), + sender: ctx.sender_user.to_owned(), + origin_server_ts: utils::millis_since_unix_epoch() + .try_into() + .expect("Timestamp is valid js_int value"), + kind: TimelineEventType::RoomMember, + content: serde_json::from_str(r#"{ "membership": "leave"}"#) + .unwrap(), + state_key: Some(ctx.sender_user.to_string()), + unsigned: None, + // The following keys are dropped on conversion + room_id: room_id.clone(), + prev_events: vec![], + depth: uint!(1), + auth_events: vec![], + redacts: None, + hashes: EventHash { + sha256: String::new(), + }, + signatures: None, + }; + + left_rooms.insert( + room_id, + LeftRoom { + account_data: RoomAccountData { + events: Vec::new(), + }, + timeline: Timeline { + limited: false, + prev_batch: Some(ctx.next_batch_string.clone()), + events: Vec::new(), + }, + state: State { + events: vec![event.to_sync_state_event()], + }, + }, + ); + + continue; + } + + let mut left_state_events = Vec::new(); + + let since_shortstatehash = services() + .rooms + .user + .get_token_shortstatehash(&room_id, ctx.since)?; + + let since_state_ids = match since_shortstatehash { + Some(s) => { + services().rooms.state_accessor.state_full_ids(s).await? + } + None => HashMap::new(), + }; + + let Some(left_event_id) = + services().rooms.state_accessor.room_state_get_id( + &room_id, + &StateEventType::RoomMember, + ctx.sender_user.as_str(), + )? + else { + error!("Left room but no left state event"); + continue; + }; + + let Some(left_shortstatehash) = services() + .rooms + .state_accessor + .pdu_shortstatehash(&left_event_id)? + else { + error!("Leave event has no state"); + continue; + }; + + let mut left_state_ids = services() + .rooms + .state_accessor + .state_full_ids(left_shortstatehash) + .await?; + + let leave_shortstatekey = + services().rooms.short.get_or_create_shortstatekey( + &StateEventType::RoomMember, + ctx.sender_user.as_str(), + )?; + + left_state_ids.insert(leave_shortstatekey, left_event_id); + + let mut i = 0; + for (key, event_id) in left_state_ids { + if ctx.full_state || since_state_ids.get(&key) != Some(&event_id) { + let (event_type, state_key) = + services().rooms.short.get_statekey_from_short(key)?; + + if !ctx.lazy_load_enabled + || event_type != StateEventType::RoomMember + || ctx.full_state + // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565 + || *ctx.sender_user == state_key + { + let Some(pdu) = + services().rooms.timeline.get_pdu(&event_id)? + else { + error!(%event_id, "Event in state not found"); + continue; + }; + + left_state_events.push(pdu.to_sync_state_event()); + + i += 1; + if i % 100 == 0 { + tokio::task::yield_now().await; + } + } + } + } + + left_rooms.insert( + room_id.clone(), + LeftRoom { + account_data: RoomAccountData { + events: Vec::new(), + }, + timeline: Timeline { + limited: false, + prev_batch: Some(ctx.next_batch_string.clone()), + events: Vec::new(), + }, + state: State { + events: left_state_events, + }, + }, + ); + } + + Ok(left_rooms) +} + +#[tracing::instrument(skip_all)] +async fn collect_invited_rooms( + ctx: &SyncContext<'_>, +) -> Result> { + let mut invited_rooms = BTreeMap::new(); + let all_invited_rooms: Vec<_> = + services().rooms.state_cache.rooms_invited(ctx.sender_user).collect(); + for result in all_invited_rooms { + let (room_id, invite_state_events) = result?; + + { + // Get and drop the lock to wait for remaining operations to finish + let room_token = services() + .globals + .roomid_mutex_insert + .lock_key(room_id.clone()) + .await; + drop(room_token); + } + + let invite_count = services() + .rooms + .state_cache + .get_invite_count(&room_id, ctx.sender_user)?; + + // Invited before last sync + if Some(ctx.since) >= invite_count { + continue; + } + + invited_rooms.insert( + room_id.clone(), + InvitedRoom { + invite_state: InviteState { + events: invite_state_events, + }, + }, + ); + } + + Ok(invited_rooms) +}