mirror of
https://gitlab.computer.surgery/matrix/grapevine.git
synced 2025-12-16 23:31:24 +01:00
sync/v3: factor out into separate functions
This is both easier to read and produces much better tracing spans.
This commit is contained in:
parent
8a7f87e9b4
commit
daceadb310
1 changed files with 256 additions and 224 deletions
|
|
@ -21,7 +21,7 @@ use ruma::{
|
|||
room::member::{MembershipState, RoomMemberEventContent},
|
||||
StateEventType, TimelineEventType,
|
||||
},
|
||||
uint, DeviceId, EventId, OwnedUserId, RoomId, UInt, UserId,
|
||||
uint, DeviceId, EventId, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId,
|
||||
};
|
||||
use tracing::{debug, error, field};
|
||||
|
||||
|
|
@ -174,229 +174,14 @@ pub(crate) async fn sync_events_route(
|
|||
.filter_map(Result::ok),
|
||||
);
|
||||
|
||||
let mut joined_rooms = BTreeMap::new();
|
||||
let all_joined_rooms = services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.rooms_joined(ctx.sender_user)
|
||||
.collect::<Vec<_>>();
|
||||
for room_id in all_joined_rooms {
|
||||
let room_id = room_id?;
|
||||
if let Ok(joined_room) = load_joined_room(
|
||||
&ctx,
|
||||
&room_id,
|
||||
&mut device_list_updates,
|
||||
&mut left_encrypted_users,
|
||||
)
|
||||
.await
|
||||
{
|
||||
if !joined_room.is_empty() {
|
||||
joined_rooms.insert(room_id.clone(), joined_room);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut left_rooms = BTreeMap::new();
|
||||
let all_left_rooms: Vec<_> =
|
||||
services().rooms.state_cache.rooms_left(ctx.sender_user).collect();
|
||||
for result in all_left_rooms {
|
||||
let (room_id, _) = result?;
|
||||
|
||||
{
|
||||
// Get and drop the lock to wait for remaining operations to finish
|
||||
let room_token = services()
|
||||
.globals
|
||||
.roomid_mutex_insert
|
||||
.lock_key(room_id.clone())
|
||||
.await;
|
||||
drop(room_token);
|
||||
}
|
||||
|
||||
let left_count = services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.get_left_count(&room_id, ctx.sender_user)?;
|
||||
|
||||
// Left before last sync
|
||||
if Some(ctx.since) >= left_count {
|
||||
continue;
|
||||
}
|
||||
|
||||
if !services().rooms.metadata.exists(&room_id)? {
|
||||
// This is just a rejected invite, not a room we know
|
||||
let event = PduEvent {
|
||||
event_id: EventId::new(services().globals.server_name()).into(),
|
||||
sender: ctx.sender_user.to_owned(),
|
||||
origin_server_ts: utils::millis_since_unix_epoch()
|
||||
.try_into()
|
||||
.expect("Timestamp is valid js_int value"),
|
||||
kind: TimelineEventType::RoomMember,
|
||||
content: serde_json::from_str(r#"{ "membership": "leave"}"#)
|
||||
.unwrap(),
|
||||
state_key: Some(ctx.sender_user.to_string()),
|
||||
unsigned: None,
|
||||
// The following keys are dropped on conversion
|
||||
room_id: room_id.clone(),
|
||||
prev_events: vec![],
|
||||
depth: uint!(1),
|
||||
auth_events: vec![],
|
||||
redacts: None,
|
||||
hashes: EventHash {
|
||||
sha256: String::new(),
|
||||
},
|
||||
signatures: None,
|
||||
};
|
||||
|
||||
left_rooms.insert(
|
||||
room_id,
|
||||
LeftRoom {
|
||||
account_data: RoomAccountData {
|
||||
events: Vec::new(),
|
||||
},
|
||||
timeline: Timeline {
|
||||
limited: false,
|
||||
prev_batch: Some(ctx.next_batch_string.clone()),
|
||||
events: Vec::new(),
|
||||
},
|
||||
state: State {
|
||||
events: vec![event.to_sync_state_event()],
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut left_state_events = Vec::new();
|
||||
|
||||
let since_shortstatehash = services()
|
||||
.rooms
|
||||
.user
|
||||
.get_token_shortstatehash(&room_id, ctx.since)?;
|
||||
|
||||
let since_state_ids = match since_shortstatehash {
|
||||
Some(s) => {
|
||||
services().rooms.state_accessor.state_full_ids(s).await?
|
||||
}
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
let Some(left_event_id) =
|
||||
services().rooms.state_accessor.room_state_get_id(
|
||||
&room_id,
|
||||
&StateEventType::RoomMember,
|
||||
ctx.sender_user.as_str(),
|
||||
)?
|
||||
else {
|
||||
error!("Left room but no left state event");
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(left_shortstatehash) = services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.pdu_shortstatehash(&left_event_id)?
|
||||
else {
|
||||
error!("Leave event has no state");
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut left_state_ids = services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(left_shortstatehash)
|
||||
.await?;
|
||||
|
||||
let leave_shortstatekey =
|
||||
services().rooms.short.get_or_create_shortstatekey(
|
||||
&StateEventType::RoomMember,
|
||||
ctx.sender_user.as_str(),
|
||||
)?;
|
||||
|
||||
left_state_ids.insert(leave_shortstatekey, left_event_id);
|
||||
|
||||
let mut i = 0;
|
||||
for (key, event_id) in left_state_ids {
|
||||
if ctx.full_state || since_state_ids.get(&key) != Some(&event_id) {
|
||||
let (event_type, state_key) =
|
||||
services().rooms.short.get_statekey_from_short(key)?;
|
||||
|
||||
if !ctx.lazy_load_enabled
|
||||
|| event_type != StateEventType::RoomMember
|
||||
|| ctx.full_state
|
||||
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|
||||
|| *ctx.sender_user == state_key
|
||||
{
|
||||
let Some(pdu) =
|
||||
services().rooms.timeline.get_pdu(&event_id)?
|
||||
else {
|
||||
error!(%event_id, "Event in state not found");
|
||||
continue;
|
||||
};
|
||||
|
||||
left_state_events.push(pdu.to_sync_state_event());
|
||||
|
||||
i += 1;
|
||||
if i % 100 == 0 {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
left_rooms.insert(
|
||||
room_id.clone(),
|
||||
LeftRoom {
|
||||
account_data: RoomAccountData {
|
||||
events: Vec::new(),
|
||||
},
|
||||
timeline: Timeline {
|
||||
limited: false,
|
||||
prev_batch: Some(ctx.next_batch_string.clone()),
|
||||
events: Vec::new(),
|
||||
},
|
||||
state: State {
|
||||
events: left_state_events,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let mut invited_rooms = BTreeMap::new();
|
||||
let all_invited_rooms: Vec<_> =
|
||||
services().rooms.state_cache.rooms_invited(ctx.sender_user).collect();
|
||||
for result in all_invited_rooms {
|
||||
let (room_id, invite_state_events) = result?;
|
||||
|
||||
{
|
||||
// Get and drop the lock to wait for remaining operations to finish
|
||||
let room_token = services()
|
||||
.globals
|
||||
.roomid_mutex_insert
|
||||
.lock_key(room_id.clone())
|
||||
.await;
|
||||
drop(room_token);
|
||||
}
|
||||
|
||||
let invite_count = services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.get_invite_count(&room_id, ctx.sender_user)?;
|
||||
|
||||
// Invited before last sync
|
||||
if Some(ctx.since) >= invite_count {
|
||||
continue;
|
||||
}
|
||||
|
||||
invited_rooms.insert(
|
||||
room_id.clone(),
|
||||
InvitedRoom {
|
||||
invite_state: InviteState {
|
||||
events: invite_state_events,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
let joined_rooms = collect_joined_rooms(
|
||||
&ctx,
|
||||
&mut device_list_updates,
|
||||
&mut left_encrypted_users,
|
||||
)
|
||||
.await?;
|
||||
let left_rooms = collect_left_rooms(&ctx).await?;
|
||||
let invited_rooms = collect_invited_rooms(&ctx).await?;
|
||||
|
||||
for user_id in left_encrypted_users {
|
||||
let dont_share_encrypted_room = services()
|
||||
|
|
@ -500,6 +285,37 @@ pub(crate) async fn sync_events_route(
|
|||
Ok(Ra(response))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn collect_joined_rooms(
|
||||
ctx: &SyncContext<'_>,
|
||||
device_list_updates: &mut HashSet<OwnedUserId>,
|
||||
left_encrypted_users: &mut HashSet<OwnedUserId>,
|
||||
) -> Result<BTreeMap<OwnedRoomId, JoinedRoom>> {
|
||||
let mut joined_rooms = BTreeMap::new();
|
||||
let all_joined_rooms = services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.rooms_joined(ctx.sender_user)
|
||||
.collect::<Vec<_>>();
|
||||
for room_id in all_joined_rooms {
|
||||
let room_id = room_id?;
|
||||
if let Ok(joined_room) = load_joined_room(
|
||||
ctx,
|
||||
&room_id,
|
||||
device_list_updates,
|
||||
left_encrypted_users,
|
||||
)
|
||||
.await
|
||||
{
|
||||
if !joined_room.is_empty() {
|
||||
joined_rooms.insert(room_id.clone(), joined_room);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(joined_rooms)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(room_id = %room_id))]
|
||||
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
|
||||
async fn load_joined_room(
|
||||
|
|
@ -1104,3 +920,219 @@ async fn load_joined_room(
|
|||
unread_thread_notifications: BTreeMap::new(),
|
||||
})
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn collect_left_rooms(
|
||||
ctx: &SyncContext<'_>,
|
||||
) -> Result<BTreeMap<OwnedRoomId, LeftRoom>> {
|
||||
let mut left_rooms = BTreeMap::new();
|
||||
let all_left_rooms: Vec<_> =
|
||||
services().rooms.state_cache.rooms_left(ctx.sender_user).collect();
|
||||
for result in all_left_rooms {
|
||||
let (room_id, _) = result?;
|
||||
|
||||
{
|
||||
// Get and drop the lock to wait for remaining operations to finish
|
||||
let room_token = services()
|
||||
.globals
|
||||
.roomid_mutex_insert
|
||||
.lock_key(room_id.clone())
|
||||
.await;
|
||||
drop(room_token);
|
||||
}
|
||||
|
||||
let left_count = services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.get_left_count(&room_id, ctx.sender_user)?;
|
||||
|
||||
// Left before last sync
|
||||
if Some(ctx.since) >= left_count {
|
||||
continue;
|
||||
}
|
||||
|
||||
if !services().rooms.metadata.exists(&room_id)? {
|
||||
// This is just a rejected invite, not a room we know
|
||||
let event = PduEvent {
|
||||
event_id: EventId::new(services().globals.server_name()).into(),
|
||||
sender: ctx.sender_user.to_owned(),
|
||||
origin_server_ts: utils::millis_since_unix_epoch()
|
||||
.try_into()
|
||||
.expect("Timestamp is valid js_int value"),
|
||||
kind: TimelineEventType::RoomMember,
|
||||
content: serde_json::from_str(r#"{ "membership": "leave"}"#)
|
||||
.unwrap(),
|
||||
state_key: Some(ctx.sender_user.to_string()),
|
||||
unsigned: None,
|
||||
// The following keys are dropped on conversion
|
||||
room_id: room_id.clone(),
|
||||
prev_events: vec![],
|
||||
depth: uint!(1),
|
||||
auth_events: vec![],
|
||||
redacts: None,
|
||||
hashes: EventHash {
|
||||
sha256: String::new(),
|
||||
},
|
||||
signatures: None,
|
||||
};
|
||||
|
||||
left_rooms.insert(
|
||||
room_id,
|
||||
LeftRoom {
|
||||
account_data: RoomAccountData {
|
||||
events: Vec::new(),
|
||||
},
|
||||
timeline: Timeline {
|
||||
limited: false,
|
||||
prev_batch: Some(ctx.next_batch_string.clone()),
|
||||
events: Vec::new(),
|
||||
},
|
||||
state: State {
|
||||
events: vec![event.to_sync_state_event()],
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut left_state_events = Vec::new();
|
||||
|
||||
let since_shortstatehash = services()
|
||||
.rooms
|
||||
.user
|
||||
.get_token_shortstatehash(&room_id, ctx.since)?;
|
||||
|
||||
let since_state_ids = match since_shortstatehash {
|
||||
Some(s) => {
|
||||
services().rooms.state_accessor.state_full_ids(s).await?
|
||||
}
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
let Some(left_event_id) =
|
||||
services().rooms.state_accessor.room_state_get_id(
|
||||
&room_id,
|
||||
&StateEventType::RoomMember,
|
||||
ctx.sender_user.as_str(),
|
||||
)?
|
||||
else {
|
||||
error!("Left room but no left state event");
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(left_shortstatehash) = services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.pdu_shortstatehash(&left_event_id)?
|
||||
else {
|
||||
error!("Leave event has no state");
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut left_state_ids = services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_ids(left_shortstatehash)
|
||||
.await?;
|
||||
|
||||
let leave_shortstatekey =
|
||||
services().rooms.short.get_or_create_shortstatekey(
|
||||
&StateEventType::RoomMember,
|
||||
ctx.sender_user.as_str(),
|
||||
)?;
|
||||
|
||||
left_state_ids.insert(leave_shortstatekey, left_event_id);
|
||||
|
||||
let mut i = 0;
|
||||
for (key, event_id) in left_state_ids {
|
||||
if ctx.full_state || since_state_ids.get(&key) != Some(&event_id) {
|
||||
let (event_type, state_key) =
|
||||
services().rooms.short.get_statekey_from_short(key)?;
|
||||
|
||||
if !ctx.lazy_load_enabled
|
||||
|| event_type != StateEventType::RoomMember
|
||||
|| ctx.full_state
|
||||
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|
||||
|| *ctx.sender_user == state_key
|
||||
{
|
||||
let Some(pdu) =
|
||||
services().rooms.timeline.get_pdu(&event_id)?
|
||||
else {
|
||||
error!(%event_id, "Event in state not found");
|
||||
continue;
|
||||
};
|
||||
|
||||
left_state_events.push(pdu.to_sync_state_event());
|
||||
|
||||
i += 1;
|
||||
if i % 100 == 0 {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
left_rooms.insert(
|
||||
room_id.clone(),
|
||||
LeftRoom {
|
||||
account_data: RoomAccountData {
|
||||
events: Vec::new(),
|
||||
},
|
||||
timeline: Timeline {
|
||||
limited: false,
|
||||
prev_batch: Some(ctx.next_batch_string.clone()),
|
||||
events: Vec::new(),
|
||||
},
|
||||
state: State {
|
||||
events: left_state_events,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
Ok(left_rooms)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn collect_invited_rooms(
|
||||
ctx: &SyncContext<'_>,
|
||||
) -> Result<BTreeMap<OwnedRoomId, InvitedRoom>> {
|
||||
let mut invited_rooms = BTreeMap::new();
|
||||
let all_invited_rooms: Vec<_> =
|
||||
services().rooms.state_cache.rooms_invited(ctx.sender_user).collect();
|
||||
for result in all_invited_rooms {
|
||||
let (room_id, invite_state_events) = result?;
|
||||
|
||||
{
|
||||
// Get and drop the lock to wait for remaining operations to finish
|
||||
let room_token = services()
|
||||
.globals
|
||||
.roomid_mutex_insert
|
||||
.lock_key(room_id.clone())
|
||||
.await;
|
||||
drop(room_token);
|
||||
}
|
||||
|
||||
let invite_count = services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.get_invite_count(&room_id, ctx.sender_user)?;
|
||||
|
||||
// Invited before last sync
|
||||
if Some(ctx.since) >= invite_count {
|
||||
continue;
|
||||
}
|
||||
|
||||
invited_rooms.insert(
|
||||
room_id.clone(),
|
||||
InvitedRoom {
|
||||
invite_state: InviteState {
|
||||
events: invite_state_events,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
Ok(invited_rooms)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue