Merge branch 'lambda/sss' into 'main'

Draft: Replace Sliding Sync with Simplified Sliding Sync

Closes #92

See merge request matrix/grapevine!171
This commit is contained in:
Lambda 2025-08-10 19:50:42 +00:00
commit 72c1bf6795
11 changed files with 1429 additions and 49 deletions

22
Cargo.lock generated
View file

@ -2442,7 +2442,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma" name = "ruma"
version = "0.12.1" version = "0.12.1"
source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a" source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
dependencies = [ dependencies = [
"assign", "assign",
"js_int", "js_int",
@ -2461,7 +2461,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-appservice-api" name = "ruma-appservice-api"
version = "0.12.1" version = "0.12.1"
source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a" source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
dependencies = [ dependencies = [
"js_int", "js_int",
"ruma-common", "ruma-common",
@ -2473,7 +2473,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-client-api" name = "ruma-client-api"
version = "0.20.1" version = "0.20.1"
source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a" source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
dependencies = [ dependencies = [
"as_variant", "as_variant",
"assign", "assign",
@ -2496,7 +2496,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-common" name = "ruma-common"
version = "0.15.1" version = "0.15.1"
source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a" source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
dependencies = [ dependencies = [
"as_variant", "as_variant",
"base64", "base64",
@ -2527,7 +2527,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-events" name = "ruma-events"
version = "0.30.1" version = "0.30.1"
source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a" source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
dependencies = [ dependencies = [
"as_variant", "as_variant",
"indexmap 2.10.0", "indexmap 2.10.0",
@ -2550,7 +2550,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-federation-api" name = "ruma-federation-api"
version = "0.11.0" version = "0.11.0"
source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a" source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
dependencies = [ dependencies = [
"bytes", "bytes",
"headers", "headers",
@ -2572,7 +2572,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-identifiers-validation" name = "ruma-identifiers-validation"
version = "0.10.1" version = "0.10.1"
source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a" source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
dependencies = [ dependencies = [
"js_int", "js_int",
"thiserror 2.0.12", "thiserror 2.0.12",
@ -2581,7 +2581,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-macros" name = "ruma-macros"
version = "0.15.1" version = "0.15.1"
source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a" source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"proc-macro-crate", "proc-macro-crate",
@ -2596,7 +2596,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-push-gateway-api" name = "ruma-push-gateway-api"
version = "0.11.0" version = "0.11.0"
source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a" source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
dependencies = [ dependencies = [
"js_int", "js_int",
"ruma-common", "ruma-common",
@ -2608,7 +2608,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-signatures" name = "ruma-signatures"
version = "0.17.0" version = "0.17.0"
source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a" source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
dependencies = [ dependencies = [
"base64", "base64",
"ed25519-dalek", "ed25519-dalek",
@ -2624,7 +2624,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-state-res" name = "ruma-state-res"
version = "0.13.0" version = "0.13.0"
source = "git+https://github.com/ruma/ruma.git?rev=1387667de806c37a6d7f72125117009bd618e32a#1387667de806c37a6d7f72125117009bd618e32a" source = "git+https://gitlab.computer.surgery/matrix/ruma.git?rev=ruma-0.12.1%2Bgrapevine-1#598c22e1d99ac6ce5b3864259e63a6d5bdc00199"
dependencies = [ dependencies = [
"js_int", "js_int",
"ruma-common", "ruma-common",

View file

@ -144,8 +144,8 @@ trust-dns-resolver = "0.23.2"
xdg = "2.5.2" xdg = "2.5.2"
[dependencies.ruma] [dependencies.ruma]
git = "https://github.com/ruma/ruma.git" git = "https://gitlab.computer.surgery/matrix/ruma.git"
rev = "1387667de806c37a6d7f72125117009bd618e32a" rev = "ruma-0.12.1+grapevine-1"
features = [ features = [
"compat-server-signing-key-version", "compat-server-signing-key-version",
"compat-empty-string-null", "compat-empty-string-null",
@ -165,6 +165,7 @@ features = [
"state-res", "state-res",
"unstable-msc2448", "unstable-msc2448",
"ring-compat", "ring-compat",
"unstable-msc4186",
] ]
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]

View file

@ -87,8 +87,6 @@ This will be the first release of Grapevine since it was forked from Conduit
* Instead, it is now possible to configure each cache capacity individually. * Instead, it is now possible to configure each cache capacity individually.
10. Remove jemalloc support. 10. Remove jemalloc support.
([!93](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/193)) ([!93](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/193))
11. Removed support for MSC3575 (sliding sync), which has been closed.
([!198](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/198))
### Changed ### Changed

View file

@ -5,6 +5,7 @@ use crate::{
service::rooms::timeline::PduCount, services, Error, PduEvent, Result, service::rooms::timeline::PduCount, services, Error, PduEvent, Result,
}; };
pub(crate) mod msc4186;
pub(crate) mod v3; pub(crate) mod v3;
fn load_timeline( fn load_timeline(

File diff suppressed because it is too large Load diff

View file

@ -32,6 +32,7 @@ pub(crate) async fn get_supported_versions_route(
unstable_features: BTreeMap::from_iter([ unstable_features: BTreeMap::from_iter([
("org.matrix.e2e_cross_signing".to_owned(), true), ("org.matrix.e2e_cross_signing".to_owned(), true),
("org.matrix.msc3916.stable".to_owned(), true), ("org.matrix.msc3916.stable".to_owned(), true),
("org.matrix.simplified_msc3575".to_owned(), true),
]), ]),
}; };

View file

@ -651,6 +651,7 @@ fn client_routes() -> Router {
.ruma_route(c2s::get_state_events_route) .ruma_route(c2s::get_state_events_route)
.ruma_route(c2s::get_state_events_for_key_route) .ruma_route(c2s::get_state_events_for_key_route)
.ruma_route(c2s::v3::sync_events_route) .ruma_route(c2s::v3::sync_events_route)
.ruma_route(c2s::msc4186::sync_events_v5_route)
.ruma_route(c2s::get_context_route) .ruma_route(c2s::get_context_route)
.ruma_route(c2s::get_message_events_route) .ruma_route(c2s::get_message_events_route)
.ruma_route(c2s::search_events_route) .ruma_route(c2s::search_events_route)

View file

@ -3,6 +3,7 @@ use std::collections::HashMap;
use ruma::{api::client::error::ErrorKind, RoomId, UserId}; use ruma::{api::client::error::ErrorKind, RoomId, UserId};
use serde::Deserialize; use serde::Deserialize;
use serde_json::value::RawValue; use serde_json::value::RawValue;
use tracing::error;
use crate::{ use crate::{
database::KeyValueDatabase, service, services, utils, Error, Result, database::KeyValueDatabase, service, services, utils, Error, Result,
@ -16,16 +17,6 @@ impl service::account_data::Data for KeyValueDatabase {
event_type: &str, event_type: &str,
data: &RawValue, data: &RawValue,
) -> Result<()> { ) -> Result<()> {
// Allowed because we just use this type to validate the schema, and
// don't read the fields.
#[allow(dead_code)]
#[derive(Deserialize)]
struct ExtractEventFields<'a> {
#[serde(rename = "type")]
event_type: &'a str,
content: &'a RawValue,
}
let mut prefix = room_id let mut prefix = room_id
.map(ToString::to_string) .map(ToString::to_string)
.unwrap_or_default() .unwrap_or_default()
@ -44,11 +35,41 @@ impl service::account_data::Data for KeyValueDatabase {
let mut key = prefix; let mut key = prefix;
key.extend_from_slice(event_type.as_bytes()); key.extend_from_slice(event_type.as_bytes());
if serde_json::from_str::<ExtractEventFields<'_>>(data.get()).is_err() { {
return Err(Error::BadRequest( #[derive(Deserialize)]
ErrorKind::InvalidParam, struct ExtractEventFields<'a> {
"Account data doesn't have all required fields.", #[serde(rename = "type")]
)); event_type: &'a str,
// Allowed because we just use this type to validate the schema
// and event type, and don't extract the content.
#[allow(dead_code)]
content: &'a RawValue,
}
let Ok(ExtractEventFields {
event_type: serialised_event_type,
..
}) = serde_json::from_str(data.get())
else {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Account data doesn't have all required fields.",
));
};
if serialised_event_type != event_type {
error!(
%user_id,
?room_id,
event_type,
serialised_event_type,
"Mismatch between discrete and serialised account data event type"
);
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Account data event type mismatch.",
));
}
} }
self.roomuserdataid_accountdata self.roomuserdataid_accountdata

View file

@ -9,27 +9,56 @@ use crate::{
}; };
macro_rules! short_id_type { macro_rules! short_id_type {
($name:ident) => { ($($(#[$doc:meta])* struct $name:ident(u64);)*) => {
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] $(
#[repr(transparent)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct $name(u64); #[repr(transparent)]
$(#[$doc])*
pub(crate) struct $name(u64);
impl $name { impl $name {
pub(crate) fn new(id: u64) -> Self { pub(crate) fn new(id: u64) -> Self {
Self(id) Self(id)
} }
pub(crate) fn get(&self) -> u64 { pub(crate) fn get(&self) -> u64 {
self.0 self.0
}
} }
} )*
}; };
} }
short_id_type!(ShortRoomId); short_id_type!(
short_id_type!(ShortEventId); /// Interned [RoomId].
short_id_type!(ShortStateHash); ///
short_id_type!(ShortStateKey); /// Created using [`get_shortroomid()`](Service::get_shortroomid) or
/// [`get_or_create_shortroomid()`](Service::get_or_create_shortroomid).
struct ShortRoomId(u64);
/// Interned [EventId].
///
/// Created using
/// [`get_or_create_shorteventid()`](Service::get_or_create_shorteventid),
/// resolved using
/// [`get_eventid_from_short()`](Service::get_eventid_from_short).
struct ShortEventId(u64);
/// Interned hash of concatenated state events.
///
/// Equal state sets do not necessarily correspond to equal short state
/// hashes, because the calculated hash is dependent on `HashSet`
/// iteration order.
///
/// Created using
/// [`get_or_create_shortstatehash()`](Service::get_or_create_shortstatehash).
struct ShortStateHash(u64);
/// Interned `(event type, state key)` tuple.
///
/// Created using [`get_shortstatekey()`](Service::get_shortstatekey) or
/// [`get_or_create_shortstatekey()`](Service::get_or_create_shortstatekey),
/// resolved using
/// [`get_statekey_from_short()`](Service::get_statekey_from_short).
struct ShortStateKey(u64);
);
mod data; mod data;

View file

@ -7,6 +7,7 @@ use lru_cache::LruCache;
use ruma::{ use ruma::{
events::{ events::{
room::{ room::{
avatar::RoomAvatarEventContent,
history_visibility::{ history_visibility::{
HistoryVisibility, RoomHistoryVisibilityEventContent, HistoryVisibility, RoomHistoryVisibilityEventContent,
}, },
@ -17,8 +18,8 @@ use ruma::{
StateEventType, StateEventType,
}, },
state_res::Event, state_res::Event,
EventId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, EventId, JsOption, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId,
UserId, ServerName, UserId,
}; };
use serde_json::value::to_raw_value; use serde_json::value::to_raw_value;
use tracing::{error, warn}; use tracing::{error, warn};
@ -507,6 +508,23 @@ impl Service {
) )
} }
#[tracing::instrument(skip(self))]
pub(crate) fn get_avatar(
&self,
room_id: &RoomId,
) -> Result<JsOption<RoomAvatarEventContent>> {
self.room_state_get(room_id, &StateEventType::RoomAvatar, "")?.map_or(
Ok(JsOption::Undefined),
|s| {
serde_json::from_str(s.content.get()).map_err(|_| {
Error::bad_database(
"Invalid room avatar event in database.",
)
})
},
)
}
// Allowed because this function uses `services()` // Allowed because this function uses `services()`
#[allow(clippy::unused_self)] #[allow(clippy::unused_self)]
#[tracing::instrument(skip(self), ret(level = "trace"))] #[tracing::instrument(skip(self), ret(level = "trace"))]
@ -535,6 +553,24 @@ impl Service {
.is_ok() .is_ok()
} }
#[tracing::instrument(skip(self))]
pub(crate) fn get_member(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<RoomMemberEventContent>> {
self.room_state_get(
room_id,
&StateEventType::RoomMember,
user_id.as_str(),
)?
.map_or(Ok(None), |s| {
serde_json::from_str(s.content.get()).map_err(|_| {
Error::bad_database("Invalid room member event in database.")
})
})
}
/// Checks if a given user can redact a given event /// Checks if a given user can redact a given event
/// ///
/// If `federation` is `true`, it allows redaction events from any user of /// If `federation` is `true`, it allows redaction events from any user of

View file

@ -1,4 +1,8 @@
use std::{collections::BTreeMap, mem}; use std::{
collections::{BTreeMap, BTreeSet},
mem,
sync::{Arc, Mutex},
};
use ruma::{ use ruma::{
api::client::{device::Device, filter::FilterDefinition}, api::client::{device::Device, filter::FilterDefinition},
@ -6,7 +10,7 @@ use ruma::{
events::AnyToDeviceEvent, events::AnyToDeviceEvent,
serde::Raw, serde::Raw,
DeviceId, OneTimeKeyAlgorithm, OneTimeKeyName, OwnedDeviceId, OwnedKeyId, DeviceId, OneTimeKeyAlgorithm, OneTimeKeyName, OwnedDeviceId, OwnedKeyId,
OwnedMxcUri, OwnedOneTimeKeyId, OwnedUserId, UInt, UserId, OwnedMxcUri, OwnedOneTimeKeyId, OwnedRoomId, OwnedUserId, UInt, UserId,
}; };
use crate::{services, Error, Result}; use crate::{services, Error, Result};
@ -15,22 +19,133 @@ mod data;
pub(crate) use data::Data; pub(crate) use data::Data;
#[derive(Debug)]
struct KnownRooms {
/// The `pos` value of the request that these `room_since` values apply to
pos: u64,
/// `since` values for rooms that have been sent previously
room_since: BTreeMap<OwnedRoomId, u64>,
}
#[derive(Debug, Default)]
pub(crate) struct SlidingSyncCache {
/// `since` values for rooms in the current/previous request. Needed in
/// case the response doesn't arrive and the client requests the same
/// `pos` value again.
current_known_rooms: Option<KnownRooms>,
/// Overlay on top of `current_known_rooms` of `since` values for rooms in
/// the expected next request (where the `pos` value should be the
/// `pos` value from our response).
next_known_rooms: Option<KnownRooms>,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct ConnectionKey {
pub(crate) user: OwnedUserId,
pub(crate) device: OwnedDeviceId,
pub(crate) connection: Option<String>,
}
pub(crate) struct Service { pub(crate) struct Service {
pub(crate) db: &'static dyn Data, pub(crate) db: &'static dyn Data,
#[allow(clippy::type_complexity)]
pub(crate) connections:
Mutex<BTreeMap<ConnectionKey, Arc<Mutex<SlidingSyncCache>>>>,
} }
impl Service { impl Service {
pub(crate) fn new(db: &'static dyn Data) -> Self { pub(crate) fn new(db: &'static dyn Data) -> Self {
Self { Self {
db, db,
connections: Mutex::new(BTreeMap::new()),
} }
} }
fn get_cache_entry(
&self,
key: ConnectionKey,
) -> Arc<Mutex<SlidingSyncCache>> {
let mut cache = self.connections.lock().unwrap();
Arc::clone(cache.entry(key).or_default())
}
/// Check if a user has an account on this homeserver. /// Check if a user has an account on this homeserver.
pub(crate) fn exists(&self, user_id: &UserId) -> Result<bool> { pub(crate) fn exists(&self, user_id: &UserId) -> Result<bool> {
self.db.exists(user_id) self.db.exists(user_id)
} }
pub(crate) fn forget_sync_request_connection(
&self,
connection_key: &ConnectionKey,
) {
self.connections.lock().unwrap().remove(connection_key);
}
#[tracing::instrument(skip(self))]
pub(crate) fn get_rooms_in_connection(
&self,
connection_key: ConnectionKey,
pos: u64,
) -> BTreeMap<OwnedRoomId, u64> {
let cached = self.get_cache_entry(connection_key);
let mut cached = cached.lock().unwrap();
let cached = &mut *cached;
let current_known_rooms =
cached.current_known_rooms.get_or_insert(KnownRooms {
pos,
room_since: BTreeMap::new(),
});
if current_known_rooms.pos == pos {
// Another request for a previous `pos` value, invalidate the next
// result
cached.next_known_rooms = None;
} else if let Some(next_known_rooms) =
cached.next_known_rooms.take().filter(|x| x.pos == pos)
{
// Merge overlay into current_known_rooms
current_known_rooms.pos = next_known_rooms.pos;
current_known_rooms.room_since.extend(next_known_rooms.room_since);
} else {
// Not a repeated request, and we don't have calculated values for a
// next request, start over
*current_known_rooms = KnownRooms {
pos,
room_since: BTreeMap::new(),
};
}
current_known_rooms.room_since.clone()
}
#[tracing::instrument(skip(self))]
pub(crate) fn update_sync_known_rooms(
&self,
connection_key: ConnectionKey,
new_cached_rooms: BTreeSet<OwnedRoomId>,
pos: u64,
next_batch: u64,
) {
let cached = self.get_cache_entry(connection_key);
let mut cached = cached.lock().unwrap();
assert_eq!(
cached.current_known_rooms.as_ref().map(|x| x.pos),
Some(pos),
"current known rooms should match current request's pos"
);
// Add an overlay to the current request's known rooms
cached.next_known_rooms = Some(KnownRooms {
pos: next_batch,
room_since: new_cached_rooms
.into_iter()
.map(|room_id| (room_id, next_batch))
.collect(),
});
}
/// Check if account is deactivated /// Check if account is deactivated
pub(crate) fn is_deactivated(&self, user_id: &UserId) -> Result<bool> { pub(crate) fn is_deactivated(&self, user_id: &UserId) -> Result<bool> {
self.db.is_deactivated(user_id) self.db.is_deactivated(user_id)