mirror of
https://gitlab.computer.surgery/matrix/grapevine.git
synced 2025-12-17 07:41:23 +01:00
fix api/server_server events
This commit is contained in:
parent
db666fe903
commit
9d8e1a1490
1 changed files with 65 additions and 57 deletions
|
|
@ -180,10 +180,11 @@ where
|
||||||
SendAccessToken::IfRequired(""),
|
SendAccessToken::IfRequired(""),
|
||||||
&[MatrixVersion::V1_4],
|
&[MatrixVersion::V1_4],
|
||||||
)
|
)
|
||||||
.map_err(|e| {
|
.map_err(|error| {
|
||||||
warn!(
|
warn!(
|
||||||
"Failed to find destination {}: {}",
|
%error,
|
||||||
actual_destination_str, e
|
actual_destination = actual_destination_str,
|
||||||
|
"Failed to find destination",
|
||||||
);
|
);
|
||||||
Error::BadServerResponse("Invalid destination")
|
Error::BadServerResponse("Invalid destination")
|
||||||
})?;
|
})?;
|
||||||
|
|
@ -278,8 +279,8 @@ where
|
||||||
|
|
||||||
debug!("Getting response bytes");
|
debug!("Getting response bytes");
|
||||||
// TODO: handle timeout
|
// TODO: handle timeout
|
||||||
let body = response.bytes().await.unwrap_or_else(|e| {
|
let body = response.bytes().await.unwrap_or_else(|error| {
|
||||||
warn!("server error {}", e);
|
warn!(%error, "Server error");
|
||||||
Vec::new().into()
|
Vec::new().into()
|
||||||
});
|
});
|
||||||
debug!("Got response bytes");
|
debug!("Got response bytes");
|
||||||
|
|
@ -344,11 +345,11 @@ fn add_port_to_hostname(destination_str: &str) -> FedDest {
|
||||||
/// Numbers in comments below refer to bullet points in linked section of
|
/// Numbers in comments below refer to bullet points in linked section of
|
||||||
/// specification
|
/// specification
|
||||||
#[allow(clippy::too_many_lines)]
|
#[allow(clippy::too_many_lines)]
|
||||||
#[tracing::instrument(skip(destination), ret(level = "debug"))]
|
#[tracing::instrument(ret(level = "debug"))]
|
||||||
async fn find_actual_destination(
|
async fn find_actual_destination(
|
||||||
destination: &'_ ServerName,
|
destination: &'_ ServerName,
|
||||||
) -> (FedDest, FedDest) {
|
) -> (FedDest, FedDest) {
|
||||||
debug!("Finding actual destination for {destination}");
|
debug!("Finding actual destination");
|
||||||
let destination_str = destination.as_str().to_owned();
|
let destination_str = destination.as_str().to_owned();
|
||||||
let mut hostname = destination_str.clone();
|
let mut hostname = destination_str.clone();
|
||||||
let actual_destination = match get_ip_with_port(&destination_str) {
|
let actual_destination = match get_ip_with_port(&destination_str) {
|
||||||
|
|
@ -362,7 +363,7 @@ async fn find_actual_destination(
|
||||||
let (host, port) = destination_str.split_at(pos);
|
let (host, port) = destination_str.split_at(pos);
|
||||||
FedDest::Named(host.to_owned(), port.to_owned())
|
FedDest::Named(host.to_owned(), port.to_owned())
|
||||||
} else {
|
} else {
|
||||||
debug!("Requesting well known for {destination}");
|
debug!(%destination, "Requesting well known");
|
||||||
if let Some(delegated_hostname) =
|
if let Some(delegated_hostname) =
|
||||||
request_well_known(destination.as_str()).await
|
request_well_known(destination.as_str()).await
|
||||||
{
|
{
|
||||||
|
|
@ -471,7 +472,7 @@ async fn find_actual_destination(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
debug!("Actual destination: {actual_destination:?}");
|
debug!(?actual_destination, "Resolved actual destination");
|
||||||
|
|
||||||
// Can't use get_ip_with_port here because we don't want to add a port
|
// Can't use get_ip_with_port here because we don't want to add a port
|
||||||
// to an IP address if it wasn't specified
|
// to an IP address if it wasn't specified
|
||||||
|
|
@ -532,8 +533,8 @@ async fn request_well_known(destination: &str) -> Option<String> {
|
||||||
.send()
|
.send()
|
||||||
.await;
|
.await;
|
||||||
debug!("Got well known response");
|
debug!("Got well known response");
|
||||||
if let Err(e) = &response {
|
if let Err(error) = &response {
|
||||||
debug!("Well known error: {e:?}");
|
debug!(%error, "Failed to request .well-known");
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let text = response.ok()?.text().await;
|
let text = response.ok()?.text().await;
|
||||||
|
|
@ -670,8 +671,8 @@ pub(crate) fn parse_incoming_pdu(
|
||||||
pdu: &RawJsonValue,
|
pdu: &RawJsonValue,
|
||||||
) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> {
|
) -> Result<(OwnedEventId, CanonicalJsonObject, OwnedRoomId)> {
|
||||||
let value: CanonicalJsonObject =
|
let value: CanonicalJsonObject =
|
||||||
serde_json::from_str(pdu.get()).map_err(|e| {
|
serde_json::from_str(pdu.get()).map_err(|error| {
|
||||||
warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
|
warn!(%error, object = ?pdu, "Error parsing incoming event");
|
||||||
Error::BadServerResponse("Invalid PDU in server response")
|
Error::BadServerResponse("Invalid PDU in server response")
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
|
@ -713,8 +714,8 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
|
|
||||||
for pdu in &body.pdus {
|
for pdu in &body.pdus {
|
||||||
let value: CanonicalJsonObject = serde_json::from_str(pdu.get())
|
let value: CanonicalJsonObject = serde_json::from_str(pdu.get())
|
||||||
.map_err(|e| {
|
.map_err(|error| {
|
||||||
warn!("Error parsing incoming event {:?}: {:?}", pdu, e);
|
warn!(%error, object = ?pdu, "Error parsing incoming event");
|
||||||
Error::BadServerResponse("Invalid PDU in server response")
|
Error::BadServerResponse("Invalid PDU in server response")
|
||||||
})?;
|
})?;
|
||||||
let room_id: OwnedRoomId = value
|
let room_id: OwnedRoomId = value
|
||||||
|
|
@ -726,16 +727,15 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
))?;
|
))?;
|
||||||
|
|
||||||
if services().rooms.state.get_room_version(&room_id).is_err() {
|
if services().rooms.state.get_room_version(&room_id).is_err() {
|
||||||
debug!("Server is not in room {room_id}");
|
debug!(%room_id, "This server is not in the room");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let r = parse_incoming_pdu(pdu);
|
let r = parse_incoming_pdu(pdu);
|
||||||
let (event_id, value, room_id) = match r {
|
let (event_id, value, room_id) = match r {
|
||||||
Ok(t) => t,
|
Ok(t) => t,
|
||||||
Err(e) => {
|
Err(error) => {
|
||||||
warn!("Could not parse PDU: {e}");
|
warn!(%error, object = ?pdu, "Error parsing incoming event");
|
||||||
warn!("Full PDU: {:?}", &pdu);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -771,19 +771,17 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
);
|
);
|
||||||
drop(mutex_lock);
|
drop(mutex_lock);
|
||||||
|
|
||||||
let elapsed = start_time.elapsed();
|
|
||||||
debug!(
|
debug!(
|
||||||
"Handling transaction of event {} took {}m{}s",
|
%event_id,
|
||||||
event_id,
|
elapsed = ?start_time.elapsed(),
|
||||||
elapsed.as_secs() / 60,
|
"Finished handling event",
|
||||||
elapsed.as_secs() % 60
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
for pdu in &resolved_map {
|
for pdu in &resolved_map {
|
||||||
if let Err(e) = pdu.1 {
|
if let (event_id, Err(error)) = pdu {
|
||||||
if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) {
|
if matches!(error, Error::BadRequest(ErrorKind::NotFound, _)) {
|
||||||
warn!("Incoming PDU failed {:?}", pdu);
|
warn!(%error, %event_id, "Incoming PDU failed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -845,8 +843,8 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
} else {
|
} else {
|
||||||
// TODO fetch missing events
|
// TODO fetch missing events
|
||||||
debug!(
|
debug!(
|
||||||
"No known event ids in read receipt: {:?}",
|
?user_updates,
|
||||||
user_updates
|
"No known event ids in read receipt",
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -933,16 +931,19 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
target_user_id,
|
target_user_id,
|
||||||
target_device_id,
|
target_device_id,
|
||||||
&ev_type.to_string(),
|
&ev_type.to_string(),
|
||||||
event.deserialize_as().map_err(|e| {
|
event.deserialize_as().map_err(
|
||||||
warn!(
|
|error| {
|
||||||
"To-Device event is invalid: \
|
warn!(
|
||||||
{event:?} {e}"
|
%error,
|
||||||
);
|
object = ?event.json(),
|
||||||
Error::BadRequest(
|
"To-Device event is invalid",
|
||||||
ErrorKind::InvalidParam,
|
);
|
||||||
"Event is invalid",
|
Error::BadRequest(
|
||||||
)
|
ErrorKind::InvalidParam,
|
||||||
})?,
|
"Event is invalid",
|
||||||
|
)
|
||||||
|
},
|
||||||
|
)?,
|
||||||
)?,
|
)?,
|
||||||
|
|
||||||
DeviceIdOrAllDevices::AllDevices => {
|
DeviceIdOrAllDevices::AllDevices => {
|
||||||
|
|
@ -985,7 +986,12 @@ pub(crate) async fn send_transaction_message_route(
|
||||||
self_signing_key,
|
self_signing_key,
|
||||||
}) => {
|
}) => {
|
||||||
if user_id.server_name() != sender_servername {
|
if user_id.server_name() != sender_servername {
|
||||||
warn!(%user_id, %sender_servername, "Got signing key update from incorrect homeserver, ignoring");
|
warn!(
|
||||||
|
%user_id,
|
||||||
|
%sender_servername,
|
||||||
|
"Got signing key update from incorrect homeserver, \
|
||||||
|
ignoring",
|
||||||
|
);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if let Some(master_key) = master_key {
|
if let Some(master_key) = master_key {
|
||||||
|
|
@ -1025,7 +1031,7 @@ pub(crate) async fn get_event_route(
|
||||||
let event =
|
let event =
|
||||||
services().rooms.timeline.get_pdu_json(&body.event_id)?.ok_or_else(
|
services().rooms.timeline.get_pdu_json(&body.event_id)?.ok_or_else(
|
||||||
|| {
|
|| {
|
||||||
warn!("Event not found, event ID: {:?}", &body.event_id);
|
warn!(event_id = %body.event_id, "Event not found");
|
||||||
Error::BadRequest(ErrorKind::NotFound, "Event not found.")
|
Error::BadRequest(ErrorKind::NotFound, "Event not found.")
|
||||||
},
|
},
|
||||||
)?;
|
)?;
|
||||||
|
|
@ -1078,7 +1084,7 @@ pub(crate) async fn get_backfill_route(
|
||||||
let sender_servername =
|
let sender_servername =
|
||||||
body.sender_servername.as_ref().expect("server is authenticated");
|
body.sender_servername.as_ref().expect("server is authenticated");
|
||||||
|
|
||||||
debug!("Got backfill request from: {}", sender_servername);
|
debug!(server = %sender_servername, "Got backfill request");
|
||||||
|
|
||||||
if !services()
|
if !services()
|
||||||
.rooms
|
.rooms
|
||||||
|
|
@ -1188,9 +1194,10 @@ pub(crate) async fn get_missing_events_route(
|
||||||
|
|
||||||
if event_room_id != body.room_id {
|
if event_room_id != body.room_id {
|
||||||
warn!(
|
warn!(
|
||||||
"Evil event detected: Event {} found while searching in \
|
event_id = %queued_events[i],
|
||||||
room {}",
|
expected_room_id = %body.room_id,
|
||||||
queued_events[i], body.room_id
|
actual_room_id = %event_room_id,
|
||||||
|
"Evil event detected"
|
||||||
);
|
);
|
||||||
return Err(Error::BadRequest(
|
return Err(Error::BadRequest(
|
||||||
ErrorKind::InvalidParam,
|
ErrorKind::InvalidParam,
|
||||||
|
|
@ -1269,7 +1276,7 @@ pub(crate) async fn get_event_authorization_route(
|
||||||
let event =
|
let event =
|
||||||
services().rooms.timeline.get_pdu_json(&body.event_id)?.ok_or_else(
|
services().rooms.timeline.get_pdu_json(&body.event_id)?.ok_or_else(
|
||||||
|| {
|
|| {
|
||||||
warn!("Event not found, event ID: {:?}", &body.event_id);
|
warn!(event_id = %body.event_id, "Event not found");
|
||||||
Error::BadRequest(ErrorKind::NotFound, "Event not found.")
|
Error::BadRequest(ErrorKind::NotFound, "Event not found.")
|
||||||
},
|
},
|
||||||
)?;
|
)?;
|
||||||
|
|
@ -1354,13 +1361,13 @@ pub(crate) async fn get_room_state_route(
|
||||||
|
|
||||||
Ok(Ra(get_room_state::v1::Response {
|
Ok(Ra(get_room_state::v1::Response {
|
||||||
auth_chain: auth_chain_ids
|
auth_chain: auth_chain_ids
|
||||||
.filter_map(|id| {
|
.filter_map(|event_id| {
|
||||||
if let Some(json) =
|
if let Some(json) =
|
||||||
services().rooms.timeline.get_pdu_json(&id).ok()?
|
services().rooms.timeline.get_pdu_json(&event_id).ok()?
|
||||||
{
|
{
|
||||||
Some(PduEvent::convert_to_outgoing_federation_event(json))
|
Some(PduEvent::convert_to_outgoing_federation_event(json))
|
||||||
} else {
|
} else {
|
||||||
error!("Could not find event json for {id} in db.");
|
error!(%event_id, "Could not find event JSON for event");
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
@ -1469,8 +1476,8 @@ pub(crate) async fn create_join_event_template_route(
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|join_rules_event| {
|
.map(|join_rules_event| {
|
||||||
serde_json::from_str(join_rules_event.content.get()).map_err(
|
serde_json::from_str(join_rules_event.content.get()).map_err(
|
||||||
|e| {
|
|error| {
|
||||||
warn!("Invalid join rules event: {}", e);
|
warn!(%error, "Invalid join rules event");
|
||||||
Error::bad_database("Invalid join rules event in db.")
|
Error::bad_database("Invalid join rules event in db.")
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
@ -1565,8 +1572,8 @@ async fn create_join_event(
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|join_rules_event| {
|
.map(|join_rules_event| {
|
||||||
serde_json::from_str(join_rules_event.content.get()).map_err(
|
serde_json::from_str(join_rules_event.content.get()).map_err(
|
||||||
|e| {
|
|error| {
|
||||||
warn!("Invalid join rules event: {}", e);
|
warn!(%error, "Invalid join rules event");
|
||||||
Error::bad_database("Invalid join rules event in db.")
|
Error::bad_database("Invalid join rules event in db.")
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
@ -1829,10 +1836,11 @@ pub(crate) async fn create_invite_route(
|
||||||
|
|
||||||
event.insert("event_id".to_owned(), "$dummy".into());
|
event.insert("event_id".to_owned(), "$dummy".into());
|
||||||
|
|
||||||
let pdu: PduEvent = serde_json::from_value(event.into()).map_err(|e| {
|
let pdu: PduEvent =
|
||||||
warn!("Invalid invite event: {}", e);
|
serde_json::from_value(event.into()).map_err(|error| {
|
||||||
Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event.")
|
warn!(%error, "Invalid invite event");
|
||||||
})?;
|
Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event.")
|
||||||
|
})?;
|
||||||
|
|
||||||
invite_state.push(pdu.to_stripped_state_event());
|
invite_state.push(pdu.to_stripped_state_event());
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue