implement notifications

This commit is contained in:
avdb13 2025-04-20 03:39:10 +00:00
parent d425ba72f8
commit 0e977481a1
8 changed files with 250 additions and 28 deletions

View file

@ -11,6 +11,7 @@ mod keys;
mod media;
mod membership;
mod message;
mod notifications;
mod profile;
mod push;
mod read_marker;
@ -45,6 +46,7 @@ pub(crate) use keys::*;
pub(crate) use media::*;
pub(crate) use membership::*;
pub(crate) use message::*;
pub(crate) use notifications::*;
pub(crate) use profile::*;
pub(crate) use push::*;
pub(crate) use read_marker::*;

View file

@ -0,0 +1,43 @@
use ruma::api::client::{error::ErrorKind, push::get_notifications};
use crate::{services, Ar, Error, Ra, Result};
/// # `GET /_matrix/client/r0/notifications`
///
/// Gets all events that we have been notified about.
#[allow(dead_code, clippy::unused_async)]
pub(crate) async fn get_notifications_route(
body: Ar<get_notifications::v3::Request>,
) -> Result<Ra<get_notifications::v3::Response>> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let from = body
.from
.as_ref()
.map(|from| {
from.parse().map_err(|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"failed to parse from ",
)
})
})
.transpose()?;
let only = body
.only
.as_ref()
.is_some_and(|only| only.to_lowercase() == "highlight");
let (notifications, next_token) = services().rooms.user.get_notifications(
sender_user,
from,
body.limit,
only,
)?;
Ok(Ra(get_notifications::v3::Response {
next_token,
notifications,
}))
}

View file

@ -488,6 +488,7 @@ fn client_routes() -> Router {
.ruma_route(c2s::get_pushrule_actions_route)
.ruma_route(c2s::set_pushrule_actions_route)
.ruma_route(c2s::delete_pushrule_route)
.ruma_route(c2s::get_notifications_route)
.ruma_route(c2s::get_room_event_route)
.ruma_route(c2s::get_room_aliases_route)
.ruma_route(c2s::get_filter_route)

View file

@ -149,6 +149,9 @@ pub(crate) struct KeyValueDatabase {
// LastNotificationRead = u64
pub(super) roomuserid_lastnotificationread: Arc<dyn KvTree>,
// UserPduCountId = UserId + PduCount
pub(super) userpducountid_notifications: Arc<dyn KvTree>,
/// Remember the current state hash of a room.
pub(super) roomid_shortstatehash: Arc<dyn KvTree>,
@ -446,6 +449,8 @@ impl KeyValueDatabase {
senderkey_pusher: builder.open_tree("senderkey_pusher")?,
global: builder.open_tree("global")?,
server_signingkeys: builder.open_tree("server_signingkeys")?,
userpducountid_notifications: builder
.open_tree("userpducountid_notifications")?,
};
Ok(db)

View file

@ -1,9 +1,20 @@
use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
use std::mem::size_of;
use ruma::{
api::client::{
error::ErrorKind, push::get_notifications::v3::Notification,
},
push, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId, UInt,
UserId,
};
use crate::{
database::KeyValueDatabase,
service::{self, rooms::short::ShortStateHash},
services, utils, Error, Result,
service::{
self,
rooms::{short::ShortStateHash, timeline::PduCount},
},
services, utils, Error, PduEvent, Result,
};
impl service::rooms::user::Data for KeyValueDatabase {
@ -24,10 +35,40 @@ impl service::rooms::user::Data for KeyValueDatabase {
self.userroomid_highlightcount
.insert(&userroom_id, &0_u64.to_be_bytes())?;
self.roomuserid_lastnotificationread.insert(
&roomuser_id,
&services().globals.next_count()?.to_be_bytes(),
)?;
let next_count = services().globals.next_count()?;
self.roomuserid_lastnotificationread
.insert(&roomuser_id, &next_count.to_be_bytes())?;
let mut userpducount_id = user_id.localpart().as_bytes().to_vec();
userpducount_id.push(0xFF);
userpducount_id.extend_from_slice(&next_count.to_be_bytes());
let shortroomid =
services().rooms.short.get_shortroomid(room_id)?.ok_or_else(
|| {
Error::bad_database(
"Looked for bad shortroomid for notifications",
)
},
)?;
let it =
self.userpducountid_notifications.iter_from(&userpducount_id, true);
for (k, mut v) in it.filter(|(_, v)| {
v[2] == 1
&& u64::from_be_bytes(
v[3..3 + size_of::<u64>()].try_into().unwrap(),
) == shortroomid.get()
}) {
self.userpducountid_notifications.remove(&k)?;
v[2] = 0;
self.userpducountid_notifications.insert(&k, &v)?;
}
Ok(())
}
@ -93,6 +134,111 @@ impl service::rooms::user::Data for KeyValueDatabase {
.unwrap_or(0))
}
fn store_push_action(
&self,
pdu: &PduEvent,
user_id: &UserId,
notify: bool,
highlight: bool,
actions: &[push::Action],
) -> Result<()> {
let Some(PduCount::Normal(pdu_count)) =
services().rooms.timeline.get_pdu_count(&pdu.event_id)?
else {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Event does not exist.",
));
};
let mut key = user_id.localpart().as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(&pdu_count.to_be_bytes());
let (notify, highlight, unread) =
(u8::from(notify), u8::from(highlight), u8::from(true));
let notification = serde_json::to_vec(&Notification {
actions: actions.to_owned(),
event: pdu.to_sync_room_event(),
profile_tag: None,
read: false,
room_id: pdu.room_id.clone(),
// TODO
ts: MilliSecondsSinceUnixEpoch::now(),
})
.expect("Notification should serialize");
let shortroomid =
services().rooms.short.get_shortroomid(&pdu.room_id)?.ok_or_else(
|| {
Error::bad_database(
"Looked for bad shortroomid for notifications",
)
},
)?;
let mut value = vec![notify, highlight, unread];
value.extend_from_slice(&shortroomid.get().to_be_bytes());
value.extend_from_slice(&notification);
self.userpducountid_notifications.insert(&key, &value)?;
Ok(())
}
fn get_notifications(
&self,
user_id: &UserId,
from: Option<u64>,
limit: Option<UInt>,
highlight: bool,
) -> Result<(Vec<Notification>, Option<String>)> {
let mut key = user_id.localpart().as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(&from.unwrap_or(u64::MAX).to_be_bytes());
let limit = limit.and_then(|n| n.try_into().ok()).unwrap_or(50);
let it = self
.userpducountid_notifications
.iter_from(&key, true)
.take_while(move |(_, v)| {
v[0] == 1 // notify
&& (!highlight || v[1] == 1) // highlight
&& v[2] == 1 // unread
})
.take(limit + 1);
let mut notifications: Vec<_> = it
.map(|(k, v)| {
(
k,
Notification {
read: v[2] != 1,
..serde_json::from_slice(&v[3 + size_of::<u64>()..])
.unwrap()
},
)
})
.collect();
let next_token = notifications
.pop()
.and_then(|(k, _)| {
k.split(|b| *b == 0xFF).nth(1).map(<[u8]>::to_vec)
})
.map(|pdu_count| {
format!("{}", u64::from_be_bytes(pdu_count.try_into().unwrap()))
});
Ok((notifications.into_iter().map(|(_, n)| n).collect(), next_token))
}
fn associate_token_shortstatehash(
&self,
room_id: &RoomId,

View file

@ -177,13 +177,15 @@ impl Service {
.transpose()?
.unwrap_or_default();
for action in self.get_actions(
let actions = Service::get_actions(
user,
&ruleset,
&power_levels,
&pdu.to_sync_room_event(),
&pdu.room_id,
)? {
)?;
for action in actions {
let n = match action {
Action::Notify => true,
Action::SetTweak(tweak) => {
@ -210,9 +212,8 @@ impl Service {
Ok(())
}
#[tracing::instrument(skip(self, user, ruleset, pdu))]
#[tracing::instrument(skip(user, ruleset, pdu))]
pub(crate) fn get_actions<'a>(
&self,
user: &UserId,
ruleset: &'a Ruleset,
power_levels: &RoomPowerLevelsEventContent,
@ -287,13 +288,7 @@ impl Service {
notifi.prio = NotificationPriority::High;
}
if event_id_only {
self.send_request(
&http.url,
send_event_notification::v1::Request::new(notifi),
)
.await?;
} else {
if !event_id_only {
notifi.sender = Some(event.sender.clone());
notifi.event_type = Some(event.kind.clone());
notifi.content =
@ -311,14 +306,14 @@ impl Service {
.rooms
.state_accessor
.get_name(&event.room_id)?;
self.send_request(
&http.url,
send_event_notification::v1::Request::new(notifi),
)
.await?;
}
self.send_request(
&http.url,
send_event_notification::v1::Request::new(notifi),
)
.await?;
Ok(())
}
// TODO: Handle email

View file

@ -36,6 +36,7 @@ use crate::{
appservice::NamespaceRegex,
globals::{marker, SigningKeys},
pdu::{EventHash, PduBuilder},
pusher,
rooms::state::ExtractVersion,
},
services,
@ -440,13 +441,15 @@ impl Service {
let mut highlight = false;
let mut notify = false;
for action in services().pusher.get_actions(
let actions = pusher::Service::get_actions(
user,
&rules_for_user,
&power_levels,
&sync_pdu,
&pdu.room_id,
)? {
)?;
for action in actions {
match action {
Action::Notify => notify = true,
Action::SetTweak(Tweak::Highlight(true)) => {
@ -464,6 +467,12 @@ impl Service {
highlights.push(user.clone());
}
// Store push action
services()
.rooms
.user
.store_push_action(pdu, user, notify, highlight, actions)?;
for push_key in services().pusher.get_pushkeys(user) {
services().sending.send_push_pdu(&pdu_id, user, push_key?)?;
}

View file

@ -1,6 +1,9 @@
use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
use ruma::{
api::client::push::get_notifications, push, OwnedRoomId, OwnedUserId,
RoomId, UInt, UserId,
};
use crate::{service::rooms::short::ShortStateHash, Result};
use crate::{service::rooms::short::ShortStateHash, PduEvent, Result};
pub(crate) trait Data: Send + Sync {
fn reset_notification_counts(
@ -28,6 +31,24 @@ pub(crate) trait Data: Send + Sync {
room_id: &RoomId,
) -> Result<u64>;
fn store_push_action(
&self,
pdu: &PduEvent,
user_id: &UserId,
notify: bool,
highlight: bool,
actions: &[push::Action],
) -> Result<()>;
/// Get all unread or notifying events for a user since ``pdu_count``
fn get_notifications(
&self,
user_id: &UserId,
from: Option<u64>,
limit: Option<UInt>,
highlight: bool,
) -> Result<(Vec<get_notifications::v3::Notification>, Option<String>)>;
fn associate_token_shortstatehash(
&self,
room_id: &RoomId,