diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 9d9ccc91..23937b12 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -35,7 +35,7 @@ use crate::{ services, utils::{ self, - filter::{AllowDenyList, CompiledFilterDefinition}, + filter::{load_limit, AllowDenyList, CompiledFilterDefinition}, }, Ar, Error, PduEvent, Ra, Result, }; @@ -405,7 +405,7 @@ async fn load_joined_room( drop(insert_lock); } - let (timeline_pdus, limited) = + let (timeline_pdus, oldest_timeline_event, limited) = load_timeline(sender_user, room_id, sincecount, 10, Some(filter))?; let send_notification_counts = !timeline_pdus.is_empty() @@ -901,18 +901,14 @@ async fn load_joined_room( .transpose()? .map(|x| x.try_into().expect("highlight count can't go that high")); - let prev_batch = timeline_pdus.first().map_or( - Ok::<_, Error>(None), - |(pdu_count, _)| { - Ok(Some(match pdu_count { - PduCount::Backfilled(_) => { - error!("timeline in backfill state?!"); - "0".to_owned() - } - PduCount::Normal(c) => c.to_string(), - })) - }, - )?; + let prev_batch = match oldest_timeline_event { + Some(PduCount::Backfilled(_)) => { + error!("timeline in backfill state?!"); + Some("0".to_owned()) + } + Some(PduCount::Normal(c)) => Some(c.to_string()), + None => None, + }; let room_events: Vec<_> = timeline_pdus.iter().map(|(_, pdu)| pdu.to_sync_room_event()).collect(); @@ -1192,55 +1188,74 @@ async fn handle_left_room( Ok(()) } +/// Returns `(events, oldest_event_count, limited)` +/// +/// These roughly match the fields of +/// [`ruma::api::client::sync::sync_events::v3::Timeline`]. +/// +/// - `events` is a list of up to `limit` events newer than `roomsincecount` +/// that are allowed by the filter +/// - `oldest_event_count` is the [`PduCount`] of the oldest event examined. +/// This is not necessarily an event included in `events`, because it may +/// have been rejected by a filter. +/// - `limited` is `true` if there may be some allowed events between +/// `romsincecount` and `oldest_event_count`. These are events that were not +/// returned because of `limit` or `load_limit`. +#[allow(clippy::type_complexity)] fn load_timeline( sender_user: &UserId, room_id: &RoomId, roomsincecount: PduCount, limit: u64, filter: Option<&CompiledFilterDefinition<'_>>, -) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> { - let timeline_pdus; - let limited; - - if let Some(filter) = filter { - if !filter.room.timeline.room_allowed(room_id) { - return Ok((vec![], false)); - } - } - - if services().rooms.timeline.last_timeline_count(sender_user, room_id)? - > roomsincecount +) -> Result<(Vec<(PduCount, PduEvent)>, Option, bool), Error> { + if filter + .map_or(false, |filter| !filter.room.timeline.room_allowed(room_id)) { - let mut non_timeline_pdus = services() - .rooms - .timeline - .pdus_until(sender_user, room_id, PduCount::MAX)? - .filter_map(|r| { - if r.is_err() { - error!("Bad pdu in pdus_since: {:?}", r); - } - r.ok() - }) - .take_while(|(pducount, _)| pducount > &roomsincecount); - - // Take the last events for the timeline - timeline_pdus = non_timeline_pdus - .by_ref() - .take(limit.try_into().expect("limit should fit in usize")) - .collect::>() - .into_iter() - .rev() - .collect::>(); - - // They /sync response doesn't always return all messages, so we say the - // output is limited unless there are events in - // non_timeline_pdus - limited = non_timeline_pdus.next().is_some(); - } else { - timeline_pdus = Vec::new(); - limited = false; + // the entire room is rejected by the filter + return Ok((vec![], None, false)); } - Ok((timeline_pdus, limited)) + if services().rooms.timeline.last_timeline_count(sender_user, room_id)? + <= roomsincecount + { + // there are no events newer than `roomsincecount` + return Ok((vec![], None, false)); + } + + let mut non_timeline_pdus = services() + .rooms + .timeline + .pdus_until(sender_user, room_id, PduCount::MAX)? + .filter_map(|r| { + if r.is_err() { + error!("Bad pdu in pdus_since: {:?}", r); + } + r.ok() + }) + .take_while(|(pducount, _)| pducount > &roomsincecount); + + // Take the last events for the timeline + let mut oldest_event_count = None; + let limit = usize::try_from(limit).unwrap_or(usize::MAX); + let mut timeline_pdus = non_timeline_pdus + .by_ref() + .take(load_limit(limit)) + .inspect(|&(pducount, _)| oldest_event_count = Some(pducount)) + .filter(|(_, pdu)| { + filter.map_or(true, |filter| { + filter.room.timeline.pdu_event_allowed(pdu) + }) + }) + .take(limit) + .collect::>(); + timeline_pdus.reverse(); + + // The /sync response doesn't always return all messages, so we say the + // output is limited unless there are events in + // non_timeline_pdus + let limited = non_timeline_pdus.next().is_some(); + + Ok((timeline_pdus, oldest_event_count, limited)) } fn share_encrypted_room( @@ -1677,7 +1692,7 @@ pub(crate) async fn sync_events_v4_route( { let roomsincecount = PduCount::Normal(*roomsince); - let (timeline_pdus, limited) = load_timeline( + let (timeline_pdus, _, limited) = load_timeline( &sender_user, room_id, roomsincecount,