simplified

This commit is contained in:
avdb13 2025-05-17 11:48:14 +00:00
parent 0e977481a1
commit f48ca403cf
7 changed files with 175 additions and 186 deletions

View file

@ -140,17 +140,13 @@ pub(crate) struct KeyValueDatabase {
// LazyLoadedIds = UserId + DeviceId + RoomId + LazyLoadedUserId // LazyLoadedIds = UserId + DeviceId + RoomId + LazyLoadedUserId
pub(super) lazyloadedids: Arc<dyn KvTree>, pub(super) lazyloadedids: Arc<dyn KvTree>,
// NotifyCount = u64
pub(super) userroomid_notificationcount: Arc<dyn KvTree>,
// HightlightCount = u64
pub(super) userroomid_highlightcount: Arc<dyn KvTree>,
// LastNotificationRead = u64 // LastNotificationRead = u64
pub(super) roomuserid_lastnotificationread: Arc<dyn KvTree>, pub(super) roomuserid_lastnotificationread: Arc<dyn KvTree>,
// UserPduCountId = UserId + PduCount // UserPduCountId = UserId + PduCount
pub(super) userpducountid_notifications: Arc<dyn KvTree>, pub(super) userpducountid_notification: Arc<dyn KvTree>,
// UserRoomPduCountId = UserId + RoomId + PduCount
pub(super) userroompducountid: Arc<dyn KvTree>,
/// Remember the current state hash of a room. /// Remember the current state hash of a room.
pub(super) roomid_shortstatehash: Arc<dyn KvTree>, pub(super) roomid_shortstatehash: Arc<dyn KvTree>,
@ -395,12 +391,11 @@ impl KeyValueDatabase {
lazyloadedids: builder.open_tree("lazyloadedids")?, lazyloadedids: builder.open_tree("lazyloadedids")?,
userroomid_notificationcount: builder
.open_tree("userroomid_notificationcount")?,
userroomid_highlightcount: builder
.open_tree("userroomid_highlightcount")?,
roomuserid_lastnotificationread: builder roomuserid_lastnotificationread: builder
.open_tree("userroomid_highlightcount")?, .open_tree("roomuserid_lastnotificationread")?,
userpducountid_notification: builder
.open_tree("userpducountid_notifications")?,
userroompducountid: builder.open_tree("userroompducountid")?,
statekey_shortstatekey: builder statekey_shortstatekey: builder
.open_tree("statekey_shortstatekey")?, .open_tree("statekey_shortstatekey")?,
@ -449,8 +444,6 @@ impl KeyValueDatabase {
senderkey_pusher: builder.open_tree("senderkey_pusher")?, senderkey_pusher: builder.open_tree("senderkey_pusher")?,
global: builder.open_tree("global")?, global: builder.open_tree("global")?,
server_signingkeys: builder.open_tree("server_signingkeys")?, server_signingkeys: builder.open_tree("server_signingkeys")?,
userpducountid_notifications: builder
.open_tree("userpducountid_notifications")?,
}; };
Ok(db) Ok(db)

View file

@ -42,6 +42,7 @@ pub(crate) trait KvTree: Send + Sync {
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>; ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
fn increment(&self, key: &[u8]) -> Result<Vec<u8>>; fn increment(&self, key: &[u8]) -> Result<Vec<u8>>;
#[allow(dead_code)]
fn increment_batch( fn increment_batch(
&self, &self,
iter: &mut dyn Iterator<Item = Vec<u8>>, iter: &mut dyn Iterator<Item = Vec<u8>>,

View file

@ -51,11 +51,20 @@ impl service::globals::Data for KeyValueDatabase {
futures.push(self.userroomid_joined.watch_prefix(&userid_prefix)); futures.push(self.userroomid_joined.watch_prefix(&userid_prefix));
futures.push(self.userroomid_invitestate.watch_prefix(&userid_prefix)); futures.push(self.userroomid_invitestate.watch_prefix(&userid_prefix));
futures.push(self.userroomid_leftstate.watch_prefix(&userid_prefix)); futures.push(self.userroomid_leftstate.watch_prefix(&userid_prefix));
// futures.push(
// self.userroomid_notificationcount.watch_prefix(&userid_prefix),
// );
// futures
// .push(self.userroomid_highlightcount.watch_prefix(&
// userid_prefix));
let localpart_bytes = user_id.localpart().as_bytes().to_vec();
let mut localpart_prefix = localpart_bytes.clone();
localpart_prefix.push(0xFF);
futures.push( futures.push(
self.userroomid_notificationcount.watch_prefix(&userid_prefix), self.userpducountid_notification.watch_prefix(&localpart_prefix),
); );
futures
.push(self.userroomid_highlightcount.watch_prefix(&userid_prefix));
// Events for rooms we are in // Events for rooms we are in
for room_id in services() for room_id in services()

View file

@ -1,8 +1,7 @@
use std::{mem::size_of, sync::Arc}; use std::{mem::size_of, sync::Arc};
use ruma::{ use ruma::{
api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedUserId, api::client::error::ErrorKind, CanonicalJsonObject, EventId, RoomId, UserId,
RoomId, UserId,
}; };
use service::rooms::timeline::PduCount; use service::rooms::timeline::PduCount;
@ -255,34 +254,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
}), }),
)) ))
} }
fn increment_notification_counts(
&self,
room_id: &RoomId,
notifies: Vec<OwnedUserId>,
highlights: Vec<OwnedUserId>,
) -> Result<()> {
let mut notifies_batch = Vec::new();
let mut highlights_batch = Vec::new();
for user in notifies {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
notifies_batch.push(userroom_id);
}
for user in highlights {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
highlights_batch.push(userroom_id);
}
self.userroomid_notificationcount
.increment_batch(&mut notifies_batch.into_iter())?;
self.userroomid_highlightcount
.increment_batch(&mut highlights_batch.into_iter())?;
Ok(())
}
} }
/// Returns the `count` of this pdu's id. /// Returns the `count` of this pdu's id.

