Merge branch 'pdu-handling' into 'main'

Draft: fix: don't fail PDU/EDU push requests

See merge request matrix/grapevine!136
This commit is contained in:
mikoto 2025-09-18 02:40:01 +00:00
commit 53fc41ecd0

View file

@ -749,35 +749,29 @@ pub(crate) async fn send_transaction_message_route(
let pub_key_map = RwLock::new(BTreeMap::new());
for pdu in &body.pdus {
let value: CanonicalJsonObject = serde_json::from_str(pdu.get())
.map_err(|error| {
warn!(%error, object = ?pdu, "Error parsing incoming event");
Error::BadServerResponse("Invalid PDU in server response")
})?;
let room_id: OwnedRoomId = value
.get("room_id")
.and_then(|id| RoomId::parse(id.as_str()?).ok())
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Invalid room id in pdu",
))?;
if services()
.rooms
.state
.get_create_content::<ExtractVersion>(&room_id)
.is_err()
{
debug!(%room_id, "This server is not in the room");
let Some(res) = handle_pdu_helper(pdu).transpose() else {
continue;
}
};
let r = parse_incoming_pdu(pdu);
let (event_id, value, room_id) = match r {
#[allow(clippy::manual_let_else)]
let (event_id, value, room_id) = match res {
Ok(t) => t,
Err(error) => {
warn!(%error, object = ?pdu, "Error parsing incoming event");
continue;
// TODO: should we just ignore since we cannot assign an event ID
// here?
Err(_) => {
// TODO: or do we add a fallback?
// let Ok((event_id, value)) =
// gen_event_id_canonical_json(pdu,
// &ruma::RoomVersionId::V10) else {
// // Event could not be converted to canonical json
// return Err(Error::BadRequest(
// ErrorKind::InvalidParam,
// "Could not convert event to canonical json.",
// ));
// };
todo!()
}
};
// We do not add the event_id field to the pdu here because of signature
@ -827,220 +821,11 @@ pub(crate) async fn send_transaction_message_route(
.iter()
.filter_map(|edu| serde_json::from_str::<Edu>(edu.json().get()).ok())
{
match edu {
Edu::Receipt(receipt) => {
for (room_id, room_updates) in receipt.receipts {
for (user_id, user_updates) in room_updates.read {
if user_id.server_name() != sender_servername {
warn!(
%user_id,
%sender_servername,
"Got receipt EDU from incorrect homeserver, \
ignoring",
);
continue;
}
if let Some((event_id, _)) = user_updates
.event_ids
.iter()
.filter_map(|id| {
services()
.rooms
.timeline
.get_pdu_count(id)
.ok()
.flatten()
.map(|r| (id, r))
})
.max_by_key(|(_, count)| *count)
{
let mut user_receipts = BTreeMap::new();
user_receipts
.insert(user_id.clone(), user_updates.data);
let mut receipts = BTreeMap::new();
receipts.insert(ReceiptType::Read, user_receipts);
let mut receipt_content = BTreeMap::new();
receipt_content
.insert(event_id.to_owned(), receipts);
let event = ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
services()
.rooms
.edus
.read_receipt
.readreceipt_update(
&user_id, &room_id, event,
)?;
} else {
// TODO fetch missing events
debug!(
?user_updates,
"No known event ids in read receipt",
);
}
}
}
}
Edu::Typing(typing) => {
if typing.user_id.server_name() != sender_servername {
warn!(
user_id = %typing.user_id,
%sender_servername,
"Got typing EDU from incorrect homeserver, ignoring",
);
continue;
}
if services()
.rooms
.state_cache
.is_joined(&typing.user_id, &typing.room_id)?
{
if typing.typing {
services()
.rooms
.edus
.typing
.typing_add(
&typing.user_id,
&typing.room_id,
3000 + utils::millis_since_unix_epoch(),
)
.await?;
} else {
services()
.rooms
.edus
.typing
.typing_remove(&typing.user_id, &typing.room_id)
.await?;
}
}
}
Edu::DeviceListUpdate(DeviceListUpdateContent {
user_id,
..
}) => {
if user_id.server_name() != sender_servername {
warn!(
%user_id,
%sender_servername,
"Got device list update EDU from incorrect homeserver, \
ignoring",
);
continue;
}
services().users.mark_device_key_update(&user_id)?;
}
Edu::DirectToDevice(DirectDeviceContent {
sender,
ev_type,
message_id,
messages,
}) => {
if sender.server_name() != sender_servername {
warn!(
user_id = %sender,
%sender_servername,
"Got direct-to-device EDU from incorrect homeserver, \
ignoring",
);
continue;
}
// Check if this is a new transaction id
if services()
.transaction_ids
.existing_txnid(&sender, None, &message_id)?
.is_none()
{
for (target_user_id, map) in &messages {
for (target_device_id_maybe, event) in map {
match target_device_id_maybe {
DeviceIdOrAllDevices::DeviceId(
target_device_id,
) => services().users.add_to_device_event(
&sender,
target_user_id,
target_device_id,
&ev_type.to_string(),
event.deserialize_as().map_err(
|error| {
warn!(
%error,
object = ?event.json(),
"To-Device event is invalid",
);
Error::BadRequest(
ErrorKind::InvalidParam,
"Event is invalid",
)
},
)?,
)?,
DeviceIdOrAllDevices::AllDevices => {
for target_device_id in services()
.users
.all_device_ids(target_user_id)
{
services().users.add_to_device_event(
&sender,
target_user_id,
&target_device_id?,
&ev_type.to_string(),
event.deserialize_as().map_err(
|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"Event is invalid",
)
},
)?,
)?;
}
}
}
}
}
// Save transaction id with empty data
services().transaction_ids.add_txnid(
&sender,
None,
&message_id,
&[],
)?;
}
}
Edu::SigningKeyUpdate(SigningKeyUpdateContent {
user_id,
master_key,
self_signing_key,
}) => {
if user_id.server_name() != sender_servername {
warn!(
%user_id,
%sender_servername,
"Got signing key update from incorrect homeserver, \
ignoring",
);
continue;
}
if let Some(master_key) = master_key {
services().users.add_cross_signing_keys(
&user_id,
&master_key,
self_signing_key.as_ref(),
None,
true,
)?;
}
}
Edu::_Custom(_) | Edu::Presence(_) => {}
#[allow(clippy::redundant_pattern_matching)]
if let Err(_) = handle_edu_helper(sender_servername, edu).await {
// TODO: EDUs should not return their errors in the response, so we
// can safely ignore?
todo!();
}
}
@ -2169,6 +1954,260 @@ pub(crate) async fn media_thumbnail_route(
}))
}
fn handle_pdu_helper(
pdu: &RawJsonValue,
) -> Result<Option<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)>> {
let value: CanonicalJsonObject =
serde_json::from_str(pdu.get()).map_err(|error| {
warn!(%error, object = ?pdu, "Error parsing incoming event");
Error::BadServerResponse("Invalid PDU in server response")
})?;
let room_id: OwnedRoomId = value
.get("room_id")
.and_then(|id| RoomId::parse(id.as_str()?).ok())
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Invalid room id in pdu",
))?;
if services()
.rooms
.state
.get_create_content::<ExtractVersion>(&room_id)
.is_err()
{
debug!(%room_id, "This server is not in the room");
return Ok(None);
}
match parse_incoming_pdu(pdu) {
Ok(t) => Ok(Some(t)),
Err(error) => {
warn!(%error, object = ?pdu, "Error parsing incoming event");
Ok(None)
}
}
}
#[allow(clippy::too_many_lines)]
async fn handle_edu_helper(
sender_servername: &ServerName,
edu: Edu,
) -> Result<()> {
match edu {
Edu::Receipt(receipt) => {
for (room_id, room_updates) in receipt.receipts {
for (user_id, user_updates) in room_updates.read {
if user_id.server_name() != sender_servername {
warn!(
%user_id,
%sender_servername,
"Got receipt EDU from incorrect homeserver, \
ignoring",
);
continue;
}
if let Some((event_id, _)) = user_updates
.event_ids
.iter()
.filter_map(|id| {
services()
.rooms
.timeline
.get_pdu_count(id)
.ok()
.flatten()
.map(|r| (id, r))
})
.max_by_key(|(_, count)| *count)
{
let mut user_receipts = BTreeMap::new();
user_receipts
.insert(user_id.clone(), user_updates.data);
let mut receipts = BTreeMap::new();
receipts.insert(ReceiptType::Read, user_receipts);
let mut receipt_content = BTreeMap::new();
receipt_content.insert(event_id.to_owned(), receipts);
let event = ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
services()
.rooms
.edus
.read_receipt
.readreceipt_update(&user_id, &room_id, event)?;
} else {
// TODO fetch missing events
debug!(
?user_updates,
"No known event ids in read receipt",
);
}
}
}
}
Edu::Typing(typing) => {
if typing.user_id.server_name() != sender_servername {
warn!(
user_id = %typing.user_id,
%sender_servername,
"Got typing EDU from incorrect homeserver, ignoring",
);
return Ok(());
}
if services()
.rooms
.state_cache
.is_joined(&typing.user_id, &typing.room_id)?
{
if typing.typing {
services()
.rooms
.edus
.typing
.typing_add(
&typing.user_id,
&typing.room_id,
3000 + utils::millis_since_unix_epoch(),
)
.await?;
} else {
services()
.rooms
.edus
.typing
.typing_remove(&typing.user_id, &typing.room_id)
.await?;
}
}
}
Edu::DeviceListUpdate(DeviceListUpdateContent {
user_id,
..
}) => {
if user_id.server_name() != sender_servername {
warn!(
%user_id,
%sender_servername,
"Got device list update EDU from incorrect homeserver, \
ignoring",
);
return Ok(());
}
services().users.mark_device_key_update(&user_id)?;
}
Edu::DirectToDevice(DirectDeviceContent {
sender,
ev_type,
message_id,
messages,
}) => {
if sender.server_name() != sender_servername {
warn!(
user_id = %sender,
%sender_servername,
"Got direct-to-device EDU from incorrect homeserver, \
ignoring",
);
return Ok(());
}
// Check if this is a new transaction id
if services()
.transaction_ids
.existing_txnid(&sender, None, &message_id)?
.is_none()
{
for (target_user_id, map) in &messages {
for (target_device_id_maybe, event) in map {
match target_device_id_maybe {
DeviceIdOrAllDevices::DeviceId(
target_device_id,
) => services().users.add_to_device_event(
&sender,
target_user_id,
target_device_id,
&ev_type.to_string(),
event.deserialize_as().map_err(|error| {
warn!(
%error,
object = ?event.json(),
"To-Device event is invalid",
);
Error::BadRequest(
ErrorKind::InvalidParam,
"Event is invalid",
)
})?,
)?,
DeviceIdOrAllDevices::AllDevices => {
for target_device_id in services()
.users
.all_device_ids(target_user_id)
{
services().users.add_to_device_event(
&sender,
target_user_id,
&target_device_id?,
&ev_type.to_string(),
event.deserialize_as().map_err(
|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"Event is invalid",
)
},
)?,
)?;
}
}
}
}
}
// Save transaction id with empty data
services().transaction_ids.add_txnid(
&sender,
None,
&message_id,
&[],
)?;
}
}
Edu::SigningKeyUpdate(SigningKeyUpdateContent {
user_id,
master_key,
self_signing_key,
}) => {
if user_id.server_name() != sender_servername {
warn!(
%user_id,
%sender_servername,
"Got signing key update from incorrect homeserver, \
ignoring",
);
return Ok(());
}
if let Some(master_key) = master_key {
services().users.add_cross_signing_keys(
&user_id,
&master_key,
self_signing_key.as_ref(),
None,
true,
)?;
}
}
Edu::_Custom(_) | Edu::Presence(_) => {}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::{add_port_to_hostname, get_ip_with_port, FedDest};