SSS: split up sync_events_v5_route()

This commit is contained in:
Lambda 2025-03-27 20:02:16 +00:00
parent 9405e5f16c
commit 7558d3456b

View file

@ -111,21 +111,149 @@ pub(crate) async fn sync_events_v5_route(
)?; )?;
} }
// Users that have left any encrypted rooms the sender was in #[allow(clippy::if_then_some_else_none)]
let mut left_encrypted_users = HashSet::new(); let device_lists = if body.extensions.e2ee.enabled.unwrap_or(false) {
let mut device_list_changes = HashSet::new(); Some(get_e2ee_data(&sender_user, globalsince, &all_joined_rooms).await?)
let mut device_list_left = HashSet::new(); } else {
None
};
if body.extensions.e2ee.enabled.unwrap_or(false) { // and required state
// Look for device list updates of this account let mut todo_rooms: BTreeMap<OwnedRoomId, TodoRoom> = BTreeMap::new();
device_list_changes.extend(
services() let lists = body
.users .lists
.keys_changed(sender_user.as_ref(), globalsince, None) .into_iter()
.filter_map(Result::ok), .filter_map(|(list_id, list)| {
let rooms = rooms_in_list(
&list_id,
list,
&all_joined_rooms,
&known_rooms,
&mut todo_rooms,
)?;
Some((list_id, rooms))
})
.collect();
for (room_id, room) in &body.room_subscriptions {
if !services().rooms.metadata.exists(room_id)? {
warn!(room_id = room_id.as_str(), "Subscribed room does not exist");
continue;
}
todo_rooms.entry(room_id.clone()).or_default().update(
room.required_state.clone(),
room.timeline_limit,
&known_rooms,
room_id,
);
}
services().users.update_sync_known_rooms(
connection_key.clone(),
todo_rooms.keys().cloned().collect(),
globalsince,
); );
for room_id in &all_joined_rooms { let mut rooms = BTreeMap::new();
for (room_id, todo_room) in todo_rooms {
if let Some(room) = process_room(&sender_user, &room_id, &todo_room)? {
rooms.insert(room_id.clone(), room);
}
}
if rooms
.iter()
.all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty())
{
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let mut duration = body.timeout.unwrap_or(Duration::from_secs(30));
if duration.as_secs() > 30 {
duration = Duration::from_secs(30);
}
match tokio::time::timeout(duration, watcher).await {
Ok(x) => x.expect("watcher should succeed"),
Err(error) => debug!(%error, "Timed out"),
};
}
Ok(Ra(sync_events::v5::Response {
txn_id: body.txn_id.clone(),
pos: next_batch.to_string(),
lists,
rooms,
extensions: sync_events::v5::response::Extensions {
to_device: body
.extensions
.to_device
.enabled
.unwrap_or(false)
.then(|| {
services()
.users
.get_to_device_events(&sender_user, &sender_device)
.map(|events| sync_events::v5::response::ToDevice {
events,
next_batch: next_batch.to_string(),
})
})
.transpose()?,
e2ee: sync_events::v5::response::E2EE {
device_lists: device_lists.unwrap_or_default(),
device_one_time_keys_count: services()
.users
.count_one_time_keys(&sender_user, &sender_device)?,
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
},
account_data: sync_events::v5::response::AccountData {
global: if body.extensions.account_data.enabled.unwrap_or(false)
{
services()
.account_data
.global_changes_since(&sender_user, globalsince)?
.into_iter()
.map(|(event_type, content)| {
account_data::raw_global_event_from_parts(
&event_type,
&content,
)
})
.collect()
} else {
Vec::new()
},
rooms: BTreeMap::new(),
},
receipts: sync_events::v5::response::Receipts {
rooms: BTreeMap::new(),
},
typing: sync_events::v5::response::Typing {
rooms: BTreeMap::new(),
},
},
}))
}
#[allow(clippy::too_many_lines)]
#[tracing::instrument(skip_all)]
async fn get_e2ee_data(
sender_user: &UserId,
globalsince: u64,
all_joined_rooms: &[OwnedRoomId],
) -> Result<DeviceLists> {
// Users that have left any encrypted rooms the sender was in
let mut left_encrypted_users = HashSet::new();
// Look for device list updates of this account
let mut device_list_changes: HashSet<_> = services()
.users
.keys_changed(sender_user.as_ref(), globalsince, None)
.filter_map(Result::ok)
.collect();
for room_id in all_joined_rooms {
let Some(current_shortstatehash) = let Some(current_shortstatehash) =
services().rooms.state.get_room_shortstatehash(room_id)? services().rooms.state.get_room_shortstatehash(room_id)?
else { else {
@ -176,8 +304,7 @@ pub(crate) async fn sync_events_v5_route(
continue; continue;
} }
let since_encryption = let since_encryption = services().rooms.state_accessor.state_get(
services().rooms.state_accessor.state_get(
since_shortstatehash, since_shortstatehash,
&StateEventType::RoomEncryption, &StateEventType::RoomEncryption,
"", "",
@ -212,12 +339,10 @@ pub(crate) async fn sync_events_v5_route(
}; };
if pdu.kind == TimelineEventType::RoomMember { if pdu.kind == TimelineEventType::RoomMember {
if let Some(state_key) = &pdu.state_key { if let Some(state_key) = &pdu.state_key {
let user_id = let user_id = UserId::parse(state_key.clone())
UserId::parse(state_key.clone())
.map_err(|_| { .map_err(|_| {
Error::bad_database( Error::bad_database(
"Invalid UserId in member \ "Invalid UserId in member PDU.",
PDU.",
) )
})?; })?;
@ -225,8 +350,7 @@ pub(crate) async fn sync_events_v5_route(
continue; continue;
} }
let new_membership = let new_membership = serde_json::from_str::<
serde_json::from_str::<
RoomMemberEventContent, RoomMemberEventContent,
>( >(
pdu.content.get() pdu.content.get()
@ -243,19 +367,17 @@ pub(crate) async fn sync_events_v5_route(
// A new user joined an encrypted // A new user joined an encrypted
// room // room
if !share_encrypted_room( if !share_encrypted_room(
&sender_user, sender_user,
&user_id, &user_id,
room_id, room_id,
)? { )? {
device_list_changes device_list_changes.insert(user_id);
.insert(user_id);
} }
} }
MembershipState::Leave => { MembershipState::Leave => {
// Write down users that have left // Write down users that have left
// encrypted rooms we are in // encrypted rooms we are in
left_encrypted_users left_encrypted_users.insert(user_id);
.insert(user_id);
} }
_ => {} _ => {}
} }
@ -275,14 +397,14 @@ pub(crate) async fn sync_events_v5_route(
.filter(|user_id| { .filter(|user_id| {
// Don't send key updates from the sender to // Don't send key updates from the sender to
// the sender // the sender
&sender_user != user_id sender_user != user_id
}) })
.filter(|user_id| { .filter(|user_id| {
// Only send keys if the sender doesn't // Only send keys if the sender doesn't
// share an encrypted room with the target // share an encrypted room with the target
// already // already
!share_encrypted_room( !share_encrypted_room(
&sender_user, sender_user,
user_id, user_id,
room_id, room_id,
) )
@ -300,11 +422,13 @@ pub(crate) async fn sync_events_v5_route(
.filter_map(Result::ok), .filter_map(Result::ok),
); );
} }
let mut device_list_left = HashSet::new();
for user_id in left_encrypted_users { for user_id in left_encrypted_users {
let dont_share_encrypted_room = services() let dont_share_encrypted_room = services()
.rooms .rooms
.user .user
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])? .get_shared_rooms(vec![sender_user.to_owned(), user_id.clone()])?
.filter_map(Result::ok) .filter_map(Result::ok)
.filter_map(|other_room_id| { .filter_map(|other_room_id| {
Some( Some(
@ -327,20 +451,31 @@ pub(crate) async fn sync_events_v5_route(
device_list_left.insert(user_id); device_list_left.insert(user_id);
} }
} }
}
let mut lists = BTreeMap::new(); Ok(DeviceLists {
// and required state changed: device_list_changes.into_iter().collect(),
let mut todo_rooms: BTreeMap<OwnedRoomId, TodoRoom> = BTreeMap::new(); left: device_list_left.into_iter().collect(),
})
}
for (list_id, list) in body.lists { #[tracing::instrument(
skip_all,
fields(list_id = list_id, ?list),
)]
fn rooms_in_list(
list_id: &str,
list: sync_events::v5::request::List,
all_joined_rooms: &[OwnedRoomId],
known_rooms: &BTreeMap<OwnedRoomId, u64>,
todo_rooms: &mut BTreeMap<OwnedRoomId, TodoRoom>,
) -> Option<sync_events::v5::response::List> {
trace!(list_id, ?list, "Collecting rooms in list"); trace!(list_id, ?list, "Collecting rooms in list");
if list.filters.and_then(|f| f.is_invite).unwrap_or(false) { if list.filters.and_then(|f| f.is_invite).unwrap_or(false) {
continue; return None;
} }
let mut list_room_ids = BTreeSet::new(); let mut list_room_ids: BTreeSet<OwnedRoomId> = BTreeSet::new();
for (mut from, mut to) in list.ranges { for (mut from, mut to) in list.ranges {
from = from.clamp( from = from.clamp(
uint!(0), uint!(0),
@ -350,8 +485,7 @@ pub(crate) async fn sync_events_v5_route(
from, from,
UInt::try_from(all_joined_rooms.len() - 1).unwrap_or(UInt::MAX), UInt::try_from(all_joined_rooms.len() - 1).unwrap_or(UInt::MAX),
); );
let room_ids = let room_ids = all_joined_rooms[from.try_into().unwrap_or(usize::MAX)
all_joined_rooms[from.try_into().unwrap_or(usize::MAX)
..=to.try_into().unwrap_or(usize::MAX)] ..=to.try_into().unwrap_or(usize::MAX)]
.to_vec(); .to_vec();
list_room_ids.extend(room_ids); list_room_ids.extend(room_ids);
@ -360,59 +494,36 @@ pub(crate) async fn sync_events_v5_route(
todo_rooms.entry(room_id.clone()).or_default().update( todo_rooms.entry(room_id.clone()).or_default().update(
list.room_details.required_state.clone(), list.room_details.required_state.clone(),
list.room_details.timeline_limit, list.room_details.timeline_limit,
&known_rooms, known_rooms,
room_id, room_id,
); );
} }
let num_rooms = list_room_ids.len(); let num_rooms = list_room_ids.len();
trace!(list_id, num_rooms, "Done collecting rooms"); trace!(list_id, num_rooms, "Done collecting rooms");
lists.insert( Some(sync_events::v5::response::List {
list_id.clone(),
sync_events::v5::response::List {
count: UInt::try_from(num_rooms).unwrap_or(UInt::MAX), count: UInt::try_from(num_rooms).unwrap_or(UInt::MAX),
}, })
); }
}
for (room_id, room) in &body.room_subscriptions { #[allow(clippy::too_many_lines)]
if !services().rooms.metadata.exists(room_id)? { #[tracing::instrument(skip(sender_user))]
warn!(room_id = room_id.as_str(), "Subscribed room does not exist"); fn process_room(
continue; sender_user: &UserId,
} room_id: &RoomId,
todo_rooms.entry(room_id.clone()).or_default().update( todo_room: &TodoRoom,
room.required_state.clone(), ) -> Result<Option<sync_events::v5::response::Room>> {
room.timeline_limit,
&known_rooms,
room_id,
);
}
services().users.update_sync_known_rooms(
connection_key.clone(),
todo_rooms.keys().cloned().collect(),
globalsince,
);
let mut rooms = BTreeMap::new();
for (room_id, todo_room) in &todo_rooms {
trace!(
room_id = room_id.as_str(),
?todo_room,
"Processing matched room"
);
let roomsincecount = PduCount::Normal(todo_room.roomsince); let roomsincecount = PduCount::Normal(todo_room.roomsince);
let (timeline_pdus, limited) = load_timeline( let (timeline_pdus, limited) = load_timeline(
&sender_user, sender_user,
room_id, room_id,
roomsincecount, roomsincecount,
todo_room.timeline_limit, todo_room.timeline_limit,
)?; )?;
if todo_room.roomsince != 0 && timeline_pdus.is_empty() { if todo_room.roomsince != 0 && timeline_pdus.is_empty() {
trace!("No new timeline events, skipping"); return Ok(None);
continue;
} }
let prev_batch = timeline_pdus let prev_batch = timeline_pdus
@ -425,14 +536,11 @@ pub(crate) async fn sync_events_v5_route(
PduCount::Normal(c) => c.to_string(), PduCount::Normal(c) => c.to_string(),
}) })
.or_else(|| { .or_else(|| {
(todo_room.roomsince != 0) (todo_room.roomsince != 0).then(|| todo_room.roomsince.to_string())
.then(|| todo_room.roomsince.to_string())
}); });
let room_events: Vec<_> = timeline_pdus let room_events: Vec<_> =
.iter() timeline_pdus.iter().map(|(_, pdu)| pdu.to_sync_room_event()).collect();
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect();
let required_state = todo_room let required_state = todo_room
.required_state_request .required_state_request
@ -454,7 +562,7 @@ pub(crate) async fn sync_events_v5_route(
.state_cache .state_cache
.room_members(room_id) .room_members(room_id)
.filter_map(Result::ok) .filter_map(Result::ok)
.filter(|member| member != &sender_user) .filter(|member| member != sender_user)
.filter_map(|member| { .filter_map(|member| {
services() services()
.rooms .rooms
@ -506,7 +614,7 @@ pub(crate) async fn sync_events_v5_route(
services() services()
.rooms .rooms
.user .user
.highlight_count(&sender_user, room_id)? .highlight_count(sender_user, room_id)?
.try_into() .try_into()
.expect("notification count can't go that high"), .expect("notification count can't go that high"),
), ),
@ -514,7 +622,7 @@ pub(crate) async fn sync_events_v5_route(
services() services()
.rooms .rooms
.user .user
.notification_count(&sender_user, room_id)? .notification_count(sender_user, room_id)?
.try_into() .try_into()
.expect("notification count can't go that high"), .expect("notification count can't go that high"),
), ),
@ -546,84 +654,7 @@ pub(crate) async fn sync_events_v5_route(
// TODO // TODO
heroes: None, heroes: None,
}; };
trace!(room_id = room_id.as_str(), ?room, "Built room data"); trace!(?room, "Built room data");
rooms.insert(room_id.clone(), room); Ok(Some(room))
}
if rooms
.iter()
.all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty())
{
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let mut duration = body.timeout.unwrap_or(Duration::from_secs(30));
if duration.as_secs() > 30 {
duration = Duration::from_secs(30);
}
match tokio::time::timeout(duration, watcher).await {
Ok(x) => x.expect("watcher should succeed"),
Err(error) => debug!(%error, "Timed out"),
};
}
Ok(Ra(sync_events::v5::Response {
txn_id: body.txn_id.clone(),
pos: next_batch.to_string(),
lists,
rooms,
extensions: sync_events::v5::response::Extensions {
to_device: body
.extensions
.to_device
.enabled
.unwrap_or(false)
.then(|| {
services()
.users
.get_to_device_events(&sender_user, &sender_device)
.map(|events| sync_events::v5::response::ToDevice {
events,
next_batch: next_batch.to_string(),
})
})
.transpose()?,
e2ee: sync_events::v5::response::E2EE {
device_lists: DeviceLists {
changed: device_list_changes.into_iter().collect(),
left: device_list_left.into_iter().collect(),
},
device_one_time_keys_count: services()
.users
.count_one_time_keys(&sender_user, &sender_device)?,
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
},
account_data: sync_events::v5::response::AccountData {
global: if body.extensions.account_data.enabled.unwrap_or(false)
{
services()
.account_data
.global_changes_since(&sender_user, globalsince)?
.into_iter()
.map(|(event_type, content)| {
account_data::raw_global_event_from_parts(
&event_type,
&content,
)
})
.collect()
} else {
Vec::new()
},
rooms: BTreeMap::new(),
},
receipts: sync_events::v5::response::Receipts {
rooms: BTreeMap::new(),
},
typing: sync_events::v5::response::Typing {
rooms: BTreeMap::new(),
},
},
}))
} }