From dd24a441121b94d389fb46f08c7ec51886d5aa32 Mon Sep 17 00:00:00 2001 From: Lambda Date: Sun, 23 Jun 2024 20:12:38 +0000 Subject: [PATCH] Use TokenSet for roomid_mutex_federation --- src/api/server_server.rs | 34 ++++++++++++---------------------- src/service/globals.rs | 6 ++++-- src/service/rooms/timeline.rs | 17 ++++++----------- 3 files changed, 22 insertions(+), 35 deletions(-) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 2208c693..890d9997 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -742,16 +742,11 @@ pub(crate) async fn send_transaction_message_route( // We do not add the event_id field to the pdu here because of signature // and hashes checks - let mutex = Arc::clone( - services() - .globals - .roomid_mutex_federation - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let mutex_lock = mutex.lock().await; + let federation_token = services() + .globals + .roomid_mutex_federation + .lock_key(room_id.clone()) + .await; let start_time = Instant::now(); resolved_map.insert( event_id.clone(), @@ -769,7 +764,7 @@ pub(crate) async fn send_transaction_message_route( .await .map(|_| ()), ); - drop(mutex_lock); + drop(federation_token); debug!( %event_id, @@ -1619,16 +1614,11 @@ async fn create_join_event( Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid.") })?; - let mutex = Arc::clone( - services() - .globals - .roomid_mutex_federation - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let mutex_lock = mutex.lock().await; + let federation_token = services() + .globals + .roomid_mutex_federation + .lock_key(room_id.to_owned()) + .await; let pdu_id: Vec = services() .rooms .event_handler @@ -1645,7 +1635,7 @@ async fn create_join_event( ErrorKind::InvalidParam, "Could not accept incoming PDU as timeline event.", ))?; - drop(mutex_lock); + drop(federation_token); let state_ids = services().rooms.state_accessor.state_full_ids(shortstatehash).await?; diff --git a/src/service/globals.rs b/src/service/globals.rs index 662d6da4..7ef6fbb7 100644 --- a/src/service/globals.rs +++ b/src/service/globals.rs @@ -84,7 +84,7 @@ pub(crate) struct Service { // this lock will be held longer pub(crate) roomid_mutex_federation: - RwLock>>>, + TokenSet, pub(crate) roomid_federationhandletime: RwLock>, pub(crate) stateres_mutex: Arc>, @@ -281,7 +281,9 @@ impl Service { roomid_mutex_insert: TokenSet::new( "roomid_mutex_insert".to_owned(), ), - roomid_mutex_federation: RwLock::new(HashMap::new()), + roomid_mutex_federation: TokenSet::new( + "roomid_mutex_federation".to_owned(), + ), roomid_federationhandletime: RwLock::new(HashMap::new()), stateres_mutex: Arc::new(Mutex::new(())), rotate: RotationHandler::new(), diff --git a/src/service/rooms/timeline.rs b/src/service/rooms/timeline.rs index a8472811..b325f198 100644 --- a/src/service/rooms/timeline.rs +++ b/src/service/rooms/timeline.rs @@ -1284,16 +1284,11 @@ impl Service { server_server::parse_incoming_pdu(&pdu)?; // Lock so we cannot backfill the same pdu twice at the same time - let mutex = Arc::clone( - services() - .globals - .roomid_mutex_federation - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let mutex_lock = mutex.lock().await; + let federation_token = services() + .globals + .roomid_mutex_federation + .lock_key(room_id.clone()) + .await; // Skip the PDU if we already have it as a timeline event if let Some(pdu_id) = services().rooms.timeline.get_pdu_id(&event_id)? { @@ -1359,7 +1354,7 @@ impl Service { )?; } } - drop(mutex_lock); + drop(federation_token); info!("Prepended backfill pdu"); Ok(())