event_handler: remove AsyncRecursiveType alias, simplify signatures

This commit is contained in:
Lambda 2024-09-01 11:34:14 +00:00
parent 542e097cdf
commit 22ce624a81

View file

@ -1,14 +1,10 @@
/// An async function that can recursively call itself.
type AsyncRecursiveType<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
use std::{ use std::{
collections::{hash_map, BTreeMap, HashMap, HashSet}, collections::{hash_map, BTreeMap, HashMap, HashSet},
pin::Pin,
sync::Arc, sync::Arc,
time::{Duration, Instant, SystemTime}, time::{Duration, Instant, SystemTime},
}; };
use futures_util::{stream::FuturesUnordered, Future, StreamExt}; use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use ruma::{ use ruma::{
api::{ api::{
client::error::ErrorKind, client::error::ErrorKind,
@ -84,15 +80,13 @@ impl Service {
/// 13. Use state resolution to find new room state /// 13. Use state resolution to find new room state
/// 14. Check if the event passes auth based on the "current state" of the /// 14. Check if the event passes auth based on the "current state" of the
/// room, if not soft fail it /// room, if not soft fail it
// We use some AsyncRecursiveType hacks here so we can call this async
// funtion recursively
#[tracing::instrument(skip(self, value, is_timeline_event, pub_key_map))] #[tracing::instrument(skip(self, value, is_timeline_event, pub_key_map))]
pub(crate) async fn handle_incoming_pdu<'a>( pub(crate) async fn handle_incoming_pdu<'a>(
&self, &self,
origin: &'a ServerName, origin: &'a ServerName,
event_id: &'a EventId, event_id: &'a EventId,
room_id: &'a RoomId, room_id: &'a RoomId,
value: BTreeMap<String, CanonicalJsonValue>, value: CanonicalJsonObject,
is_timeline_event: bool, is_timeline_event: bool,
pub_key_map: &'a RwLock<BTreeMap<String, SigningKeys>>, pub_key_map: &'a RwLock<BTreeMap<String, SigningKeys>>,
) -> Result<Option<Vec<u8>>> { ) -> Result<Option<Vec<u8>>> {
@ -317,7 +311,7 @@ impl Service {
r r
} }
#[allow(clippy::type_complexity, clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(self, origin, room_id, value, pub_key_map))] #[tracing::instrument(skip(self, origin, room_id, value, pub_key_map))]
fn handle_outlier_pdu<'a>( fn handle_outlier_pdu<'a>(
&'a self, &'a self,
@ -325,13 +319,10 @@ impl Service {
create_event: &'a PduEvent, create_event: &'a PduEvent,
event_id: &'a EventId, event_id: &'a EventId,
room_id: &'a RoomId, room_id: &'a RoomId,
mut value: BTreeMap<String, CanonicalJsonValue>, mut value: CanonicalJsonObject,
auth_events_known: bool, auth_events_known: bool,
pub_key_map: &'a RwLock<BTreeMap<String, SigningKeys>>, pub_key_map: &'a RwLock<BTreeMap<String, SigningKeys>>,
) -> AsyncRecursiveType< ) -> BoxFuture<'a, Result<(Arc<PduEvent>, CanonicalJsonObject)>> {
'a,
Result<(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>)>,
> {
Box::pin(async move { Box::pin(async move {
// 1.1. Remove unsigned field // 1.1. Remove unsigned field
value.remove("unsigned"); value.remove("unsigned");
@ -571,7 +562,7 @@ impl Service {
pub(crate) async fn upgrade_outlier_to_timeline_pdu( pub(crate) async fn upgrade_outlier_to_timeline_pdu(
&self, &self,
incoming_pdu: Arc<PduEvent>, incoming_pdu: Arc<PduEvent>,
val: BTreeMap<String, CanonicalJsonValue>, val: CanonicalJsonObject,
create_event: &PduEvent, create_event: &PduEvent,
origin: &ServerName, origin: &ServerName,
room_id: &RoomId, room_id: &RoomId,
@ -1237,7 +1228,6 @@ impl Service {
/// b. Look at outlier pdu tree /// b. Look at outlier pdu tree
/// c. Ask origin server over federation /// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation? /// d. TODO: Ask other servers over federation?
#[allow(clippy::type_complexity)]
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub(crate) fn fetch_and_handle_outliers<'a>( pub(crate) fn fetch_and_handle_outliers<'a>(
&'a self, &'a self,
@ -1247,10 +1237,7 @@ impl Service {
room_id: &'a RoomId, room_id: &'a RoomId,
room_version_id: &'a RoomVersionId, room_version_id: &'a RoomVersionId,
pub_key_map: &'a RwLock<BTreeMap<String, SigningKeys>>, pub_key_map: &'a RwLock<BTreeMap<String, SigningKeys>>,
) -> AsyncRecursiveType< ) -> BoxFuture<'a, Vec<(Arc<PduEvent>, Option<CanonicalJsonObject>)>> {
'a,
Vec<(Arc<PduEvent>, Option<BTreeMap<String, CanonicalJsonValue>>)>,
> {
Box::pin(async move { Box::pin(async move {
let back_off = |id| async move { let back_off = |id| async move {
match services() match services()
@ -1462,10 +1449,7 @@ impl Service {
initial_set: Vec<Arc<EventId>>, initial_set: Vec<Arc<EventId>>,
) -> Result<( ) -> Result<(
Vec<Arc<EventId>>, Vec<Arc<EventId>>,
HashMap< HashMap<Arc<EventId>, (Arc<PduEvent>, CanonicalJsonObject)>,
Arc<EventId>,
(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>),
>,
)> { )> {
let mut graph: HashMap<Arc<EventId>, _> = HashMap::new(); let mut graph: HashMap<Arc<EventId>, _> = HashMap::new();
let mut eventid_info = HashMap::new(); let mut eventid_info = HashMap::new();
@ -1560,7 +1544,7 @@ impl Service {
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub(crate) async fn fetch_required_signing_keys( pub(crate) async fn fetch_required_signing_keys(
&self, &self,
event: &BTreeMap<String, CanonicalJsonValue>, event: &CanonicalJsonObject,
pub_key_map: &RwLock<BTreeMap<String, SigningKeys>>, pub_key_map: &RwLock<BTreeMap<String, SigningKeys>>,
) -> Result<()> { ) -> Result<()> {
let signatures = event let signatures = event