use std::{
    collections::HashMap,
    sync::{Arc, Mutex, RwLock},
    time::{Duration, Instant},
};

use rand::{thread_rng, Rng};
use ruma::{OwnedServerName, ServerName};
use tracing::{debug, error, info, instrument};

use crate::{observability::METRICS, services, Error, Result};

/// Service to handle backing off requests to offline servers.
///
/// Matrix is full of servers that are either temporarily or permanently
/// offline. It's important not to flood offline servers with federation
/// traffic, since this can consume resources on both ends.
///
/// To limit traffic to offline servers, we track a global exponential backoff
/// state for federation requests to each server name. This mechanism is *only*
/// intended to handle offline servers. Rate limiting and backoff retries for
/// specific requests have different considerations and need to be handled
/// elsewhere.
///
/// Exponential backoff is typically used in a retry loop for a single request.
/// Because the state of this backoff is global, and requests may be issued
/// concurrently, we do a couple of unusual things:
///
/// First, we wait for a certain number of consecutive failed requests before
/// we start delaying further requests. This is to avoid delaying requests to
/// a server that is not offline but fails on a small fraction of requests.
///
/// Second, we only increment the failure counter once for every batch of
/// concurrent requests, instead of on every failed request. This avoids
/// rapidly increasing the counter, proportional to the rate of outgoing
/// requests, when the server is only briefly offline.
pub(crate) struct Service {
    servers: RwLock<HashMap<OwnedServerName, Arc<RwLock<BackoffState>>>>,
    server_counts: Mutex<ServerCounts>,
}

/// Guard to record the result of an attempted request to a server.
///
/// If the request succeeds, call [`BackoffGuard::success`]. If the request
/// fails in a way that indicates the server is unavailable, call
/// [`BackoffGuard::hard_failure`]. If the request fails in a way that doesn't
/// necessarily indicate that the server is unavailable, call
/// [`BackoffGuard::soft_failure`]. Note that this choice is security-sensitive.
/// If an attacker is able to trigger hard failures for an online server, they
/// can cause us to incorrectly mark it as offline and block outgoing requests
/// to it.
#[must_use]
pub(crate) struct BackoffGuard {
    result_recorded: bool,

    backoff: Arc<RwLock<BackoffState>>,

    /// Store the last failure timestamp observed when this request started.
    /// If there was another failure recorded since the request started, do
    /// not increment the failure count. This ensures that only one failure
    /// will be recorded for every batch of concurrent requests, as discussed
    /// in the documentation of [`Service`].
    last_failure: Option<Instant>,
}
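// Illustrative call-site sketch (hypothetical; the real call sites live
// elsewhere in the crate). It shows the intended protocol for the guard
// defined above: obtain it from `Service::server_ready` before sending a
// federation request, then record exactly one outcome on it.
// `send_request` and `indicates_server_unavailable` are placeholder names,
// not functions that exist in this module.
//
//     let guard = services().server_backoff.server_ready(server_name)?;
//     let result = send_request(server_name, request).await;
//     match &result {
//         Ok(_) => guard.success(),
//         // e.g. a timeout or a 5xx response
//         Err(e) if indicates_server_unavailable(e) => guard.hard_failure(),
//         // e.g. a 404 that the endpoint is specced to return
//         Err(_) => guard.soft_failure(),
//     }
//     result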
/// State of exponential backoff for a specific server.
#[derive(Clone, Debug)]
struct BackoffState {
    server_name: OwnedServerName,

    /// Count of consecutive failed requests to this server.
    failure_count: u8,

    /// Timestamp of the last failed request to this server.
    last_failure: Option<Instant>,

    /// Random multiplier to request delay.
    ///
    /// This is updated to a new random value after each batch of concurrent
    /// requests containing a failure.
    jitter_coeff: f64,
}

/// State transitions for a single server.
#[derive(Debug, Copy, Clone)]
enum Transition {
    /// A new server, marked as online by default.
    New,
    OnlineToOffline,
    OfflineToOnline,
}

/// Counts of known servers in each state, used for metrics.
#[derive(Debug, Copy, Clone, Default)]
struct ServerCounts {
    online_count: u64,
    offline_count: u64,
}

impl Service {
    pub(crate) fn build() -> Arc<Service> {
        Arc::new(Service {
            servers: RwLock::default(),
            server_counts: Mutex::default(),
        })
    }

    /// If ready to attempt another request to a server, returns a guard to
    /// record the result.
    ///
    /// If still in the backoff period for this server, returns `Err`.
    #[instrument(skip(self))]
    pub(crate) fn server_ready(
        &self,
        server_name: &ServerName,
    ) -> Result<BackoffGuard> {
        let state = self.server_state(server_name);

        let last_failure = {
            let state_lock = state.read().unwrap();

            if let Some(remaining_delay) = state_lock.remaining_delay() {
                debug!(
                    failures = %state_lock.failure_count,
                    ?remaining_delay,
                    "backing off from server"
                );
                return Err(Error::ServerBackoff {
                    server: server_name.to_owned(),
                    remaining_delay,
                });
            }

            state_lock.last_failure
        };

        Ok(BackoffGuard {
            result_recorded: false,
            backoff: state,
            last_failure,
        })
    }

    fn record_transition(
        &self,
        server_name: &ServerName,
        transition: Transition,
    ) {
        let mut counts = self.server_counts.lock().unwrap();

        match transition {
            Transition::New => {
                info!(
                    %server_name,
                    "new remote server, marked as online by default"
                );
                counts.online_count += 1;
            }
            Transition::OnlineToOffline => {
                info!(
                    %server_name,
                    "remote server transitioned from online to offline"
                );
                counts.online_count -= 1;
                counts.offline_count += 1;
            }
            Transition::OfflineToOnline => {
                info!(
                    %server_name,
                    "remote server transitioned from offline to online"
                );
                counts.offline_count -= 1;
                counts.online_count += 1;
            }
        }

        METRICS.record_remote_server_count(
            counts.online_count,
            counts.offline_count,
        );
    }

    fn server_state(
        &self,
        server_name: &ServerName,
    ) -> Arc<RwLock<BackoffState>> {
        let servers = self.servers.read().unwrap();
        if let Some(state) = servers.get(server_name) {
            Arc::clone(state)
        } else {
            drop(servers);
            let mut servers = self.servers.write().unwrap();
            // We have to check again because it's possible for another thread
            // to write in between us dropping the read lock and taking the
            // write lock.
            if let Some(state) = servers.get(server_name) {
                Arc::clone(state)
            } else {
                let state = Arc::new(RwLock::new(BackoffState::new(
                    server_name.to_owned(),
                )));
                servers.insert(server_name.to_owned(), Arc::clone(&state));
                self.record_transition(server_name, Transition::New);
                state
            }
        }
    }
}

impl BackoffState {
    fn new(server_name: OwnedServerName) -> BackoffState {
        BackoffState {
            server_name,
            failure_count: 0,
            last_failure: None,
            jitter_coeff: 0.0,
        }
    }

    /// Returns the remaining time before ready to attempt another request to
    /// this server.
    fn remaining_delay(&self) -> Option<Duration> {
        let config = &services().globals.config.federation.backoff;

        let last_failure = self.last_failure?;

        if self.failure_count <= config.failure_threshold {
            return None;
        }
        let excess_failure_count =
            self.failure_count - config.failure_threshold;

        let delay_secs = config.max_delay.min(
            config.base_delay
                * config.multiplier.powi(i32::from(excess_failure_count)),
        ) * self.jitter_coeff;
        let delay = Duration::from_secs_f64(delay_secs);

        delay.checked_sub(last_failure.elapsed())
    }

    /// Returns whether this server is marked as online (no backoff delay).
    fn is_online(&self) -> bool {
        let config = &services().globals.config.federation.backoff;
        self.failure_count <= config.failure_threshold
    }
}
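// Illustrative timeline (hypothetical, not code that runs anywhere) of the
// "one failure increment per batch of concurrent requests" rule described in
// the `Service` docs. The rule is implemented by the `last_failure` snapshot
// comparison in `BackoffGuard::hard_failure` below.
//
//     t0: guards A and B are created concurrently; both snapshot
//         `last_failure = None` from the shared `BackoffState`.
//     t1: A records a hard failure. Its snapshot matches the state, so
//         `failure_count` becomes 1 and `last_failure` becomes Some(t1).
//     t2: B records a hard failure. Its snapshot (None) no longer matches
//         the state (Some(t1)), so the counter stays at 1.
//
// A guard created after t1 snapshots Some(t1), so a later hard failure on it
// starts a new batch and bumps the counter to 2.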
impl BackoffGuard {
    /// Record a successful request.
    #[instrument(skip(self))]
    pub(crate) fn success(mut self) {
        self.result_recorded = true;

        let mut state = self.backoff.write().unwrap();
        let was_online = state.is_online();

        if state.failure_count != 0 {
            debug!(
                server_name = %&state.server_name,
                "successful request to server, resetting failure count"
            );
        }
        state.failure_count = 0;

        // Server is always online after setting failure_count = 0
        if !was_online {
            services().server_backoff.record_transition(
                &state.server_name,
                Transition::OfflineToOnline,
            );
        }
    }

    /// Record a failed request indicating that the server may be unavailable.
    ///
    /// Examples of failures in this category are a timeout, a 500 status, or
    /// a 404 from an endpoint that is not specced to return 404.
    #[instrument(skip(self))]
    pub(crate) fn hard_failure(mut self) {
        self.result_recorded = true;

        let config = &services().globals.config.federation.backoff;

        let mut state = self.backoff.write().unwrap();
        let was_online = state.is_online();

        if state.last_failure == self.last_failure {
            state.failure_count = state.failure_count.saturating_add(1);
            state.jitter_coeff =
                thread_rng().gen_range(config.jitter_range.clone());
            state.last_failure = Some(Instant::now());
            debug!(
                server_name = %state.server_name,
                failure_count = state.failure_count,
                "hard failure sending request to server, incrementing \
                 failure count"
            );

            if state.is_online() != was_online {
                services().server_backoff.record_transition(
                    &state.server_name,
                    Transition::OnlineToOffline,
                );
            }
        }
    }

    /// Record a request that failed, but where the failure is likely to occur
    /// in normal operation even if the server is not unavailable.
    ///
    /// An example of a failure in this category is a 404 from querying a user
    /// profile. This might occur if the server no longer exists, but will
    /// also occur if the user ID doesn't exist.
    #[instrument(skip(self))]
    pub(crate) fn soft_failure(mut self) {
        self.result_recorded = true;
    }
}

impl Drop for BackoffGuard {
    fn drop(&mut self) {
        if !self.result_recorded {
            error!(
                "BackoffGuard dropped without recording result. This is a bug."
            );
        }
    }
}
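// A minimal, self-contained sketch of the delay curve produced by
// `BackoffState::remaining_delay`. It duplicates the formula with the config
// values passed in explicitly (the real values come from
// `services().globals.config.federation.backoff`), and the numbers below are
// hypothetical examples, not the crate's defaults.
#[cfg(test)]
mod delay_formula_sketch {
    use std::time::Duration;

    /// Mirror of the computation in `BackoffState::remaining_delay`, ignoring
    /// elapsed time and jitter so the shape of the curve is easy to see.
    fn full_delay(
        failure_count: u8,
        failure_threshold: u8,
        base_delay: f64,
        multiplier: f64,
        max_delay: f64,
    ) -> Option<Duration> {
        if failure_count <= failure_threshold {
            return None;
        }
        let excess = failure_count - failure_threshold;
        let delay_secs =
            max_delay.min(base_delay * multiplier.powi(i32::from(excess)));
        Some(Duration::from_secs_f64(delay_secs))
    }

    #[test]
    fn delay_is_zero_below_threshold_then_doubles_then_caps() {
        let delay = |count| full_delay(count, 3, 5.0, 2.0, 3600.0);

        // At or below the failure threshold there is no backoff at all.
        assert_eq!(delay(3), None);
        // Each further consecutive failure doubles the delay...
        assert_eq!(delay(4), Some(Duration::from_secs(10)));
        assert_eq!(delay(5), Some(Duration::from_secs(20)));
        assert_eq!(delay(6), Some(Duration::from_secs(40)));
        // ...until it saturates at `max_delay`.
        assert_eq!(delay(20), Some(Duration::from_secs(3600)));
    }
}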