From 4893c54f4f6048f4be2215455eec9e09be51417f Mon Sep 17 00:00:00 2001 From: Lambda Date: Sun, 23 Jun 2024 20:07:54 +0000 Subject: [PATCH] Use TokenSet for roomid_mutex_insert --- src/api/client_server/sync.rs | 52 ++++++++++++----------------------- src/service/globals.rs | 7 +++-- src/service/rooms/timeline.rs | 34 ++++++++--------------- 3 files changed, 34 insertions(+), 59 deletions(-) diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 64069f66..f2985301 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -1,6 +1,5 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap, HashSet}, - sync::Arc, time::Duration, }; @@ -161,17 +160,12 @@ pub(crate) async fn sync_events_route( { // Get and drop the lock to wait for remaining operations to finish - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().await; - drop(insert_lock); + let room_token = services() + .globals + .roomid_mutex_insert + .lock_key(room_id.clone()) + .await; + drop(room_token); } let left_count = services() @@ -330,17 +324,12 @@ pub(crate) async fn sync_events_route( { // Get and drop the lock to wait for remaining operations to finish - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().await; - drop(insert_lock); + let room_token = services() + .globals + .roomid_mutex_insert + .lock_key(room_id.clone()) + .await; + drop(room_token); } let invite_count = services() @@ -481,17 +470,12 @@ async fn load_joined_room( { // Get and drop the lock to wait for remaining operations to finish // This will make sure the we have all events until next_batch - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .await - .entry(room_id.to_owned()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().await; - drop(insert_lock); + let room_token = services() + .globals + .roomid_mutex_insert + .lock_key(room_id.to_owned()) + .await; + drop(room_token); } let (timeline_pdus, limited) = diff --git a/src/service/globals.rs b/src/service/globals.rs index e8fe08d1..662d6da4 100644 --- a/src/service/globals.rs +++ b/src/service/globals.rs @@ -79,8 +79,7 @@ pub(crate) struct Service { Arc>>, pub(crate) servername_ratelimiter: OnDemandHashMap, - pub(crate) roomid_mutex_insert: - RwLock>>>, + pub(crate) roomid_mutex_insert: TokenSet, pub(crate) roomid_mutex_state: TokenSet, // this lock will be held longer @@ -279,7 +278,9 @@ impl Service { "servername_ratelimiter".to_owned(), ), roomid_mutex_state: TokenSet::new("roomid_mutex_state".to_owned()), - roomid_mutex_insert: RwLock::new(HashMap::new()), + roomid_mutex_insert: TokenSet::new( + "roomid_mutex_insert".to_owned(), + ), roomid_mutex_federation: RwLock::new(HashMap::new()), roomid_federationhandletime: RwLock::new(HashMap::new()), stateres_mutex: Arc::new(Mutex::new(())), diff --git a/src/service/rooms/timeline.rs b/src/service/rooms/timeline.rs index cc8e42d3..a8472811 100644 --- a/src/service/rooms/timeline.rs +++ b/src/service/rooms/timeline.rs @@ -259,16 +259,11 @@ impl Service { .mark_as_referenced(&pdu.room_id, &pdu.prev_events)?; services().rooms.state.set_forward_extremities(room_id, leaves)?; - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .await - .entry(pdu.room_id.clone()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().await; + let insert_token = services() + .globals + .roomid_mutex_insert + .lock_key(pdu.room_id.clone()) + .await; let count1 = services().globals.next_count()?; // Mark as read first so the sending client doesn't get a notification @@ -290,7 +285,7 @@ impl Service { // Insert pdu self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2)?; - drop(insert_lock); + drop(insert_token); // See if the event matches any known pushers let power_levels: RoomPowerLevelsEventContent = services() @@ -1328,16 +1323,11 @@ impl Service { .get_shortroomid(&room_id)? .expect("room exists"); - let mutex_insert = Arc::clone( - services() - .globals - .roomid_mutex_insert - .write() - .await - .entry(room_id.clone()) - .or_default(), - ); - let insert_lock = mutex_insert.lock().await; + let insert_token = services() + .globals + .roomid_mutex_insert + .lock_key(room_id.clone()) + .await; let count = services().globals.next_count()?; let mut pdu_id = shortroomid.to_be_bytes().to_vec(); @@ -1347,7 +1337,7 @@ impl Service { // Insert pdu self.db.prepend_backfill_pdu(&pdu_id, &event_id, &value)?; - drop(insert_lock); + drop(insert_token); if pdu.kind == TimelineEventType::RoomMessage { #[derive(Deserialize)]