implement per-event timeline filtering on /sync

This is the filter.room.timeline.{senders,types,contains_url} fields, and
their associated not_* pairs.

I decided not to change the `prev_batch` calculation for sliding-sync to
use the new `oldest_event_count` value, because I'm not confident in the
correct behavior. The current sliding-sync behavior gives `prev_batch
= oldest_event_count` except when there are no new events. In this
case, `oldest_event_count` is `None`, but the current sliding-sync
implementation uses `prev_batch = since`. This is definitely wrong,
because both `since` and `prev_batch` are exclusive bounds. If the
correct thing to do is to return the lower exclusive bound of the range
of events that may have been included in the timeline, then we would
want `since - 1`. The other option would be to return `prev_batch =
None`, like we have in sync v3. I don't know which of these is correct,
so I'm just going to keep the current (definitely incorrect) behavior to
avoid making things worse.
This commit is contained in:
Benjamin Lee 2024-05-04 00:51:25 -07:00
parent 745eaa9b48
commit 4c9728cbad
No known key found for this signature in database
GPG key ID: FB9624E2885D55A4

View file

@ -35,7 +35,7 @@ use crate::{
services,
utils::{
self,
filter::{AllowDenyList, CompiledFilterDefinition},
filter::{load_limit, AllowDenyList, CompiledFilterDefinition},
},
Ar, Error, PduEvent, Ra, Result,
};
@ -405,7 +405,7 @@ async fn load_joined_room(
drop(insert_lock);
}
let (timeline_pdus, limited) =
let (timeline_pdus, oldest_timeline_event, limited) =
load_timeline(sender_user, room_id, sincecount, 10, Some(filter))?;
let send_notification_counts = !timeline_pdus.is_empty()
@ -901,18 +901,14 @@ async fn load_joined_room(
.transpose()?
.map(|x| x.try_into().expect("highlight count can't go that high"));
let prev_batch = timeline_pdus.first().map_or(
Ok::<_, Error>(None),
|(pdu_count, _)| {
Ok(Some(match pdu_count {
PduCount::Backfilled(_) => {
error!("timeline in backfill state?!");
"0".to_owned()
}
PduCount::Normal(c) => c.to_string(),
}))
},
)?;
let prev_batch = match oldest_timeline_event {
Some(PduCount::Backfilled(_)) => {
error!("timeline in backfill state?!");
Some("0".to_owned())
}
Some(PduCount::Normal(c)) => Some(c.to_string()),
None => None,
};
let room_events: Vec<_> =
timeline_pdus.iter().map(|(_, pdu)| pdu.to_sync_room_event()).collect();
@ -1192,55 +1188,74 @@ async fn handle_left_room(
Ok(())
}
/// Returns `(events, oldest_event_count, limited)`
///
/// These roughly match the fields of
/// [`ruma::api::client::sync::sync_events::v3::Timeline`].
///
/// - `events` is a list of up to `limit` events newer than `roomsincecount`
/// that are allowed by the filter
/// - `oldest_event_count` is the [`PduCount`] of the oldest event examined.
/// This is not necessarily an event included in `events`, because it may
/// have been rejected by a filter.
/// - `limited` is `true` if there may be some allowed events between
/// `romsincecount` and `oldest_event_count`. These are events that were not
/// returned because of `limit` or `load_limit`.
#[allow(clippy::type_complexity)]
fn load_timeline(
sender_user: &UserId,
room_id: &RoomId,
roomsincecount: PduCount,
limit: u64,
filter: Option<&CompiledFilterDefinition<'_>>,
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
let timeline_pdus;
let limited;
if let Some(filter) = filter {
if !filter.room.timeline.room_allowed(room_id) {
return Ok((vec![], false));
}
}
if services().rooms.timeline.last_timeline_count(sender_user, room_id)?
> roomsincecount
) -> Result<(Vec<(PduCount, PduEvent)>, Option<PduCount>, bool), Error> {
if filter
.map_or(false, |filter| !filter.room.timeline.room_allowed(room_id))
{
let mut non_timeline_pdus = services()
.rooms
.timeline
.pdus_until(sender_user, room_id, PduCount::MAX)?
.filter_map(|r| {
if r.is_err() {
error!("Bad pdu in pdus_since: {:?}", r);
}
r.ok()
})
.take_while(|(pducount, _)| pducount > &roomsincecount);
// Take the last events for the timeline
timeline_pdus = non_timeline_pdus
.by_ref()
.take(limit.try_into().expect("limit should fit in usize"))
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect::<Vec<_>>();
// They /sync response doesn't always return all messages, so we say the
// output is limited unless there are events in
// non_timeline_pdus
limited = non_timeline_pdus.next().is_some();
} else {
timeline_pdus = Vec::new();
limited = false;
// the entire room is rejected by the filter
return Ok((vec![], None, false));
}
Ok((timeline_pdus, limited))
if services().rooms.timeline.last_timeline_count(sender_user, room_id)?
<= roomsincecount
{
// there are no events newer than `roomsincecount`
return Ok((vec![], None, false));
}
let mut non_timeline_pdus = services()
.rooms
.timeline
.pdus_until(sender_user, room_id, PduCount::MAX)?
.filter_map(|r| {
if r.is_err() {
error!("Bad pdu in pdus_since: {:?}", r);
}
r.ok()
})
.take_while(|(pducount, _)| pducount > &roomsincecount);
// Take the last events for the timeline
let mut oldest_event_count = None;
let limit = usize::try_from(limit).unwrap_or(usize::MAX);
let mut timeline_pdus = non_timeline_pdus
.by_ref()
.take(load_limit(limit))
.inspect(|&(pducount, _)| oldest_event_count = Some(pducount))
.filter(|(_, pdu)| {
filter.map_or(true, |filter| {
filter.room.timeline.pdu_event_allowed(pdu)
})
})
.take(limit)
.collect::<Vec<_>>();
timeline_pdus.reverse();
// The /sync response doesn't always return all messages, so we say the
// output is limited unless there are events in
// non_timeline_pdus
let limited = non_timeline_pdus.next().is_some();
Ok((timeline_pdus, oldest_event_count, limited))
}
fn share_encrypted_room(
@ -1677,7 +1692,7 @@ pub(crate) async fn sync_events_v4_route(
{
let roomsincecount = PduCount::Normal(*roomsince);
let (timeline_pdus, limited) = load_timeline(
let (timeline_pdus, _, limited) = load_timeline(
&sender_user,
room_id,
roomsincecount,