grapevine/src/service.rs
Olivia Lee 9b22c9b40b
add service for tracking backoffs to offline servers
Currently we have some exponential backoff logic scattered in different
locations, with multiple distinct bad implementations. We should
centralize backoff logic in one place and actually do it correctly.

This backoff logic is similar to synapse's implementation[1], with a
couple fixes:

 - we wait until we observe 5 consecutive failures before we start
   delaying requests, to avoid being sensitive to a small fraction of
   failed requests on an otherwise healthy server.
 - synapse's implementation is kinda similar to our "only increment the
   failure count once per batch of concurrent requests" behavior, where
   they base the retry state written to the store on the state observed
   at the beginning of the request, rather than on the state observed at
   the end of the request. Their implementation has a bug, where a success
   will be ignored if a failure occurs in the same batch. We do not
   replicate this bug.

Our parameter choices are significantly less aggressive than synapse[2], which
starts at 10m delay, has a multiplier of 2, and saturates at 4d delay.

[1]: 70b0e38603/synapse/util/retryutils.py
[2]: 70b0e38603/synapse/config/federation.py (L83)
2024-11-16 20:13:09 -08:00

139 lines
4.6 KiB
Rust

use std::sync::{Arc, OnceLock};
use crate::{observability::FilterReloadHandles, Config, Result};
// Crate-visible service submodules. Every module listed here except `pdu`
// has a corresponding field on the `Services` struct in this file.
pub(crate) mod account_data;
pub(crate) mod admin;
pub(crate) mod appservice;
pub(crate) mod globals;
pub(crate) mod key_backups;
pub(crate) mod media;
pub(crate) mod pdu;
pub(crate) mod pusher;
pub(crate) mod rooms;
pub(crate) mod sending;
pub(crate) mod server_backoff;
pub(crate) mod transaction_ids;
pub(crate) mod uiaa;
pub(crate) mod users;
/// Global slot for the one [`Services`] instance.
///
/// Written by [`Services::install`], which leaks the instance to obtain the
/// `'static` reference stored here, and read by [`services`].
static SERVICES: OnceLock<&'static Services> = OnceLock::new();
/// Returns the globally installed [`Services`] instance.
///
/// # Panics
///
/// Panics if `SERVICES` has not been populated yet, i.e. [`Services::install`]
/// has not been called.
pub(crate) fn services() -> &'static Services {
    // `get` hands back a reference to the stored reference; copy the inner
    // `&'static Services` out so it can be returned directly.
    let installed = SERVICES.get().copied();
    installed.expect("`Services::install` should have been called first")
}
/// Aggregate of every service in this module, one field per service.
///
/// Built by [`Services::new`] and made globally reachable through
/// [`Services::install`] / [`services`].
pub(crate) struct Services {
pub(crate) appservice: appservice::Service,
pub(crate) pusher: pusher::Service,
pub(crate) rooms: rooms::Service,
pub(crate) transaction_ids: transaction_ids::Service,
pub(crate) uiaa: uiaa::Service,
pub(crate) users: users::Service,
pub(crate) account_data: account_data::Service,
/// Held behind an `Arc`, so handles to it can be cloned and shared.
pub(crate) admin: Arc<admin::Service>,
pub(crate) globals: globals::Service,
pub(crate) key_backups: key_backups::Service,
pub(crate) media: media::Service,
/// Backoff tracking for offline servers (per the change that introduced
/// this field). Held behind an `Arc`.
pub(crate) server_backoff: Arc<server_backoff::Service>,
/// Held behind an `Arc`, so handles to it can be cloned and shared.
pub(crate) sending: Arc<sending::Service>,
}
impl Services {
    /// Builds every service on top of the shared database handle `db`.
    ///
    /// `db` has to implement the `Data` trait of each database-backed
    /// service; the same reference is handed to all of them. Cache settings
    /// are read from `config.cache`, `sending` borrows `config`, and finally
    /// `config` and `reload_handles` are moved into the `globals` service.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `appservice::Service::new` or
    /// `globals::Service::new` if either of them fails.
    // Long only because it wires up every service in one place.
    #[allow(clippy::too_many_lines)]
    pub(crate) fn new<D>(
        db: &'static D,
        config: Config,
        reload_handles: Option<FilterReloadHandles>,
    ) -> Result<Self>
    where
        D: appservice::Data
            + pusher::Data
            + rooms::Data
            + transaction_ids::Data
            + uiaa::Data
            + users::Data
            + account_data::Data
            + globals::Data
            + key_backups::Data
            + media::Data
            + sending::Data
            + 'static,
    {
        // Services with a constructor call are created first, in the same
        // relative order as the fields of the final struct literal.
        let appservice = appservice::Service::new(db)?;

        let rooms = rooms::Service {
            alias: rooms::alias::Service::new(db),
            auth_chain: rooms::auth_chain::Service::new(
                db,
                config.cache.auth_chain,
            ),
            directory: db,
            edus: rooms::edus::Service {
                read_receipt: db,
                typing: rooms::edus::typing::Service::new(),
            },
            event_handler: rooms::event_handler::Service,
            lazy_loading: rooms::lazy_loading::Service::new(db),
            metadata: db,
            outlier: db,
            pdu_metadata: rooms::pdu_metadata::Service {
                db,
            },
            search: db,
            short: rooms::short::Service::new(
                db,
                config.cache.short_eventid,
                config.cache.eventid_short,
                config.cache.statekey_short,
                config.cache.short_statekey,
            ),
            state: rooms::state::Service {
                db,
            },
            state_accessor: rooms::state_accessor::Service::new(
                db,
                config.cache.server_visibility,
                config.cache.user_visibility,
            ),
            state_cache: rooms::state_cache::Service::new(db),
            state_compressor: rooms::state_compressor::Service::new(
                db,
                config.cache.state_info,
            ),
            timeline: rooms::timeline::Service::new(db, config.cache.pdu),
            threads: rooms::threads::Service {
                db,
            },
            spaces: rooms::spaces::Service::new(
                config.cache.roomid_spacechunk,
            ),
            user: db,
        };

        let uiaa = uiaa::Service::new(db);
        let users = users::Service::new(db);
        let admin = admin::Service::new();
        let server_backoff = server_backoff::Service::build();

        // `sending` only borrows the configuration, so it must be built
        // before `globals` takes ownership of `config`.
        let sending = sending::Service::new(db, &config);
        let globals = globals::Service::new(db, config, reload_handles)?;

        Ok(Self {
            appservice,
            pusher: pusher::Service {
                db,
            },
            rooms,
            transaction_ids: db,
            uiaa,
            users,
            account_data: db,
            admin,
            globals,
            key_backups: db,
            media: media::Service {
                db,
            },
            server_backoff,
            sending,
        })
    }

    /// Makes `self` the global instance returned by [`services`].
    ///
    /// The value is moved to the heap and leaked so that it lives for the
    /// rest of the program.
    ///
    /// # Panics
    ///
    /// Panics if a global instance has already been installed.
    pub(crate) fn install(self) {
        let leaked: &'static Self = Box::leak(Box::new(self));

        if SERVICES.set(leaked).is_err() {
            panic!("Services::install was called more than once");
        }
    }
}