Use TokenSet for roomid_mutex_insert

This commit is contained in:
Lambda 2024-06-23 20:07:54 +00:00
parent 34ccb2cd06
commit 4893c54f4f
3 changed files with 34 additions and 59 deletions

View file

@@ -1,6 +1,5 @@
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
time::Duration,
};
@@ -161,17 +160,12 @@ pub(crate) async fn sync_events_route(
{
// Get and drop the lock to wait for remaining operations to finish
let mutex_insert = Arc::clone(
services()
let room_token = services()
.globals
.roomid_mutex_insert
.write()
.await
.entry(room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
drop(insert_lock);
.lock_key(room_id.clone())
.await;
drop(room_token);
}
let left_count = services()
@@ -330,17 +324,12 @@ pub(crate) async fn sync_events_route(
{
// Get and drop the lock to wait for remaining operations to finish
let mutex_insert = Arc::clone(
services()
let room_token = services()
.globals
.roomid_mutex_insert
.write()
.await
.entry(room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
drop(insert_lock);
.lock_key(room_id.clone())
.await;
drop(room_token);
}
let invite_count = services()
@@ -481,17 +470,12 @@ async fn load_joined_room(
{
// Get and drop the lock to wait for remaining operations to finish
// This will make sure the we have all events until next_batch
let mutex_insert = Arc::clone(
services()
let room_token = services()
.globals
.roomid_mutex_insert
.write()
.await
.entry(room_id.to_owned())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
drop(insert_lock);
.lock_key(room_id.to_owned())
.await;
drop(room_token);
}
let (timeline_pdus, limited) =

View file

@@ -79,8 +79,7 @@ pub(crate) struct Service {
Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
pub(crate) servername_ratelimiter:
OnDemandHashMap<OwnedServerName, Semaphore>,
pub(crate) roomid_mutex_insert:
RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
pub(crate) roomid_mutex_insert: TokenSet<OwnedRoomId, marker::Insert>,
pub(crate) roomid_mutex_state: TokenSet<OwnedRoomId, marker::State>,
// this lock will be held longer
@@ -279,7 +278,9 @@ impl Service {
"servername_ratelimiter".to_owned(),
),
roomid_mutex_state: TokenSet::new("roomid_mutex_state".to_owned()),
roomid_mutex_insert: RwLock::new(HashMap::new()),
roomid_mutex_insert: TokenSet::new(
"roomid_mutex_insert".to_owned(),
),
roomid_mutex_federation: RwLock::new(HashMap::new()),
roomid_federationhandletime: RwLock::new(HashMap::new()),
stateres_mutex: Arc::new(Mutex::new(())),

View file

@@ -259,16 +259,11 @@ impl Service {
.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?;
services().rooms.state.set_forward_extremities(room_id, leaves)?;
let mutex_insert = Arc::clone(
services()
let insert_token = services()
.globals
.roomid_mutex_insert
.write()
.await
.entry(pdu.room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
.lock_key(pdu.room_id.clone())
.await;
let count1 = services().globals.next_count()?;
// Mark as read first so the sending client doesn't get a notification
@@ -290,7 +285,7 @@
// Insert pdu
self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2)?;
drop(insert_lock);
drop(insert_token);
// See if the event matches any known pushers
let power_levels: RoomPowerLevelsEventContent = services()
@@ -1328,16 +1323,11 @@ impl Service {
.get_shortroomid(&room_id)?
.expect("room exists");
let mutex_insert = Arc::clone(
services()
let insert_token = services()
.globals
.roomid_mutex_insert
.write()
.await
.entry(room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
.lock_key(room_id.clone())
.await;
let count = services().globals.next_count()?;
let mut pdu_id = shortroomid.to_be_bytes().to_vec();
@@ -1347,7 +1337,7 @@
// Insert pdu
self.db.prepend_backfill_pdu(&pdu_id, &event_id, &value)?;
drop(insert_lock);
drop(insert_token);
if pdu.kind == TimelineEventType::RoomMessage {
#[derive(Deserialize)]