View file

@ -1,11 +1,10 @@
use std::mem::size_of;
use ruma::{ use ruma::{
api::client::{ api::client::{
error::ErrorKind, push::get_notifications::v3::Notification, error::ErrorKind, push::get_notifications::v3::Notification,
}, },
push, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId, UInt, push::Action,
UserId, EventId, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId,
UInt, UserId,
}; };
use crate::{ use crate::{
@ -23,53 +22,15 @@ impl service::rooms::user::Data for KeyValueDatabase {
user_id: &UserId, user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
) -> Result<()> { ) -> Result<()> {
let mut userroom_id = user_id.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
let mut roomuser_id = room_id.as_bytes().to_vec(); let mut roomuser_id = room_id.as_bytes().to_vec();
roomuser_id.push(0xFF); roomuser_id.push(0xFF);
roomuser_id.extend_from_slice(user_id.as_bytes()); roomuser_id.extend_from_slice(user_id.as_bytes());
self.userroomid_notificationcount
.insert(&userroom_id, &0_u64.to_be_bytes())?;
self.userroomid_highlightcount
.insert(&userroom_id, &0_u64.to_be_bytes())?;
let next_count = services().globals.next_count()?; let next_count = services().globals.next_count()?;
self.roomuserid_lastnotificationread self.roomuserid_lastnotificationread
.insert(&roomuser_id, &next_count.to_be_bytes())?; .insert(&roomuser_id, &next_count.to_be_bytes())?;
let mut userpducount_id = user_id.localpart().as_bytes().to_vec();
userpducount_id.push(0xFF);
userpducount_id.extend_from_slice(&next_count.to_be_bytes());
let shortroomid =
services().rooms.short.get_shortroomid(room_id)?.ok_or_else(
|| {
Error::bad_database(
"Looked for bad shortroomid for notifications",
)
},
)?;
let it =
self.userpducountid_notifications.iter_from(&userpducount_id, true);
for (k, mut v) in it.filter(|(_, v)| {
v[2] == 1
&& u64::from_be_bytes(
v[3..3 + size_of::<u64>()].try_into().unwrap(),
) == shortroomid.get()
}) {
self.userpducountid_notifications.remove(&k)?;
v[2] = 0;
self.userpducountid_notifications.insert(&k, &v)?;
}
Ok(()) Ok(())
} }
@ -78,18 +39,34 @@ impl service::rooms::user::Data for KeyValueDatabase {
user_id: &UserId, user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
) -> Result<u64> { ) -> Result<u64> {
let mut userroom_id = user_id.as_bytes().to_vec(); let pdu_count = self.last_notification_read(user_id, room_id)?;
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
self.userroomid_notificationcount.get(&userroom_id)?.map_or( let mut key = user_id.localpart().as_bytes().to_vec();
Ok(0), key.push(0xFF);
|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| { key.extend_from_slice(&pdu_count.to_be_bytes());
Error::bad_database("Invalid notification count in db.")
}) let mut it = self.userpducountid_notification.iter_from(&key, false);
},
) it.try_fold(0, |mut n, (_, value)| {
let (flags, value) = value.split_at(2);
let event_id = EventId::parse(
utils::string_from_bytes(
value.split(|b| *b == 0xFF).nth(2).unwrap(),
)
.unwrap(),
)
.unwrap();
let pdu = services().rooms.timeline.get_pdu(&event_id)?;
if flags[0] == 1 && !pdu.map_or(true, |pdu| pdu.is_redacted()) {
n += 1;
}
Ok(n)
})
} }
fn highlight_count( fn highlight_count(
@ -97,18 +74,34 @@ impl service::rooms::user::Data for KeyValueDatabase {
user_id: &UserId, user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
) -> Result<u64> { ) -> Result<u64> {
let mut userroom_id = user_id.as_bytes().to_vec(); let pdu_count = self.last_notification_read(user_id, room_id)?;
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
self.userroomid_highlightcount.get(&userroom_id)?.map_or( let mut key = user_id.localpart().as_bytes().to_vec();
Ok(0), key.push(0xFF);
|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| { key.extend_from_slice(&pdu_count.to_be_bytes());
Error::bad_database("Invalid highlight count in db.")
}) let mut it = self.userpducountid_notification.iter_from(&key, false);
},
) it.try_fold(0, |mut n, (_, value)| {
let (flags, value) = value.split_at(2);
let event_id = EventId::parse(
utils::string_from_bytes(
value.split(|b| *b == 0xFF).nth(2).unwrap(),
)
.unwrap(),
)
.unwrap();
let pdu = services().rooms.timeline.get_pdu(&event_id)?;
if flags[1] == 1 && !pdu.map_or(true, |pdu| pdu.is_redacted()) {
n += 1;
}
Ok(n)
})
} }
fn last_notification_read( fn last_notification_read(
@ -140,7 +133,7 @@ impl service::rooms::user::Data for KeyValueDatabase {
user_id: &UserId, user_id: &UserId,
notify: bool, notify: bool,
highlight: bool, highlight: bool,
actions: &[push::Action], actions: &[Action],
) -> Result<()> { ) -> Result<()> {
let Some(PduCount::Normal(pdu_count)) = let Some(PduCount::Normal(pdu_count)) =
services().rooms.timeline.get_pdu_count(&pdu.event_id)? services().rooms.timeline.get_pdu_count(&pdu.event_id)?
@ -152,42 +145,42 @@ impl service::rooms::user::Data for KeyValueDatabase {
}; };
let mut key = user_id.localpart().as_bytes().to_vec(); let mut key = user_id.localpart().as_bytes().to_vec();
key.push(0xFF); key.push(0xFF);
key.extend_from_slice(&pdu_count.to_be_bytes()); key.extend_from_slice(&pdu_count.to_be_bytes());
let (notify, highlight, unread) = let (notify, is_highlight) = (u8::from(notify), u8::from(highlight));
(u8::from(notify), u8::from(highlight), u8::from(true));
let notification = serde_json::to_vec(&Notification { let mut value = vec![notify, is_highlight];
actions: actions.to_owned(),
event: pdu.to_sync_room_event(),
profile_tag: None,
read: false,
room_id: pdu.room_id.clone(),
// TODO
ts: MilliSecondsSinceUnixEpoch::now(),
})
.expect("Notification should serialize");
let shortroomid = value.extend_from_slice(pdu.room_id.as_bytes());
services().rooms.short.get_shortroomid(&pdu.room_id)?.ok_or_else( value.push(0xFF);
|| {
Error::bad_database(
"Looked for bad shortroomid for notifications",
)
},
)?;
let mut value = vec![notify, highlight, unread]; value.extend_from_slice(pdu.event_id.as_bytes());
value.push(0xFF);
value.extend_from_slice(&shortroomid.get().to_be_bytes()); // TODO: timestamp at which the event notification was sent
let ts = serde_json::to_vec(&MilliSecondsSinceUnixEpoch::now())
.expect("ts should serialize");
value.extend_from_slice(&notification); value.extend_from_slice(&ts);
value.push(0xFF);
self.userpducountid_notifications.insert(&key, &value)?; let actions =
serde_json::to_vec(actions).expect("actions should serialize");
Ok(()) value.extend_from_slice(&actions);
self.userpducountid_notification.insert(&key, &value)?;
let mut key = user_id.localpart().as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(pdu.room_id.as_bytes());
key.extend_from_slice(&pdu_count.to_be_bytes());
self.userroompducountid.insert(&key, &[])
} }
fn get_notifications( fn get_notifications(
@ -204,37 +197,70 @@ impl service::rooms::user::Data for KeyValueDatabase {
let limit = limit.and_then(|n| n.try_into().ok()).unwrap_or(50); let limit = limit.and_then(|n| n.try_into().ok()).unwrap_or(50);
let it = self let it = self.userpducountid_notification.iter_from(&key, true);
.userpducountid_notifications
.iter_from(&key, true)
.take_while(move |(_, v)| {
v[0] == 1 // notify
&& (!highlight || v[1] == 1) // highlight
&& v[2] == 1 // unread
})
.take(limit + 1);
let mut notifications: Vec<_> = it let mut notifications = Vec::new();
.map(|(k, v)| {
(
k,
Notification {
read: v[2] != 1,
..serde_json::from_slice(&v[3 + size_of::<u64>()..])
.unwrap()
},
)
})
.collect();
let next_token = notifications let mut next_token = None;
.pop()
.and_then(|(k, _)| { for (key, value) in it {
k.split(|b| *b == 0xFF).nth(1).map(<[u8]>::to_vec) let pdu_count = utils::u64_from_bytes(
}) key.split(|b| *b == 0xFF).nth(1).unwrap(),
.map(|pdu_count| { )
format!("{}", u64::from_be_bytes(pdu_count.try_into().unwrap())) .unwrap();
});
if notifications.len() == limit {
next_token = Some(format!("{pdu_count}"));
break;
}
let (flags, value) = value.split_at(2);
let (notify, is_highlight) = (flags[0] == 1, flags[1] == 1);
if !notify || highlight && !is_highlight {
continue;
}
let mut it = value.split(|b| *b == 0xFF);
let room_id = RoomId::parse(
utils::string_from_bytes(it.next().unwrap()).unwrap(),
)
.unwrap();
let event_id = EventId::parse(
utils::string_from_bytes(it.next().unwrap()).unwrap(),
)
.unwrap();
let pdu = services().rooms.timeline.get_pdu(&event_id)?;
// do not bother with missing or redacted PDUs
let Some(pdu) = pdu.filter(|pdu| pdu.is_redacted()) else {
continue;
};
let ts: MilliSecondsSinceUnixEpoch =
serde_json::from_slice(it.next().unwrap()).unwrap();
let actions: Vec<Action> =
serde_json::from_slice(it.next().unwrap()).unwrap();
notifications.push((
key,
Notification {
actions,
event: pdu.to_sync_room_event(),
profile_tag: None,
read: pdu_count
< self.last_notification_read(user_id, &room_id)?,
room_id,
ts,
},
));
}
Ok((notifications.into_iter().map(|(_, n)| n).collect(), next_token)) Ok((notifications.into_iter().map(|(_, n)| n).collect(), next_token))
} }

View file

@ -467,23 +467,19 @@ impl Service {
highlights.push(user.clone()); highlights.push(user.clone());
} }
// Store push action if notify || highlight {
services() // Store push action
.rooms services()
.user .rooms
.store_push_action(pdu, user, notify, highlight, actions)?; .user
.store_push_action(pdu, user, notify, highlight, actions)?;
}
for push_key in services().pusher.get_pushkeys(user) { for push_key in services().pusher.get_pushkeys(user) {
services().sending.send_push_pdu(&pdu_id, user, push_key?)?; services().sending.send_push_pdu(&pdu_id, user, push_key?)?;
} }
} }
self.db.increment_notification_counts(
&pdu.room_id,
notifies,
highlights,
)?;
match pdu.kind { match pdu.kind {
TimelineEventType::RoomRedaction => { TimelineEventType::RoomRedaction => {
let room_version_id = services() let room_version_id = services()

View file

@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId}; use ruma::{CanonicalJsonObject, EventId, RoomId, UserId};
use super::PduCount; use super::PduCount;
use crate::{service::rooms::timeline::PduId, PduEvent, Result}; use crate::{service::rooms::timeline::PduId, PduEvent, Result};
@ -91,11 +91,4 @@ pub(crate) trait Data: Send + Sync {
room_id: &RoomId, room_id: &RoomId,
from: PduCount, from: PduCount,
) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>; ) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>;
fn increment_notification_counts(
&self,
room_id: &RoomId,
notifies: Vec<OwnedUserId>,
highlights: Vec<OwnedUserId>,
) -> Result<()>;
} }