Merge branch 'benjamin/refactor-backoff' into 'main'

Centralize backoff logic for offline servers

See merge request matrix/grapevine!70
Olivia Lee 2024-11-17 05:37:57 +00:00
commit eaf349d92c
9 changed files with 595 additions and 137 deletions

View file

@ -194,6 +194,7 @@ This will be the first release of Grapevine since it was forked from Conduit
([!96](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/96))
18. Fixed incoming HTTP/2 requests failing federation signature check.
([!104](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/104))
19. Return 403 instead of 500 when joins to a local-only room are denied.
    Consequently fixes Heisenbridge being unable to join puppeted users to its
    rooms ([#85](https://gitlab.computer.surgery/matrix/grapevine/-/issues/85)).
@ -201,6 +202,14 @@ This will be the first release of Grapevine since it was forked from Conduit
20. Fix handling of v11 rooms with `m.room.create` event content that passes
    the authorization rules but doesn't match other parts of the spec.
    ([!139](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/139))
21. Remove buggy backoff implementation for remote device key queries that
    failed to reset the backoff delay after a successful request. This caused
    an increasing rate of key query failures (and therefore UTD messages) over
    time until a restart.
    ([!70](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/70))
### Added
@ -279,3 +288,6 @@ This will be the first release of Grapevine since it was forked from Conduit
([!121](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/121))
25. Add configuration options to tune the value of each cache individually.
([!124](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/124))
26. Attempt to detect offline remote servers and back off all federation
requests to them.
([!70](https://gitlab.computer.surgery/matrix/grapevine/-/merge_requests/70))

View file

@ -1,6 +1,7 @@
use std::{
collections::{hash_map, BTreeMap, HashMap, HashSet},
time::{Duration, Instant},
collections::{BTreeMap, HashMap, HashSet},
future::IntoFuture,
time::Duration,
};
use futures_util::{stream::FuturesUnordered, StreamExt};
@ -385,47 +386,9 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
let mut failures = BTreeMap::new();
let back_off = |id| async {
match services().globals.bad_query_ratelimiter.write().await.entry(id) {
hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
}
hash_map::Entry::Occupied(mut e) => {
*e.get_mut() = (Instant::now(), e.get().1 + 1);
}
}
};
let mut futures: FuturesUnordered<_> = get_over_federation
.into_iter()
.map(|(server, vec)| async move {
if let Some((time, tries)) = services()
.globals
.bad_query_ratelimiter
.read()
.await
.get(server)
{
// Exponential backoff
let mut min_elapsed_duration =
Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
}
if let Some(remaining) =
min_elapsed_duration.checked_sub(time.elapsed())
{
debug!(%server, %tries, ?remaining, "Backing off from server");
return (
server,
Err(Error::BadServerResponse(
"bad query, still backing off",
)),
);
}
}
let mut device_keys_input_fed = BTreeMap::new();
for (user_id, keys) in vec {
device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
@ -436,12 +399,15 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
server,
tokio::time::timeout(
Duration::from_secs(25),
services().sending.send_federation_request(
services()
.sending
.send_federation_request(
server,
federation::keys::get_keys::v1::Request {
device_keys: device_keys_input_fed,
},
),
)
.into_future(),
)
.await
.map_err(|_e| Error::BadServerResponse("Query took too long"))
@ -454,7 +420,6 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
let response = match response {
Ok(response) => response,
Err(error) => {
back_off(server.to_owned()).await;
debug!(%server, %error, "remote device key query failed");
failures.insert(server.to_string(), json!({}));
continue;

View file

@ -12,7 +12,7 @@ use ruma::{
api::federation::discovery::OldVerifyKey, OwnedServerName,
OwnedServerSigningKeyId, RoomVersionId,
};
use serde::Deserialize;
use serde::{Deserialize, Deserializer};
use strum::{Display, EnumIter, IntoEnumIterator};
use crate::error;
@ -407,6 +407,7 @@ pub(crate) struct FederationConfig {
pub(crate) max_fetch_prev_events: u16,
pub(crate) max_concurrent_requests: u16,
pub(crate) old_verify_keys: BTreeMap<OwnedServerSigningKeyId, OldVerifyKey>,
pub(crate) backoff: BackoffConfig,
}
impl Default for FederationConfig {
@ -420,6 +421,44 @@ impl Default for FederationConfig {
max_fetch_prev_events: 100,
max_concurrent_requests: 100,
old_verify_keys: BTreeMap::new(),
backoff: BackoffConfig::default(),
}
}
}
#[derive(Debug, Deserialize)]
#[serde(default)]
pub(crate) struct BackoffConfig {
/// Minimum number of consecutive failures for a server before starting to
/// delay requests.
pub(crate) failure_threshold: u8,
/// Initial delay between requests in seconds, after the number of
/// consecutive failures to a server first exceeds the threshold.
pub(crate) base_delay: f64,
/// Factor to increase delay by after each additional consecutive failure.
pub(crate) multiplier: f64,
/// Maximum delay between requests to a server in seconds.
pub(crate) max_delay: f64,
/// Range of random multipliers applied to the request delay.
#[serde(deserialize_with = "deserialize_jitter_range")]
pub(crate) jitter_range: std::ops::Range<f64>,
}
impl Default for BackoffConfig {
fn default() -> Self {
// After the first 3 consecutive failed requests, increase delay
// exponentially from 5s to 24h over the next 24 failures. It takes an
// average of 4.3 days of failures to reach the maximum delay of 24h.
Self {
failure_threshold: 3,
base_delay: 5.0,
multiplier: 1.5,
max_delay: 60.0 * 60.0 * 24.0,
jitter_range: 0.5..1.5,
}
}
}
@ -482,6 +521,24 @@ pub(crate) fn default_default_room_version() -> RoomVersionId {
RoomVersionId::V10
}
fn deserialize_jitter_range<'de, D>(
deserializer: D,
) -> Result<std::ops::Range<f64>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let Some((a, b)) = s.split_once("..") else {
return Err(serde::de::Error::custom(crate::Error::bad_config(
"invalid jitter range",
)));
};
a.parse()
.and_then(|a| b.parse().map(|b| a..b))
.map_err(serde::de::Error::custom)
}
/// Search default locations for a configuration file
///
/// If one isn't found, the list of tried paths is returned.
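
To get a feel for the schedule these defaults produce, here is a small standalone sketch (editorial, not part of this merge request) that mirrors the delay formula used by the new backoff service: min(max_delay, base_delay * multiplier ^ excess_failures). The random jitter coefficient is left out; the constants copy `BackoffConfig::default()`, and the function and `main` harness are illustrative names only.

use std::time::Duration;

// Illustrative copies of the values from `BackoffConfig::default()`.
const FAILURE_THRESHOLD: u8 = 3;
const BASE_DELAY: f64 = 5.0;
const MULTIPLIER: f64 = 1.5;
const MAX_DELAY: f64 = 60.0 * 60.0 * 24.0;

/// Delay applied after `failure_count` consecutive failures, ignoring jitter.
/// Mirrors the formula in `BackoffState::remaining_delay` from this MR.
fn delay_for(failure_count: u8) -> Option<Duration> {
    if failure_count <= FAILURE_THRESHOLD {
        // Below the threshold, requests are not delayed at all.
        return None;
    }
    let excess = i32::from(failure_count - FAILURE_THRESHOLD);
    let secs = MAX_DELAY.min(BASE_DELAY * MULTIPLIER.powi(excess));
    Some(Duration::from_secs_f64(secs))
}

fn main() {
    // Prints: failures=4 -> ~7.5s, failures=5 -> ~11.25s, ..., capped at 24h.
    for failures in 1..=30 {
        println!("failures={failures} delay={:?}", delay_for(failures));
    }
}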

View file

@ -290,6 +290,9 @@ pub(crate) struct Metrics {
/// Number of entries in an
/// [`OnDemandHashMap`](crate::utils::on_demand_hashmap::OnDemandHashMap)
on_demand_hashmap_size: opentelemetry::metrics::Gauge<u64>,
/// Number of known remote servers in each state (online or offline)
remote_server_count: opentelemetry::metrics::Gauge<u64>,
}
impl Metrics {
@ -345,11 +348,17 @@ impl Metrics {
.with_description("Number of entries in OnDemandHashMap")
.init();
let remote_server_count = meter
.u64_gauge("remote_server_count")
.with_description("Number of known remote servers")
.init();
Metrics {
otel_state: (registry, provider),
http_requests_histogram,
lookup,
on_demand_hashmap_size,
remote_server_count,
}
}
@ -384,6 +393,18 @@ impl Metrics {
&[KeyValue::new("name", name)],
);
}
/// Record number of remote servers marked online or offline.
pub(crate) fn record_remote_server_count(
&self,
online_count: u64,
offline_count: u64,
) {
self.remote_server_count
.record(online_count, &[KeyValue::new("state", "online")]);
self.remote_server_count
.record(offline_count, &[KeyValue::new("state", "offline")]);
}
}
/// Track HTTP metrics by converting this into an [`axum`] layer

View file

@ -12,6 +12,7 @@ pub(crate) mod pdu;
pub(crate) mod pusher;
pub(crate) mod rooms;
pub(crate) mod sending;
pub(crate) mod server_backoff;
pub(crate) mod transaction_ids;
pub(crate) mod uiaa;
pub(crate) mod users;
@ -35,6 +36,7 @@ pub(crate) struct Services {
pub(crate) globals: globals::Service,
pub(crate) key_backups: key_backups::Service,
pub(crate) media: media::Service,
pub(crate) server_backoff: Arc<server_backoff::Service>,
pub(crate) sending: Arc<sending::Service>,
}
@ -120,6 +122,7 @@ impl Services {
media: media::Service {
db,
},
server_backoff: server_backoff::Service::build(),
sending: sending::Service::new(db, &config),
globals: globals::Service::new(db, config, reload_handles)?,

View file

@ -84,8 +84,6 @@ pub(crate) struct Service {
Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
pub(crate) bad_signature_ratelimiter:
Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
pub(crate) bad_query_ratelimiter:
Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
pub(crate) servername_ratelimiter:
OnDemandHashMap<OwnedServerName, Semaphore>,
pub(crate) roomid_mutex_insert: TokenSet<OwnedRoomId, marker::Insert>,
@ -278,7 +276,6 @@ impl Service {
admin_bot_room_alias_id,
bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
servername_ratelimiter: OnDemandHashMap::new(
"servername_ratelimiter".to_owned(),
),

View file

@ -1,8 +1,10 @@
use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt::Debug,
future::{Future, IntoFuture},
pin::Pin,
sync::Arc,
time::{Duration, Instant},
time::Duration,
};
use base64::{engine::general_purpose, Engine as _};
@ -116,6 +118,31 @@ pub(crate) struct RequestData {
requester_span: Span,
}
/// Which types of errors should cause us to back off requests to this server
/// globally.
///
/// The default is [`BackoffOn::AllExceptWellFormed`], which is conservative,
/// with a high false negative rate and low false positive rate. For endpoints
/// where we have additional information, we should pick a less conservative
/// setting.
#[derive(Copy, Clone, Debug)]
pub(crate) enum BackoffOn {
/// All errors except for error responses that match the expected
/// `{ "errcode": ... }` format for the matrix protocol.
AllExceptWellFormed,
/// All errors
AllErrors,
}
#[must_use = "The request builder must be awaited for the request to be sent"]
pub(crate) struct SendFederationRequestBuilder<'a, T> {
destination: &'a ServerName,
request: T,
log_errors: LogRequestError,
backoff_on: BackoffOn,
}
pub(crate) struct Service {
db: &'static dyn Data,
@ -128,10 +155,7 @@ pub(crate) struct Service {
#[derive(Debug)]
enum TransactionStatus {
Running,
// number of times failed, time of last failure
Failed(u32, Instant),
// number of times failed
Retrying(u32),
Failed,
}
struct HandlerInputs {
@ -266,19 +290,14 @@ impl Service {
}
if let Err(error) = result {
// Logging transactions that fail due to backoff produces a lot of
// clutter in the logs. Failure due to backoff is expected behavior,
// and the transaction will be retried later.
if !matches!(error, Error::ServerBackoff { .. }) {
warn!(%error, "Marking transaction as failed");
current_transaction_status.entry(destination).and_modify(|e| {
use TransactionStatus::{Failed, Retrying, Running};
*e = match e {
Running => Failed(1, Instant::now()),
Retrying(n) => Failed(*n + 1, Instant::now()),
Failed(..) => {
error!("Request that was not even running failed?!");
return;
}
}
});
current_transaction_status
.insert(destination, TransactionStatus::Failed);
return Ok(None);
}
@ -383,27 +402,13 @@ impl Service {
entry
.and_modify(|e| match e {
TransactionStatus::Running | TransactionStatus::Retrying(_) => {
TransactionStatus::Running => {
// already running
allow = false;
}
TransactionStatus::Failed(tries, time) => {
// Fail if a request has failed recently (exponential
// backoff)
let mut min_elapsed_duration =
Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24)
{
min_elapsed_duration =
Duration::from_secs(60 * 60 * 24);
}
if time.elapsed() < min_elapsed_duration {
allow = false;
} else {
TransactionStatus::Failed => {
retry = true;
*e = TransactionStatus::Retrying(*tries);
}
*e = TransactionStatus::Running;
}
})
.or_insert(TransactionStatus::Running);
@ -667,35 +672,20 @@ impl Service {
Ok(())
}
#[tracing::instrument(skip(self, request))]
pub(crate) async fn send_federation_request<T>(
// Allowed because `SendFederationRequestBuilder::into_future` uses
// `services()`
#[allow(clippy::unused_self)]
pub(crate) fn send_federation_request<'a, T>(
&self,
destination: &ServerName,
destination: &'a ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug,
{
debug!("Waiting for permit");
let permit = self.maximum_requests.acquire().await;
debug!("Got permit");
let response = tokio::time::timeout(
Duration::from_secs(2 * 60),
server_server::send_request(
) -> SendFederationRequestBuilder<'a, T> {
SendFederationRequestBuilder {
destination,
request,
LogRequestError::Yes,
AllowLoopbackRequests::No,
),
)
.await
.map_err(|_| {
warn!("Timeout waiting for server response");
Error::BadServerResponse("Timeout waiting for server response")
})?;
drop(permit);
response
log_errors: LogRequestError::Yes,
backoff_on: BackoffOn::AllExceptWellFormed,
}
}
/// Sends a request to an appservice
@ -723,6 +713,89 @@ impl Service {
}
}
impl<T> SendFederationRequestBuilder<'_, T> {
/// Enable or disable automatic logging of any error from this request.
///
/// This should be disabled if the error is going to be logged elsewhere,
/// to avoid cluttering logs with duplicate error messages.
pub(crate) fn log_errors(mut self, log_errors: LogRequestError) -> Self {
self.log_errors = log_errors;
self
}
/// Set the types of errors that will cause us to back off future requests to
/// this server globally.
pub(crate) fn backoff_on(mut self, backoff_on: BackoffOn) -> Self {
self.backoff_on = backoff_on;
self
}
}
impl<'a, T> IntoFuture for SendFederationRequestBuilder<'a, T>
where
T: OutgoingRequest + Send + Debug + 'a,
T::IncomingResponse: Send,
{
// TODO: get rid of the Box once impl_trait_in_assoc_type is stable
// <https://github.com/rust-lang/rust/issues/63063>
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
type Output = Result<T::IncomingResponse>;
#[tracing::instrument(
name = "send_federation_request",
skip(self),
fields(destination = %self.destination)
)]
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
debug!("Waiting for permit");
let permit = services().sending.maximum_requests.acquire().await;
debug!("Got permit");
let backoff_guard =
services().server_backoff.server_ready(self.destination)?;
let response = tokio::time::timeout(
Duration::from_secs(2 * 60),
server_server::send_request(
self.destination,
self.request,
self.log_errors,
AllowLoopbackRequests::No,
),
)
.await
.map_err(|_| {
warn!("Timeout waiting for server response");
Error::BadServerResponse("Timeout waiting for server response")
})
.and_then(|result| result);
drop(permit);
match &response {
Err(Error::Federation(_, error)) => {
if error.error_kind().is_some() {
if let BackoffOn::AllExceptWellFormed = self.backoff_on
{
backoff_guard.soft_failure();
} else {
backoff_guard.hard_failure();
}
} else {
// The error wasn't in the expected format for matrix
// API responses.
backoff_guard.hard_failure();
}
}
Err(_) => backoff_guard.hard_failure(),
Ok(_) => backoff_guard.success(),
}
response
})
}
}
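
For illustration, a sketch of a call site under the new builder API (editorial fragment, not part of the diff): the builder returned by `send_federation_request` is optionally tuned and then awaited, at which point the `IntoFuture` impl above acquires the permit, consults `server_backoff::Service::server_ready`, and records the outcome on the guard. Here `server` and `device_keys_input_fed` are assumed to be in scope as in the key-query hunk earlier in this commit; the defaults are `LogRequestError::Yes` and `BackoffOn::AllExceptWellFormed`.

// Sketch only. Overriding `log_errors` is shown purely as an example of
// tuning a default; whether to do so depends on the call site.
let response = services()
    .sending
    .send_federation_request(
        server,
        federation::keys::get_keys::v1::Request {
            device_keys: device_keys_input_fed,
        },
    )
    .log_errors(LogRequestError::No)
    .await;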
#[tracing::instrument(skip(events))]
async fn handle_appservice_event(
id: &str,
@ -894,9 +967,9 @@ async fn handle_federation_event(
}
}
let permit = services().sending.maximum_requests.acquire().await;
let response = server_server::send_request(
let response = services()
.sending
.send_federation_request(
server,
send_transaction_message::v1::Request {
origin: services().globals.server_name().to_owned(),
@ -910,9 +983,12 @@ async fn handle_federation_event(
})))
.into(),
},
LogRequestError::No,
AllowLoopbackRequests::No,
)
// The spec states that this endpoint should always return success, even
// if individual PDUs fail. If we get an error, something is wrong.
.backoff_on(BackoffOn::AllErrors)
// The error will be logged in `handle_response`
.log_errors(LogRequestError::No)
.await?;
for pdu in response.pdus {
@ -921,8 +997,6 @@ async fn handle_federation_event(
}
}
drop(permit);
Ok(())
}

View file

@ -0,0 +1,322 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex, RwLock},
time::{Duration, Instant},
};
use rand::{thread_rng, Rng};
use ruma::{OwnedServerName, ServerName};
use tracing::{debug, error, info, instrument};
use crate::{observability::METRICS, services, Error, Result};
/// Service to handle backing off requests to offline servers.
///
/// Matrix is full of servers that are either temporarily or permanently
/// offline. It's important not to flood offline servers with federation
/// traffic, since this can consume resources on both ends.
///
/// To limit traffic to offline servers, we track a global exponential backoff
/// state for federation requests to each server name. This mechanism is *only*
/// intended to handle offline servers. Rate limiting and backoff retries for
/// specific requests have different considerations and need to be handled
/// elsewhere.
///
/// Exponential backoff is typically used in a retry loop for a single request.
/// Because the state of this backoff is global, and requests may be issued
/// concurrently, we do a couple of unusual things:
///
/// First, we wait for a certain number of consecutive failed requests before we
/// start delaying further requests. This is to avoid delaying requests to a
/// server that is not offline but fails on a small fraction of requests.
///
/// Second, we only increment the failure counter once for every batch of
/// concurrent requests, instead of on every failed request. This avoids rapidly
/// increasing the counter, proportional to the rate of outgoing requests, when
/// the server is only briefly offline.
pub(crate) struct Service {
servers: RwLock<HashMap<OwnedServerName, Arc<RwLock<BackoffState>>>>,
server_counts: Mutex<ServerCounts>,
}
/// Guard to record the result of an attempted request to a server.
///
/// If the request succeeds, call [`BackoffGuard::success`]. If the request
/// fails in a way that indicates the server is unavailble, call
/// [`BackoffGuard::hard_failure`]. If the request fails in a way that doesn't
/// necessarily indicate that the server is unavailable, call
/// [`BackoffGuard::soft_failure`]. Note that this choice is security-sensitive.
/// If an attacker is able to trigger hard failures for an online server, they
/// can cause us to incorrectly mark it as offline and block outgoing requests
/// to it.
#[must_use]
pub(crate) struct BackoffGuard {
result_recorded: bool,
backoff: Arc<RwLock<BackoffState>>,
/// Store the last failure timestamp observed when this request started. If
/// there was another failure recorded since the request started, do not
/// increment the failure count. This ensures that only one failure will
/// be recorded for every batch of concurrent requests, as discussed in
/// the documentation of [`Service`].
last_failure: Option<Instant>,
}
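
A rough sketch of the intended guard lifecycle, mirroring the default `BackoffOn::AllExceptWellFormed` handling in the sending service above; `do_request` is a hypothetical placeholder for the actual federation call, while the `Error`, `services()`, and guard names are the ones from this diff.

// Sketch only, not part of the diff.
let guard = services().server_backoff.server_ready(server)?;
match do_request(server).await {
    Ok(_) => guard.success(),
    // A well-formed Matrix error response proves the server is reachable,
    // so it only counts as a soft failure.
    Err(Error::Federation(_, e)) if e.error_kind().is_some() => {
        guard.soft_failure();
    }
    // Timeouts, connection failures, malformed responses, and similar.
    Err(_) => guard.hard_failure(),
}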
/// State of exponential backoff for a specific server.
#[derive(Clone, Debug)]
struct BackoffState {
server_name: OwnedServerName,
/// Count of consecutive failed requests to this server.
failure_count: u8,
/// Timestamp of the last failed request to this server.
last_failure: Option<Instant>,
/// Random multiplier applied to the request delay.
///
/// This is updated to a new random value after each batch of concurrent
/// requests containing a failure.
jitter_coeff: f64,
}
/// State transitions for a single server
#[derive(Debug, Copy, Clone)]
enum Transition {
/// A new server, marked as online by default
New,
OnlineToOffline,
OfflineToOnline,
}
/// Counts of known servers in each state, used for metrics
#[derive(Debug, Copy, Clone, Default)]
struct ServerCounts {
online_count: u64,
offline_count: u64,
}
impl Service {
pub(crate) fn build() -> Arc<Service> {
Arc::new(Service {
servers: RwLock::default(),
server_counts: Mutex::default(),
})
}
/// If ready to attempt another request to a server, returns a guard to
/// record the result.
///
/// If still in the backoff period for this server, returns `Err`.
#[instrument(skip(self))]
pub(crate) fn server_ready(
&self,
server_name: &ServerName,
) -> Result<BackoffGuard> {
let state = self.server_state(server_name);
let last_failure = {
let state_lock = state.read().unwrap();
if let Some(remaining_delay) = state_lock.remaining_delay() {
debug!(failures = %state_lock.failure_count, ?remaining_delay, "backing off from server");
return Err(Error::ServerBackoff {
server: server_name.to_owned(),
remaining_delay,
});
}
state_lock.last_failure
};
Ok(BackoffGuard {
result_recorded: false,
backoff: state,
last_failure,
})
}
fn record_transition(
&self,
server_name: &ServerName,
transition: Transition,
) {
let mut counts = self.server_counts.lock().unwrap();
match transition {
Transition::New => {
info!(
%server_name,
"new remote server, marked as online by default"
);
counts.online_count += 1;
}
Transition::OnlineToOffline => {
info!(
%server_name,
"remote server transitioned from online to offline"
);
counts.online_count -= 1;
counts.offline_count += 1;
}
Transition::OfflineToOnline => {
info!(
%server_name,
"remote server transitioned from offline to online"
);
counts.offline_count -= 1;
counts.online_count += 1;
}
}
METRICS.record_remote_server_count(
counts.online_count,
counts.offline_count,
);
}
fn server_state(
&self,
server_name: &ServerName,
) -> Arc<RwLock<BackoffState>> {
let servers = self.servers.read().unwrap();
if let Some(state) = servers.get(server_name) {
Arc::clone(state)
} else {
drop(servers);
let mut servers = self.servers.write().unwrap();
// We have to check again because it's possible for another thread
// to write in between us dropping the read lock and taking the
// write lock.
if let Some(state) = servers.get(server_name) {
Arc::clone(state)
} else {
let state = Arc::new(RwLock::new(BackoffState::new(
server_name.to_owned(),
)));
servers.insert(server_name.to_owned(), Arc::clone(&state));
self.record_transition(server_name, Transition::New);
state
}
}
}
}
impl BackoffState {
fn new(server_name: OwnedServerName) -> BackoffState {
BackoffState {
server_name,
failure_count: 0,
last_failure: None,
jitter_coeff: 0.0,
}
}
/// Returns the remaining time before we are ready to attempt another
/// request to this server.
fn remaining_delay(&self) -> Option<Duration> {
let config = &services().globals.config.federation.backoff;
let last_failure = self.last_failure?;
if self.failure_count <= config.failure_threshold {
return None;
}
let excess_failure_count =
self.failure_count - config.failure_threshold;
let delay_secs = config.max_delay.min(
config.base_delay
* config.multiplier.powi(i32::from(excess_failure_count)),
) * self.jitter_coeff;
let delay = Duration::from_secs_f64(delay_secs);
delay.checked_sub(last_failure.elapsed())
}
/// Returns whether this server is marked as online (no backoff delay).
fn is_online(&self) -> bool {
let config = &services().globals.config.federation.backoff;
self.failure_count <= config.failure_threshold
}
}
impl BackoffGuard {
/// Record a successful request.
#[instrument(skip(self))]
pub(crate) fn success(mut self) {
self.result_recorded = true;
let mut state = self.backoff.write().unwrap();
let was_online = state.is_online();
if state.failure_count != 0 {
debug!(
server_name = %&state.server_name,
"successful request to server, resetting failure count"
);
}
state.failure_count = 0;
// Server is always online after setting failure_count = 0
if !was_online {
services().server_backoff.record_transition(
&state.server_name,
Transition::OfflineToOnline,
);
}
}
/// Record a failed request indicating that the server may be unavailable.
///
/// Examples of failures in this category are a timeout, a 500 status, or
/// a 404 from an endpoint that is not specced to return 404.
#[instrument(skip(self))]
pub(crate) fn hard_failure(mut self) {
self.result_recorded = true;
let config = &services().globals.config.federation.backoff;
let mut state = self.backoff.write().unwrap();
let was_online = state.is_online();
if state.last_failure == self.last_failure {
state.failure_count = state.failure_count.saturating_add(1);
state.jitter_coeff =
thread_rng().gen_range(config.jitter_range.clone());
state.last_failure = Some(Instant::now());
debug!(
server_name = %state.server_name,
failure_count = state.failure_count,
"hard failure sending request to server, incrementing failure count"
);
if state.is_online() != was_online {
services().server_backoff.record_transition(
&state.server_name,
Transition::OnlineToOffline,
);
}
}
}
/// Record a request that failed, but where the failure is likely to occur
/// in normal operation even when the server is available.
///
/// An example of a failure in this category is a 404 from querying a user
/// profile. This might occur if the server no longer exists, but will also
/// occur if the user ID doesn't exist.
#[instrument(skip(self))]
pub(crate) fn soft_failure(mut self) {
self.result_recorded = true;
}
}
impl Drop for BackoffGuard {
fn drop(&mut self) {
if !self.result_recorded {
error!(
"BackoffGuard dropped without recording result. This is a bug."
);
}
}
}
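
The "one failure per batch of concurrent requests" rule described in the `Service` and `BackoffGuard` docs above can be illustrated with a self-contained toy model (editorial sketch with simplified types; no locking, jitter, or transitions): each guard snapshots `last_failure` when the request starts, and a hard failure only increments the counter if no other failure has been recorded since that snapshot.

use std::time::Instant;

// Toy model of the batching rule; not the real service.
#[derive(Default)]
struct State {
    failure_count: u8,
    last_failure: Option<Instant>,
}

struct Guard {
    // Value of `last_failure` observed when the request started.
    snapshot: Option<Instant>,
}

impl State {
    fn start_request(&self) -> Guard {
        Guard { snapshot: self.last_failure }
    }

    fn hard_failure(&mut self, guard: Guard) {
        // Only count the failure if no other failure landed in the meantime.
        if self.last_failure == guard.snapshot {
            self.failure_count += 1;
            self.last_failure = Some(Instant::now());
        }
    }
}

fn main() {
    let mut state = State::default();
    // Two requests start concurrently, before any failure is recorded...
    let a = state.start_request();
    let b = state.start_request();
    // ...and both fail: only the first bumps the counter, so a brief outage
    // does not inflate the failure count in proportion to request volume.
    state.hard_failure(a);
    state.hard_failure(b);
    assert_eq!(state.failure_count, 1);
}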

View file

@ -1,4 +1,4 @@
use std::convert::Infallible;
use std::{convert::Infallible, time::Duration};
use http::StatusCode;
use ruma::{
@ -84,6 +84,13 @@ pub(crate) enum Error {
UnsupportedRoomVersion(ruma::RoomVersionId),
#[error("{0} in {1}")]
InconsistentRoomState(&'static str, ruma::OwnedRoomId),
#[error(
"backing off requests to {server} for the next {remaining_delay:?}"
)]
ServerBackoff {
server: OwnedServerName,
remaining_delay: Duration,
},
}
impl Error {