diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 45d617b2..129b1d52 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -23,7 +23,8 @@ use ruma::{ room::member::{MembershipState, RoomMemberEventContent}, StateEventType, TimelineEventType, }, - uint, DeviceId, EventId, JsOption, OwnedUserId, RoomId, UInt, UserId, + uint, DeviceId, EventId, JsOption, OwnedRoomId, OwnedUserId, RoomId, UInt, + UserId, }; use tracing::{debug, error}; @@ -157,168 +158,16 @@ pub(crate) async fn sync_events_route( let all_left_rooms: Vec<_> = services().rooms.state_cache.rooms_left(&sender_user).collect(); for result in all_left_rooms { - let (room_id, _) = result?; - - { - // Get and drop the lock to wait for remaining operations to finish - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().await; - drop(insert_lock); - } - - let left_count = services() - .rooms - .state_cache - .get_left_count(&room_id, &sender_user)?; - - // Left before last sync - if Some(since) >= left_count { - continue; - } - - if !services().rooms.metadata.exists(&room_id)? { - // This is just a rejected invite, not a room we know - let event = PduEvent { - event_id: EventId::new(services().globals.server_name()).into(), - sender: sender_user.clone(), - origin_server_ts: utils::millis_since_unix_epoch() - .try_into() - .expect("Timestamp is valid js_int value"), - kind: TimelineEventType::RoomMember, - content: serde_json::from_str(r#"{ "membership": "leave"}"#) - .unwrap(), - state_key: Some(sender_user.to_string()), - unsigned: None, - // The following keys are dropped on conversion - room_id: room_id.clone(), - prev_events: vec![], - depth: uint!(1), - auth_events: vec![], - redacts: None, - hashes: EventHash { - sha256: String::new(), - }, - signatures: None, - }; - - left_rooms.insert( - room_id, - LeftRoom { - account_data: RoomAccountData { - events: Vec::new(), - }, - timeline: Timeline { - limited: false, - prev_batch: Some(next_batch_string.clone()), - events: Vec::new(), - }, - state: State { - events: vec![event.to_sync_state_event()], - }, - }, - ); - - continue; - } - - let mut left_state_events = Vec::new(); - - let since_shortstatehash = - services().rooms.user.get_token_shortstatehash(&room_id, since)?; - - let since_state_ids = match since_shortstatehash { - Some(s) => { - services().rooms.state_accessor.state_full_ids(s).await? - } - None => HashMap::new(), - }; - - let Some(left_event_id) = - services().rooms.state_accessor.room_state_get_id( - &room_id, - &StateEventType::RoomMember, - sender_user.as_str(), - )? - else { - error!("Left room but no left state event"); - continue; - }; - - let Some(left_shortstatehash) = services() - .rooms - .state_accessor - .pdu_shortstatehash(&left_event_id)? - else { - error!("Leave event has no state"); - continue; - }; - - let mut left_state_ids = services() - .rooms - .state_accessor - .state_full_ids(left_shortstatehash) - .await?; - - let leave_shortstatekey = - services().rooms.short.get_or_create_shortstatekey( - &StateEventType::RoomMember, - sender_user.as_str(), - )?; - - left_state_ids.insert(leave_shortstatekey, left_event_id); - - let mut i = 0; - for (key, id) in left_state_ids { - if full_state || since_state_ids.get(&key) != Some(&id) { - let (event_type, state_key) = - services().rooms.short.get_statekey_from_short(key)?; - - if !lazy_load_enabled - || event_type != StateEventType::RoomMember - || full_state - // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565 - || *sender_user == state_key - { - let Some(pdu) = services().rooms.timeline.get_pdu(&id)? - else { - error!("Pdu in state not found: {}", id); - continue; - }; - - left_state_events.push(pdu.to_sync_state_event()); - - i += 1; - if i % 100 == 0 { - tokio::task::yield_now().await; - } - } - } - } - - left_rooms.insert( - room_id.clone(), - LeftRoom { - account_data: RoomAccountData { - events: Vec::new(), - }, - timeline: Timeline { - limited: false, - prev_batch: Some(next_batch_string.clone()), - events: Vec::new(), - }, - state: State { - events: left_state_events, - }, - }, - ); + handle_left_room( + result?.0, + &sender_user, + &mut left_rooms, + since, + &next_batch_string, + full_state, + lazy_load_enabled, + ) + .await?; } let mut invited_rooms = BTreeMap::new(); @@ -1058,6 +907,179 @@ async fn load_joined_room( }) } +#[tracing::instrument( + skip_all, + fields( + user_id = %sender_user, + room_id = %room_id, + ), +)] +async fn handle_left_room( + room_id: OwnedRoomId, + sender_user: &UserId, + left_rooms: &mut BTreeMap, + since: u64, + next_batch_string: &str, + full_state: bool, + lazy_load_enabled: bool, +) -> Result<()> { + { + // Get and drop the lock to wait for remaining operations to finish + let mutex_insert = Arc::clone( + services() + .globals + .roomid_mutex_insert + .write() + .await + .entry(room_id.clone()) + .or_default(), + ); + let insert_lock = mutex_insert.lock().await; + drop(insert_lock); + } + + let left_count = + services().rooms.state_cache.get_left_count(&room_id, sender_user)?; + + // Left before last sync + if Some(since) >= left_count { + return Ok(()); + } + + if !services().rooms.metadata.exists(&room_id)? { + // This is just a rejected invite, not a room we know + let event = PduEvent { + event_id: EventId::new(services().globals.server_name()).into(), + sender: sender_user.to_owned(), + origin_server_ts: utils::millis_since_unix_epoch() + .try_into() + .expect("Timestamp is valid js_int value"), + kind: TimelineEventType::RoomMember, + content: serde_json::from_str(r#"{ "membership": "leave"}"#) + .unwrap(), + state_key: Some(sender_user.to_string()), + unsigned: None, + // The following keys are dropped on conversion + room_id: room_id.clone(), + prev_events: vec![], + depth: uint!(1), + auth_events: vec![], + redacts: None, + hashes: EventHash { + sha256: String::new(), + }, + signatures: None, + }; + + left_rooms.insert( + room_id, + LeftRoom { + account_data: RoomAccountData { + events: Vec::new(), + }, + timeline: Timeline { + limited: false, + prev_batch: Some(next_batch_string.to_owned()), + events: Vec::new(), + }, + state: State { + events: vec![event.to_sync_state_event()], + }, + }, + ); + + return Ok(()); + } + + let mut left_state_events = Vec::new(); + + let since_shortstatehash = + services().rooms.user.get_token_shortstatehash(&room_id, since)?; + + let since_state_ids = match since_shortstatehash { + Some(s) => services().rooms.state_accessor.state_full_ids(s).await?, + None => HashMap::new(), + }; + + let Some(left_event_id) = + services().rooms.state_accessor.room_state_get_id( + &room_id, + &StateEventType::RoomMember, + sender_user.as_str(), + )? + else { + error!("Left room but no left state event"); + return Ok(()); + }; + + let Some(left_shortstatehash) = + services().rooms.state_accessor.pdu_shortstatehash(&left_event_id)? + else { + error!("Leave event has no state"); + return Ok(()); + }; + + let mut left_state_ids = services() + .rooms + .state_accessor + .state_full_ids(left_shortstatehash) + .await?; + + let leave_shortstatekey = + services().rooms.short.get_or_create_shortstatekey( + &StateEventType::RoomMember, + sender_user.as_str(), + )?; + + left_state_ids.insert(leave_shortstatekey, left_event_id); + + let mut i = 0; + for (key, id) in left_state_ids { + if full_state || since_state_ids.get(&key) != Some(&id) { + let (event_type, state_key) = + services().rooms.short.get_statekey_from_short(key)?; + + if !lazy_load_enabled + || event_type != StateEventType::RoomMember + || full_state + // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565 + || *sender_user == state_key + { + let Some(pdu) = services().rooms.timeline.get_pdu(&id)? else { + error!("Pdu in state not found: {}", id); + continue; + }; + + left_state_events.push(pdu.to_sync_state_event()); + + i += 1; + if i % 100 == 0 { + tokio::task::yield_now().await; + } + } + } + } + + left_rooms.insert( + room_id.clone(), + LeftRoom { + account_data: RoomAccountData { + events: Vec::new(), + }, + timeline: Timeline { + limited: false, + prev_batch: Some(next_batch_string.to_owned()), + events: Vec::new(), + }, + state: State { + events: left_state_events, + }, + }, + ); + + Ok(()) +} + fn load_timeline( sender_user: &UserId, room_id: &RoomId,