replace outgoing transaction backoff with global offline server backoff

The global backoff code is in `send_federation_transaction`, so we had
to switch to using this function instead of
`server_server::send_request` directly. This has the side effect of
introducing a timeout, which we previously didn't have for transactions.
This commit is contained in:
Olivia Lee 2024-08-23 20:34:52 -07:00
parent b876dca45c
commit b9118b1361
No known key found for this signature in database
GPG key ID: 54D568A15B9CD1F9

View file

@ -4,7 +4,7 @@ use std::{
future::{Future, IntoFuture}, future::{Future, IntoFuture},
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::Duration,
}; };
use base64::{engine::general_purpose, Engine as _}; use base64::{engine::general_purpose, Engine as _};
@ -155,10 +155,7 @@ pub(crate) struct Service {
#[derive(Debug)] #[derive(Debug)]
enum TransactionStatus { enum TransactionStatus {
Running, Running,
// number of times failed, time of last failure Failed,
Failed(u32, Instant),
// number of times failed
Retrying(u32),
} }
struct HandlerInputs { struct HandlerInputs {
@ -293,19 +290,14 @@ impl Service {
} }
if let Err(error) = result { if let Err(error) = result {
warn!(%error, "Marking transaction as failed"); // Logging transactions that fail due to backoff produces a lot of
current_transaction_status.entry(destination).and_modify(|e| { // clutter in the logs. Failure due to backoff is expected behavior,
use TransactionStatus::{Failed, Retrying, Running}; // and the transaction will be retried later.
if !matches!(error, Error::ServerBackoff { .. }) {
*e = match e { warn!(%error, "Marking transaction as failed");
Running => Failed(1, Instant::now()), }
Retrying(n) => Failed(*n + 1, Instant::now()), current_transaction_status
Failed(..) => { .insert(destination, TransactionStatus::Failed);
error!("Request that was not even running failed?!");
return;
}
}
});
return Ok(None); return Ok(None);
} }
@ -410,27 +402,13 @@ impl Service {
entry entry
.and_modify(|e| match e { .and_modify(|e| match e {
TransactionStatus::Running | TransactionStatus::Retrying(_) => { TransactionStatus::Running => {
// already running // already running
allow = false; allow = false;
} }
TransactionStatus::Failed(tries, time) => { TransactionStatus::Failed => {
// Fail if a request has failed recently (exponential retry = true;
// backoff) *e = TransactionStatus::Running;
let mut min_elapsed_duration =
Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24)
{
min_elapsed_duration =
Duration::from_secs(60 * 60 * 24);
}
if time.elapsed() < min_elapsed_duration {
allow = false;
} else {
retry = true;
*e = TransactionStatus::Retrying(*tries);
}
} }
}) })
.or_insert(TransactionStatus::Running); .or_insert(TransactionStatus::Running);
@ -989,26 +967,29 @@ async fn handle_federation_event(
} }
} }
let permit = services().sending.maximum_requests.acquire().await; let response = services()
.sending
let response = server_server::send_request( .send_federation_request(
server, server,
send_transaction_message::v1::Request { send_transaction_message::v1::Request {
origin: services().globals.server_name().to_owned(), origin: services().globals.server_name().to_owned(),
pdus: pdu_jsons, pdus: pdu_jsons,
edus: edu_jsons, edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(), origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: general_purpose::URL_SAFE_NO_PAD transaction_id: general_purpose::URL_SAFE_NO_PAD
.encode(calculate_hash(events.iter().map(|e| match e { .encode(calculate_hash(events.iter().map(|e| match e {
SendingEventType::Edu(b) => b.json().get().as_bytes(), SendingEventType::Edu(b) => b.json().get().as_bytes(),
SendingEventType::Pdu(b) => b.as_bytes(), SendingEventType::Pdu(b) => b.as_bytes(),
}))) })))
.into(), .into(),
}, },
LogRequestError::No, )
AllowLoopbackRequests::No, // The spec states that this endpoint should always return success, even
) // if individual PDUs fail. If we get an error, something is wrong.
.await?; .backoff_on(BackoffOn::AllErrors)
// The error will be logged in `handle_response`
.log_errors(LogRequestError::No)
.await?;
for pdu in response.pdus { for pdu in response.pdus {
if let (event_id, Err(error)) = pdu { if let (event_id, Err(error)) = pdu {
@ -1016,8 +997,6 @@ async fn handle_federation_event(
} }
} }
drop(permit);
Ok(()) Ok(())
} }