diff --git a/src/service/sending.rs b/src/service/sending.rs index 6c50d2e7..8198249c 100644 --- a/src/service/sending.rs +++ b/src/service/sending.rs @@ -4,7 +4,7 @@ use std::{ future::{Future, IntoFuture}, pin::Pin, sync::Arc, - time::{Duration, Instant}, + time::Duration, }; use base64::{engine::general_purpose, Engine as _}; @@ -155,10 +155,7 @@ pub(crate) struct Service { #[derive(Debug)] enum TransactionStatus { Running, - // number of times failed, time of last failure - Failed(u32, Instant), - // number of times failed - Retrying(u32), + Failed, } struct HandlerInputs { @@ -293,19 +290,14 @@ impl Service { } if let Err(error) = result { - warn!(%error, "Marking transaction as failed"); - current_transaction_status.entry(destination).and_modify(|e| { - use TransactionStatus::{Failed, Retrying, Running}; - - *e = match e { - Running => Failed(1, Instant::now()), - Retrying(n) => Failed(*n + 1, Instant::now()), - Failed(..) => { - error!("Request that was not even running failed?!"); - return; - } - } - }); + // Logging transactions that fail due to backoff produces a lot of + // clutter in the logs. Failure due to backoff is expected behavior, + // and the transaction will be retried later. + if !matches!(error, Error::ServerBackoff { .. }) { + warn!(%error, "Marking transaction as failed"); + } + current_transaction_status + .insert(destination, TransactionStatus::Failed); return Ok(None); } @@ -410,27 +402,13 @@ impl Service { entry .and_modify(|e| match e { - TransactionStatus::Running | TransactionStatus::Retrying(_) => { + TransactionStatus::Running => { // already running allow = false; } - TransactionStatus::Failed(tries, time) => { - // Fail if a request has failed recently (exponential - // backoff) - let mut min_elapsed_duration = - Duration::from_secs(30) * (*tries) * (*tries); - if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) - { - min_elapsed_duration = - Duration::from_secs(60 * 60 * 24); - } - - if time.elapsed() < min_elapsed_duration { - allow = false; - } else { - retry = true; - *e = TransactionStatus::Retrying(*tries); - } + TransactionStatus::Failed => { + retry = true; + *e = TransactionStatus::Running; } }) .or_insert(TransactionStatus::Running); @@ -989,26 +967,29 @@ async fn handle_federation_event( } } - let permit = services().sending.maximum_requests.acquire().await; - - let response = server_server::send_request( - server, - send_transaction_message::v1::Request { - origin: services().globals.server_name().to_owned(), - pdus: pdu_jsons, - edus: edu_jsons, - origin_server_ts: MilliSecondsSinceUnixEpoch::now(), - transaction_id: general_purpose::URL_SAFE_NO_PAD - .encode(calculate_hash(events.iter().map(|e| match e { - SendingEventType::Edu(b) => b.json().get().as_bytes(), - SendingEventType::Pdu(b) => b.as_bytes(), - }))) - .into(), - }, - LogRequestError::No, - AllowLoopbackRequests::No, - ) - .await?; + let response = services() + .sending + .send_federation_request( + server, + send_transaction_message::v1::Request { + origin: services().globals.server_name().to_owned(), + pdus: pdu_jsons, + edus: edu_jsons, + origin_server_ts: MilliSecondsSinceUnixEpoch::now(), + transaction_id: general_purpose::URL_SAFE_NO_PAD + .encode(calculate_hash(events.iter().map(|e| match e { + SendingEventType::Edu(b) => b.json().get().as_bytes(), + SendingEventType::Pdu(b) => b.as_bytes(), + }))) + .into(), + }, + ) + // The spec states that this endpoint should always return success, even + // if individual PDUs fail. If we get an error, something is wrong. + .backoff_on(BackoffOn::AllErrors) + // The error will be logged in `handle_response` + .log_errors(LogRequestError::No) + .await?; for pdu in response.pdus { if let (event_id, Err(error)) = pdu { @@ -1016,8 +997,6 @@ async fn handle_federation_event( } } - drop(permit); - Ok(()) }