Add PduId wrapper struct

Death to Vec<u8>
This commit is contained in:
Lambda 2024-08-26 16:47:50 +00:00
parent 341f4213d0
commit 26322d5a95
15 changed files with 110 additions and 71 deletions

View file

@ -1372,7 +1372,7 @@ pub(crate) async fn invite_helper(
) )
})?; })?;
let pdu_id: Vec<u8> = services() let pdu_id = services()
.rooms .rooms
.event_handler .event_handler
.handle_incoming_pdu( .handle_incoming_pdu(

View file

@ -84,7 +84,9 @@ pub(crate) async fn search_events_route(
if let Some(s) = searches if let Some(s) = searches
.iter_mut() .iter_mut()
.map(|s| (s.peek().cloned(), s)) .map(|s| (s.peek().cloned(), s))
.max_by_key(|(peek, _)| peek.clone()) .max_by_key(|(peek, _)| {
peek.as_ref().map(|id| id.as_bytes().to_vec())
})
.and_then(|(_, i)| i.next()) .and_then(|(_, i)| i.next())
{ {
results.push(s); results.push(s);

View file

@ -1622,7 +1622,7 @@ async fn create_join_event(
.roomid_mutex_federation .roomid_mutex_federation
.lock_key(room_id.to_owned()) .lock_key(room_id.to_owned())
.await; .await;
let pdu_id: Vec<u8> = services() let pdu_id = services()
.rooms .rooms
.event_handler .event_handler
.handle_incoming_pdu( .handle_incoming_pdu(

View file

@ -4,7 +4,10 @@ use ruma::{EventId, RoomId, UserId};
use crate::{ use crate::{
database::KeyValueDatabase, database::KeyValueDatabase,
service::{self, rooms::timeline::PduCount}, service::{
self,
rooms::timeline::{PduCount, PduId},
},
services, utils, Error, PduEvent, Result, services, utils, Error, PduEvent, Result,
}; };
@ -50,6 +53,7 @@ impl service::rooms::pdu_metadata::Data for KeyValueDatabase {
let mut pduid = shortroomid.to_be_bytes().to_vec(); let mut pduid = shortroomid.to_be_bytes().to_vec();
pduid.extend_from_slice(&from.to_be_bytes()); pduid.extend_from_slice(&from.to_be_bytes());
let pduid = PduId::new(pduid);
let mut pdu = services() let mut pdu = services()
.rooms .rooms

View file

@ -1,6 +1,10 @@
use ruma::RoomId; use ruma::RoomId;
use crate::{database::KeyValueDatabase, service, services, utils, Result}; use crate::{
database::KeyValueDatabase,
service::{self, rooms::timeline::PduId},
services, utils, Result,
};
/// Splits a string into tokens used as keys in the search inverted index /// Splits a string into tokens used as keys in the search inverted index
/// ///
@ -18,7 +22,7 @@ impl service::rooms::search::Data for KeyValueDatabase {
fn index_pdu( fn index_pdu(
&self, &self,
shortroomid: u64, shortroomid: u64,
pdu_id: &[u8], pdu_id: &PduId,
message_body: &str, message_body: &str,
) -> Result<()> { ) -> Result<()> {
let mut batch = tokenize(message_body).map(|word| { let mut batch = tokenize(message_body).map(|word| {
@ -26,7 +30,7 @@ impl service::rooms::search::Data for KeyValueDatabase {
key.extend_from_slice(word.as_bytes()); key.extend_from_slice(word.as_bytes());
key.push(0xFF); key.push(0xFF);
// TODO: currently we save the room id a second time here // TODO: currently we save the room id a second time here
key.extend_from_slice(pdu_id); key.extend_from_slice(pdu_id.as_bytes());
(key, Vec::new()) (key, Vec::new())
}); });
@ -37,7 +41,7 @@ impl service::rooms::search::Data for KeyValueDatabase {
fn deindex_pdu( fn deindex_pdu(
&self, &self,
shortroomid: u64, shortroomid: u64,
pdu_id: &[u8], pdu_id: &PduId,
message_body: &str, message_body: &str,
) -> Result<()> { ) -> Result<()> {
let batch = tokenize(message_body).map(|word| { let batch = tokenize(message_body).map(|word| {
@ -45,7 +49,7 @@ impl service::rooms::search::Data for KeyValueDatabase {
key.extend_from_slice(word.as_bytes()); key.extend_from_slice(word.as_bytes());
key.push(0xFF); key.push(0xFF);
// TODO: currently we save the room id a second time here // TODO: currently we save the room id a second time here
key.extend_from_slice(pdu_id); key.extend_from_slice(pdu_id.as_bytes());
key key
}); });
@ -62,7 +66,7 @@ impl service::rooms::search::Data for KeyValueDatabase {
&'a self, &'a self,
room_id: &RoomId, room_id: &RoomId,
search_string: &str, search_string: &str,
) -> Result<Option<(Box<dyn Iterator<Item = Vec<u8>> + 'a>, Vec<String>)>> ) -> Result<Option<(Box<dyn Iterator<Item = PduId> + 'a>, Vec<String>)>>
{ {
let prefix = services() let prefix = services()
.rooms .rooms
@ -87,12 +91,14 @@ impl service::rooms::search::Data for KeyValueDatabase {
// Newest pdus first // Newest pdus first
.iter_from(&last_possible_id, true) .iter_from(&last_possible_id, true)
.take_while(move |(k, _)| k.starts_with(&prefix2)) .take_while(move |(k, _)| k.starts_with(&prefix2))
.map(move |(key, _)| key[prefix3.len()..].to_vec()) .map(move |(key, _)| PduId::new(key[prefix3.len()..].to_vec()))
}); });
// We compare b with a because we reversed the iterator earlier // We compare b with a because we reversed the iterator earlier
let Some(common_elements) = let Some(common_elements) =
utils::common_elements(iterators, |a, b| b.cmp(a)) utils::common_elements(iterators, |a, b| {
b.as_bytes().cmp(a.as_bytes())
})
else { else {
return Ok(None); return Ok(None);
}; };

View file

@ -6,8 +6,9 @@ use ruma::{
}; };
use crate::{ use crate::{
database::KeyValueDatabase, service, services, utils, Error, PduEvent, database::KeyValueDatabase,
Result, service::{self, rooms::timeline::PduId},
services, utils, Error, PduEvent, Result,
}; };
impl service::rooms::threads::Data for KeyValueDatabase { impl service::rooms::threads::Data for KeyValueDatabase {
@ -42,6 +43,9 @@ impl service::rooms::threads::Data for KeyValueDatabase {
"Invalid pduid in threadid_userids.", "Invalid pduid in threadid_userids.",
) )
})?; })?;
let pduid = PduId::new(pduid);
let mut pdu = services() let mut pdu = services()
.rooms .rooms
.timeline .timeline
@ -61,7 +65,7 @@ impl service::rooms::threads::Data for KeyValueDatabase {
fn update_participants( fn update_participants(
&self, &self,
root_id: &[u8], root_id: &PduId,
participants: &[OwnedUserId], participants: &[OwnedUserId],
) -> Result<()> { ) -> Result<()> {
let users = participants let users = participants
@ -70,16 +74,16 @@ impl service::rooms::threads::Data for KeyValueDatabase {
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(&[0xFF][..]); .join(&[0xFF][..]);
self.threadid_userids.insert(root_id, &users)?; self.threadid_userids.insert(root_id.as_bytes(), &users)?;
Ok(()) Ok(())
} }
fn get_participants( fn get_participants(
&self, &self,
root_id: &[u8], root_id: &PduId,
) -> Result<Option<Vec<OwnedUserId>>> { ) -> Result<Option<Vec<OwnedUserId>>> {
if let Some(users) = self.threadid_userids.get(root_id)? { if let Some(users) = self.threadid_userids.get(root_id.as_bytes())? {
Ok(Some( Ok(Some(
users users
.split(|b| *b == 0xFF) .split(|b| *b == 0xFF)

View file

@ -10,7 +10,8 @@ use tracing::error;
use crate::{ use crate::{
database::KeyValueDatabase, database::KeyValueDatabase,
observability::{FoundIn, Lookup, METRICS}, observability::{FoundIn, Lookup, METRICS},
service, services, utils, Error, PduEvent, Result, service::{self, rooms::timeline::PduId},
services, utils, Error, PduEvent, Result,
}; };
impl service::rooms::timeline::Data for KeyValueDatabase { impl service::rooms::timeline::Data for KeyValueDatabase {
@ -102,8 +103,8 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
} }
/// Returns the pdu's id. /// Returns the pdu's id.
fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>> { fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<PduId>> {
self.eventid_pduid.get(event_id.as_bytes()) self.eventid_pduid.get(event_id.as_bytes()).map(|x| x.map(PduId::new))
} }
/// Returns the pdu. /// Returns the pdu.
@ -170,8 +171,8 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
/// Returns the pdu. /// Returns the pdu.
/// ///
/// This does __NOT__ check the outliers `Tree`. /// This does __NOT__ check the outliers `Tree`.
fn get_pdu_from_id(&self, pdu_id: &[u8]) -> Result<Option<PduEvent>> { fn get_pdu_from_id(&self, pdu_id: &PduId) -> Result<Option<PduEvent>> {
self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { self.pduid_pdu.get(pdu_id.as_bytes())?.map_or(Ok(None), |pdu| {
Ok(Some( Ok(Some(
serde_json::from_slice(&pdu) serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?, .map_err(|_| Error::bad_database("Invalid PDU in db."))?,
@ -182,9 +183,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`. /// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
fn get_pdu_json_from_id( fn get_pdu_json_from_id(
&self, &self,
pdu_id: &[u8], pdu_id: &PduId,
) -> Result<Option<CanonicalJsonObject>> { ) -> Result<Option<CanonicalJsonObject>> {
self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { self.pduid_pdu.get(pdu_id.as_bytes())?.map_or(Ok(None), |pdu| {
Ok(Some( Ok(Some(
serde_json::from_slice(&pdu) serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?, .map_err(|_| Error::bad_database("Invalid PDU in db."))?,
@ -194,13 +195,13 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
fn append_pdu( fn append_pdu(
&self, &self,
pdu_id: &[u8], pdu_id: &PduId,
pdu: &PduEvent, pdu: &PduEvent,
json: &CanonicalJsonObject, json: &CanonicalJsonObject,
count: u64, count: u64,
) -> Result<()> { ) -> Result<()> {
self.pduid_pdu.insert( self.pduid_pdu.insert(
pdu_id, pdu_id.as_bytes(),
&serde_json::to_vec(json) &serde_json::to_vec(json)
.expect("CanonicalJsonObject is always a valid"), .expect("CanonicalJsonObject is always a valid"),
)?; )?;
@ -210,7 +211,8 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
.unwrap() .unwrap()
.insert(pdu.room_id.clone(), PduCount::Normal(count)); .insert(pdu.room_id.clone(), PduCount::Normal(count));
self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id)?; self.eventid_pduid
.insert(pdu.event_id.as_bytes(), pdu_id.as_bytes())?;
self.eventid_outlierpdu.remove(pdu.event_id.as_bytes())?; self.eventid_outlierpdu.remove(pdu.event_id.as_bytes())?;
Ok(()) Ok(())
@ -218,17 +220,17 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
fn prepend_backfill_pdu( fn prepend_backfill_pdu(
&self, &self,
pdu_id: &[u8], pdu_id: &PduId,
event_id: &EventId, event_id: &EventId,
json: &CanonicalJsonObject, json: &CanonicalJsonObject,
) -> Result<()> { ) -> Result<()> {
self.pduid_pdu.insert( self.pduid_pdu.insert(
pdu_id, pdu_id.as_bytes(),
&serde_json::to_vec(json) &serde_json::to_vec(json)
.expect("CanonicalJsonObject is always a valid"), .expect("CanonicalJsonObject is always a valid"),
)?; )?;
self.eventid_pduid.insert(event_id.as_bytes(), pdu_id)?; self.eventid_pduid.insert(event_id.as_bytes(), pdu_id.as_bytes())?;
self.eventid_outlierpdu.remove(event_id.as_bytes())?; self.eventid_outlierpdu.remove(event_id.as_bytes())?;
Ok(()) Ok(())
@ -237,13 +239,13 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
/// Removes a pdu and creates a new one with the same id. /// Removes a pdu and creates a new one with the same id.
fn replace_pdu( fn replace_pdu(
&self, &self,
pdu_id: &[u8], pdu_id: &PduId,
pdu_json: &CanonicalJsonObject, pdu_json: &CanonicalJsonObject,
pdu: &PduEvent, pdu: &PduEvent,
) -> Result<()> { ) -> Result<()> {
if self.pduid_pdu.get(pdu_id)?.is_some() { if self.pduid_pdu.get(pdu_id.as_bytes())?.is_some() {
self.pduid_pdu.insert( self.pduid_pdu.insert(
pdu_id, pdu_id.as_bytes(),
&serde_json::to_vec(pdu_json) &serde_json::to_vec(pdu_json)
.expect("CanonicalJsonObject is always a valid"), .expect("CanonicalJsonObject is always a valid"),
)?; )?;

View file

@ -4,6 +4,7 @@ use crate::{
database::KeyValueDatabase, database::KeyValueDatabase,
service::{ service::{
self, self,
rooms::timeline::PduId,
sending::{Destination, RequestKey, SendingEventType}, sending::{Destination, RequestKey, SendingEventType},
}, },
services, utils, Error, Result, services, utils, Error, Result,
@ -61,7 +62,7 @@ impl service::sending::Data for KeyValueDatabase {
for (destination, event) in requests { for (destination, event) in requests {
let mut key = destination.get_prefix(); let mut key = destination.get_prefix();
if let SendingEventType::Pdu(value) = &event { if let SendingEventType::Pdu(value) = &event {
key.extend_from_slice(value); key.extend_from_slice(value.as_bytes());
} else { } else {
key.extend_from_slice( key.extend_from_slice(
&services().globals.next_count()?.to_be_bytes(), &services().globals.next_count()?.to_be_bytes(),
@ -202,7 +203,7 @@ fn parse_servercurrentevent(
Ok(( Ok((
destination, destination,
if value.is_empty() { if value.is_empty() {
SendingEventType::Pdu(event.to_vec()) SendingEventType::Pdu(PduId::new(event.to_vec()))
} else { } else {
SendingEventType::Edu(value) SendingEventType::Edu(value)
}, },

View file

@ -37,7 +37,7 @@ use serde_json::value::RawValue as RawJsonValue;
use tokio::sync::{RwLock, RwLockWriteGuard, Semaphore}; use tokio::sync::{RwLock, RwLockWriteGuard, Semaphore};
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
use super::state_compressor::CompressedStateEvent; use super::{state_compressor::CompressedStateEvent, timeline::PduId};
use crate::{ use crate::{
service::{globals::SigningKeys, pdu}, service::{globals::SigningKeys, pdu},
services, services,
@ -89,7 +89,7 @@ impl Service {
value: CanonicalJsonObject, value: CanonicalJsonObject,
is_timeline_event: bool, is_timeline_event: bool,
pub_key_map: &'a RwLock<BTreeMap<String, SigningKeys>>, pub_key_map: &'a RwLock<BTreeMap<String, SigningKeys>>,
) -> Result<Option<Vec<u8>>> { ) -> Result<Option<PduId>> {
// 0. Check the server is in the room // 0. Check the server is in the room
if !services().rooms.metadata.exists(room_id)? { if !services().rooms.metadata.exists(room_id)? {
return Err(Error::BadRequest( return Err(Error::BadRequest(
@ -565,7 +565,7 @@ impl Service {
origin: &ServerName, origin: &ServerName,
room_id: &RoomId, room_id: &RoomId,
pub_key_map: &RwLock<BTreeMap<String, SigningKeys>>, pub_key_map: &RwLock<BTreeMap<String, SigningKeys>>,
) -> Result<Option<Vec<u8>>> { ) -> Result<Option<PduId>> {
// Skip the PDU if we already have it as a timeline event // Skip the PDU if we already have it as a timeline event
if let Ok(Some(pduid)) = if let Ok(Some(pduid)) =
services().rooms.timeline.get_pdu_id(&incoming_pdu.event_id) services().rooms.timeline.get_pdu_id(&incoming_pdu.event_id)

View file

@ -1,19 +1,19 @@
use ruma::RoomId; use ruma::RoomId;
use crate::Result; use crate::{service::rooms::timeline::PduId, Result};
pub(crate) trait Data: Send + Sync { pub(crate) trait Data: Send + Sync {
fn index_pdu( fn index_pdu(
&self, &self,
shortroomid: u64, shortroomid: u64,
pdu_id: &[u8], pdu_id: &PduId,
message_body: &str, message_body: &str,
) -> Result<()>; ) -> Result<()>;
fn deindex_pdu( fn deindex_pdu(
&self, &self,
shortroomid: u64, shortroomid: u64,
pdu_id: &[u8], pdu_id: &PduId,
message_body: &str, message_body: &str,
) -> Result<()>; ) -> Result<()>;
@ -22,5 +22,5 @@ pub(crate) trait Data: Send + Sync {
&'a self, &'a self,
room_id: &RoomId, room_id: &RoomId,
search_string: &str, search_string: &str,
) -> Result<Option<(Box<dyn Iterator<Item = Vec<u8>> + 'a>, Vec<String>)>>; ) -> Result<Option<(Box<dyn Iterator<Item = PduId> + 'a>, Vec<String>)>>;
} }

View file

@ -3,7 +3,7 @@ use ruma::{
UserId, UserId,
}; };
use crate::{PduEvent, Result}; use crate::{service::rooms::timeline::PduId, PduEvent, Result};
pub(crate) trait Data: Send + Sync { pub(crate) trait Data: Send + Sync {
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@ -17,11 +17,11 @@ pub(crate) trait Data: Send + Sync {
fn update_participants( fn update_participants(
&self, &self,
root_id: &[u8], root_id: &PduId,
participants: &[OwnedUserId], participants: &[OwnedUserId],
) -> Result<()>; ) -> Result<()>;
fn get_participants( fn get_participants(
&self, &self,
root_id: &[u8], root_id: &PduId,
) -> Result<Option<Vec<OwnedUserId>>>; ) -> Result<Option<Vec<OwnedUserId>>>;
} }

View file

@ -44,6 +44,23 @@ use crate::{
Error, PduEvent, Result, Error, PduEvent, Result,
}; };
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct PduId {
inner: Vec<u8>,
}
impl PduId {
pub(crate) fn new(inner: Vec<u8>) -> Self {
Self {
inner,
}
}
pub(crate) fn as_bytes(&self) -> &[u8] {
&self.inner
}
}
#[derive(Hash, PartialEq, Eq, Clone, Copy, Debug)] #[derive(Hash, PartialEq, Eq, Clone, Copy, Debug)]
pub(crate) enum PduCount { pub(crate) enum PduCount {
Backfilled(u64), Backfilled(u64),
@ -146,7 +163,7 @@ impl Service {
pub(crate) fn get_pdu_id( pub(crate) fn get_pdu_id(
&self, &self,
event_id: &EventId, event_id: &EventId,
) -> Result<Option<Vec<u8>>> { ) -> Result<Option<PduId>> {
self.db.get_pdu_id(event_id) self.db.get_pdu_id(event_id)
} }
@ -165,7 +182,7 @@ impl Service {
/// This does __NOT__ check the outliers `Tree`. /// This does __NOT__ check the outliers `Tree`.
pub(crate) fn get_pdu_from_id( pub(crate) fn get_pdu_from_id(
&self, &self,
pdu_id: &[u8], pdu_id: &PduId,
) -> Result<Option<PduEvent>> { ) -> Result<Option<PduEvent>> {
self.db.get_pdu_from_id(pdu_id) self.db.get_pdu_from_id(pdu_id)
} }
@ -173,7 +190,7 @@ impl Service {
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`. /// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
pub(crate) fn get_pdu_json_from_id( pub(crate) fn get_pdu_json_from_id(
&self, &self,
pdu_id: &[u8], pdu_id: &PduId,
) -> Result<Option<CanonicalJsonObject>> { ) -> Result<Option<CanonicalJsonObject>> {
self.db.get_pdu_json_from_id(pdu_id) self.db.get_pdu_json_from_id(pdu_id)
} }
@ -182,7 +199,7 @@ impl Service {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub(crate) fn replace_pdu( pub(crate) fn replace_pdu(
&self, &self,
pdu_id: &[u8], pdu_id: &PduId,
pdu_json: &CanonicalJsonObject, pdu_json: &CanonicalJsonObject,
pdu: &PduEvent, pdu: &PduEvent,
) -> Result<()> { ) -> Result<()> {
@ -202,7 +219,7 @@ impl Service {
mut pdu_json: CanonicalJsonObject, mut pdu_json: CanonicalJsonObject,
leaves: Vec<OwnedEventId>, leaves: Vec<OwnedEventId>,
room_id: &KeyToken<OwnedRoomId, marker::State>, room_id: &KeyToken<OwnedRoomId, marker::State>,
) -> Result<Vec<u8>> { ) -> Result<PduId> {
assert_eq!(*pdu.room_id, **room_id, "Token for incorrect room passed"); assert_eq!(*pdu.room_id, **room_id, "Token for incorrect room passed");
let shortroomid = services() let shortroomid = services()
@ -282,6 +299,7 @@ impl Service {
let count2 = services().globals.next_count()?; let count2 = services().globals.next_count()?;
let mut pdu_id = shortroomid.to_be_bytes().to_vec(); let mut pdu_id = shortroomid.to_be_bytes().to_vec();
pdu_id.extend_from_slice(&count2.to_be_bytes()); pdu_id.extend_from_slice(&count2.to_be_bytes());
let pdu_id = PduId::new(pdu_id);
// Insert pdu // Insert pdu
self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2)?; self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2)?;
@ -1106,7 +1124,7 @@ impl Service {
state_ids_compressed: Arc<HashSet<CompressedStateEvent>>, state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
soft_fail: bool, soft_fail: bool,
room_id: &KeyToken<OwnedRoomId, marker::State>, room_id: &KeyToken<OwnedRoomId, marker::State>,
) -> Result<Option<Vec<u8>>> { ) -> Result<Option<PduId>> {
assert_eq!(*pdu.room_id, **room_id, "Token for incorrect room passed"); assert_eq!(*pdu.room_id, **room_id, "Token for incorrect room passed");
// We append to state before appending the pdu, so we don't have a // We append to state before appending the pdu, so we don't have a
@ -1344,6 +1362,7 @@ impl Service {
let mut pdu_id = shortroomid.to_be_bytes().to_vec(); let mut pdu_id = shortroomid.to_be_bytes().to_vec();
pdu_id.extend_from_slice(&0_u64.to_be_bytes()); pdu_id.extend_from_slice(&0_u64.to_be_bytes());
pdu_id.extend_from_slice(&(u64::MAX - count).to_be_bytes()); pdu_id.extend_from_slice(&(u64::MAX - count).to_be_bytes());
let pdu_id = PduId::new(pdu_id);
// Insert pdu // Insert pdu
self.db.prepend_backfill_pdu(&pdu_id, &event_id, &value)?; self.db.prepend_backfill_pdu(&pdu_id, &event_id, &value)?;

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId}; use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId};
use super::PduCount; use super::PduCount;
use crate::{PduEvent, Result}; use crate::{service::rooms::timeline::PduId, PduEvent, Result};
pub(crate) trait Data: Send + Sync { pub(crate) trait Data: Send + Sync {
fn last_timeline_count( fn last_timeline_count(
@ -28,7 +28,7 @@ pub(crate) trait Data: Send + Sync {
) -> Result<Option<CanonicalJsonObject>>; ) -> Result<Option<CanonicalJsonObject>>;
/// Returns the pdu's id. /// Returns the pdu's id.
fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>>; fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<PduId>>;
/// Returns the pdu. /// Returns the pdu.
/// ///
@ -46,18 +46,18 @@ pub(crate) trait Data: Send + Sync {
/// Returns the pdu. /// Returns the pdu.
/// ///
/// This does __NOT__ check the outliers `Tree`. /// This does __NOT__ check the outliers `Tree`.
fn get_pdu_from_id(&self, pdu_id: &[u8]) -> Result<Option<PduEvent>>; fn get_pdu_from_id(&self, pdu_id: &PduId) -> Result<Option<PduEvent>>;
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`. /// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
fn get_pdu_json_from_id( fn get_pdu_json_from_id(
&self, &self,
pdu_id: &[u8], pdu_id: &PduId,
) -> Result<Option<CanonicalJsonObject>>; ) -> Result<Option<CanonicalJsonObject>>;
/// Adds a new pdu to the timeline /// Adds a new pdu to the timeline
fn append_pdu( fn append_pdu(
&self, &self,
pdu_id: &[u8], pdu_id: &PduId,
pdu: &PduEvent, pdu: &PduEvent,
json: &CanonicalJsonObject, json: &CanonicalJsonObject,
count: u64, count: u64,
@ -66,7 +66,7 @@ pub(crate) trait Data: Send + Sync {
// Adds a new pdu to the backfilled timeline // Adds a new pdu to the backfilled timeline
fn prepend_backfill_pdu( fn prepend_backfill_pdu(
&self, &self,
pdu_id: &[u8], pdu_id: &PduId,
event_id: &EventId, event_id: &EventId,
json: &CanonicalJsonObject, json: &CanonicalJsonObject,
) -> Result<()>; ) -> Result<()>;
@ -74,7 +74,7 @@ pub(crate) trait Data: Send + Sync {
/// Removes a pdu and creates a new one with the same id. /// Removes a pdu and creates a new one with the same id.
fn replace_pdu( fn replace_pdu(
&self, &self,
pdu_id: &[u8], pdu_id: &PduId,
pdu_json: &CanonicalJsonObject, pdu_json: &CanonicalJsonObject,
pdu: &PduEvent, pdu: &PduEvent,
) -> Result<()>; ) -> Result<()>;

View file

@ -37,6 +37,7 @@ use tokio::{
}; };
use tracing::{debug, error, warn, Span}; use tracing::{debug, error, warn, Span};
use super::rooms::timeline::PduId;
use crate::{ use crate::{
api::{appservice_server, server_server}, api::{appservice_server, server_server},
services, services,
@ -83,7 +84,7 @@ impl Destination {
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) enum SendingEventType { pub(crate) enum SendingEventType {
// pduid // pduid
Pdu(Vec<u8>), Pdu(PduId),
// pdu json // pdu json
Edu(Vec<u8>), Edu(Vec<u8>),
} }
@ -565,7 +566,7 @@ impl Service {
#[tracing::instrument(skip(self, pdu_id, user, pushkey))] #[tracing::instrument(skip(self, pdu_id, user, pushkey))]
pub(crate) fn send_push_pdu( pub(crate) fn send_push_pdu(
&self, &self,
pdu_id: &[u8], pdu_id: &PduId,
user: &UserId, user: &UserId,
pushkey: String, pushkey: String,
) -> Result<()> { ) -> Result<()> {
@ -589,7 +590,7 @@ impl Service {
pub(crate) fn send_pdu<I: Iterator<Item = OwnedServerName>>( pub(crate) fn send_pdu<I: Iterator<Item = OwnedServerName>>(
&self, &self,
servers: I, servers: I,
pdu_id: &[u8], pdu_id: &PduId,
) -> Result<()> { ) -> Result<()> {
let requests = servers let requests = servers
.into_iter() .into_iter()
@ -644,7 +645,7 @@ impl Service {
pub(crate) fn send_pdu_appservice( pub(crate) fn send_pdu_appservice(
&self, &self,
appservice_id: String, appservice_id: String,
pdu_id: Vec<u8>, pdu_id: PduId,
) -> Result<()> { ) -> Result<()> {
let destination = Destination::Appservice(appservice_id); let destination = Destination::Appservice(appservice_id);
let event_type = SendingEventType::Pdu(pdu_id); let event_type = SendingEventType::Pdu(pdu_id);
@ -758,8 +759,8 @@ async fn handle_appservice_event(
&events &events
.iter() .iter()
.map(|e| match e { .map(|e| match e {
SendingEventType::Edu(b) SendingEventType::Edu(b) => &**b,
| SendingEventType::Pdu(b) => &**b, SendingEventType::Pdu(b) => b.as_bytes(),
}) })
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
)) ))
@ -905,8 +906,8 @@ async fn handle_federation_event(
&events &events
.iter() .iter()
.map(|e| match e { .map(|e| match e {
SendingEventType::Edu(b) SendingEventType::Edu(b) => &**b,
| SendingEventType::Pdu(b) => &**b, SendingEventType::Pdu(b) => b.as_bytes(),
}) })
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
)) ))

View file

@ -113,14 +113,14 @@ pub(crate) fn calculate_hash(keys: &[&[u8]]) -> Vec<u8> {
hash.as_ref().to_owned() hash.as_ref().to_owned()
} }
pub(crate) fn common_elements<I, F>( pub(crate) fn common_elements<I, T, F>(
mut iterators: I, mut iterators: I,
check_order: F, check_order: F,
) -> Option<impl Iterator<Item = Vec<u8>>> ) -> Option<impl Iterator<Item = T>>
where where
I: Iterator, I: Iterator,
I::Item: Iterator<Item = Vec<u8>>, I::Item: Iterator<Item = T>,
F: Fn(&[u8], &[u8]) -> Ordering, F: Fn(&T, &T) -> Ordering,
{ {
let first_iterator = iterators.next()?; let first_iterator = iterators.next()?;
let mut other_iterators = let mut other_iterators =