From 3066a63c03857c5a14825101d167573ca8f04713 Mon Sep 17 00:00:00 2001 From: avdb13 Date: Sun, 20 Apr 2025 02:28:18 +0000 Subject: [PATCH] don't fail PDU/EDU push requests --- src/api/server_server.rs | 519 +++++++++++++++++++++------------------ 1 file changed, 279 insertions(+), 240 deletions(-) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index 9c29b60b..b623a064 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -745,35 +745,29 @@ pub(crate) async fn send_transaction_message_route( let pub_key_map = RwLock::new(BTreeMap::new()); for pdu in &body.pdus { - let value: CanonicalJsonObject = serde_json::from_str(pdu.get()) - .map_err(|error| { - warn!(%error, object = ?pdu, "Error parsing incoming event"); - Error::BadServerResponse("Invalid PDU in server response") - })?; - let room_id: OwnedRoomId = value - .get("room_id") - .and_then(|id| RoomId::parse(id.as_str()?).ok()) - .ok_or(Error::BadRequest( - ErrorKind::InvalidParam, - "Invalid room id in pdu", - ))?; - - if services() - .rooms - .state - .get_create_content::(&room_id) - .is_err() - { - debug!(%room_id, "This server is not in the room"); + let Some(res) = handle_pdu_helper(pdu).transpose() else { continue; - } + }; - let r = parse_incoming_pdu(pdu); - let (event_id, value, room_id) = match r { + #[allow(clippy::manual_let_else)] + let (event_id, value, room_id) = match res { Ok(t) => t, - Err(error) => { - warn!(%error, object = ?pdu, "Error parsing incoming event"); - continue; + // TODO: should we just ignore since we cannot assign an event ID + // here? + Err(_) => { + // TODO: or do we add a fallback? + + // let Ok((event_id, value)) = + // gen_event_id_canonical_json(pdu, + // &ruma::RoomVersionId::V10) else { + // // Event could not be converted to canonical json + // return Err(Error::BadRequest( + // ErrorKind::InvalidParam, + // "Could not convert event to canonical json.", + // )); + // }; + + todo!() } }; // We do not add the event_id field to the pdu here because of signature @@ -823,220 +817,11 @@ pub(crate) async fn send_transaction_message_route( .iter() .filter_map(|edu| serde_json::from_str::(edu.json().get()).ok()) { - match edu { - Edu::Receipt(receipt) => { - for (room_id, room_updates) in receipt.receipts { - for (user_id, user_updates) in room_updates.read { - if user_id.server_name() != sender_servername { - warn!( - %user_id, - %sender_servername, - "Got receipt EDU from incorrect homeserver, \ - ignoring", - ); - continue; - } - if let Some((event_id, _)) = user_updates - .event_ids - .iter() - .filter_map(|id| { - services() - .rooms - .timeline - .get_pdu_count(id) - .ok() - .flatten() - .map(|r| (id, r)) - }) - .max_by_key(|(_, count)| *count) - { - let mut user_receipts = BTreeMap::new(); - user_receipts - .insert(user_id.clone(), user_updates.data); - - let mut receipts = BTreeMap::new(); - receipts.insert(ReceiptType::Read, user_receipts); - - let mut receipt_content = BTreeMap::new(); - receipt_content - .insert(event_id.to_owned(), receipts); - - let event = ReceiptEvent { - content: ReceiptEventContent(receipt_content), - room_id: room_id.clone(), - }; - services() - .rooms - .edus - .read_receipt - .readreceipt_update( - &user_id, &room_id, event, - )?; - } else { - // TODO fetch missing events - debug!( - ?user_updates, - "No known event ids in read receipt", - ); - } - } - } - } - Edu::Typing(typing) => { - if typing.user_id.server_name() != sender_servername { - warn!( - user_id = %typing.user_id, - %sender_servername, - "Got typing EDU from incorrect homeserver, ignoring", - ); - continue; - } - if services() - .rooms - .state_cache - .is_joined(&typing.user_id, &typing.room_id)? - { - if typing.typing { - services() - .rooms - .edus - .typing - .typing_add( - &typing.user_id, - &typing.room_id, - 3000 + utils::millis_since_unix_epoch(), - ) - .await?; - } else { - services() - .rooms - .edus - .typing - .typing_remove(&typing.user_id, &typing.room_id) - .await?; - } - } - } - Edu::DeviceListUpdate(DeviceListUpdateContent { - user_id, - .. - }) => { - if user_id.server_name() != sender_servername { - warn!( - %user_id, - %sender_servername, - "Got device list update EDU from incorrect homeserver, \ - ignoring", - ); - continue; - } - services().users.mark_device_key_update(&user_id)?; - } - Edu::DirectToDevice(DirectDeviceContent { - sender, - ev_type, - message_id, - messages, - }) => { - if sender.server_name() != sender_servername { - warn!( - user_id = %sender, - %sender_servername, - "Got direct-to-device EDU from incorrect homeserver, \ - ignoring", - ); - continue; - } - // Check if this is a new transaction id - if services() - .transaction_ids - .existing_txnid(&sender, None, &message_id)? - .is_none() - { - for (target_user_id, map) in &messages { - for (target_device_id_maybe, event) in map { - match target_device_id_maybe { - DeviceIdOrAllDevices::DeviceId( - target_device_id, - ) => services().users.add_to_device_event( - &sender, - target_user_id, - target_device_id, - &ev_type.to_string(), - event.deserialize_as().map_err( - |error| { - warn!( - %error, - object = ?event.json(), - "To-Device event is invalid", - ); - Error::BadRequest( - ErrorKind::InvalidParam, - "Event is invalid", - ) - }, - )?, - )?, - - DeviceIdOrAllDevices::AllDevices => { - for target_device_id in services() - .users - .all_device_ids(target_user_id) - { - services().users.add_to_device_event( - &sender, - target_user_id, - &target_device_id?, - &ev_type.to_string(), - event.deserialize_as().map_err( - |_| { - Error::BadRequest( - ErrorKind::InvalidParam, - "Event is invalid", - ) - }, - )?, - )?; - } - } - } - } - } - - // Save transaction id with empty data - services().transaction_ids.add_txnid( - &sender, - None, - &message_id, - &[], - )?; - } - } - Edu::SigningKeyUpdate(SigningKeyUpdateContent { - user_id, - master_key, - self_signing_key, - }) => { - if user_id.server_name() != sender_servername { - warn!( - %user_id, - %sender_servername, - "Got signing key update from incorrect homeserver, \ - ignoring", - ); - continue; - } - if let Some(master_key) = master_key { - services().users.add_cross_signing_keys( - &user_id, - &master_key, - self_signing_key.as_ref(), - None, - true, - )?; - } - } - Edu::_Custom(_) | Edu::Presence(_) => {} + #[allow(clippy::redundant_pattern_matching)] + if let Err(_) = handle_edu_helper(sender_servername, edu).await { + // TODO: EDUs should not return their errors in the response, so we + // can safely ignore? + todo!(); } } @@ -2156,6 +1941,260 @@ pub(crate) async fn media_thumbnail_route( })) } +fn handle_pdu_helper( + pdu: &RawJsonValue, +) -> Result> { + let value: CanonicalJsonObject = + serde_json::from_str(pdu.get()).map_err(|error| { + warn!(%error, object = ?pdu, "Error parsing incoming event"); + Error::BadServerResponse("Invalid PDU in server response") + })?; + let room_id: OwnedRoomId = value + .get("room_id") + .and_then(|id| RoomId::parse(id.as_str()?).ok()) + .ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "Invalid room id in pdu", + ))?; + + if services() + .rooms + .state + .get_create_content::(&room_id) + .is_err() + { + debug!(%room_id, "This server is not in the room"); + return Ok(None); + } + + match parse_incoming_pdu(pdu) { + Ok(t) => Ok(Some(t)), + Err(error) => { + warn!(%error, object = ?pdu, "Error parsing incoming event"); + Ok(None) + } + } +} + +#[allow(clippy::too_many_lines)] +async fn handle_edu_helper( + sender_servername: &ServerName, + edu: Edu, +) -> Result<()> { + match edu { + Edu::Receipt(receipt) => { + for (room_id, room_updates) in receipt.receipts { + for (user_id, user_updates) in room_updates.read { + if user_id.server_name() != sender_servername { + warn!( + %user_id, + %sender_servername, + "Got receipt EDU from incorrect homeserver, \ + ignoring", + ); + continue; + } + if let Some((event_id, _)) = user_updates + .event_ids + .iter() + .filter_map(|id| { + services() + .rooms + .timeline + .get_pdu_count(id) + .ok() + .flatten() + .map(|r| (id, r)) + }) + .max_by_key(|(_, count)| *count) + { + let mut user_receipts = BTreeMap::new(); + user_receipts + .insert(user_id.clone(), user_updates.data); + + let mut receipts = BTreeMap::new(); + receipts.insert(ReceiptType::Read, user_receipts); + + let mut receipt_content = BTreeMap::new(); + receipt_content.insert(event_id.to_owned(), receipts); + + let event = ReceiptEvent { + content: ReceiptEventContent(receipt_content), + room_id: room_id.clone(), + }; + services() + .rooms + .edus + .read_receipt + .readreceipt_update(&user_id, &room_id, event)?; + } else { + // TODO fetch missing events + debug!( + ?user_updates, + "No known event ids in read receipt", + ); + } + } + } + } + Edu::Typing(typing) => { + if typing.user_id.server_name() != sender_servername { + warn!( + user_id = %typing.user_id, + %sender_servername, + "Got typing EDU from incorrect homeserver, ignoring", + ); + return Ok(()); + } + if services() + .rooms + .state_cache + .is_joined(&typing.user_id, &typing.room_id)? + { + if typing.typing { + services() + .rooms + .edus + .typing + .typing_add( + &typing.user_id, + &typing.room_id, + 3000 + utils::millis_since_unix_epoch(), + ) + .await?; + } else { + services() + .rooms + .edus + .typing + .typing_remove(&typing.user_id, &typing.room_id) + .await?; + } + } + } + Edu::DeviceListUpdate(DeviceListUpdateContent { + user_id, + .. + }) => { + if user_id.server_name() != sender_servername { + warn!( + %user_id, + %sender_servername, + "Got device list update EDU from incorrect homeserver, \ + ignoring", + ); + return Ok(()); + } + services().users.mark_device_key_update(&user_id)?; + } + Edu::DirectToDevice(DirectDeviceContent { + sender, + ev_type, + message_id, + messages, + }) => { + if sender.server_name() != sender_servername { + warn!( + user_id = %sender, + %sender_servername, + "Got direct-to-device EDU from incorrect homeserver, \ + ignoring", + ); + return Ok(()); + } + // Check if this is a new transaction id + if services() + .transaction_ids + .existing_txnid(&sender, None, &message_id)? + .is_none() + { + for (target_user_id, map) in &messages { + for (target_device_id_maybe, event) in map { + match target_device_id_maybe { + DeviceIdOrAllDevices::DeviceId( + target_device_id, + ) => services().users.add_to_device_event( + &sender, + target_user_id, + target_device_id, + &ev_type.to_string(), + event.deserialize_as().map_err(|error| { + warn!( + %error, + object = ?event.json(), + "To-Device event is invalid", + ); + Error::BadRequest( + ErrorKind::InvalidParam, + "Event is invalid", + ) + })?, + )?, + + DeviceIdOrAllDevices::AllDevices => { + for target_device_id in services() + .users + .all_device_ids(target_user_id) + { + services().users.add_to_device_event( + &sender, + target_user_id, + &target_device_id?, + &ev_type.to_string(), + event.deserialize_as().map_err( + |_| { + Error::BadRequest( + ErrorKind::InvalidParam, + "Event is invalid", + ) + }, + )?, + )?; + } + } + } + } + } + + // Save transaction id with empty data + services().transaction_ids.add_txnid( + &sender, + None, + &message_id, + &[], + )?; + } + } + Edu::SigningKeyUpdate(SigningKeyUpdateContent { + user_id, + master_key, + self_signing_key, + }) => { + if user_id.server_name() != sender_servername { + warn!( + %user_id, + %sender_servername, + "Got signing key update from incorrect homeserver, \ + ignoring", + ); + return Ok(()); + } + if let Some(master_key) = master_key { + services().users.add_cross_signing_keys( + &user_id, + &master_key, + self_signing_key.as_ref(), + None, + true, + )?; + } + } + Edu::_Custom(_) | Edu::Presence(_) => {} + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::{add_port_to_hostname, get_ip_with_port, FedDest};