diff --git a/src/database.rs b/src/database.rs index 1f65ecae..92eae0e2 100644 --- a/src/database.rs +++ b/src/database.rs @@ -140,17 +140,13 @@ pub(crate) struct KeyValueDatabase { // LazyLoadedIds = UserId + DeviceId + RoomId + LazyLoadedUserId pub(super) lazyloadedids: Arc, - // NotifyCount = u64 - pub(super) userroomid_notificationcount: Arc, - - // HightlightCount = u64 - pub(super) userroomid_highlightcount: Arc, - // LastNotificationRead = u64 pub(super) roomuserid_lastnotificationread: Arc, // UserPduCountId = UserId + PduCount - pub(super) userpducountid_notifications: Arc, + pub(super) userpducountid_notification: Arc, + // UserRoomPduCountId = UserId + RoomId + PduCount + pub(super) userroompducountid: Arc, /// Remember the current state hash of a room. pub(super) roomid_shortstatehash: Arc, @@ -395,12 +391,11 @@ impl KeyValueDatabase { lazyloadedids: builder.open_tree("lazyloadedids")?, - userroomid_notificationcount: builder - .open_tree("userroomid_notificationcount")?, - userroomid_highlightcount: builder - .open_tree("userroomid_highlightcount")?, roomuserid_lastnotificationread: builder - .open_tree("userroomid_highlightcount")?, + .open_tree("roomuserid_lastnotificationread")?, + userpducountid_notification: builder + .open_tree("userpducountid_notifications")?, + userroompducountid: builder.open_tree("userroompducountid")?, statekey_shortstatekey: builder .open_tree("statekey_shortstatekey")?, @@ -449,8 +444,6 @@ impl KeyValueDatabase { senderkey_pusher: builder.open_tree("senderkey_pusher")?, global: builder.open_tree("global")?, server_signingkeys: builder.open_tree("server_signingkeys")?, - userpducountid_notifications: builder - .open_tree("userpducountid_notifications")?, }; Ok(db) diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 63714cd3..240165d6 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -42,6 +42,7 @@ pub(crate) trait KvTree: Send + Sync { ) -> Box, Vec)> + 'a>; fn increment(&self, key: &[u8]) -> Result>; + #[allow(dead_code)] fn increment_batch( &self, iter: &mut dyn Iterator>, diff --git a/src/database/key_value/globals.rs b/src/database/key_value/globals.rs index e3293571..3b2348f9 100644 --- a/src/database/key_value/globals.rs +++ b/src/database/key_value/globals.rs @@ -51,11 +51,20 @@ impl service::globals::Data for KeyValueDatabase { futures.push(self.userroomid_joined.watch_prefix(&userid_prefix)); futures.push(self.userroomid_invitestate.watch_prefix(&userid_prefix)); futures.push(self.userroomid_leftstate.watch_prefix(&userid_prefix)); + // futures.push( + // self.userroomid_notificationcount.watch_prefix(&userid_prefix), + // ); + // futures + // .push(self.userroomid_highlightcount.watch_prefix(& + // userid_prefix)); + + let localpart_bytes = user_id.localpart().as_bytes().to_vec(); + let mut localpart_prefix = localpart_bytes.clone(); + localpart_prefix.push(0xFF); + futures.push( - self.userroomid_notificationcount.watch_prefix(&userid_prefix), + self.userpducountid_notification.watch_prefix(&localpart_prefix), ); - futures - .push(self.userroomid_highlightcount.watch_prefix(&userid_prefix)); // Events for rooms we are in for room_id in services() diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs index b46e9bfe..1a8417fc 100644 --- a/src/database/key_value/rooms/timeline.rs +++ b/src/database/key_value/rooms/timeline.rs @@ -1,8 +1,7 @@ use std::{mem::size_of, sync::Arc}; use ruma::{ - api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedUserId, - RoomId, UserId, + api::client::error::ErrorKind, CanonicalJsonObject, EventId, RoomId, UserId, }; use service::rooms::timeline::PduCount; @@ -255,34 +254,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase { }), )) } - - fn increment_notification_counts( - &self, - room_id: &RoomId, - notifies: Vec, - highlights: Vec, - ) -> Result<()> { - let mut notifies_batch = Vec::new(); - let mut highlights_batch = Vec::new(); - for user in notifies { - let mut userroom_id = user.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); - notifies_batch.push(userroom_id); - } - for user in highlights { - let mut userroom_id = user.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); - highlights_batch.push(userroom_id); - } - - self.userroomid_notificationcount - .increment_batch(&mut notifies_batch.into_iter())?; - self.userroomid_highlightcount - .increment_batch(&mut highlights_batch.into_iter())?; - Ok(()) - } } /// Returns the `count` of this pdu's id. diff --git a/src/database/key_value/rooms/user.rs b/src/database/key_value/rooms/user.rs index 27fae530..9ebe0189 100644 --- a/src/database/key_value/rooms/user.rs +++ b/src/database/key_value/rooms/user.rs @@ -1,11 +1,10 @@ -use std::mem::size_of; - use ruma::{ api::client::{ error::ErrorKind, push::get_notifications::v3::Notification, }, - push, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId, UInt, - UserId, + push::Action, + EventId, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId, + UInt, UserId, }; use crate::{ @@ -23,53 +22,15 @@ impl service::rooms::user::Data for KeyValueDatabase { user_id: &UserId, room_id: &RoomId, ) -> Result<()> { - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); let mut roomuser_id = room_id.as_bytes().to_vec(); roomuser_id.push(0xFF); roomuser_id.extend_from_slice(user_id.as_bytes()); - self.userroomid_notificationcount - .insert(&userroom_id, &0_u64.to_be_bytes())?; - self.userroomid_highlightcount - .insert(&userroom_id, &0_u64.to_be_bytes())?; - let next_count = services().globals.next_count()?; self.roomuserid_lastnotificationread .insert(&roomuser_id, &next_count.to_be_bytes())?; - let mut userpducount_id = user_id.localpart().as_bytes().to_vec(); - - userpducount_id.push(0xFF); - userpducount_id.extend_from_slice(&next_count.to_be_bytes()); - - let shortroomid = - services().rooms.short.get_shortroomid(room_id)?.ok_or_else( - || { - Error::bad_database( - "Looked for bad shortroomid for notifications", - ) - }, - )?; - - let it = - self.userpducountid_notifications.iter_from(&userpducount_id, true); - - for (k, mut v) in it.filter(|(_, v)| { - v[2] == 1 - && u64::from_be_bytes( - v[3..3 + size_of::()].try_into().unwrap(), - ) == shortroomid.get() - }) { - self.userpducountid_notifications.remove(&k)?; - - v[2] = 0; - - self.userpducountid_notifications.insert(&k, &v)?; - } - Ok(()) } @@ -78,18 +39,34 @@ impl service::rooms::user::Data for KeyValueDatabase { user_id: &UserId, room_id: &RoomId, ) -> Result { - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); + let pdu_count = self.last_notification_read(user_id, room_id)?; - self.userroomid_notificationcount.get(&userroom_id)?.map_or( - Ok(0), - |bytes| { - utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database("Invalid notification count in db.") - }) - }, - ) + let mut key = user_id.localpart().as_bytes().to_vec(); + key.push(0xFF); + + key.extend_from_slice(&pdu_count.to_be_bytes()); + + let mut it = self.userpducountid_notification.iter_from(&key, false); + + it.try_fold(0, |mut n, (_, value)| { + let (flags, value) = value.split_at(2); + + let event_id = EventId::parse( + utils::string_from_bytes( + value.split(|b| *b == 0xFF).nth(2).unwrap(), + ) + .unwrap(), + ) + .unwrap(); + + let pdu = services().rooms.timeline.get_pdu(&event_id)?; + + if flags[0] == 1 && !pdu.map_or(true, |pdu| pdu.is_redacted()) { + n += 1; + } + + Ok(n) + }) } fn highlight_count( @@ -97,18 +74,34 @@ impl service::rooms::user::Data for KeyValueDatabase { user_id: &UserId, room_id: &RoomId, ) -> Result { - let mut userroom_id = user_id.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); + let pdu_count = self.last_notification_read(user_id, room_id)?; - self.userroomid_highlightcount.get(&userroom_id)?.map_or( - Ok(0), - |bytes| { - utils::u64_from_bytes(&bytes).map_err(|_| { - Error::bad_database("Invalid highlight count in db.") - }) - }, - ) + let mut key = user_id.localpart().as_bytes().to_vec(); + key.push(0xFF); + + key.extend_from_slice(&pdu_count.to_be_bytes()); + + let mut it = self.userpducountid_notification.iter_from(&key, false); + + it.try_fold(0, |mut n, (_, value)| { + let (flags, value) = value.split_at(2); + + let event_id = EventId::parse( + utils::string_from_bytes( + value.split(|b| *b == 0xFF).nth(2).unwrap(), + ) + .unwrap(), + ) + .unwrap(); + + let pdu = services().rooms.timeline.get_pdu(&event_id)?; + + if flags[1] == 1 && !pdu.map_or(true, |pdu| pdu.is_redacted()) { + n += 1; + } + + Ok(n) + }) } fn last_notification_read( @@ -140,7 +133,7 @@ impl service::rooms::user::Data for KeyValueDatabase { user_id: &UserId, notify: bool, highlight: bool, - actions: &[push::Action], + actions: &[Action], ) -> Result<()> { let Some(PduCount::Normal(pdu_count)) = services().rooms.timeline.get_pdu_count(&pdu.event_id)? @@ -152,42 +145,42 @@ impl service::rooms::user::Data for KeyValueDatabase { }; let mut key = user_id.localpart().as_bytes().to_vec(); - key.push(0xFF); + key.extend_from_slice(&pdu_count.to_be_bytes()); - let (notify, highlight, unread) = - (u8::from(notify), u8::from(highlight), u8::from(true)); + let (notify, is_highlight) = (u8::from(notify), u8::from(highlight)); - let notification = serde_json::to_vec(&Notification { - actions: actions.to_owned(), - event: pdu.to_sync_room_event(), - profile_tag: None, - read: false, - room_id: pdu.room_id.clone(), - // TODO - ts: MilliSecondsSinceUnixEpoch::now(), - }) - .expect("Notification should serialize"); + let mut value = vec![notify, is_highlight]; - let shortroomid = - services().rooms.short.get_shortroomid(&pdu.room_id)?.ok_or_else( - || { - Error::bad_database( - "Looked for bad shortroomid for notifications", - ) - }, - )?; + value.extend_from_slice(pdu.room_id.as_bytes()); + value.push(0xFF); - let mut value = vec![notify, highlight, unread]; + value.extend_from_slice(pdu.event_id.as_bytes()); + value.push(0xFF); - value.extend_from_slice(&shortroomid.get().to_be_bytes()); + // TODO: timestamp at which the event notification was sent + let ts = serde_json::to_vec(&MilliSecondsSinceUnixEpoch::now()) + .expect("ts should serialize"); - value.extend_from_slice(¬ification); + value.extend_from_slice(&ts); + value.push(0xFF); - self.userpducountid_notifications.insert(&key, &value)?; + let actions = + serde_json::to_vec(actions).expect("actions should serialize"); - Ok(()) + value.extend_from_slice(&actions); + + self.userpducountid_notification.insert(&key, &value)?; + + let mut key = user_id.localpart().as_bytes().to_vec(); + key.push(0xFF); + + key.extend_from_slice(pdu.room_id.as_bytes()); + + key.extend_from_slice(&pdu_count.to_be_bytes()); + + self.userroompducountid.insert(&key, &[]) } fn get_notifications( @@ -204,37 +197,70 @@ impl service::rooms::user::Data for KeyValueDatabase { let limit = limit.and_then(|n| n.try_into().ok()).unwrap_or(50); - let it = self - .userpducountid_notifications - .iter_from(&key, true) - .take_while(move |(_, v)| { - v[0] == 1 // notify - && (!highlight || v[1] == 1) // highlight - && v[2] == 1 // unread - }) - .take(limit + 1); + let it = self.userpducountid_notification.iter_from(&key, true); - let mut notifications: Vec<_> = it - .map(|(k, v)| { - ( - k, - Notification { - read: v[2] != 1, - ..serde_json::from_slice(&v[3 + size_of::()..]) - .unwrap() - }, - ) - }) - .collect(); + let mut notifications = Vec::new(); - let next_token = notifications - .pop() - .and_then(|(k, _)| { - k.split(|b| *b == 0xFF).nth(1).map(<[u8]>::to_vec) - }) - .map(|pdu_count| { - format!("{}", u64::from_be_bytes(pdu_count.try_into().unwrap())) - }); + let mut next_token = None; + + for (key, value) in it { + let pdu_count = utils::u64_from_bytes( + key.split(|b| *b == 0xFF).nth(1).unwrap(), + ) + .unwrap(); + + if notifications.len() == limit { + next_token = Some(format!("{pdu_count}")); + + break; + } + + let (flags, value) = value.split_at(2); + + let (notify, is_highlight) = (flags[0] == 1, flags[1] == 1); + + if !notify || highlight && !is_highlight { + continue; + } + + let mut it = value.split(|b| *b == 0xFF); + + let room_id = RoomId::parse( + utils::string_from_bytes(it.next().unwrap()).unwrap(), + ) + .unwrap(); + + let event_id = EventId::parse( + utils::string_from_bytes(it.next().unwrap()).unwrap(), + ) + .unwrap(); + + let pdu = services().rooms.timeline.get_pdu(&event_id)?; + + // do not bother with missing or redacted PDUs + let Some(pdu) = pdu.filter(|pdu| pdu.is_redacted()) else { + continue; + }; + + let ts: MilliSecondsSinceUnixEpoch = + serde_json::from_slice(it.next().unwrap()).unwrap(); + + let actions: Vec = + serde_json::from_slice(it.next().unwrap()).unwrap(); + + notifications.push(( + key, + Notification { + actions, + event: pdu.to_sync_room_event(), + profile_tag: None, + read: pdu_count + < self.last_notification_read(user_id, &room_id)?, + room_id, + ts, + }, + )); + } Ok((notifications.into_iter().map(|(_, n)| n).collect(), next_token)) } diff --git a/src/service/rooms/timeline.rs b/src/service/rooms/timeline.rs index bcc23e33..38182a83 100644 --- a/src/service/rooms/timeline.rs +++ b/src/service/rooms/timeline.rs @@ -467,23 +467,19 @@ impl Service { highlights.push(user.clone()); } - // Store push action - services() - .rooms - .user - .store_push_action(pdu, user, notify, highlight, actions)?; + if notify || highlight { + // Store push action + services() + .rooms + .user + .store_push_action(pdu, user, notify, highlight, actions)?; + } for push_key in services().pusher.get_pushkeys(user) { services().sending.send_push_pdu(&pdu_id, user, push_key?)?; } } - self.db.increment_notification_counts( - &pdu.room_id, - notifies, - highlights, - )?; - match pdu.kind { TimelineEventType::RoomRedaction => { let room_version_id = services() diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 49b0c8a2..f32b946f 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId}; +use ruma::{CanonicalJsonObject, EventId, RoomId, UserId}; use super::PduCount; use crate::{service::rooms::timeline::PduId, PduEvent, Result}; @@ -91,11 +91,4 @@ pub(crate) trait Data: Send + Sync { room_id: &RoomId, from: PduCount, ) -> Result> + 'a>>; - - fn increment_notification_counts( - &self, - room_id: &RoomId, - notifies: Vec, - highlights: Vec, - ) -> Result<()>; }