From 8a7f87e9b415dd03b82254ce83bf032e43fe606b Mon Sep 17 00:00:00 2001 From: Lambda Date: Thu, 30 May 2024 22:07:00 +0000 Subject: [PATCH] sync/v3: move readonly data to context struct This makes it a lot easier to factor out parts of the big sync_events_route(). --- src/api/client_server/sync/v3.rs | 261 ++++++++++++++++++------------- 1 file changed, 149 insertions(+), 112 deletions(-) diff --git a/src/api/client_server/sync/v3.rs b/src/api/client_server/sync/v3.rs index 662400a5..d3c596d8 100644 --- a/src/api/client_server/sync/v3.rs +++ b/src/api/client_server/sync/v3.rs @@ -31,6 +31,23 @@ use crate::{ services, utils, Ar, Error, PduEvent, Ra, Result, }; +struct SyncContext<'a> { + sender_user: &'a UserId, + sender_device: &'a DeviceId, + + next_batch: u64, + next_batch_string: String, + next_batchcount: PduCount, + + since: u64, + sincecount: PduCount, + + lazy_load_enabled: bool, + lazy_load_send_redundant: bool, + + full_state: bool, +} + /// # `GET /_matrix/client/r0/sync` /// /// Synchronize the client's state with the latest state on the server. @@ -93,38 +110,56 @@ pub(crate) async fn sync_events_route( // Setup watchers, so if there's no response, we can wait for them let watcher = services().globals.watch(&sender_user, &sender_device); - let next_batch = services().globals.current_count()?; - current_span.record("next_batch", next_batch); - let next_batchcount = PduCount::Normal(next_batch); - let next_batch_string = next_batch.to_string(); + let ctx = { + let next_batch = services().globals.current_count()?; + current_span.record("next_batch", next_batch); + let next_batchcount = PduCount::Normal(next_batch); + let next_batch_string = next_batch.to_string(); - // Load filter - let filter = match body.filter { - None => FilterDefinition::default(), - Some(Filter::FilterDefinition(filter)) => filter, - Some(Filter::FilterId(filter_id)) => services() - .users - .get_filter(&sender_user, &filter_id)? - .unwrap_or_default(), - }; - - let (lazy_load_enabled, lazy_load_send_redundant) = - match filter.room.state.lazy_load_options { - LazyLoadOptions::Enabled { - include_redundant_members: redundant, - } => (true, redundant), - LazyLoadOptions::Disabled => (false, false), + // Load filter + let filter = match body.filter { + None => FilterDefinition::default(), + Some(Filter::FilterDefinition(filter)) => filter, + Some(Filter::FilterId(filter_id)) => services() + .users + .get_filter(&sender_user, &filter_id)? + .unwrap_or_default(), }; - current_span.record("lazy_load_enabled", lazy_load_enabled); - current_span.record("lazy_load_send_redundant", lazy_load_send_redundant); - let full_state = body.full_state; + let (lazy_load_enabled, lazy_load_send_redundant) = + match filter.room.state.lazy_load_options { + LazyLoadOptions::Enabled { + include_redundant_members: redundant, + } => (true, redundant), + LazyLoadOptions::Disabled => (false, false), + }; + current_span.record("lazy_load_enabled", lazy_load_enabled); + current_span + .record("lazy_load_send_redundant", lazy_load_send_redundant); - let mut joined_rooms = BTreeMap::new(); - let since = - body.since.as_ref().and_then(|string| string.parse().ok()).unwrap_or(0); - current_span.record("since", since); - let sincecount = PduCount::Normal(since); + let full_state = body.full_state; + + let since = body + .since + .as_ref() + .and_then(|string| string.parse().ok()) + .unwrap_or(0); + current_span.record("since", since); + let sincecount = PduCount::Normal(since); + + SyncContext { + sender_user: &sender_user, + sender_device: &sender_device, + next_batch, + next_batch_string, + next_batchcount, + since, + sincecount, + lazy_load_enabled, + lazy_load_send_redundant, + full_state, + } + }; // Users that have left any encrypted rooms the sender was in let mut left_encrypted_users = HashSet::new(); @@ -135,28 +170,21 @@ pub(crate) async fn sync_events_route( device_list_updates.extend( services() .users - .keys_changed(sender_user.as_ref(), since, None) + .keys_changed(ctx.sender_user.as_ref(), ctx.since, None) .filter_map(Result::ok), ); + let mut joined_rooms = BTreeMap::new(); let all_joined_rooms = services() .rooms .state_cache - .rooms_joined(&sender_user) + .rooms_joined(ctx.sender_user) .collect::>(); for room_id in all_joined_rooms { let room_id = room_id?; if let Ok(joined_room) = load_joined_room( - &sender_user, - &sender_device, + &ctx, &room_id, - since, - sincecount, - next_batch, - next_batchcount, - lazy_load_enabled, - lazy_load_send_redundant, - full_state, &mut device_list_updates, &mut left_encrypted_users, ) @@ -170,7 +198,7 @@ pub(crate) async fn sync_events_route( let mut left_rooms = BTreeMap::new(); let all_left_rooms: Vec<_> = - services().rooms.state_cache.rooms_left(&sender_user).collect(); + services().rooms.state_cache.rooms_left(ctx.sender_user).collect(); for result in all_left_rooms { let (room_id, _) = result?; @@ -187,10 +215,10 @@ pub(crate) async fn sync_events_route( let left_count = services() .rooms .state_cache - .get_left_count(&room_id, &sender_user)?; + .get_left_count(&room_id, ctx.sender_user)?; // Left before last sync - if Some(since) >= left_count { + if Some(ctx.since) >= left_count { continue; } @@ -198,14 +226,14 @@ pub(crate) async fn sync_events_route( // This is just a rejected invite, not a room we know let event = PduEvent { event_id: EventId::new(services().globals.server_name()).into(), - sender: sender_user.clone(), + sender: ctx.sender_user.to_owned(), origin_server_ts: utils::millis_since_unix_epoch() .try_into() .expect("Timestamp is valid js_int value"), kind: TimelineEventType::RoomMember, content: serde_json::from_str(r#"{ "membership": "leave"}"#) .unwrap(), - state_key: Some(sender_user.to_string()), + state_key: Some(ctx.sender_user.to_string()), unsigned: None, // The following keys are dropped on conversion room_id: room_id.clone(), @@ -227,7 +255,7 @@ pub(crate) async fn sync_events_route( }, timeline: Timeline { limited: false, - prev_batch: Some(next_batch_string.clone()), + prev_batch: Some(ctx.next_batch_string.clone()), events: Vec::new(), }, state: State { @@ -241,8 +269,10 @@ pub(crate) async fn sync_events_route( let mut left_state_events = Vec::new(); - let since_shortstatehash = - services().rooms.user.get_token_shortstatehash(&room_id, since)?; + let since_shortstatehash = services() + .rooms + .user + .get_token_shortstatehash(&room_id, ctx.since)?; let since_state_ids = match since_shortstatehash { Some(s) => { @@ -255,7 +285,7 @@ pub(crate) async fn sync_events_route( services().rooms.state_accessor.room_state_get_id( &room_id, &StateEventType::RoomMember, - sender_user.as_str(), + ctx.sender_user.as_str(), )? else { error!("Left room but no left state event"); @@ -280,22 +310,22 @@ pub(crate) async fn sync_events_route( let leave_shortstatekey = services().rooms.short.get_or_create_shortstatekey( &StateEventType::RoomMember, - sender_user.as_str(), + ctx.sender_user.as_str(), )?; left_state_ids.insert(leave_shortstatekey, left_event_id); let mut i = 0; for (key, event_id) in left_state_ids { - if full_state || since_state_ids.get(&key) != Some(&event_id) { + if ctx.full_state || since_state_ids.get(&key) != Some(&event_id) { let (event_type, state_key) = services().rooms.short.get_statekey_from_short(key)?; - if !lazy_load_enabled + if !ctx.lazy_load_enabled || event_type != StateEventType::RoomMember - || full_state + || ctx.full_state // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565 - || *sender_user == state_key + || *ctx.sender_user == state_key { let Some(pdu) = services().rooms.timeline.get_pdu(&event_id)? @@ -322,7 +352,7 @@ pub(crate) async fn sync_events_route( }, timeline: Timeline { limited: false, - prev_batch: Some(next_batch_string.clone()), + prev_batch: Some(ctx.next_batch_string.clone()), events: Vec::new(), }, state: State { @@ -334,7 +364,7 @@ pub(crate) async fn sync_events_route( let mut invited_rooms = BTreeMap::new(); let all_invited_rooms: Vec<_> = - services().rooms.state_cache.rooms_invited(&sender_user).collect(); + services().rooms.state_cache.rooms_invited(ctx.sender_user).collect(); for result in all_invited_rooms { let (room_id, invite_state_events) = result?; @@ -351,10 +381,10 @@ pub(crate) async fn sync_events_route( let invite_count = services() .rooms .state_cache - .get_invite_count(&room_id, &sender_user)?; + .get_invite_count(&room_id, ctx.sender_user)?; // Invited before last sync - if Some(since) >= invite_count { + if Some(ctx.since) >= invite_count { continue; } @@ -372,7 +402,10 @@ pub(crate) async fn sync_events_route( let dont_share_encrypted_room = services() .rooms .user - .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])? + .get_shared_rooms(vec![ + ctx.sender_user.to_owned(), + user_id.clone(), + ])? .filter_map(Result::ok) .filter_map(|other_room_id| { Some( @@ -398,13 +431,13 @@ pub(crate) async fn sync_events_route( // Remove all to-device events the device received *last time* services().users.remove_to_device_events( - &sender_user, - &sender_device, - since, + ctx.sender_user, + ctx.sender_device, + ctx.since, )?; let response = sync_events::v3::Response { - next_batch: next_batch_string, + next_batch: ctx.next_batch_string, rooms: Rooms { leave: left_rooms, join: joined_rooms, @@ -416,7 +449,7 @@ pub(crate) async fn sync_events_route( account_data: GlobalAccountData { events: services() .account_data - .changes_since(None, &sender_user, since)? + .changes_since(None, ctx.sender_user, ctx.since)? .into_iter() .filter_map(|(_, v)| { serde_json::from_str(v.json().get()) @@ -435,18 +468,18 @@ pub(crate) async fn sync_events_route( }, device_one_time_keys_count: services() .users - .count_one_time_keys(&sender_user, &sender_device)?, + .count_one_time_keys(ctx.sender_user, ctx.sender_device)?, to_device: ToDevice { events: services() .users - .get_to_device_events(&sender_user, &sender_device)?, + .get_to_device_events(ctx.sender_user, ctx.sender_device)?, }, // Fallback keys are not yet supported device_unused_fallback_key_types: None, }; // TODO: Retry the endpoint instead of returning (waiting for #118) - if !full_state + if !ctx.full_state && response.rooms.is_empty() && response.presence.is_empty() && response.account_data.is_empty() @@ -470,16 +503,8 @@ pub(crate) async fn sync_events_route( #[tracing::instrument(skip_all, fields(room_id = %room_id))] #[allow(clippy::too_many_arguments, clippy::too_many_lines)] async fn load_joined_room( - sender_user: &UserId, - sender_device: &DeviceId, + ctx: &SyncContext<'_>, room_id: &RoomId, - since: u64, - sincecount: PduCount, - next_batch: u64, - next_batchcount: PduCount, - lazy_load_enabled: bool, - lazy_load_send_redundant: bool, - full_state: bool, device_list_updates: &mut HashSet, left_encrypted_users: &mut HashSet, ) -> Result { @@ -495,14 +520,14 @@ async fn load_joined_room( } let (timeline_pdus, limited) = - load_timeline(sender_user, room_id, sincecount, 10)?; + load_timeline(ctx.sender_user, room_id, ctx.sincecount, 10)?; let send_notification_counts = !timeline_pdus.is_empty() || services() .rooms .user - .last_notification_read(sender_user, room_id)? - > since; + .last_notification_read(ctx.sender_user, room_id)? + > ctx.since; let mut timeline_users = HashSet::new(); for (_, event) in &timeline_pdus { @@ -513,10 +538,10 @@ async fn load_joined_room( .rooms .lazy_loading .lazy_load_confirm_delivery( - sender_user, - sender_device, + ctx.sender_user, + ctx.sender_device, room_id, - sincecount, + ctx.sincecount, ) .await?; @@ -530,7 +555,7 @@ async fn load_joined_room( }; let since_shortstatehash = - services().rooms.user.get_token_shortstatehash(room_id, since)?; + services().rooms.user.get_token_shortstatehash(room_id, ctx.since)?; let ( heroes, @@ -568,7 +593,7 @@ async fn load_joined_room( for hero in services() .rooms .timeline - .all_pdus(sender_user, room_id)? + .all_pdus(ctx.sender_user, room_id)? .filter_map(Result::ok) .filter(|(_, pdu)| { pdu.kind == TimelineEventType::RoomMember @@ -617,7 +642,9 @@ async fn load_joined_room( .filter_map(Result::ok) .flatten() { - if heroes.contains(&hero) || hero == sender_user.as_str() { + if heroes.contains(&hero) + || hero == ctx.sender_user.as_str() + { continue; } @@ -641,7 +668,7 @@ async fn load_joined_room( .state_get( shortstatehash, &StateEventType::RoomMember, - sender_user.as_str(), + ctx.sender_user.as_str(), ) .transpose() }) @@ -692,11 +719,11 @@ async fn load_joined_room( if i % 100 == 0 { tokio::task::yield_now().await; } - } else if !lazy_load_enabled - || full_state + } else if !ctx.lazy_load_enabled + || ctx.full_state || timeline_users.contains(&state_key) // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565 - || *sender_user == state_key + || *ctx.sender_user == state_key { let Some(pdu) = services().rooms.timeline.get_pdu(&event_id)? @@ -721,8 +748,8 @@ async fn load_joined_room( // Reset lazy loading because this is an initial sync services().rooms.lazy_loading.lazy_load_reset( - sender_user, - sender_device, + ctx.sender_user, + ctx.sender_device, room_id, )?; @@ -732,11 +759,11 @@ async fn load_joined_room( .rooms .lazy_loading .lazy_load_mark_sent( - sender_user, - sender_device, + ctx.sender_user, + ctx.sender_device, room_id, lazy_loaded, - next_batchcount, + ctx.next_batchcount, ) .await; @@ -766,7 +793,7 @@ async fn load_joined_room( .await?; for (key, event_id) in current_state_ids { - if full_state + if ctx.full_state || since_state_ids.get(&key) != Some(&event_id) { let Some(pdu) = @@ -820,7 +847,7 @@ async fn load_joined_room( ) })?; - if user_id == sender_user { + if user_id == ctx.sender_user { continue; } @@ -837,7 +864,7 @@ async fn load_joined_room( MembershipState::Join => { // A new user joined an encrypted room if !share_encrypted_room( - sender_user, + ctx.sender_user, &user_id, room_id, )? { @@ -867,13 +894,17 @@ async fn load_joined_room( .filter(|user_id| { // Don't send key updates from the sender to the // sender - sender_user != user_id + ctx.sender_user != *user_id }) .filter(|user_id| { // Only send keys if the sender doesn't share an // encrypted room with the target already - !share_encrypted_room(sender_user, user_id, room_id) - .unwrap_or(false) + !share_encrypted_room( + ctx.sender_user, + user_id, + room_id, + ) + .unwrap_or(false) }), ); } @@ -919,11 +950,11 @@ async fn load_joined_room( } if !services().rooms.lazy_loading.lazy_load_was_sent_before( - sender_user, - sender_device, + ctx.sender_user, + ctx.sender_device, room_id, &event.sender, - )? || lazy_load_send_redundant + )? || ctx.lazy_load_send_redundant { if let Some(member_event) = services().rooms.state_accessor.room_state_get( @@ -942,11 +973,11 @@ async fn load_joined_room( .rooms .lazy_loading .lazy_load_mark_sent( - sender_user, - sender_device, + ctx.sender_user, + ctx.sender_device, room_id, lazy_loaded, - next_batchcount, + ctx.next_batchcount, ) .await; @@ -964,17 +995,21 @@ async fn load_joined_room( device_list_updates.extend( services() .users - .keys_changed(room_id.as_ref(), since, None) + .keys_changed(room_id.as_ref(), ctx.since, None) .filter_map(Result::ok), ); let notification_count = send_notification_counts - .then(|| services().rooms.user.notification_count(sender_user, room_id)) + .then(|| { + services().rooms.user.notification_count(ctx.sender_user, room_id) + }) .transpose()? .map(|x| x.try_into().expect("notification count can't go that high")); let highlight_count = send_notification_counts - .then(|| services().rooms.user.highlight_count(sender_user, room_id)) + .then(|| { + services().rooms.user.highlight_count(ctx.sender_user, room_id) + }) .transpose()? .map(|x| x.try_into().expect("highlight count can't go that high")); @@ -998,12 +1033,14 @@ async fn load_joined_room( .rooms .edus .read_receipt - .readreceipts_since(room_id, since) + .readreceipts_since(room_id, ctx.since) .filter_map(Result::ok) .map(|(_, _, v)| v) .collect(); - if services().rooms.edus.typing.last_typing_update(room_id).await? > since { + if services().rooms.edus.typing.last_typing_update(room_id).await? + > ctx.since + { edus.push( serde_json::from_str( &serde_json::to_string( @@ -1019,7 +1056,7 @@ async fn load_joined_room( // sync services().rooms.user.associate_token_shortstatehash( room_id, - next_batch, + ctx.next_batch, current_shortstatehash, )?; @@ -1027,7 +1064,7 @@ async fn load_joined_room( account_data: RoomAccountData { events: services() .account_data - .changes_since(Some(room_id), sender_user, since)? + .changes_since(Some(room_id), ctx.sender_user, ctx.since)? .into_iter() .filter_map(|(_, v)| { serde_json::from_str(v.json().get())