implement room.state.(not_)rooms filter on /sync

This commit is contained in:
Benjamin Lee 2024-05-03 17:09:19 -07:00
parent c48abf9f13
commit e6f2b6c9ad
No known key found for this signature in database
GPG key ID: FB9624E2885D55A4
2 changed files with 177 additions and 155 deletions

View file

@ -443,6 +443,8 @@ async fn load_joined_room(
let since_shortstatehash =
services().rooms.user.get_token_shortstatehash(room_id, since)?;
let skip_state_events = !filter.room.state.room_allowed(room_id);
let (
heroes,
joined_member_count,
@ -583,47 +585,51 @@ async fn load_joined_room(
let mut state_events = Vec::new();
let mut lazy_loaded = HashSet::new();
let mut i = 0;
for (shortstatekey, id) in current_state_ids {
let (event_type, state_key) = services()
.rooms
.short
.get_statekey_from_short(shortstatekey)?;
if !skip_state_events {
let mut i = 0;
for (shortstatekey, id) in current_state_ids {
let (event_type, state_key) = services()
.rooms
.short
.get_statekey_from_short(shortstatekey)?;
if event_type != StateEventType::RoomMember {
let Some(pdu) = services().rooms.timeline.get_pdu(&id)?
else {
error!("Pdu in state not found: {}", id);
continue;
};
state_events.push(pdu);
if event_type != StateEventType::RoomMember {
let Some(pdu) =
services().rooms.timeline.get_pdu(&id)?
else {
error!("Pdu in state not found: {}", id);
continue;
};
state_events.push(pdu);
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
}
} else if !lazy_load_enabled
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
}
} else if !lazy_load_enabled
|| full_state
|| timeline_users.contains(&state_key)
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|| *sender_user == state_key
{
let Some(pdu) = services().rooms.timeline.get_pdu(&id)?
else {
error!("Pdu in state not found: {}", id);
continue;
};
{
let Some(pdu) =
services().rooms.timeline.get_pdu(&id)?
else {
error!("Pdu in state not found: {}", id);
continue;
};
// This check is in case a bad user ID made it into the
// database
if let Ok(uid) = UserId::parse(&state_key) {
lazy_loaded.insert(uid);
}
state_events.push(pdu);
// This check is in case a bad user ID made it into the
// database
if let Ok(uid) = UserId::parse(&state_key) {
lazy_loaded.insert(uid);
}
state_events.push(pdu);
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
}
}
}
}
@ -792,66 +798,80 @@ async fn load_joined_room(
(None, None, Vec::new())
};
let mut state_events = delta_state_events;
let mut lazy_loaded = HashSet::new();
let state_events = if skip_state_events {
vec![]
} else {
let mut state_events = delta_state_events;
let mut lazy_loaded = HashSet::new();
// Mark all member events we're returning as lazy-loaded
for pdu in &state_events {
if pdu.kind == TimelineEventType::RoomMember {
match UserId::parse(
pdu.state_key
.as_ref()
.expect("State event has state key")
.clone(),
) {
Ok(state_key_userid) => {
lazy_loaded.insert(state_key_userid);
}
Err(e) => {
error!("Invalid state key for member event: {}", e);
// Mark all member events we're returning as lazy-loaded
for pdu in &state_events {
if pdu.kind == TimelineEventType::RoomMember {
match UserId::parse(
pdu.state_key
.as_ref()
.expect("State event has state key")
.clone(),
) {
Ok(state_key_userid) => {
lazy_loaded.insert(state_key_userid);
}
Err(e) => {
error!(
"Invalid state key for member event: {}",
e
);
}
}
}
}
}
// Fetch contextual member state events for events from the
// timeline, and mark them as lazy-loaded as well.
for (_, event) in &timeline_pdus {
if lazy_loaded.contains(&event.sender) {
continue;
}
// Fetch contextual member state events for events from the
// timeline, and mark them as lazy-loaded as
// well.
for (_, event) in &timeline_pdus {
if lazy_loaded.contains(&event.sender) {
continue;
}
if !services().rooms.lazy_loading.lazy_load_was_sent_before(
sender_user,
sender_device,
room_id,
&event.sender,
)? || lazy_load_send_redundant
{
if let Some(member_event) =
services().rooms.state_accessor.room_state_get(
if !services()
.rooms
.lazy_loading
.lazy_load_was_sent_before(
sender_user,
sender_device,
room_id,
&StateEventType::RoomMember,
event.sender.as_str(),
&event.sender,
)?
|| lazy_load_send_redundant
{
lazy_loaded.insert(event.sender.clone());
state_events.push(member_event);
if let Some(member_event) =
services().rooms.state_accessor.room_state_get(
room_id,
&StateEventType::RoomMember,
event.sender.as_str(),
)?
{
lazy_loaded.insert(event.sender.clone());
state_events.push(member_event);
}
}
}
}
services()
.rooms
.lazy_loading
.lazy_load_mark_sent(
sender_user,
sender_device,
room_id,
lazy_loaded,
next_batchcount,
)
.await;
services()
.rooms
.lazy_loading
.lazy_load_mark_sent(
sender_user,
sender_device,
room_id,
lazy_loaded,
next_batchcount,
)
.await;
state_events
};
(
heroes,
@ -1026,7 +1046,11 @@ async fn handle_left_room(
}
};
if !services().rooms.metadata.exists(&room_id)? {
let state = if !filter.room.state.room_allowed(&room_id) {
State {
events: vec![],
}
} else if !services().rooms.metadata.exists(&room_id)? {
// This is just a rejected invite, not a room we know
let event = PduEvent {
event_id: EventId::new(services().globals.server_name()).into(),
@ -1051,90 +1075,88 @@ async fn handle_left_room(
signatures: None,
};
left_rooms.insert(
room_id,
LeftRoom {
account_data: RoomAccountData {
events: Vec::new(),
},
timeline,
state: State {
events: vec![event.to_sync_state_event()],
},
},
);
State {
events: vec![event.to_sync_state_event()],
}
} else {
let mut left_state_events = Vec::new();
return Ok(());
}
let since_shortstatehash =
services().rooms.user.get_token_shortstatehash(&room_id, since)?;
let mut left_state_events = Vec::new();
let since_state_ids = match since_shortstatehash {
Some(s) => {
services().rooms.state_accessor.state_full_ids(s).await?
}
None => HashMap::new(),
};
let since_shortstatehash =
services().rooms.user.get_token_shortstatehash(&room_id, since)?;
let Some(left_event_id) =
services().rooms.state_accessor.room_state_get_id(
&room_id,
&StateEventType::RoomMember,
sender_user.as_str(),
)?
else {
error!("Left room but no left state event");
return Ok(());
};
let since_state_ids = match since_shortstatehash {
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
None => HashMap::new(),
};
let Some(left_shortstatehash) = services()
.rooms
.state_accessor
.pdu_shortstatehash(&left_event_id)?
else {
error!("Leave event has no state");
return Ok(());
};
let Some(left_event_id) =
services().rooms.state_accessor.room_state_get_id(
&room_id,
&StateEventType::RoomMember,
sender_user.as_str(),
)?
else {
error!("Left room but no left state event");
return Ok(());
};
let mut left_state_ids = services()
.rooms
.state_accessor
.state_full_ids(left_shortstatehash)
.await?;
let Some(left_shortstatehash) =
services().rooms.state_accessor.pdu_shortstatehash(&left_event_id)?
else {
error!("Leave event has no state");
return Ok(());
};
let leave_shortstatekey =
services().rooms.short.get_or_create_shortstatekey(
&StateEventType::RoomMember,
sender_user.as_str(),
)?;
let mut left_state_ids = services()
.rooms
.state_accessor
.state_full_ids(left_shortstatehash)
.await?;
left_state_ids.insert(leave_shortstatekey, left_event_id);
let leave_shortstatekey =
services().rooms.short.get_or_create_shortstatekey(
&StateEventType::RoomMember,
sender_user.as_str(),
)?;
let mut i = 0;
for (key, id) in left_state_ids {
if full_state || since_state_ids.get(&key) != Some(&id) {
let (event_type, state_key) =
services().rooms.short.get_statekey_from_short(key)?;
left_state_ids.insert(leave_shortstatekey, left_event_id);
if !lazy_load_enabled
|| event_type != StateEventType::RoomMember
|| full_state
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|| *sender_user == state_key
{
let Some(pdu) = services().rooms.timeline.get_pdu(&id)?
else {
error!("Pdu in state not found: {}", id);
continue;
};
let mut i = 0;
for (key, id) in left_state_ids {
if full_state || since_state_ids.get(&key) != Some(&id) {
let (event_type, state_key) =
services().rooms.short.get_statekey_from_short(key)?;
left_state_events.push(pdu.to_sync_state_event());
if !lazy_load_enabled
|| event_type != StateEventType::RoomMember
|| full_state
// TODO: Delete the following line when this is resolved: https://github.com/vector-im/element-web/issues/22565
|| *sender_user == state_key
{
let Some(pdu) = services().rooms.timeline.get_pdu(&id)? else {
error!("Pdu in state not found: {}", id);
continue;
};
left_state_events.push(pdu.to_sync_state_event());
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
i += 1;
if i % 100 == 0 {
tokio::task::yield_now().await;
}
}
}
}
}
State {
events: left_state_events,
}
};
left_rooms.insert(
room_id.clone(),
@ -1143,9 +1165,7 @@ async fn handle_left_room(
events: Vec::new(),
},
timeline,
state: State {
events: left_state_events,
},
state,
},
);

View file

@ -159,6 +159,7 @@ pub(crate) struct CompiledFilterDefinition<'a> {
pub(crate) struct CompiledRoomFilter<'a> {
rooms: AllowDenyList<'a, RoomId>,
pub(crate) timeline: CompiledRoomEventFilter<'a>,
pub(crate) state: CompiledRoomEventFilter<'a>,
}
pub(crate) struct CompiledRoomEventFilter<'a> {
@ -197,6 +198,7 @@ impl<'a> TryFrom<&'a RoomFilter> for CompiledRoomFilter<'a> {
&source.not_rooms,
),
timeline: (&source.timeline).try_into()?,
state: (&source.state).try_into()?,
})
}
}