Use TokenSet for roomid_mutex_federation

This commit is contained in:
Lambda 2024-06-23 20:12:38 +00:00
parent 4893c54f4f
commit dd24a44112
3 changed files with 22 additions and 35 deletions

View file

@ -742,16 +742,11 @@ pub(crate) async fn send_transaction_message_route(
// We do not add the event_id field to the pdu here because of signature // We do not add the event_id field to the pdu here because of signature
// and hashes checks // and hashes checks
let mutex = Arc::clone( let federation_token = services()
services() .globals
.globals .roomid_mutex_federation
.roomid_mutex_federation .lock_key(room_id.clone())
.write() .await;
.await
.entry(room_id.clone())
.or_default(),
);
let mutex_lock = mutex.lock().await;
let start_time = Instant::now(); let start_time = Instant::now();
resolved_map.insert( resolved_map.insert(
event_id.clone(), event_id.clone(),
@ -769,7 +764,7 @@ pub(crate) async fn send_transaction_message_route(
.await .await
.map(|_| ()), .map(|_| ()),
); );
drop(mutex_lock); drop(federation_token);
debug!( debug!(
%event_id, %event_id,
@ -1619,16 +1614,11 @@ async fn create_join_event(
Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid.") Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid.")
})?; })?;
let mutex = Arc::clone( let federation_token = services()
services() .globals
.globals .roomid_mutex_federation
.roomid_mutex_federation .lock_key(room_id.to_owned())
.write() .await;
.await
.entry(room_id.to_owned())
.or_default(),
);
let mutex_lock = mutex.lock().await;
let pdu_id: Vec<u8> = services() let pdu_id: Vec<u8> = services()
.rooms .rooms
.event_handler .event_handler
@ -1645,7 +1635,7 @@ async fn create_join_event(
ErrorKind::InvalidParam, ErrorKind::InvalidParam,
"Could not accept incoming PDU as timeline event.", "Could not accept incoming PDU as timeline event.",
))?; ))?;
drop(mutex_lock); drop(federation_token);
let state_ids = let state_ids =
services().rooms.state_accessor.state_full_ids(shortstatehash).await?; services().rooms.state_accessor.state_full_ids(shortstatehash).await?;

View file

@ -84,7 +84,7 @@ pub(crate) struct Service {
// this lock will be held longer // this lock will be held longer
pub(crate) roomid_mutex_federation: pub(crate) roomid_mutex_federation:
RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>, TokenSet<OwnedRoomId, marker::Federation>,
pub(crate) roomid_federationhandletime: pub(crate) roomid_federationhandletime:
RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>, RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>,
pub(crate) stateres_mutex: Arc<Mutex<()>>, pub(crate) stateres_mutex: Arc<Mutex<()>>,
@ -281,7 +281,9 @@ impl Service {
roomid_mutex_insert: TokenSet::new( roomid_mutex_insert: TokenSet::new(
"roomid_mutex_insert".to_owned(), "roomid_mutex_insert".to_owned(),
), ),
roomid_mutex_federation: RwLock::new(HashMap::new()), roomid_mutex_federation: TokenSet::new(
"roomid_mutex_federation".to_owned(),
),
roomid_federationhandletime: RwLock::new(HashMap::new()), roomid_federationhandletime: RwLock::new(HashMap::new()),
stateres_mutex: Arc::new(Mutex::new(())), stateres_mutex: Arc::new(Mutex::new(())),
rotate: RotationHandler::new(), rotate: RotationHandler::new(),

View file

@ -1284,16 +1284,11 @@ impl Service {
server_server::parse_incoming_pdu(&pdu)?; server_server::parse_incoming_pdu(&pdu)?;
// Lock so we cannot backfill the same pdu twice at the same time // Lock so we cannot backfill the same pdu twice at the same time
let mutex = Arc::clone( let federation_token = services()
services() .globals
.globals .roomid_mutex_federation
.roomid_mutex_federation .lock_key(room_id.clone())
.write() .await;
.await
.entry(room_id.clone())
.or_default(),
);
let mutex_lock = mutex.lock().await;
// Skip the PDU if we already have it as a timeline event // Skip the PDU if we already have it as a timeline event
if let Some(pdu_id) = services().rooms.timeline.get_pdu_id(&event_id)? { if let Some(pdu_id) = services().rooms.timeline.get_pdu_id(&event_id)? {
@ -1359,7 +1354,7 @@ impl Service {
)?; )?;
} }
} }
drop(mutex_lock); drop(federation_token);
info!("Prepended backfill pdu"); info!("Prepended backfill pdu");
Ok(()) Ok(())