From 4c9728cbad4664b256be0adc22daa57a26faa114 Mon Sep 17 00:00:00 2001 From: Benjamin Lee Date: Sat, 4 May 2024 00:51:25 -0700 Subject: [PATCH] implement per-event timeline filtering on /sync This is the filter.room.timeline.{senders,types,contains_url} fields, and their associated not_* pairs. I decided not to change the `prev_batch` calculation for sliding-sync to use the new `oldest_event_count` value, because I'm not confident in the correct behavior. The current sliding-sync behavior is gives `prev_batch = oldest_event_count` except when there are no new events. In this case, `oldest_event_count` is `None`, but the current sliding-sync implementation uses `prev_batch = since`. This is definitely wrong, because both `since` and `prev_batch` are exclusive bounds. If the correct thing to do is to return the lower exclusive bound of the range of events that may have been included in the timeline, then we would want `since - 1`. The other option would be to return `prev_batch = None`, like we have in sync v3. I don't know which of these is correct, so I'm just gonna keep the current (definitely incorrect) behavior to avoid making things worse. --- src/api/client_server/sync.rs | 127 +++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 56 deletions(-) diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 9d9ccc91..23937b12 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -35,7 +35,7 @@ use crate::{ services, utils::{ self, - filter::{AllowDenyList, CompiledFilterDefinition}, + filter::{load_limit, AllowDenyList, CompiledFilterDefinition}, }, Ar, Error, PduEvent, Ra, Result, }; @@ -405,7 +405,7 @@ async fn load_joined_room( drop(insert_lock); } - let (timeline_pdus, limited) = + let (timeline_pdus, oldest_timeline_event, limited) = load_timeline(sender_user, room_id, sincecount, 10, Some(filter))?; let send_notification_counts = !timeline_pdus.is_empty() @@ -901,18 +901,14 @@ async fn load_joined_room( .transpose()? .map(|x| x.try_into().expect("highlight count can't go that high")); - let prev_batch = timeline_pdus.first().map_or( - Ok::<_, Error>(None), - |(pdu_count, _)| { - Ok(Some(match pdu_count { - PduCount::Backfilled(_) => { - error!("timeline in backfill state?!"); - "0".to_owned() - } - PduCount::Normal(c) => c.to_string(), - })) - }, - )?; + let prev_batch = match oldest_timeline_event { + Some(PduCount::Backfilled(_)) => { + error!("timeline in backfill state?!"); + Some("0".to_owned()) + } + Some(PduCount::Normal(c)) => Some(c.to_string()), + None => None, + }; let room_events: Vec<_> = timeline_pdus.iter().map(|(_, pdu)| pdu.to_sync_room_event()).collect(); @@ -1192,55 +1188,74 @@ async fn handle_left_room( Ok(()) } +/// Returns `(events, oldest_event_count, limited)` +/// +/// These roughly match the fields of +/// [`ruma::api::client::sync::sync_events::v3::Timeline`]. +/// +/// - `events` is a list of up to `limit` events newer than `roomsincecount` +/// that are allowed by the filter +/// - `oldest_event_count` is the [`PduCount`] of the oldest event examined. +/// This is not necessarily an event included in `events`, because it may +/// have been rejected by a filter. +/// - `limited` is `true` if there may be some allowed events between +/// `romsincecount` and `oldest_event_count`. These are events that were not +/// returned because of `limit` or `load_limit`. +#[allow(clippy::type_complexity)] fn load_timeline( sender_user: &UserId, room_id: &RoomId, roomsincecount: PduCount, limit: u64, filter: Option<&CompiledFilterDefinition<'_>>, -) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> { - let timeline_pdus; - let limited; - - if let Some(filter) = filter { - if !filter.room.timeline.room_allowed(room_id) { - return Ok((vec![], false)); - } - } - - if services().rooms.timeline.last_timeline_count(sender_user, room_id)? - > roomsincecount +) -> Result<(Vec<(PduCount, PduEvent)>, Option, bool), Error> { + if filter + .map_or(false, |filter| !filter.room.timeline.room_allowed(room_id)) { - let mut non_timeline_pdus = services() - .rooms - .timeline - .pdus_until(sender_user, room_id, PduCount::MAX)? - .filter_map(|r| { - if r.is_err() { - error!("Bad pdu in pdus_since: {:?}", r); - } - r.ok() - }) - .take_while(|(pducount, _)| pducount > &roomsincecount); - - // Take the last events for the timeline - timeline_pdus = non_timeline_pdus - .by_ref() - .take(limit.try_into().expect("limit should fit in usize")) - .collect::>() - .into_iter() - .rev() - .collect::>(); - - // They /sync response doesn't always return all messages, so we say the - // output is limited unless there are events in - // non_timeline_pdus - limited = non_timeline_pdus.next().is_some(); - } else { - timeline_pdus = Vec::new(); - limited = false; + // the entire room is rejected by the filter + return Ok((vec![], None, false)); } - Ok((timeline_pdus, limited)) + if services().rooms.timeline.last_timeline_count(sender_user, room_id)? + <= roomsincecount + { + // there are no events newer than `roomsincecount` + return Ok((vec![], None, false)); + } + + let mut non_timeline_pdus = services() + .rooms + .timeline + .pdus_until(sender_user, room_id, PduCount::MAX)? + .filter_map(|r| { + if r.is_err() { + error!("Bad pdu in pdus_since: {:?}", r); + } + r.ok() + }) + .take_while(|(pducount, _)| pducount > &roomsincecount); + + // Take the last events for the timeline + let mut oldest_event_count = None; + let limit = usize::try_from(limit).unwrap_or(usize::MAX); + let mut timeline_pdus = non_timeline_pdus + .by_ref() + .take(load_limit(limit)) + .inspect(|&(pducount, _)| oldest_event_count = Some(pducount)) + .filter(|(_, pdu)| { + filter.map_or(true, |filter| { + filter.room.timeline.pdu_event_allowed(pdu) + }) + }) + .take(limit) + .collect::>(); + timeline_pdus.reverse(); + + // The /sync response doesn't always return all messages, so we say the + // output is limited unless there are events in + // non_timeline_pdus + let limited = non_timeline_pdus.next().is_some(); + + Ok((timeline_pdus, oldest_event_count, limited)) } fn share_encrypted_room( @@ -1677,7 +1692,7 @@ pub(crate) async fn sync_events_v4_route( { let roomsincecount = PduCount::Normal(*roomsince); - let (timeline_pdus, limited) = load_timeline( + let (timeline_pdus, _, limited) = load_timeline( &sender_user, room_id, roomsincecount,