diff --git a/book/changelog.md b/book/changelog.md
index 66b990d0..fb7933b5 100644
--- a/book/changelog.md
+++ b/book/changelog.md
@@ -201,6 +201,11 @@ This will be the first release of Grapevine since it was forked from Conduit
 20. Fix handling of v11 rooms with `m.room.create` event content that passes
     the authorization rules but doesn't match other parts of the spec.
     ([!139](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/139))
+21. Remove buggy backoff implementation for remote device key queries that
+    failed to reset the backoff delay after a successful request. This caused
+    an increasing rate of key query failures (and therefore UTD messages) over
+    time until a restart.
+    ([!70](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/70))
 
 ### Added
 
@@ -279,3 +284,6 @@ This will be the first release of Grapevine since it was forked from Conduit
     ([!121](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/121))
 25. Add configuration options to tune the value of each cache individually.
     ([!124](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/124))
+26. Attempt to detect offline remote servers and back off all federation
+    requests to them.
+    ([!70](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/70))
diff --git a/src/api/client_server/keys.rs b/src/api/client_server/keys.rs
index c4422079..2a17ab3b 100644
--- a/src/api/client_server/keys.rs
+++ b/src/api/client_server/keys.rs
@@ -1,6 +1,7 @@
 use std::{
-    collections::{hash_map, BTreeMap, HashMap, HashSet},
-    time::{Duration, Instant},
+    collections::{BTreeMap, HashMap, HashSet},
+    future::IntoFuture,
+    time::Duration,
 };
 
 use futures_util::{stream::FuturesUnordered, StreamExt};
@@ -385,47 +386,9 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
     let mut failures = BTreeMap::new();
 
-    let back_off = |id| async {
-        match services().globals.bad_query_ratelimiter.write().await.entry(id) {
-            hash_map::Entry::Vacant(e) => {
-                e.insert((Instant::now(), 1));
-            }
-            hash_map::Entry::Occupied(mut e) => {
-                *e.get_mut() = (Instant::now(), e.get().1 + 1);
-            }
-        }
-    };
-
     let mut futures: FuturesUnordered<_> = get_over_federation
         .into_iter()
         .map(|(server, vec)| async move {
-            if let Some((time, tries)) = services()
-                .globals
-                .bad_query_ratelimiter
-                .read()
-                .await
-                .get(server)
-            {
-                // Exponential backoff
-                let mut min_elapsed_duration =
-                    Duration::from_secs(30) * (*tries) * (*tries);
-                if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
-                    min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
-                }
-
-                if let Some(remaining) =
-                    min_elapsed_duration.checked_sub(time.elapsed())
-                {
-                    debug!(%server, %tries, ?remaining, "Backing off from server");
-                    return (
-                        server,
-                        Err(Error::BadServerResponse(
-                            "bad query, still backing off",
-                        )),
-                    );
-                }
-            }
-
             let mut device_keys_input_fed = BTreeMap::new();
             for (user_id, keys) in vec {
                 device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
             }
@@ -436,12 +399,15 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
                 server,
                 tokio::time::timeout(
                     Duration::from_secs(25),
-                    services().sending.send_federation_request(
-                        server,
-                        federation::keys::get_keys::v1::Request {
-                            device_keys: device_keys_input_fed,
-                        },
-                    ),
+                    services()
+                        .sending
+                        .send_federation_request(
+                            server,
+                            federation::keys::get_keys::v1::Request {
+                                device_keys: device_keys_input_fed,
+                            },
+                        )
+                        .into_future(),
                 )
                 .await
                 .map_err(|_e| Error::BadServerResponse("Query took too long"))
@@ -454,7 +420,6 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
             let response = match response {
                 Ok(response) => response,
                 Err(error) => {
-                    back_off(server.to_owned()).await;
                     debug!(%server, %error, "remote device key query failed");
                     failures.insert(server.to_string(), json!({}));
                     continue;
diff --git a/src/config.rs b/src/config.rs
index a440a455..b9171a88 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -12,7 +12,7 @@ use ruma::{
     api::federation::discovery::OldVerifyKey, OwnedServerName,
     OwnedServerSigningKeyId, RoomVersionId,
 };
-use serde::Deserialize;
+use serde::{Deserialize, Deserializer};
 use strum::{Display, EnumIter, IntoEnumIterator};
 
 use crate::error;
@@ -407,6 +407,7 @@ pub(crate) struct FederationConfig {
     pub(crate) max_fetch_prev_events: u16,
     pub(crate) max_concurrent_requests: u16,
     pub(crate) old_verify_keys: BTreeMap<OwnedServerSigningKeyId, OldVerifyKey>,
+    pub(crate) backoff: BackoffConfig,
 }
 
 impl Default for FederationConfig {
@@ -420,6 +421,44 @@ impl Default for FederationConfig {
             max_fetch_prev_events: 100,
             max_concurrent_requests: 100,
             old_verify_keys: BTreeMap::new(),
+            backoff: BackoffConfig::default(),
+        }
+    }
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(default)]
+pub(crate) struct BackoffConfig {
+    /// Minimum number of consecutive failures for a server before starting to
+    /// delay requests.
+    pub(crate) failure_threshold: u8,
+
+    /// Initial delay between requests in seconds, after the number of
+    /// consecutive failures to a server first exceeds the threshold.
+    pub(crate) base_delay: f64,
+
+    /// Factor to increase delay by after each additional consecutive failure.
+    pub(crate) multiplier: f64,
+
+    /// Maximum delay between requests to a server in seconds.
+    pub(crate) max_delay: f64,
+
+    /// Range of random multipliers applied to the request delay.
+    #[serde(deserialize_with = "deserialize_jitter_range")]
+    pub(crate) jitter_range: std::ops::Range<f64>,
+}
+
+impl Default for BackoffConfig {
+    fn default() -> Self {
+        // After the first 3 consecutive failed requests, increase delay
+        // exponentially from 5s to 24h over the next 24 failures. It takes an
+        // average of 4.3 days of failures to reach the maximum delay of 24h.
+        Self {
+            failure_threshold: 3,
+            base_delay: 5.0,
+            multiplier: 1.5,
+            max_delay: 60.0 * 60.0 * 24.0,
+            jitter_range: 0.5..1.5,
         }
     }
 }
@@ -482,6 +521,24 @@ pub(crate) fn default_default_room_version() -> RoomVersionId {
     RoomVersionId::V10
 }
 
+fn deserialize_jitter_range<'de, D>(
+    deserializer: D,
+) -> Result<std::ops::Range<f64>, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let s = String::deserialize(deserializer)?;
+    let Some((a, b)) = s.split_once("..") else {
+        return Err(serde::de::Error::custom(crate::Error::bad_config(
+            "invalid jitter range",
+        )));
+    };
+
+    a.parse()
+        .and_then(|a| b.parse().map(|b| a..b))
+        .map_err(serde::de::Error::custom)
+}
+
 /// Search default locations for a configuration file
 ///
 /// If one isn't found, the list of tried paths is returned.
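The delay schedule implied by these defaults follows the formula used later in `BackoffState::remaining_delay`: once the consecutive failure count exceeds `failure_threshold`, the delay is `base_delay * multiplier^(failures - failure_threshold)`, capped at `max_delay` and then scaled by a jitter coefficient drawn from `jitter_range`. A minimal standalone sketch of that schedule, using the default values above with jitter fixed at 1.0; the helper names here are illustrative only and not part of the patch:

```rust
// Standalone sketch of the delay schedule implied by `BackoffConfig::default()`
// and the formula in `BackoffState::remaining_delay`. Names are local to this
// example, not part of the patch.
fn delay_secs(
    failure_count: u8,
    failure_threshold: u8,
    base_delay: f64,
    multiplier: f64,
    max_delay: f64,
    jitter_coeff: f64,
) -> Option<f64> {
    if failure_count <= failure_threshold {
        // At or below the threshold, requests are not delayed at all.
        return None;
    }
    let excess = i32::from(failure_count - failure_threshold);
    Some(max_delay.min(base_delay * multiplier.powi(excess)) * jitter_coeff)
}

fn main() {
    // Defaults: threshold 3, base 5s, multiplier 1.5, max 24h; jitter fixed at
    // 1.0 here so the schedule is deterministic.
    for failures in [3u8, 4, 5, 10, 20, 28] {
        match delay_secs(failures, 3, 5.0, 1.5, 60.0 * 60.0 * 24.0, 1.0) {
            Some(d) => println!("{failures} consecutive failures -> {d:.1}s"),
            None => println!("{failures} consecutive failures -> no delay"),
        }
    }
}
```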
diff --git a/src/observability.rs b/src/observability.rs
index f2503f61..6b5330ab 100644
--- a/src/observability.rs
+++ b/src/observability.rs
@@ -290,6 +290,9 @@ pub(crate) struct Metrics {
     /// Number of entries in an
     /// [`OnDemandHashMap`](crate::utils::on_demand_hashmap::OnDemandHashMap)
     on_demand_hashmap_size: opentelemetry::metrics::Gauge<u64>,
+
+    /// Number of known remote servers in each state (online or offline)
+    remote_server_count: opentelemetry::metrics::Gauge<u64>,
 }
 
 impl Metrics {
@@ -345,11 +348,17 @@ impl Metrics {
             .with_description("Number of entries in OnDemandHashMap")
             .init();
 
+        let remote_server_count = meter
+            .u64_gauge("remote_server_count")
+            .with_description("Number of known remote servers")
+            .init();
+
         Metrics {
             otel_state: (registry, provider),
             http_requests_histogram,
             lookup,
             on_demand_hashmap_size,
+            remote_server_count,
         }
     }
 
@@ -384,6 +393,18 @@ impl Metrics {
             &[KeyValue::new("name", name)],
         );
     }
+
+    /// Record number of remote servers marked online or offline.
+    pub(crate) fn record_remote_server_count(
+        &self,
+        online_count: u64,
+        offline_count: u64,
+    ) {
+        self.remote_server_count
+            .record(online_count, &[KeyValue::new("state", "online")]);
+        self.remote_server_count
+            .record(offline_count, &[KeyValue::new("state", "offline")]);
+    }
 }
 
 /// Track HTTP metrics by converting this into an [`axum`] layer
diff --git a/src/service.rs b/src/service.rs
index 85e7a302..3bca91fc 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -12,6 +12,7 @@ pub(crate) mod pdu;
 pub(crate) mod pusher;
 pub(crate) mod rooms;
 pub(crate) mod sending;
+pub(crate) mod server_backoff;
 pub(crate) mod transaction_ids;
 pub(crate) mod uiaa;
 pub(crate) mod users;
@@ -35,6 +36,7 @@ pub(crate) struct Services {
     pub(crate) globals: globals::Service,
     pub(crate) key_backups: key_backups::Service,
     pub(crate) media: media::Service,
+    pub(crate) server_backoff: Arc<server_backoff::Service>,
     pub(crate) sending: Arc<sending::Service>,
 }
 
@@ -120,6 +122,7 @@ impl Services {
             media: media::Service {
                 db,
             },
+            server_backoff: server_backoff::Service::build(),
             sending: sending::Service::new(db, &config),
             globals: globals::Service::new(db, config, reload_handles)?,
diff --git a/src/service/globals.rs b/src/service/globals.rs
index 0881ce13..c7aa7078 100644
--- a/src/service/globals.rs
+++ b/src/service/globals.rs
@@ -84,8 +84,6 @@ pub(crate) struct Service {
         Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
     pub(crate) bad_signature_ratelimiter:
         Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
-    pub(crate) bad_query_ratelimiter:
-        Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
     pub(crate) servername_ratelimiter:
         OnDemandHashMap<OwnedServerName, Semaphore>,
     pub(crate) roomid_mutex_insert: TokenSet<OwnedRoomId>,
@@ -278,7 +276,6 @@ impl Service {
             admin_bot_room_alias_id,
             bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
             bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
-            bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
             servername_ratelimiter: OnDemandHashMap::new(
                 "servername_ratelimiter".to_owned(),
             ),
diff --git a/src/service/sending.rs b/src/service/sending.rs
index 1c3b6ecb..8198249c 100644
--- a/src/service/sending.rs
+++ b/src/service/sending.rs
@@ -1,8 +1,10 @@
 use std::{
     collections::{BTreeMap, HashMap, HashSet},
     fmt::Debug,
+    future::{Future, IntoFuture},
+    pin::Pin,
     sync::Arc,
-    time::{Duration, Instant},
+    time::Duration,
 };
 
 use base64::{engine::general_purpose, Engine as _};
@@ -116,6 +118,31 @@ pub(crate) struct RequestData {
     requester_span: Span,
 }
 
+/// Which types of errors should cause us to back off requests to this server
+/// globally.
+///
+/// The default is [`BackoffOn::AllExceptWellFormed`], which is conservative,
+/// with a high false negative rate and low false positive rate. For endpoints
+/// where we have additional information, we should pick a less conservative
+/// setting.
+#[derive(Copy, Clone, Debug)]
+pub(crate) enum BackoffOn {
+    /// All errors except for error responses that match the expected
+    /// `{ "errcode": ... }` format for the Matrix protocol.
+    AllExceptWellFormed,
+
+    /// All errors
+    AllErrors,
+}
+
+#[must_use = "The request builder must be awaited for the request to be sent"]
+pub(crate) struct SendFederationRequestBuilder<'a, T> {
+    destination: &'a ServerName,
+    request: T,
+    log_errors: LogRequestError,
+    backoff_on: BackoffOn,
+}
+
 pub(crate) struct Service {
     db: &'static dyn Data,
 
@@ -128,10 +155,7 @@ pub(crate) struct Service {
 #[derive(Debug)]
 enum TransactionStatus {
     Running,
-    // number of times failed, time of last failure
-    Failed(u32, Instant),
-    // number of times failed
-    Retrying(u32),
+    Failed,
 }
 
 struct HandlerInputs {
@@ -266,19 +290,14 @@ impl Service {
         }
 
         if let Err(error) = result {
-            warn!(%error, "Marking transaction as failed");
-            current_transaction_status.entry(destination).and_modify(|e| {
-                use TransactionStatus::{Failed, Retrying, Running};
-
-                *e = match e {
-                    Running => Failed(1, Instant::now()),
-                    Retrying(n) => Failed(*n + 1, Instant::now()),
-                    Failed(..) => {
-                        error!("Request that was not even running failed?!");
-                        return;
-                    }
-                }
-            });
+            // Logging transactions that fail due to backoff produces a lot of
+            // clutter in the logs. Failure due to backoff is expected behavior,
+            // and the transaction will be retried later.
+            if !matches!(error, Error::ServerBackoff { .. }) {
+                warn!(%error, "Marking transaction as failed");
+            }
+            current_transaction_status
+                .insert(destination, TransactionStatus::Failed);
 
             return Ok(None);
         }
@@ -383,27 +402,13 @@ impl Service {
         entry
             .and_modify(|e| match e {
-                TransactionStatus::Running | TransactionStatus::Retrying(_) => {
+                TransactionStatus::Running => {
                     // already running
                     allow = false;
                 }
-                TransactionStatus::Failed(tries, time) => {
-                    // Fail if a request has failed recently (exponential
-                    // backoff)
-                    let mut min_elapsed_duration =
-                        Duration::from_secs(30) * (*tries) * (*tries);
-                    if min_elapsed_duration > Duration::from_secs(60 * 60 * 24)
-                    {
-                        min_elapsed_duration =
-                            Duration::from_secs(60 * 60 * 24);
-                    }
-
-                    if time.elapsed() < min_elapsed_duration {
-                        allow = false;
-                    } else {
-                        retry = true;
-                        *e = TransactionStatus::Retrying(*tries);
-                    }
+                TransactionStatus::Failed => {
+                    retry = true;
+                    *e = TransactionStatus::Running;
                 }
             })
             .or_insert(TransactionStatus::Running);
@@ -667,35 +672,20 @@ impl Service {
         Ok(())
     }
 
-    #[tracing::instrument(skip(self, request))]
-    pub(crate) async fn send_federation_request<T>(
+    // Allowed because `SendFederationRequestBuilder::into_future` uses
+    // `services()`
+    #[allow(clippy::unused_self)]
+    pub(crate) fn send_federation_request<'a, T>(
         &self,
-        destination: &ServerName,
+        destination: &'a ServerName,
         request: T,
-    ) -> Result<T::IncomingResponse>
-    where
-        T: OutgoingRequest + Debug,
-    {
-        debug!("Waiting for permit");
-        let permit = self.maximum_requests.acquire().await;
-        debug!("Got permit");
-        let response = tokio::time::timeout(
-            Duration::from_secs(2 * 60),
-            server_server::send_request(
-                destination,
-                request,
-                LogRequestError::Yes,
-                AllowLoopbackRequests::No,
-            ),
-        )
-        .await
-        .map_err(|_| {
-            warn!("Timeout waiting for server response");
-            Error::BadServerResponse("Timeout waiting for server response")
-        })?;
-        drop(permit);
-
-        response
+    ) -> SendFederationRequestBuilder<'a, T> {
+        SendFederationRequestBuilder {
+            destination,
+            request,
+            log_errors: LogRequestError::Yes,
+            backoff_on: BackoffOn::AllExceptWellFormed,
+        }
     }
 
     /// Sends a request to an appservice
@@ -723,6 +713,89 @@ impl Service {
     }
 }
 
+impl<T> SendFederationRequestBuilder<'_, T> {
+    /// Enable or disable automatic logging of any error from this request.
+    ///
+    /// This should be disabled if the error is going to be logged elsewhere,
+    /// to avoid cluttering logs with duplicate error messages.
+    pub(crate) fn log_errors(mut self, log_errors: LogRequestError) -> Self {
+        self.log_errors = log_errors;
+        self
+    }
+
+    /// Set the types of errors that will cause us to back off future requests
+    /// to this server globally.
+    pub(crate) fn backoff_on(mut self, backoff_on: BackoffOn) -> Self {
+        self.backoff_on = backoff_on;
+        self
+    }
+}
+
+impl<'a, T> IntoFuture for SendFederationRequestBuilder<'a, T>
+where
+    T: OutgoingRequest + Send + Debug + 'a,
+    T::IncomingResponse: Send,
+{
+    // TODO: get rid of the Box once impl_trait_in_assoc_type is stable
+    type IntoFuture =
+        Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
+    type Output = Result<T::IncomingResponse>;
+
+    #[tracing::instrument(
+        name = "send_federation_request",
+        skip(self),
+        fields(destination = %self.destination)
+    )]
+    fn into_future(self) -> Self::IntoFuture {
+        Box::pin(async move {
+            debug!("Waiting for permit");
+            let permit = services().sending.maximum_requests.acquire().await;
+            debug!("Got permit");
+
+            let backoff_guard =
+                services().server_backoff.server_ready(self.destination)?;
+
+            let response = tokio::time::timeout(
+                Duration::from_secs(2 * 60),
+                server_server::send_request(
+                    self.destination,
+                    self.request,
+                    self.log_errors,
+                    AllowLoopbackRequests::No,
+                ),
+            )
+            .await
+            .map_err(|_| {
+                warn!("Timeout waiting for server response");
+                Error::BadServerResponse("Timeout waiting for server response")
+            })
+            .and_then(|result| result);
+            drop(permit);
+
+            match &response {
+                Err(Error::Federation(_, error)) => {
+                    if error.error_kind().is_some() {
+                        if let BackoffOn::AllExceptWellFormed = self.backoff_on
+                        {
+                            backoff_guard.soft_failure();
+                        } else {
+                            backoff_guard.hard_failure();
+                        }
+                    } else {
+                        // The error wasn't in the expected format for Matrix
+                        // API responses.
+                        backoff_guard.hard_failure();
+                    }
+                }
+                Err(_) => backoff_guard.hard_failure(),
+                Ok(_) => backoff_guard.success(),
+            }
+
+            response
+        })
+    }
+}
+
 #[tracing::instrument(skip(events))]
 async fn handle_appservice_event(
     id: &str,
@@ -894,26 +967,29 @@ async fn handle_federation_event(
         }
     }
 
-    let permit = services().sending.maximum_requests.acquire().await;
-
-    let response = server_server::send_request(
-        server,
-        send_transaction_message::v1::Request {
-            origin: services().globals.server_name().to_owned(),
-            pdus: pdu_jsons,
-            edus: edu_jsons,
-            origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
-            transaction_id: general_purpose::URL_SAFE_NO_PAD
-                .encode(calculate_hash(events.iter().map(|e| match e {
-                    SendingEventType::Edu(b) => b.json().get().as_bytes(),
-                    SendingEventType::Pdu(b) => b.as_bytes(),
-                })))
-                .into(),
-        },
-        LogRequestError::No,
-        AllowLoopbackRequests::No,
-    )
-    .await?;
+    let response = services()
+        .sending
+        .send_federation_request(
+            server,
+            send_transaction_message::v1::Request {
+                origin: services().globals.server_name().to_owned(),
+                pdus: pdu_jsons,
+                edus: edu_jsons,
+                origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
+                transaction_id: general_purpose::URL_SAFE_NO_PAD
+                    .encode(calculate_hash(events.iter().map(|e| match e {
+                        SendingEventType::Edu(b) => b.json().get().as_bytes(),
+                        SendingEventType::Pdu(b) => b.as_bytes(),
+                    })))
+                    .into(),
+            },
+        )
+        // The spec states that this endpoint should always return success, even
+        // if individual PDUs fail. If we get an error, something is wrong.
+        .backoff_on(BackoffOn::AllErrors)
+        // The error will be logged in `handle_response`
+        .log_errors(LogRequestError::No)
+        .await?;
 
     for pdu in response.pdus {
         if let (event_id, Err(error)) = pdu {
@@ -921,8 +997,6 @@ async fn handle_federation_event(
         }
     }
 
-    drop(permit);
-
     Ok(())
 }
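The two call sites converted above differ only in how a failure feeds the backoff guard: under the default `BackoffOn::AllExceptWellFormed`, a response that at least parses as a Matrix error (`error_kind()` returns `Some`) is recorded as a soft failure, while timeouts, transport errors, malformed bodies, and any error at all under `BackoffOn::AllErrors` are recorded as hard failures. A rough standalone model of that decision table, using stand-in enums rather than the real `Error` type:

```rust
// Self-contained model of the classification in
// `SendFederationRequestBuilder::into_future`. These are stand-in types; the
// real code matches on `Error::Federation` and `error_kind()`.
#[derive(Clone, Copy)]
enum BackoffOn {
    AllExceptWellFormed,
    AllErrors,
}

enum Outcome {
    Success,
    // A Matrix-format error response ({"errcode": ...}) was returned.
    WellFormedError,
    // Timeout, connection failure, or a malformed error body.
    OtherError,
}

enum GuardAction {
    Success,
    SoftFailure,
    HardFailure,
}

fn classify(outcome: &Outcome, backoff_on: BackoffOn) -> GuardAction {
    match (outcome, backoff_on) {
        (Outcome::Success, _) => GuardAction::Success,
        // A well-formed error shows the remote at least speaks Matrix, so it
        // only counts against the server when the caller opted into
        // `AllErrors` (e.g. the transaction-sending path).
        (Outcome::WellFormedError, BackoffOn::AllExceptWellFormed) => {
            GuardAction::SoftFailure
        }
        (Outcome::WellFormedError, BackoffOn::AllErrors) => {
            GuardAction::HardFailure
        }
        (Outcome::OtherError, _) => GuardAction::HardFailure,
    }
}

fn main() {
    assert!(matches!(
        classify(&Outcome::Success, BackoffOn::AllErrors),
        GuardAction::Success
    ));
    assert!(matches!(
        classify(&Outcome::WellFormedError, BackoffOn::AllExceptWellFormed),
        GuardAction::SoftFailure
    ));
    assert!(matches!(
        classify(&Outcome::OtherError, BackoffOn::AllExceptWellFormed),
        GuardAction::HardFailure
    ));
}
```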
diff --git a/src/service/server_backoff.rs b/src/service/server_backoff.rs
new file mode 100644
index 00000000..564185e2
--- /dev/null
+++ b/src/service/server_backoff.rs
@@ -0,0 +1,322 @@
+use std::{
+    collections::HashMap,
+    sync::{Arc, Mutex, RwLock},
+    time::{Duration, Instant},
+};
+
+use rand::{thread_rng, Rng};
+use ruma::{OwnedServerName, ServerName};
+use tracing::{debug, error, info, instrument};
+
+use crate::{observability::METRICS, services, Error, Result};
+
+/// Service to handle backing off requests to offline servers.
+///
+/// Matrix is full of servers that are either temporarily or permanently
+/// offline. It's important not to flood offline servers with federation
+/// traffic, since this can consume resources on both ends.
+///
+/// To limit traffic to offline servers, we track a global exponential backoff
+/// state for federation requests to each server name. This mechanism is *only*
+/// intended to handle offline servers. Rate limiting and backoff retries for
+/// specific requests have different considerations and need to be handled
+/// elsewhere.
+///
+/// Exponential backoff is typically used in a retry loop for a single request.
+/// Because the state of this backoff is global, and requests may be issued
+/// concurrently, we do a couple of unusual things:
+///
+/// First, we wait for a certain number of consecutive failed requests before
+/// we start delaying further requests. This is to avoid delaying requests to a
+/// server that is not offline but fails on a small fraction of requests.
+///
+/// Second, we only increment the failure counter once for every batch of
+/// concurrent requests, instead of on every failed request. This avoids
+/// rapidly increasing the counter, proportional to the rate of outgoing
+/// requests, when the server is only briefly offline.
+pub(crate) struct Service {
+    servers: RwLock<HashMap<OwnedServerName, Arc<RwLock<BackoffState>>>>,
+
+    server_counts: Mutex<ServerCounts>,
+}
+
+/// Guard to record the result of an attempted request to a server.
+///
+/// If the request succeeds, call [`BackoffGuard::success`]. If the request
+/// fails in a way that indicates the server is unavailable, call
+/// [`BackoffGuard::hard_failure`]. If the request fails in a way that doesn't
+/// necessarily indicate that the server is unavailable, call
+/// [`BackoffGuard::soft_failure`]. Note that this choice is security-sensitive.
+/// If an attacker is able to trigger hard failures for an online server, they
+/// can cause us to incorrectly mark it as offline and block outgoing requests
+/// to it.
+#[must_use]
+pub(crate) struct BackoffGuard {
+    result_recorded: bool,
+    backoff: Arc<RwLock<BackoffState>>,
+    /// Store the last failure timestamp observed when this request started. If
+    /// there was another failure recorded since the request started, do not
+    /// increment the failure count. This ensures that only one failure will
+    /// be recorded for every batch of concurrent requests, as discussed in
+    /// the documentation of [`Service`].
+    last_failure: Option<Instant>,
+}
+
+/// State of exponential backoff for a specific server.
+#[derive(Clone, Debug)]
+struct BackoffState {
+    server_name: OwnedServerName,
+
+    /// Count of consecutive failed requests to this server.
+    failure_count: u8,
+    /// Timestamp of the last failed request to this server.
+    last_failure: Option<Instant>,
+    /// Random multiplier applied to the request delay.
+    ///
+    /// This is updated to a new random value after each batch of concurrent
+    /// requests containing a failure.
+    jitter_coeff: f64,
+}
+
+/// State transitions for a single server
+#[derive(Debug, Copy, Clone)]
+enum Transition {
+    /// A new server, marked as online by default
+    New,
+    OnlineToOffline,
+    OfflineToOnline,
+}
+
+/// Counts of known servers in each state, used for metrics
+#[derive(Debug, Copy, Clone, Default)]
+struct ServerCounts {
+    online_count: u64,
+    offline_count: u64,
+}
+
+impl Service {
+    pub(crate) fn build() -> Arc<Self> {
+        Arc::new(Service {
+            servers: RwLock::default(),
+            server_counts: Mutex::default(),
+        })
+    }
+
+    /// If ready to attempt another request to a server, returns a guard to
+    /// record the result.
+    ///
+    /// If still in the backoff period for this server, returns `Err`.
+    #[instrument(skip(self))]
+    pub(crate) fn server_ready(
+        &self,
+        server_name: &ServerName,
+    ) -> Result<BackoffGuard> {
+        let state = self.server_state(server_name);
+
+        let last_failure = {
+            let state_lock = state.read().unwrap();
+
+            if let Some(remaining_delay) = state_lock.remaining_delay() {
+                debug!(
+                    failures = %state_lock.failure_count,
+                    ?remaining_delay,
+                    "backing off from server"
+                );
+                return Err(Error::ServerBackoff {
+                    server: server_name.to_owned(),
+                    remaining_delay,
+                });
+            }
+
+            state_lock.last_failure
+        };
+
+        Ok(BackoffGuard {
+            result_recorded: false,
+            backoff: state,
+            last_failure,
+        })
+    }
+
+    fn record_transition(
+        &self,
+        server_name: &ServerName,
+        transition: Transition,
+    ) {
+        let mut counts = self.server_counts.lock().unwrap();
+
+        match transition {
+            Transition::New => {
+                info!(
+                    %server_name,
+                    "new remote server, marked as online by default"
+                );
+                counts.online_count += 1;
+            }
+            Transition::OnlineToOffline => {
+                info!(
+                    %server_name,
+                    "remote server transitioned from online to offline"
+                );
+                counts.online_count -= 1;
+                counts.offline_count += 1;
+            }
+            Transition::OfflineToOnline => {
+                info!(
+                    %server_name,
+                    "remote server transitioned from offline to online"
+                );
+                counts.offline_count -= 1;
+                counts.online_count += 1;
+            }
+        }
+
+        METRICS.record_remote_server_count(
+            counts.online_count,
+            counts.offline_count,
+        );
+    }
+
+    fn server_state(
+        &self,
+        server_name: &ServerName,
+    ) -> Arc<RwLock<BackoffState>> {
+        let servers = self.servers.read().unwrap();
+        if let Some(state) = servers.get(server_name) {
+            Arc::clone(state)
+        } else {
+            drop(servers);
+            let mut servers = self.servers.write().unwrap();
+
+            // We have to check again because it's possible for another thread
+            // to write in between us dropping the read lock and taking the
+            // write lock.
+            if let Some(state) = servers.get(server_name) {
+                Arc::clone(state)
+            } else {
+                let state = Arc::new(RwLock::new(BackoffState::new(
+                    server_name.to_owned(),
+                )));
+                servers.insert(server_name.to_owned(), Arc::clone(&state));
+                self.record_transition(server_name, Transition::New);
+                state
+            }
+        }
+    }
+}
+
+impl BackoffState {
+    fn new(server_name: OwnedServerName) -> BackoffState {
+        BackoffState {
+            server_name,
+            failure_count: 0,
+            last_failure: None,
+            jitter_coeff: 0.0,
+        }
+    }
+
+    /// Returns the remaining time before ready to attempt another request to
+    /// this server.
+    fn remaining_delay(&self) -> Option<Duration> {
+        let config = &services().globals.config.federation.backoff;
+
+        let last_failure = self.last_failure?;
+        if self.failure_count <= config.failure_threshold {
+            return None;
+        }
+
+        let excess_failure_count =
+            self.failure_count - config.failure_threshold;
+        let delay_secs = config.max_delay.min(
+            config.base_delay
+                * config.multiplier.powi(i32::from(excess_failure_count)),
+        ) * self.jitter_coeff;
+        let delay = Duration::from_secs_f64(delay_secs);
+        delay.checked_sub(last_failure.elapsed())
+    }
+
+    /// Returns whether this server is marked as online (no backoff delay).
+    fn is_online(&self) -> bool {
+        let config = &services().globals.config.federation.backoff;
+        self.failure_count <= config.failure_threshold
+    }
+}
+
+impl BackoffGuard {
+    /// Record a successful request.
+    #[instrument(skip(self))]
+    pub(crate) fn success(mut self) {
+        self.result_recorded = true;
+
+        let mut state = self.backoff.write().unwrap();
+        let was_online = state.is_online();
+
+        if state.failure_count != 0 {
+            debug!(
+                server_name = %&state.server_name,
+                "successful request to server, resetting failure count"
+            );
+        }
+
+        state.failure_count = 0;
+
+        // Server is always online after setting failure_count = 0
+        if !was_online {
+            services().server_backoff.record_transition(
+                &state.server_name,
+                Transition::OfflineToOnline,
+            );
+        }
+    }
+
+    /// Record a failed request indicating that the server may be unavailable.
+    ///
+    /// Examples of failures in this category are a timeout, a 500 status, or
+    /// a 404 from an endpoint that is not specced to return 404.
+    #[instrument(skip(self))]
+    pub(crate) fn hard_failure(mut self) {
+        self.result_recorded = true;
+
+        let config = &services().globals.config.federation.backoff;
+
+        let mut state = self.backoff.write().unwrap();
+        let was_online = state.is_online();
+
+        if state.last_failure == self.last_failure {
+            state.failure_count = state.failure_count.saturating_add(1);
+            state.jitter_coeff =
+                thread_rng().gen_range(config.jitter_range.clone());
+            state.last_failure = Some(Instant::now());
+
+            debug!(
+                server_name = %state.server_name,
+                failure_count = state.failure_count,
+                "hard failure sending request to server, incrementing failure count"
+            );
+
+            if state.is_online() != was_online {
+                services().server_backoff.record_transition(
+                    &state.server_name,
+                    Transition::OnlineToOffline,
+                );
+            }
+        }
+    }
+
+    /// Record a request that failed, but where the failure is likely to occur
+    /// in normal operation even if the server is not unavailable.
+    ///
+    /// An example of a failure in this category is a 404 from querying a user
+    /// profile. This might occur if the server no longer exists, but will also
+    /// occur if the user ID doesn't exist.
+    #[instrument(skip(self))]
+    pub(crate) fn soft_failure(mut self) {
+        self.result_recorded = true;
+    }
+}
+
+impl Drop for BackoffGuard {
+    fn drop(&mut self) {
+        if !self.result_recorded {
+            error!(
+                "BackoffGuard dropped without recording result. This is a bug."
+            );
+        }
+    }
+}
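The batching rule described in the `Service` and `BackoffGuard` docs comes down to comparing the `last_failure` timestamp a guard observed when its request started against the one stored when the request finishes; only the first guard in a batch of concurrent failures advances the counter. A compact standalone model of just that comparison, with simplified stand-in types and no locking:

```rust
// Minimal model of the "one failure per concurrent batch" rule from
// `BackoffGuard::hard_failure`: a guard only increments the counter if no
// other failure has been recorded since the guard was created. These are
// illustrative stand-ins, not the service's real types.
use std::time::Instant;

#[derive(Default)]
struct State {
    failure_count: u8,
    last_failure: Option<Instant>,
}

struct Guard {
    // Snapshot of `State::last_failure` taken when the request started.
    observed_last_failure: Option<Instant>,
}

impl Guard {
    fn hard_failure(self, state: &mut State) {
        // Another concurrent request already recorded a failure for this
        // batch, so this one is ignored.
        if state.last_failure != self.observed_last_failure {
            return;
        }
        state.failure_count = state.failure_count.saturating_add(1);
        state.last_failure = Some(Instant::now());
    }
}

fn main() {
    let mut state = State::default();

    // Two requests start concurrently and observe the same (empty) history.
    let a = Guard { observed_last_failure: state.last_failure };
    let b = Guard { observed_last_failure: state.last_failure };

    // Both fail, but only the first failure is counted.
    a.hard_failure(&mut state);
    b.hard_failure(&mut state);
    assert_eq!(state.failure_count, 1);
}
```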
+ ); + } + } +} diff --git a/src/utils/error.rs b/src/utils/error.rs index 49b5ab15..d4e1e417 100644 --- a/src/utils/error.rs +++ b/src/utils/error.rs @@ -1,4 +1,4 @@ -use std::convert::Infallible; +use std::{convert::Infallible, time::Duration}; use http::StatusCode; use ruma::{ @@ -84,6 +84,13 @@ pub(crate) enum Error { UnsupportedRoomVersion(ruma::RoomVersionId), #[error("{0} in {1}")] InconsistentRoomState(&'static str, ruma::OwnedRoomId), + #[error( + "backing off requests to {server} for the next {remaining_delay:?}" + )] + ServerBackoff { + server: OwnedServerName, + remaining_delay: Duration, + }, } impl Error {