sync/v3: move readonly data to context struct

This makes it a lot easier to factor out parts of the big
sync_events_route().
Lambda 2024-05-30 22:07:00 +00:00
parent 55a04f77c6
commit 8a7f87e9b4

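The shape of the refactoring, reduced to a sketch: read-only per-request values are gathered into one borrowed SyncContext and helpers take `&SyncContext` instead of a long positional argument list. The struct fields below mirror the commit; the simplified `&str` types, the `load_joined_room_before`/`load_joined_room_after` helpers, and `main` are illustrative only, not the crate's actual code.

// Minimal sketch of the context-struct pattern, with simplified types.
struct SyncContext<'a> {
    sender_user: &'a str,   // `&'a UserId` in the real code
    sender_device: &'a str, // `&'a DeviceId` in the real code
    next_batch: u64,
    since: u64,
    lazy_load_enabled: bool,
    full_state: bool,
}

// Before: every helper repeated the same read-only parameters.
fn load_joined_room_before(
    sender_user: &str,
    sender_device: &str,
    room_id: &str,
    since: u64,
    next_batch: u64,
    lazy_load_enabled: bool,
    full_state: bool,
) -> String {
    format!(
        "{room_id}: {sender_user}/{sender_device} since={since} \
         next={next_batch} lazy={lazy_load_enabled} full={full_state}"
    )
}

// After: one shared, immutable context plus the per-call arguments.
fn load_joined_room_after(ctx: &SyncContext<'_>, room_id: &str) -> String {
    format!(
        "{room_id}: {}/{} since={} next={} lazy={} full={}",
        ctx.sender_user,
        ctx.sender_device,
        ctx.since,
        ctx.next_batch,
        ctx.lazy_load_enabled,
        ctx.full_state
    )
}

fn main() {
    let ctx = SyncContext {
        sender_user: "@alice:example.org",
        sender_device: "DEVICEID",
        next_batch: 42,
        since: 17,
        lazy_load_enabled: true,
        full_state: false,
    };
    // Both forms produce the same result; only the plumbing changes.
    assert_eq!(
        load_joined_room_before(
            "@alice:example.org", "DEVICEID", "!room:example.org",
            17, 42, true, false
        ),
        load_joined_room_after(&ctx, "!room:example.org"),
    );
    println!("{}", load_joined_room_after(&ctx, "!room:example.org"));
}
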

@@ -31,6 +31,23 @@ use crate::{
     services, utils, Ar, Error, PduEvent, Ra, Result,
 };
 
+struct SyncContext<'a> {
+    sender_user: &'a UserId,
+    sender_device: &'a DeviceId,
+    next_batch: u64,
+    next_batch_string: String,
+    next_batchcount: PduCount,
+    since: u64,
+    sincecount: PduCount,
+    lazy_load_enabled: bool,
+    lazy_load_send_redundant: bool,
+    full_state: bool,
+}
+
 /// # `GET /_matrix/client/r0/sync`
 ///
 /// Synchronize the client's state with the latest state on the server.
@@ -93,6 +110,7 @@ pub(crate) async fn sync_events_route(
     // Setup watchers, so if there's no response, we can wait for them
     let watcher = services().globals.watch(&sender_user, &sender_device);
 
+    let ctx = {
     let next_batch = services().globals.current_count()?;
     current_span.record("next_batch", next_batch);
     let next_batchcount = PduCount::Normal(next_batch);
@@ -116,16 +134,33 @@ pub(crate) async fn sync_events_route(
         LazyLoadOptions::Disabled => (false, false),
     };
     current_span.record("lazy_load_enabled", lazy_load_enabled);
-    current_span.record("lazy_load_send_redundant", lazy_load_send_redundant);
+        current_span
+            .record("lazy_load_send_redundant", lazy_load_send_redundant);
     let full_state = body.full_state;
 
-    let mut joined_rooms = BTreeMap::new();
-    let since =
-        body.since.as_ref().and_then(|string| string.parse().ok()).unwrap_or(0);
+        let since = body
+            .since
+            .as_ref()
+            .and_then(|string| string.parse().ok())
+            .unwrap_or(0);
     current_span.record("since", since);
     let sincecount = PduCount::Normal(since);
 
+        SyncContext {
+            sender_user: &sender_user,
+            sender_device: &sender_device,
+            next_batch,
+            next_batch_string,
+            next_batchcount,
+            since,
+            sincecount,
+            lazy_load_enabled,
+            lazy_load_send_redundant,
+            full_state,
+        }
+    };
+
     // Users that have left any encrypted rooms the sender was in
     let mut left_encrypted_users = HashSet::new();
     let mut device_list_updates = HashSet::new();
@@ -135,28 +170,21 @@ pub(crate) async fn sync_events_route(
     device_list_updates.extend(
         services()
             .users
-            .keys_changed(sender_user.as_ref(), since, None)
+            .keys_changed(ctx.sender_user.as_ref(), ctx.since, None)
             .filter_map(Result::ok),
     );
 
+    let mut joined_rooms = BTreeMap::new();
     let all_joined_rooms = services()
         .rooms
         .state_cache
-        .rooms_joined(&sender_user)
+        .rooms_joined(ctx.sender_user)
         .collect::<Vec<_>>();
     for room_id in all_joined_rooms {
         let room_id = room_id?;
         if let Ok(joined_room) = load_joined_room(
-            &sender_user,
-            &sender_device,
+            &ctx,
             &room_id,
-            since,
-            sincecount,
-            next_batch,
-            next_batchcount,
-            lazy_load_enabled,
-            lazy_load_send_redundant,
-            full_state,
             &mut device_list_updates,
             &mut left_encrypted_users,
         )
@@ -170,7 +198,7 @@ pub(crate) async fn sync_events_route(
     let mut left_rooms = BTreeMap::new();
     let all_left_rooms: Vec<_> =
-        services().rooms.state_cache.rooms_left(&sender_user).collect();
+        services().rooms.state_cache.rooms_left(ctx.sender_user).collect();
 
     for result in all_left_rooms {
         let (room_id, _) = result?;
@@ -187,10 +215,10 @@ pub(crate) async fn sync_events_route(
         let left_count = services()
             .rooms
             .state_cache
-            .get_left_count(&room_id, &sender_user)?;
+            .get_left_count(&room_id, ctx.sender_user)?;
 
         // Left before last sync
-        if Some(since) >= left_count {
+        if Some(ctx.since) >= left_count {
             continue;
         }
@@ -198,14 +226,14 @@ pub(crate) async fn sync_events_route(
             // This is just a rejected invite, not a room we know
             let event = PduEvent {
                 event_id: EventId::new(services().globals.server_name()).into(),
-                sender: sender_user.clone(),
+                sender: ctx.sender_user.to_owned(),
                 origin_server_ts: utils::millis_since_unix_epoch()
                     .try_into()
                     .expect("Timestamp is valid js_int value"),
                 kind: TimelineEventType::RoomMember,
                 content: serde_json::from_str(r#"{ "membership": "leave"}"#)
                     .unwrap(),
-                state_key: Some(sender_user.to_string()),
+                state_key: Some(ctx.sender_user.to_string()),
                 unsigned: None,
                 // The following keys are dropped on conversion
                 room_id: room_id.clone(),
@@ -227,7 +255,7 @@ pub(crate) async fn sync_events_route(
             },
             timeline: Timeline {
                 limited: false,
-                prev_batch: Some(next_batch_string.clone()),
+                prev_batch: Some(ctx.next_batch_string.clone()),
                 events: Vec::new(),
             },
             state: State {
@@ -241,8 +269,10 @@ pub(crate) async fn sync_events_route(
         let mut left_state_events = Vec::new();
 
-        let since_shortstatehash =
-            services().rooms.user.get_token_shortstatehash(&room_id, since)?;
+        let since_shortstatehash = services()
+            .rooms
+            .user
+            .get_token_shortstatehash(&room_id, ctx.since)?;
 
         let since_state_ids = match since_shortstatehash {
             Some(s) => {
@@ -255,7 +285,7 @@ pub(crate) async fn sync_events_route(
                 services().rooms.state_accessor.room_state_get_id(
                     &room_id,
                     &StateEventType::RoomMember,
-                    sender_user.as_str(),
+                    ctx.sender_user.as_str(),
                 )?
             else {
                 error!("Left room but no left state event");
@@ -280,22 +310,22 @@ pub(crate) async fn sync_events_route(
             let leave_shortstatekey =
                 services().rooms.short.get_or_create_shortstatekey(
                     &StateEventType::RoomMember,
-                    sender_user.as_str(),
+                    ctx.sender_user.as_str(),
                 )?;
 
             left_state_ids.insert(leave_shortstatekey, left_event_id);
 
             let mut i = 0;
             for (key, event_id) in left_state_ids {
-                if full_state || since_state_ids.get(&key) != Some(&event_id) {
+                if ctx.full_state || since_state_ids.get(&key) != Some(&event_id) {
                     let (event_type, state_key) =
                         services().rooms.short.get_statekey_from_short(key)?;
 
-                    if !lazy_load_enabled
+                    if !ctx.lazy_load_enabled
                         || event_type != StateEventType::RoomMember
-                        || full_state
+                        || ctx.full_state
                         // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
-                        || *sender_user == state_key
+                        || *ctx.sender_user == state_key
                     {
                         let Some(pdu) =
                             services().rooms.timeline.get_pdu(&event_id)?
@@ -322,7 +352,7 @@ pub(crate) async fn sync_events_route(
             },
             timeline: Timeline {
                 limited: false,
-                prev_batch: Some(next_batch_string.clone()),
+                prev_batch: Some(ctx.next_batch_string.clone()),
                 events: Vec::new(),
             },
             state: State {
@@ -334,7 +364,7 @@ pub(crate) async fn sync_events_route(
     let mut invited_rooms = BTreeMap::new();
     let all_invited_rooms: Vec<_> =
-        services().rooms.state_cache.rooms_invited(&sender_user).collect();
+        services().rooms.state_cache.rooms_invited(ctx.sender_user).collect();
 
     for result in all_invited_rooms {
         let (room_id, invite_state_events) = result?;
@@ -351,10 +381,10 @@ pub(crate) async fn sync_events_route(
         let invite_count = services()
             .rooms
             .state_cache
-            .get_invite_count(&room_id, &sender_user)?;
+            .get_invite_count(&room_id, ctx.sender_user)?;
 
         // Invited before last sync
-        if Some(since) >= invite_count {
+        if Some(ctx.since) >= invite_count {
             continue;
         }
@@ -372,7 +402,10 @@ pub(crate) async fn sync_events_route(
             let dont_share_encrypted_room = services()
                 .rooms
                 .user
-                .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
+                .get_shared_rooms(vec![
+                    ctx.sender_user.to_owned(),
+                    user_id.clone(),
+                ])?
                 .filter_map(Result::ok)
                 .filter_map(|other_room_id| {
                     Some(
@@ -398,13 +431,13 @@ pub(crate) async fn sync_events_route(
     // Remove all to-device events the device received *last time*
     services().users.remove_to_device_events(
-        &sender_user,
-        &sender_device,
-        since,
+        ctx.sender_user,
+        ctx.sender_device,
+        ctx.since,
     )?;
 
     let response = sync_events::v3::Response {
-        next_batch: next_batch_string,
+        next_batch: ctx.next_batch_string,
         rooms: Rooms {
             leave: left_rooms,
             join: joined_rooms,
@@ -416,7 +449,7 @@ pub(crate) async fn sync_events_route(
         account_data: GlobalAccountData {
             events: services()
                 .account_data
-                .changes_since(None, &sender_user, since)?
+                .changes_since(None, ctx.sender_user, ctx.since)?
                 .into_iter()
                 .filter_map(|(_, v)| {
                     serde_json::from_str(v.json().get())
@@ -435,18 +468,18 @@ pub(crate) async fn sync_events_route(
         },
         device_one_time_keys_count: services()
             .users
-            .count_one_time_keys(&sender_user, &sender_device)?,
+            .count_one_time_keys(ctx.sender_user, ctx.sender_device)?,
         to_device: ToDevice {
             events: services()
                 .users
-                .get_to_device_events(&sender_user, &sender_device)?,
+                .get_to_device_events(ctx.sender_user, ctx.sender_device)?,
         },
         // Fallback keys are not yet supported
         device_unused_fallback_key_types: None,
     };
 
     // TODO: Retry the endpoint instead of returning (waiting for #118)
-    if !full_state
+    if !ctx.full_state
         && response.rooms.is_empty()
         && response.presence.is_empty()
         && response.account_data.is_empty()
@@ -470,16 +503,8 @@ pub(crate) async fn sync_events_route(
 #[tracing::instrument(skip_all, fields(room_id = %room_id))]
 #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
 async fn load_joined_room(
-    sender_user: &UserId,
-    sender_device: &DeviceId,
+    ctx: &SyncContext<'_>,
     room_id: &RoomId,
-    since: u64,
-    sincecount: PduCount,
-    next_batch: u64,
-    next_batchcount: PduCount,
-    lazy_load_enabled: bool,
-    lazy_load_send_redundant: bool,
-    full_state: bool,
     device_list_updates: &mut HashSet<OwnedUserId>,
     left_encrypted_users: &mut HashSet<OwnedUserId>,
 ) -> Result<JoinedRoom> {
@@ -495,14 +520,14 @@ async fn load_joined_room(
     }
 
     let (timeline_pdus, limited) =
-        load_timeline(sender_user, room_id, sincecount, 10)?;
+        load_timeline(ctx.sender_user, room_id, ctx.sincecount, 10)?;
 
     let send_notification_counts = !timeline_pdus.is_empty()
         || services()
             .rooms
             .user
-            .last_notification_read(sender_user, room_id)?
-            > since;
+            .last_notification_read(ctx.sender_user, room_id)?
+            > ctx.since;
 
     let mut timeline_users = HashSet::new();
     for (_, event) in &timeline_pdus {
@@ -513,10 +538,10 @@ async fn load_joined_room(
         .rooms
         .lazy_loading
         .lazy_load_confirm_delivery(
-            sender_user,
-            sender_device,
+            ctx.sender_user,
+            ctx.sender_device,
             room_id,
-            sincecount,
+            ctx.sincecount,
         )
         .await?;
@@ -530,7 +555,7 @@ async fn load_joined_room(
     };
 
     let since_shortstatehash =
-        services().rooms.user.get_token_shortstatehash(room_id, since)?;
+        services().rooms.user.get_token_shortstatehash(room_id, ctx.since)?;
 
     let (
         heroes,
@@ -568,7 +593,7 @@ async fn load_joined_room(
             for hero in services()
                 .rooms
                 .timeline
-                .all_pdus(sender_user, room_id)?
+                .all_pdus(ctx.sender_user, room_id)?
                 .filter_map(Result::ok)
                 .filter(|(_, pdu)| {
                     pdu.kind == TimelineEventType::RoomMember
@@ -617,7 +642,9 @@ async fn load_joined_room(
                 .filter_map(Result::ok)
                 .flatten()
             {
-                if heroes.contains(&hero) || hero == sender_user.as_str() {
+                if heroes.contains(&hero)
+                    || hero == ctx.sender_user.as_str()
+                {
                     continue;
                 }
@@ -641,7 +668,7 @@ async fn load_joined_room(
                 .state_get(
                     shortstatehash,
                     &StateEventType::RoomMember,
-                    sender_user.as_str(),
+                    ctx.sender_user.as_str(),
                 )
                 .transpose()
             })
@@ -692,11 +719,11 @@ async fn load_joined_room(
                 if i % 100 == 0 {
                     tokio::task::yield_now().await;
                 }
-            } else if !lazy_load_enabled
-                || full_state
+            } else if !ctx.lazy_load_enabled
+                || ctx.full_state
                 || timeline_users.contains(&state_key)
                 // TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
-                || *sender_user == state_key
+                || *ctx.sender_user == state_key
             {
                 let Some(pdu) =
                     services().rooms.timeline.get_pdu(&event_id)?
@@ -721,8 +748,8 @@ async fn load_joined_room(
         // Reset lazy loading because this is an initial sync
         services().rooms.lazy_loading.lazy_load_reset(
-            sender_user,
-            sender_device,
+            ctx.sender_user,
+            ctx.sender_device,
             room_id,
         )?;
@@ -732,11 +759,11 @@ async fn load_joined_room(
             .rooms
             .lazy_loading
             .lazy_load_mark_sent(
-                sender_user,
-                sender_device,
+                ctx.sender_user,
+                ctx.sender_device,
                 room_id,
                 lazy_loaded,
-                next_batchcount,
+                ctx.next_batchcount,
             )
             .await;
@@ -766,7 +793,7 @@ async fn load_joined_room(
             .await?;
 
         for (key, event_id) in current_state_ids {
-            if full_state
+            if ctx.full_state
                 || since_state_ids.get(&key) != Some(&event_id)
             {
                 let Some(pdu) =
@@ -820,7 +847,7 @@ async fn load_joined_room(
                     )
                 })?;
 
-                if user_id == sender_user {
+                if user_id == ctx.sender_user {
                     continue;
                 }
@@ -837,7 +864,7 @@ async fn load_joined_room(
                     MembershipState::Join => {
                         // A new user joined an encrypted room
                         if !share_encrypted_room(
-                            sender_user,
+                            ctx.sender_user,
                             &user_id,
                             room_id,
                         )? {
@@ -867,12 +894,16 @@ async fn load_joined_room(
                 .filter(|user_id| {
                     // Don't send key updates from the sender to the
                     // sender
-                    sender_user != user_id
+                    ctx.sender_user != *user_id
                 })
                 .filter(|user_id| {
                     // Only send keys if the sender doesn't share an
                     // encrypted room with the target already
-                    !share_encrypted_room(sender_user, user_id, room_id)
+                    !share_encrypted_room(
+                        ctx.sender_user,
+                        user_id,
+                        room_id,
+                    )
                     .unwrap_or(false)
                 }),
             );
@@ -919,11 +950,11 @@ async fn load_joined_room(
                 }
 
                 if !services().rooms.lazy_loading.lazy_load_was_sent_before(
-                    sender_user,
-                    sender_device,
+                    ctx.sender_user,
+                    ctx.sender_device,
                     room_id,
                     &event.sender,
-                )? || lazy_load_send_redundant
+                )? || ctx.lazy_load_send_redundant
                 {
                     if let Some(member_event) =
                         services().rooms.state_accessor.room_state_get(
@@ -942,11 +973,11 @@ async fn load_joined_room(
             .rooms
             .lazy_loading
             .lazy_load_mark_sent(
-                sender_user,
-                sender_device,
+                ctx.sender_user,
+                ctx.sender_device,
                 room_id,
                 lazy_loaded,
-                next_batchcount,
+                ctx.next_batchcount,
             )
             .await;
@@ -964,17 +995,21 @@ async fn load_joined_room(
     device_list_updates.extend(
         services()
             .users
-            .keys_changed(room_id.as_ref(), since, None)
+            .keys_changed(room_id.as_ref(), ctx.since, None)
             .filter_map(Result::ok),
     );
 
     let notification_count = send_notification_counts
-        .then(|| services().rooms.user.notification_count(sender_user, room_id))
+        .then(|| {
+            services().rooms.user.notification_count(ctx.sender_user, room_id)
+        })
        .transpose()?
        .map(|x| x.try_into().expect("notification count can't go that high"));
 
     let highlight_count = send_notification_counts
-        .then(|| services().rooms.user.highlight_count(sender_user, room_id))
+        .then(|| {
+            services().rooms.user.highlight_count(ctx.sender_user, room_id)
+        })
        .transpose()?
        .map(|x| x.try_into().expect("highlight count can't go that high"));
@@ -998,12 +1033,14 @@ async fn load_joined_room(
         .rooms
         .edus
         .read_receipt
-        .readreceipts_since(room_id, since)
+        .readreceipts_since(room_id, ctx.since)
         .filter_map(Result::ok)
         .map(|(_, _, v)| v)
         .collect();
 
-    if services().rooms.edus.typing.last_typing_update(room_id).await? > since {
+    if services().rooms.edus.typing.last_typing_update(room_id).await?
+        > ctx.since
+    {
         edus.push(
             serde_json::from_str(
                 &serde_json::to_string(
@@ -1019,7 +1056,7 @@ async fn load_joined_room(
     // sync
     services().rooms.user.associate_token_shortstatehash(
         room_id,
-        next_batch,
+        ctx.next_batch,
         current_shortstatehash,
     )?;
@@ -1027,7 +1064,7 @@ async fn load_joined_room(
         account_data: RoomAccountData {
             events: services()
                 .account_data
-                .changes_since(Some(room_id), sender_user, since)?
+                .changes_since(Some(room_id), ctx.sender_user, ctx.since)?
                 .into_iter()
                 .filter_map(|(_, v)| {
                     serde_json::from_str(v.json().get())