fix service/rooms/event_handler events

This commit is contained in:
Charles Hall 2024-07-15 22:05:34 -07:00
parent 4b5d127368
commit b4d81bb067
No known key found for this signature in database
GPG key ID: 7B8E0645816E07CF

View file

@ -121,10 +121,12 @@ impl Service {
})?;
let create_event_content: RoomCreateEventContent =
serde_json::from_str(create_event.content.get()).map_err(|e| {
error!("Invalid create event: {}", e);
Error::BadDatabase("Invalid create event in db")
})?;
serde_json::from_str(create_event.content.get()).map_err(
|error| {
error!(%error, "Invalid create event");
Error::BadDatabase("Invalid create event.")
},
)?;
let room_version_id = &create_event_content.room_version;
let first_pdu_in_room =
@ -195,7 +197,7 @@ impl Service {
}
if time.elapsed() < min_elapsed_duration {
info!("Backing off from {}", prev_id);
info!(event_id = %prev_id, "Backing off from prev event");
continue;
}
}
@ -236,7 +238,7 @@ impl Service {
((*prev_id).to_owned(), start_time),
);
if let Err(e) = self
if let Err(error) = self
.upgrade_outlier_to_timeline_pdu(
pdu,
json,
@ -248,7 +250,7 @@ impl Service {
.await
{
errors += 1;
warn!("Prev event {} failed: {}", prev_id, e);
warn!(%error, event_id = %prev_id, "Prev event failed");
match services()
.globals
.bad_event_ratelimiter
@ -264,7 +266,6 @@ impl Service {
}
}
}
let elapsed = start_time.elapsed();
services()
.globals
.roomid_federationhandletime
@ -272,10 +273,9 @@ impl Service {
.await
.remove(&room_id.to_owned());
debug!(
"Handling prev event {} took {}m{}s",
prev_id,
elapsed.as_secs() / 60,
elapsed.as_secs() % 60
elapsed = ?start_time.elapsed(),
event_id = %prev_id,
"Finished handling prev event",
);
}
}
@ -334,8 +334,8 @@ impl Service {
// 3. check content hash, redact if doesn't match
let create_event_content: RoomCreateEventContent =
serde_json::from_str(create_event.content.get()).map_err(
|e| {
error!("Invalid create event: {}", e);
|error| {
error!(%error, "Invalid create event");
Error::BadDatabase("Invalid create event in db")
},
)?;
@ -397,9 +397,9 @@ impl Service {
&value,
room_version_id,
) {
Err(e) => {
Err(error) => {
// Drop
warn!("Dropping bad event {}: {}", event_id, e,);
warn!(%event_id, %error, "Dropping bad event");
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Signature verification failed",
@ -407,7 +407,7 @@ impl Service {
}
Ok(ruma::signatures::Verified::Signatures) => {
// Redact
warn!("Calculated hash does not match: {}", event_id);
warn!(%event_id, "Calculated hash does not match");
let Ok(obj) = ruma::canonical_json::redact(
value,
room_version_id,
@ -481,16 +481,17 @@ impl Service {
// 6. Reject "due to auth events" if the event doesn't pass auth
// based on the auth events
debug!(
"Auth check for {} based on auth events",
incoming_pdu.event_id
event_id = %incoming_pdu.event_id,
"Starting auth check for event based on auth events",
);
// Build map of auth events
let mut auth_events = HashMap::new();
for id in &incoming_pdu.auth_events {
let Some(auth_event) = services().rooms.timeline.get_pdu(id)?
for event_id in &incoming_pdu.auth_events {
let Some(auth_event) =
services().rooms.timeline.get_pdu(event_id)?
else {
warn!("Could not find auth event {}", id);
warn!(%event_id, "Could not find auth event");
continue;
};
@ -543,7 +544,7 @@ impl Service {
));
}
debug!("Validation successful.");
debug!("Validation successful");
// 7. Persist the event as an outlier.
services()
@ -551,7 +552,7 @@ impl Service {
.outlier
.add_pdu_outlier(&incoming_pdu.event_id, &val)?;
debug!("Added pdu as outlier.");
debug!("Added pdu as outlier");
Ok((Arc::new(incoming_pdu), val))
})
@ -594,10 +595,12 @@ impl Service {
);
let create_event_content: RoomCreateEventContent =
serde_json::from_str(create_event.content.get()).map_err(|e| {
warn!("Invalid create event: {}", e);
serde_json::from_str(create_event.content.get()).map_err(
|error| {
warn!(%error, "Invalid create event");
Error::BadDatabase("Invalid create event in db")
})?;
},
)?;
let room_version_id = &create_event_content.room_version;
let room_version = RoomVersion::new(room_version_id)
@ -728,7 +731,7 @@ impl Service {
id.clone(),
);
} else {
warn!("Failed to get_statekey_from_short.");
warn!("Failed to get_statekey_from_short");
}
starting_events.push(id);
}
@ -751,10 +754,10 @@ impl Service {
room_version_id,
&fork_states,
auth_chain_sets,
|id| {
let res = services().rooms.timeline.get_pdu(id);
if let Err(e) = &res {
error!("LOOK AT ME Failed to fetch event: {}", e);
|event_id| {
let res = services().rooms.timeline.get_pdu(event_id);
if let Err(error) = &res {
error!(%error, %event_id, "Failed to fetch event");
}
res.ok().flatten()
},
@ -777,12 +780,11 @@ impl Service {
})
.collect::<Result<_>>()?,
),
Err(e) => {
Err(error) => {
warn!(
%error,
"State resolution on prev events failed, either \
an event could not be found or deserialization: \
{}",
e
an event could not be found or deserialization"
);
None
}
@ -807,7 +809,7 @@ impl Service {
.await
{
Ok(res) => {
debug!("Fetching state events at event.");
debug!("Fetching state events at event");
let collect = res
.pdu_ids
.iter()
@ -871,9 +873,9 @@ impl Service {
state_at_incoming_event = Some(state);
}
Err(e) => {
warn!("Fetching state for event failed: {}", e);
return Err(e);
Err(error) => {
warn!(%error, "Fetching state for event failed");
return Err(error);
}
};
}
@ -1092,7 +1094,7 @@ impl Service {
// Soft fail, we keep the event as an outlier but don't add it to
// the timeline
warn!("Event was soft failed: {:?}", incoming_pdu);
warn!("Event was soft failed");
services()
.rooms
.pdu_metadata
@ -1191,8 +1193,8 @@ impl Service {
let fetch_event = |id: &_| {
let res = services().rooms.timeline.get_pdu(id);
if let Err(e) = &res {
error!("LOOK AT ME Failed to fetch event: {}", e);
if let Err(error) = &res {
error!(%error, "Failed to fetch event");
}
res.ok().flatten()
};
@ -1212,7 +1214,7 @@ impl Service {
drop(lock);
debug!("State resolution done. Compressing state");
debug!("State resolution done; compressing state");
let new_room_state = state
.into_iter()
@ -1274,14 +1276,14 @@ impl Service {
};
let mut pdus = vec![];
for id in events {
for event_id in events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Ok(Some(local_pdu)) =
services().rooms.timeline.get_pdu(id)
services().rooms.timeline.get_pdu(event_id)
{
trace!("Found {} in db", id);
trace!(%event_id, "Found event locally");
pdus.push((local_pdu, None));
continue;
}
@ -1289,7 +1291,7 @@ impl Service {
// c. Ask origin server over federation
// We also handle its auth chain here so we don't get a stack
// overflow in handle_outlier_pdu.
let mut todo_auth_events = vec![Arc::clone(id)];
let mut todo_auth_events = vec![Arc::clone(event_id)];
let mut events_in_reverse_order = Vec::new();
let mut events_all = HashSet::new();
let mut i = 0;
@ -1312,7 +1314,10 @@ impl Service {
}
if time.elapsed() < min_elapsed_duration {
info!("Backing off from {}", next_id);
info!(
event_id = %next_id,
"Backing off from event",
);
continue;
}
}
@ -1329,11 +1334,14 @@ impl Service {
if let Ok(Some(_)) =
services().rooms.timeline.get_pdu(&next_id)
{
trace!("Found {} in db", next_id);
trace!(event_id = %next_id, "Found event locally");
continue;
}
info!("Fetching {} over federation.", next_id);
info!(
event_id = %next_id,
"Fetching event over federation",
);
if let Ok(res) = services()
.sending
.send_federation_request(
@ -1344,7 +1352,7 @@ impl Service {
)
.await
{
info!("Got {} over federation", next_id);
info!(event_id = %next_id, "Got event over federation");
let Ok((calculated_event_id, value)) =
pdu::gen_event_id_canonical_json(
&res.pdu,
@ -1357,9 +1365,10 @@ impl Service {
if calculated_event_id != *next_id {
warn!(
"Server didn't return event id we requested: \
requested: {}, we got {}. Event: {:?}",
next_id, calculated_event_id, &res.pdu
expected_event_id = %next_id,
actual_event_id = %calculated_event_id,
"Server returned an event with a different ID \
than requested",
);
}
@ -1383,7 +1392,7 @@ impl Service {
events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
} else {
warn!("Failed to fetch event: {}", next_id);
warn!(event_id = %next_id, "Failed to fetch event");
back_off((*next_id).to_owned()).await;
}
}
@ -1407,7 +1416,10 @@ impl Service {
}
if time.elapsed() < min_elapsed_duration {
info!("Backing off from {}", next_id);
info!(
event_id = %next_id,
"Backing off from event",
);
continue;
}
}
@ -1425,14 +1437,15 @@ impl Service {
.await
{
Ok((pdu, json)) => {
if next_id == id {
if next_id == event_id {
pdus.push((pdu, Some(json)));
}
}
Err(e) => {
Err(error) => {
warn!(
"Authentication of event {} failed: {:?}",
next_id, e
event_id = %next_id,
%error,
"Event failed auth checks",
);
back_off((**next_id).to_owned()).await;
}
@ -1591,10 +1604,7 @@ impl Service {
.await;
let Ok(keys) = fetch_res else {
warn!(
"Signature verification failed: Could not fetch signing \
key.",
);
warn!("Failed to fetch signing key");
continue;
};
@ -1619,8 +1629,8 @@ impl Service {
pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap<String, SigningKeys>>,
) -> Result<()> {
let value: CanonicalJsonObject = serde_json::from_str(pdu.get())
.map_err(|e| {
error!("Invalid PDU in server response: {:?}: {:?}", pdu, e);
.map_err(|error| {
error!(%error, ?pdu, "Invalid PDU in server response");
Error::BadServerResponse("Invalid PDU in server response")
})?;
@ -1643,9 +1653,9 @@ impl Service {
}
if time.elapsed() < min_elapsed_duration {
debug!("Backing off from {}", event_id);
debug!(%event_id, "Backing off from event");
return Err(Error::BadServerResponse(
"bad event, still backing off",
"Bad event, still backing off",
));
}
}
@ -1697,11 +1707,14 @@ impl Service {
continue;
}
trace!("Loading signing keys for {}", origin);
trace!(server = %origin, "Loading signing keys for other server");
if let Some(result) = services().globals.signing_keys_for(origin)? {
if !contains_all_ids(&result) {
trace!("Signing key not loaded for {}", origin);
trace!(
server = %origin,
"Signing key not loaded for server",
);
servers.insert(origin.to_owned(), BTreeMap::new());
}
@ -1744,7 +1757,7 @@ impl Service {
)
.await
{
debug!(%error, "failed to get server keys from cache");
debug!(%error, "Failed to get server keys from cache");
};
}
@ -1757,7 +1770,7 @@ impl Service {
}
for server in services().globals.trusted_servers() {
info!("Asking batch signing keys from trusted server {}", server);
info!(%server, "Asking batch signing keys from trusted server");
if let Ok(keys) = services()
.sending
.send_federation_request(
@ -1768,18 +1781,18 @@ impl Service {
)
.await
{
trace!("Got signing keys: {:?}", keys);
trace!(signing_keys = ?keys, "Got signing keys");
let mut pkm = pub_key_map.write().await;
for k in keys.server_keys {
let k = match k.deserialize() {
Ok(key) => key,
Err(e) => {
Err(error) => {
warn!(
"Received error {} while fetching keys from \
trusted server {}",
e, server
%error,
%server,
object = ?k.json(),
"Failed to fetch keys from trusted server",
);
warn!("{}", k.into_json());
continue;
}
};
@ -1804,7 +1817,7 @@ impl Service {
}
}
info!("Asking individual servers for signing keys: {servers:?}");
info!(?servers, "Asking individual servers for signing keys");
let mut futures: FuturesUnordered<_> = servers
.into_keys()
.map(|server| async move {
@ -1822,9 +1835,8 @@ impl Service {
.collect();
while let Some(result) = futures.next().await {
info!("Received new result");
if let (Ok(get_keys_response), origin) = result {
info!("Result is from {origin}");
info!(server = %origin, "Received new result from server");
if let Ok(key) = get_keys_response.server_key.deserialize() {
let result = services()
.globals
@ -1861,11 +1873,15 @@ impl Service {
return Ok(());
};
let Ok(acl_event_content) = serde_json::from_str::<
let acl_event_content = match serde_json::from_str::<
RoomServerAclEventContent,
>(acl_event.content.get()) else {
warn!("Invalid ACL event");
>(acl_event.content.get())
{
Ok(x) => x,
Err(error) => {
warn!(%error, "Invalid ACL event");
return Ok(());
}
};
if acl_event_content.allow.is_empty() {
@ -1877,8 +1893,9 @@ impl Service {
Ok(())
} else {
info!(
"Server {} was denied by room ACL in {}",
server_name, room_id
server = %server_name,
%room_id,
"Other server was denied by room ACL",
);
Err(Error::BadRequest(
ErrorKind::forbidden(),
@ -1970,7 +1987,7 @@ impl Service {
}
if time.elapsed() < min_elapsed_duration {
debug!("Backing off from {:?}", signature_ids);
debug!(?signature_ids, "Backing off from signatures");
return Err(Error::BadServerResponse(
"bad signature, still backing off",
));
@ -1990,8 +2007,10 @@ impl Service {
.expect("Should be valid until year 500,000,000");
debug!(
"The threshhold is {:?}, found time is {:?} for server {}",
ts_threshold, result.valid_until_ts, origin
server = %origin,
ts_threshold = %ts_threshold.get(),
ts_valid_until = %result.valid_until_ts.get(),
"Loaded signing keys for server",
);
if contains_all_ids(&result) {
@ -2001,7 +2020,7 @@ impl Service {
debug!(
origin = %origin,
valid_until_ts = %result.valid_until_ts.get(),
"Keys for are deemed as valid, as they expire after threshold",
"Keys are valid because they expire after threshold",
);
return Ok(result);
}
@ -2181,7 +2200,12 @@ impl Service {
#[tracing::instrument(skip_all)]
fn check_room_id(room_id: &RoomId, pdu: &PduEvent) -> Result<()> {
if pdu.room_id != room_id {
warn!("Found event from room {} in room {}", pdu.room_id, room_id);
warn!(
event_id = %pdu.event_id,
expected_room_id = %pdu.room_id,
actual_room_id = %room_id,
"Event has wrong room ID",
);
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Event has wrong room id",