implement per-event filtering for /context

'end_token' and 'start_token' have been refactored a bit because we need
to take the bounds of the examined events *before* filtering, otherwise
we'll send a pagination token to the client that is inside the set of
events we examined on this call. In extreme situations, this may leave a
client unable to make progress at all, because the first event that
matches if filter is more than 'load_limit' away from the base event.

This bug was present before the filtering implementation, but was less
significant because we only dropped events when they were not visible to
the user.
This commit is contained in:
Benjamin Lee 2024-06-03 13:26:09 -07:00
parent fa86a8701d
commit fe5626e93a
No known key found for this signature in database
GPG key ID: FB9624E2885D55A4

View file

@ -107,12 +107,15 @@ pub(crate) async fn get_context_route(
}));
}
let mut start_token = None;
let events_before: Vec<_> = services()
.rooms
.timeline
.pdus_until(sender_user, &room_id, base_token)?
.take(load_limit(half_limit))
.filter_map(Result::ok)
.inspect(|&(count, _)| start_token = Some(count))
.filter(|(_, pdu)| filter.pdu_event_allowed(pdu))
.filter(|(_, pdu)| {
services()
.rooms
@ -123,6 +126,8 @@ pub(crate) async fn get_context_route(
.take(half_limit)
.collect();
let start_token = start_token.map(|token| token.stringify());
for (_, event) in &events_before {
if !services().rooms.lazy_loading.lazy_load_was_sent_before(
sender_user,
@ -135,19 +140,18 @@ pub(crate) async fn get_context_route(
}
}
let start_token = events_before
.last()
.map_or_else(|| base_token.stringify(), |(count, _)| count.stringify());
let events_before: Vec<_> =
events_before.into_iter().map(|(_, pdu)| pdu.to_room_event()).collect();
let mut end_token = None;
let events_after: Vec<_> = services()
.rooms
.timeline
.pdus_after(sender_user, &room_id, base_token)?
.take(load_limit(half_limit))
.filter_map(Result::ok)
.inspect(|&(count, _)| end_token = Some(count))
.filter(|(_, pdu)| filter.pdu_event_allowed(pdu))
.filter(|(_, pdu)| {
services()
.rooms
@ -158,6 +162,8 @@ pub(crate) async fn get_context_route(
.take(half_limit)
.collect();
let end_token = end_token.map(|token| token.stringify());
for (_, event) in &events_after {
if !services().rooms.lazy_loading.lazy_load_was_sent_before(
sender_user,
@ -185,10 +191,6 @@ pub(crate) async fn get_context_route(
let state_ids =
services().rooms.state_accessor.state_full_ids(shortstatehash).await?;
let end_token = events_after
.last()
.map_or_else(|| base_token.stringify(), |(count, _)| count.stringify());
let events_after: Vec<_> =
events_after.into_iter().map(|(_, pdu)| pdu.to_room_event()).collect();
@ -214,8 +216,8 @@ pub(crate) async fn get_context_route(
}
let resp = get_context::v3::Response {
start: Some(start_token),
end: Some(end_token),
start: start_token,
end: end_token,
events_before,
event: Some(base_event),
events_after,